Continue after await on the same thread

  • Question

  • Hi

    I am using EventLoopScheduler to synchronize RX subscriptions. 

    I would like to use async/await on the subscription handling method, but I am concerned that the continuation will occur on a thread other than the EventLoopScheduler's thread.

    Is it possible to ensure that await will return on the same thread in this case?

     

    Thanks

    David

    Sunday, January 1, 2012 1:26 PM

Answers

  • So using await might leave the EventLoopScheduler thread, which could be undesirable and also a bit counter-intuitive.

    The best option I could think of is to write a custom SynchronizationContext and have the EventLoopScheduler set it on its thread.

    I wrote a proof-of-concept here: https://gist.github.com/1554548

    I'll paste it here for your convenience:

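        using System;
        using System.Collections.Generic;
        using System.Linq;
        using System.Reactive.Concurrency;
        using System.Reactive.Linq;
        using System.Threading;
        using System.Threading.Tasks;
        using NUnit.Framework;
    
        public class RxAsyncTest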
        {
            private readonly EventLoopScheduler m_eventLoopScheduler;
            private int m_eventLoopSchedulerThreadId;
    
            public RxAsyncTest()
            {
                m_eventLoopScheduler = new EventLoopScheduler(start => new Thread(() =>
                {
                    m_eventLoopSchedulerThreadId = Thread.CurrentThread.ManagedThreadId;
                    Console.WriteLine("EventLoopScheduler: Thread Id = {0}",
                                      m_eventLoopSchedulerThreadId);
                    start();
                }));
            }
    
            [Test]
            public void Await_in_subscription()
            {
                Console.WriteLine("Main: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                var finishedEvent = new AutoResetEvent(false);
                Observable
                    .Return(Tuple.Create(10, 20), m_eventLoopScheduler)
                    .Subscribe(async tuple =>
                    {
                        Console.WriteLine("Observer before: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                        var result = await AddAsync(tuple.Item1, tuple.Item2);
                        Console.WriteLine("Observer after: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                        finishedEvent.Set();
                    });
    
                finishedEvent.WaitOne();
            }
    
            [Test]
            public void Use_SchedulerSynchronizationContext()
            {
                var schedulerThreadId = -1;
                var scheduler = new SyncContextEventLoopScheduler(
                    start => new Thread(() =>
                    {
                        schedulerThreadId = Thread.CurrentThread.ManagedThreadId;
                        Console.WriteLine("SyncContextEventLoopScheduler: Thread Id = {0}",
                                          schedulerThreadId);
                        start();
                    }));
    
                Console.WriteLine("Test: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                var finishedEvent = new AutoResetEvent(false);
                var threadIds = new List<int>();
                Observable
                    .Return(Tuple.Create(10, 20), scheduler)
                    .Subscribe(async tuple =>
                    {
                        Console.WriteLine("Observer before: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                        var result = await AddWithRxAsync(tuple.Item1, tuple.Item2, scheduler);
                        threadIds.Add(Thread.CurrentThread.ManagedThreadId);
                        Console.WriteLine("Observer after: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                        finishedEvent.Set();
                    });
    
                finishedEvent.WaitOne();
                Assert.AreEqual(schedulerThreadId, threadIds.Distinct().Single());
            }
    
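            // Note: AddWithRxAsync, called in the test above, is not included in this paste.
    
            // Simulates one second of work on a thread-pool thread and returns the sum.
            public Task<int> AddAsync(int x, int y)
            {
                return Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("Add: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                    Thread.Sleep(TimeSpan.FromSeconds(1));
                    return x + y;
                });
            }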
        }
    
        public class SchedulerSynchronizationContext : SynchronizationContext
        {
            private readonly IScheduler m_scheduler;
    
            public SchedulerSynchronizationContext(IScheduler scheduler)
            {
                m_scheduler = scheduler;
            }
    
            public override void Send(SendOrPostCallback callback, object state)
            {
                throw new NotImplementedException("Too lazy to implement synchronous invocation now...");
            }
    
            public override void Post(SendOrPostCallback callback, object state)
            {
                m_scheduler.Schedule(() => callback.Invoke(state));
            }
        }
    
        public class SyncContextEventLoopScheduler : IScheduler, IDisposable
        {
            private readonly EventLoopScheduler m_scheduler;
    
            public SyncContextEventLoopScheduler(Func<ThreadStart, Thread> factory)
            {
                m_scheduler = new EventLoopScheduler(factory);
                setSyncContext();
            }
    
            public SyncContextEventLoopScheduler()
            {
                m_scheduler = new EventLoopScheduler();
                setSyncContext();
            }
    
            #region IScheduler Members
    
            public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
            {
                return m_scheduler.Schedule(state, action);
            }
    
            public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
            {
                return m_scheduler.Schedule(state, dueTime, action);
            }
    
            public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime,
                                                Func<IScheduler, TState, IDisposable> action)
            {
                return m_scheduler.Schedule(state, dueTime, action);
            }
    
            public DateTimeOffset Now
            {
                get { return m_scheduler.Now; }
            }
    
            #endregion
    
            private void setSyncContext()
            {
                m_scheduler.Schedule(() =>
                {
                    var syncContext = new SchedulerSynchronizationContext(this);
                    SynchronizationContext.SetSynchronizationContext(syncContext);
                });
            }
    
            public void Dispose()
            {
                m_scheduler.Dispose();
            }
        }

     

    The SyncContextEventLoopScheduler class is a replacement for the EventLoopScheduler. It behaves the same, but it also sets a SchedulerSynchronizationContext on its thread. This makes sure that all awaits eventually return to the original thread.

    What do you think?

    I think the Rx team should provide something like that out-of-the-box, especially now, in this new C# 5/async era.


    Omer Mor





    • Edited by Omer Mor Tuesday, January 3, 2012 6:03 PM
    • Marked as answer by David Ohana Wednesday, January 4, 2012 8:55 PM
    Tuesday, January 3, 2012 12:36 PM

All replies

  • Hi David,

    The best approach, if possible, is to pass your EventLoopScheduler to the observable or task so that it can marshal continuations directly.   In other words, it's best to control concurrency at the source whenever possible.

    I believe the awaiter of the async/await pattern is in control of marshaling the continuation. The Task and IObservable<T> awaiters use the current SynchronizationContext if it's available. If it's not available or if the ConfigureAwait method is used to disable that behavior, then they simply stay on the "current" thread of the awaited's callback. If the operation introduced concurrency, then it's likely that the "current" thread will be a pooled thread or some other context, but not your EventLoopScheduler thread.
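    A minimal sketch of that capture behavior, using only the BCL (no Rx). The names SingleThreadContext and ContextDemo are hypothetical and are not part of Rx or of any code in this thread. The sketch assumes a dedicated thread that installs its own SynchronizationContext and pumps posted callbacks, so the code after an await should run on that same thread again.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs every posted callback on one dedicated thread.
public sealed class SingleThreadContext : SynchronizationContext, IDisposable
{
    private readonly BlockingCollection<(SendOrPostCallback Callback, object State)> _queue =
        new BlockingCollection<(SendOrPostCallback, object)>();
    private readonly Thread _thread;

    public SingleThreadContext()
    {
        _thread = new Thread(() =>
        {
            // Install this context on the loop thread so that awaits started here capture it.
            SetSynchronizationContext(this);
            foreach (var item in _queue.GetConsumingEnumerable())
                item.Callback(item.State);
        }) { IsBackground = true };
        _thread.Start();
    }

    public int ThreadId => _thread.ManagedThreadId;

    // Await continuations arrive here and are queued for the loop thread.
    public override void Post(SendOrPostCallback d, object state) => _queue.Add((d, state));

    // Lets the loop thread exit once the queue drains.
    public void Dispose() => _queue.CompleteAdding();
}

public static class ContextDemo
{
    // Returns true when the code after the await ran on the loop thread.
    public static bool ContinuationStaysOnLoopThread()
    {
        using (var context = new SingleThreadContext())
        {
            var done = new TaskCompletionSource<bool>();
            context.Post(async _ =>
            {
                await Task.Delay(50); // the delay itself completes on a pool thread
                done.SetResult(Thread.CurrentThread.ManagedThreadId == context.ThreadId);
            }, null);
            return done.Task.Result;
        }
    }
}
```

    Without the SetSynchronizationContext call there is nothing for the awaiter to capture, and the continuation would normally stay on whichever thread completed the delay.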

    What do you mean by "subscription handling method"?   Could you post example code?

    Assuming that you're referring to the code passed to Subscribe, it's possible to make additional asynchronous calls from within those handlers by using a separate IObservable<T>, Task or async/await; however, doing so loses some potentially valuable behavior, such as the task being cancelled along with the subscription's IDisposable.  For this reason it may be better to do your asynchronous work as part of the main query instead.  If you prefer the imperative syntax using async/await, then you could use an overload of Observable.Create that accepts a Task-returning function (with the option of accepting a CancellationToken that is tied to the subscription's IDisposable).  Attach this observable to your query using SelectMany.  (Note: These Create extensions are only available in the Rx Experimental release.)

    Regardless of where you use async/await, you'll find that the awaiter is unaware of your EventLoopScheduler.  If you've followed my latter advice and added your asynchronous operation as part of the query instead of as part of the observer, then you can add ObserveOn before calling Subscribe; otherwise, you can use something like this extension proposed here within your observer.

    To avoid an extra context switch when a SynchronizationContext is available, you may want to call ConfigureAwait(false) as well.
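    As a rough illustration of what ConfigureAwait(false) skips, here is a BCL-only sketch. CountingContext and ConfigureAwaitDemo are hypothetical names introduced for this example. The context counts how many continuations are posted to it and otherwise defers to the base class, which queues callbacks to the thread pool.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: counts how many continuations are posted to it.
public sealed class CountingContext : SynchronizationContext
{
    private int _postCount;

    public int PostCount => Volatile.Read(ref _postCount);

    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref _postCount);
        base.Post(d, state); // the base implementation queues the callback to the thread pool
    }
}

public static class ConfigureAwaitDemo
{
    // Runs a single await under a CountingContext and returns the number of posts it received.
    public static int CountPosts(bool continueOnCapturedContext)
    {
        var previous = SynchronizationContext.Current;
        var context = new CountingContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            // Blocking is safe here only because base.Post runs the continuation on the pool.
            AwaitOnce(continueOnCapturedContext).Wait();
            return context.PostCount;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    private static async Task AwaitOnce(bool continueOnCapturedContext)
    {
        await Task.Delay(50).ConfigureAwait(continueOnCapturedContext);
    }
}
```

    With continueOnCapturedContext set to true the continuation goes through Post once; with false the context is bypassed, which is the extra hop that ConfigureAwait(false) avoids.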

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Sunday, January 1, 2012 3:47 PM formatting; note about experimental release.
    Sunday, January 1, 2012 3:42 PM
  • Actually, await should be thought of as ContinueWith.
    Most of the time the continuation runs on the thread that completed the task,
    but there is no guarantee that this is really the case.

    In the case of EventLoopScheduler, I believe the continuation will never land on its thread,
    because that thread is not a ThreadPool thread.

    Anyway, consider the following sample:

        Task t1 = Task.Delay(10);
        Task t2 = Task.Delay(10);
        await Task.WhenAll(t1, t2);

    Which thread will be scheduled for the continuation?

    If you want to run on the EventLoopScheduler thread, you should use scheduler.Schedule(…).


    Bnaya Eshet
    • Edited by Bnaya Eshet Monday, January 2, 2012 6:31 AM
    Monday, January 2, 2012 6:31 AM
  • Hi Bnaya,

    > await Task.WhenAll(t1, t2);
    >
    > Which thread will be scheduled for the continuation?

    WhenAll returns a Task, so I suspect the default TaskAwaiter will use the current SynchronizationContext if it's available, if this behavior isn't disabled with ConfigureAwait.

    If a SynchronizationContext isn't available, then it would logically follow that the last Task being awaited will provide the thread context for the continuation.  In the case of Delay, I assume it will be a pooled thread that is created by the Timer class.
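    One way to check this empirically is the BCL-only sketch below. WhenAllDemo is a hypothetical name, and the delays are lengthened to 100 ms (an arbitrary choice) so the tasks are still pending when the await is reached. It starts the await on a plain thread that has no SynchronizationContext and reports where the continuation ended up.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Reports the thread id before the await, the thread id after it,
    // and whether the continuation ran on a thread-pool thread.
    public static (int Before, int After, bool AfterIsPoolThread) Run()
    {
        var result = new TaskCompletionSource<(int, int, bool)>();

        // A plain thread has no SynchronizationContext unless one is installed explicitly.
        var thread = new Thread(() => { Observe(result); });
        thread.Start();

        return result.Task.Result;
    }

    private static async Task Observe(TaskCompletionSource<(int, int, bool)> result)
    {
        int before = Thread.CurrentThread.ManagedThreadId;

        Task t1 = Task.Delay(100);
        Task t2 = Task.Delay(100);
        await Task.WhenAll(t1, t2);

        result.SetResult((before,
                          Thread.CurrentThread.ManagedThreadId,
                          Thread.CurrentThread.IsThreadPoolThread));
    }
}
```

    The starting thread returns as soon as the await is reached, so the continuation cannot resume on it; in this setup it is expected to report a pool thread.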

    - Dave


    http://davesexton.com/blog
    Monday, January 2, 2012 2:55 PM
  • If it comes from a SynchronizationContext you are probably right, but as I understand it,
    unlike the DispatcherScheduler, the EventLoopScheduler does guarantee single-threaded
    synchronization, but its thread is not the UI thread and therefore it is not part of
    a SynchronizationContext.

    await goes back to the SynchronizationContext only if it started from one.


    Bnaya Eshet
    Tuesday, January 3, 2012 5:33 AM
  • We'll improve our async/await support by the .NET 4.5 Beta timeframe. Some changes have been made to the await pattern and infrastructure in the BCL, which we'll piggyback on.
    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
    Tuesday, January 3, 2012 2:07 PM
  • Hi Omer,

    I do like your idea of writing your own SynchronizationContext.

    It may have interesting implications for other scenarios.

    And of course I will stay tuned to see what comes out in the next release (as Bart hinted).


    Bnaya Eshet
    Tuesday, January 3, 2012 5:49 PM