How is the experimental Create(iterator) redundant with .NET 4.5?
-
Tuesday, August 21, 2012 8:26 AM
Hi Rx Team,
I posted this question a few days ago as a comment on the Rx 2.0 RTW blog post, but it hasn't shown up yet (this has happened to me before - I think it may be a problem with the blog software / IE). My apologies for repeating it here, if you had decided to exclude it during moderation, though it seems like a reasonable question to ask. :P
You wrote the following:
> Create for use with iterators (largely redundant due to the introduction of the new async features)
How is it redundant given that Task doesn't model a reactive sequence (cardinality > 1)?
Is the following analysis flawed?
http://davesexton.com/blog/post/async-iterators.aspx
In summary, my conclusion was that Create(iterator) is appropriate for writing truly reactive asynchronous iterators using imperative-style programming, which can't be accomplished using async/await or IAsyncEnumerable.
Thanks,
Dave
- Edited by Dave Sexton Tuesday, August 21, 2012 7:49 PM Improved some wording
All Replies
-
Tuesday, August 21, 2012 6:30 PM Owner
All we're really doing with the IObservable<object> sequences returned from the iterator is a Concat, i.e. a one-at-a-time awaiting of their completion. As such, this is only used for the OnCompleted or OnError messages, for which a Task can be used perfectly fine. At the same time, an iterator used to drive an asynchronous computation is really the poor man's await; instead we can use the state machine generated by await to achieve the same effect. Finally, the result type we want is an IObservable<T>, not an IEnumerable<T> or an IAsyncEnumerable<T>, so the use of an iterator was merely a workaround.
Below is an example of how to achieve a similar effect using the Create method (called CreateAsync in the Beta/RC releases):
    var res = Observable.Create<int>(async observer =>
    {
        var i = 0;
        while (true)
        {
            await Task.Delay(1000);
            observer.OnNext(i++);
        }
    });

Hope this helps
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
- Edited by Bart De Smet [MSFT] (Owner) Tuesday, August 21, 2012 6:30 PM
- Proposed As Answer by Bart De Smet [MSFT] (Owner) Tuesday, August 21, 2012 6:31 PM
- Marked As Answer by Dave Sexton Tuesday, August 21, 2012 7:01 PM
-
Tuesday, August 21, 2012 7:01 PM
Hi Bart,
Thanks for the explanation. I see now that my post was biased toward trying to prove that async iterators should be included as a native C# feature based on the current yield syntax rather than using a static factory method, though I failed to see the simpler alternative provided by the await syntax within Rx's new Create(Async) static factory method.
Your code is a nice example of how Rx is still capable of producing a truly reactive asynchronous iterator without using native C# iterators, though I think the conclusion to my post has merit: it would be nice (though not absolutely necessary) if C# natively supported asynchronous iterators as I've shown in my blog post; e.g., based on your example:
    async IObservable<int> Foo()
    {
        var i = 0;
        while (true)
        {
            await Task.Delay(1000);
            yield return i++;
        }
    }

Thanks,
Dave
-
Tuesday, August 28, 2012 8:03 AM
Gentlemen,
Could either of you provide some guidance on how you would unit test this kind of operation?
When using schedulers (in v1) I would substitute in a TestScheduler to control time. Here we don't have that seam for substituting a different implementation.
Regards
Lee
Lee Campbell http://LeeCampbell.blogspot.com
-
Tuesday, August 28, 2012 9:03 AM
Hi Lee,
Perhaps await an observable instead:
    var res = Observable.Create<int>(async observer =>
    {
        var i = 0;
        while (true)
        {
            // One second, matching Task.Delay(1000) in Bart's example
            await Observable.Timer(TimeSpan.FromSeconds(1), testScheduler);
            observer.OnNext(i++);
        }
    });

- Dave

