Observable.Delay calling Dispose before OnNext is fired

    Question

  • I am having a problem understanding how Observable.Delay works and when Dispose() is meant to be called. Would anyone be able to help explain, please?

    The following code snippet:

      static void Main(string[] args)
      {
        var oneNumberEveryFiveSeconds = new SomeObservable();
        // Instant echo
        oneNumberEveryFiveSeconds.SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine(num));
        // One second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("...{0}...", num));
        // Two second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("......{0}......", num));
    
        Console.ReadKey();
      }
    
      public class SomeObservable : IObservable<int>
      {
        public IDisposable Subscribe(IObserver<int> o)
        {
          for (var i = 0; i < 2; i++)
          {
            o.OnNext(i);
          }
          o.OnCompleted();
    
          return new DisposableAction(() => { Console.WriteLine("DISPOSED"); });
        }
      }
    
      public class DisposableAction : IDisposable
      {
        public DisposableAction(Action dispose)
        {
          this.dispose = dispose;
        }
    
        readonly Action dispose;
    
        public void Dispose()
        {
          dispose();
        }
      }
    

    produces the following result:

    0
    1
    DISPOSED
    DISPOSED
    DISPOSED
    ...0...
    ...1...
    ......0......
    ......1......
    

    I was expecting it to be more like:

    0
    1
    DISPOSED
    ...0...
    ...1...
    DISPOSED
    ......0......
    ......1......
    DISPOSED
    

    Any idea?

    Thanks

    Wednesday, October 20, 2010 1:46 AM

Answers

  • Any observable sequence created with Rx operators will try to dispose of its resources as soon as an OnCompleted or OnError message has come through.

    What you're seeing is that the inner observable sequences of the query are done and are disposing of their subscriptions. The Delay operator is still running and won't dispose of its subscriptions until it has fired all messages to the observers subscribed to its output observable sequence.

    We'll talk about this in the Rx design guidelines (they're written and are currently being reviewed by the team, and should be online soon).

     

    Jeffrey

    Friday, October 22, 2010 5:15 PM

All replies

  • Hello,

    The behavior you are seeing is correct. Let's just walk through the 2-second delay.

    1. When you subscribe to the Delay observable, it in turn subscribes to your underlying source.
    2. Your underlying observable then yields 2 results.
    3. The delay observable takes these two results, and schedules them to be dispatched to you in 2 seconds' time.
    4. Your underlying observable then yields OnCompleted.
    5. The delay observable takes this OnCompleted notification, and schedules it to be dispatched to you in 2 seconds' time.
    6. The delay observable, after receiving the OnCompleted notification, disposes of its subscription to the underlying source (because it's complete).
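
    The walkthrough above can be sketched without Rx at all. The following is a simplified model, not the actual Delay implementation: a queue stands in for the scheduler's timers so the ordering is deterministic, and DelayModel, Source, LogOnDispose, QueueingObserver and Run are hypothetical names made up for illustration.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Simplified, hypothetical model of a Delay-like operator. This is NOT how
    // Rx implements Delay: a queue replaces the scheduler's timers.
    static class DelayModel
    {
        // Mirrors SomeObservable from the question: pushes 0, 1 and OnCompleted
        // synchronously inside Subscribe, then returns a logging disposable.
        sealed class Source : IObservable<int>
        {
            readonly List<string> log;
            public Source(List<string> log) { this.log = log; }

            public IDisposable Subscribe(IObserver<int> o)
            {
                for (var i = 0; i < 2; i++) o.OnNext(i);
                o.OnCompleted();
                return new LogOnDispose(log);
            }
        }

        sealed class LogOnDispose : IDisposable
        {
            readonly List<string> log;
            public LogOnDispose(List<string> log) { this.log = log; }
            public void Dispose() { log.Add("DISPOSED"); }
        }

        // Queues each notification for later delivery instead of forwarding it.
        sealed class QueueingObserver : IObserver<int>
        {
            readonly Queue<Action> pending;
            readonly List<string> log;
            public bool SourceDone { get; private set; }

            public QueueingObserver(Queue<Action> pending, List<string> log)
            {
                this.pending = pending;
                this.log = log;
            }

            public void OnNext(int value) { pending.Enqueue(() => log.Add("..." + value + "...")); }
            public void OnError(Exception error) { SourceDone = true; pending.Enqueue(() => log.Add("error")); }
            public void OnCompleted() { SourceDone = true; pending.Enqueue(() => log.Add("completed")); }
        }

        public static List<string> Run()
        {
            var log = new List<string>();
            var pending = new Queue<Action>();
            var observer = new QueueingObserver(pending, log);

            // Subscribing runs the source, which queues 0, 1 and the completion.
            IDisposable upstream = new Source(log).Subscribe(observer);

            // The source is done, so its subscription is released right away.
            if (observer.SourceDone) upstream.Dispose();

            // Later: the "timer" fires and the queued notifications are delivered.
            while (pending.Count > 0) pending.Dequeue()();

            return log;
        }

        static void Main()
        {
            foreach (var line in Run()) Console.WriteLine(line);
        }
    }
    ```

    Running it prints DISPOSED, then ...0... and ...1..., then completed, which is the same ordering as in the question's output: the source subscription is released before the delayed values are delivered.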

    Does that make sense?


    James Miles http://enumeratethis.com/
    Wednesday, October 20, 2010 8:11 AM
  • Hi,

    I can confirm this behavior and it appears to be a bug.  I think perhaps that Delay caches values and since OnCompleted is being called immediately, it seemed reasonable to dispose of the subscription immediately as well; however, I can imagine a case where side effects in the source observable rely on subscription disposal to clean up referenced objects, breaking their invariants even before OnNext receives a reference to them.

    There seem to be some flaws with your test, though, so here's one that is perhaps a bit cleaner:

    using System;
    using System.Concurrency;
    using System.Disposables;
    using System.Linq;
    
    namespace ReactiveProgrammingConsole
    {
    	class DisposeSubscriptionLab
    	{
    		static void Main()
    		{
    			Test(TimeSpan.FromSeconds(0), Scheduler.Immediate).Subscribe(Console.WriteLine);
    			Test(TimeSpan.FromSeconds(1), Scheduler.Immediate).Subscribe(Console.WriteLine);
    			Test(TimeSpan.FromSeconds(2), Scheduler.Immediate).Subscribe(Console.WriteLine);
    
    			Console.ReadKey();
    
    			Test(TimeSpan.FromSeconds(0), Scheduler.ThreadPool).Subscribe(Console.WriteLine);
    			Test(TimeSpan.FromSeconds(1), Scheduler.ThreadPool).Subscribe(Console.WriteLine);
    			Test(TimeSpan.FromSeconds(2), Scheduler.ThreadPool).Subscribe(Console.WriteLine);
    
    			Console.ReadKey();
    		}
    
    		static IObservable<string> Test(TimeSpan delay, IScheduler scheduler)
    		{
    			return Observable.CreateWithDisposable<string>(
    				observer =>
    				{
    					string name = scheduler.GetType().Name + " " + delay.ToString();
    
    					return new CompositeDisposable(
    						Disposable.Create(() => Console.WriteLine("Disposed {0}", name)),
    						Observable.Return(name, scheduler).Subscribe(observer)
    						);
    				})
    				.Delay(delay);
    		}
    	}
    }
    
    

    - Dave


    http://davesexton.com/blog
    Wednesday, October 20, 2010 8:23 AM
  • Hi Dave,

    I hadn't really thought of that. I understand what you're saying; however, I can't think of a scenario where this would be a problem.

    Perhaps you could give an example?


    http://enumeratethis.com/
    Wednesday, October 20, 2010 10:25 AM
  • Hi James,

    Sure, here's a contrived example to illustrate.  It may still require some imagination on your part, but I think the crux of the issue is clear; i.e., when the source observable is responsible for the lifetime of its values, simply adding Delay to a query can cause unexpected things to happen.  On its own, the CreateReaders method below looks innocent enough, but adding Delay outside of it throws ObjectDisposedException in the subscriber.

    (Imagine that generating each reader must actually be an async operation, hence the need for IObservable; perhaps a DB call is required or some other long-running process to acquire the reader.)

    static void DelayDisposalExample()
    {
    	var documents = Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments);
    
    	CreateReaders(new[] { 
    		Path.Combine(documents, "file1.txt"), 
    		Path.Combine(documents, "file2.txt"), 
    		Path.Combine(documents, "file3.txt")
    		})
    		.Delay(TimeSpan.FromSeconds(1))		// without this, ObjectDisposedException is not thrown
    		.Subscribe(reader => Console.WriteLine(reader.ReadLine()));	// throws ObjectDisposedException
    }
    
    static IObservable<StreamReader> CreateReaders(IEnumerable<string> files)
    {
    	return Observable.For(files,
    		file => Observable.Using(
    			() => File.OpenText(file),
    			reader => Observable.Return(reader)));
    }
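
    One possible way to sidestep the problem, sketched under assumptions, is to consume the reader inside the Using block and let only the resulting string flow downstream, so that nothing whose lifetime the source manages ever reaches Delay. The sketch assumes the current System.Reactive package (namespace System.Reactive.Linq) rather than the 2010 build used in this thread; ReadInsideUsing and ReadFirstLines are hypothetical names, and the temp file stands in for the documents above.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Reactive.Linq; // assumption: modern System.Reactive NuGet package

    static class ReadInsideUsing
    {
        // Variation on CreateReaders: the line is read while the reader is
        // still open, and only the string leaves the Using block.
        public static IObservable<string> ReadFirstLines(IEnumerable<string> files)
        {
            return Observable.For(files,
                file => Observable.Using(
                    () => File.OpenText(file),
                    reader => Observable.Return(reader.ReadLine())));
        }

        static void Main()
        {
            // Hypothetical input: a temp file instead of file1.txt etc.
            var path = Path.GetTempFileName();
            File.WriteAllText(path, "first line\nsecond line");

            ReadFirstLines(new[] { path })
                .Delay(TimeSpan.FromSeconds(1)) // delays a string, not a disposed reader
                .Subscribe(Console.WriteLine, ex => Console.WriteLine(ex));

            Console.ReadLine(); // keep the process alive until the delayed value arrives
            File.Delete(path);
        }
    }
    ```

    The trade-off is that the source now decides what is read from each file; a subscriber that needs the reader itself would still have to own its lifetime.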
    

    - Dave


    http://davesexton.com/blog
    Wednesday, October 20, 2010 3:04 PM
  • .ObserveOn(Scheduler.ThreadPool) has a similar problem, then.

    Wednesday, October 20, 2010 3:29 PM
  • Or any combinator where you asynchronously re-introduce the object whose lifetime is being managed by the underlying observable.

    For example:

    CreateReaders(files)
        .CombineLatest(Observable.Timer(TimeSpan.FromSeconds(1)), (reader, tick) => reader)
        .Select(r => r.ReadLine())
        .Subscribe(Console.WriteLine, ex => Console.WriteLine(ex));


    http://enumeratethis.com/
    Wednesday, October 20, 2010 3:37 PM
  • Here is a demo of the same problem with ObserveOn:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.IO;
    using System.Concurrency;

    namespace ConsoleApplication8
    {
        class Program
        {
            static void Main(string[] args)
            {
                // using issue.

                var documents = Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments);

                CreateReaders(new[] {
                    Path.Combine(documents, "file1.txt"),
                    Path.Combine(documents, "file2.txt"),
                    Path.Combine(documents, "file3.txt")
                })
                    //.Delay(TimeSpan.FromSeconds(1))
                    .ObserveOn(Scheduler.ThreadPool)
                    .Select(r => r.ReadLine())
                    .Subscribe(Console.WriteLine, ex => Console.WriteLine(ex));

                Console.ReadLine();
            }

            static IObservable<StreamReader> CreateReaders(IEnumerable<string> files)
            {
                return Observable.For(files,
                    file => Observable.Using(
                        () => File.OpenText(file),
                        reader => Observable.Return(reader)));
            }

        }
    }


    http://enumeratethis.com/
    Wednesday, October 20, 2010 3:56 PM
  • Wow, that's interesting. Thanks, guys!

    I wonder if this is actually by design or a bug. Can anyone confirm, please?

    Wednesday, October 20, 2010 10:31 PM