Window Operation followed by Group By - How to do in reactive extensions


  • Hello,

    Say I have a class MyClass:

    class MyClass
    {
        String Id1;
        String Id2;
        string message;
        int val;
    }


    I want to calculate the sum of val per group (keyed by Id1 + Id2), once per second, over the last 3 seconds. I have tried the following, but I cannot get the syntax right. Can anyone help?

    public static IObservable<MyClass> CountFrequency(this IObservable<MyClass> source, TimeSpan sec)
    {
        return source.Window(TimeSpan.FromSeconds(sec), TimeSpan.FromSeconds(sec)).Select(sl => sl.GroupBy(a => a.Id1 + a.Id2)
                 .SelectMany(go => go.Aggregate(new MyClass(), (acc, evt) => CustomAgg(acc, evt, marker))
          .Select(count => count)));
    }



    CustomAgg is my custom method, which has the signature (MyClass a, MyClass b, string message).

    Thursday, March 13, 2014 5:34 AM

All replies

  • I would approach this in smaller, bite-sized chunks.

    The first thing I notice is the sliding window aggregate requirement. Let's tackle that first.

    //I want to calculate sum of this val per group (Id1+Id2) 
    //	per second 
    //	for the last 3 seconds. 
    //Break it into parts 
    //	1) per second --> Open a new window every second
    //	2) for the last 3 seconds --> close the window after 3 seconds
    var interval = TimeSpan.FromSeconds(1);
    var window = TimeSpan.FromSeconds(3);

    With these variables defined, I would then look (with some help) at the most powerful windowing operator in Rx: GroupJoin.

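    Observable.GroupJoin(
    	Observable.Timer(TimeSpan.Zero, interval), //Open a window every 1 second
    	input, //Collect all the values
    	left=>Observable.Timer(window), //Close the left window after 3 seconds
    	right=>Observable.Empty<Unit>(), //Each value closes immediately, so it only joins windows that are already open
    	(left, right)=>right.Aggregate((acc,curr)=>acc+curr) //Sum the values seen by each window
    ).Merge() //Flatten to one sum per window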

    Running that with some dummy data seems fine, so I pull it out into a custom operator:

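    public static class ObservableEx
    {
    	public static IObservable<TAggregate> SlidingAggregate<TValue, TAggregate>(this IObservable<TValue> source, TimeSpan interval, TimeSpan window, TAggregate seed, Func<TAggregate, TValue, TAggregate> aggregator)
    	{
    		return Observable.GroupJoin(
    			Observable.Timer(TimeSpan.Zero, interval), source, //Open a window every interval; collect all the values
    			left=>Observable.Timer(window), right=>Observable.Empty<Unit>(), //Close each window after the window duration
    			(left, rights)=>rights.Aggregate(seed, aggregator)).Merge(); //One aggregate per window, flattened
    	}
    }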

    Next, I look at the grouping requirement. Now that I have my operator, it should be easy to apply it to each group of my grouped sequence.

    input.GroupBy(a => a.Id1 + a.Id2)
         .Select(grp=>grp.SlidingAggregate(interval, window, new MyClass(), (acc, evt) => CustomAgg(acc, evt, marker))
                         .Select(count=>new{grp.Key, Count=count}))

    I think this will give you what you need.
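
    As a rough end-to-end sketch of the pieces above (not verified against real data), the console program below feeds dummy values through the grouped SlidingAggregate and prints one sum per group per window. Several things here are assumptions rather than part of the original code: the System.Reactive package, the MyClass properties, the CustomAgg body (a hypothetical stand-in that simply adds Val), the "sum" marker string, and the use of SelectMany to flatten the per-group results into a single stream.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

public class MyClass
{
    public string Id1 { get; set; }
    public string Id2 { get; set; }
    public string Message { get; set; }
    public int Val { get; set; }
}

public static class ObservableEx
{
    // Same shape as the operator above, with all five GroupJoin arguments spelled out.
    public static IObservable<TAggregate> SlidingAggregate<TValue, TAggregate>(
        this IObservable<TValue> source, TimeSpan interval, TimeSpan window,
        TAggregate seed, Func<TAggregate, TValue, TAggregate> aggregator)
    {
        return Observable.GroupJoin(
                Observable.Timer(TimeSpan.Zero, interval),  // open a window every interval
                source,                                     // values to collect
                left => Observable.Timer(window),           // close each window after `window`
                right => Observable.Empty<Unit>(),          // a value only joins windows open right now
                (left, rights) => rights.Aggregate(seed, aggregator))
            .Merge();                                       // flatten to one aggregate per window
    }
}

public static class Program
{
    // Hypothetical stand-in for CustomAgg. It returns a new object so that the
    // seed instance, which is shared by every window, is never mutated.
    public static MyClass CustomAgg(MyClass acc, MyClass evt, string message)
    {
        return new MyClass { Id1 = evt.Id1, Id2 = evt.Id2, Message = message, Val = acc.Val + evt.Val };
    }

    public static void Main()
    {
        var interval = TimeSpan.FromSeconds(1);
        var window = TimeSpan.FromSeconds(3);

        // Dummy input: Val = 1 every 250 ms, alternating between keys "A0" and "A1".
        var input = Observable.Interval(TimeSpan.FromMilliseconds(250))
            .Select(i => new MyClass { Id1 = "A", Id2 = (i % 2).ToString(), Val = 1 });

        var sums = input
            .GroupBy(a => a.Id1 + a.Id2)
            .SelectMany(grp => grp
                .SlidingAggregate(interval, window, new MyClass(), (acc, evt) => CustomAgg(acc, evt, "sum"))
                .Select(total => new { grp.Key, Sum = total.Val }));

        // The window-opening timers never complete, so stop by disposing the subscription.
        using (sums.Subscribe(x => Console.WriteLine(x.Key + ": " + x.Sum)))
        {
            Thread.Sleep(TimeSpan.FromSeconds(8));
        }
    }
}
```

    Each window reports only when it closes, so nothing is printed for the first 3 seconds. The window-opening timer ticks on a scheduler, so a value that arrives before a group's first tick may not land in any window; treat the printed sums as approximate at the edges.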



    Next time, if you post some working code (a LINQPad script, for example), it will be much easier for someone to help you reach a solution.

    Lee Campbell

    Tuesday, May 13, 2014 2:06 PM