Rx pipelines with bounded queue

    Question

  • Hey guys,

    I'm running into the common fast-producer/slow-consumer problem in my project.

    Just consider the subscription in the following code as my slow consumer.

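        // Needs: System, System.Threading, System.Reactive.Concurrency,
        //        System.Reactive.Linq, System.Reactive.Subjects
        var obs = new Subject<int>();
        var disi = obs
            .ObserveOn(NewThreadScheduler.Default) // replaces the obsolete Scheduler.NewThread
            .Subscribe(
                x =>
                {
                    // Slow consumer: the first ten values take one second each.
                    if (x < 10)
                        Thread.Sleep(1000);
                    Console.WriteLine("Hello it's {0}", x);
                },
                ex =>
                {
                    // Errors are ignored in this sample.
                });

        // Fast producer: pushes every value without waiting for the consumer.
        for (int i = 0; i < 100000; i++)
        {
            obs.OnNext(i);
        }

        Console.WriteLine("I'm Done!");
        Console.ReadKey();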

    Obviously, "I'm Done!" is printed first and then the numbers start to appear. So far so good.

    First, does that mean all 100000 ints are sitting in a queue somewhere?

    If so, what if we have big chunks of data (e.g. 1 MB each) instead of a simple int, and we only have limited memory to assign to the whole process?

    I think I'm missing something or maybe looking at it from the wrong angle.

    Cheers,

    Mohsen

    Tuesday, July 15, 2014 1:38 PM

All replies

  • Your assumption is correct. The ObserveOn operator has an internal queue so that it can safely serialize your OnNext calls onto the provided scheduler.

    As you said, this is a classic problem. You have many options to choose from:

    1. Enqueue values that are produced faster than the consumer can process them, which is what ObserveOn is doing. (Buffer)
    2. Take only the most recent value, aggregate all the buffered data (count/sum/average), or apply another algorithm to deal with bursts of data. (Conflation)
    3. Block the producer to stop it producing data. (Backpressure)
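
    As a rough illustration of option 2, conflation can be sketched with Rx's built-in Sample operator placed in front of ObserveOn, so that only the latest value from each time window reaches the queue. The class name, the Run helper, the 100 ms window and the delays are all assumptions made up for this sketch, and it needs the Rx package referenced. If the consumer is slower than the sampling window, the queue still grows, just far more slowly.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ConflationSketch
{
    // Hypothetical helper: pushes itemCount values quickly and returns
    // the values that actually reached the slow consumer.
    public static List<int> Run(int itemCount)
    {
        var received = new List<int>();
        var done = new ManualResetEventSlim();
        var obs = new Subject<int>();

        var subscription = obs
            .Sample(TimeSpan.FromMilliseconds(100)) // keep only the latest value per 100 ms window
            .ObserveOn(NewThreadScheduler.Default)  // hand the sampled values to another thread
            .Subscribe(
                x =>
                {
                    Thread.Sleep(50);               // slow consumer
                    received.Add(x);
                    Console.WriteLine("Hello it's {0}", x);
                },
                () => done.Set());                  // signal once the stream has completed

        // Fast producer: far more values than the consumer could handle one by one.
        for (int i = 0; i < itemCount; i++)
        {
            obs.OnNext(i);
            Thread.Sleep(5);
        }
        obs.OnCompleted();

        done.Wait();                                // wait for the consumer to drain
        subscription.Dispose();
        return received;
    }

    public static void Main()
    {
        var received = Run(200);
        Console.WriteLine("I'm Done! Consumer saw {0} of 200 values.", received.Count);
    }
}
```

    The values skipped between samples are simply lost, so this only fits cases where the newest value supersedes the older ones.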

    Each of these affects your application differently, and some suit particular scenarios better than others. Currently Rx doesn't support the backpressure option. If you do want backpressure, you may want to consider a different technology to Rx, such as the Disruptor or the Blocking Collections from the TPL. If you really want to stick with Rx, I would suggest creating your own version of the ObserveOn operator that still takes an IScheduler but also takes an integer for the maximum queue length.
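
    For the backpressure route outside Rx, a minimal sketch with BlockingCollection<T> could look like the following. It is not a replacement for ObserveOn; the class name, the Run helper and the capacity and delay values are assumptions chosen for illustration. The point is that Add blocks the producer once the queue holds `capacity` items, so memory use stays bounded.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedPipeline
{
    // Hypothetical helper: runs a fast producer against a slow consumer through
    // a bounded queue and returns the largest queue length the producer observed.
    public static int Run(int itemCount, int capacity, int consumerDelayMs)
    {
        int maxObserved = 0;
        using (var queue = new BlockingCollection<int>(capacity))
        {
            // Slow consumer on its own thread.
            var consumer = Task.Factory.StartNew(() =>
            {
                foreach (var x in queue.GetConsumingEnumerable())
                {
                    Thread.Sleep(consumerDelayMs);
                    Console.WriteLine("Hello it's {0}", x);
                }
            }, TaskCreationOptions.LongRunning);

            // Fast producer: Add blocks while the queue is full.
            for (int i = 0; i < itemCount; i++)
            {
                queue.Add(i);
                maxObserved = Math.Max(maxObserved, queue.Count);
            }

            queue.CompleteAdding(); // lets the consumer loop finish
            consumer.Wait();
        }
        return maxObserved;
    }

    public static void Main()
    {
        int max = Run(20, 4, 50);
        Console.WriteLine("I'm Done! Largest queue length seen: {0}", max);
    }
}
```

    Unlike the original sample, "I'm Done!" is printed last here, because the producer is held back to the consumer's pace and the sketch waits for the consumer to finish.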


    Lee Campbell http://LeeCampbell.blogspot.com

    Friday, July 25, 2014 4:14 PM