Rx pipelines with bounded queue

    Question

  • Hey guys,

    I'm having the common fast-producer/slow-consumer problem in my project.

    Just consider the subscriber in the following code as my slow consumer.

        var obs = new Subject<int>();
        var subscription = obs
            .ObserveOn(Scheduler.NewThread)
            .Subscribe(
                x =>
                {
                    // Simulate a slow consumer for the first few items
                    if (x < 10)
                        Thread.Sleep(1000);
                    Console.WriteLine(string.Format("Hello it's {0}", x));
                },
                e => { /* errors ignored */ });

        // Fast producer
        for (int i = 0; i < 100000; i++)
        {
            obs.OnNext(i);
        }

        Console.WriteLine("I'm Done!");
        Console.ReadKey();

    Obviously, "I'm Done!" will be printed first and then the numbers start to appear, so far so good.

    First, does that mean all 100,000 ints are sitting in a queue somewhere?

    If so, what happens if we have big chunks of data (e.g. 1 MB each) instead of a simple int, and only limited memory available to the whole process?

    I think I'm missing something or maybe looking at it from the wrong angle.

    Cheers,

    Mohsen

    Tuesday, July 15, 2014 1:38 PM

All replies

  • Your assumption is correct. The ObserveOn operator has an internal queue so that it can safely serialize your OnNext calls onto the provided scheduler.

    As you said, this is a classic problem. You have many options to choose from:

    1. Enqueue values that are produced faster than the consumer can process them, which is what ObserveOn is doing. (Buffer)
    2. Keep only the most recent value, or aggregate the buffered data (count/sum/average), or apply another algorithm to deal with bursts of data. (Conflation)
    3. Block the producer to stop it producing data. (Backpressure)

    Each of these has a different impact on your application, and each suits some solutions better than others. Currently Rx doesn't support the backpressure option. If you do want backpressure, you may want to consider a different technology to Rx, such as the Disruptor or the blocking collections from TPL. If you really want to stick with Rx, then I would suggest creating your own version of the ObserveOn operator that still takes an IScheduler but also takes an integer for the maximum queue length.
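    A minimal sketch of that last suggestion, assuming a hypothetical `ObserveOnBounded` extension (the name, and the use of `BlockingCollection<T>` and `Task.Run` in place of an `IScheduler`, are my own, not part of Rx): the producer's `OnNext` blocks once the queue holds `maxQueue` undelivered items, which is blocking backpressure rather than a growing buffer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class BoundedObserveOnExtensions
{
    // Hypothetical operator (not part of Rx): re-emit the source on a
    // worker task, buffering at most maxQueue items. When the queue is
    // full, the producer's OnNext blocks until the consumer catches up.
    public static IObservable<T> ObserveOnBounded<T>(
        this IObservable<T> source, int maxQueue)
    {
        return Observable.Create<T>(observer =>
        {
            var queue = new BlockingCollection<T>(maxQueue);
            Exception error = null;

            // Consumer side: drain the queue on a dedicated task.
            Task.Run(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                    observer.OnNext(item);
                if (error != null) observer.OnError(error);
                else observer.OnCompleted();
            });

            // Producer side: Add() blocks while the queue is at capacity.
            return source.Subscribe(
                x => queue.Add(x),
                e => { error = e; queue.CompleteAdding(); },
                () => queue.CompleteAdding());
        });
    }
}
```

    With something like this, `obs.ObserveOnBounded(1000)` in the original example would cap the buffer at 1000 ints, at the cost of blocking the producing loop whenever the consumer falls behind.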


    Lee Campbell http://LeeCampbell.blogspot.com

    Friday, July 25, 2014 4:14 PM
  • I made a custom filter (Pipe<TSource,TResult>), shown below, to represent a generic pipeline stage with feedback control, so that we can cap the number of unconsumed items buffered inside the pipe. One or more pipes can be connected to form a pipeline.

    The code is below. Feedback control is done through the IPulser interface. A pulse is sent to the previous-stage pipe every time we consume an item, so that the previous stage knows when it may produce a new one.
     
        public interface IPulser {
            void Pulse();
        }
    
        public static class Extensions {
    
            public class Pulser : IPulser {
                private Subject<bool> subject;
                public IObservable<bool> Sequence { get { return subject;  } }
                public Pulser() { subject = new Subject<bool>(); }
                public void Pulse() { subject.OnNext(true); }
            }
        
            public static IObservable<TResult> Pipe<TSource,TResult>
                ( this IObservable<TSource> source
                , int no_buffered_items
                , IScheduler scheduler
                , Func<TSource,Task<TResult>> process
                , IPulser prev_pulser
                , out IPulser pulser
                ) 
            {
                // check parameters
                if (no_buffered_items <= 0) throw new ArgumentOutOfRangeException("no_buffered_items");
                // Create pulser for this stage
                var p = new Pulser();
                pulser = p;
                // Create observable to return
                return Observable.Create<TResult>(observer => { // subscription handler
                    // create output sequence
                    var output = source
                        // Zip source and pulser sequence
                        .Zip(p.Sequence, (s, t) => s)
                        .ObserveOn(scheduler)
                        // pulse previous pipeline stage every time we pull item from the source
                        .Do(m => { if(prev_pulser != null) prev_pulser.Pulse(); })
                        // process the source item
                        .Select(s => process(s).ToObservable())
                        .Concat()
                        .ObserveOn(scheduler)
                        // send to subscriber
                        .Subscribe(
                            n => { observer.OnNext(n); }
                            ,
                            e => { observer.OnError(e); }
                            ,
                            () => { observer.OnCompleted(); }
                        )
                        ;
                    // Initially pulse the stage with the size of buffer
                    Observable.Range(0, no_buffered_items).Subscribe(m => { p.Pulse(); });
                    // return output observable
                    return output;
                });
            }
        }
    

    Here is the example code which uses the above filter.

        class Program {
            static EventLoopScheduler scheduler1 = new EventLoopScheduler();
            static EventLoopScheduler scheduler2 = new EventLoopScheduler();
            static EventLoopScheduler scheduler3 = new EventLoopScheduler();
            static EventLoopScheduler scheduler4 = new EventLoopScheduler();
    
            static int Process(int id) {
                var sleepMs = 100 * id * (new Random()).Next(5);
                Thread.Sleep(sleepMs);
                return id;
            }
            static Task<int> ProcessAsync(int id) {
                return Task.Run<int>(() => { return Process(id); });
            }
    
            static void Main(string[] args) {
                Console.WriteLine("[{0}] MAIN THREAD ", Thread.CurrentThread.ManagedThreadId);
    
                IPulser p1, p2, p3;
                var ret = Observable.Range(0, 10)
                    .Pipe<int, int>(2, scheduler1, m => ProcessAsync(m), null, out p1)
                    .Do(m => { Console.WriteLine("[{0}] Pipe-1: {1}", Thread.CurrentThread.ManagedThreadId, m); })
                    .Pipe<int, int>(2, scheduler2, m => ProcessAsync(m), p1, out p2)
                    .Do(m => { Console.WriteLine("  [{0}] Pipe-2: {1}", Thread.CurrentThread.ManagedThreadId, m); })
                    .Pipe<int, int>(5, scheduler3, m => ProcessAsync(m), p2, out p3)
                    .Do(m => { Console.WriteLine("    [{0}] Pipe-3: {1}", Thread.CurrentThread.ManagedThreadId, m); })
                    .ObserveOn(scheduler4)
                    .Subscribe(
                        n => {
                            p3.Pulse(); 
                            Console.WriteLine("      [{0}] @ Result: {1}", Thread.CurrentThread.ManagedThreadId, n);
                        }
                        ,
                        e => { Console.WriteLine("[{0}] OnError: {1}", Thread.CurrentThread.ManagedThreadId, e); }
                        ,
                        () => { Console.WriteLine("[{0}] OnCompleted", Thread.CurrentThread.ManagedThreadId); }
                        )
                   ;
    
                Console.ReadLine();
                ret.Dispose();
            }
        }
    

    Friday, August 15, 2014 9:25 AM