none
Group by with buffer - Reactive Extensions RRS feed

  • Question

  • Problem from stack exchange - http://stackoverflow.com/questions/16464721/reactive-extensions-group-by-unique-bufferwindow-till-time-span-with-cancellati

    I have a Hot stream of events coming of following type -Event {

    string parentName,string name; int state ; // its 1 or 2 ie active or unactive

    }

    I need to buffer event per parent for 2 minutes, if during this 2 minute , i recv any event for child with state =2 for a given parent , this buffer should cancel and should output 0 otherwise i get the count of the events recvd .

    I know I have to use GroupBy to partition, and then buffer and then count but i am unable to think of a way by which i create Buffer which is unique per parent, i though of using Distinct but this doesnt solve the problem, for i only dont want to create buffer till the parent is active (as once the parent's buffer gets cancelled or 2 minutes is over, the parent buffer can be created again) So I understand I need to create a custom buffer which checks the condition for creating buffer, but how do i do this via reactive extensions.

    The solution provided there doesnt work, any other idea

    Thursday, May 9, 2013 7:47 PM

All replies

  • Assuming that a two minute buffer for each group should open as soon as the first event for that group is seen, and close after two minutes or a zero state is seen, then I think this works:

    public static IObservable<EventCount> EventCountByParent(this IObservable<Event> source, IScheduler scheduler)
    {
        return Observable.Create<EventCount>(observer => source.GroupByUntil(
            evt => GetParent(evt.Name),
            evt => evt,
            group =>
            @group.Where(evt => evt.State == 2)
                    .Merge(Observable.Timer(TimeSpan.FromMinutes(2), scheduler).Select(_ => Event.Null)))
                    .SelectMany(
                        go =>
                        go.Aggregate(0, (acc, evt) => (evt.State == 2 ? 0 : acc + 1))
                        .Select(count => new EventCount(go.Key, count))).Subscribe(observer));
    }

    With EventCount as:

        public class EventCount
        {
            private readonly string _name;
            private readonly int _count;
    
            public EventCount(string name, int count)
            {
                _name = name;
                _count = count;
            }
        }
    

    And Event as:

        public class Event
        {
            public static Event Null = new Event(string.Empty, 0);
    
            private readonly string _name;
            private readonly int _state;
    
            public Event(string name, int state)
            {
                _name = name;
                _state = state;
            }
    
            public string Name { get { return _name;  } }
            public int State { get { return _state; } }
        }
    

    • Proposed as answer by James World Thursday, May 9, 2013 9:50 PM
    Thursday, May 9, 2013 9:18 PM