Asynchronous code in OnNext will break the Rx contract

    Question

  • I can't find an asynchronous way to keep a subscription synchronized while the subscription executes asynchronous code.

    Essentially, I want to use and combine Rx with a lot of (standard) async/await code, and I really hope there is a first-class way to do it. Examples with Console.WriteLine and Thread.Sleep are nice, but how does Rx really fit in with .NET 4.5?

    Sure, I can use Task.WaitAll, but that defeats the asynchronicity of my code and will eventually lead to the old problem of having too many threads blocked waiting for another thread to do the work that the blocked thread should have been doing.

    Here's some code that shows roughly what I'm trying to do. It yields 2 observations, 1 second apart, and runs processing on each that takes 2 seconds, writing to the console when items are observed and when processing starts and stops.

    using System;
    using System.Diagnostics;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading.Tasks;

    class Program {
       static void Main(string[] args) {
          Task.WaitAll(TestObservable());
       }
       static async Task TestObservable() {
          var stopwatch = new Stopwatch();
          Action<string, long> writeline = (format, value) =>
            Console.WriteLine("{1,6:0.00}: " + format, value, stopwatch.Elapsed.TotalSeconds);
          stopwatch.Start();
          var subject = new Subject<long>();
          Observable.Interval(TimeSpan.FromSeconds(1))
             .Do(value => writeline("Emitting {0}", value))
             .Take(2)
             .Subscribe(_ => subject.OnNext(_));
          var observable = subject.AsObservable();
          // The async lambda becomes an async void OnNext handler: it returns to Rx
          // at the first await, so Rx can deliver the next value without waiting.
          using (observable.Subscribe(async value => {
             writeline("Start Handling {0}", value);
             await Task.Delay(TimeSpan.FromSeconds(2));
             writeline("Stop Handling {0}", value);
          })) {
             await Task.Run(() => Console.ReadKey(true));
          }
       }
    }

    Here's what it writes to the console:

      1.13: Emitting 0
      1.13: Start Handling 0
      2.13: Emitting 1
      2.13: Start Handling 1
      3.14: Stop Handling 0
      4.14: Stop Handling 1

    After reading about the Rx contract (see Reactive Extensions Design Guidelines §4.2), I expected there to be a way to produce the following output instead:

      1.15: Emitting 0
      1.16: Start Handling 0
      3.16: Stop Handling 0
      3.16: Emitting 1
      3.16: Start Handling 1
      5.16: Stop Handling 1

    I wouldn't mind, though (and more or less expected), if the second "Emitting" line appeared around 2 seconds after the stopwatch started.

    Friday, June 28, 2013 12:16 PM

All replies

  • I see what you are trying to do here and I think I understand your confusion. The intersection of TPL and Rx is a tricky one (and I must confess one that I have not fully mastered yet either).

    To reduce your confusion, let's first clear up a few things.

    1. There's no need for the subject in your sample. Let's remove it.

    //LINQPad query sample
    void Main()
    {
    	Task.WaitAll(TestObservable());
    }
    
    // Define other methods and classes here
    static async Task TestObservable() {
       var stopwatch = new Stopwatch();
       Action<string, long> writeline = (format, value) => 
         Console.WriteLine("{1,6:0.00}: " + format, value, stopwatch.Elapsed.TotalSeconds);
       stopwatch.Start();
       var observable = Observable.Interval(TimeSpan.FromSeconds(1))
          .Do(value => writeline("Emitting {0}", value))
          .Take(2);
       
       using (observable.Subscribe(async value => {
          writeline("Start Handling {0}", value);
          await Task.Delay(TimeSpan.FromSeconds(2));
          writeline("Stop Handling {0}", value);
       })) {
        await Task.Run(() => Console.Read());
       }
    }

    2. Next, let's remove the mixed metaphor in the Subscribe method. As the OnNext calls are serialized, adding async in there will break that paradigm. Here we make the OnNext handler synchronous.

    void Main()
    {
    	Task.WaitAll(TestObservable());
    }
    
    // Define other methods and classes here
    static async Task TestObservable() {
       var stopwatch = new Stopwatch();
       Action<string, long> writeline = (format, value) => 
         Console.WriteLine("{1,6:0.00}: " + format, value, stopwatch.Elapsed.TotalSeconds);
       stopwatch.Start();
       var observable = Observable.Interval(TimeSpan.FromSeconds(1))
          .Do(value => writeline("Emitting {0}", value))
          .Take(2);
       
       using (observable.Subscribe(value => {
          writeline("Start Handling {0}", value);
       Task.Delay(TimeSpan.FromSeconds(2)).Wait(); // block this OnNext call for 2 seconds
          writeline("Stop Handling {0}", value);
       })) {
        await Task.Run(() => Console.Read());
       }
    }

    Right now we get the output you are looking for, but we are blocking and not leveraging the TPL very well. Here I think you want to bridge to Rx via the ToObservable() operator.

    3. Leverage ToObservable and SelectMany to keep notifications serialized without burning threads.

    Here I also add a variation of a Log extension method I find extremely useful in my applications. Sorry that it screws up your output though.

    void Main()
    {
    	Task.WaitAll(TestObservable());
    }
    
    // Define other methods and classes here
    static async Task TestObservable() {
       var stopwatch = new Stopwatch();
       Action<string, long> writeline = (format, value) => 
         Console.WriteLine("{1,6:0.00}: " + format, value, stopwatch.Elapsed.TotalSeconds);
       stopwatch.Start();
       var observable = Observable.Interval(TimeSpan.FromSeconds(1))
          .Do(value => writeline("Emitting {0}", value))
          .Take(2)
          .SelectMany(i => Task.Delay(TimeSpan.FromSeconds(2))
                               .ToObservable()
                               .Log(string.Format("Task.Delay {0}", i)));
       
       using (observable.Subscribe(value => {
          writeline("done {0}", 0);
       })) 
       {
        await Task.Run(() => Console.Read());
       }
    }
    public static class LoggerExtensions
    {
       public static IObservable<T> Log<T>(this IObservable<T> source, string name)
       {
           return Observable.Using(
               ()=> new Timer(name),
               timer=> Observable.Create<T>(
                   o =>
                   {
                       Console.WriteLine("{0}.Subscribe()", name);
                       var subscription = source
                           .Do(
                               i => Console.WriteLine("{0}.OnNext({1})", name, i),
                               ex => Console.WriteLine("{0}.OnError({1})", name, ex),
                               () => Console.WriteLine("{0}.OnCompleted()", name))
                           .Subscribe(o);
                       var disposal = Disposable.Create(() => Console.WriteLine("{0}.Dispose()", name));
                       return new CompositeDisposable(subscription, disposal);
                   })
               );
       }
    
       private sealed class Timer : IDisposable
       {
           private readonly string _name;
           private readonly Stopwatch _stopwatch;
    
           public Timer(string name)
           {
               _name = name;
               _stopwatch = Stopwatch.StartNew();
           }
    
           public void Dispose()
           {
               _stopwatch.Stop();
               Console.WriteLine("{0} took {1}", _name, _stopwatch.Elapsed);
           }
       }
    }

    Output:

      1.02: Emitting 0
    Task.Delay 0.Subscribe()
      2.02: Emitting 1
    Task.Delay 1.Subscribe()
    Task.Delay 0.OnNext(())
      3.03: done 0
    Task.Delay 0.OnCompleted()
    Task.Delay 0.Dispose()
    Task.Delay 0 took 00:00:02.0006715
    Task.Delay 1.OnNext(())
      4.03: done 0
    Task.Delay 1.OnCompleted()
    Task.Delay 1.Dispose()
    Task.Delay 1 took 00:00:02.0124837
    

    Here we take the processing out of the subscription and make it part of the sequence. This allows us to stay within the asynchronous sequence paradigm, or "stay within the monad".

    We do this because we can think of the processing of each value as an async process. Such a process can be thought of either as a Task or as a single-value observable sequence. As Rx excels at composing observable sequences, we take advantage of that. We also take advantage of the Rx contract, which guarantees that notifications from a sequence are delivered one at a time, in order.
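    Note that SelectMany still lets the two Task.Delay calls overlap (see the output above). Below is a minimal sketch, assuming the System.Reactive package (Rx 2.0 or later), of one way to get the strictly sequential Start/Stop order from the question without blocking a thread: wrap the async handler in Observable.FromAsync, which does not start the task until it is subscribed, and flatten with Concat instead of SelectMany. HandleAsync, the shortened timings and the log queue are illustrative assumptions, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SequentialHandling
{
    // Hypothetical async handler standing in for the question's processing.
    static async Task HandleAsync(long value, ConcurrentQueue<string> log)
    {
        log.Enqueue("Start Handling " + value);
        await Task.Delay(TimeSpan.FromMilliseconds(200));
        log.Enqueue("Stop Handling " + value);
    }

    public static List<string> Run()
    {
        var log = new ConcurrentQueue<string>();

        Observable.Interval(TimeSpan.FromMilliseconds(100))
            .Take(2)
            // FromAsync does not call HandleAsync until Concat subscribes to it,
            // and Concat subscribes to the next inner sequence only after the
            // previous one completes, so handlers never overlap.
            .Select(value => Observable.FromAsync(() => HandleAsync(value, log)))
            .Concat()
            .Wait(); // blocks only so this demo can return the log

        return log.ToList();
    }
}
```

    Calling SequentialHandling.Run() from a console Main should return the entries in the order Start Handling 0, Stop Handling 0, Start Handling 1, Stop Handling 1. The Wait() call exists only so the demo can hand back its log; in real code you would subscribe to (or await) the query instead.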

    I hope this helps

    Lee Campbell

    www.IntroToRx.com




    Lee Campbell http://LeeCampbell.blogspot.com

    • Proposed as answer by LeeCampbell Friday, June 28, 2013 7:10 PM
    Friday, June 28, 2013 7:09 PM
  • Hi,

    I agree with Lee that there's no need to use a subject for your example.  There's also some unnecessary use of Task.

    I also agree with Lee that the reason you're getting the first output instead of the second is that you're observing asynchronously (your OnNext handler is an async lambda).  That's a fairly common code smell.  It seems that all you're really asking about is why the observer effect isn't happening in your example.  In order to see the output slow down due to observations, you must observe synchronously.

    The following example illustrates how the observer effect can slow down subsequent notifications.  Note the call to Wait on Task.Delay, which blocks the observable from pushing the next value for an extended period of time.

    using System;
    using System.Diagnostics;
    using System.Reactive.Linq;
    using System.Threading.Tasks;
    
    namespace RxLabs.Net45
    {
    	class BlockingLab
    	{
    		public static void Main()
    		{
    			var stopwatch = new Stopwatch();
    
    			Action<string, long> writeline = (format, value) =>
    				Console.WriteLine("{1,6:0.00}: " + format, value, stopwatch.Elapsed.TotalSeconds);
    
    			stopwatch.Start();
    
    			var xs = Observable
    				.Interval(TimeSpan.FromSeconds(1))
    				.Do(value => writeline("Emitting {0}", value))
    				.Take(2);
    
    			using (xs.Subscribe(value =>
    				{
    					writeline("Start Handling {0}", value);
    
    					Task.Delay(TimeSpan.FromSeconds(2)).Wait();
    
    					writeline("Stop Handling {0}", value);
    				}))
    			{
    				Console.ReadLine();
    			}
    		}
    	}
    }

    Example Output:

      1.37: Emitting 0
      1.37: Start Handling 0
      3.38: Stop Handling 0
      3.38: Emitting 1
      3.38: Start Handling 1
      5.40: Stop Handling 1

     

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Friday, June 28, 2013 7:47 PM Code formatting
    Friday, June 28, 2013 7:38 PM
    Well, yes, but the code where I encountered this was a rewrite (from TPL async/await) to IObservable, and that code did not simply perform Task.Delay as its processing...

    In fact, that Task was reading a NetworkStream that yielded deserialized messages, but after switching to IObservable it turned out to read data out of order (that sucked, believe me).

    If the takeaway is that I need to compose everything asynchronous into the IObservable query, then I guess I get it. Whether I'll like it, I doubt, but I'll give it another go.

    Yet it means - from my point of view - that most of the processing will amount to producing another IObservable which no one will consume. That's weird, isn't it? Now the paradigm isn't 'all the way async' but 'all the way Rx' which is a bad thing in my point of view. You see this 'bug' surfacing just because I declare an async anonymous delegate...which is such a powerful option in .NET 4.5 without Rx...

    I mean if I do regular UI processing of mouse events, for the overall usability of event streams (think outside the Rx box), whatever must happen in response to those events 1) has to be able to be regular code as opposed to another observable stream and 2) has to be able to support async/await (or it would not be 'regular code' in .NET 4.5)

    I just learned about monads - or whatever - and it's all nice and so forth, but must I now conclude that Rx code again imposes rewrites to get code working the way it needs to???

    Friday, June 28, 2013 8:14 PM
  • Thanks Dave,

    I know - I had copied the example here: http://rxwiki.wikidot.com/101samples#toc47

    Now you argue that my code is wrong. Sure, I know. I had already produced your output using Thread.Sleep ...

    But

    The thing that I'm surprised about is that I wanted an asynchronous subscription to be a first-class feature, not a code smell. See my response above.

    There are plenty of examples that make subscribers process data in parallel if I wanted that, but now I understand there's no way of enabling regular asynchronous code at the 'consumer' end of an observable sequence. That's just a pity.

    Can you see why I say that?

    Friday, June 28, 2013 8:20 PM
  • Thanks Lee,

    In addition to my other response, can you explain the difference between your last output and the output below? Yours produces 'Emitting' messages 1 second apart, while the example below delays them until after the processing, and I can't see why. Is it the SelectMany or the Task.ToObservable?

    It's better I think if that NetworkStream reads messages as fast as possible and each producer (I intend to have many) will see them as fast as they are individually.

    Thanks for all the work you've already put into rewriting my code.

    Eric

    Friday, June 28, 2013 8:25 PM
  • Hi,

    > If the takeaway is that I need to compose everything asynchronous into the IObservable query [snip]

    Often it's simply a matter of cardinality.

    IObservable<T> represents an asynchronous computation that generates a sequence of values.
    Task<T> represents an asynchronous computation that generates a single value.

    Therefore, IObservable<T> can also represent an asynchronous computation that generates a single value only, thus it provides a superset of Task<T>.

    Choose the model that is appropriate for your requirements.

    > Yet it means - from my point of view - that most of the processing will amount to producing another IObservable which no one will consume.

    How do you figure?  That's actually another code smell in Rx.  Sometimes it's useful to subscribe to an observable for its side-effects only, but not typically.

    > Now the paradigm isn't 'all the way async' but 'all the way Rx' which is a bad thing in my point of view.

    Rx provides an awaitable implementation for IObservable<T> and it provides operators that convert to/from IObservable<T> and Task<T>.
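    A small sketch of those conversions, assuming Rx 2.0 or later (System.Reactive): Task<T>.ToObservable() and IObservable<T>.ToTask() live in System.Reactive.Threading.Tasks, and awaiting an IObservable<T> directly (via the GetAwaiter extension in System.Reactive.Linq) yields the sequence's last value. The class name and the numbers are arbitrary choices for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class TaskObservableBridge
{
    public static async Task<int> RoundTripAsync()
    {
        Task<int> task = Task.FromResult(20);

        // Task<T> -> IObservable<T>: a sequence with exactly one value.
        IObservable<int> fromTask = task.ToObservable();

        // Compose the single-value sequence with another one.
        IObservable<int> sum = fromTask.Zip(Observable.Return(22), (a, b) => a + b);

        // IObservable<T> -> Task<T>: the task completes with the last value.
        int viaTask = await sum.ToTask();

        // An IObservable<T> can also be awaited directly; the result is its last value.
        int viaAwait = await Observable.Range(1, 3);

        return viaTask + viaAwait;
    }
}
```

    RoundTripAsync().Result should be 45: 42 from the zipped sequence plus 3, the last value of Range(1, 3). Awaiting an empty sequence throws InvalidOperationException, which is one place where the single-value Task<T> model and the multi-value IObservable<T> model differ.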

    Furthermore, calling Subscribe is generally asynchronous.  Whether it is or not is the responsibility of the observable, not the observer.

    For example, Observable.Interval uses pooled threads by default, but you can pass in any IScheduler.
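    To see that the observable (via its scheduler) decides whether Subscribe is synchronous, here is a sketch, assuming System.Reactive, that counts how many values an observer has received by the time Subscribe returns. It uses Observable.Range rather than Interval only so the sequence finishes quickly; the helper name is made up for this example.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulerControlsAsynchrony
{
    // Returns how many values the observer had received when Subscribe returned.
    public static int ValuesSeenBeforeSubscribeReturns(IScheduler scheduler)
    {
        int seen = 0;
        var done = new ManualResetEventSlim();

        IDisposable subscription = Observable.Range(0, 3, scheduler)
            .Subscribe(_ => Interlocked.Increment(ref seen), () => done.Set());

        int seenAtReturn = Volatile.Read(ref seen);

        done.Wait(); // let asynchronous schedulers finish before cleaning up
        subscription.Dispose();
        return seenAtReturn;
    }
}
```

    With Scheduler.CurrentThread the method returns 3, because Range delivers every value (on a trampoline) before Subscribe returns. With a pool-based scheduler such as TaskPoolScheduler.Default it usually returns 0, but that is a race and should not be relied on.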

    > 1) has to be able to be regular code as opposed to another observable stream

    Do you mean in terms of cardinality?   Clearly if your algorithm potentially needs to generate more than a single value, then IObservable<T> is a better model; otherwise, you may find that Task<T> is more suitable, though not entirely necessary.  In that case, as Lee has shown, you can easily convert a task via ToObservable so that it may be composed with your query.

    > 2) has to be able to support async/await (or it would not be 'regular code' in .NET 4.5)

    Rx offers the ability to write async iterators via an overload of Observable.Create.

    For example (Rx 2.0):

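    // mouseMoves, DoSomethingAsync, DoSomethingElseAsync and UpdateUI are placeholders for your own code.
    IObservable<string> xs = 
      (from m in mouseMoves
       select Observable.Create<string>(
         async (observer, cancel) =>
         {
           // 'cancel' is signalled when this inner subscription is disposed,
           // e.g. when Switch moves on to the next mouse move.
           var r1 = await DoSomethingAsync(cancel);
           string r2 = await DoSomethingElseAsync(r1, cancel);
    
           observer.OnNext(r2);
           observer.OnCompleted();
         }))
      .Switch();
    
    // Marshal notifications to the UI thread before touching UI elements.
    xs.ObserveOnDispatcher().Subscribe(UpdateUI);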

    > but must I now conclude that Rx code again imposes rewrites to get code working the way it needs to???

    You must choose a model that is appropriate for your requirements.  If you find that mixing models is helpful, then Rx offers conversion operators and async iterators.

    Edit: Rx also offers overloads of SelectMany that compose observables and tasks, making it very easy to mix models.

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Friday, June 28, 2013 9:37 PM Additional note about SelectMany
    Friday, June 28, 2013 9:13 PM
  • Hi,

    > Now you argue that my code is wrong.

    Well not wrong, per se.  My argument was simply that the §4.2 contract applies to synchronous observations only.  My intention was to show how blocking produces the expected behavior.  Running code asynchronously in a subscription is a code smell, but it's not necessarily wrong.  However, it certainly breaks the §4.2 contract.

    > The thing that I'm surprised about is that I wanted an asynchronous subscription to be a first-class feature, not a code smell.

    Yep.  See my previous reply about cardinality, conversion operators and async iterators.

    It's a code smell because when you introduce your own asynchrony in the call to Subscribe, you're no longer within the query.  Rx has nothing to do with that.  Rx is all about controlling complexity in asynchronous computations through composition.  Subscribe exits the composition.

    The most powerful and common operator in Rx is SelectMany.  It represents sequential asynchronous computation.  If you want to compose together an observable and a task so that they execute sequentially, then you must convert the task into an observable and apply SelectMany.  (Edit: Rx 2.0 enables composing tasks with observables without conversion!)

    Note that SelectMany executes the task for each value in the observable, thus if the task introduces concurrency, then you may have tasks running simultaneously.  There are various ways of controlling this if it's not what you want.  For example, you can use Concat instead of SelectMany to avoid concurrency altogether (assuming that you're composing a cold observable, though Task<T> is hot so you may have to wrap it with Observable.Defer).
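    A sketch, assuming System.Reactive 2.0+, that makes the hot-task point concrete. WorkAsync is a hypothetical job that records how many copies are in flight at once. With SelectMany, and with Concat alone, the tasks overlap because Select calls WorkAsync (which starts running immediately) for every value as it arrives; Concat only orders the results. Wrapping the call in Observable.Defer postpones it until Concat subscribes, so only one runs at a time.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class HotTaskConcurrency
{
    static readonly object Gate = new object();
    static int running, peak;

    // Hypothetical work item that records how many copies are in flight at once.
    static async Task<int> WorkAsync(int value)
    {
        lock (Gate) { running++; peak = Math.Max(peak, running); }
        await Task.Delay(50);
        lock (Gate) { running--; }
        return value;
    }

    // SelectMany: every task starts as soon as its value arrives.
    public static IObservable<int> WithSelectMany(IObservable<int> source)
    {
        return source.SelectMany(i => WorkAsync(i).ToObservable());
    }

    // Concat alone: Select still calls WorkAsync eagerly, so the (hot) tasks
    // overlap; Concat only orders their results.
    public static IObservable<int> WithConcatOnly(IObservable<int> source)
    {
        return source.Select(i => WorkAsync(i).ToObservable()).Concat();
    }

    // Concat + Defer: WorkAsync is not called until Concat subscribes to the
    // inner sequence, i.e. after the previous task has completed.
    public static IObservable<int> WithConcatAndDefer(IObservable<int> source)
    {
        return source.Select(i => Observable.Defer(() => WorkAsync(i).ToObservable())).Concat();
    }

    public static int PeakConcurrency(Func<IObservable<int>, IObservable<int>> compose)
    {
        lock (Gate) { running = 0; peak = 0; }
        compose(Observable.Range(0, 5)).ToList().Wait();
        lock (Gate) { return peak; }
    }
}
```

    PeakConcurrency(WithConcatAndDefer) should return 1, while the other two typically return 5 because all five tasks start before any 50 ms delay elapses. Observable.FromAsync(() => WorkAsync(i)) is an equivalent, more compact way to get the deferred behavior.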

    Another approach uses Switch to ensure that only the latest observable is active, as shown in my previous reply.

    > now I understand there's no way of enabling regular asynchronous code at the 'consumer' end of an observable sequence.

    Sure there is.  You can add an asynchronous observer as you have, just don't expect Rx to know about it and maintain any of its contracts.  If you want Rx to know about it, then you must compose it into the query; e.g., via SelectMany.

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Friday, June 28, 2013 9:40 PM Note about SelectMany accepting Task
    Friday, June 28, 2013 9:32 PM
  • Hi,

    > It's better I think if that NetworkStream reads messages as fast as possible and each producer (I intend to have many) will
    > see them as fast as they are individually.

    Do you want them to execute concurrently or sequentially?

    If concurrently, then you can use SelectMany.  Alternatively, you can use Merge to limit concurrency.

    If sequentially, then you can use Concat to ensure that all messages are eventually processed.  Alternatively, you can use Switch to drop old messages.
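    A sketch of the Merge option, assuming System.Reactive 2.0+: Merge(maxConcurrent) subscribes to at most that many inner sequences at once, and because Observable.FromAsync defers starting each task until it is subscribed, that bounds how many ProcessAsync calls run concurrently. ProcessAsync and the sample messages are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class LimitedConcurrency
{
    static readonly object Gate = new object();
    static int running, peak;

    // Hypothetical per-message processing that records how many calls overlap.
    static async Task<string> ProcessAsync(string message)
    {
        lock (Gate) { running++; peak = Math.Max(peak, running); }
        await Task.Delay(50);
        lock (Gate) { running--; }
        return message.ToUpperInvariant();
    }

    public static IList<string> ProcessAll(IEnumerable<string> messages, int maxConcurrent, out int observedPeak)
    {
        lock (Gate) { running = 0; peak = 0; }

        IList<string> results = messages.ToObservable()
            // FromAsync is lazy, so a task starts only when Merge subscribes to it.
            .Select(m => Observable.FromAsync(() => ProcessAsync(m)))
            // Merge keeps at most maxConcurrent inner sequences subscribed at once.
            .Merge(maxConcurrent)
            .ToList()
            .Wait(); // blocking only for the demo

        lock (Gate) { observedPeak = peak; }
        return results;
    }
}
```

    For example, ProcessAll(new[] { "a", "b", "c", "d", "e" }, 2, out peak) returns all five upper-cased messages with peak no greater than 2; results may arrive out of order when maxConcurrent is above 1. Merge(1) gives the same one-at-a-time behavior as Concat, and swapping Merge for Switch would drop any message whose processing is still running when a newer one arrives.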

    For example, the SelectMany approach is quite simple:

    IObservable<string> messages = ParseMessagesFromNetworkStream();
    
    var q = 
      from message in messages
      from result in ProcessAsync(message)
      select new { message, result };
    
    q.Subscribe(Log);
    
    ...
    
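    async Task<string> ProcessAsync(string message)
    {
      await DoSomething(message);
      // A Task<string> method must return a string; this assumes
      // DoSomethingElse returns Task<string>.
      return await DoSomethingElse(message);
    }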

    However, notice that it doesn't integrate with Rx's cancellation model.  That's where async iterators come in handy.  (See a previous reply.)
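    A sketch of what that cancellation integration buys you, assuming Rx 2.0+: the async overload of Observable.Create passes a CancellationToken that Rx cancels when the subscription is disposed, so in-flight work can stop early. ProcessAsync here is a hypothetical long-running operation that accepts a token, and the class name is made up for this example.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableProcessing
{
    // Hypothetical long-running work that honours cancellation.
    static async Task<string> ProcessAsync(string message, CancellationToken cancel)
    {
        await Task.Delay(TimeSpan.FromSeconds(10), cancel);
        return message.ToUpperInvariant();
    }

    // Returns true if disposing the subscription cancelled the in-flight work.
    public static bool DisposeCancelsWork()
    {
        var cancelled = new ManualResetEventSlim();

        IObservable<string> q = Observable.Return("hello")
            .SelectMany(message => Observable.Create<string>(async (observer, cancel) =>
            {
                // 'cancel' is signalled when the subscription is disposed.
                using (cancel.Register(() => cancelled.Set()))
                {
                    observer.OnNext(await ProcessAsync(message, cancel));
                    observer.OnCompleted();
                }
            }));

        IDisposable subscription = q.Subscribe(_ => { });
        subscription.Dispose();
        return cancelled.Wait(TimeSpan.FromSeconds(5));
    }
}
```

    DisposeCancelsWork() should return true right after Dispose, well before the 10-second delay would finish. With the Task-based query above, disposing the subscription stops the notifications but leaves ProcessAsync running.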

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Friday, June 28, 2013 9:53 PM Formatting
    Friday, June 28, 2013 9:51 PM
  • ewjnl,

    You can use the following extension method:

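    // Must be declared in a static class. Requires System.Reactive.Linq,
    // System.Reactive.Concurrency and System.Threading.Tasks.
    public static IDisposable SubscribeAsync<T>(
        this IObservable<T> o,
        Func<T, Task> observer,
        Action<Exception> onError,
        Action onCompleted,
        IScheduler scheduler)
    {
        // FromAsync defers calling 'observer' until Concat subscribes to it, so each
        // handler starts only after the previous one has finished.
        return o.Select(x => Observable.FromAsync(() => observer(x))
                                       .SubscribeOn(scheduler))
                .Concat()
                .Subscribe(_ => {}, onError, onCompleted);
    }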


    • Edited by Igor Mikushkin Thursday, March 13, 2014 10:35 AM Added scheduler to prevent stack overflow
    Saturday, February 15, 2014 2:22 PM