Recommended pattern for stopping/restarting a subscription?

  • Question

  • What is the standard way to stop a subscription and then restart it?

    Currently I lock on a class-level field, dispose of the handle to the existing subscription, then resubscribe and assign the new subscription to that same handle.

    I am not sure whether this is causing a threading issue, since the app in question crashes sporadically without throwing an exception.

    Here is a bit of pseudo-code that illustrates the approach:

     

    readonly object sync = new object();
    IDisposable subscriptionHandle;
    
    void RestartSubscription()
    {
      lock (sync)
      {
        // This code runs on one thread.
        if (subscriptionHandle != null)
          subscriptionHandle.Dispose();
    
        subscriptionHandle = Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(_ =>
        {
          // This code runs on a different thread. Could that cause problems
          // when I try to cancel the subscription as above?
          DoSomeStuff();
        });
      }
    }
     



    • Edited by Arash Emami Wednesday, August 10, 2011 9:19 PM added code
    Wednesday, August 10, 2011 8:17 PM

Answers

  • You can use `System.Reactive.Disposables.SerialDisposable` as one option:

    SerialDisposable subscriptionHandle = new SerialDisposable();
    
    void RestartSubscription()
    {
    	subscriptionHandle.Disposable =
    		Observable.Timer(TimeSpan.FromMilliseconds(100.0)).Subscribe(_ =>
    		{
    			DoSomeStuff();
    		});
    }
    

    No need for locks using `SerialDisposable`.
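
    To make the replacement behaviour concrete, here is a minimal sketch of what `SerialDisposable` does when its `Disposable` property is reassigned. The class name `SerialDisposableDemo`, the `Run` method, and the log messages are made up for illustration; only `SerialDisposable` and `Disposable.Create` come from Rx.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Disposables;

    // Hypothetical demo class, not part of Rx.
    public static class SerialDisposableDemo
    {
        public static List<string> Run()
        {
            var log = new List<string>();
            var handle = new SerialDisposable();

            // Disposable.Create wraps an action that runs once, on disposal.
            handle.Disposable = Disposable.Create(() => log.Add("first disposed"));

            // Assigning a replacement disposes the previously held value.
            handle.Disposable = Disposable.Create(() => log.Add("second disposed"));

            // Disposing the SerialDisposable disposes whatever it currently holds.
            handle.Dispose();

            // Anything assigned after that point is disposed immediately.
            handle.Disposable = Disposable.Create(() => log.Add("third disposed"));

            return log;
        }
    }
    ```

    The last step is worth keeping in mind: once the handle itself has been disposed, calling `RestartSubscription` again would create a subscription that is torn down straight away.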

    Another way is to use the `Switch` extension method. `Switch` takes an `IObservable<IObservable<T>>` and turns it into an `IObservable<T>`: each time the outer observable produces a new inner observable, `Switch` unsubscribes from the previous inner observable and subscribes to the new one. If you use `Switch` and don't need to cancel the observable manually, you don't need to keep a subscription reference at all.

    	someOtherObservable
    		.Select(x => Observable.Timer(TimeSpan.FromMilliseconds(100.0)))
    		.Switch()
    		.Subscribe(_ =>
    		{
    			DoSomeStuff();
    		});
    


    The `someOtherObservable` can be any observable. You could use `FromEventPattern` to get a button-click observable so that a user could reset the timer manually. Or you could use a `Subject` and push a value into it whenever you want to restart programmatically.
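
    As a rough sketch of the `Subject` variant, the restart signal can be wrapped in a small class. `RestartableTimer`, its constructor parameters, and the `Restart` method are hypothetical names chosen for this example; the Rx calls (`Subject<Unit>`, `Select`, `Switch`, `Observable.Timer`) are the ones discussed above.

    ```csharp
    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    // Hypothetical wrapper: each Restart() cancels the pending timer and starts a new one.
    public sealed class RestartableTimer : IDisposable
    {
        private readonly Subject<Unit> restarts = new Subject<Unit>();
        private readonly IDisposable subscription;

        public RestartableTimer(TimeSpan dueTime, Action onTick)
        {
            subscription = restarts
                // Every restart signal maps to a fresh one-shot timer...
                .Select(_ => Observable.Timer(dueTime))
                // ...and Switch drops the previous timer when a new one arrives.
                .Switch()
                .Subscribe(_ => onTick());
        }

        // Starts the timer, or restarts it if one is already pending.
        public void Restart()
        {
            restarts.OnNext(Unit.Default);
        }

        public void Dispose()
        {
            subscription.Dispose();
            restarts.Dispose();
        }
    }
    ```

    Nothing runs until the first `Restart()` call. Note that a plain `Subject` expects `OnNext` calls to be serialized, so if `Restart()` can be called from several threads at once, some synchronization around it is still needed.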

    Do these options help?


    James C-S
    • Marked as answer by Arash Emami Thursday, August 11, 2011 12:55 AM
    Thursday, August 11, 2011 12:21 AM

All replies

  • Going to try out `SerialDisposable`; it looks very good from what I can see it doing in Reflector. Thank you for your help with both questions, James.
    Thursday, August 11, 2011 12:55 AM
  • Hi,

    Just make sure that `DoSomeStuff` doesn't call `RestartSubscription`; otherwise, you'll prematurely dispose of the latest subscription without disposing of the former one. You can use double indirection to avoid this.

    Rxx provides an extension method SerialDisposable.SetDisposableIndirectly that encapsulates this pattern:

    http://rxx.codeplex.com/SourceControl/changeset/view/62412#1055922
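
    For readers who can't follow the link, double indirection could look roughly like the sketch below. This is an assumption about the general shape of the pattern, not the Rxx source; the class name `SerialDisposableSketch` and the method name `AssignIndirectly` are hypothetical.

    ```csharp
    using System;
    using System.Reactive.Disposables;

    // Hypothetical helper illustrating the pattern; not the Rxx implementation.
    public static class SerialDisposableSketch
    {
        public static void AssignIndirectly(this SerialDisposable serial, Func<IDisposable> subscribe)
        {
            // Install an empty placeholder first. This disposes the previous
            // subscription before the new one is created.
            var placeholder = new SingleAssignmentDisposable();
            serial.Disposable = placeholder;

            // If subscribe() re-enters AssignIndirectly, the nested call replaces
            // (and thereby disposes) this placeholder. Assigning to a disposed
            // SingleAssignmentDisposable disposes the assigned value immediately,
            // so the stale outer subscription is cleaned up and the nested,
            // most recent subscription stays alive.
            placeholder.Disposable = subscribe();
        }
    }
    ```

    With this helper, `RestartSubscription` would call `subscriptionHandle.AssignIndirectly(() => source.Subscribe(...))`, where `source` stands for whatever observable is being restarted, instead of assigning to `subscriptionHandle.Disposable` directly.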

    - Dave


    http://davesexton.com/blog
    Thursday, August 11, 2011 3:37 AM