# Multilevel group by

### Question

• I have a hot stream of stock quotes and would like to do a multi-level grouping equivalent to the following SQL:

```sql
select symbol, avg(px), sum(volume)
from quote
group by exchange, symbol
```

Any help will be appreciated. Below is the test setup code.

```csharp
// LINQPad "C# Program" query; Dump() is LINQPad's output helper.
void Main()
{
    Subject<Quote> quotes = new Subject<Quote>();

    var groupedQuotes = from q in quotes
                        group q by new { q.InstrumentId, q.exchange } into grp
                        select grp;

    var subscribe = from g in groupedQuotes.Window(TimeSpan.FromSeconds(1)) select g;
    subscribe.Dump();

    quotes.OnNext(new Quote { InstrumentId = "A", Px = 10.10m, exchange = "O", sector = "B", volume = 100L });
    quotes.OnNext(new Quote { InstrumentId = "B", Px = 50.10m, exchange = "N", sector = "B", volume = 100L });
    quotes.OnNext(new Quote { InstrumentId = "A", Px = 20.11m, exchange = "R", sector = "B", volume = 100L });
    quotes.OnNext(new Quote { InstrumentId = "A", Px = 26.11m, exchange = "R", sector = "B", volume = 100L });

    // Expected result
    // A, O, 10.10, 100
    // A, R, 23.11, 200
    // B, N, 50.10, 100
}

public class Quote
{
    public string InstrumentId { get; set; }
    public decimal Px { get; set; }
    public string symbol { get; set; }
    public string sector { get; set; }
    public string exchange { get; set; }
    public long volume { get; set; }
}
```

• Edited Monday, April 30, 2012 17:47
Tuesday, April 24, 2012 13:13

### All replies

• Hi,

This isn't really a multi-level group by problem.  It's just a single group by query with a projection containing aggregates.  The trick is to project a new anonymous type for each window and to use Zip to project the aggregates into a child anonymous type to remain reactive.  However, the Average operator will fail if the window is empty (division by zero), so you must ensure there's always at least one element; I've done this below with the Merge operator so that the average value for an empty window is 0.
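The empty-window guard is easy to see with LINQ to Objects, which behaves analogously: `Enumerable.Average` throws `InvalidOperationException` on an empty sequence, and `DefaultIfEmpty(0m)` plays the role of the `Merge(window.IsEmpty()...)` fallback by guaranteeing at least one element. This is a sketch on arrays rather than observable windows; `EmptyWindowAverage` and `SafeAverage` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EmptyWindowAverage
{
    // Hypothetical helper. Same idea as Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)):
    // make sure there is always at least one element, so an empty window averages to 0.
    public static decimal SafeAverage(IEnumerable<decimal> prices) =>
        prices.DefaultIfEmpty(0m).Average();

    public static void Main()
    {
        decimal[] emptyWindow = Array.Empty<decimal>();

        try
        {
            // LINQ to Objects refuses to average an empty sequence.
            Console.WriteLine(emptyWindow.Average());
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine("Average failed: " + ex.Message);
        }

        Console.WriteLine(SafeAverage(emptyWindow));              // empty window: falls back to 0
        Console.WriteLine(SafeAverage(new[] { 20.11m, 26.11m })); // non-empty window: the real average
    }
}
```

Rx also has an `Observable.DefaultIfEmpty` operator, so something like `window.Select(q => q.Px).DefaultIfEmpty(0M).Average()` may be a shorter way to write the same guard (untested).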

```csharp
from q in quotes
group q by new { q.InstrumentId, q.Exchange } into g
from window in g.Window(TimeSpan.FromSeconds(1))
select new
{
    InstrumentId = g.Key.InstrumentId,
    Exchange = g.Key.Exchange,
    Aggregates = Observable.Zip(
        window.Select(q => q.Px).Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)).Average(),
        window.Select(q => q.Volume).Sum(),
        (avg, sum) => new { avg, sum })
};
```

Edit: Here's the complete lab to illustrate how to subscribe to this query:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rxx.Labs.Reactive
{
    public sealed class GroupByMultiLevel : BaseConsoleLab
    {
        protected override void Main()
        {
            var quotes = new Subject<Quote>();

            var groupedQuotes =
                from q in quotes
                group q by new { q.InstrumentId, q.Exchange } into g
                from window in g.Window(TimeSpan.FromSeconds(1))
                select new
                {
                    InstrumentId = g.Key.InstrumentId,
                    Exchange = g.Key.Exchange,
                    Aggregates = Observable.Zip(
                        window.Select(q => q.Px).Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)).Average(),
                        window.Select(q => q.Volume).Sum(),
                        (avg, sum) => new { avg, sum })
                };

            using (groupedQuotes
                .Subscribe(group => group.Aggregates
                    .Subscribe(value => TraceLine(group.InstrumentId + ',' + group.Exchange + ',' + value.avg + ',' + value.sum))))
            {
                quotes.OnNext(new Quote { InstrumentId = "A", Px = 10.10m, Exchange = "O", Volume = 100L });
                quotes.OnNext(new Quote { InstrumentId = "B", Px = 50.10m, Exchange = "N", Volume = 100L });
                quotes.OnNext(new Quote { InstrumentId = "A", Px = 20.11m, Exchange = "R", Volume = 100L });
                quotes.OnNext(new Quote { InstrumentId = "A", Px = 26.11m, Exchange = "R", Volume = 100L });

                WaitForKey();
            }
        }

        class Quote
        {
            public string InstrumentId { get; set; }
            public decimal Px { get; set; }
            public string Exchange { get; set; }
            public long Volume { get; set; }
        }
    }
}
```

- Dave

http://davesexton.com/blog

• Edited Tuesday, May 1, 2012 13:03 (added complete lab)
Tuesday, May 1, 2012 13:00
• Hi,

You could also modify the query to add a sequential projection that runs when the Zip completes, by using another from clause (SelectMany).  This is probably better because it avoids nested subscriptions.

For example: (Untested)

```csharp
from q in quotes
group q by new { q.InstrumentId, q.Exchange } into g
from window in g.Window(TimeSpan.FromSeconds(1))
from aggs in Observable.Zip(
    window.Select(q => q.Px).Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)).Average(),
    window.Select(q => q.Volume).Sum(),
    (avg, sum) => new { avg, sum })
select new
{
    InstrumentId = g.Key.InstrumentId,
    Exchange = g.Key.Exchange,
    AveragePx = aggs.avg,
    VolumeSum = aggs.sum
};
```

- Dave

http://davesexton.com/blog

Tuesday, May 1, 2012 13:12
• Thanks Dave. One issue with both solutions above is that the average is calculated only over the window, but I need a continuous average over the whole stream, and if the publishing window is empty the average price should not change.
Tuesday, May 1, 2012 14:17
• Hi,

Well, that's an important piece of the spec that you left out :)

The Scan operator allows you to create a running aggregate.  Apply it to the group instead of the window.
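The Scan suggestion can be modeled without a live stream. Below is a sketch in plain LINQ to Objects over the four test quotes from the question; because the BCL has no `Scan` for `IEnumerable<T>`, the `Scan` extension here is a hypothetical helper that mimics `Observable.Scan` by yielding the accumulator after every element. `RunningAggregateSketch` and `Compute` are illustrative names, not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class Quote
{
    public string InstrumentId { get; set; }
    public decimal Px { get; set; }
    public string Exchange { get; set; }
    public long Volume { get; set; }
}

public static class RunningAggregateSketch
{
    // Hypothetical helper: an IEnumerable<T> stand-in for Rx's Observable.Scan.
    // It yields the accumulator after every element, i.e. a running aggregate.
    public static IEnumerable<TAcc> Scan<TSource, TAcc>(
        this IEnumerable<TSource> source, TAcc seed, Func<TAcc, TSource, TAcc> accumulator)
    {
        TAcc acc = seed;
        foreach (TSource item in source)
        {
            acc = accumulator(acc, item);
            yield return acc;
        }
    }

    // Groups by (InstrumentId, Exchange), then scans each group (not a window),
    // so every quote produces an updated average price and total volume for its group.
    public static List<(string Id, string Exchange, decimal AvgPx, long SumVolume)> Compute(IEnumerable<Quote> quotes) =>
        (from q in quotes
         group q by new { q.InstrumentId, q.Exchange } into g
         from agg in g.Scan(
             (Sum: 0m, Count: 0, VSum: 0L),
             (acc, quote) => (Sum: acc.Sum + quote.Px, Count: acc.Count + 1, VSum: acc.VSum + quote.Volume))
         select (Id: g.Key.InstrumentId, Exchange: g.Key.Exchange, AvgPx: agg.Sum / agg.Count, SumVolume: agg.VSum))
        .ToList();

    public static void Main()
    {
        var quotes = new[]
        {
            new Quote { InstrumentId = "A", Px = 10.10m, Exchange = "O", Volume = 100L },
            new Quote { InstrumentId = "B", Px = 50.10m, Exchange = "N", Volume = 100L },
            new Quote { InstrumentId = "A", Px = 20.11m, Exchange = "R", Volume = 100L },
            new Quote { InstrumentId = "A", Px = 26.11m, Exchange = "R", Volume = 100L },
        };

        foreach (var row in Compute(quotes))
            Console.WriteLine($"{row.Id}, {row.Exchange}, {row.AvgPx}, {row.SumVolume}");
    }
}
```

Each quote produces a new row for its own group, so the A/R group reports an average of 20.11 and then 23.11, and a group with no new quotes simply keeps its last value. Unlike a live Rx stream, LINQ to Objects' `GroupBy` reads the whole source first, so the rows come out group by group rather than in arrival order.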

- Dave

http://davesexton.com/blog

Tuesday, May 1, 2012 14:50
• I came up with this; please let me know if you see any problems with it.

```csharp
var groupedQuotes =
    (from q in quotes
     group q by new { q.InstrumentId, exchange = q.Exchange })
    .SelectMany(q => q
        // Running totals per (InstrumentId, Exchange) group.
        .Scan(new { sum = 0m, count = 0, vsum = 0L },
              (agg, quote) => new { sum = agg.sum + quote.Px, count = agg.count + 1, vsum = agg.vsum + quote.Volume })
        .Select(agg => new Quote
        {
            InstrumentId = q.Key.InstrumentId,
            Exchange = q.Key.exchange, // keep the exchange so A/O and A/R stay distinguishable
            Px = agg.sum / agg.count,
            Volume = agg.vsum
        }));
```

The next step is to run 200 msgs/s across 7K groups through it for a day and hope it runs efficiently.

Wednesday, May 2, 2012 14:29
• Hi,

Looks good, though you may want to stick with one kind of syntax for clarity; e.g., query comprehension syntax:

```csharp
from quote in quotes
group quote by new { quote.InstrumentId, quote.Exchange } into quotesByIdAndExchange
from agg in quotesByIdAndExchange.Scan(
    new { sum = 0m, count = 0, vsum = 0L },
    (acc, quote) => new
    {
        sum = acc.sum + quote.Px,
        count = acc.count + 1,
        vsum = acc.vsum + quote.Volume
    })
select new Quote()
{
    InstrumentId = quotesByIdAndExchange.Key.InstrumentId,
    Exchange = quotesByIdAndExchange.Key.Exchange,
    Px = agg.sum / agg.count,
    Volume = agg.vsum
};
```

- Dave

http://davesexton.com/blog

• Proposed as answer Tuesday, May 15, 2012 14:31
Wednesday, May 2, 2012 15:21
• True, that cleans it up. But coming from Esper/StreamBase, where the syntax is literally:

```sql
select InstrumentId, Exchange, avg(price), sum(volume)
from Quotes
group by InstrumentId, Exchange
```

I find I need to work much harder with Reactive to achieve the same output. IMHO, NOT having the concept of "OnCompleted" events in other CEP platforms simplifies querying streams of indeterminate length.

Wednesday, May 2, 2012 17:12
• Hi,

I don't know anything about Esper/StreamBase, but here are some questions that might be worth asking when comparing them to Rx:

1. Are CEP queries statically typed?
2. Can CEP queries be combined easily with queries from other sequences/models?  E.g., LINQ to Objects, LINQ to XML, LINQ to Entities, etc.
3. Can CEP queries easily handle side-effects of any kind and at any time?  E.g., the Do operator.
4. Are CEP queries extensible?  E.g., custom Rx operators are pretty easy to create.

This is probably not an exhaustive list of useful rhetorical questions :)

- Dave

http://davesexton.com/blog

Wednesday, May 2, 2012 18:13