none
Window Operation followed by Group By - How to do in reactive extensions

    Question

  • Hello,

    Say I have a Class MyClass

    {

    String Id1;

    String Id2;

    string message;

    int val;

    }

    I want to calculate sum of this val per group (Id1+Id2) per second for the last 3 seconds. I am trying to do it, but i am not able to get the syntax right, can anyone help me.

    public static IObservable<MyClass> CountFrequency(this IObservable<MyClass> source, TimeSpan sec)
            {
                return source.Window(TimeSpan.FromSeconds(sec), TimeSpan.FromSeconds(sec)).Select(sl => sl.GroupBy(a => a.Id1 + a.Id2)

                         .SelectMany(go => go.Aggregate(new MyClass(), (acc, evt) => CustomAgg(acc, evt, marker))
                  .Select(count => count)));

            

            }

    CustomAgg - is my custom class which has signature (MyClass a, MyClass b, string message)

    Thursday, March 13, 2014 5:34 AM

All replies

  • I would approach this in smaller bit size chunks.

    The first thing I notice is the sliding window aggregate requirement. Let's tackle that first.

    //I want to calculate sum of this val per group (Id1+Id2) 
    //	per second 
    //	for the last 3 seconds. 
    
    //Break it into parts 
    //	1) per second --> Open a new window every second
    //	2) for the last 3 seconds --> close the window after 3 seconds
    
    var interval = TimeSpan.FromSeconds(1);
    var window = TimeSpan.FromSeconds(3);

    With these variables defined first, I would then (with some help http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html#GroupJoin) look at the most powerful Windowing operator in Rx; GroupJoin.

    Observable.GroupJoin
    ( 
    	Observable.Timer(TimeSpan.Zero, interval), //Open window every 1second)
    	input, //Collect all the values
    	left=>Observable.Timer(window), //Close the left window after 3seconds
    	right=>Observable.Empty<Unit>(), 
    	(left, right)=>right.Aggregate((curr,acc)=>acc+curr)
    );
    Running that with some dummy data, seems fine. I pull that out into a custom operator

    public static class ObservableEx
    {
    	public static IObservable<TAggregate> SlidingAggregate<TValue, TAggregate>(this IObservable<TValue> source, TimeSpan interval, TimeSpan window, TAggregate seed, Func<TAggregate, TValue, TAggregate> aggregator)
    	{
    		return Observable.GroupJoin
    		( 
    			Observable.Timer(TimeSpan.Zero, interval),
    			source,
    			left=>Observable.Timer(window),
    			right=>Observable.Empty<Unit>(),
    			(left, rights)=>rights.Aggregate(seed, aggregator)
    		).Merge();
    	}
    }

    Next, I look at the Grouping requirement. Now that I have my operator, this should be easy to leverage that from my grouped sequence.

    input.GroupBy(a => a.Id1 + a.Id2)
         .Select(grp=>grp.SlidingAggregate(interval, window, new MyClass(), (acc, evt) => CustomAgg(acc, evt, marker))
                         .Select(count=>new{grp.Key, Count=count}))
         .Merge()
         .Dump();

    I think this will give you what you need.

    Lee

    P.S.

    Next time, if you post some working code (LinqPad for example), it will be much easier for someone to help you to a solution.


    Lee Campbell http://LeeCampbell.blogspot.com

    Tuesday, May 13, 2014 2:06 PM