Split stream into groups and CombineLatest

  • Monday, February 20, 2012 7:38 PM
     
     

    I have two streams: Quotes, for financial instruments, and Orders, for those instruments. I need a new stream of the quote price (Last) multiplied by OrderQty that publishes whenever a quote or an order is updated. When the streams contain data for a single instrument, I was able to get the result using CombineLatest like so:

    var values = quotes.CombineLatest(orders, (q, o) => new { Value = q.Last * o.OrderQty });

    However, if the streams carry prices and quantities for many instruments, I think I will need to use GroupBy, but I can't get my head around how to write the query. Also, if data is arriving at approximately 200-300 events/sec, do I need to worry about memory or CPU (4 GB RAM, 4 x 1.9 GHz cores) when using GroupBy across 7-10K instruments?
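
    The per-instrument behaviour being asked for can be pinned down without Rx: keep the latest quote price and the latest order quantity for each instrument, and recompute Last * OrderQty whenever either side changes for that instrument. This is a hypothetical, single-threaded sketch; PerInstrumentCombineLatest, OnQuote and OnOrder are made-up names, not Rx APIs, and they only describe the output a GroupBy + CombineLatest query should reproduce.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration (not an Rx API): a per-instrument "CombineLatest".
// It caches the latest price and quantity per instrument and returns
// Last * OrderQty whenever either side updates, or null until both are known.
public sealed class PerInstrumentCombineLatest
{
    private readonly Dictionary<string, decimal> lastPrices = new Dictionary<string, decimal>();
    private readonly Dictionary<string, int> lastQuantities = new Dictionary<string, int>();

    public decimal? OnQuote(string instrumentId, decimal last)
    {
        lastPrices[instrumentId] = last;
        int qty;
        return lastQuantities.TryGetValue(instrumentId, out qty) ? last * qty : (decimal?)null;
    }

    public decimal? OnOrder(string instrumentId, int orderQty)
    {
        lastQuantities[instrumentId] = orderQty;
        decimal price;
        return lastPrices.TryGetValue(instrumentId, out price) ? price * orderQty : (decimal?)null;
    }
}
```

    Fed the sample data used later in this thread (DOWJ 9998.10 then an order of 10000; FTSE 5678.10 and 5678.11, then orders of 2000 and 1000; then a DOWJ order of 500), this yields 99981000, 11356220, 5678110 and 4999050. Only the second FTSE quote contributes, because no FTSE order exists until both quotes have arrived.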

All Replies

  • Tuesday, February 21, 2012 4:17 AM
     

    Hi,

    It seems that you'll need to use Join as well. Try something like the following (untested):

    var query = 
    	quotes.GroupBy(quote => quote.InstrumentId).Publish(quotesByInstrument => 
    	orders.GroupBy(order => order.InstrumentId).Publish(ordersByInstrument =>
    	from instrumentQuotes in quotesByInstrument
    	join instrumentOrders in ordersByInstrument
    	on ordersByInstrument.Where(g => g.Key == instrumentQuotes.Key)
    	equals quotesByInstrument.Where(g => g.Key == instrumentOrders.Key)
    	where instrumentQuotes.Key == instrumentOrders.Key
    	from price in instrumentQuotes.CombineLatest(instrumentOrders, (quote, order) => quote.Last * order.OrderQty)
    	select price));

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Tuesday, February 21, 2012 4:19 AM Added Publish to query
  • Tuesday, February 21, 2012 4:22 AM
     
     

    Note that I just updated my previous example to use Publish. My original comment about quotes and orders needing to be hot was incorrect: only the GroupBy observables must be hot, because the query subscribes to each of them more than once.

    - Dave


    http://davesexton.com/blog

  • Tuesday, February 21, 2012 4:51 AM
     
     

    >"a groupby across 7-10K instruments

    GroupBy is designed precisely for this use case; otherwise you could just use Where.
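
    The GroupBy-versus-Where point can be illustrated with LINQ to Objects, used here as a stand-in for Rx on the assumption that the per-event cost has the same shape. One Where per instrument tests every event against every filter and needs the instrument list up front, while a single GroupBy calls its key selector once per event and discovers the groups from the data. GroupByVersusWhere and its methods are hypothetical names that exist only to count delegate calls.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: counts delegate calls to compare the two approaches.
public static class GroupByVersusWhere
{
    // One Where filter per known instrument: every event is tested by every filter.
    public static int CountWherePredicateCalls(IReadOnlyList<string> eventKeys, IReadOnlyList<string> instruments)
    {
        int calls = 0;
        foreach (var instrument in instruments)
        {
            // Count() forces the lazily evaluated Where to run over all events.
            eventKeys.Where(key => { calls++; return key == instrument; }).Count();
        }
        return calls;
    }

    // A single GroupBy: the key selector runs once per event, and the groups
    // (one per distinct key) are discovered from the data itself.
    public static int CountGroupByKeySelectorCalls(IReadOnlyList<string> eventKeys, out int groupCount)
    {
        int calls = 0;
        groupCount = eventKeys.GroupBy(key => { calls++; return key; }).Count();
        return calls;
    }
}
```

    For 7 events spread over 2 instruments this gives 14 predicate calls versus 7 key selector calls; with 7-10K instruments, the Where approach multiplies the per-event work by the instrument count.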

    >"2-300 events/sec"

    Is that 2-300 events/sec per instrument or total?

    It is impossible to say without benchmarking the application; however, 300 events/sec is nothing. I've written applications using Rx that handle many thousands of events per second, and they hardly break a sweat.



    James Miles http://enumeratethis.com



    • Edited by James Miles Tuesday, February 21, 2012 5:02 AM
  • Tuesday, February 21, 2012 5:07 AM
     
     

    I think the inner grouped stream might need some sort of caching.

    Also I don't think the join is going to work.

    *Update* I think the join is fine; however, CombineLatest misses notifications.


    James Miles http://enumeratethis.com



    • Edited by James Miles Tuesday, February 21, 2012 5:21 AM
  • Tuesday, February 21, 2012 5:25 AM
     
     
    I think we somehow need Replay(1) on both instrumentQuotes and instrumentOrders.
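
    Why CombineLatest misses notifications can be sketched without Rx, assuming (as in Rx) that each group produced by GroupBy is a plain hot subject. In the join, CombineLatest only subscribes to an instrument's quote group after the matching order group has arrived, so quotes pushed before that point are gone. Caching the latest value, which is what ReplaySubject<T>(1) and Replay(1) do in Rx, lets a late subscriber start from the current value. LatestValueSubject and RecordingObserver are hypothetical stand-ins built only on the BCL's IObservable<T>/IObserver<T>; they are not thread-safe and do not replay OnError or OnCompleted.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, minimal subject (not Rx's ReplaySubject<T>). With replayLatest = true
// it caches the most recent value and replays it to each new subscriber, like
// ReplaySubject<T>(1); with false it behaves like a plain hot subject.
public sealed class LatestValueSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();
    private readonly bool replayLatest;
    private bool hasValue;
    private T latest;

    public LatestValueSubject(bool replayLatest) { this.replayLatest = replayLatest; }

    public void OnNext(T value)
    {
        hasValue = true;
        latest = value;
        foreach (var observer in observers.ToArray()) observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var observer in observers.ToArray()) observer.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var observer in observers.ToArray()) observer.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // A late subscriber only sees earlier data if the latest value was cached.
        if (replayLatest && hasValue) observer.OnNext(latest);
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action unsubscribe;
        public Unsubscriber(Action unsubscribe) { this.unsubscribe = unsubscribe; }
        public void Dispose() { unsubscribe?.Invoke(); unsubscribe = null; }
    }
}

// Hypothetical observer that records the values it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

    For example, pushing 5678.10 and then 5678.11 before subscribing gives the late subscriber nothing when replayLatest is false, and exactly 5678.11 when it is true.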

    James Miles http://enumeratethis.com

  • Tuesday, February 21, 2012 5:36 AM
     

    This "works", but... MY EYES! ;)

    var query = 
    	quotes.GroupBy(quote => quote.InstrumentId).Select(g => 
    	{	
    		var x = new {g.Key, Stream = g.Replay(1)};
    		x.Stream.Connect();
    		return x;
    	}).Publish(quotesByInstrument => 
    	orders.GroupBy(order => order.InstrumentId).Select(g => 
    	{	
    		var x = new {g.Key, Stream = g.Replay(1)};
    		x.Stream.Connect();
    		return x;
    	}).Publish(ordersByInstrument =>
    	from instrumentQuotes in quotesByInstrument
    	join instrumentOrders in ordersByInstrument
    	on ordersByInstrument.Where(g => g.Key == instrumentQuotes.Key)
    	equals quotesByInstrument.Where(g => g.Key == instrumentOrders.Key)
    	where instrumentQuotes.Key == instrumentOrders.Key
    		from price in instrumentQuotes.Stream.CombineLatest(instrumentOrders.Stream, (quote, order) => new{ quote, order})
    	select price));


    James Miles http://enumeratethis.com

  • Tuesday, February 21, 2012 5:37 AM
     
    // Requires Rx: import System.Reactive.Linq and System.Reactive.Subjects.
    void Main()
    {
    	var quotes = new Subject<Quote>();
    	var orders = new Subject<Order>();
    	
    	var query = 
    	quotes.GroupBy(quote => quote.InstrumentId).Select(g => 
    	{	
    		var x = new {g.Key, Stream = g.Replay(1)};
    		x.Stream.Connect();
    		return x;
    	}).Publish(quotesByInstrument => 
    	orders.GroupBy(order => order.InstrumentId).Select(g => 
    	{	
    		var x = new {g.Key, Stream = g.Replay(1)};
    		x.Stream.Connect();
    		return x;
    	}).Publish(ordersByInstrument =>
    	from instrumentQuotes in quotesByInstrument
    	join instrumentOrders in ordersByInstrument
    	on ordersByInstrument.Where(g => g.Key == instrumentQuotes.Key)
    	equals quotesByInstrument.Where(g => g.Key == instrumentOrders.Key)
    	where instrumentQuotes.Key == instrumentOrders.Key
    		from price in instrumentQuotes.Stream.CombineLatest(instrumentOrders.Stream, (quote, order) => new{ quote, order})
    	select price));
    	
    query.Dump();
    			
    	quotes.OnNext(new Quote{InstrumentId = "DOWJ", Last = 9998.10m});
    	orders.OnNext(new Order{InstrumentId = "DOWJ", OrderQty = 10000});
    	quotes.OnNext(new Quote{InstrumentId = "FTSE", Last = 5678.10m});
    	quotes.OnNext(new Quote{InstrumentId = "FTSE", Last = 5678.11m});
    	orders.OnNext(new Order{InstrumentId = "FTSE", OrderQty = 2000});
    	orders.OnNext(new Order{InstrumentId = "FTSE", OrderQty = 1000});
    	orders.OnNext(new Order{InstrumentId = "DOWJ", OrderQty = 500});
    }
    
    public class Quote
    {
    	public string InstrumentId { get; set; }
    	public decimal Last { get; set; }
    }
    
    public class Order
    {
    	public string InstrumentId { get; set; }
    	public int OrderQty { get; set; }
    }
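    (Complete sample with test data. Dump() is LINQPad's output method, so run this as a LINQPad "C# Program" query, or replace Dump() with a Subscribe call that writes each value to the console.)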

    James Miles http://enumeratethis.com


    • Edited by James Miles Tuesday, February 21, 2012 5:37 AM
    • Marked As Answer by AtEventHorizon Tuesday, February 21, 2012 11:32 PM
  • Tuesday, February 21, 2012 6:34 AM
     

    Hi James,

    Yep, I see that my original query is missing Replay(1).  Thanks for the correction.

    It might be nice for Rx to have GroupBy and GroupByUntil overloads that accept a multicast parameter. I can add them to Rxx for now as well. Thoughts?

    Here's a working example. For the GroupByUntil overloads (not shown here), the connection disposable should probably be tied to the lifetime of the group; I guess there's no place to attach it for GroupBy.

    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    
    namespace Rxx.Labs.Reactive
    {
    	// BaseConsoleLab, TraceLine and WaitForKey come from the Rxx Labs console host, not from Rx.
    	public sealed class GroupByJoinCombineLatest : BaseConsoleLab
    	{
    		protected override void Main()
    		{
    			var quotes = new Subject<Quote>();
    			var orders = new Subject<Order>();
    
    			var query =
    				quotes.GroupBy(quote => quote.InstrumentId, _ => new ReplaySubject<Quote>(1)).Publish(quotesByInstrument =>
    				orders.GroupBy(order => order.InstrumentId, _ => new ReplaySubject<Order>(1)).Publish(ordersByInstrument =>
    				from instrumentQuotes in quotesByInstrument
    				join instrumentOrders in ordersByInstrument
    				on ordersByInstrument.Where(g => g.Key == instrumentQuotes.Key)
    				equals quotesByInstrument.Where(g => g.Key == instrumentOrders.Key)
    				where instrumentQuotes.Key == instrumentOrders.Key
    				from price in instrumentQuotes.CombineLatest(instrumentOrders, (quote, order) => quote.Last * order.OrderQty)
    				select price));
    
    			using (query.Subscribe(price => TraceLine(price)))
    			{
    				quotes.OnNext(new Quote { InstrumentId = "DOWJ", Last = 9998.10m });
    				orders.OnNext(new Order { InstrumentId = "DOWJ", OrderQty = 10000 });
    				quotes.OnNext(new Quote { InstrumentId = "FTSE", Last = 5678.10m });
    				quotes.OnNext(new Quote { InstrumentId = "FTSE", Last = 5678.11m });
    				orders.OnNext(new Order { InstrumentId = "FTSE", OrderQty = 2000 });
    				orders.OnNext(new Order { InstrumentId = "FTSE", OrderQty = 1000 });
    				orders.OnNext(new Order { InstrumentId = "DOWJ", OrderQty = 500 });
    
    				WaitForKey();
    			}
    		}
    
    		public class Quote
    		{
    			public string InstrumentId { get; set; }
    			public decimal Last { get; set; }
    		}
    
    		public class Order
    		{
    			public string InstrumentId { get; set; }
    			public int OrderQty { get; set; }
    		}
    	}
    
    	public static partial class Observable3
    	{
    		public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
    			this IObservable<TSource> source,
    			Func<TSource, TKey> keySelector,
    			Func<TKey, ISubject<TSource>> subjectSelector)
    		{
    			return source
    				.GroupBy(keySelector)
    				.Select(g => new MulticastGroupedObservable<TKey, TSource>(g, subjectSelector(g.Key)));
    		}
    
    		private sealed class MulticastGroupedObservable<TKey, TSource> : IGroupedObservable<TKey, TSource>
    		{
    			public TKey Key
    			{
    				get;
    				private set;
    			}
    
    			private readonly IObservable<TSource> observable;
    
    			public MulticastGroupedObservable(IGroupedObservable<TKey, TSource> observable, ISubject<TSource> subject)
    			{
    				var multicasted = observable.Multicast(subject);
    
    				this.Key = observable.Key;
    				this.observable = multicasted;
    
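    				// The connection's IDisposable is discarded, so each group stays connected until the underlying group completes.
    				multicasted.Connect();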
    			}
    
    			public IDisposable Subscribe(IObserver<TSource> observer)
    			{
    				return observable.Subscribe(observer);
    			}
    		}
    	}
    }

    - Dave


    http://davesexton.com/blog

  • Tuesday, February 21, 2012 6:55 AM
     
     
    Yup - sounds like a good Rxx operator.

    James Miles http://enumeratethis.com

  • Tuesday, February 21, 2012 11:36 PM
     
     

    James

    I'm glad I asked on this forum; it's going to take me a few days to even grok the answer and Dave's initial approach. In the meantime, I will plug in the solution and see how it scales.

    Thanks

  • Wednesday, February 22, 2012 6:18 AM
     

    NOTE:

    Something to be aware of with this solution.

    The way you have specified this, you are ALWAYS computing the value for ALL instruments. If that is what you want, I *think* this will perform OK.

    Sometimes you need a system where you only run a "transient query" for a given instrument. Here is an example of how this can be achieved.

    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    
    namespace ConsoleApplication13
    {
        class Program
        {
            public class Quote
            {
                public string Instrument { get; set; }
                public decimal Price { get; set; }
            }
    
            public class Order
            {
                public string Instrument { get; set; }
                public int Qty { get; set; }
            }
    
            static void Main()
            {
                var quotes = new Subject<Quote>();
                var orders = new Subject<Order>();
    
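                // Replay() on the outer stream caches every group, so queries created later still see
                // groups that already exist; ReplaySubject<T>(1) caches each group's latest value.
                var groupedQuotes = quotes.GroupBy(q => q.Instrument, _ => new ReplaySubject<Quote>(1)).Replay();
                var groupedOrders = orders.GroupBy(o => o.Instrument, _ => new ReplaySubject<Order>(1)).Replay();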
    
                var ftseQuotes =
                    from grp in groupedQuotes
                    where grp.Key == "FTSE"
                    from quote in grp
                    select quote;
    
                var ftseOrders =
                    from grp in groupedOrders
                    where grp.Key == "FTSE"
                    from order in grp
                    select order;
    
                Console.WriteLine("Subscribing FTSE");
                var ftseSubscription = ftseOrders.CombineLatest(ftseQuotes, (order, quote) => order.Qty * quote.Price)
                    .Subscribe(x => Console.WriteLine("ftse: " + x));
    
                groupedOrders.Connect();
                groupedQuotes.Connect();
    
                quotes.OnNext(new Quote { Instrument = "DOWJ", Price = 9998.10m });
                orders.OnNext(new Order { Instrument = "DOWJ", Qty = 10000 });
                quotes.OnNext(new Quote { Instrument = "FTSE", Price = 5678.10m });
                quotes.OnNext(new Quote { Instrument = "FTSE", Price = 5678.11m });
                orders.OnNext(new Order { Instrument = "FTSE", Qty = 2000 });
    
                Console.WriteLine("Disposing FTSE");
                ftseSubscription.Dispose();
    
                var dowjQuotes =
                    from grp in groupedQuotes
                    where grp.Key == "DOWJ"
                    from quote in grp
                    select quote;
    
                var dowjOrders =
                    from grp in groupedOrders
                    where grp.Key == "DOWJ"
                    from order in grp
                    select order;
    
                Console.WriteLine("Subscribing DOWJ");
                var dowjSubscription = dowjOrders.CombineLatest(dowjQuotes, (order, quote) => order.Qty * quote.Price)
                    .Subscribe(x => Console.WriteLine("dowj: " + x));
    
                orders.OnNext(new Order { Instrument = "FTSE", Qty = 1000 });
                orders.OnNext(new Order { Instrument = "DOWJ", Qty = 500 });
    
                Console.WriteLine("Disposing DOWJ");
                dowjSubscription.Dispose();
            }
        }
    
        public static partial class Observable3
        {
            public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
                this IObservable<TSource> source,
                Func<TSource, TKey> keySelector,
                Func<TKey, ISubject<TSource>> subjectSelector)
            {
                return source
                    .GroupBy(keySelector)
                    .Select(g => new MulticastGroupedObservable<TKey, TSource>(g, subjectSelector(g.Key)));
            }
    
            private sealed class MulticastGroupedObservable<TKey, TSource> : IGroupedObservable<TKey, TSource>
            {
                public TKey Key
                {
                    get;
                    private set;
                }
    
                private readonly IObservable<TSource> observable;
    
                public MulticastGroupedObservable(IGroupedObservable<TKey, TSource> observable, ISubject<TSource> subject)
                {
                    var multicasted = observable.Multicast(subject);
    
                    this.Key = observable.Key;
                    this.observable = multicasted;
    
                    multicasted.Connect();
                }
    
                public IDisposable Subscribe(IObserver<TSource> observer)
                {
                    return observable.Subscribe(observer);
                }
            }
        }
    
    }
    

    Hope that makes sense!
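
    The idea behind the transient query (cache the current state of every instrument, but only compute values for instruments someone is watching, and hand a new watcher the current value immediately) can also be sketched without Rx. This is a hypothetical, single-threaded illustration: InstrumentWatchCache and Watch are made-up names. In the Rx version above, the caching is done by Replay() on the outer grouped stream plus ReplaySubject<T>(1) per group.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch (not an Rx API). Latest quotes and orders are cached for every
// instrument, but Price * Qty is only computed for instruments that have watchers.
public sealed class InstrumentWatchCache
{
    private readonly Dictionary<string, decimal> prices = new Dictionary<string, decimal>();
    private readonly Dictionary<string, int> quantities = new Dictionary<string, int>();
    private readonly Dictionary<string, List<Action<decimal>>> watchers =
        new Dictionary<string, List<Action<decimal>>>();

    public void OnQuote(string instrument, decimal price) { prices[instrument] = price; Notify(instrument); }

    public void OnOrder(string instrument, int qty) { quantities[instrument] = qty; Notify(instrument); }

    // Starts a transient query for one instrument; disposing the result stops it.
    public IDisposable Watch(string instrument, Action<decimal> onValue)
    {
        List<Action<decimal>> list;
        if (!watchers.TryGetValue(instrument, out list))
        {
            list = new List<Action<decimal>>();
            watchers[instrument] = list;
        }
        list.Add(onValue);

        // Give the new watcher the current value straight away, if both sides are known.
        decimal value;
        if (TryCompute(instrument, out value)) onValue(value);
        return new Unsubscriber(() => list.Remove(onValue));
    }

    private void Notify(string instrument)
    {
        List<Action<decimal>> list;
        decimal value;
        if (watchers.TryGetValue(instrument, out list) && list.Count > 0 && TryCompute(instrument, out value))
        {
            foreach (var onValue in list.ToArray()) onValue(value);
        }
    }

    private bool TryCompute(string instrument, out decimal value)
    {
        decimal price;
        int qty;
        // '&' (not '&&') so both lookups run and both out variables are assigned.
        bool known = prices.TryGetValue(instrument, out price) & quantities.TryGetValue(instrument, out qty);
        value = known ? price * qty : 0m;
        return known;
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action unsubscribe;
        public Unsubscriber(Action unsubscribe) { this.unsubscribe = unsubscribe; }
        public void Dispose() { unsubscribe?.Invoke(); unsubscribe = null; }
    }
}
```

    Driven with the same sequence as the example above, the FTSE watcher receives one value (11356220) and the DOWJ watcher receives two: 99981000 as soon as it subscribes, then 4999050.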



    James Miles http://enumeratethis.com