Answered by:
How is the experimental Create(iterator) redundant with .NET 4.5?

Question
-
Hi Rx Team,
I posted this question a few days ago as a comment on the Rx 2.0 RTW blog post, but it hasn't shown up yet (this has happened to me before - I think it may be a problem with the blog software / IE). My apologies for repeating it here, if you had decided to exclude it during moderation, though it seems like a reasonable question to ask. :P
You wrote the following:
> Create for use with iterators (largely redundant due to the introduction of the new async features)
How is it redundant given that Task doesn't model a reactive sequence (cardinality > 1)?
Is the following analysis flawed?
http://davesexton.com/blog/post/async-iterators.aspx
In summary, my conclusion was that Create(iterator) is appropriate for writing truly reactive asynchronous iterators using imperative-style programming, which can't be accomplished using async/await or IAsyncEnumerable.
Thanks,
Dave
- Edited by Dave Sexton Tuesday, August 21, 2012 7:49 PM Improved some wording
Tuesday, August 21, 2012 8:26 AM
Answers
-
All we're really doing with the IObservable<object> sequences returned from the iterator is a Concat, i.e. a one-at-a-time awaiting of their completion. As such, this is only used for the OnCompleted or OnError messages, for which a Task can be used perfectly fine. At the same time, an iterator used to drive an asynchronous computation is really the poor man's await; instead we can use the state machine generated by await to achieve the same effect. Finally, the result type we want is an IObservable<T>, not an IEnumerable<T> or an IAsyncEnumerable<T>, so the use of an iterator was merely a workaround.
Below is an example on how to achieve a similar effect using the Create method (in Beta/RC called CreateAsync):
var res = Observable.Create<int>(async observer => { var i = 0; while (true) { await Task.Delay(1000); observer.OnNext(i++); } });
Hope this helps
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
- Edited by Bart De Smet [MSFT] Tuesday, August 21, 2012 6:30 PM
- Proposed as answer by Bart De Smet [MSFT] Tuesday, August 21, 2012 6:31 PM
- Marked as answer by Dave Sexton Tuesday, August 21, 2012 7:01 PM
Tuesday, August 21, 2012 6:30 PM
All replies
-
All we're really doing with the IObservable<object> sequences returned from the iterator is a Concat, i.e. a one-at-a-time awaiting of their completion. As such, this is only used for the OnCompleted or OnError messages, for which a Task can be used perfectly fine. At the same time, an iterator used to drive an asynchronous computation is really the poor man's await; instead we can use the state machine generated by await to achieve the same effect. Finally, the result type we want is an IObservable<T>, not an IEnumerable<T> or an IAsyncEnumerable<T>, so the use of an iterator was merely a workaround.
Below is an example on how to achieve a similar effect using the Create method (in Beta/RC called CreateAsync):
var res = Observable.Create<int>(async observer => { var i = 0; while (true) { await Task.Delay(1000); observer.OnNext(i++); } });
Hope this helps
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
- Edited by Bart De Smet [MSFT] Tuesday, August 21, 2012 6:30 PM
- Proposed as answer by Bart De Smet [MSFT] Tuesday, August 21, 2012 6:31 PM
- Marked as answer by Dave Sexton Tuesday, August 21, 2012 7:01 PM
Tuesday, August 21, 2012 6:30 PM -
Hi Bart,
Thanks for the explanation. I see now that my post was biased toward trying to prove that async iterators should be included as a native C# feature based on the current yield syntax rather than using a static factory method, though I failed to see the simpler alternative provided by the await syntax within Rx's new Create(Async) static factory method.
Your code is a nice example of how Rx is still capable of producing a truly reactive asynchronous iterator without using native C# iterators, though I think the conclusion to my post has merit: it would be nice (though not absolutely necessary) if C# natively supported asynchronous iterators as I've shown in my blog post; e.g., based on your example:
async IObservable<int> Foo() { var i = 0; while (true) { await Task.Delay(1000); yield return i++; } }
Thanks,
DaveTuesday, August 21, 2012 7:01 PM -
Gentlemen,
Could either of you provide some guidance on how you would unit test this kind of operation?
When using schedulers (in v1) I would substitute in a TestScheduler to control time. Here we don't have that seam for substituting a different implementation.
Regards
Lee
Lee Campbell http://LeeCampbell.blogspot.com
Tuesday, August 28, 2012 8:03 AM -
Hi Lee,
Perhaps await an observable instead:
var res = Observable.Create<int>(async observer => { var i = 0; while (true) { await Observable.Timer(TimeSpan.FromSeconds(1000), testScheduler); observer.OnNext(i++); } });
- Dave
Tuesday, August 28, 2012 9:03 AM -
error CS0121: The call is ambiguous between the following methods or properties: 'System.Reactive.Linq.Observable.Create<int>(System.Func<System.IObserver<int>,System.Threading.Tasks.Task<System.Action>>)' and 'System.Reactive.Linq.Observable.Create<int>(System.Func<System.IObserver<int>,System.Threading.Tasks.Task>)'
I'd have to add return; after the second to last } to make that compile and that would introduce a warning about unreachable code. Boy annoying, can you change that? How could it be mistaking that it might return Task<Action> in the first place. (same problem in VS2012 and VS2013 preview btw)Saturday, July 6, 2013 4:43 PM -
Hi,
I'm able to reproduce the warning. I don't recall having this problem in the past though. I'm currently running VS 2012 with Update 3. Out of curiosity, are you as well?
Unfortunately, I don't know of any elegant workarounds. Perhaps the best approach is to specify the delegate explicitly or create a real method.
var res = Observable.Create<int>( new Func<IObserver<int>, Task>( async observer => { var i = 0; while (true) { await Task.Delay(1000); observer.OnNext(i++); } }));
- Dave
Saturday, July 6, 2013 6:42 PM