Canceling an asynchronous operation

  • Question

  • I've seen references to canceling operations in Rx on blogs and in the videos, but haven't been able to find a description/sample of how to do this.

    If I have something like this:

    // FromAsyncPattern returns a Func<IObservable<string>>; invoking it starts the operation
    var result = Observable.FromAsyncPattern<string>(BeginDoSomething, EndDoSomething);
    result().Subscribe(something =>
    {
        // Process when something is done, don't want this code executed if I cancel
    });

    but at some point during the processing I decide I need to cancel/abandon it, is there a mechanism to either actively cancel (stop processing) or at least unsubscribe/ignore the results so that the subscribe handler will not be called?

    NOTE: I don't necessarily have to use FromAsyncPattern, I can rewrite the async code itself to fit a different pattern (such as deriving from a Subject class?)
    Monday, December 28, 2009 8:32 PM

Answers

  • I suppose you can dispose the subscription (the IDisposable returned by Subscribe) to cancel the processing.

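    // FromAsyncPattern returns a Func<IObservable<string>>; invoking it starts the operation
    var result = Observable.FromAsyncPattern<string>(BeginDoSomething, EndDoSomething);
    IDisposable registration = result().Subscribe(something =>
    {
        // Process when something is done, don't want this code executed if I cancel
    });

    registration.Dispose(); // cancel: the handler above is not called after this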

    Benjamin
    Tuesday, December 29, 2009 9:43 AM

All replies

  • Hi Andy,

    You might also have a look at the System.Threading.CancellationTokenSource class, which will be available natively in .NET 4 and is back-ported to .NET 3.5 with the Rx framework.

    This little sample might help:

    IObservable<int> ob =
        Observable.CreateWithDisposable<int>(o =>
        {
            var cancel = new CancellationDisposable(); // internally creates a new CancellationTokenSource
            Scheduler.Later.Schedule(() =>
            {
                int i = 0;
                for (; ; )
                {
                    Thread.Sleep(200);  // here we do the long lasting background operation
                    if (!cancel.Token.IsCancellationRequested)    // check cancel token periodically
                        o.OnNext(i++);
                    else
                    {
                        Console.WriteLine("Aborting because cancel event was signaled!");
                        o.OnCompleted();
                        return;
                    }
                }
            }
            );
    
            return cancel;
        }
        );
    
    IDisposable subscription = ob.Subscribe(i => Console.WriteLine(i));
    Console.WriteLine("Press any key to cancel");
    Console.ReadKey();
    subscription.Dispose();
    Console.WriteLine("Press any key to quit");
    Console.ReadKey();  // give background thread chance to write the cancel acknowledge message

    I will also add it to the http://rxwiki.wikidot.com/101samples page if something similar is missing there.

    Andreas Köpf
    • Edited by Andreas Köpf Tuesday, December 29, 2009 9:09 PM corrected sample
    Tuesday, December 29, 2009 8:11 PM
  • In simple scenarios like my last sample, which simply polls a Boolean value, it is of course sufficient to use a BooleanDisposable object. If other parts of the infrastructure require a CancellationToken, or if you block waiting for incoming events (such as socket accepts) and can pass CancellationToken.WaitHandle together with the other wait handles to WaitHandle.WaitAny() to allow a quick cancellation, it makes sense to use the more heavyweight CancellationToken "system".

    Here is my last mini-sample using BooleanDisposable instead of CancellationDisposable:

    IObservable<int> ob =
        Observable.CreateWithDisposable<int>(o =>
            {
                var cancel = new BooleanDisposable(); // will set the wrapped bool to true on Dispose()
                Scheduler.Later.Schedule(() =>
                {
                    int i = 0;
                    for (; ; )
                    {
                        Thread.Sleep(200);  // simulate a long lasting background operation
                        if (!cancel.IsDisposed)    // check (poll) if the BooleanDisposable was disposed
                            o.OnNext(i++);
                        else
                        {
                            Console.WriteLine("Cancel signal acknowledged");
                            o.OnCompleted();
                            return;
                        }
                    }
                }
                );
    
                return cancel;
            }
        );
    
    IDisposable subscription = ob.Subscribe(i => Console.WriteLine(i));
    Console.WriteLine("Press any key to cancel");
    Console.ReadKey();
    subscription.Dispose();
    Console.WriteLine("Press any key to quit");
    Console.ReadKey();  // give background thread chance to write the cancel acknowledge message
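
    The WaitHandle.WaitAny() case mentioned above could look roughly like the sketch below. It uses only System.Threading types from .NET 4; CancellableWait and WaitForWork are made-up names for illustration and are not part of Rx, and a real event source (a socket accept, for example) is assumed to be exposed as a WaitHandle.

    ```csharp
    using System;
    using System.Threading;

    // Hypothetical helper, not part of Rx or the BCL.
    public static class CancellableWait
    {
        // Blocks until either workReady is signaled or the token is cancelled.
        // Returns true if work arrived, false if the wait ended due to cancellation.
        public static bool WaitForWork(WaitHandle workReady, CancellationToken token)
        {
            // WaitAny returns the array index of the handle that satisfied the wait.
            // Index 0 is the work handle, index 1 is the token's cancellation handle.
            int signaled = WaitHandle.WaitAny(new[] { workReady, token.WaitHandle });
            return signaled == 0;
        }
    }
    ```

    Inside a background loop you would call WaitForWork(someEvent, cancel.Token) instead of Thread.Sleep() plus polling, and leave the loop as soon as it returns false. The thread then wakes up immediately on cancellation rather than at the next polling interval.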

    Wednesday, December 30, 2009 8:50 AM