Streaming of continuous computation on grouped events returning computation + events

  • Thursday, February 28, 2013 10:35 AM
     
     

    Hi

    First of all let me explain what I have and what I'd like to do:

    - I start off with an unbuffered and unordered stream of events

    - I'm looking for a query which will yield a tuple of both computations and underlying events that were used for it. The consumer of this is a WPF application, and to smooth things out on the dispatcher thread I would like to group the original stream of events using a window. For each group I would buffer up all of its events, and I would also like to run a computation across ALL events with the same grouping key, not just the events from this group.

    So say I have IObservable<T>, I would like to transform this into IObservable<Tuple<T[], ComputationResult>>

    What I started doing was :

    - Subscribe to original event and use GroupByUntil with my grouping predicate and a sensible window size (say 500ms)

    - I subscribe (Subscribe or SelectMany?) to the groups which will require two bits of processing :

       - Buffer up all events of the current group

       - A computation, which could use either Do or Scan. If I use Scan, the seed object would need to be looked up in, say, a global dictionary, because ComputationResult is computed over all events with a given grouping key, not just those in one group

    - Then finally Zip the results of both the buffer and the Do/Scan and return this to the consumer.

    I sadly no longer have the code from when I was trying this out, but I will try to find some free time and provide a sample.

    Many thanks

    Daniel

All Replies

  • Thursday, February 28, 2013 5:50 PM
     

    Hi Daniel,

    It seems like you're attempting to group twice.  The first grouping is meant to batch process events by some key to eventually improve performance of the observer (WPF); let's call this key A and its output IObservable<TofA[]>.  The actual computation accepts IObservable<TofA[]> as input, yet it requires a different grouping of the original sequence to perform its work; let's call this key B and its output IObservable<Tuple<TofB[], CR>>.  Furthermore, A and B are not necessarily equivalent.  So the problem is, how do you go from TofA[] to TofB[] within a single query, or how do you join two distinct queries by different keys?

    Is that correct?

    I think the issue is that you're mixing two requirements together, and in the wrong order.  The first grouping is meant to lighten the load on the UI thread, so it should actually be done last, or not at all if performance testing proves it to be unnecessary.  Furthermore, to my latter point, the second grouping for the actual computation seems to be fit for buffering of its own, which may render the first grouping unnecessary.

    If you're sure that you need to lighten the load on the UI thread and the query is running on a thread other than the UI thread, then consider using the BufferIntrospective operator in Rxx.

    For example: (Untested)

    IObservable<IList<Tuple<IList<Input>, ComputationResult>>> query = source
    	.GroupByUntil(GetComputationKey, group => WhenLastKey(group.Key))	// WhenLastKey: placeholder duration selector
    	.SelectMany(group => group.Publish(batch =>	// Share each group between ToList and Aggregate
    			batch.ToList()
    		.Zip(
    			batch.Aggregate(new ComputationResult(), (result, input) => result),	// TODO: Computation
    			(inputs, result) => Tuple.Create(inputs, result))))
    	.BufferIntrospective(DispatcherScheduler.Current);	// Additional batching, if needed

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Thursday, February 28, 2013 5:54 PM Missing parens; Removed unnecessary requirement for using BufferIntro
  • Friday, March 08, 2013 10:52 AM
     
     

    Hi Dave

    Thanks for your reply and I will try it out but I don't think it's exactly what I'm after.
    For info, the reason why I would like to dispatch this to a worker thread and batch process events is because I have to process around 20k+ events on screen load and even though the computations aren't heavy it would make sense to dispatch that to a GUI thread and then send batch updates to the GUI.
    Re your answer, am not sure what WhenLastKey is supposed to do.

    Maybe to help, here are the classes that could be used for an example:

            public class Event
            {
                public Event(EventKey key, decimal amount)
                {
                    Key = key;
                    Amount = amount;
                }

                public EventKey Key { get; set; }
                public decimal Amount { get; set; }
                public override string ToString()
                {
                    return Key.Value + ":" + Amount;
                }
            }

            public class EventKey
            {
                public string Value { get; set; }
            }

            public class ComputationResult
            {
                public ComputationResult(EventKey key)
                {
                    Key = key;
                }
                public EventKey Key { get; set; }
                public decimal TotalAmount { get; set; }
                public ComputationResult Add(Event ev)
                {
                    TotalAmount += ev.Amount;
                    return this;
                }
            }

  • Tuesday, March 12, 2013 2:22 AM
     
     

    Hi,

    > For info, the reason why I would like to dispatch this to a worker thread and batch process events  [snip]
    > it would make sense to dispatch that to a GUI thread and then send batch updates to the GUI.

    Aren't these contradictory statements?  In the second statement, did you mean to say the following?

    "it would make sense to dispatch that to a background thread and then send batch updates to the GUI."

    Regardless, my assumption was that the source sequence is already pushing values on a non-UI thread.  In that case, batching first wouldn't affect the UI at all; it only makes the query more difficult to write because of the extra grouping.  If my assumption was wrong and the source sequence runs on the UI thread, then I would agree that introducing concurrency can help, but that can be done simply by applying the ObserveOn operator and passing in an instance of a concurrency-introducing scheduler, such as NewThreadScheduler.  Marshaling the results back to the UI thread in batches may still be required, but my suggestion was to place this logic at the end of the query.  Furthermore, Rxx's BufferIntrospective operator could handle the marshaling for you, introspectively, so that the size of the buffers is dynamic.  The size is computed at runtime based on the latency of the UI.
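
    As a rough illustration of the ObserveOn point, here is a minimal console sketch, assuming the System.Reactive NuGet package is referenced. Observable.Range stands in for the real source, and the class and method names are made up for this example; it only checks that the observer runs on a thread other than the subscribing one.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ObserveOnDemo
{
    // Returns true when the observer ran on a thread other than the caller's.
    public static bool ObserverRunsOffCallingThread()
    {
        int callerThread = Thread.CurrentThread.ManagedThreadId;
        int observerThread = callerThread;
        var done = new ManualResetEventSlim();

        Observable.Range(1, 3)                      // stand-in for the real source
            .ObserveOn(NewThreadScheduler.Default)  // notifications are delivered on a new thread
            .Subscribe(
                _ => observerThread = Thread.CurrentThread.ManagedThreadId,
                () => done.Set());

        done.Wait();                                // block until OnCompleted has been observed
        return observerThread != callerThread;
    }

    public static void Main()
    {
        Console.WriteLine(ObserverRunsOffCallingThread());
    }
}
```

    In a real query the marshaling back to the UI thread would still come last, after the computation.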

    Have I misunderstood?

    Perhaps a marble diagram would be helpful.  Also, additional details about where the concurrency exists in your query would be helpful.

    > Re your answer, am not sure what WhenLastKey is supposed to do.

    Me too, you haven't provided a specification :)

    You only wrote this:

    - Subscribe to original event and use GroupByUntil with my grouping predicate [snip]

    What is your grouping predicate?

    Perhaps if you were to provide your current implementation it would clear things up a bit.

    - Dave


    http://davesexton.com/blog

  • Wednesday, March 13, 2013 1:37 PM
     
     

    Hi Dave

    Indeed, it was a mistake; I meant that the processing would need to be performed on a background thread.
    I will have to review what BufferIntrospective does, as I find it interesting that it can deal with UI latency dynamically.

    I had the time today to play a little bit with my problem and this is what I came up with. Thinking about it now, my problem is rather simple and I probably made it more complicated than it really is!

    // FAKE SOURCE

    IObservable<Event> source = Observable.Range(0, 100).Select(x =>
        {
          var ev = new Event(CreateRandomEventKey(), 1);
          return ev;
        });

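    IObservable<IGroupedObservable<EventKey, Event>> groupedSource = source.GroupBy(x => x.Key);

    var result = groupedSource.SelectMany(x =>
        x.Buffer(TimeSpan.FromMilliseconds(100))
         .Where(b => b.Count > 0)
         // Publish shares one subscription, so Zip and Scan see exactly the same buffers
         .Publish(buffered => buffered.Zip(
             buffered.Scan(new ComputationResult(x.Key), (seed, acc) =>
             {
                 foreach (var a in acc)
                 {
                     seed.Add(a);
                 }
                 return seed;
             }),
             // Pair each buffer with the running computation that includes it
             (events, computation) => Tuple.Create(events, computation))));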

    I've used a time based buffer here but having used a count based one I can confirm easily the results are indeed correct.
    I have now to possibly parallelize that using each group as partition.

    Thanks for your help!

    Daniel



  • Wednesday, March 20, 2013 12:12 PM
     
     Proposed

    Hi Dan,

    I kind of agree with Dave here. Let's clarify some assumptions so that we can best design this:

    1) The events are being produced off the dispatcher (UI thread), i.e. there is either an Event Loop Scheduler or another thread that is getting these off the wire/disk and publishing them as an observable sequence

    2) You want to group by a key and perform calculations on this data within its groups. Maybe you are getting pricing information off the wire and you want to keep a running average of the price, but only by symbol, i.e. it doesn't make sense to average IBM's stock price with MSFT's.

    3) You want to keep the original data with the computation

    4) You don't want to flood the UI with too much data. As you will be aggregating, if you get 100 updates in 50ms, you only really need to provide the last value, as it will also have the array of all values bound to it anyway.

    Right, so with all those assumptions I suggest two main changes:

    Update your ComputationResult type to expose the Events that have been added to it. Instead of trying to cart around a Tuple<Event[], ComputationResult>, just have a ComputationResult with an Events property.

    public class ComputationResult
    {
    	private readonly List<Event> _events = new List<Event>();
    	public ComputationResult(EventKey key)
    	{
    		Key = key;
    	}
    	public EventKey Key { get; private set; }
    	public decimal TotalAmount { get; private set; }
    	public Event[] Events { get { return _events.ToArray(); } }
    	public ComputationResult Add(Event ev)
    	{
    		TotalAmount += ev.Amount;
    		_events.Add(ev);
    		return this;
    	}	
    }

    So this now allows us to write a solution like this

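    // Group by the string value: EventKey does not override Equals/GetHashCode
    source.GroupBy(ev=>ev.Key.Value)
    	.SelectMany(grp=>grp.Scan(new ComputationResult(new EventKey{Value=grp.Key}), (computation, ev) => computation.Add(ev)));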

    This will return the ComputationResult for a group each time an Event is pushed. Its Events property will also hold all of the events that made up the computation.
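
    To make the per-event output concrete, here is a minimal console sketch, assuming the System.Reactive NuGet package is referenced. The Ev and Running types and the sample values are made up for illustration; they stand in for Event and ComputationResult.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RunningTotalDemo
{
    // Hypothetical stand-ins for Event and ComputationResult.
    public sealed class Ev
    {
        public string Key;
        public decimal Amount;
    }

    public sealed class Running
    {
        public string Key;
        public decimal Total;
        public Running Add(Ev ev) { Total += ev.Amount; return this; }
    }

    public static List<string> Run()
    {
        var source = new[]
        {
            new Ev { Key = "A", Amount = 1m },
            new Ev { Key = "B", Amount = 2m },
            new Ev { Key = "A", Amount = 3m },
        }.ToObservable();

        var output = new List<string>();
        source.GroupBy(ev => ev.Key)
              // One accumulator per group; Scan emits it after every event.
              .SelectMany(grp => grp.Scan(new Running { Key = grp.Key }, (acc, ev) => acc.Add(ev)))
              // Snapshot to a string straight away: Scan keeps mutating the same instance.
              .Select(r => r.Key + ":" + r.Total)
              .Subscribe(output.Add);
        return output;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

    This prints A:1, B:2 and A:4 on separate lines: one running total per group, emitted on every event.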

    Now this may hit our dispatcher a bit too hard if a lot of data came at it, so, we now can add our buffering. As we are getting an accumulation of the Events with each event, we can probably safely buffer most of the events out with this sort of code.

    source.GroupBy(ev=>ev.Key.Value)
    	.SelectMany(grp=>grp.Scan(new ComputationResult(new EventKey{Value=grp.Key}), (computation, ev) => computation.Add(ev))	// grp.Key is a string here
    		.Buffer(TimeSpan.FromMilliseconds(100))
    		.Where(buffer=>buffer.Count>0)
    		.Select(buffer=>buffer.Last())
    	)

    Here we buffer each aggregate/computation/group back by 100ms and then only take the most recent of those computations. Now we should be able to safely push this to the Dispatcher.

    source.GroupBy(ev=>ev.Key.Value)
    	.SelectMany(grp=>grp.Scan(new ComputationResult(new EventKey{Value=grp.Key}), (computation, ev) => computation.Add(ev))	// grp.Key is a string here
    		.Buffer(TimeSpan.FromMilliseconds(100))
    		.Where(buffer=>buffer.Count>0)
    		.Select(buffer=>buffer.Last())
    	)
    	.ObserveOnDispatcher()
    	.Subscribe(...);

    Finally, w.r.t. improving performance by trying to parallelise the code, I think we may have negated the need for this now. Previously you were batching up 100ms of data and then trying to process batches of these. Now we should just have one thread constantly processing the updates. I would expect this to be faster than batching, starting up new threads, performing the work and then context switching back to the main thread.

    HTH

    Lee Campbell


    Lee Campbell http://LeeCampbell.blogspot.com

    • Proposed As Answer by LeeCampbell Wednesday, March 20, 2013 12:13 PM
  • Wednesday, March 20, 2013 2:36 PM
     
     

    Hi Lee

    All your assumptions are correct. The events are basically traders' current positions (finance), which have themselves been aggregated by a server. The GUI receives these through a callback against an Oracle Coherence cache via a Continuous Query. On initialization the Continuous Query will yield a lot of INSERT events, then UPDATE/REMOVE events as they occur against the cache.
    The source stream will originally yield a reasonable number of these INSERT events, then every couple of minutes mostly UPDATE events and some INSERT events.
    The reason I don't think yielding all events belonging to a group is a good idea is that the WPF GUI will be showing every one of them, and there are quite a few; hence my original aim to only yield the updated positions plus some other computation which the client is required to perform against all current positions in a given group (currency pair).

    Hope this clarifies things a little bit.

    Daniel

  • Wednesday, March 20, 2013 4:44 PM
     
     

    Sounds good. So are there any other questions? AFAIK the solution above is what you are looking for right?

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

  • Thursday, March 21, 2013 8:22 AM
     

    Hi Lee

    If I understand correctly, your solution will yield, every 100ms, the resulting aggregation and all of the events since we started getting them from the source. I don't think it will quite work in my case because I wouldn't want having to deal with an ever increasing list of events on the GUI which runs all day long.
    An alternative to your solution would be to reset the events collection on the ComputationResult object whenever a buffer is yielded from the original stream of Event, which implies the buffering would be done before the Scan. I could have a look at implementing this.
    I'm intrigued about this call :

    .Select(buffer=>buffer.Last())

    I'm not sure what behaviour to expect if that statement weren't there.

    Daniel

  • Friday, March 22, 2013 11:31 AM
     
     

    I feel like this is a bit of a game of "Guess the number in my head"  :-)

    i.e. I still don't really know what your requirements are, or if you are happy with the solution, and if not, what is missing?

    w.r.t. the Select(buffer=>buffer.Last()) question: that just takes the buffered values (as Buffer turns an IObservable<T> into an IObservable<IList<T>>) and ignores all but the last value, i.e. the buffer variable is a list and you only want the last value from it. The reason you only need the last is that it is the latest computation, which includes all the previous computations and events.
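
    A small sketch of that behaviour, assuming the System.Reactive NuGet package is referenced. It uses a count-based buffer instead of the 100ms one so that the output is deterministic; the numbers and names are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class BufferLastDemo
{
    public static List<int> LatestPerBuffer()
    {
        var output = new List<int>();
        Observable.Range(1, 5)
            .Scan(0, (total, x) => total + x)   // running totals: 1, 3, 6, 10, 15
            .Buffer(2)                          // [1, 3], [6, 10], [15]
            .Select(buffer => buffer.Last())    // keep only the latest total in each buffer
            .Subscribe(output.Add);
        return output;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", LatestPerBuffer()));
    }
}
```

    This prints 3, 10, 15. Without the Select, the observer would receive the whole IList of running totals for each buffer rather than a single value.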

    I am confused by these two statements:

    >I'm looking for a query which will yield a tuple of both computations and underlying events that were used for it.

    > I wouldn't want having to deal with an ever increasing list of events on the GUI which runs all day long.

    But isn't the computation a result of all of the previous events? So which is it?

    *Confused*


    Lee Campbell http://LeeCampbell.blogspot.com

  • Monday, March 25, 2013 11:16 AM
     
     

    Hi Lee

    I realize I should have framed my query much better. I now understand your use of Last if indeed I only wanted the one result, but as you pointed out I wasn't clear there, so please excuse me.
    What I wanted is, out of a source stream, to get a stream of computations alongside all the events belonging to the same window/buffer used for yielding the computations themselves. So given a buffer size of 100 items, I want to return some computation together with these 100 items, with the computation being a product not of just the buffered items but of the whole set that has been streamed from the beginning.
    Hope it makes sense now, and again sorry for the confusion :)

    Daniel

  • Monday, March 25, 2013 1:19 PM
     
     Answered

    Maybe this is what you are looking for then?

    void Main()
    {
    	var source = Observable.Interval(TimeSpan.FromMilliseconds(30))
    				.Take(10)
    				.Select(x =>
    					{
    						var ev = new Event(CreateRandomEventKey(), 1);
    						return ev;
    					});
    		
    	source.GroupBy(ev=>ev.Key.Value)
    				.SelectMany(grp=>grp.Buffer(TimeSpan.FromMilliseconds(100))
    							.Where(buffer=>buffer.Count>0)
    							.Scan(new ComputationResult(grp.Key), (computation, buffer) => computation.Add(buffer))
    				)
    				//.ObserveOnDispatcher()
    				//.Subscribe(...);
    				.Dump();
    				
    }
    
    // Define other methods and classes here
    private static Random _rnd = new Random();
    public EventKey CreateRandomEventKey()
    {
    	var key = _rnd.Next(1,10).ToString();
    	return new EventKey{Value=key};
    }
    
    public class Event
    {
    		public Event(EventKey key, decimal amount)
    		{
    				Key = key;
    				Amount = amount;
    		}
    
    		public EventKey Key { get; private set; }
    		public decimal Amount { get; private set; }
    		public override string ToString()
    		{
    				return Key.Value + ":" + Amount;
    		}
    }
    
    public class EventKey //: IComparable
    {
    		public string Value { get; set; }
    }
    
    public class ComputationResult
    {
    		private readonly Event[] _events;
    		//public ComputationResult(EventKey key)
    		public ComputationResult(string key)
    		{
    				_events = new Event[0];
    				Key = new EventKey{Value= key};
    		}
    		private ComputationResult(string key, decimal initialTotal, Event[] events)
    		{
    			Key = new EventKey{Value=key};
    			TotalAmount = initialTotal + events.Sum (e => e.Amount);
    			_events = events;
    		}
    		public EventKey Key { get; private set; }
    		public decimal TotalAmount { get; private set; }
    		public Event[] Events { get { return _events.ToArray(); } }
    		public ComputationResult Add(IList<Event> events)
    		{
    			return new ComputationResult(Key.Value, TotalAmount, events.ToArray());
    		}
    		
    }

    This is sort of less to do with Rx now, and more to do with how you want to model your code. The GroupBy, Buffer, Scan are the operators you are looking for and it is just how you combine them and how you want to represent the result.

    HTH

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

  • Monday, March 25, 2013 2:33 PM
     
     

    Hi Lee

    You are correct in that if my running computation just returns the buffer of the in-flight events for further downstream processing then I could just model it to include these and not have to bother with Zip altogether. My first hurdle was handling calls to Scan and Zip so that the same buffers were passed to both of these but following your model this isn't needed and reduces the query to something simpler.

    Thanks for your help.

    Daniel