CurrentThreadScheduler vs ImmediateScheduler

  • Question

  • Hi

    ImmediateScheduler's Schedule method is pretty straightforward - it simply invokes the action.

    In contrast, CurrentThreadScheduler seems more involved - it creates something called a trampoline, which in turn iterates over an action queue, sleeping between invocations of items in the queue and so forth.

    I've been trying to follow the code with Reflector, but I'm having a hard time understanding the difference. As far as I can tell, CurrentThreadScheduler's Schedule method calls Trampoline's Run method, which ends up blocking the current thread until the queued action is performed (on the current thread as well), apparently just like in the case of ImmediateScheduler.

    I realize I'm missing something, so an explanation would be really appreciated

    Thanks !

    EDIT - In the meantime I've found a couple of resources that may shed light on the subject, if anyone's interested:

    http://channel9.msdn.com/blogs/j.van.gogh/controlling-concurrency-in-rx

    http://community.bartdesmet.net/blogs/bart/archive/2009/11/08/jumping-the-trampoline-in-c-stack-friendly-recursion.aspx

    Tuesday, February 1, 2011 6:23 PM

Answers

  • Hi,

    The trampoline seems to serve three purposes: 

    1. Prevents dead-locks from scheduler reentrancy.
    2. Prevents infinite loops in observables that require recursion through scheduler reentrancy.
    3. Cooperative single-threaded multitasking; I guess it's similar to the proposed async/await feature in C# 5.0.  Calling CurrentThreadScheduler.Schedule is sort of like using await when the currently executing code was also scheduled via CurrentThreadScheduler.

    In the observable world, calling Subscribe should be an asynchronous operation.  There's a problem if the scheduling of an observable dead-locks or blocks the current thread indefinitely because it attempts to execute immediately and never completes.

    Ignore the type of scheduler for a moment and consider a scheduled action that eventually, through some sequence of method calls, uses the same scheduler to schedule another action.

    With the ImmediateScheduler, the inner action is executed immediately.

    • If the outer action acquires some resource on which the inner action depends, and the inner action cannot acquire this resource until it's released by the outer action, then these actions dead-lock.
    • If the outer action depends upon the inner action, and the inner action depends upon the outer action, then this could result in an infinite loop that never yields control to other actions.

      For example: Observable.Return(1).Repeat().Take(1)

      By default, Return uses the ImmediateScheduler to call OnNext(1) and then OnCompleted().  Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return.  Because there's no trampoline in Return, this pattern repeats itself, blocking the current thread indefinitely.  Calling Subscribe on this observable never returns.  See this discussion for more information.

    With the CurrentThreadScheduler, the inner action is scheduled (queued) for execution when the outer action ends.  Conceptually, inner actions are bounced on the trampoline until the current thread is ready to execute them.

    • If the outer action acquires some resource on which the inner action depends, and the inner action cannot acquire this resource until it's released by the outer action, then these actions do not dead-lock because the inner action is not executed until the outer action completes.
    • If the outer action recurses when the inner action completes, then there won't be an immediately infinite loop because the inner action does not complete until the outer action completes first.

      For example: Observable.Return(1, Scheduler.CurrentThread).Repeat().Take(1)

      Here, Return is using the CurrentThreadScheduler to call OnNext(1) and then OnCompleted().  Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return; however, this second subscription to Return schedules its (inner) actions on the trampoline because it's still executing on the OnCompleted callback from the first scheduled (outer) action, thus the repetition does not occur immediately.  This allows Repeat to return a disposable to Take, which eventually calls OnCompleted, cancels the repetition by disposing Repeat, and ultimately the call from Subscribe returns.
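
    The difference in ordering described above can be sketched without Rx at all. The following is a minimal toy model, not the Rx implementation: IToyScheduler, ToyImmediateScheduler, ToyTrampolineScheduler and ReentrancyDemo are hypothetical names invented for this illustration, and the "resource" is just a boolean flag standing in for something the outer action holds.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-ins for the two scheduling strategies; these are NOT the Rx types.
public interface IToyScheduler
{
    void Schedule(Action action);
}

// Runs every action inline, as described for ImmediateScheduler.
public sealed class ToyImmediateScheduler : IToyScheduler
{
    public void Schedule(Action action) { action(); }
}

// Queues actions that are scheduled while another action is running and
// drains the queue afterwards, as described for the trampoline.
public sealed class ToyTrampolineScheduler : IToyScheduler
{
    private readonly Queue<Action> queue = new Queue<Action>();
    private bool running;

    public void Schedule(Action action)
    {
        queue.Enqueue(action);
        if (running) return;              // reentrant call: only queue it

        running = true;
        try
        {
            while (queue.Count > 0) queue.Dequeue()();
        }
        finally
        {
            running = false;
        }
    }
}

public static class ReentrancyDemo
{
    // The outer action holds a pretend resource and, while holding it,
    // schedules an inner action that needs the same resource.
    public static List<string> Trace(IToyScheduler scheduler)
    {
        var log = new List<string>();
        bool resourceHeld = false;

        scheduler.Schedule(() =>
        {
            resourceHeld = true;
            log.Add("outer: start");
            scheduler.Schedule(() =>
                log.Add(resourceHeld ? "inner: resource busy" : "inner: resource free"));
            log.Add("outer: end");
            resourceHeld = false;
        });

        return log;
    }

    public static void Main()
    {
        // outer: start | inner: resource busy | outer: end
        Console.WriteLine(string.Join(" | ", Trace(new ToyImmediateScheduler())));
        // outer: start | outer: end | inner: resource free
        Console.WriteLine(string.Join(" | ", Trace(new ToyTrampolineScheduler())));
    }
}
```

    In the immediate case the inner action runs while the outer one still holds the resource, which is where a real lock would dead-lock. In the trampolined case the inner action only runs after the outer action has finished and released it.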

    Keep in mind that the examples with Return and Repeat do not introduce any concurrency.  When you call Subscribe, it will not return until the observable completes regardless of which of these schedulers you choose.  With the ImmediateScheduler, Take calls OnCompleted but it cannot cancel the repetition, so Subscribe blocks indefinitely.  Alternatively, the CurrentThreadScheduler allows for cooperative single-threaded multitasking between the Return and Repeat operators, thus allowing Take to cancel the repetition without having to introduce any concurrency.

    - Dave


    http://davesexton.com/blog
    • Proposed as answer by fixedpoint Wednesday, February 2, 2011 2:20 AM
    • Marked as answer by Ohad Schneider Wednesday, February 2, 2011 7:16 PM
    Tuesday, February 1, 2011 8:47 PM
  • Hi,

    Are you familiar with the WPF/Silverlight dispatcher? This is the same principle. Effectively, rather than executing the action straight away, it is placed in a queue and executed once execution of the current action is complete.

    Build your own! ;)

     

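    void Main()
    {
    	var dispatcher = new Tramp();
    	
    	// Everything enqueued inside myappcode is only queued, not run, while myappcode is executing.
    	Action myappcode = () =>
    	{
    		for(int i = 0; i<10; i++)
    		{
    			var x = i; // copy, so that each lambda captures its own value
    			dispatcher.Enqueue(() => Console.WriteLine(x));
    		}
    	};
    	
    	dispatcher.Enqueue(myappcode);
    	dispatcher.Run(); // runs myappcode first, then prints 0 through 9
    	Console.WriteLine("Finished!");
    }
    
    public class Tramp
    {
    	private readonly Queue<Action> queue = new Queue<Action>();
    	
    	// Start processing: run queued actions in FIFO order until the queue is empty,
    	// including any actions that were enqueued by the actions themselves.
    	public void Run()
    	{
    		while(queue.Count > 0)
    		{
    			queue.Dequeue()();
    		}
    	}
    	
    	// Queue an action; nothing is executed until Run processes it.
    	public void Enqueue(Action a)
    	{
    		queue.Enqueue(a);
    	}
    }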
    

     

    There are several reasons why this technique is useful. In WPF this allows operations to be prioritised, and allows the framework to perform tasks like data binding etc. in between executing "user code" (your code). In Rx this reduces the stack depth for recursive operations, and can be used to avoid scenarios that would otherwise result in a deadlock using normal execution (as I believe you've seen in another thread!).
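
    The stack-depth point can be made concrete with a small sketch. This is an illustration only, not Rx code: StackDepthDemo and its two methods are hypothetical names, and "depth" is a counter tracking how many steps are active at the same time rather than a measurement of the real call stack.

```csharp
using System;
using System.Collections.Generic;

public static class StackDepthDemo
{
    // Each step directly invokes the next one, which is what happens when
    // a recursive action is executed immediately.
    public static int MaxDepthImmediate(int steps)
    {
        int depth = 0, maxDepth = 0;
        Action<int> step = null;
        step = n =>
        {
            depth++;
            maxDepth = Math.Max(maxDepth, depth);
            if (n > 1) step(n - 1);       // nested call: the stack grows
            depth--;
        };
        step(steps);
        return maxDepth;
    }

    // Same countdown, but each step enqueues the next one and returns; a
    // loop (the trampoline) then runs the queued steps one after another.
    public static int MaxDepthTrampolined(int steps)
    {
        int depth = 0, maxDepth = 0;
        var queue = new Queue<Action>();
        Action<int> step = null;
        step = n =>
        {
            depth++;
            maxDepth = Math.Max(maxDepth, depth);
            if (n > 1) queue.Enqueue(() => step(n - 1));
            depth--;
        };
        queue.Enqueue(() => step(steps));
        while (queue.Count > 0) queue.Dequeue()();
        return maxDepth;
    }

    public static void Main()
    {
        Console.WriteLine(MaxDepthImmediate(1000));    // 1000
        Console.WriteLine(MaxDepthTrampolined(1000));  // 1
    }
}
```

    With direct invocation the nesting grows with the number of steps; with the queue, only one step is ever active at a time, so the depth stays constant however long the recursion runs.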

    Hope that helps. If I've missed something, I'm sure one of the experts will point it out.


    James Miles http://enumeratethis.com
    • Proposed as answer by fixedpoint Wednesday, February 2, 2011 2:20 AM
    • Marked as answer by Ohad Schneider Wednesday, February 2, 2011 7:21 PM
    Tuesday, February 1, 2011 9:28 PM

All replies

  • For example: Observable.Return(1, Scheduler.CurrentThread).Repeat().Take(1)

    Here, Return is using the CurrentThreadScheduler to call OnNext(1) and then OnCompleted().  Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return; however, this second subscription to Return schedules its (inner) actions on the trampoline because it's still executing on the OnCompleted callback from the first scheduled (outer) action, thus the repetition does not occur immediately.  This allows Repeat to return a disposable to Take, which eventually calls OnCompleted, cancels the repetition by disposing Repeat, and ultimately the call from Subscribe returns.

    Thanks Dave for your great explanation. For anyone getting into Rx and trying to understand this issue in 2015, I have the following correction (which is probably due to a change in Rx since the initial post). I hope it is not out of place here.

    Let's take: Observable.Return(1, CurrentThreadScheduler.Instance).Repeat().Take(1).Subscribe(...)

    It is actually the call to Subscribe that places an action outermost on the CurrentThreadScheduler queue (this is coded into ObservableBase.Subscribe).

    This means that the innermost Return is scheduled to run only after the Subscribe method is finished, and Subscribe will then finish right away when called (via Take subscribing to Repeat subscribing to Return, none introducing any concurrency or actual work). This means that before even starting, there is already a subscription disposable waiting and listening for OnCompleted from Take, unlike in the ImmediateScheduler case where Subscribe is not allowed to finish first.

    So the call chain is simply this: when the innermost Return resumes with its OnNext, Repeat also calls OnNext, and Take calls both OnNext and OnCompleted. This outermost OnCompleted signals the disposal of the entire subscription chain. Thus, when Return continues with its next line of code and calls OnCompleted, there is no longer a Repeat subscription listening, so it does not loop and resubscribe.
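
    The sequence described above can be imitated with a toy model. To be clear, this is a sketch under assumptions and not how Rx is implemented: every type and method here (IToyScheduler, ToySubscription, RepeatTakeDemo.Run and so on) is hypothetical, the operators are reduced to plain delegates, and a maxSubscriptions cap is added so that the immediate case gives up instead of recursing forever.

```csharp
using System;
using System.Collections.Generic;

public interface IToyScheduler { void Schedule(Action action); }

// Runs actions inline (toy version of immediate scheduling).
public sealed class ToyImmediateScheduler : IToyScheduler
{
    public void Schedule(Action action) { action(); }
}

// Queues reentrant actions and drains them afterwards (toy trampoline).
public sealed class ToyTrampolineScheduler : IToyScheduler
{
    private readonly Queue<Action> queue = new Queue<Action>();
    private bool running;

    public void Schedule(Action action)
    {
        queue.Enqueue(action);
        if (running) return;
        running = true;
        try { while (queue.Count > 0) queue.Dequeue()(); }
        finally { running = false; }
    }
}

public sealed class ToySubscription
{
    public bool Disposed { get; private set; }
    public void Dispose() { Disposed = true; }
}

public static class RepeatTakeDemo
{
    // Returns how many times the toy "Return" was subscribed to before the
    // chain stopped, giving up once maxSubscriptions is reached.
    public static int Run(IToyScheduler scheduler, int maxSubscriptions)
    {
        int subscriptions = 0;
        bool taken = false;
        ToySubscription handle = null;    // stays null until "Repeat" has returned it

        // Toy "Take(1)": after the first value, dispose upstream if it can.
        Action onNext = () =>
        {
            if (taken) return;
            taken = true;
            if (handle != null) handle.Dispose();
        };

        // Toy "Repeat": resubscribes to "Return" whenever it completes.
        Func<ToySubscription> subscribeRepeat = () =>
        {
            var subscription = new ToySubscription();
            Action resubscribe = null;
            resubscribe = () =>
            {
                if (subscription.Disposed || subscriptions >= maxSubscriptions) return;
                subscriptions++;
                // Toy "Return": schedule OnNext followed by OnCompleted.
                scheduler.Schedule(() => { onNext(); resubscribe(); });
            };
            resubscribe();
            return subscription;
        };

        // The outermost Subscribe is itself placed on the scheduler.
        scheduler.Schedule(() => { handle = subscribeRepeat(); });
        return subscriptions;
    }

    public static void Main()
    {
        Console.WriteLine(Run(new ToyImmediateScheduler(), 100));   // 100 (hit the cap)
        Console.WriteLine(Run(new ToyTrampolineScheduler(), 100));  // 1
    }
}
```

    In the trampolined run the handle is assigned before the queued "Return" action executes, so the first value can dispose the chain and the resubscription check stops the loop. In the immediate run the handle is still null when the first value arrives, so the model keeps resubscribing until it reaches the cap.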

    Best regards,

    Dennis S

    Wednesday, March 25, 2015 7:04 AM