Subscription disposed before OnCompleted (?)
-
Tuesday, February 05, 2013 10:36 AM
Hi,
I'm seeing strange behavior in my Silverlight application. I have this helper for calling WCF RIA Services:
public static IObservable<LoadOperation<TEntity>> LoadAsync<TEntity>(
    this DomainContext domainContext,
    EntityQuery<TEntity> query,
    LoadBehavior loadBehavior = LoadBehavior.RefreshCurrent,
    object userState = null) where TEntity : Entity
{
    var q = Observable.Create<LoadOperation<TEntity>>(o =>
    {
        var dbg = query.QueryName;
        domainContext.Load(query, loadBehavior, lo =>
        {
            if (lo.HasError)
            {
                lo.MarkErrorAsHandled();
                o.OnError(lo.Error);
            }
            else
            {
                o.OnNext(lo);
                o.OnCompleted();
            }
        }, userState);
        return () => { Debug.WriteLine(dbg); };
    });
    return q;
}
What is strange to me is that this sequence of calls sometimes happens:
o.OnNext(lo); Debug.WriteLine(dbg); o.OnCompleted();
I thought it should always be (when there is no exception):
o.OnNext(lo); o.OnCompleted(); Debug.WriteLine(dbg);
So am I missing something here?
Thanks,
Tomas
- Edited by tomas.K Tuesday, February 05, 2013 1:30 PM title
All Replies
-
Tuesday, February 05, 2013 1:59 PM
Hi Tomas,
How do you know the order of the calls? Your example doesn't show how you subscribe to the query, or how you dispose of the subscription. Is it possible that the order of the output does not match the order in which the methods were actually called?
If you're disposing of the subscription on another thread, then there's a race condition. OnNext may be called, then Debug.WriteLine preempts the call to OnCompleted on another thread; however, in this case Rx prevents the observer from receiving OnCompleted. So it would seem that if you're logging OnCompleted after Debug.WriteLine, then it may simply be a problem with your logger.
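The point that a disposed subscription no longer receives OnCompleted can be checked in isolation. The following is a minimal single-threaded sketch, assuming a reference to System.Reactive; the class name DisposeBeforeCompleted and its log are made up for illustration and are not part of Tomas's code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

static class DisposeBeforeCompleted
{
    // Returns the notifications that the observer actually received.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        IDisposable subscription = source.Subscribe(
            x => log.Add("OnNext " + x),
            () => log.Add("OnCompleted"));

        source.OnNext(1);        // delivered: the observer is still attached
        subscription.Dispose();  // detaches the observer from the subject
        source.OnCompleted();    // the source completes, but nobody is listening

        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

The log should contain only the OnNext entry, so a logger that still reports OnCompleted after the disposal is reporting something other than what the observer saw.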
- Dave
- Edited by Dave Sexton Tuesday, February 05, 2013 2:00 PM Small gramatically update
-
Tuesday, February 05, 2013 3:21 PM
Hi Dave,
Thanks for your interest. I know the order from debugging (breakpoint hits). The whole query looks like this:
var q = Observable.Zip(
m_serviceContext.LoadAsync(m_serviceContext.GetReservationInfoTreeQuery(resId), lb)
.Select(lo => lo.Entities)
.Do(
t =>
{
IsBusy = false;
ReservationId_Text = ReservationId_Opened = resId;
AddToHistoryIfNew(resId);
},
ex => { IsBusy = false; m_eventAggregator.Publish(new ShowExceptionWinEventArgs(ex)); },
() => { }
)
... // other calls to LoadAsync, similar to the first one; just a lot of loads
, ( _1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11 ) => Unit.Default)
.Do(
_ => { IsBusy = false; },
ex => { IsBusy = false; },
() => { });
q.Subscribe();
There are no explicit disposals and no concurrency, only the standard Rx operators Zip, SelectMany, Do and Switch plus my LoadAsync. So I don't know how this can happen sometimes.
- Edited by tomas.K Tuesday, February 05, 2013 3:25 PM
-
Tuesday, February 05, 2013 8:45 PM
Hi,
domainContext.Load introduces concurrency, does it not? I assume that it executes o.OnNext on a pooled thread. Though if you're not disposing of the subscription, then it doesn't explain why o.OnCompleted is sometimes called after Debug.WriteLine.
If you can repro the problem, it may be helpful if you would post 3 stack traces with a breakpoint on o.OnNext, o.OnCompleted and Debug.WriteLine. Please also include the identity of the current thread.
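One way to collect that information without stopping in the debugger is to log the thread identity and stack at each of the three call sites. This is only a sketch: CallSiteLog and Describe are hypothetical names, and it assumes System.Diagnostics.StackTrace is usable on your platform.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

static class CallSiteLog
{
    // Builds one log entry: a label, the managed thread id, and the current stack.
    public static string Describe(string label)
    {
        int threadId = Thread.CurrentThread.ManagedThreadId;
        string stack = new StackTrace().ToString();
        return "[" + label + "] thread " + threadId + Environment.NewLine + stack;
    }

    static void Main()
    {
        Console.WriteLine(Describe("example"));
    }
}
```

Inside LoadAsync it could be called as Debug.WriteLine(CallSiteLog.Describe("OnNext " + dbg)) just before o.OnNext(lo), and likewise before o.OnCompleted() and in the dispose action, so that the three entries can be compared by query name and thread.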
- Dave
-
Tuesday, February 05, 2013 8:48 PM
Hi,
And since you're using Zip, are you sure that the latter o.OnCompleted corresponds with the former Debug.WriteLine in the same observable? In other words, is it possible that you're simply pressing F5 and Visual Studio is breaking on two different observables (running on two different pooled threads)?
- Dave
-
Wednesday, February 06, 2013 10:48 AM
Hi Dave,
- It is always the same thread in all breakpoint hits.
- I am sure that the observable is the same as well: the breakpoints are conditional on the same query name, and that query is subscribed only once.
Here are the dumps. The order in the strange case is LoadAsync.OnNext, Zip.OnCompleted, LoadAsync.Disposed, LoadAsync.OnCompleted, so it seems to me there may be a problem with Zip?
Zip.OnCompleted (is called from LoadAsync.OnNext):
> SVBF!SVBF.Views.ShellVM.LoadReservation.AnonymousMethod__55() Line 567 + 0x1 bytes C#
System.Reactive.Linq!System.Reactive.Linq.Observαble.Do<System.Reactive.Unit>._.OnCompleted() + 0x43 bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.ZipSink<System.Reactive.Unit>.Next(int index) + 0x1cf bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.ZipObserver<System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.OnNext(System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x78 bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.Do<System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>._.OnNext(System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x122 bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.Do<System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>._.OnNext(System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x122 bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.Select<System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>,System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>._.OnNext(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x13e bytes
System.Reactive.Core!System.Reactive.AutoDetachObserver<System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.OnNextCore(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x95 bytes
System.Reactive.Core!System.Reactive.ObserverBase<System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.OnNext(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x3d bytes
LoadAsync.OnNext:
Pear.Silverlight!Pear.Silverlight.Utility.ObservableExtensions.LoadAsync<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.AnonymousMethod__a(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> lo) Line 66 + 0x54 bytes C#
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.DomainContext.Load<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.AnonymousMethod__11(System.ServiceModel.DomainServices.Client.LoadOperation lo) + 0xe0 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.LoadOperation.Create<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.AnonymousMethod__0(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> arg) + 0x2f bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.InvokeCompleteAction() + 0x3b bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.OperationBase.Complete(object result) + 0x75 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.LoadOperation.Complete(System.ServiceModel.DomainServices.Client.DomainClientResult result) + 0xb6 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.DomainContext.CompleteLoad(System.IAsyncResult asyncResult) + 0x3f7 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.DomainContext.Load.AnonymousMethod__17() + 0x30 bytes
LoadAsync.Disposed:
Pear.Silverlight!Pear.Silverlight.Utility.ObservableExtensions.LoadAsync<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.AnonymousMethod__b() Line 76 C#
System.Reactive.Core!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x55 bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.AutoDetachObserver<System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.Dispose(bool disposing) + 0x44 bytes
System.Reactive.Core!System.Reactive.ObserverBase<System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.Dispose() + 0x29 bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Sink<System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.Dispose() + 0xb4 bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Disposables.CompositeDisposable.Dispose() + 0x13f bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Sink<System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.Dispose() + 0xb4 bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Disposables.CompositeDisposable.Dispose() + 0x13f bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Sink<System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.Dispose() + 0xb4 bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Disposables.CompositeDisposable.Dispose() + 0x13f bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Disposables.CompositeDisposable.Dispose() + 0x13f bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Sink<System.Reactive.Unit>.Dispose() + 0x67 bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Disposables.CompositeDisposable.Dispose() + 0x13f bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Sink<System.Reactive.Unit>.Dispose() + 0x67 bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Disposables.CompositeDisposable.Dispose() + 0x13f bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Sink<System.Reactive.Unit>.Dispose() + 0x67 bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Core!System.Reactive.Disposables.CompositeDisposable.Dispose() + 0x13f bytes
System.Reactive.Core!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.Switch<System.Reactive.Unit>._.ι.OnCompleted() + 0x5e bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.Defer<System.Reactive.Unit>._.OnCompleted() + 0x25 bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.Do<System.Reactive.Unit>._.OnCompleted() + 0x89 bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.ZipSink<System.Reactive.Unit>.Next(int index) + 0x1cf bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.ZipObserver<System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.OnNext(System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x78 bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.Do<System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>._.OnNext(System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x122 bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.Do<System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>._.OnNext(System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x122 bytes
System.Reactive.Linq!System.Reactive.Linq.Observαble.Select<System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>,System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>._.OnNext(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x13e bytes
System.Reactive.Core!System.Reactive.AutoDetachObserver<System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.OnNextCore(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x95 bytes
System.Reactive.Core!System.Reactive.ObserverBase<System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>>.OnNext(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x3d bytes
LoadAsync.OnNext:
Pear.Silverlight!Pear.Silverlight.Utility.ObservableExtensions.LoadAsync<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.AnonymousMethod__a(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> lo) Line 66 + 0x54 bytes C#
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.DomainContext.Load<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.AnonymousMethod__11(System.ServiceModel.DomainServices.Client.LoadOperation lo) + 0xe0 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.LoadOperation.Create<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.AnonymousMethod__0(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> arg) + 0x2f bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.InvokeCompleteAction() + 0x3b bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.OperationBase.Complete(object result) + 0x75 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.LoadOperation.Complete(System.ServiceModel.DomainServices.Client.DomainClientResult result) + 0xb6 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.DomainContext.CompleteLoad(System.IAsyncResult asyncResult) + 0x3f7 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.DomainContext.Load.AnonymousMethod__17() + 0x30 bytes
LoadAsync.OnCompleted:
> Pear.Silverlight!Pear.Silverlight.Utility.ObservableExtensions.LoadAsync<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.AnonymousMethod__a(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> lo) Line 67 C#
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.DomainContext.Load<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.AnonymousMethod__11(System.ServiceModel.DomainServices.Client.LoadOperation lo) + 0xe0 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.LoadOperation.Create<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.AnonymousMethod__0(System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> arg) + 0x2f bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.LoadOperation<Pear.Silverlight.RIAServices.Web.ReservationInfoNode>.InvokeCompleteAction() + 0x3b bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.OperationBase.Complete(object result) + 0x75 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.LoadOperation.Complete(System.ServiceModel.DomainServices.Client.DomainClientResult result) + 0xb6 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.DomainContext.CompleteLoad(System.IAsyncResult asyncResult) + 0x3f7 bytes
System.ServiceModel.DomainServices.Client!System.ServiceModel.DomainServices.Client.DomainContext.Load.AnonymousMethod__17() + 0x30 bytes
BTW, how do I write here with normal line spacing? It always forces 1.5 line spacing after I press Enter (I use Firefox).
Thanks,
Tomas
- Edited by tomas.K Wednesday, February 06, 2013 12:48 PM
-
Wednesday, February 06, 2013 2:08 PM
Hi Tomas,
You didn't post the code that uses Switch and Defer. What is being switched?
I'm wondering whether OnNext is causing Switch to dispose of the previous subscription before OnCompleted is called, though of course it implies that you may be looking at two different observables.
Looking at your stack trace for LoadAsync.Disposed, it appears that LoadAsync's AutoDetachObserver is being disposed (top of the stack):
LoadAsync<...>.AnonymousMethod__b() Line 76 C#
Disposables.AnonymousDisposable.Dispose() + 0x55 bytes
Disposables.SingleAssignmentDisposable.Dispose() + 0x57 bytes
AutoDetachObserver<LoadOperation<...>>.Dispose(bool disposing) + 0x44 bytes
ObserverBase<LoadOperation<...>>.Dispose() + 0x29 bytes
...indirectly by a call to OnNext in the base frame:
...
AutoDetachObserver<LoadOperation<...>>.OnNextCore(LoadOperation<...> value) + 0x95 bytes
ObserverBase<LoadOperation<...>>.OnNext(LoadOperation<...> value) + 0x3d bytes
I couldn't repro the overall problem in a console program, though I didn't use Switch. But in my test, the stack trace showed that the call to o.OnCompleted was directly responsible for disposing of LoadAsync's AutoDetachObserver. Here's the bottom of my stack trace:
...
AutoDetachObserver<LoadOperation<...>>.OnCompletedCore() + 0x65 bytes
ObserverBase<LoadOperation<...>>.OnCompleted() + 0x3b bytes
This is why I think that Switch is playing a role in this behavior.
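For reference, the Switch behavior in question (a new inner observable causes the subscription to the previous one to be disposed) can be observed with a small sketch like the one below. It assumes a reference to System.Reactive; SwitchDisposal, Inner and the log are hypothetical names, and the inner sequences deliberately never emit so that only the subscription lifetime shows up.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class SwitchDisposal
{
    // An inner sequence that never emits; it only records subscribe and dispose.
    static IObservable<int> Inner(string name, List<string> log)
    {
        return Observable.Create<int>(o =>
        {
            log.Add(name + ": subscribed");
            return () => log.Add(name + ": disposed");
        });
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var outer = new Subject<IObservable<int>>();

        using (outer.Switch().Subscribe())
        {
            outer.OnNext(Inner("first", log));
            outer.OnNext(Inner("second", log)); // Switch drops "first" here
            log.Add("end of using block");
        }
        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

If only one observable is ever pushed into Switch, this path is not taken, which is worth confirming by logging each push into the subject.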
> BTW, how do I write here with normal line spacing? It always forces 1.5 line spacing after I press Enter (I use Firefox).
Try Shift+Enter. It works in IE10.
- Dave
-
Wednesday, February 06, 2013 7:04 PM
Hi Dave,
I don't think Switch is the problem: only one observable is pushed to it, so nothing is disposed in this case.
My whole query with Defer and Switch is as follows: the big Zip (above) is wrapped inside Defer, and this whole query is pushed to Switch when the user clicks refresh or similar. I didn't want to complicate my description unnecessarily, but here it is for completeness:
ReservationsLoadings // Subject
    .Switch()
    .Do(_ => { }, ex => m_eventAggregator.Publish(new ShowExceptionWinEventArgs(ex)))
    .Retry()
    .Subscribe();
I think the suspicious part is rather Zip, in the Zip.OnCompleted trace, which is called from LoadAsync.OnNext:
...
...Do<System.Reactive.Unit>._.OnCompleted() + 0x43 bytes
...ZipSink<System.Reactive.Unit>.Next(int index) + 0x1cf bytes
...ZipObserver<IEnumerable<ReservationInfoNode>>.OnNext(System.Collections.Generic.IEnumerable<Pear.Silverlight.RIAServices.Web.ReservationInfoNode> value) + 0x78 bytes
...
OnCompleted is called right after Zip's OnNext. How can Zip call OnCompleted from within its nested observable's OnNext? What do you think?
Tomas
- Edited by tomas.K Saturday, February 09, 2013 12:34 PM
-
Wednesday, February 13, 2013 3:18 PM
-
Wednesday, February 13, 2013 5:06 PM
Hi Dave,
I am using Rx 2.0 for Silverlight 5.0.
Thanks for your continued interest,
Tomas
-
Wednesday, February 13, 2013 6:06 PM
Hi Tomas,
At this point it's going to be hard or impossible to explain the behavior if I can't repro. If you can provide a short but complete program that reproduces the behavior and the steps that I can take to reproduce it, then I'd be happy to take a look at it. Try to eliminate factors that you consider to be irrelevant, such as Switch and Defer, to see if the behavior remains. At some point you should be able to identify a minimal query that can repro the behavior - and by that time perhaps you'll have already found the answer :)
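A starting point for such a program might look like the sketch below. It assumes a reference to System.Reactive and replaces domainContext.Load with a hand-fired callback; ZipRepro, ManualSource and the log strings are all made up for illustration. It makes no claim about which order Rx produces; the idea is just to record where the dispose action runs relative to o.OnNext and o.OnCompleted for each source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class ZipRepro
{
    // Stand-in for LoadAsync: the "load completed" callback is stored and fired by hand.
    static IObservable<int> ManualSource(string name, List<string> log, List<Action> callbacks)
    {
        return Observable.Create<int>(o =>
        {
            callbacks.Add(() =>
            {
                log.Add(name + ": before OnNext");
                o.OnNext(1);
                log.Add(name + ": between OnNext and OnCompleted");
                o.OnCompleted();
                log.Add(name + ": after OnCompleted");
            });
            return () => log.Add(name + ": disposed");
        });
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var callbacks = new List<Action>();

        var q = Observable.Zip(
            ManualSource("a", log, callbacks),
            ManualSource("b", log, callbacks),
            ManualSource("c", log, callbacks),
            (a, b, c) => a + b + c);

        using (q.Subscribe(_ => log.Add("zip: OnNext"), () => log.Add("zip: OnCompleted")))
        {
            // Fire the simulated load callbacks one after another on this thread.
            foreach (var callback in callbacks)
                callback();
        }
        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

From here, Defer, Switch, Do and the real LoadAsync can be added back one at a time until the ordering seen in the debugger reappears.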
- Dave

