OnCompleted event in observer is never called

    Question

  • I have a dynamically created observable (using CodeDom) and it runs just fine. OnNext and OnError are called on the subscribing observer as expected by my underlying observable.

    When I call dispose on the observable, however, things get strange. In my observable, as I am about to dispose, I make a call to OnCompleted(), but the Observer never sees this OnCompleted event.

    Am I making a huge error in the way I am putting things together? I have read threads (http://stackoverflow.com/questions/13561842/rx-for-net-when-calling-observer-oncompleted-from-within-disposable-create-no) talking about AutoDetachObserver and how it might block the underlying object's OnCompleted event, but I don't understand it well enough to know how to solve my problem. One thing I have found is that by the time I reach the line that fires the OnCompleted event to the observer, the observer instance's IsStopped property has changed from 0 to 1 (stopped). I guess this is relevant, but I have no idea how to get ahead of it being stopped so that I can get my OnCompleted event through to the observer.

    Here is the code that subscribes to the observable:

    var observable = queryWrapper.CreateObservable().Publish(); // create and publish a ConnectableObservable
    Subscription = observable.Connect();
    observable
        .ObserveOn(System.Reactive.Concurrency.Scheduler.CurrentThread)
        .Subscribe(action.CreateObserver());

    Here is the observable code (snippet). The _DataSource class instance calls OnNext() on the observer:

    IObservable<XMLPayload> _Underlying;
    CancellationDisposable _Cancel = new CancellationDisposable();
    _Underlying = Observable.Create<MyPayload>(
    	o =>
    	{
    		NewThreadScheduler.Default.Schedule(
    			() =>
    			{
    				try
    				{
    					_DataSource.Start(o);
    					while (!_Cancel.Token.IsCancellationRequested)
    					{
    						Thread.Sleep(100);
    					}
    					Dispose();
    				}
    				catch (Exception ex)
    				{
    					o.OnError(ex);
    				}
    				o.OnCompleted();
    			});
    		return _Cancel;
    	});

    Thursday, July 04, 2013 3:06 PM

Answers

  • Hi,

    First, I suspect that this line doesn't do what you think it does, though it probably has nothing to do with your actual problem:

    observable.ObserveOn(System.Reactive.Concurrency.Scheduler.CurrentThread)

    CurrentThread introduces single-threaded asynchrony (via a "trampoline"); it doesn't dispatch to the thread on which ObserveOn was called, if that's what you had assumed.  In other words, your example means that each notification will be observed on the thread on which it's already being raised, though only after that thread completes all of its pending work.  Rx automatically introduces that trampoline when you call Subscribe, so this use of CurrentThread is actually useless.  There are several discussions on this forum about that topic.
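
    To illustrate the point about CurrentThread, here is a minimal sketch, assuming the System.Reactive package is referenced. The class name CurrentThreadDemo and the value 42 are made up for the example. It raises a single notification on a new thread and records which thread the observer actually runs on after ObserveOn(Scheduler.CurrentThread).

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical demo class, not part of the original question.
public static class CurrentThreadDemo
{
    // Returns (id of the subscribing thread, id of the thread that observed the value).
    public static Tuple<int, int> Run()
    {
        int subscriberThread = Environment.CurrentManagedThreadId;
        int observedThread = -1;
        var done = new ManualResetEventSlim();

        // The value is raised on a thread created by NewThreadScheduler.
        Observable.Return(42, NewThreadScheduler.Default)
            // CurrentThread means "the thread the notification is already on",
            // not "the thread that called ObserveOn".
            .ObserveOn(Scheduler.CurrentThread)
            .Subscribe(
                x => observedThread = Environment.CurrentManagedThreadId,
                () => done.Set());

        done.Wait(); // block until the sequence has completed
        return Tuple.Create(subscriberThread, observedThread);
    }

    public static void Main()
    {
        var ids = Run();
        Console.WriteLine("Subscribed on thread {0}, observed on thread {1}",
            ids.Item1, ids.Item2);
    }
}
```

    The two thread ids should differ, since the notification stays on the thread that produced it. If the goal is to marshal notifications to a particular thread, a scheduler bound to that thread (for example a dispatcher or SynchronizationContext scheduler in a UI application) is the usual choice, though whether that applies here depends on the rest of the program.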

    As for your question, you shouldn't call OnCompleted during/after disposal.  Disposal is how observers unsubscribe.  If an observer is unsubscribing, then it's because it doesn't want to observe any more notifications, thus calling OnCompleted is pointless because it's going to be ignored anyway.  Auto-detach behavior prevents OnCompleted from being seen, but that's fine because it just means that the disposal process is fast.
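
    A small sketch of the difference, assuming the System.Reactive package is referenced. CompletionDemo, the `stop` subject, and the logged strings are hypothetical names used only for illustration. The first method mirrors the pattern in the question (calling OnCompleted while disposing); the second ends the sequence with a signal instead of unsubscribing, which is one possible way to let the observer see OnCompleted.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the original question.
public static class CompletionDemo
{
    // Unsubscribing: OnCompleted raised from the disposal callback
    // arrives after the observer has already detached.
    public static List<string> DisposeThenComplete()
    {
        var log = new List<string>();
        IObservable<int> source = Observable.Create<int>(o =>
        {
            o.OnNext(1);
            // Same idea as the question: try to complete during disposal.
            return Disposable.Create(() => o.OnCompleted());
        });

        IDisposable subscription = source.Subscribe(
            x => log.Add("OnNext " + x),
            () => log.Add("OnCompleted"));

        subscription.Dispose(); // the observer unsubscribes here
        return log;
    }

    // Completing: the sequence itself ends when `stop` fires,
    // so the observer is still attached and receives OnCompleted.
    public static List<string> CompleteViaSignal()
    {
        var log = new List<string>();
        var values = new Subject<int>();
        var stop = new Subject<Unit>(); // hypothetical stop signal

        values.TakeUntil(stop).Subscribe(
            x => log.Add("OnNext " + x),
            () => log.Add("OnCompleted"));

        values.OnNext(1);
        stop.OnNext(Unit.Default); // ends the sequence for subscribers
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", DisposeThenComplete()));
        Console.WriteLine(string.Join(", ", CompleteViaSignal()));
    }
}
```

    In the first method the log should contain only the OnNext entry, because the auto-detach wrapper is stopped before the disposal callback runs. In the second, the observer receives OnCompleted because the sequence ended on its own terms rather than being unsubscribed from.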

    > Am I making a huge error in the way I am putting things together

    I suspect that your logic is flawed, but it's hard to say because you haven't provided a complete program or a specification.  What exactly are you trying to accomplish by forcing a call to OnCompleted?

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Sunday, July 07, 2013 9:36 PM Clarification
    • Marked as answer by FarnhamSurrey Wednesday, July 10, 2013 9:35 AM
    Sunday, July 07, 2013 9:34 PM