Multilevel group by
-
April 24, 2012 13:13
I have a hot stream of stock quotes and would like to do a multi-level grouping equivalent to the following SQL:
select symbol, exchange, avg(px), sum(volume)
from quote
group by exchange, symbol
Any help would be appreciated. Below is my test setup code:
void Main()
{
    Subject<Quote> quotes = new Subject<Quote>();

    var groupedQuotes = from q in quotes
                        group q by new { q.InstrumentId, q.exchange } into grp
                        select grp;

    var subscribe = from g in groupedQuotes.Window(TimeSpan.FromSeconds(1))
                    select g;

    // Dump() is LINQPad's output extension.
    subscribe.Dump();

    quotes.OnNext(new Quote{InstrumentId = "A", Px = 10.10m, exchange="O", sector="B", volume=100L});
    quotes.OnNext(new Quote{InstrumentId = "B", Px = 50.10m, exchange="N", sector="B", volume=100L});
    quotes.OnNext(new Quote{InstrumentId = "A", Px = 20.11m, exchange="R", sector="B", volume=100L});
    quotes.OnNext(new Quote{InstrumentId = "A", Px = 26.11m, exchange="R", sector="B", volume=100L});

    // Expected result (InstrumentId, exchange, avg Px, sum volume):
    // A, O, 10.10, 100
    // A, R, 23.11, 200
    // B, N, 50.10, 100
}

public class Quote
{
    public string InstrumentId { get; set; }
    public decimal Px { get; set; }
    public string symbol { get; set; }
    public string sector { get; set; }
    public string exchange { get; set; }
    public long volume { get; set; }
}
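To pin down what the SQL and the expected result mean before going reactive, here is a minimal sketch of the same aggregation over the four test quotes using plain LINQ to Objects. It is not reactive: it assumes the whole input is available up front, which is exactly what a hot stream does not give you. `StaticGroupBy`, `Run` and `SampleQuotes` are hypothetical names for illustration, and the `Quote` class is trimmed to the properties the query uses, with the PascalCase names used in the replies.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Trimmed-down Quote (hypothetical): only the properties the query needs.
public class Quote
{
    public string InstrumentId { get; set; }
    public decimal Px { get; set; }
    public string Exchange { get; set; }
    public long Volume { get; set; }
}

public static class StaticGroupBy
{
    // Non-reactive equivalent of:
    //   select InstrumentId, Exchange, avg(Px), sum(Volume)
    //   from quote group by Exchange, InstrumentId
    // Rows are sorted by (InstrumentId, Exchange) to match the expected result.
    public static List<(string InstrumentId, string Exchange, decimal AvgPx, long SumVolume)> Run(
        IEnumerable<Quote> quotes) =>
        (from q in quotes
         group q by new { q.InstrumentId, q.Exchange } into g
         orderby g.Key.InstrumentId, g.Key.Exchange
         select (InstrumentId: g.Key.InstrumentId,
                 Exchange: g.Key.Exchange,
                 AvgPx: g.Average(x => x.Px),
                 SumVolume: g.Sum(x => x.Volume)))
        .ToList();

    // The four quotes from the test setup above.
    public static Quote[] SampleQuotes() => new[]
    {
        new Quote { InstrumentId = "A", Px = 10.10m, Exchange = "O", Volume = 100L },
        new Quote { InstrumentId = "B", Px = 50.10m, Exchange = "N", Volume = 100L },
        new Quote { InstrumentId = "A", Px = 20.11m, Exchange = "R", Volume = 100L },
        new Quote { InstrumentId = "A", Px = 26.11m, Exchange = "R", Volume = 100L },
    };

    public static void Main()
    {
        foreach (var r in Run(SampleQuotes()))
            Console.WriteLine($"{r.InstrumentId}, {r.Exchange}, {r.AvgPx}, {r.SumVolume}");
    }
}
```

The reactive versions below have to produce the same rows without ever seeing the "end" of the input, which is where windowing or running aggregates come in.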
- Edited by AtEventHorizon, April 30, 2012 17:47
All replies
-
May 1, 2012 13:00
Hi,
This isn't really a multi-level group by problem. It's just a single group by query with a projection containing aggregates. The trick is to project a new anonymous type for each window and to use Zip to combine the aggregates into a child anonymous type, so that the query remains reactive. However, the Average operator fails if a window is empty (there is nothing to average, so Rx reports an InvalidOperationException through OnError), so you must ensure that every window contains at least one element. I've done this below with the Merge operator, so the average for an empty window is 0.
from q in quotes
group q by new { q.InstrumentId, q.Exchange } into g
from window in g.Window(TimeSpan.FromSeconds(1))
select new
{
    InstrumentId = g.Key.InstrumentId,
    Exchange = g.Key.Exchange,
    Aggregates = Observable.Zip(
        window.Select(q => q.Px)
              .Merge(window.IsEmpty().Where(i => i).Select(_ => 0M))
              .Average(),
        window.Select(q => q.Volume).Sum(),
        (avg, sum) => new { avg, sum })
};

Edit: Here's the complete lab to illustrate how to subscribe to this query:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rxx.Labs.Reactive
{
    public sealed class GroupByMultiLevel : BaseConsoleLab
    {
        protected override void Main()
        {
            var quotes = new Subject<Quote>();

            var groupedQuotes =
                from q in quotes
                group q by new { q.InstrumentId, q.Exchange } into g
                from window in g.Window(TimeSpan.FromSeconds(1))
                select new
                {
                    InstrumentId = g.Key.InstrumentId,
                    Exchange = g.Key.Exchange,
                    Aggregates = Observable.Zip(
                        window.Select(q => q.Px)
                              .Merge(window.IsEmpty().Where(i => i).Select(_ => 0M))
                              .Average(),
                        window.Select(q => q.Volume).Sum(),
                        (avg, sum) => new { avg, sum })
                };

            using (groupedQuotes
                .Subscribe(group => group.Aggregates
                    .Subscribe(value => TraceLine(
                        group.InstrumentId + ',' + group.Exchange + ',' + value.avg + ',' + value.sum))))
            {
                quotes.OnNext(new Quote { InstrumentId = "A", Px = 10.10m, Exchange = "O", Volume = 100L });
                quotes.OnNext(new Quote { InstrumentId = "B", Px = 50.10m, Exchange = "N", Volume = 100L });
                quotes.OnNext(new Quote { InstrumentId = "A", Px = 20.11m, Exchange = "R", Volume = 100L });
                quotes.OnNext(new Quote { InstrumentId = "A", Px = 26.11m, Exchange = "R", Volume = 100L });

                WaitForKey();
            }
        }

        class Quote
        {
            public string InstrumentId { get; set; }
            public decimal Px { get; set; }
            public string Exchange { get; set; }
            public long Volume { get; set; }
        }
    }
}

- Dave
- Edited by Dave Sexton, May 1, 2012 13:03: Added complete lab
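The empty-window case can be reproduced without Rx: LINQ to Objects' `Average` throws an `InvalidOperationException` on an empty `decimal` sequence, much as Rx's `Average` reports an error through `OnError`. The minimal sketch below shows the failure and one way to supply a fallback of 0 with `DefaultIfEmpty`. Rx also has a `DefaultIfEmpty` operator, so `window.Select(q => q.Px).DefaultIfEmpty(0M).Average()` may be a simpler alternative to the `Merge`/`IsEmpty` combination, but that variant is untested here. `EmptyWindowGuard`, `SafeAverage` and `PlainAverageThrows` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EmptyWindowGuard
{
    // Average of a "window" of prices, falling back to 0 when it is empty.
    // Same intent as Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)) in the Rx query.
    public static decimal SafeAverage(IEnumerable<decimal> window) =>
        window.DefaultIfEmpty(0m).Average();

    // True if a plain Average() throws, i.e. the window has no elements.
    public static bool PlainAverageThrows(IEnumerable<decimal> window)
    {
        try
        {
            window.Average();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        var empty = Enumerable.Empty<decimal>();
        var full = new[] { 20.11m, 26.11m };
        Console.WriteLine(PlainAverageThrows(empty)); // True
        Console.WriteLine(SafeAverage(empty));        // 0
        Console.WriteLine(SafeAverage(full));         // 23.11
    }
}
```

Note that either guard reports 0 for an empty window, which is a choice about output, not a true average; the later replies in this thread change the requirement to a running average for that reason.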
-
May 1, 2012 13:12
Hi,
You could also modify the query to include a sequential projection when the Zip completes by using another from clause (which compiles to SelectMany). This is probably better because it avoids the nested, multi-level subscription.
For example (untested):
from q in quotes
group q by new { q.InstrumentId, q.Exchange } into g
from window in g.Window(TimeSpan.FromSeconds(1))
from aggs in Observable.Zip(
    window.Select(q => q.Px)
          .Merge(window.IsEmpty().Where(i => i).Select(_ => 0M))
          .Average(),
    window.Select(q => q.Volume).Sum(),
    (avg, sum) => new { avg, sum })
select new
{
    InstrumentId = g.Key.InstrumentId,
    Exchange = g.Key.Exchange,
    AveragePx = aggs.avg,
    VolumeSum = aggs.sum
};

- Dave
-
May 1, 2012 14:17
Thanks Dave. One issue with both of the solutions above is that the average is calculated only over the window, but I need a continuous average over the whole stream, and if a publishing window is empty the average price should not change.
-
May 1, 2012 14:50
Hi,
Well, that's an important piece of the spec that you left out :)
The Scan operator allows you to create a running aggregate. Apply it to the group instead of the window.
- Dave
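To see what "apply Scan to the group" means without an Rx dependency, the sketch below hand-rolls a `Scan` over `IEnumerable<T>` (a hypothetical helper written for illustration, not the Rx operator itself) and runs it over one group's quotes. Like Rx's `Scan`, it emits the accumulated state after every element rather than once at the end, so the running average is updated per quote, and when no new element arrives nothing new is emitted, so the last average stands. The (sum, count, volume) accumulator mirrors the one in the query in the next reply; `RunningAggregate` and `RunningAvg` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RunningAggregate
{
    // Hypothetical IEnumerable counterpart of Rx's Scan: yields the
    // accumulated state after each element (nothing for an empty source).
    public static IEnumerable<TAcc> Scan<T, TAcc>(
        this IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> accumulator)
    {
        var acc = seed;
        foreach (var item in source)
        {
            acc = accumulator(acc, item);
            yield return acc;
        }
    }

    // Running average price and total volume for one group's (Px, Volume) pairs.
    public static List<(decimal AvgPx, long SumVolume)> RunningAvg(
        IEnumerable<(decimal Px, long Volume)> quotes) =>
        quotes
            .Scan((Sum: 0m, Count: 0, VolumeSum: 0L),
                  (acc, q) => (acc.Sum + q.Px, acc.Count + 1, acc.VolumeSum + q.Volume))
            // Count is at least 1 here, because Scan only emits after an element.
            .Select(acc => (AvgPx: acc.Sum / acc.Count, SumVolume: acc.VolumeSum))
            .ToList();

    public static void Main()
    {
        // The two quotes for instrument A on exchange R from the question.
        var aOnR = new[] { (20.11m, 100L), (26.11m, 100L) };
        foreach (var (avg, vol) in RunningAvg(aOnR))
            Console.WriteLine($"{avg}, {vol}");
    }
}
```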
-
May 2, 2012 14:29
I came up with this; please let me know if you see any problems with it.
var groupedQuotes =
    (from q in quotes
     group q by new { q.InstrumentId, exchange = q.Exchange })
    .SelectMany(q => q
        .Scan(new { sum = 0m, count = 0, vsum = 0L },
              (agg, quote) => new { sum = agg.sum + quote.Px, count = agg.count + 1, vsum = agg.vsum + quote.Volume })
        .Select(agg => new Quote()
        {
            InstrumentId = q.Key.InstrumentId,
            Exchange = q.Key.exchange,
            Px = agg.sum / agg.count,
            Volume = agg.vsum
        }));

The next step is to run 200 msgs/s with 7K groups through it for a day and hope it runs efficiently.
-
May 2, 2012 15:21
Hi,
Looks good, though you may want to stick with one kind of syntax for clarity; e.g., query comprehension syntax:
from quote in quotes
group quote by new { quote.InstrumentId, quote.Exchange } into quotesByIdAndExchange
from agg in quotesByIdAndExchange.Scan(
    new { sum = 0m, count = 0, vsum = 0L },
    (acc, quote) => new { sum = acc.sum + quote.Px, count = acc.count + 1, vsum = acc.vsum + quote.Volume })
select new Quote()
{
    InstrumentId = quotesByIdAndExchange.Key.InstrumentId,
    Exchange = quotesByIdAndExchange.Key.Exchange,
    Px = agg.sum / agg.count,
    Volume = agg.vsum
};

- Dave
- Proposed as answer by LeeCampbell, May 15, 2012 14:31
-
May 2, 2012 17:12
True, that cleans it up. But coming from Esper/StreamBase, where the syntax is literally
select InstrumentId, Exchange, avg(price), sum(volume) from Quotes group by InstrumentId, Exchange
I find I need to work much harder with Reactive to achieve the same output. IMHO, NOT having the concept of "OnComplete" events in other CEP platforms simplifies querying streams of indeterminate length.
-
May 2, 2012 18:13
Hi,
I don't know anything about Esper/StreamBase, but here are some questions that might be worth asking when comparing them to Rx:
- Are CEP queries statically typed?
- Can CEP queries be combined easily with queries from other sequences/models? E.g., LINQ to Objects, LINQ to XML, LINQ to Entities, etc.
- Can CEP queries easily handle side-effects of any kind and at any time? E.g., the Do operator.
- Are CEP queries extensible? E.g., custom Rx operators are pretty easy to create.
This is probably not an exhaustive list of useful rhetorical questions :)
- Dave

