Automatically Subscribe is not getting called
-
12 Februari 2012 15:41
I am trying to use observer pattern in the StreamInsight. I have a hopping window query which should give the count of events occurred within an hour every 15 seconds. But as of now the result is unpredictable and does not call subscribe evey 15 secods automatically on the observer. My understanding, If i add the event once, the same count should be given every 15 seconds and automatically subscribe on observer should be called. As of now call to subscribe is not predictable.
Here is the sample code from the windows application which I have written to get the feel of Observer pattern in StreamInsight.
Sample Windows App code:
/*Module Variables*/
Server m_server = null;
Microsoft.ComplexEventProcessing.Application m_application;
Subject<MonitoringInputEvent> m_subject = new Subject<MonitoringInputEvent>();
MonitoringObserver m_observer = new MonitoringObserver();
List<IDisposable> m_subscribers = new List<IDisposable>();/*Code in form load */
m_subject = new Subject<MonitoringInputEvent>();
m_server = Server.Create("NewInstance");
m_application = m_server.CreateApplication("My Application");
var m_InputStream = m_subject.ToPointStream(
m_application,
p => PointEvent.CreateInsert(p.RequestDateTime,p),
AdvanceTimeSettings.IncreasingStartTime);//Event 15 seconds we check how many requests by each key in 1 hour.
var m_QueryStream = from e in m_InputStream
group e by e.Key into keyGroups
from win in keyGroups.HoppingWindow(TimeSpan.FromHours(1), TimeSpan.FromSeconds(15), HoppingWindowOutputPolicy.ClipToWindowEnd)
select new MonitoringEventOutput
{
Key = keyGroups.Key,
Count = win.Count(),
EventType = MonitoringOutputEventType.OneHour
};/*Code to Push the events on button click*/
MonitoringInputEvent mevent = new MonitoringInputEvent();
mevent.RequestDateTime = DateTime.Now;
mevent.Key = txtValue.Text + " - A - ";
m_subject.OnNext(mevent);mevent = new MonitoringInputEvent();
mevent.RequestDateTime = DateTime.Now;
mevent.Key = txtValue.Text + " - B - ";
m_subject.OnNext(mevent);
Semua Balasan
-
12 Februari 2012 17:18How often are events getting pushed into the engine? Is there at least one every 15 seconds? Because ... if there are no events in the hop/tumbling window, the aggregate won't be recalculated. You need events pushing through the engine for the queries to run.
DevBiker (aka J Sawyer)
My Blog
My Bike - Concours 14
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful. -
13 Februari 2012 3:49
Well, there is no guarantee that there will be a new event every 15 seconds. But why will the engine need a new event every 15 seconds?
For example: If I add 5 events @ 9:00 AM into SI and ask for the count using hopping window every 15 seconds in last one hour then atleast every 15 seconds its should give me the same count till 10:00 AM, isnt it? If not, then what does an hour limit signifies?
-
13 Februari 2012 13:01
Basically, you are expecting a poll rather than a push architecture. StreamInsight doesn't poll. So if there is nothing to push the events along ...
With your hopping window, in your example, the hour limit signifies how long the aggregate window is ... so it'll aggregate the last hour's worth of events, recalculated every 15 seconds ... as long as the stream is moving along. You'll need, however, events and CTIs within that 15 second hop size. Since you are using IncreasingStartTime rather than enqueuing the CTIs in your adaper, there is no way that you'll have a CTI without having any events in the queue.
DevBiker (aka J Sawyer)
My Blog
My Bike - Concours 14
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful.- Disarankan sebagai Jawaban oleh DevBikerMVP 17 Februari 2012 20:32
- Ditandai sebagai Jawaban oleh Stephanie LvModerator 21 Februari 2012 7:38