Query help - Event count by minute with realtime updates for current minute
-
Wednesday, March 06, 2013 8:11 PM
I'm trying to get a count of the number of incidences of an event per minute for a 5 minute time period. I want the subscriber to receive an updated 5-minute view every time an event occurs. The first 4 items in the result would be historical and the last one would be realtime.
I'm thinking I would need a ReplaySubject that stores the previous 4 values and that the Buffer and Count() operations would come into effect, but I'm having trouble getting something working.
12:00-12:01 (app isn't running yet)
12:01-12:02 = 5 occurrences
12:02-12:03 = 3 occurrences
12:03-12:04 = 4 occurrences
12:04-12:05 = 5 occurrences
---
12:04:53 -- User comes online here. --> { 0, 5, 3, 4, 5}
12:05:03 --> { 5, 3, 4, 5, 1 }
12:05:27 --> { 5, 3, 4, 5, 2 }
12:05:53 --> { 5, 3, 4, 5, 3 }
12:06:25 --> { 3, 4, 5, 3, 1 }
Scott Holodak
- Edited by sholodak Wednesday, March 06, 2013 8:11 PM
All Replies
-
Thursday, March 07, 2013 4:06 AM
Hi Scott,
Try the following:
using System; using System.Reactive; using System.Reactive.Linq; using Microsoft.Reactive.Testing; namespace ConsoleApplication1 { enum EventOrInterval { Event, Interval } class Program : ReactiveTest { static void Main() { new Program().Run(); } void Run() { // Setup var scheduler = new TestScheduler(); var events = scheduler.CreateColdObservable( OnNext(new TimeSpan(0, 1, 12).Ticks, Unit.Default), OnNext(new TimeSpan(0, 1, 24).Ticks, Unit.Default), OnNext(new TimeSpan(0, 1, 36).Ticks, Unit.Default), OnNext(new TimeSpan(0, 1, 48).Ticks, Unit.Default), OnNext(new TimeSpan(0, 1, 55).Ticks, Unit.Default), OnNext(new TimeSpan(0, 2, 20).Ticks, Unit.Default), OnNext(new TimeSpan(0, 2, 40).Ticks, Unit.Default), OnNext(new TimeSpan(0, 2, 55).Ticks, Unit.Default), OnNext(new TimeSpan(0, 3, 15).Ticks, Unit.Default), OnNext(new TimeSpan(0, 3, 30).Ticks, Unit.Default), OnNext(new TimeSpan(0, 3, 45).Ticks, Unit.Default), OnNext(new TimeSpan(0, 3, 55).Ticks, Unit.Default), OnNext(new TimeSpan(0, 4, 12).Ticks, Unit.Default), OnNext(new TimeSpan(0, 4, 24).Ticks, Unit.Default), OnNext(new TimeSpan(0, 4, 36).Ticks, Unit.Default), OnNext(new TimeSpan(0, 4, 48).Ticks, Unit.Default), OnNext(new TimeSpan(0, 4, 55).Ticks, Unit.Default), OnNext(new TimeSpan(0, 5, 03).Ticks, Unit.Default), OnNext(new TimeSpan(0, 5, 27).Ticks, Unit.Default), OnNext(new TimeSpan(0, 5, 53).Ticks, Unit.Default), OnNext(new TimeSpan(0, 6, 25).Ticks, Unit.Default), OnCompleted(Unit.Default, new TimeSpan(0, 6, 26).Ticks)); // Query var interval = Observable.Interval(TimeSpan.FromMinutes(1), scheduler) .Take(6); // Only required for testing to ensure that the query halts var eventsAndIntervals = events.Select(_ => EventOrInterval.Event) .Merge( interval.Select(_ => EventOrInterval.Interval)); var query = eventsAndIntervals.Scan( new { Oldest = 0, NextToOldest = 0, Middle = 0, NextToNewest = 0, Newest = 0 }, (counts, next) => { if (next == EventOrInterval.Event) { return new { Oldest = counts.Oldest, NextToOldest = counts.NextToOldest, Middle = counts.Middle, NextToNewest = counts.NextToNewest, Newest = counts.Newest + 1 }; } else { return new { Oldest = counts.NextToOldest, NextToOldest = counts.Middle, Middle = counts.NextToNewest, NextToNewest = counts.Newest, Newest = 0 }; } }) .Where(view => view.Newest > 0) .Publish(); // Just to simulate a hot sequence using (query.Connect()) { var observer = scheduler.Start( () => query, created: 0, subscribed: new TimeSpan(0, 4, 53).Ticks, // User comes online here disposed: new TimeSpan(0, 6, 30).Ticks); foreach (var message in observer.Messages) { if (message.Value.Kind == NotificationKind.OnNext) { Console.WriteLine("{0}: {{ {1}, {2}, {3}, {4}, {5} }}", TimeSpan.FromTicks(message.Time), message.Value.Value.Oldest, message.Value.Value.NextToOldest, message.Value.Value.Middle, message.Value.Value.NextToNewest, message.Value.Value.Newest); } } } Console.WriteLine("Done"); Console.ReadLine(); } } }- Dave
- Marked As Answer by sholodak Thursday, March 07, 2013 3:37 PM
-
Thursday, March 07, 2013 4:16 AM
Hi Scott,
I forgot to paste the results of the test:
00:04:55: { 0, 5, 3, 4, 5 } 00:05:03: { 5, 3, 4, 5, 1 } 00:05:27: { 5, 3, 4, 5, 2 } 00:05:53: { 5, 3, 4, 5, 3 } 00:06:25: { 3, 4, 5, 3, 1 } Done- Dave

