Is it possible to have a dynamic buffer size/time with Rx? I have a stream of data coming in, and normally I don't want to buffer this stream at all, but under certain conditions I would like to. I would then like to remove this buffer when those conditions no longer exist.
I have tried something along these lines:
var bufferTimeSubject = new BehaviorSubject<int>(200);
var dataStream = new Subject<long>();
var results = (from bufferTime in bufferTimeSubject.AsObservable()
               select dataStream.Buffer(TimeSpan.FromMilliseconds(bufferTime))
                                .Finally(() => Console.WriteLine("sequence terminated")))
              .Switch(); // switch to the buffered sequence for the latest buffer time
However, when the buffer value changes, dataStream is unsubscribed and I lose some values.
I guess I could write my own buffer method that takes an IObservable<count, time> or something. I'm hoping there is an easier way using standard Rx operators, though.
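For reference, a rough sketch of what such a custom method might look like. It is untested and makes assumptions: BufferDynamic is a made-up name, and it only handles the time-based case. It relies on the Buffer overload that takes a buffer-closing selector, so dataStream is subscribed once and each new buffer picks up the current value of the BehaviorSubject when it opens.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DynamicBufferExtensions
{
    // Hypothetical helper (not part of Rx): each buffer stays open for
    // whatever bufferTimeMs holds at the moment that buffer is opened.
    public static IObservable<IList<T>> BufferDynamic<T>(
        this IObservable<T> source,
        BehaviorSubject<int> bufferTimeMs)
    {
        // The selector is invoked every time a new buffer opens; the
        // buffer closes when the returned timer fires. The source
        // subscription itself is never torn down in between.
        return source.Buffer(() =>
            Observable.Timer(TimeSpan.FromMilliseconds(bufferTimeMs.Value)));
    }
}
```

Usage would be something like var results = dataStream.BufferDynamic(bufferTimeSubject);. A change to the buffer time only takes effect from the next buffer onwards, empty buffers are still emitted when nothing arrives, and a value of 0 would likely produce a rapid stream of empty buffers, so the "no buffering at all" case would need separate handling.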