ExecutionContext does not flow consistently through Observables

    Question

  • So, I've got a server app that is a fusion of async/await, traditional TPL, threads, and Rx.  Clients make socket connections and send asynchronous requests that flow through all of the above during their processing.

    I'm trying to get some contextual data for each request into the ambient logical call context so that it flows through the system as the request is processed.  This ambient context will be used for logging and security purposes.  The .NET way of doing this is to use the ExecutionContext (and the LogicalCallContext, which is managed by the ExecutionContext).  All (most?) of the built-in .NET asynchronous facilities ensure that the ExecutionContext flows with your asynchronous code.  So when you wait for a task to finish, your continuation has access to the same context that was in place when you started waiting, even if the task you waited on ran under a different context.

    Background info on ExecutionContext and how it flows:  Stephen Toub article.

    Background info on how the LogicalCallContext works: Stephen Cleary article.
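    To make the capture/run mechanism concrete, here is a minimal sketch, assuming .NET Framework 4.5 (where CallContext.LogicalSetData/LogicalGetData live in System.Runtime.Remoting.Messaging). ExecutionContext.Capture() snapshots the ambient context, including the logical call context data, and ExecutionContext.Run() executes a callback under that snapshot; this is essentially what the TPL does for you when it runs a continuation. The CaptureRunDemo class and the "demo-context" key are illustrative names, not part of any library.

```csharp
// Assumes .NET Framework 4.5 (CallContext logical data APIs).
using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;

public static class CaptureRunDemo
{
    private const string Key = "demo-context"; // illustrative key

    private static string Current
    {
        get { return CallContext.LogicalGetData(Key) as string ?? "not set"; }
    }

    // Returns the value seen by a callback that runs under a context captured earlier.
    public static string Run()
    {
        CallContext.LogicalSetData(Key, "request-1");
        ExecutionContext captured = ExecutionContext.Capture(); // snapshot, including logical call context data

        CallContext.LogicalSetData(Key, "request-2");           // the ambient value moves on...

        string seen = null;
        ExecutionContext.Run(captured, _ => seen = Current, null); // ...but the callback sees the snapshot
        captured.Dispose();
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // request-1
        Console.WriteLine(Current); // request-2: Run() restored the caller's context afterwards
    }
}
```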

    So, I've done some testing on how ExecutionContext flows with Rx.  And the results are inconsistent.

    ALL MY TESTING IS DONE WITH .NET 4.5, RX 2.2.4


    Let's Define a Context

    For the purposes of this discussion, let's define a helper that lets us store a string in our logical context:

    using System.Runtime.Remoting.Messaging;

    // Stores a string in the logical call context, which flows with the ExecutionContext.
    public static class MyContext
    {
      private static readonly string _KEY = "my-context";

      public static string Value
      {
        get { return CallContext.LogicalGetData(_KEY) as string ?? "not set"; }
        set { CallContext.LogicalSetData(_KEY, value); }
      }
    }

    // Usage
    MyContext.Value = "some value";
    Console.WriteLine("context={0}", MyContext.Value);


    I'll use this MyContext for the remainder of this discussion.

    Async/Await

    Consider this code:

    private async Task Observe(Task job)
    {
      MyContext.Value = "observe";
      await job;
      Console.WriteLine(MyContext.Value);
    }

    public void Run()
    {
      MyContext.Value = "global";
      Observe(Task.Delay(1000)).Wait();
      Console.WriteLine(MyContext.Value);
    }


    What should print when we call Run()?

    observe
    global


    This shows how context flows and how logical call context is scoped.
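    A self-contained console version of the test above might look like the following sketch. It assumes .NET Framework 4.5, where CallContext.LogicalGetData/LogicalSetData are available, and it collects the two values in a list so they can be checked; the AwaitDemo class is a hypothetical name.

```csharp
// Assumes .NET Framework 4.5 (CallContext logical data APIs).
using System;
using System.Collections.Generic;
using System.Runtime.Remoting.Messaging;
using System.Threading.Tasks;

public static class MyContext
{
    private const string Key = "my-context";

    public static string Value
    {
        get { return CallContext.LogicalGetData(Key) as string ?? "not set"; }
        set { CallContext.LogicalSetData(Key, value); }
    }
}

public static class AwaitDemo
{
    private static async Task Observe(Task job, List<string> log)
    {
        MyContext.Value = "observe";   // changes only this async method's logical scope
        await job;                     // the continuation resumes under the "observe" context
        log.Add(MyContext.Value);
    }

    // Returns what each scope saw, in order, instead of only printing it.
    public static List<string> Run()
    {
        var log = new List<string>();
        MyContext.Value = "global";
        Observe(Task.Delay(1000), log).Wait();
        log.Add(MyContext.Value);      // the assignment inside Observe did not leak out
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(Environment.NewLine, Run()));
    }
}
```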

    Rx

    Now let's look at an Rx version.  Note that I'm still using async/await to create a separate logical scope for the context, so that Run() can have a different context than Observe().  async/await just happens to be the easiest way to accomplish that.

    private async Task Observe<T>(IObservable<T> source)
    {
      MyContext.Value = "observe";
      var tcs = new TaskCompletionSource<int>();
      source.Subscribe(_ => {}, () =>
      {
        Console.WriteLine(MyContext.Value);
        tcs.TrySetResult(0);
      });
      await tcs.Task;
    }
    
    public void Run()
    {
      MyContext.Value = "global";
      var o = // ...
      Observe(o).Wait();
      Console.WriteLine(MyContext.Value);
    }


    What should print now?  I'd expect to get the same result as I did with the Task version.  I'd like to see "observe", "global" because I'd like my observer's callbacks to run within the observer's context.  This fits with the way Tasks work: task continuations are really just observer callbacks that fire when the task sends out its one notification.  The continuations are observing the Task, which is effectively an asynchronous single-value observable.

    But as it turns out, the results are inconsistent.  The result changes depending upon whether source is hot or cold.

    If source is cold (e.g. var o = Observable.Timer(...)), then I get the desired results: "observe", "global".

    If source is hot (e.g. var o = Observable.Timer(...).Publish(); o.Connect();), then I get undesired results: "global", "global".

    What if I make the above source cold?  (e.g. Observable.Timer(...).Publish().RefCount())?  I get the desired results again. "observe", "global"

    Now what if I take that cold observable and make it hot again by first letting another similarly written observer subscribe?  Something like this:

    void Run()
    {
      MyContext.Value = "global";
      var o = Observable.Timer(TimeSpan.FromSeconds(10)).Publish().RefCount();
      var t1 = ObserveOther(o); // ObserveOther has a context of "observeother";
      var t2 = Observe(o);
      Task.WaitAll(t1, t2);
    }

    This time I get "observeother", "global"!  In other words, my Observe() function ends up seeing ObserveOther's context!  If I reverse the order of the calls to Observe() and ObserveOther(), then ObserveOther sees Observe's context.

    What is happening?

    What is really happening is that the notification callbacks run under whatever context generated the notification.  For cold observables, this is almost always the context that issued the Subscribe request, since subscribing is what starts the producer.  For hot observables, it usually is not.
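    The effect can be reproduced without Rx. The sketch below uses a deliberately naive hot source (a hypothetical NaiveHotSource class, not Rx's Subject<T>) that stores observer callbacks and invokes them synchronously from whoever calls Publish. The observer subscribes from an "observe" scope, the producer publishes from a "producer" scope, and the callback sees the producer's value. It assumes .NET Framework 4.5 for CallContext.

```csharp
// Assumes .NET Framework 4.5 (CallContext logical data APIs).
using System;
using System.Collections.Generic;
using System.Runtime.Remoting.Messaging;
using System.Threading.Tasks;

// Hypothetical, minimal hot source: callbacks run on whatever thread, and under
// whatever ExecutionContext, calls Publish().
public sealed class NaiveHotSource
{
    private readonly List<Action<string>> _observers = new List<Action<string>>();

    public void Subscribe(Action<string> onNext)
    {
        lock (_observers) _observers.Add(onNext);
    }

    public void Publish(string value)
    {
        Action<string>[] snapshot;
        lock (_observers) snapshot = _observers.ToArray();
        foreach (var onNext in snapshot) onNext(value);
    }
}

public static class HotContextDemo
{
    private const string Key = "my-context";

    private static string Ctx
    {
        get { return CallContext.LogicalGetData(Key) as string ?? "not set"; }
        set { CallContext.LogicalSetData(Key, value); }
    }

    // Returns the context value the observer saw when it was notified.
    public static string Run()
    {
        var source = new NaiveHotSource();
        string seenByObserver = null;

        // The observer subscribes from its own logical scope...
        Task.Run(() =>
        {
            Ctx = "observe";
            source.Subscribe(_ => seenByObserver = Ctx);
        }).Wait();

        // ...but the notification is produced from a different scope.
        Task.Run(() =>
        {
            Ctx = "producer";
            source.Publish("tick");
        }).Wait();

        return seenByObserver; // "producer", not "observe"
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

    A cold source typically avoids this only because its producer is started from inside Subscribe and therefore inherits the subscriber's context.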

    Now, what if I set o to this: Observable.Merge(cold1, cold2, cold3, hot1, hot2, hot3)?  Now Observe's OnNext notifications will pick up whatever context happened to produce each notification, and the OnCompleted will use the context of whichever observable finished last.

    This level of inconsistency makes it very difficult to make use of ExecutionContext while using Rx.

    I've written an operator (at the bottom of this post) that can be applied to an observable so that it captures the observer's context and issues notifications in that context.  But this requires a developer to remember to use it and doesn't really match the spirit of the way the .NET async APIs all implicitly do this for you.

    My question is: Should Rx itself be making an effort to flow context the way the rest of .NET flows it?

    FlowObserverExecutionContext

    In all of the above hot examples, just adding a o.FlowObserverExecutionContext() before calling Observe() makes them behave the same as the cold examples.

    using System;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    using System.Threading;

    // Extension methods must live in a non-generic static class (the class name is illustrative).
    public static class ExecutionContextObservableExtensions
    {
        public static IObservable<T> FlowObserverExecutionContext<T>(this IObservable<T> source)
        {
            return Observable.Create<T>(observer =>
            {
                // Capture the observer's execution context (this lambda runs at Subscribe time)
                var context = ExecutionContext.Capture();
                if (context == null)
                {
                    // Context flow is suppressed.
                    return source.Subscribe(observer);
                }

                try
                {
                    var observerContext = context;
                    var subscription = new SingleAssignmentDisposable();
                    var disposables = new CompositeDisposable(subscription, observerContext);
                    subscription.Disposable = source.Subscribe(
                        value =>
                        {
                            // A captured context can be passed to Run only once, so run a copy for each OnNext notification
                            using (var c = observerContext.CreateCopy())
                            {
                                ExecutionContext.Run(c, _ => observer.OnNext(value), null);
                            }
                        },
                        error =>
                        {
                            // OnError or OnCompleted is called at most once (and never both), so the original captured context can be run directly
                            ExecutionContext.Run(observerContext, _ => observer.OnError(error), null);
                        },
                        () =>
                        {
                            // Same reasoning as OnError
                            ExecutionContext.Run(observerContext, _ => observer.OnCompleted(), null);
                        });

                    context = null; // ownership passed to 'disposables'; don't dispose it in the finally block below
                    return disposables;
                }
                finally
                {
                    if (context != null)
                    {
                        context.Dispose();
                    }
                }
            });
        }
    }

    Wednesday, July 02, 2014 2:30 PM

All replies

  • Hi,

    This is an interesting problem.  The free-threaded nature of Rx is one of the keys to Rx's performance.  And as you've shown, it's also the reason that ExecutionContext isn't flowed as Task flows it.

    > Should Rx itself be making an effort to flow context the way the rest of .NET flows it?
    > In all of the above hot examples, just adding a o.FlowObserverExecutionContext() before
    > calling Observe() makes them behave the same as the cold examples.

    Personally, I think the answer is no, in general.  The current performance profile is much more important.

    However, perhaps Subject<T> could have a constructor overload with a bool parameter enabling it to capture the ExecutionContext of each observer, similar to your extension method but with less overhead since it already maintains a list of observers; i.e., it could simply maintain a list of tuples instead.

    Then, a Publish overload with a bool parameter could also be defined.

    Essentially, instead of your extension method, you'd just pass true when calling Publish.
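    Here is a rough sketch of that idea, written without Rx so it stays self-contained: a hypothetical ContextCapturingSubject<T> keeps a list of (observer, captured ExecutionContext) tuples and delivers each notification under a copy of the subscriber's context. It is not Rx's Subject<T>; it skips the terminal-state bookkeeping and serialization a real subject needs, and it assumes .NET Framework 4.5. The demo reuses the producer/observer setup from the question.

```csharp
// Assumes .NET Framework 4.5 (CallContext logical data APIs).
using System;
using System.Collections.Generic;
using System.Runtime.Remoting.Messaging;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical subject (not Rx's Subject<T>): each observer is stored together with
// the ExecutionContext captured when it subscribed. Terminal-state checks are omitted.
public sealed class ContextCapturingSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<Tuple<IObserver<T>, ExecutionContext>> _observers =
        new List<Tuple<IObserver<T>, ExecutionContext>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Capture() returns null if flow is suppressed; such observers are notified directly.
        var entry = Tuple.Create(observer, ExecutionContext.Capture());
        lock (_observers) _observers.Add(entry);
        return new Unsubscriber(() => { lock (_observers) _observers.Remove(entry); });
    }

    public void OnNext(T value) { Notify(o => o.OnNext(value)); }
    public void OnError(Exception error) { Notify(o => o.OnError(error)); }
    public void OnCompleted() { Notify(o => o.OnCompleted()); }

    private void Notify(Action<IObserver<T>> notify)
    {
        Tuple<IObserver<T>, ExecutionContext>[] snapshot;
        lock (_observers) snapshot = _observers.ToArray();

        foreach (var entry in snapshot)
        {
            if (entry.Item2 == null)
            {
                notify(entry.Item1);
                continue;
            }
            // A captured context may be passed to Run only once, so run a fresh copy each time.
            using (var copy = entry.Item2.CreateCopy())
            {
                ExecutionContext.Run(copy, state => notify((IObserver<T>)state), entry.Item1);
            }
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _dispose;
        public Unsubscriber(Action dispose) { _dispose = dispose; }
        public void Dispose() { _dispose(); }
    }
}

public static class SubjectDemo
{
    private const string Key = "my-context";

    private sealed class OnNextObserver : IObserver<string>
    {
        private readonly Action<string> _onNext;
        public OnNextObserver(Action<string> onNext) { _onNext = onNext; }
        public void OnNext(string value) { _onNext(value); }
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }

    // Returns the context value the observer saw when the producer published.
    public static string Run()
    {
        var subject = new ContextCapturingSubject<string>();
        string seen = null;

        Task.Run(() =>
        {
            CallContext.LogicalSetData(Key, "observe");
            subject.Subscribe(new OnNextObserver(_ => seen = CallContext.LogicalGetData(Key) as string));
        }).Wait();

        Task.Run(() =>
        {
            CallContext.LogicalSetData(Key, "producer");
            subject.OnNext("tick");
        }).Wait();

        return seen; // the notification ran under the subscriber's captured context
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```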

    Thoughts?

    Regardless, you should create a work item in the Rx project.  That way the problem is documented, at least.

    - Dave


    http://davesexton.com/blog

    • Proposed as answer by LeeCampbell Friday, July 18, 2014 3:47 PM
    Wednesday, July 02, 2014 4:24 PM
  • Thanks Dave,

    I made a GitHub issue for it (it is actually unclear whether I should create the issue on GitHub or CodePlex).

    I definitely think moving the code into Rx internals would allow it to be implemented more efficiently.  Even just moving the extension method into Rx would let it be "trusted" and avoid all the additional layers of checks & balances that Rx applies to "untrusted" code.

    I don't think allowing us to construct Subjects with this mode is sufficient.  Like const-ness or async, it eventually forces the option to appear almost everywhere internally and externally.  It seems like an API explosion.  All subjects would need to be modified, not just Subject<T>.  All operators that internally construct subjects would need to be modified.  In some cases I think it would require a different name to avoid overload resolution conflicts (how would we distinguish the new Publish<T>(bool) from Publish<T>(T initialValue)?)  And other observables that might not use Subjects could be affected (Observable.FromEvent* are an example), so you'd end up needing to add this option to the internal Observer implementations...

    I wonder what the performance characteristics of this are.  I dislike needing to clone the context for every notification, but I'd like to believe that in the long run someone on the project could bend the right ear to get that changed for scenarios like this.  But how expensive is a call to ExecutionContext.Run()?  I certainly don't know.  I can try to benchmark it on my machine this week.
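    A crude way to start answering that question is a Stopwatch loop comparing a direct callback with CreateCopy() plus ExecutionContext.Run(), which is the per-OnNext work FlowObserverExecutionContext does. This is only a sketch assuming .NET Framework 4.5: it is not a rigorous benchmark, and the numbers it prints depend on the machine and on how much data is in the logical call context, so no results are claimed here. ContextRunBenchmark is a hypothetical name.

```csharp
// Assumes .NET Framework 4.5 (CallContext logical data APIs). Rough timing only.
using System;
using System.Diagnostics;
using System.Runtime.Remoting.Messaging;
using System.Threading;

public static class ContextRunBenchmark
{
    private static long _sink; // gives the callback some observable work

    private static void Notify(object state) { _sink++; }

    // Returns (ticks for N direct calls, ticks for N CreateCopy() + ExecutionContext.Run() calls).
    public static Tuple<long, long> Measure(int iterations)
    {
        // Put something in the logical call context so there is data to copy.
        CallContext.LogicalSetData("my-context", "observe");
        ExecutionContext captured = ExecutionContext.Capture();
        ContextCallback callback = Notify;

        var sw = Stopwatch.StartNew();
        for (int i = 0; i < iterations; i++)
        {
            callback(null);
        }
        long direct = sw.ElapsedTicks;

        sw.Restart();
        for (int i = 0; i < iterations; i++)
        {
            // Mirrors the per-OnNext work done by FlowObserverExecutionContext.
            using (ExecutionContext copy = captured.CreateCopy())
            {
                ExecutionContext.Run(copy, callback, null);
            }
        }
        long withContext = sw.ElapsedTicks;

        captured.Dispose();
        return Tuple.Create(direct, withContext);
    }

    public static void Main()
    {
        Measure(10000); // warm-up so JIT compilation is not timed
        Tuple<long, long> r = Measure(1000000);
        Console.WriteLine("direct: {0} ticks, CreateCopy + Run: {1} ticks (Stopwatch.Frequency = {2}/s)",
            r.Item1, r.Item2, Stopwatch.Frequency);
    }
}
```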

    Wednesday, July 02, 2014 5:58 PM
  • Hi,

    Yes, you're right.  It seems that an extension method is probably the best solution, such as a "native" version of yours.

    Given that Rx has gone on this long apparently without anybody commenting on this issue until now, I'd say that flowing context in Rx is definitely an edge case requirement.  So it would seem that having an opt-in model is preferable.

    As for the performance characteristics, I'm sure that in high-throughput scenarios that don't need to flow context at all, such as real-time trading apps, even a generally negligible cost is probably too costly.

    However, its cost might not be negligible even in general, as hinted at by this article:

    http://msdn.microsoft.com/en-us/magazine/hh456402.aspx

    "There are multiple optimizations in place in the Framework to avoid capturing and running under a captured ExecutionContext when doing so is unnecessary, as doing so can be quite expensive."

    - Dave


    http://davesexton.com/blog

    • Marked as answer by bman654 Wednesday, July 09, 2014 5:17 PM
    Wednesday, July 02, 2014 6:38 PM
  • Thanks Dave
    Wednesday, July 09, 2014 5:17 PM
  • I would imagine that if you wanted the context to flow to the final subscriber, you could do what is common in WPF applications and use the SubscribeOn/ObserveOn pattern. Instead of using the standard DispatcherScheduler as you would in WPF, you would just have an ExecutionContextScheduler (maybe one already exists). 

    If this works, then you don't need to change all of the subjects. It seems a more compositional approach, which I feel is more in line with the spirit of LINQ/Rx.
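    A minimal sketch of that idea, assuming Rx 2.x (LocalScheduler and IScheduler in System.Reactive.Concurrency) and .NET Framework 4.5: the ExecutionContextScheduler below is a hypothetical class, written here rather than taken from Rx. It captures the ExecutionContext of whoever constructs it and runs every scheduled action under a copy of that context, delegating timing and threading to an inner scheduler.

```csharp
// Assumes Rx 2.x and .NET Framework 4.5. Hypothetical type, not part of Rx.
using System;
using System.Reactive.Concurrency;
using System.Threading;

// Captures the ExecutionContext of the code that constructs it and runs every
// scheduled action under a copy of that context. Timing and threading are
// delegated to an inner scheduler (e.g. TaskPoolScheduler.Default).
public sealed class ExecutionContextScheduler : LocalScheduler
{
    private readonly IScheduler _inner;
    private readonly ExecutionContext _context; // null if flow was suppressed at construction

    public ExecutionContextScheduler(IScheduler inner)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        _inner = inner;
        _context = ExecutionContext.Capture();
    }

    // LocalScheduler implements the other IScheduler.Schedule overloads on top of this one.
    public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime,
        Func<IScheduler, TState, IDisposable> action)
    {
        return _inner.Schedule(state, dueTime, (ignored, s) =>
        {
            // Pass 'this' (not the inner scheduler) so recursive scheduling keeps the context.
            if (_context == null) return action(this, s);

            IDisposable result = null;
            // The captured context is never run directly, so it can be copied repeatedly.
            using (ExecutionContext copy = _context.CreateCopy())
            {
                ExecutionContext.Run(copy, _ => result = action(this, s), null);
            }
            return result;
        });
    }
}
```

    Because the context is captured at construction rather than at Subscribe time, the scheduler would have to be created inside the observer's scope, for example source.ObserveOn(new ExecutionContextScheduler(TaskPoolScheduler.Default)) inside Observe(). Unlike FlowObserverExecutionContext, this also adds a hop through the inner scheduler for every notification.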

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

    Friday, July 18, 2014 3:50 PM
  • Interesting idea.  I always forget about the option of creating custom schedulers instead of custom Rx operators.  I'll give it a try.  Thanks
    Friday, July 18, 2014 3:59 PM