How to control the number of instantiations when composing observables

  • Question

  • Hello,

    I've been tinkering with the Rx framework a bit and using observables to work with some images. A colleague spotted that it was showing some very strange memory behaviour. Here is the piece of code that exhibited it:

    var selectedImage = from item in selectedFilename
                              let filename = item.Filename
                              let image = Image.FromFile(filename)
                              let thumb = new Bitmap(image, new Size(64, 64))
                              select new
                              {
                                Item = item.Item,
                                Filename = item.Filename,
                                Image = image,
                                Thumb = thumb
                              };

    It looks very innocent, but this observable was being subscribed to and filtered on a couple of times, and for each combination of observables, Image.FromFile() was being executed again.

    My main questions are:
    a) Is this supposed to happen, and
    b) If so, is there a way to control it?

    Below is some code that exhibits it in a more easily testable fashion:

        private static void test()
        {
          Subject<int> numbers = new Subject<int>();

          var transformedNumbers = from n in numbers
                                   select new
                                   {
                                     Number = DoDebug(n)
                                   };


          var evens = from n in transformedNumbers
                      select n.Number;

          evens.Subscribe(i => Console.WriteLine("evens observer a {0}", i));
          evens.Subscribe(i => Console.WriteLine("evens observer b {0}", i));
          evens.Subscribe(i => Console.WriteLine("evens observer c {0}", i));

          foreach (var i in Enumerable.Range(0, 100))
          {
            numbers.OnNext(i);
          }
        }

        private static int DoDebug(int n)
        {
          Console.WriteLine("creating temp number {0}", n);
          return n;
        }

    When this is put inside a console application and the test() method is called, the DoDebug() method is called once per subscriber for each number, rather than once per number.

    Here is the example output:

    creating temp number 0
    evens observer a 0
    creating temp number 0
    evens observer b 0
    creating temp number 0
    evens observer c 0
    creating temp number 1
    evens observer a 1
    creating temp number 1
    evens observer b 1
    creating temp number 1
    evens observer c 1
    ...
    ...

    As you can see, DoDebug() is called once per observer for every number (three times here). If you imagine a much more complex tree of observables, with DoDebug() being a far more expensive operation, it is easy to see how this would be undesirable.

    Ideally it should be:
    creating temp number 0
    evens observer a 0
    evens observer b 0
    evens observer c 0
    creating temp number 1
    evens observer a 1
    evens observer b 1
    evens observer c 1
    ...
    ...


    I've tried a number of different orderings and filters, but no matter how I organise things, the DoDebug() method is always called too often.

    Any ideas would be greatly appreciated,

    Kind regards,
    Phil
    Tuesday, February 2, 2010 1:06 PM

Answers

  • Hi Phil,

    What you're observing is the normal behavior. Each observer gets to observe the IObservable independently, so the query runs once per subscription. It behaves exactly the same way as with IEnumerable. However, Rx has an operator to achieve what you're trying to do: Publish.

    Use Publish with the first query:

                var transformedNumbers = (from n in numbers
                                         select new
                                         {
                                             Number = DoDebug(n)
                                         }).Publish();

    Publish returns a special subject that will observe the source (your first query) and will publish its notifications to its observers (3 in this case). Since there is only one observer for your original query, DoDebug is called only once per value.
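
    The IEnumerable comparison can be made concrete with plain LINQ to Objects. The sketch below is an illustration only: the class name DeferredQuerySketch and the Calls counter are hypothetical, and ToList() is used here as the enumerable-side analogue of sharing one evaluation. It is not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class DeferredQuerySketch
{
    // Hypothetical counter: how many times the projection has run.
    public static int Calls;

    static int DoDebug(int n)
    {
        Calls++;
        Console.WriteLine("creating temp number {0}", n);
        return n;
    }

    public static int Run()
    {
        Calls = 0;

        // Deferred query: nothing runs until it is enumerated.
        IEnumerable<int> query = from n in Enumerable.Range(0, 2)
                                 select DoDebug(n);

        // Each foreach re-runs the query, much as each Subscribe
        // re-runs an unpublished observable query.
        foreach (var i in query) Console.WriteLine("reader a {0}", i);
        foreach (var i in query) Console.WriteLine("reader b {0}", i);

        // Materializing once lets later readers share the results.
        List<int> cached = query.ToList();
        foreach (var i in cached) Console.WriteLine("reader c {0}", i);
        foreach (var i in cached) Console.WriteLine("reader d {0}", i);

        // 2 values x 3 evaluations (reader a, reader b, ToList) = 6.
        return Calls;
    }

    static void Main() => Run();
}
```

    Readers a and b each trigger their own DoDebug calls, while readers c and d reuse the list, which is roughly the role Publish plays for observers.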
    Tuesday, February 2, 2010 2:15 PM

All replies

  • That is great, thanks. It works perfectly. There are just so many little tricks to learn!
    Tuesday, February 2, 2010 2:19 PM
  • Hi fcharlon. I must be missing something, as I don't see that behavior in the code below (using the Rx 3.5 build):

    1) DoDebug is called with each subscribe.
    2) Using an empty Publish() does not return any results. I have to use Publish(n=>n).
    3) I am still confused about Publish.

    static void ToHot()
    {
        var nums = new int[] {1,2,3,4 }.ToObservable();
        var ob = from n in nums
                   select new { Num = DoDebug(n) };
        var pub = ob.Publish(n=>n);
        pub.Subscribe(n=>Console.WriteLine(n.Num));
        pub.Subscribe(n=>Console.WriteLine(n.Num));
    }
    static int DoDebug(int i)
    {
        Console.WriteLine("Debug:" + i);
        return i;
    }

    Output:
    Debug:1
    1
    Debug:2
    2
    Debug:3
    3
    Debug:4
    4
    Debug:1
    1
    Debug:2
    2
    Debug:3
    3
    Debug:4
    4

    Tuesday, June 8, 2010 4:23 PM
  • Publish returns a connectable observable. You need to connect it before it will return anything.

    connectableObservable.Connect();

    Or you could use .Publish().RefCount();

    See more here: http://enumeratethis.com/2010/04/17/warm-observables-with-publish-refcount/
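
    As a rough sketch, the Connect() suggestion applied to the ToHot example above might look like the following. It assumes the current System.Reactive package (namespace System.Reactive.Linq), which differs from the 2010 build discussed in this thread, so signatures may not match that build. The class name PublishConnectSketch and the Calls counter are hypothetical.

```csharp
using System;
using System.Reactive.Linq;

static class PublishConnectSketch
{
    // Hypothetical counter: how many times the projection has run.
    public static int Calls;

    static int DoDebug(int i)
    {
        Calls++;
        Console.WriteLine("Debug:" + i);
        return i;
    }

    public static int Run()
    {
        Calls = 0;

        var nums = new[] { 1, 2, 3, 4 }.ToObservable();
        var ob = from n in nums
                 select new { Num = DoDebug(n) };

        // Publish() with no arguments returns a connectable observable.
        // Subscribing to it does not start the source yet.
        var pub = ob.Publish();

        pub.Subscribe(n => Console.WriteLine("a " + n.Num));
        pub.Subscribe(n => Console.WriteLine("b " + n.Num));

        // Connect() subscribes pub to the source a single time and
        // fans each value out to both observers.
        IDisposable connection = pub.Connect();
        connection.Dispose();

        // One projection call per source value, not per observer.
        return Calls;
    }

    static void Main() => Run();
}
```

    With both observers subscribed before Connect(), each Debug line should appear once, followed by the value for each observer. For a source that pushes values later, such as the Subject<int> in the original question, .Publish().RefCount() connects on the first subscription instead of requiring an explicit Connect() call.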

     

    Tuesday, June 8, 2010 4:47 PM