Possible bug: Repeat observables using "Immediate Scheduler"

  • Question

  • Possible bug...

     

    Observable.Return(5)
    .Repeat()
    .Take(1) // never completes



    Observable.Return(5, Scheduler.CurrentThread)
    .Repeat()
    .Take(1) // completes after 1 item is yielded

    Monday, August 23, 2010 4:35 PM

Answers

  • Hi James,

    This is by design. Observable.Return uses the Immediate scheduler by default, which is blocking. Because of this, the Take(1) operator never gets hold of the subscription's disposable object. The CurrentThread scheduler exists precisely to avoid this situation. At one point CurrentThread was the default scheduler for many operators. However, we found that introducing any kind of concurrency, even just giving queue semantics instead of stack semantics, needs to be an explicit decision, so we made Immediate the default scheduler wherever possible.

     

    Jeffrey

    • Marked as answer by James Miles Monday, August 23, 2010 5:01 PM
    Monday, August 23, 2010 4:50 PM

All replies

  • Understood.
    Monday, August 23, 2010 5:00 PM
  • Hi all,

    How exactly does it work? What is the detailed sequence of steps that leads to the block? I think this deserves a more precise explanation.

    Thank you

    Tomas

    Sunday, September 5, 2010 9:11 PM
  • ... we made Immediate the default scheduler where possible.

    Jeffrey

    It would be nice to have a list of functions which default to schedulers other than Immediate.
    Sunday, September 5, 2010 11:20 PM
  • Hi all,

    How exactly does it work? What is the detailed sequence of steps that leads to the block? I think this deserves a more precise explanation.

    Thank you

    Tomas

    This is roughly what's happening in the two cases.

    (Using immediate scheduler)

    1. Start subscription to the source.
    2. In the subscription function, the source synchronously raises OnNext via the immediate scheduler.
    3. Repeat processes the notification by subscribing to the source again.
    etc... This is an infinite loop, so the subscription call never returns.

    (Using current thread scheduler)

    1. Start subscription to the source.
    2. In the subscription function, the source schedules the OnNext action on the current thread's trampoline.
    3. Subscription completes
    4. Current thread trampoline is now processed & OnNext is actually raised.
    5. Take(1) receives the notification and calls OnCompleted
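
The two step sequences above can be sketched with a small model. This is not Rx code: it is a minimal simulation, assuming the simplification that Take(1) can only unsubscribe once the outer Subscribe call has returned its handle. The names `SchedulerModel`, `CountSubscriptions` and `safetyLimit` are hypothetical, and `safetyLimit` exists only to stop the otherwise endless loop.

```csharp
using System;
using System.Collections.Generic;

public static class SchedulerModel
{
    // Simulates Return(5).Repeat().Take(1) and returns how many times the
    // source was subscribed. safetyLimit is a hypothetical cap that stops
    // the otherwise endless immediate-scheduler loop.
    public static int CountSubscriptions(bool useTrampoline, int safetyLimit)
    {
        var trampoline = new Queue<Action>();
        bool handleReturned = false; // true once the outer Subscribe call has returned
        bool disposed = false;       // true once Take(1) has disposed the subscription
        int subscriptions = 0;

        // Immediate: run the work inline. Trampoline: queue it for later.
        void Schedule(Action work)
        {
            if (useTrampoline) trampoline.Enqueue(work);
            else work();
        }

        void SubscribeToSource()
        {
            if (disposed || subscriptions >= safetyLimit) return;
            subscriptions++;
            Schedule(() =>
            {
                // OnNext(5): Take(1) has its one item and wants to unsubscribe,
                // but it can only do so once it holds the subscription handle.
                if (handleReturned) disposed = true;
                // OnCompleted: Repeat subscribes to the source again.
                SubscribeToSource();
            });
        }

        SubscribeToSource();     // the outer Subscribe call
        handleReturned = true;   // Subscribe has returned; Take(1) now holds the handle
        while (trampoline.Count > 0) trampoline.Dequeue()(); // drain queued work
        return subscriptions;
    }
}
```

In this model, `SchedulerModel.CountSubscriptions(true, 5)` subscribes once and stops, while `SchedulerModel.CountSubscriptions(false, 5)` keeps resubscribing until it hits the cap of 5, because the handle is never available while the nested calls are still running.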

     

    Tuesday, September 7, 2010 10:48 AM
  • (Using immediate scheduler)

    1. Start subscription to the source.
    2. In the subscription function, the source synchronously raises OnNext via the immediate scheduler.
    3. Repeat processes the notification by subscribing to the source again.
    etc... This is an infinite loop, so the subscription call never returns.

    (Using current thread scheduler)

    1. Start subscription to the source.
    2. In the subscription function, the source schedules the OnNext action on the current thread's trampoline.
    3. Subscription completes
    4. Current thread trampoline is now processed & OnNext is actually raised.
    5. Take(1) receives the notification and calls OnCompleted

    Can you clarify further, please?

    Does Repeat re-subscribe to its IObservable upon OnCompleted in order to facilitate the repetition?

    Thanks !

    Tuesday, February 1, 2011 3:45 PM
  • Hi,

    Return calls OnNext followed by OnCompleted. Repeat is resubscribing to the source because of OnCompleted, not because of OnNext.

    - Dave


    http://davesexton.com/blog
    Tuesday, February 1, 2011 3:55 PM
  • Hi,

    Return calls OnNext followed by OnCompleted. Repeat is resubscribing to the source because of OnCompleted, not because of OnNext.

    - Dave


    http://davesexton.com/blog


    Thanks. Let's see if I got it right:

    (Using immediate scheduler)

    1. Start subscription to the source.
    2. In the subscription function, source synchronously raises OnNext via the immediate scheduler
    3. Repeat receives the OnNext and passes it to Take

    4. Take passes both the OnNext item and OnCompleted to the original subscriber (which is now out of the picture)

    5. Take unsubscribes from Repeat

    6. Repeat receives OnCompleted and subscribes again (causing the endless loop). Presumably this could have been optimized away, since no one is listening to Repeat at this stage.

     

    The misleading thing here is that OnCompleted is actually called in both cases. But in the Scheduler.Immediate case, the subscription call never returns.

    Tuesday, February 1, 2011 4:28 PM
  • Yes, you are correct.

    The OnCompleted notification is raised; however, the IDisposable required by Take(1) has not yet been returned by Repeat's subscription code.

     

    public static IObservable<T> MyRepeat<T>(this IObservable<T> source)
    {
        return Observable.Create<T>(o =>
        {
            var disposable = Scheduler.Immediate.Schedule(self =>
                // if this subscription is synchronous
                source.Subscribe(o.OnNext, o.OnError, () =>
                {
                    o.OnCompleted();
                    self();
                }));

            // this line is never reached :(
            Console.WriteLine("returning disposable");
            return disposable.Dispose;
        });
    }
    

     


    James Miles http://enumeratethis.com

    Tuesday, February 1, 2011 5:26 PM
  • Yes, you are correct.

    The OnCompleted notification is raised; however, the IDisposable required by Take(1) has not yet been returned by Repeat's subscription code.

    Thanks!

    Though, shouldn't that be:

    var disposable = Scheduler.Immediate.Schedule(self =>
        // if this subscription is synchronous
        source.Subscribe(o.OnNext, o.OnError, self));

    since Repeat should never call OnCompleted, if I understand correctly?

    Tuesday, February 1, 2011 5:51 PM
  • Yes - sorry!


    James Miles http://enumeratethis.com
    Tuesday, February 1, 2011 5:53 PM