How is the experimental Create(iterator) redundant with .NET 4.5?

  • Question

  • Hi Rx Team,

    I posted this question a few days ago as a comment on the Rx 2.0 RTW blog post, but it hasn't shown up yet (this has happened to me before - I think it may be a problem with the blog software / IE).  My apologies for repeating it here, if you had decided to exclude it during moderation, though it seems like a reasonable question to ask.  :P

    You wrote the following:

    > Create for use with iterators (largely redundant due to the introduction of the new async features)

    How is it redundant given that Task doesn't model a reactive sequence (cardinality > 1)?

    Is the following analysis flawed?

    http://davesexton.com/blog/post/async-iterators.aspx

    In summary, my conclusion was that Create(iterator) is appropriate for writing truly reactive asynchronous iterators using imperative-style programming, which can't be accomplished using async/await or IAsyncEnumerable.

    Thanks,
    Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Tuesday, August 21, 2012 7:49 PM Improved some wording
    Tuesday, August 21, 2012 8:26 AM

Answers

  • All we're really doing with the IObservable<object> sequences returned from the iterator is a Concat, i.e. a one-at-a-time awaiting of their completion. As such, this is only used for the OnCompleted or OnError messages, for which a Task can be used perfectly fine. At the same time, an iterator used to drive an asynchronous computation is really the poor man's await; instead we can use the state machine generated by await to achieve the same effect. Finally, the result type we want is an IObservable<T>, not an IEnumerable<T> or an IAsyncEnumerable<T>, so the use of an iterator was merely a workaround.

    Below is an example of how to achieve a similar effect using the Create method (in Beta/RC called CreateAsync):

    var res = Observable.Create<int>(async observer =>
    {
        var i = 0;
        while (true)
        {
            await Task.Delay(1000);
            observer.OnNext(i++);
        }
    });
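
    A related sketch, not part of the original reply: as far as I can tell, the loop above keeps running after a subscriber unsubscribes, because nothing tells it to stop. Rx also has a Create overload whose delegate receives a CancellationToken tied to the subscription, so the loop can observe unsubscription. The names CancellableTicker, Ticks, and period are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableTicker
{
    // Hypothetical helper (not from the thread): emits 0, 1, 2, ... once per
    // period until the subscription is disposed.
    public static IObservable<int> Ticks(TimeSpan period)
    {
        return Observable.Create<int>(async (observer, cancel) =>
        {
            var i = 0;
            // The token is cancelled when the subscription is disposed.
            while (!cancel.IsCancellationRequested)
            {
                // Passing the token lets a pending delay end promptly on unsubscribe.
                await Task.Delay(period, cancel);
                observer.OnNext(i++);
            }
        });
    }
}
```

    Disposing the subscription cancels the token, so the pending Task.Delay is cancelled and the loop ends rather than ticking on unobserved.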

    Hope this helps

    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }

    Tuesday, August 21, 2012 6:30 PM

All replies

  • Hi Bart,

    Thanks for the explanation.  I see now that my post was biased toward trying to prove that async iterators should be included as a native C# feature based on the current yield syntax rather than using a static factory method, though I failed to see the simpler alternative provided by the await syntax within Rx's new Create(Async) static factory method.

    Your code is a nice example of how Rx is still capable of producing a truly reactive asynchronous iterator without using native C# iterators, though I think the conclusion to my post has merit: it would be nice (though not absolutely necessary) if C# natively supported asynchronous iterators as I've shown in my blog post; e.g., based on your example:

    async IObservable<int> Foo()
    {
        var i = 0;
        while (true)
        {
            await Task.Delay(1000);
            yield return i++;
        }
    }

    Thanks,
    Dave


    http://davesexton.com/blog

    Tuesday, August 21, 2012 7:01 PM
  • Gentlemen,

    Could either of you provide some guidance on how you would unit test this kind of operation?

    When using schedulers (in v1) I would substitute in a TestScheduler to control time. Here we don't have that seam for substituting a different implementation.

    Regards

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

    Tuesday, August 28, 2012 8:03 AM
  • Hi Lee,

    Perhaps await an observable instead:

    var res = Observable.Create<int>(async observer =>
    {
        var i = 0;
        while (true)
        {
            // One second, matching the Task.Delay(1000) in the earlier example.
            await Observable.Timer(TimeSpan.FromSeconds(1), testScheduler);
            observer.OnNext(i++);
        }
    });
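
    A minimal sketch of how a test might drive this with virtual time, assuming the Microsoft.Reactive.Testing package and a test runner that installs no SynchronizationContext (in that case the await continuation should run inline on the thread that advances the scheduler). The names VirtualTimeTicker, Ticks, and Run are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;

public static class VirtualTimeTicker
{
    // Hypothetical factory: the scheduler parameter is the seam for tests.
    public static IObservable<int> Ticks(IScheduler scheduler)
    {
        // A delegate variable of this exact type selects the
        // Func<IObserver<int>, Task> overload of Create without ambiguity.
        Func<IObserver<int>, Task> subscribeAsync = async observer =>
        {
            var i = 0;
            while (true)
            {
                await Observable.Timer(TimeSpan.FromSeconds(1), scheduler);
                observer.OnNext(i++);
            }
        };
        return Observable.Create<int>(subscribeAsync);
    }

    // Advances virtual time by the given number of seconds and returns
    // the values observed while the subscription was active.
    public static List<int> Run(int virtualSeconds)
    {
        var scheduler = new TestScheduler();
        var seen = new List<int>();
        using (Ticks(scheduler).Subscribe(x => seen.Add(x)))
        {
            scheduler.AdvanceBy(TimeSpan.FromSeconds(virtualSeconds).Ticks);
        }
        return seen;
    }
}
```

    A test can then assert on the list returned by Run without waiting in real time. If the runner does install a SynchronizationContext, the continuations are posted to it instead, and the assertions would need to account for that.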

    - Dave


    http://davesexton.com/blog

    Tuesday, August 28, 2012 9:03 AM
  • error CS0121: The call is ambiguous between the following methods or properties: 'System.Reactive.Linq.Observable.Create<int>(System.Func<System.IObserver<int>,System.Threading.Tasks.Task<System.Action>>)' and 'System.Reactive.Linq.Observable.Create<int>(System.Func<System.IObserver<int>,System.Threading.Tasks.Task>)'

    I'd have to add return; after the second-to-last } to make that compile, and that would introduce a warning about unreachable code. That's annoying; can you change it? How could the compiler think the lambda might return Task<Action> in the first place? (Same problem in VS2012 and the VS2013 preview, by the way.)
    Saturday, July 6, 2013 4:43 PM
  • Hi,

    I'm able to reproduce the error.  I don't recall having this problem in the past though.  I'm currently running VS 2012 with Update 3.  Out of curiosity, are you as well?

    Unfortunately, I don't know of any elegant workarounds.  Perhaps the best approach is to specify the delegate explicitly or create a real method.

    var res = Observable.Create<int>(
        new Func<IObserver<int>, Task>(
            async observer =>
            {
                var i = 0;
                while (true)
                {
                    await Task.Delay(1000);
                    observer.OnNext(i++);
                }
            }));
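
    The "real method" option might look like the sketch below. It moves the loop into a named async method and passes it to Create through a variable of the exact delegate type, which should avoid the ambiguity regardless of compiler version. The names NamedSubscribe, ProduceAsync, and Ticks are hypothetical.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class NamedSubscribe
{
    // The loop from the lambda, moved into a named method.
    private static async Task ProduceAsync(IObserver<int> observer)
    {
        var i = 0;
        while (true)
        {
            await Task.Delay(1000);
            observer.OnNext(i++);
        }
    }

    public static IObservable<int> Ticks()
    {
        // The variable's declared type is an exact match for the
        // Func<IObserver<int>, Task> overload of Create.
        Func<IObserver<int>, Task> subscribeAsync = ProduceAsync;
        return Observable.Create<int>(subscribeAsync);
    }
}
```

    Passing ProduceAsync directly as a method group may also work, but older compilers have been known to report method-group ambiguities of this kind, so the typed variable is the more conservative choice.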

    - Dave


    http://davesexton.com/blog

    Saturday, July 6, 2013 6:42 PM