Dynamic Buffer


    Is it possible to have a dynamic buffer size/time with Rx? I have a stream of data coming in, and normally I don't want to buffer this stream at all, but under certain conditions I would like to. I would then like to remove the buffer when those conditions no longer exist.

    I have tried something along these lines:

                var bufferTimeSubject = new BehaviorSubject<int>(200);
                var dataStream = new Subject<long>();
                // Each new bufferTime value produces a fresh Buffer subscription on dataStream.
                var results = (from bufferTime in bufferTimeSubject.AsObservable()
                               select dataStream.Buffer(TimeSpan.FromMilliseconds(bufferTime))
                                                .Finally(() => Console.WriteLine("sequence terminated")));

    However, when the buffer value changes, the dataStream is unsubscribed and I lose some values.

    I guess I could write my own buffer method that takes an IObservable<count, time> or something. I'm hoping there is an easier way using standard Rx operators though.
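
    One direction that might avoid a custom operator, as a rough sketch only: the Buffer overload that takes a closing selector asks for a new closing observable each time a buffer opens, so dataStream is subscribed once and the latest buffer time is read per buffer. The class name and the values pushed below are made up for illustration, and this does not cover the "no buffering at all" case.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical demo class; only the Buffer expression matters.
class DynamicBufferSketch
{
    static void Main()
    {
        var bufferTimeSubject = new BehaviorSubject<int>(200);
        var dataStream = new Subject<long>();

        // The selector runs once per buffer. Take(1) reads the current
        // buffer time from the BehaviorSubject, and the timer closes the
        // buffer after that many milliseconds. dataStream is never
        // resubscribed when the buffer time changes.
        var results = dataStream.Buffer(() =>
            bufferTimeSubject
                .Take(1)
                .SelectMany(ms => Observable.Timer(TimeSpan.FromMilliseconds(ms))));

        using (results.Subscribe(batch => Console.WriteLine("batch of " + batch.Count)))
        {
            dataStream.OnNext(1);
            dataStream.OnNext(2);

            // A new buffer time applies from the next buffer onwards;
            // the buffer already open keeps its original timer.
            bufferTimeSubject.OnNext(50);
            dataStream.OnNext(3);

            // Give the timers a chance to fire before disposing.
            Thread.Sleep(500);
        }
    }
}
```

    Because a change only takes effect when the next buffer opens, a long buffer that is already running is not shortened. Whether that is acceptable depends on how quickly the conditions need to take effect.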

    Thanks for any help

    Friday, October 11, 2013 4:11 PM

