ObserveOn with Scheduler.NewThread does not observe on a new thread if the observer's OnNext is blocked and then continued

    Question

  • Can someone please explain why Scheduler.NewThread no longer seems to apply once I "block and continue" inside the OnNext of an observer subscribed to a time-buffered observable sequence?


    For example:


    If I buffer a sequence of numbers via

            var query = from number in Enumerable.Range(1, 200)
                        select SnoozeNumberProduction(number);

            var observableQuery = query.ToObservable();
            var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));

    Where SnoozeNumberProduction delays the number generation by 250 ms

            static Int32 SnoozeNumberProduction(Int32 number)
            {
                Thread.Sleep(250);
                return number;
            }

    Later, if I subscribe to bufferedSequence with ObserveOn(Scheduler.NewThread) and block on the fourth buffer with Console.ReadKey:

            Random random = new Random();
            Int32 count = 0;

            bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
            {
                Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count - 1], Thread.CurrentThread.ManagedThreadId);

                Thread.Sleep(1000);
                count++;

                if (count == 4)
                {
                    Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
                    Console.ReadKey(); // Block and build up the queue
                }

                Console.WriteLine("Woken " + list[0] + " - " + list[list.Count - 1]);
            });

    In this case, if I press a key after 10 seconds or so, the next several buffers are handled on the same managed thread, even though Scheduler.NewThread is specified in ObserveOn. Can someone please explain this behavior?

    Sample output:

     

    (7) Numbers from 1-7 produced on Thread ID 12
    Woken 1 - 7
    (9) Numbers from 8-16 produced on Thread ID 14
    Woken 8 - 16
    (8) Numbers from 17-24 produced on Thread ID 15
    Woken 17 - 24
    (8) Numbers from 25-32 produced on Thread ID 16
    count reached to 4, blocking ... press any key to continue
    fWoken 25 - 32
    (8) Numbers from 33-40 produced on Thread ID 16
    Woken 33 - 40
    (8) Numbers from 41-48 produced on Thread ID 16
    Woken 41 - 48
    (8) Numbers from 49-56 produced on Thread ID 16
    Woken 49 - 56
    (8) Numbers from 57-64 produced on Thread ID 16
    Woken 57 - 64
    (8) Numbers from 65-72 produced on Thread ID 16
    Woken 65 - 72
    (8) Numbers from 73-80 produced on Thread ID 16
    Woken 73 - 80
    (8) Numbers from 81-88 produced on Thread ID 16
    Woken 81 - 88
    (8) Numbers from 89-96 produced on Thread ID 16

     

    Saturday, May 21, 2011 9:38 AM

Answers

  • Hi,

    You're seeing this behavior for two reasons:

    1. You're blocking the call to OnNext in Subscribe.
    2. The ObserveOn operator ensures that OnNext will be called as many times as possible on the current thread.

    The ObserveOn operator reuses the current thread to call OnNext, sequentially, for as many values as are currently available.  Since you're blocking in Subscribe, multiple calls to OnNext will accumulate.  After you unblock, the queued calls are executed on the same thread.  I believe this is to avoid the overhead of creating a new thread per notification when it's unnecessary.
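
    The queue-and-drain behavior described above can be modelled in a few lines. This is only a simplified sketch written against the base class library, not the actual Rx implementation; DrainingObserver and Demo are hypothetical names, and the ManualResetEventSlim stands in for Console.ReadKey.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Simplified model only, NOT the Rx source: one drain loop runs at a time and
// keeps the thread it started on for as long as the queue is non-empty.
public sealed class DrainingObserver<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly Action<T> _onNext;
    private bool _draining;

    public DrainingObserver(Action<T> onNext) { _onNext = onNext; }

    public void OnNext(T value)
    {
        lock (_queue)
        {
            _queue.Enqueue(value);
            if (_draining) return;     // the active drain loop will pick this up
            _draining = true;
        }
        new Thread(Drain).Start();     // only an idle observer gets a fresh thread
    }

    private void Drain()
    {
        while (true)
        {
            T next;
            lock (_queue)
            {
                if (_queue.Count == 0) { _draining = false; return; }
                next = _queue.Dequeue();
            }
            _onNext(next);             // a blocked handler lets the queue build up
        }
    }
}

public static class Demo
{
    // Returns the managed thread ID that handled each of four notifications.
    public static List<int> Run()
    {
        var threadIds = new List<int>();
        var gate = new ManualResetEventSlim(false);
        var done = new CountdownEvent(4);

        var observer = new DrainingObserver<int>(n =>
        {
            if (n == 1) gate.Wait();   // stand-in for Console.ReadKey()
            lock (threadIds) threadIds.Add(Thread.CurrentThread.ManagedThreadId);
            done.Signal();
        });

        for (int i = 1; i <= 4; i++) observer.OnNext(i);   // 2..4 queue up behind 1
        gate.Set();                                         // "press any key"
        done.Wait();
        return threadIds;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

    In this model all four notifications report the same thread ID, because the handler for the first value is still blocked while the other three are enqueued. If the handler never blocked and values arrived slowly, each value would find the queue idle and would get its own thread, which matches the first few lines of the sample output in the question.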

    Your query seems strange to begin with, though.  Have you considered replacing it with a single call to Observable.Generate?  (Previously named Observable.GenerateWithTime)
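
    As a rough illustration of that suggestion, the sketch below produces the same 1..200 sequence with Observable.Generate and a time selector instead of Thread.Sleep inside a LINQ query. It assumes a release of Rx where the time-based overload is called Generate and lives in System.Reactive.Linq; GenerateSketch and BufferedNumbers are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

public static class GenerateSketch
{
    // Emits 1..count, one value per interval, grouped into time windows.
    public static IObservable<IList<int>> BufferedNumbers(int count, TimeSpan interval, TimeSpan window)
    {
        return Observable
            .Generate(
                1,                  // initial state
                n => n <= count,    // continue while this holds
                n => n + 1,         // next state
                n => n,             // value to emit
                _ => interval)      // delay before each value
            .Buffer(window);
    }

    public static void Main()
    {
        var finished = new ManualResetEventSlim(false);

        BufferedNumbers(200, TimeSpan.FromMilliseconds(250), TimeSpan.FromSeconds(2))
            .Subscribe(
                list =>
                {
                    if (list.Count == 0) return;   // a time window can be empty
                    Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}",
                        list.Count, list[0], list[list.Count - 1],
                        Thread.CurrentThread.ManagedThreadId);
                },
                () => finished.Set());

        finished.Wait();   // keep the process alive until the sequence completes
    }
}
```

    The delay is now part of the sequence definition, so no thread sleeps while waiting for the next number.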

    Otherwise, at least consider using ToObservable(Scheduler.NewThread) instead of ObserveOn to simplify the query.
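
    A minimal sketch of that alternative follows. It passes the scheduler to ToObservable so that the slow enumeration runs on its own thread, and drops ObserveOn. It assumes Rx 2.0 or newer, where the scheduler this thread calls Scheduler.NewThread is exposed as NewThreadScheduler.Default; ToObservableSketch and Buffered are hypothetical names, and the delay is a parameter only so that the sketch can be run quickly.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ToObservableSketch
{
    static int SnoozeNumberProduction(int number, int delayMs)
    {
        Thread.Sleep(delayMs);
        return number;
    }

    // Enumerates the slow query on a dedicated thread instead of using ObserveOn.
    public static IObservable<IList<int>> Buffered(int count, int delayMs, TimeSpan window)
    {
        var query = from number in Enumerable.Range(1, count)
                    select SnoozeNumberProduction(number, delayMs);

        // Assumption: NewThreadScheduler.Default (Rx 2.0+) in place of the
        // Scheduler.NewThread property used in the question.
        return query.ToObservable(NewThreadScheduler.Default).Buffer(window);
    }

    public static void Main()
    {
        var finished = new ManualResetEventSlim(false);

        Buffered(200, 250, TimeSpan.FromSeconds(2))
            .Subscribe(
                list =>
                {
                    if (list.Count == 0) return;   // a time window can be empty
                    Console.WriteLine("({0}) Numbers from {1}-{2}",
                        list.Count, list[0], list[list.Count - 1]);
                },
                () => finished.Set());

        finished.Wait();   // keep the process alive until the sequence completes
    }
}
```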

    - Dave


    http://davesexton.com/blog
    • Marked as answer by rohits79 Monday, May 23, 2011 3:06 AM
    Saturday, May 21, 2011 11:13 AM

All replies

  • Thanks a bunch, Dave, for going through the problem and answering. These are the things I want to learn. Are you aware of any documentation that describes what you mentioned? I would like to learn these and other intricate details. In the absence of books, and with an ever-changing API, it is difficult to learn Rx effectively.

    My query is a consequence of playing with Rx and is contorted, I know. Thanks for the hint though :).

     

    Best Regards,

    Rohit

    Monday, May 23, 2011 3:09 AM
  • Hi Rohit,

    The latest version of Rx comes in two flavors: stable and experimental.  Hopefully we won't see substantial changes to the stable version.

    I'm unaware of any official conceptual documentation other than the Rx Design Guidelines and Channel 9 videos, which are highly recommended although somewhat outdated.

    This forum also provides a lot of conceptual information if you search for the right terms.

    There used to be official reference documentation on MSDN, although I can't find it right now.  I believe it was for Windows Phone 7 anyway and it may be outdated now as well.

    In case you're unaware, the Rx hub is here:

    http://msdn.microsoft.com/en-us/data/gg577609

    - Dave


    http://davesexton.com/blog
    Monday, May 23, 2011 3:36 AM
  • Thanks again, Dave, much appreciated. Cheers
    Monday, May 23, 2011 5:27 AM