Suggested answer: Multilevel group by

  • April 24, 2012 13:13

    I have a hot stream of stock quotes and would like to do a multi-level grouping equivalent to the following SQL:

    select symbol, avg(px), sum(volume)
    from quote
    group by exchange, symbol

    Any help would be appreciated. Below is my test setup code.

    void Main()
    {
        Subject<Quote> quotes = new Subject<Quote>();

        var groupedQuotes = from q in quotes
                            group q by new { q.InstrumentId, q.exchange } into grp
                            select grp;

        var subscribe = from g in groupedQuotes.Window(TimeSpan.FromSeconds(1)) select g;
        subscribe.Dump(); // LINQPad extension method that renders the results

        quotes.OnNext(new Quote { InstrumentId = "A", Px = 10.10m, exchange = "O", sector = "B", volume = 100L });
        quotes.OnNext(new Quote { InstrumentId = "B", Px = 50.10m, exchange = "N", sector = "B", volume = 100L });
        quotes.OnNext(new Quote { InstrumentId = "A", Px = 20.11m, exchange = "R", sector = "B", volume = 100L });
        quotes.OnNext(new Quote { InstrumentId = "A", Px = 26.11m, exchange = "R", sector = "B", volume = 100L });

        // Expected result
        // A, O, 10.10, 100
        // A, R, 23.11, 200
        // B, N, 50.10, 100
    }

    public class Quote
    {
        public string InstrumentId { get; set; }
        public decimal Px { get; set; }
        public string symbol { get; set; }
        public string sector { get; set; }
        public string exchange { get; set; }
        public long volume { get; set; }
    }
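    As a point of reference, the target semantics can be checked with a batch LINQ to Objects query over the same four test quotes. This is a sketch, not Rx code: it assumes C# 9 top-level statements, uses a tuple array as a stand-in for the Quote class, and also projects the exchange so the rows line up with the expected result above.

```csharp
using System;
using System.Linq;

// The four test quotes from the setup code, as (InstrumentId, Exchange, Px, Volume) tuples.
var quotes = new[]
{
    (InstrumentId: "A", Exchange: "O", Px: 10.10m, Volume: 100L),
    (InstrumentId: "B", Exchange: "N", Px: 50.10m, Volume: 100L),
    (InstrumentId: "A", Exchange: "R", Px: 20.11m, Volume: 100L),
    (InstrumentId: "A", Exchange: "R", Px: 26.11m, Volume: 100L),
};

// Batch analogue of the SQL: group by a composite (InstrumentId, Exchange) key,
// then compute avg(px) and sum(volume) per group.
var results = quotes
    .GroupBy(q => new { q.InstrumentId, q.Exchange })
    .Select(g => new
    {
        g.Key.InstrumentId,
        g.Key.Exchange,
        AvgPx = g.Average(q => q.Px),
        SumVolume = g.Sum(q => q.Volume)
    })
    .OrderBy(r => r.InstrumentId)
    .ThenBy(r => r.Exchange)
    .ToList();

foreach (var r in results)
{
    Console.WriteLine($"{r.InstrumentId}, {r.Exchange}, {r.AvgPx}, {r.SumVolume}");
}
```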




All replies

  • May 1, 2012 13:00

    Hi,

    This isn't really a multi-level group by problem.  It's just a single group by query with a projection containing aggregates.  The trick is to project a new anonymous type for each window and to use Zip to combine the aggregates into a child anonymous type so that the query remains reactive.  However, the Average operator fails with an error if the window is empty (there are no elements to average), so you must ensure there's always at least one element; I've done this below with the Merge operator so that the average value for an empty window is 0.

    from q in quotes
    group q by new { q.InstrumentId, q.Exchange } into g
    from window in g.Window(TimeSpan.FromSeconds(1))
    select new
    {
    	InstrumentId = g.Key.InstrumentId,
    	Exchange = g.Key.Exchange,
    	Aggregates = Observable.Zip(
    		window.Select(q => q.Px).Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)).Average(),
    		window.Select(q => q.Volume).Sum(),
    		(avg, sum) => new { avg, sum })
    };
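    The empty-window problem has a direct LINQ to Objects analogue, which makes it easy to see in isolation. In this sketch (plain IEnumerable, not Rx; the SafeAverage helper is hypothetical), Enumerable.Average throws InvalidOperationException on an empty sequence, and DefaultIfEmpty(0m) supplies a single 0 in that case, much as merging 0M when IsEmpty() yields true does for an empty window.

```csharp
using System;
using System.Linq;

decimal[] emptyWindow = Array.Empty<decimal>();
decimal[] fullWindow = { 20.11m, 26.11m };

// Average over an empty sequence throws InvalidOperationException.
bool threw = false;
try
{
    emptyWindow.Average();
}
catch (InvalidOperationException)
{
    threw = true;
}

// Hypothetical helper: substitute 0 only when the window is empty,
// so non-empty windows are averaged exactly as before.
decimal SafeAverage(decimal[] window) => window.DefaultIfEmpty(0m).Average();

decimal emptyAvg = SafeAverage(emptyWindow);
decimal fullAvg = SafeAverage(fullWindow);

Console.WriteLine($"threw={threw}, empty={emptyAvg}, full={fullAvg}");
```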

    Edit: Here's the complete lab to illustrate how to subscribe to this query:

    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    
    namespace Rxx.Labs.Reactive
    {
    	public sealed class GroupByMultiLevel : BaseConsoleLab
    	{
    		protected override void Main()
    		{
    			var quotes = new Subject<Quote>();
    
    			var groupedQuotes =
    				from q in quotes
    				group q by new { q.InstrumentId, q.Exchange } into g
    				from window in g.Window(TimeSpan.FromSeconds(1))
    				select new
    				{
    					InstrumentId = g.Key.InstrumentId,
    					Exchange = g.Key.Exchange,
    					Aggregates = Observable.Zip(
    						window.Select(q => q.Px).Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)).Average(),
    						window.Select(q => q.Volume).Sum(),
    						(avg, sum) => new { avg, sum })
    				};
    
    			using (groupedQuotes
    				.Subscribe(group => group.Aggregates
    				.Subscribe(value => TraceLine(group.InstrumentId + ',' + group.Exchange + ',' + value.avg + ',' + value.sum))))
    			{
    				quotes.OnNext(new Quote { InstrumentId = "A", Px = 10.10m, Exchange = "O", Volume = 100L });
    				quotes.OnNext(new Quote { InstrumentId = "B", Px = 50.10m, Exchange = "N", Volume = 100L });
    				quotes.OnNext(new Quote { InstrumentId = "A", Px = 20.11m, Exchange = "R", Volume = 100L });
    				quotes.OnNext(new Quote { InstrumentId = "A", Px = 26.11m, Exchange = "R", Volume = 100L });
    
    				WaitForKey();
    			}
    		}
    
    		class Quote
    		{
    			public string InstrumentId { get; set; }
    			public decimal Px { get; set; }
    			public string Exchange { get; set; }
    			public long Volume { get; set; }
    		}
    	}
    }

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton, May 1, 2012 13:03: Added complete lab
  • May 1, 2012 13:12

    Hi,

    You could also modify the query to project the results sequentially when the Zip completes, by adding another from clause (which compiles to SelectMany).  This is probably better because it avoids nested subscriptions.

    For example: (Untested)

    from q in quotes
    group q by new { q.InstrumentId, q.Exchange } into g
    from window in g.Window(TimeSpan.FromSeconds(1))
    from aggs in Observable.Zip(
    	window.Select(q => q.Px).Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)).Average(),
    	window.Select(q => q.Volume).Sum(),
    	(avg, sum) => new { avg, sum })
    select new
    {
    	InstrumentId = g.Key.InstrumentId,
    	Exchange = g.Key.Exchange,
    	AveragePx = aggs.avg, 
    	VolumeSum = aggs.sum
    };

    - Dave



  • May 1, 2012 14:17
    Thanks Dave. One issue with both of the solutions above is that the average is calculated only over the window; however, I need a continuous average over the whole stream, and if a publishing window is empty the average price should not change.
  • May 1, 2012 14:50

    Hi,

    Well that's an important piece of the spec that you left out :)

    The Scan operator allows you to create a running aggregate.  Apply it to the group instead of the window.
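    To make the window-versus-running distinction concrete without Rx, here is a sketch of a Scan-style running aggregate over a plain IEnumerable. The Scan local function is hypothetical (LINQ to Objects has no Scan of its own; Rx's Scan takes the same seed and accumulator), and GroupBy here is the buffering LINQ to Objects operator rather than Rx's push-based one. Each group carries (sum, count, vsum), so a group's average only moves when a quote for that group arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical Scan for IEnumerable: emits the updated accumulator after every element.
static IEnumerable<TAcc> Scan<T, TAcc>(IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> accumulator)
{
    var acc = seed;
    foreach (var item in source)
    {
        acc = accumulator(acc, item);
        yield return acc;
    }
}

var quotes = new[]
{
    (InstrumentId: "A", Exchange: "R", Px: 20.11m, Volume: 100L),
    (InstrumentId: "B", Exchange: "N", Px: 50.10m, Volume: 100L),
    (InstrumentId: "A", Exchange: "R", Px: 26.11m, Volume: 100L),
    (InstrumentId: "A", Exchange: "R", Px: 29.11m, Volume: 50L),
};

// Running (sum, count, vsum) per group; the average is sum / count at each step,
// so it reflects every quote seen so far for that group, not just the latest window.
var running = quotes
    .GroupBy(q => (q.InstrumentId, q.Exchange))
    .SelectMany(g => Scan(g, (sum: 0m, count: 0, vsum: 0L),
            (acc, q) => (sum: acc.sum + q.Px, count: acc.count + 1, vsum: acc.vsum + q.Volume))
        .Select(acc => (g.Key.InstrumentId, g.Key.Exchange, AvgPx: acc.sum / acc.count, acc.vsum)))
    .ToList();

foreach (var r in running)
{
    Console.WriteLine($"{r.InstrumentId}, {r.Exchange}, {r.AvgPx}, {r.vsum}");
}
```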

    - Dave



  • May 2, 2012 14:29

    I came up with this; please let me know if you see any problems with it.

                var groupedQuotes =
                    (from q in quotes
                     group q by new { q.InstrumentId, exchange = q.Exchange })
                    .SelectMany(g => g
                        // Running totals per (InstrumentId, exchange) group; the average is sum / count.
                        .Scan(new { sum = 0m, count = 0, vsum = 0L },
                              (agg, quote) => new { sum = agg.sum + quote.Px, count = agg.count + 1, vsum = agg.vsum + quote.Volume })
                        .Select(agg => new Quote { InstrumentId = g.Key.InstrumentId, Exchange = g.Key.exchange, Px = agg.sum / agg.count, Volume = agg.vsum }));

    The next step is to run 200 msgs/s across 7K groups through it for a day and hope it runs efficiently.

  • May 2, 2012 15:21
     Proposed answer

    Hi, 

    Looks good, though you may want to stick with one kind of syntax for clarity; e.g., query comprehension syntax:

    from quote in quotes
    group quote by new { quote.InstrumentId, quote.Exchange } into quotesByIdAndExchange
    from agg in quotesByIdAndExchange.Scan(
    	new { sum = 0m, count = 0, vsum = 0L },
    	(acc, q) => new
    	{
    		sum = acc.sum + q.Px,
    		count = acc.count + 1,
    		vsum = acc.vsum + q.Volume
    	})
    select new Quote()
    {
    	InstrumentId = quotesByIdAndExchange.Key.InstrumentId,
    	Exchange = quotesByIdAndExchange.Key.Exchange,
    	Px = agg.sum / agg.count,
    	Volume = agg.vsum
    };

    - Dave



    • Proposed as answer by LeeCampbell, May 15, 2012 14:31
  • May 2, 2012 17:12

    True, that cleans it up. But I'm coming from Esper/StreamBase, where the syntax is literally:

    select InstrumentId, Exchange, avg(price), sum(volume) from Quotes
    group by InstrumentId, Exchange

    I find I need to work much harder with Rx to achieve the same output. IMHO, not having the concept of "OnCompleted" events in other CEP platforms makes querying streams of indeterminate length simpler.

  • May 2, 2012 18:13

    Hi,

    I don't know anything about Esper/StreamBase, but here are some questions that might be worth asking when comparing it to Rx:

    1. Are CEP queries statically typed?
    2. Can CEP queries be combined easily with queries from other sequences/models?  E.g., LINQ to Objects, LINQ to XML, LINQ to Entities, etc.
    3. Can CEP queries easily handle side-effects of any kind and at any time?  E.g., the Do operator.
    4. Are CEP queries extensible?  E.g., custom Rx operators are pretty easy to create.

    This is probably not an exhaustive list of useful rhetorical questions :)

    - Dave

