Split stream into groups and combinelatest
-
Monday, February 20, 2012 7:38 PM
I have two streams: Quotes for financial instruments and Orders for those instruments. I need a new stream that publishes Quote.Price * OrderQty any time there is an updated quote or order. When the streams contain data for a single instrument, I was able to get the result using CombineLatest like so:
var values = quotes.CombineLatest(orders, (q, o) => new { Value = q.Last * o.OrderQty });
However, if the streams carry prices and quantities across many instruments, I think I will need to use GroupBy, but I am unable to get my head around how to fashion the query. Also, if data is arriving at approx 2-300 events/sec, will I need to be worried about memory or CPU (4 GB, 4 x 1.9 GHz cores) using a GroupBy across 7-10K instruments?
All Replies
-
Tuesday, February 21, 2012 4:17 AM
Hi,
It seems that you'll need to use Join as well. Try something like the following: (Untested)
var query = quotes.GroupBy(quote => quote.InstrumentId).Publish(quotesByInstrument =>
    orders.GroupBy(order => order.InstrumentId).Publish(ordersByInstrument =>
        from instrumentQuotes in quotesByInstrument
        join instrumentOrders in ordersByInstrument
            on ordersByInstrument.Where(g => g.Key == instrumentQuotes.Key)
            equals quotesByInstrument.Where(g => g.Key == instrumentOrders.Key)
        where instrumentQuotes.Key == instrumentOrders.Key
        from price in instrumentQuotes.CombineLatest(instrumentOrders, (quote, order) => quote.Last * order.OrderQty)
        select price));
- Dave
- Edited by Dave Sexton Tuesday, February 21, 2012 4:19 AM Added Publish to query
-
Tuesday, February 21, 2012 4:22 AM
Note that I just updated my previous example to use Publish. My original comment about quotes and orders being hot was actually incorrect - only the GroupBy observables must be hot.
- Dave
http://davesexton.com/blog
-
Tuesday, February 21, 2012 4:51 AM
>"a groupby across 7-10K instruments"
GroupBy is designed precisely for this use case; otherwise you could just use a "Where".
>"2-300 events/sec"
Is that 2-300 events/sec per instrument or total? It is impossible to say without benchmarking the application; however, 300 events/sec is nothing. I've written applications using Rx that handle many thousands of events per second and it's hardly breaking a sweat.
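To make the GroupBy-versus-Where point concrete, here is a minimal sketch that is not from the thread. It assumes a Quote class shaped like the one used in later replies; the names GroupBySketch and CountPerInstrument are made up for illustration. A single GroupBy subscription fans quotes out into one inner stream per instrument, whereas Where would need a separate filter per instrument id.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class Quote
{
    public string InstrumentId { get; set; }
    public decimal Last { get; set; }
}

public static class GroupBySketch
{
    // Hypothetical helper: pushes three quotes through GroupBy and counts
    // how many quotes each per-instrument inner stream received.
    public static Dictionary<string, int> CountPerInstrument()
    {
        var counts = new Dictionary<string, int>();
        var quotes = new Subject<Quote>();

        // GroupBy emits one inner observable the first time a key is seen;
        // later quotes with the same key are routed to that same group.
        quotes.GroupBy(q => q.InstrumentId).Subscribe(g =>
        {
            counts[g.Key] = 0;
            g.Subscribe(_ => counts[g.Key]++);
        });

        quotes.OnNext(new Quote { InstrumentId = "DOWJ", Last = 9998.10m });
        quotes.OnNext(new Quote { InstrumentId = "FTSE", Last = 5678.10m });
        quotes.OnNext(new Quote { InstrumentId = "FTSE", Last = 5678.11m });

        return counts;
    }

    public static void Main()
    {
        foreach (var pair in CountPerInstrument())
            Console.WriteLine(pair.Key + ": " + pair.Value);
    }
}
```

With the three sample quotes, the DOWJ group sees one quote and the FTSE group sees two. Subscriptions are left undisposed here only to keep the sketch short.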
James Miles http://enumeratethis.com
- Edited by James Miles Tuesday, February 21, 2012 5:02 AM
-
Tuesday, February 21, 2012 5:07 AM
I think the inner grouped stream might need some sort of caching.
Also I don't think the join is going to work.
*Update* I think the join is fine... however the combine latest misses notifications
James Miles http://enumeratethis.com
- Edited by James Miles Tuesday, February 21, 2012 5:21 AM
-
Tuesday, February 21, 2012 5:25 AM
I think we somehow need Replay(1) on both instrumentQuotes & instrumentOrders
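A minimal sketch of why a one-element replay buffer matters, not taken from the thread: in the join, CombineLatest only subscribes to the two inner groups once both have appeared, so whichever group arrived first has probably already pushed values by then. The sketch simulates that late subscription with bare subjects; LateSubscriberSketch and Collect are hypothetical names, and the real query subscribes to grouped observables rather than to subjects directly.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class LateSubscriberSketch
{
    // Hypothetical helper: pushes values BEFORE anyone subscribes, then
    // subscribes CombineLatest and returns whatever it produced.
    public static List<decimal> Collect(ISubject<decimal> quotes, ISubject<int> orders)
    {
        var results = new List<decimal>();

        quotes.OnNext(5678.10m);
        quotes.OnNext(5678.11m);
        orders.OnNext(2000);

        // Late subscription: a plain Subject has nothing to offer here,
        // while ReplaySubject(1) hands over its most recent value.
        quotes.CombineLatest(orders, (q, o) => q * o)
              .Subscribe(v => results.Add(v));

        return results;
    }

    public static void Main()
    {
        var plain = Collect(new Subject<decimal>(), new Subject<int>());
        var cached = Collect(new ReplaySubject<decimal>(1), new ReplaySubject<int>(1));

        Console.WriteLine("plain subjects produced " + plain.Count + " value(s)");
        Console.WriteLine("replay subjects produced " + cached.Count + " value(s)");
    }
}
```

The plain subjects yield nothing because the early values are gone; the replay subjects yield a single product built from the latest quote and the latest order.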
James Miles http://enumeratethis.com
-
Tuesday, February 21, 2012 5:36 AM
This "works" but.... MY EYES! ;)
var query = quotes
    .GroupBy(quote => quote.InstrumentId)
    .Select(g =>
    {
        var x = new { g.Key, Stream = g.Replay(1) };
        x.Stream.Connect();
        return x;
    })
    .Publish(quotesByInstrument => orders
        .GroupBy(order => order.InstrumentId)
        .Select(g =>
        {
            var x = new { g.Key, Stream = g.Replay(1) };
            x.Stream.Connect();
            return x;
        })
        .Publish(ordersByInstrument =>
            from instrumentQuotes in quotesByInstrument
            join instrumentOrders in ordersByInstrument
                on ordersByInstrument.Where(g => g.Key == instrumentQuotes.Key)
                equals quotesByInstrument.Where(g => g.Key == instrumentOrders.Key)
            where instrumentQuotes.Key == instrumentOrders.Key
            from price in instrumentQuotes.Stream.CombineLatest(instrumentOrders.Stream, (quote, order) => new { quote, order })
            select price));
James Miles http://enumeratethis.com
-
Tuesday, February 21, 2012 5:37 AM
(with sample data)
void Main()
{
    // Generic type arguments restored; they were missing from the posted text.
    Subject<Quote> quotes = new Subject<Quote>();
    Subject<Order> orders = new Subject<Order>();

    var query = quotes
        .GroupBy(quote => quote.InstrumentId)
        .Select(g =>
        {
            var x = new { g.Key, Stream = g.Replay(1) };
            x.Stream.Connect();
            return x;
        })
        .Publish(quotesByInstrument => orders
            .GroupBy(order => order.InstrumentId)
            .Select(g =>
            {
                var x = new { g.Key, Stream = g.Replay(1) };
                x.Stream.Connect();
                return x;
            })
            .Publish(ordersByInstrument =>
                from instrumentQuotes in quotesByInstrument
                join instrumentOrders in ordersByInstrument
                    on ordersByInstrument.Where(g => g.Key == instrumentQuotes.Key)
                    equals quotesByInstrument.Where(g => g.Key == instrumentOrders.Key)
                where instrumentQuotes.Key == instrumentOrders.Key
                from price in instrumentQuotes.Stream.CombineLatest(instrumentOrders.Stream, (quote, order) => new { quote, order })
                select price));

    query.Dump();

    quotes.OnNext(new Quote { InstrumentId = "DOWJ", Last = 9998.10m });
    orders.OnNext(new Order { InstrumentId = "DOWJ", OrderQty = 10000 });
    quotes.OnNext(new Quote { InstrumentId = "FTSE", Last = 5678.10m });
    quotes.OnNext(new Quote { InstrumentId = "FTSE", Last = 5678.11m });
    orders.OnNext(new Order { InstrumentId = "FTSE", OrderQty = 2000 });
    orders.OnNext(new Order { InstrumentId = "FTSE", OrderQty = 1000 });
    orders.OnNext(new Order { InstrumentId = "DOWJ", OrderQty = 500 });
}

public class Quote
{
    public string InstrumentId { get; set; }
    public decimal Last { get; set; }
}

public class Order
{
    public string InstrumentId { get; set; }
    public int OrderQty { get; set; }
}
James Miles http://enumeratethis.com
- Edited by James Miles Tuesday, February 21, 2012 5:37 AM
- Marked As Answer by AtEventHorizon Tuesday, February 21, 2012 11:32 PM
-
Tuesday, February 21, 2012 6:34 AM
Hi James,
Yep, I see that my original query is missing Replay(1). Thanks for the correction.
It might be nice for Rx to have GroupBy and GroupByUntil overloads that accept a multicast parameter. I can add them into Rxx for now as well - thoughts?
Here's a working example, though for the GroupByUntil implementations (not shown here) the connection disposable should probably be attached to the lifetime of the group. I guess there's no place to attach it for GroupBy.
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rxx.Labs.Reactive
{
    public sealed class GroupByJoinCombineLatest : BaseConsoleLab
    {
        protected override void Main()
        {
            var quotes = new Subject<Quote>();
            var orders = new Subject<Order>();

            var query = quotes.GroupBy(quote => quote.InstrumentId, _ => new ReplaySubject<Quote>(1)).Publish(quotesByInstrument =>
                orders.GroupBy(order => order.InstrumentId, _ => new ReplaySubject<Order>(1)).Publish(ordersByInstrument =>
                    from instrumentQuotes in quotesByInstrument
                    join instrumentOrders in ordersByInstrument
                        on ordersByInstrument.Where(g => g.Key == instrumentQuotes.Key)
                        equals quotesByInstrument.Where(g => g.Key == instrumentOrders.Key)
                    where instrumentQuotes.Key == instrumentOrders.Key
                    from price in instrumentQuotes.CombineLatest(instrumentOrders, (quote, order) => quote.Last * order.OrderQty)
                    select price));

            using (query.Subscribe(price => TraceLine(price)))
            {
                quotes.OnNext(new Quote { InstrumentId = "DOWJ", Last = 9998.10m });
                orders.OnNext(new Order { InstrumentId = "DOWJ", OrderQty = 10000 });
                quotes.OnNext(new Quote { InstrumentId = "FTSE", Last = 5678.10m });
                quotes.OnNext(new Quote { InstrumentId = "FTSE", Last = 5678.11m });
                orders.OnNext(new Order { InstrumentId = "FTSE", OrderQty = 2000 });
                orders.OnNext(new Order { InstrumentId = "FTSE", OrderQty = 1000 });
                orders.OnNext(new Order { InstrumentId = "DOWJ", OrderQty = 500 });

                WaitForKey();
            }
        }

        public class Quote
        {
            public string InstrumentId { get; set; }
            public decimal Last { get; set; }
        }

        public class Order
        {
            public string InstrumentId { get; set; }
            public int OrderQty { get; set; }
        }
    }

    public static partial class Observable3
    {
        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
            this IObservable<TSource> source,
            Func<TSource, TKey> keySelector,
            Func<TKey, ISubject<TSource>> subjectSelector)
        {
            return source
                .GroupBy(keySelector)
                .Select(g => new MulticastGroupedObservable<TKey, TSource>(g, subjectSelector(g.Key)));
        }

        private sealed class MulticastGroupedObservable<TKey, TSource> : IGroupedObservable<TKey, TSource>
        {
            public TKey Key { get; private set; }

            private readonly IObservable<TSource> observable;

            public MulticastGroupedObservable(IGroupedObservable<TKey, TSource> observable, ISubject<TSource> subject)
            {
                var multicasted = observable.Multicast(subject);

                this.Key = observable.Key;
                this.observable = multicasted;

                multicasted.Connect();
            }

            public IDisposable Subscribe(IObserver<TSource> observer)
            {
                return observable.Subscribe(observer);
            }
        }
    }
}
- Dave
http://davesexton.com/blog
-
Tuesday, February 21, 2012 6:55 AM
Yup - sounds like a good Rxx operator.
James Miles http://enumeratethis.com
-
Tuesday, February 21, 2012 11:36 PM
James
I am glad I asked on this forum - it's going to take me a few days to even grok the answer and Dave's initial approach. In the meantime, I will plug in the solution and see how it scales.
Thanks
-
Wednesday, February 22, 2012 6:18 AM
NOTE:
Something to be aware of with this solution.
The way you have spec'd this, you are ALWAYS doing this for ALL instruments. If that is what you want, I *think* this will perform OK.
Sometimes you need a system where you only run a "transient query" for a given instrument. Here is an example of how this can be achieved.
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace ConsoleApplication13
{
    class Program
    {
        public class Quote
        {
            public string Instrument { get; set; }
            public decimal Price { get; set; }
        }

        public class Order
        {
            public string Instrument { get; set; }
            public int Qty { get; set; }
        }

        static void Main()
        {
            var quotes = new Subject<Quote>();
            var orders = new Subject<Order>();

            var groupedQuotes = quotes.GroupBy(q => q.Instrument, _ => new ReplaySubject<Quote>(1)).Replay();
            var groupedOrders = orders.GroupBy(o => o.Instrument, _ => new ReplaySubject<Order>(1)).Replay();

            var ftseQuotes = from grp in groupedQuotes
                             where grp.Key == "FTSE"
                             from quote in grp
                             select quote;

            var ftseOrders = from grp in groupedOrders
                             where grp.Key == "FTSE"
                             from order in grp
                             select order;

            Console.WriteLine("Subscribing FTSE");
            var ftseSubscription = ftseOrders.CombineLatest(ftseQuotes, (order, quote) => order.Qty * quote.Price)
                .Subscribe(x => Console.WriteLine("ftse: " + x));

            groupedOrders.Connect();
            groupedQuotes.Connect();

            quotes.OnNext(new Quote { Instrument = "DOWJ", Price = 9998.10m });
            orders.OnNext(new Order { Instrument = "DOWJ", Qty = 10000 });
            quotes.OnNext(new Quote { Instrument = "FTSE", Price = 5678.10m });
            quotes.OnNext(new Quote { Instrument = "FTSE", Price = 5678.11m });
            orders.OnNext(new Order { Instrument = "FTSE", Qty = 2000 });

            Console.WriteLine("Disposing FTSE");
            ftseSubscription.Dispose();

            var dowjQuotes = from grp in groupedQuotes
                             where grp.Key == "DOWJ"
                             from quote in grp
                             select quote;

            var dowjOrders = from grp in groupedOrders
                             where grp.Key == "DOWJ"
                             from order in grp
                             select order;

            Console.WriteLine("Subscribing DOWJ");
            var dowjSubscription = dowjOrders.CombineLatest(dowjQuotes, (order, quote) => order.Qty * quote.Price)
                .Subscribe(x => Console.WriteLine("dowj: " + x));

            orders.OnNext(new Order { Instrument = "FTSE", Qty = 1000 });
            orders.OnNext(new Order { Instrument = "DOWJ", Qty = 500 });

            Console.WriteLine("Disposing DOWJ");
            dowjSubscription.Dispose();
        }
    }

    public static partial class Observable3
    {
        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
            this IObservable<TSource> source,
            Func<TSource, TKey> keySelector,
            Func<TKey, ISubject<TSource>> subjectSelector)
        {
            return source
                .GroupBy(keySelector)
                .Select(g => new MulticastGroupedObservable<TKey, TSource>(g, subjectSelector(g.Key)));
        }

        private sealed class MulticastGroupedObservable<TKey, TSource> : IGroupedObservable<TKey, TSource>
        {
            public TKey Key { get; private set; }

            private readonly IObservable<TSource> observable;

            public MulticastGroupedObservable(IGroupedObservable<TKey, TSource> observable, ISubject<TSource> subject)
            {
                var multicasted = observable.Multicast(subject);

                this.Key = observable.Key;
                this.observable = multicasted;

                multicasted.Connect();
            }

            public IDisposable Subscribe(IObserver<TSource> observer)
            {
                return observable.Subscribe(observer);
            }
        }
    }
}
Hope that makes sense!
James Miles http://enumeratethis.com

