Possible bug: Repeat observables using "Immediate Scheduler"
-
Monday, August 23, 2010 4:35 PM
Possible bug...
Observable.Return(5)
.Repeat()
.Take(1) // never completes
Observable.Return(5, Scheduler.CurrentThread)
.Repeat()
.Take(1) // completes after 1 item is yielded
All Replies
-
Monday, August 23, 2010 4:50 PM
Hi James,
This is by design. Observable.Return uses the Immediate scheduler by default, which is blocking. Because of this, the Take(1) operator never gets hold of the subscription's disposable object. The CurrentThread scheduler exists precisely to avoid this situation. At one point CurrentThread was the default scheduler for many operators. However, we found that introducing any kind of concurrency, even just giving queue semantics instead of stack semantics, needs to be an explicit decision, so we made Immediate the default scheduler where possible.
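To illustrate the difference between stack semantics and queue semantics, here is a minimal sketch in plain C# that does not use Rx itself. The names SchedulerSketch, RunImmediate, RunTrampolined and Trace are hypothetical, made up for this illustration, and the trampoline is only a rough approximation of the idea behind CurrentThread, not its actual implementation.

```csharp
using System;
using System.Collections.Generic;

public static class SchedulerSketch
{
    // Immediate-style: run the action right now, nested inside the caller (stack semantics).
    public static void RunImmediate(Action action)
    {
        action();
    }

    // Trampoline state for the current thread; null means no drain loop is active.
    [ThreadStatic] private static Queue<Action> queue;

    // CurrentThread-style: if a drain loop is already running, enqueue and return;
    // otherwise start a drain loop that runs queued work in FIFO order (queue semantics).
    public static void RunTrampolined(Action action)
    {
        if (queue != null) { queue.Enqueue(action); return; }
        queue = new Queue<Action>();
        queue.Enqueue(action);
        try
        {
            while (queue.Count > 0) queue.Dequeue()();
        }
        finally
        {
            queue = null;
        }
    }

    // Records the order of events when an outer action schedules an inner one.
    public static List<string> Trace(Action<Action> schedule)
    {
        var log = new List<string>();
        schedule(() =>
        {
            log.Add("outer start");
            schedule(() => log.Add("inner"));
            log.Add("outer end");
        });
        return log;
    }
}
```

Under these assumptions, Trace(RunImmediate) records "outer start", "inner", "outer end" (the inner action runs nested inside the outer one), whereas Trace(RunTrampolined) records "outer start", "outer end", "inner" (the inner action waits until the outer one has returned).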
Jeffrey
- Marked As Answer by James Miles Monday, August 23, 2010 5:01 PM
-
Monday, August 23, 2010 5:00 PM
Understood.
-
Sunday, September 05, 2010 9:11 PM
Hi all,
How exactly does it work? What is the detailed sequence of steps that leads to the block? I think this deserves a more precise explanation.
Thank you
Tomas
-
Sunday, September 05, 2010 11:20 PM
Quoting Jeffrey: "... we made Immediate the default scheduler where possible."
It would be nice to have a list of the functions that default to schedulers other than Immediate.
-
Tuesday, September 07, 2010 10:48 AM
This is roughly what's happening in the two cases.
(Using immediate scheduler)
1. Start subscription to the source.
2. In the subscription function, source synchronously raises OnNext via the immediate scheduler
3. Repeat processes the notification by subscribing to the source again.
...and so on: an infinite loop, and the subscription never finishes.
(Using current thread scheduler)
1. Start subscription to the source.
2. In the subscription function, source schedules the OnNext action on the current thread's trampoline.
3. Subscription completes
4. Current thread trampoline is now processed & OnNext is actually raised.
5. Take(1) receives the notification and calls OnCompleted
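The two sequences above can be sketched with a small hand-rolled model. This is not Rx code: RepeatSketch, Source, Return, Repeat, Take1 and Run are hypothetical stand-ins written for this illustration, the "unsubscribe" action stands in for the IDisposable, and maxRounds is an artificial cap so that the immediate case terminates instead of looping forever.

```csharp
using System;
using System.Collections.Generic;

public static class RepeatSketch
{
    // A source takes (onNext, onCompleted) and returns an "unsubscribe" action,
    // standing in for Subscribe returning an IDisposable.
    public delegate Action Source(Action<int> onNext, Action onCompleted);

    // Return: schedules OnNext followed by OnCompleted.
    static Source Return(int value, Action<Action> schedule)
    {
        return (onNext, onCompleted) =>
        {
            bool cancelled = false;
            schedule(() =>
            {
                if (cancelled) return;
                onNext(value);
                if (!cancelled) onCompleted();
            });
            return () => cancelled = true;
        };
    }

    // Repeat: resubscribes whenever the source completes. maxRounds is a
    // safety cap for this sketch only; the real operator has no such limit.
    static Source Repeat(Source source, int maxRounds, List<string> log)
    {
        return (onNext, onCompleted) =>
        {
            bool stopped = false;
            int rounds = 0;
            Action inner = null;
            Action subscribe = null;
            subscribe = () =>
            {
                if (stopped) return;
                if (++rounds > maxRounds) { log.Add("gave up after " + maxRounds + " rounds"); return; }
                inner = source(onNext, subscribe);
            };
            subscribe();
            return () => { stopped = true; if (inner != null) inner(); };
        };
    }

    // Take(1): forwards one item, completes, then tries to unsubscribe.
    static Source Take1(Source source, List<string> log)
    {
        return (onNext, onCompleted) =>
        {
            Action unsubscribe = null; // not available until source(...) returns
            bool done = false;
            unsubscribe = source(
                x =>
                {
                    if (done) return;
                    done = true;
                    onNext(x);
                    onCompleted();
                    if (unsubscribe != null) unsubscribe(); else log.Add("no handle yet");
                },
                () => { if (!done) { done = true; onCompleted(); } });
            if (done) unsubscribe(); // late disposal, once the handle finally exists
            return unsubscribe;
        };
    }

    public static List<string> Run(bool useTrampoline, int maxRounds)
    {
        var log = new List<string>();
        Queue<Action> queue = null; // non-null only while the trampoline is draining
        Action<Action> schedule = action =>
        {
            if (!useTrampoline) { action(); return; }             // immediate: run nested, right now
            if (queue != null) { queue.Enqueue(action); return; } // trampoline: defer
            queue = new Queue<Action>();
            queue.Enqueue(action);
            while (queue.Count > 0) queue.Dequeue()();
            queue = null;
        };
        Source query = Take1(Repeat(Return(5, schedule), maxRounds, log), log);
        // The subscription itself runs on the scheduler, so in the trampolined
        // case Return's work stays queued until the subscription has returned.
        schedule(() => query(x => log.Add("OnNext(" + x + ")"), () => log.Add("OnCompleted")));
        return log;
    }
}
```

In this model, Run(false, 3) logs "OnNext(5)", "OnCompleted", "no handle yet", "gave up after 3 rounds": the subscriber is notified, but Take1 has nothing to dispose and Repeat keeps resubscribing until the cap stops it. Run(true, 3) logs only "OnNext(5)", "OnCompleted", because the handle already exists when the first item arrives.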
-
Tuesday, February 01, 2011 3:45 PM
Can you clarify further, please?
Does Repeat resubscribe to its IObservable upon OnCompleted in order to facilitate the repetition?
Thanks !
-
Tuesday, February 01, 2011 3:55 PM
Hi,
Return calls OnNext followed by OnCompleted. Repeat is resubscribing to the source because of OnCompleted, not because of OnNext.
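As a rough sketch of that ordering (plain C#, not the Rx implementation): ReturnSource and RepeatTrace are hypothetical stand-ins, and a finite repeat count is assumed so that the trace terminates.

```csharp
using System;
using System.Collections.Generic;

public static class ResubscribeSketch
{
    // Hypothetical stand-in for Observable.Return: one value, then completion.
    static void ReturnSource(int value, Action<int> onNext, Action onCompleted)
    {
        onNext(value);
        onCompleted();
    }

    // Hypothetical stand-in for a finite Repeat(count): forwards values, and
    // resubscribes when the inner source completes, not when it yields a value.
    public static List<string> RepeatTrace(int count)
    {
        var log = new List<string>();
        int round = 0;
        Action subscribe = null;
        subscribe = () =>
        {
            round++;
            log.Add("subscribe #" + round);
            ReturnSource(5,
                x => log.Add("OnNext(" + x + ")"),
                () =>
                {
                    log.Add("inner OnCompleted");
                    if (round < count) subscribe();   // resubscription is triggered here
                    else log.Add("outer OnCompleted");
                });
        };
        subscribe();
        return log;
    }
}
```

With these assumptions, RepeatTrace(2) records: subscribe #1, OnNext(5), inner OnCompleted, subscribe #2, OnNext(5), inner OnCompleted, outer OnCompleted. An unbounded Repeat would never reach the last entry.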
- Dave
http://davesexton.com/blog
-
Tuesday, February 01, 2011 4:28 PM
Thanks. Let's see if I got it right:
(Using immediate scheduler)
1. Start subscription to the source.
2. In the subscription function, source synchronously raises OnNext via the immediate scheduler
3. Repeat receives the OnNext and passes it to Take
4. Take passes both the OnNext item and OnCompleted to the original subscriber (which is now out of the picture)
5. Take unsubscribes from Repeat
6. Repeat receives OnCompleted and subscribes again (causing the endless loop) - presumably this could have been optimized away, since no one is listening on Repeat at this stage
The misleading thing here is that OnCompleted is actually called in both cases. But in the Scheduler.Immediate case, the subscription call never returns.
-
Tuesday, February 01, 2011 5:26 PM
Yes, you are correct.
The OnCompleted notification is raised; however, the IDisposable required by Take(1) has not yet been returned by Repeat's subscription code.
public static IObservable<T> MyRepeat<T>(this IObservable<T> source)
{
    return Observable.Create<T>(o =>
    {
        var disposable = Scheduler.Immediate.Schedule(self =>
            // if this subscription is synchronous
            source.Subscribe(o.OnNext, o.OnError, () => { o.OnCompleted(); self(); }));
        // this line is never reached :(
        Console.WriteLine("returning disposable");
        return disposable.Dispose;
    });
}
James Miles http://enumeratethis.com
-
Tuesday, February 01, 2011 5:51 PM
Thanks!
Though, shouldn't that be:
var disposable = Scheduler.Immediate.Schedule(self =>
    // if this subscription is synchronous
    source.Subscribe(o.OnNext, o.OnError, self));
Since Repeat should never call OnCompleted, if I understand correctly?
-
Tuesday, February 01, 2011 5:53 PM

