Monday, March 05, 2012 3:36 PM
Monday, March 05, 2012 4:46 PM
You could group first and then buffer each group using the simple Buffer overload that closes a buffer by time or by count, whichever comes first.
For example: (Untested)
var buffersByWeight =
    from request in requests
    group request by request.Weight into weighted
    from buffer in weighted.Buffer(TimeSpan.FromSeconds(5), 100)
    select buffer;
That said, it's good to know that Buffer (and Window) support data-based buffering/windowing as well as timer-based. This topic has been discussed in the forum several times before. You are correct: the overloads that take closing selector parameters do in fact allow you to create data-based buffers/windows. If your sequence is cold, then you must Publish it first to make it hot. You can then use the published sequence both as the source of the data being buffered and as the source of the data that indicates when buffers must be closed.
For example: (Untested)
var buffersByAnyWeight = requests.Publish(published =>
    published.Buffer(() =>
        Observable.Timer(TimeSpan.FromSeconds(5))
            .Amb(
                from request in published
                group request by request.Weight into weighted
                from count in weighted.Scan(0, (acc, cur) => acc + 1)
                where count >= 100
                select 0L)));
Monday, March 05, 2012 8:40 PM
Thank you very much for your prompt response, but I just want to clarify what's going on in your examples. Bear with me, as I'm new to Rx and my comprehension is concentrated around the functional aspects of Rx, i.e. I get lost when I look at Rx with LINQ.
What I think you're doing in your examples is grouping by weights and then selecting groups that have reached a count of 100 items or more. Please correct me if I'm wrong.
What I was looking to do was something slightly different; sorry if I was unclear.
Say I have a sequence of 10 requests. Say they all arrive well before 5 seconds elapse (so time chunking is out).
The accumulated weight of the first 3 is over 100, the next 5 add up to over 100 as well, and the last 2 add up to, let's say, 50. I want to generate a sequence where the first 3 come through right away, then wait another timeout (not 5 seconds; let's say 500 ms, I'm using your Rxx AsTimer for that), then the next 5 come through, then wait 500 ms again, and then wait until 5 seconds have elapsed to give me the remaining 2. Alternatively, if more messages arrive that add up to 100 together with #9 and #10, give me those once they have accumulated.
I hope I'm making sense here ...
Any further help would be greatly appreciated.
- Edited by Dmitry Orlovsky Monday, March 05, 2012 9:01 PM typos
Monday, March 05, 2012 10:05 PM
> What I think you're doing in your examples is grouping by weights and then selecting groups that have reached a count
> of 100 items or more. Please correct me if I'm wrong.
You're kind of right, but leaving out a few things.
The first query generates buffers with distinct weights when a timer elapses for a given weight, when the specified count has been reached for a given weight, or when the sequence completes, whichever comes first. So the buffers in the generated sequence do not contain requests of mixed weights; however, buffers of different weights will be generated reactively as their timers elapse or their counts are reached.
The second query works similarly except that it "accumulates" different weights into the same buffers. The timer behaves the same as in the first query. However, if the number of requests for any weight reaches 100, then a single buffer is generated containing all of the requests since the previous buffer, with mixed weights. (I should have just specified "== 100" instead of ">= 100". To be clear, they will have the exact same behavior in this query, so either way you'll never get a buffer that has more than 100 requests of any given weight.)
Note that I just tested them and they work as expected, though perhaps they aren't what you need. However, my second query had a couple of syntax errors, which I've now fixed by editing my original post. The first error was that I was passing in the closing selector as an observable, not as a function, so making it into a lambda by adding () => solved that problem. The second error was that Amb requires observables with the same T type, so I changed Unit.Default into 0L to solve that problem.
> What I was looking to do was something slightly different; sorry if I was unclear. [snip]
Thanks for the clarification. It seems that I was on the right track using Scan, but not GroupBy, to meet your "accumulation" requirement.
The following query should be closer to what you need.
var buffersByTimeOrWeight = requests.Publish(published =>
    published.Buffer(() =>
        Observable.Timer(TimeSpan.FromSeconds(5))
            .Amb(
                from weight in published.Scan(0, (weight, request) => weight + request.Weight)
                where weight >= 100
                select 0L)));
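
To make the accumulation rule easier to check by hand, here is a rough synchronous model of it in plain C#, with no Rx and no timers. SplitByWeight is a hypothetical helper written only for illustration, and the weights are made-up numbers matching your 3 / 5 / 2 scenario. It assumes that the request that pushes the running total to the threshold belongs to the buffer it closes. In the real query, the last group would be flushed by the timer or by completion rather than by the end of a list.

```csharp
using System;
using System.Collections.Generic;

static class WeightBufferSketch
{
    // Hypothetical helper, not part of Rx: closes a buffer as soon as the
    // running total of its weights reaches the threshold, then starts over
    // from zero for the next buffer.
    public static List<List<int>> SplitByWeight(IEnumerable<int> weights, int threshold)
    {
        var buffers = new List<List<int>>();
        var current = new List<int>();
        var total = 0;

        foreach (var weight in weights)
        {
            // Assumption: the item that crosses the threshold stays in this buffer.
            current.Add(weight);
            total += weight;

            if (total >= threshold)
            {
                buffers.Add(current);
                current = new List<int>();
                total = 0;
            }
        }

        // Leftover items stand in for what the timer (or completion) would flush.
        if (current.Count > 0)
            buffers.Add(current);

        return buffers;
    }

    static void Main()
    {
        // Made-up weights: first 3 reach 110, next 5 reach 105, last 2 sum to 50.
        var weights = new[] { 40, 40, 30, 20, 20, 20, 20, 25, 30, 20 };

        foreach (var buffer in SplitByWeight(weights, 100))
            Console.WriteLine(string.Join(", ", buffer));
    }
}
```

With these weights the model yields three groups of 3, 5 and 2 requests, which is the grouping you described. It says nothing about the 500 ms pause between buffers, which would still need to be handled separately.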