Is it possible to do Observable.Buffer on something other than time


  • Monday, March 05, 2012 3:36 PM
     
     

    I've been looking for examples of how to use Observable.Buffer in Rx but can't find anything more substantial than boilerplate time-buffered stuff.

    There does seem to be an overload to specify a "bufferClosingSelector" but I can't wrap my mind around it.

    What I'm trying to do is create a sequence that buffers by time or by an "accumulation". Consider a request stream where every request has some sort of weight to it, and I do not want to process more than x accumulated weight at a time; or, if not enough has accumulated, just give me what has come through in the last time frame (regular Buffer functionality).

All Replies

  • Monday, March 05, 2012 4:46 PM
     

    Hi,

    You could group first and then buffer each group using this simple overload that buffers by time or count.

    For example: (Untested)

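    var buffersByWeight =
    	// One group per distinct weight...
    	from request in requests
    	group request by request.Weight into weighted
    	// ...each buffered by 5 seconds or 100 items, whichever comes first.
    	from buffer in weighted.Buffer(TimeSpan.FromSeconds(5), 100)
    	select buffer;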

    That said, it's good to know that Buffer (and Window) support data-based buffering/windowing as well as timer-based. This topic has been discussed in the forum several times before. You are correct: the overloads that have closing selector parameters do in fact allow you to create data-based buffers/windows. If your sequence is cold, then you must Publish it first to make it hot; then you can use the published sequence both as the source of the data being buffered and as the source of the data that indicates when buffers must be closed.

    For example: (Untested)

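    var buffersByAnyWeight = requests.Publish(published =>
    	published.Buffer(() =>
    		// Close the current buffer after 5 seconds...
    		Observable.Timer(TimeSpan.FromSeconds(5))
    			// ...or as soon as any single weight has been seen 100 times
    			// since the buffer opened, whichever happens first.
    			.Amb(
    				from request in published
    				group request by request.Weight into weighted
    				from count in weighted.Scan(0, (acc, cur) => acc + 1)
    				where count >= 100
    				select 0L)));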

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Monday, March 05, 2012 4:49 PM Grammar
    • Edited by Dave Sexton Monday, March 05, 2012 9:27 PM Grammar and syntax errors
    • Marked As Answer by Dmitry Orlovsky Thursday, March 08, 2012 7:18 PM
  • Monday, March 05, 2012 8:40 PM
     

    Hello Dave

    Thank you very much for your prompt response, but I just want to clarify what's going on in your examples. Bear with me, as I'm new to Rx and my comprehension is concentrated around the functional aspects of Rx, i.e. I get lost when I look at Rx with LINQ.

    What I think you're doing in your examples is grouping by weights and then selecting groups that have reached a count of 100 items or more; please correct me if I'm wrong.

    What I was looking to do was something slightly different; sorry if I was unclear.

    Say I have a sequence of 10 requests. Say they all arrive before 5 seconds elapse (so chunking by time is out).

    The accumulated weight of the first 3 is over 100, the next 5 add up to over 100 as well, and the last 2 add up to, let's say, 50. I want to generate a sequence where the first 3 come through right away, then wait another timeout (not 5 s; let's say 500 ms, and I'm using your Rxx AsTimer for that), then let the next 5 come through, again wait 500 ms, and then wait until 5 seconds have elapsed to give me the remaining 2, or, if more messages arrive that add up to 100 together with #9 and #10, give me those once they have accumulated.

    I hope I'm making sense here ... 

    Any further help would be greatly appreciated.


  • Monday, March 05, 2012 10:05 PM
     

    Hi Dmitry, 

    > What I think you're doing in your examples is grouping by weights and then selecting groups that have reached a count
    > of 100 items or more; please correct me if I'm wrong.

    You're kind of right, but leaving out a few things.

    The first query generates buffers that each contain requests of a single weight. A buffer for a given weight is generated when the timer for that weight elapses, when the specified count has been reached for that weight, or when the sequence completes, whichever comes first. So the buffers in the generated sequence never contain requests of mixed weights; however, buffers of different weights will be generated reactively as their timers elapse or their counts are reached.
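    To make the first query's grouping rule concrete, here is a plain C# sketch that replays a finite list of requests without Rx. It is a simplification under stated assumptions: requests are hypothetical (Id, Weight) tuples, the 5-second timer is left out so only the count rule applies, and partial buffers left over at completion are flushed in order of each weight's first appearance (an arbitrary choice of this sketch, not something the Rx query guarantees).

```csharp
using System.Collections.Generic;

// Count-only sketch of "group by weight, then buffer each group" (no timer, no Rx).
// Requests are hypothetical (Id, Weight) tuples used purely for illustration.
public static class PerWeightBuffers
{
    public static List<List<(int Id, int Weight)>> Simulate(
        IEnumerable<(int Id, int Weight)> requests, int maxCount)
    {
        var result = new List<List<(int Id, int Weight)>>();
        var open = new Dictionary<int, List<(int Id, int Weight)>>(); // one open buffer per weight
        var weightOrder = new List<int>();                             // weights by first appearance

        foreach (var r in requests)
        {
            if (!open.TryGetValue(r.Weight, out var buffer))
            {
                buffer = new List<(int Id, int Weight)>();
                open[r.Weight] = buffer;
                if (!weightOrder.Contains(r.Weight)) weightOrder.Add(r.Weight);
            }

            buffer.Add(r);
            if (buffer.Count == maxCount)
            {
                result.Add(buffer);    // a full buffer for this weight is emitted immediately
                open.Remove(r.Weight); // the next request of this weight starts a new buffer
            }
        }

        // Completion: flush each weight's partial buffer (assumed order: first appearance).
        foreach (var w in weightOrder)
            if (open.TryGetValue(w, out var rest)) result.Add(rest);

        return result;
    }
}
```

    With a count of 2, requests with Ids 1 to 5 and weights 10, 20, 10, 10, 20 produce the buffers [1, 3], [2, 5] and [4] (by Id); no buffer ever mixes weights.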

    The second query works similarly except that it "accumulates" different weights into the same buffers. The timer behaves the same as in the first query. However, if the number of requests for any weight reaches 100, then a single buffer is generated containing all of the requests since the previous buffer, with mixed weights. (I should have just specified "== 100" instead of ">= 100". To be clear, they will have the exact same behavior in this query, so either way you'll never get a buffer that has more than 100 requests of any given weight.)
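    The closing rule of the second query can be sketched the same way in plain C# (no Rx; the timer is omitted). Assumptions: requests are hypothetical (Id, Weight) tuples, per-weight counts restart whenever a buffer closes (as in the query, where the closing selector is invoked again for each new buffer and re-subscribes to the published sequence), and the request that reaches the limit is placed in the buffer it closes.

```csharp
using System.Collections.Generic;

// Count-only sketch of "mixed-weight buffers that close when any single weight
// reaches maxCount since the buffer opened" (no timer, no Rx).
public static class AnyWeightCountBuffers
{
    public static List<List<(int Id, int Weight)>> Simulate(
        IEnumerable<(int Id, int Weight)> requests, int maxCount)
    {
        var result = new List<List<(int Id, int Weight)>>();
        var current = new List<(int Id, int Weight)>();
        var counts = new Dictionary<int, int>(); // per-weight counts since the buffer opened

        foreach (var r in requests)
        {
            current.Add(r);
            counts.TryGetValue(r.Weight, out var n); // n is 0 for a weight not yet seen
            counts[r.Weight] = n + 1;

            if (n + 1 == maxCount)
            {
                result.Add(current); // emit everything since the previous buffer
                current = new List<(int Id, int Weight)>();
                counts.Clear();      // counting starts over for the next buffer
            }
        }

        if (current.Count > 0) result.Add(current); // completion flushes the remainder
        return result;
    }
}
```

    With a limit of 2, Ids 1 to 5 with weights 5, 7, 5, 7, 7 produce [1, 2, 3] and [4, 5]: the first buffer mixes weights and closes when weight 5 has been seen twice.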

    Note that I've now tested both queries and they work as expected, though perhaps they aren't what you need. That said, my second query originally had a couple of syntax errors, which I've fixed by editing my original post. The first error was that I was passing in the closing selector as an observable rather than as a function, so turning it into a lambda by adding () => solved that problem. The second error was that Amb requires observables with the same T type, and Observable.Timer produces an IObservable<long>, so I changed Unit.Default into 0L to solve that problem.

    > What I was looking to do was something slightly different; sorry if I was unclear. [snip]

    Thanks for the clarification.  It seems that I was on the right track using Scan, but not GroupBy, to meet your "accumulation" requirement.

    The following query should be closer to what you need.

    var buffersByTimeOrWeight = requests.Publish(published =>
    	published.Buffer(() =>
    		// Close the current buffer after 6 seconds...
    		Observable.Timer(TimeSpan.FromSeconds(6))
    			// ...or as soon as the weight accumulated since it opened reaches 100.
    			.Amb(
    				from total in published.Scan(0, (sum, request) => sum + request.Weight)
    				where total >= 100
    				select 0L)));
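    The intended behavior of this query can be replayed deterministically in plain C# (no Rx, virtual time in milliseconds). This is a sketch under assumptions, not a model of Rx internals: requests are hypothetical (ArrivalMs, Weight) tuples sorted by arrival, a timer due at the same instant as a request fires first, the request that pushes the total to 100 or more is included in the buffer it closes, and every close (by weight or by timer) restarts both the timer and the running total, mirroring the closing selector being invoked once per buffer. The sketch also emits an empty buffer whenever the timer fires with nothing pending, as time-closed Rx buffers do.

```csharp
using System.Collections.Generic;

// Sketch of "close the buffer on accumulated weight or elapsed time", replayed over
// a finite list of hypothetical (ArrivalMs, Weight) tuples sorted by arrival time.
public static class WeightOrTimeBuffers
{
    public static List<List<(int ArrivalMs, int Weight)>> Simulate(
        IEnumerable<(int ArrivalMs, int Weight)> requests, int maxWeight, int windowMs)
    {
        var result = new List<List<(int ArrivalMs, int Weight)>>();
        var current = new List<(int ArrivalMs, int Weight)>();
        int openedAt = 0; // the first buffer opens at virtual time 0 (subscription)
        int total = 0;    // weight accumulated since the current buffer opened

        foreach (var r in requests)
        {
            // Timers that elapse before this request arrives close the buffer,
            // possibly several times in a row (yielding empty buffers).
            while (r.ArrivalMs >= openedAt + windowMs)
            {
                result.Add(current);
                current = new List<(int ArrivalMs, int Weight)>();
                openedAt += windowMs;
                total = 0;
            }

            current.Add(r);
            total += r.Weight;

            // Reaching the weight threshold closes the buffer and restarts the timer.
            if (total >= maxWeight)
            {
                result.Add(current);
                current = new List<(int ArrivalMs, int Weight)>();
                openedAt = r.ArrivalMs;
                total = 0;
            }
        }

        // Completion flushes whatever is left (skipped here when empty).
        if (current.Count > 0) result.Add(current);
        return result;
    }
}
```

    Replaying Dmitry's scenario (ten requests within the first second, weighted 40, 40, 30, then 20, 20, 20, 20, 30, then 25, 25) with maxWeight 100 and windowMs 6000 yields buffers of 3, 5 and 2 requests. Two requests at 0 ms and 13000 ms yield three buffers, the middle one empty, because the timer fires twice in between.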

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Monday, March 05, 2012 10:07 PM Grammar
    • Marked As Answer by Dmitry Orlovsky Thursday, March 08, 2012 7:18 PM