No Output from the Subscriber
-
Monday, February 11, 2013 2:40 PM
I am enqueing CTI event throught GetPointEvent() method. The Subscribe method gets called but am facing another issue where I am unable to get the output of the query. When I subscribe observer, its type of StreamInputEvent<TpayloadType>
protected void PublishEvent(StreamInputEvent<TPayloadType> newEvent) { _observer.OnNext(newEvent); }
But when trying to get data through sink as
//Sink to get the data after calculation query.ToObservable().Subscribe(OnNext);
OnNext method use myObject class to get calculated data
public void OnNext(myObject value) { myObjectList.Add(value); }
Anything that I might have missed due to which I am not getting the output? Please suggest.
All Replies
-
Monday, February 11, 2013 3:40 PM
Based on what you have posted in your other thread, you are not enqueueing CTI events. CTI events advance application time. Without them, you will not see any output because your events will never leave your input adapter/source. You can confirm this by tracing the workflow with Event Flow Debugger. As a starting point, you'll want to add the "AdvanceTimeSettings.IncreasingStartTime" to your ToPointStreamable() call. This will enqueue 1 CTI event after each Insert event. The example code below was written to be used within LinqPad, but with some modifications will run just fine in a regular application.
var startTime = new DateTimeOffset(DateTime.Today); var source = Application .DefineEnumerable(()=> Enumerable.Range(0, 10)) .ToPointStreamable(e => PointEvent.CreateInsert(startTime.AddSeconds(e), e), AdvanceTimeSettings.IncreasingStartTime); var query = from e in source where e % 2 == 0 select e; query.ToObservable().Subscribe((e)=> e.Dump());
- Proposed As Answer by TXPower125 Monday, February 11, 2013 3:42 PM
-
Tuesday, February 12, 2013 5:06 AM
Thanks for your response.
I am enqueing CTI event after Insert event using below code through my observable source.
this.PublishEvent(new StreamInputEvent<TCPDataEvent>(new TCPDataEvent() { Data = bytes, EnqueueTimestamp = DateTime.Now })); //Enqueue our CTI this.PublishEvent(new StreamInputEvent<TCPDataEvent>(DateTime.Now.AddTicks(1)));I also have an UDSO to convert input payload into myObject but is being called just once. Is this an issue? I was expecting that it would be called for each event. Not sure what needs to be fixed ? (if my assumption is correct).
//Use UDSO for data conversion (byte[] to myObject) var myObjectQuery = from d in data.Scan(() => new myObjectOperator()) select d;
After that, with in query I have on UDO to calculate statistics for an interval (say 2 min).UDO also get called only once.
//Calculation var query = from e in myObjectQuery .HoppingWindow(TimeSpan.FromSeconds(interval), TimeSpan.FromSeconds(interval)) select new myObject { Value = e.StdDev(i => i.Value), Time = e.Max(i => i.Time) };The code below is used to get the output:
query.ToObservable().Subscribe(OnNext); //Collecting data into list for UI public void OnNext(myObject value) { myObjectDataList.Add(value); }Please suggest changes that would help me get output for each event rather than just the first one.
-
Tuesday, February 12, 2013 5:08 PM
It still sounds to me like you aren't getting CTI events. Your hopping window is going to return results every 2 minutes, because that is the size of the hopping window that you configured. Do your events occur over a greater timespan of 2 minutes?
-
Friday, February 15, 2013 3:32 AM
Thanks for your reply.
I am developing this application with WPF, may be because of this its giving such problem.
Now i am developing WPF application using WCF Observble and Observer.
I will update soon on this problem.
-
Thursday, February 28, 2013 2:21 PM
-
Tuesday, March 12, 2013 4:58 AM
I have implemented using WCF services and its worked.


