Query help - Event count by minute with realtime updates for current minute

  • Wednesday, March 06, 2013 8:11 PM
     
     

    I'm trying to get a count of the number of occurrences of an event per minute over a 5-minute time period.  I want the subscriber to receive an updated 5-minute view every time an event occurs.  The first 4 items in the result would be historical and the last one would be real-time.

    I'm thinking I would need a ReplaySubject that stores the previous 4 values, and that the Buffer and Count() operators would come into play, but I'm having trouble getting something working.

    12:00-12:01 (app isn't running yet)

    12:01-12:02 = 5 occurrences

    12:02-12:03 = 3 occurrences

    12:03-12:04 = 4 occurrences

    12:04-12:05 = 5 occurrences

    ---

    12:04:53 -- User comes online here. --> { 0, 5, 3, 4, 5 }

    12:05:03 --> { 5, 3, 4, 5, 1 }

    12:05:27 --> { 5, 3, 4, 5, 2 }

    12:05:53 --> { 5, 3, 4, 5, 3 }

    12:06:25 --> { 3, 4, 5, 3, 1 }


    Scott Holodak


    • Edited by sholodak Wednesday, March 06, 2013 8:11 PM

All Replies

  • Thursday, March 07, 2013 4:06 AM
     

    Hi Scott,

    Try the following:

    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using Microsoft.Reactive.Testing;
    
    namespace ConsoleApplication1
    {
    	enum EventOrInterval
    	{
    		Event,
    		Interval
    	}
    
    	class Program : ReactiveTest
    	{
    		static void Main()
    		{
    			new Program().Run();
    		}
    
    		void Run()
    		{
    			// Setup
    
    			var scheduler = new TestScheduler();
    
    			var events = scheduler.CreateColdObservable(
    				OnNext(new TimeSpan(0, 1, 12).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 1, 24).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 1, 36).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 1, 48).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 1, 55).Ticks, Unit.Default),
    
    				OnNext(new TimeSpan(0, 2, 20).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 2, 40).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 2, 55).Ticks, Unit.Default),
    
    				OnNext(new TimeSpan(0, 3, 15).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 3, 30).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 3, 45).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 3, 55).Ticks, Unit.Default),
    
    				OnNext(new TimeSpan(0, 4, 12).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 4, 24).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 4, 36).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 4, 48).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 4, 55).Ticks, Unit.Default),
    
    				OnNext(new TimeSpan(0, 5, 03).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 5, 27).Ticks, Unit.Default),
    				OnNext(new TimeSpan(0, 5, 53).Ticks, Unit.Default),
    
    				OnNext(new TimeSpan(0, 6, 25).Ticks, Unit.Default),
    				OnCompleted<Unit>(new TimeSpan(0, 6, 26).Ticks));
    
    
    			// Query
    
    			var interval = Observable.Interval(TimeSpan.FromMinutes(1), scheduler)
    				.Take(6);		// Only required for testing to ensure that the query halts
    
    			var eventsAndIntervals =
    				events.Select(_ => EventOrInterval.Event)
    							.Merge(
    				interval.Select(_ => EventOrInterval.Interval));
    
    			var query = eventsAndIntervals.Scan(
    				new
    				{
    					Oldest = 0,
    					NextToOldest = 0,
    					Middle = 0,
    					NextToNewest = 0,
    					Newest = 0
    				},
    				(counts, next) =>
    				{
    					// An event increments the count for the current (newest) minute.
    					if (next == EventOrInterval.Event)
    					{
    						return new
    						{
    							Oldest = counts.Oldest,
    							NextToOldest = counts.NextToOldest,
    							Middle = counts.Middle,
    							NextToNewest = counts.NextToNewest,
    							Newest = counts.Newest + 1
    						};
    					}
    					// A minute boundary shifts every count one slot older and starts the new minute at zero.
    					else
    					{
    						return new
    						{
    							Oldest = counts.NextToOldest,
    							NextToOldest = counts.Middle,
    							Middle = counts.NextToNewest,
    							NextToNewest = counts.Newest,
    							Newest = 0
    						};
    					}
    				})
    				.Where(view => view.Newest > 0)		// Skip the empty view produced at each minute boundary
    				.Publish();		// Just to simulate a hot sequence
    
    			using (query.Connect())
    			{
    				var observer = scheduler.Start(
    					() => query,
    					created: 0,
    					subscribed: new TimeSpan(0, 4, 53).Ticks,		// User comes online here
    					disposed: new TimeSpan(0, 6, 30).Ticks);
    
    				foreach (var message in observer.Messages)
    				{
    					if (message.Value.Kind == NotificationKind.OnNext)
    					{
    						Console.WriteLine("{0}: {{ {1}, {2}, {3}, {4}, {5} }}",
    							TimeSpan.FromTicks(message.Time),
    							message.Value.Value.Oldest,
    							message.Value.Value.NextToOldest,
    							message.Value.Value.Middle,
    							message.Value.Value.NextToNewest,
    							message.Value.Value.Newest);
    					}
    				}
    			}
    
    			Console.WriteLine("Done");
    			Console.ReadLine();
    		}
    	}
    }
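
    The core of the query above is the Scan accumulator: an event increments the newest bucket, and an interval tick shifts every bucket one slot older. As a rough sketch of that step in isolation, without Rx, the same logic can be written against a plain array. The names `Signal`, `SlidingCounts` and `Step` are hypothetical and are not part of the code above; the array stands in for the anonymous type with its five named fields.

```csharp
using System;
using System.Linq;

// Hypothetical stand-in for the EventOrInterval enum in the query above.
enum Signal { Event, MinuteBoundary }

static class SlidingCounts
{
    // Returns a new window of per-minute counts; the input array is not mutated.
    // Index 0 is the oldest minute, the last index is the current minute.
    public static int[] Step(int[] counts, Signal next)
    {
        var result = (int[])counts.Clone();
        if (next == Signal.Event)
        {
            // Count the event in the newest bucket.
            result[result.Length - 1]++;
        }
        else
        {
            // Shift every bucket one slot older, dropping the oldest,
            // then open an empty bucket for the new minute.
            Array.Copy(counts, 1, result, 0, counts.Length - 1);
            result[result.Length - 1] = 0;
        }
        return result;
    }

    static void Main()
    {
        // Two events, a minute boundary, then one more event.
        var signals = new[]
        {
            Signal.Event, Signal.Event, Signal.MinuteBoundary, Signal.Event
        };

        // Aggregate folds the signals the same way Scan does,
        // but yields only the final window.
        var window = signals.Aggregate(new int[5], (w, s) => Step(w, s));

        Console.WriteLine("{ " + string.Join(", ", window) + " }");
        // prints "{ 0, 0, 0, 2, 1 }"
    }
}
```

    In the Rx version, Scan emits every intermediate window rather than only the last one, and the Where filter then drops the windows produced by a minute boundary, since their newest bucket is always 0.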

    - Dave


    http://davesexton.com/blog

    • Marked As Answer by sholodak Thursday, March 07, 2013 3:37 PM
  • Thursday, March 07, 2013 4:16 AM
     

    Hi Scott,

    I forgot to paste the results of the test:

    00:04:55: { 0, 5, 3, 4, 5 }
    00:05:03: { 5, 3, 4, 5, 1 }
    00:05:27: { 5, 3, 4, 5, 2 }
    00:05:53: { 5, 3, 4, 5, 3 }
    00:06:25: { 3, 4, 5, 3, 1 }
    Done

    - Dave


    http://davesexton.com/blog