No Output from the Subscriber

  • Monday, February 11, 2013 2:40 PM
     

    I am enqueuing CTI events through the GetPointEvent() method. The Subscribe method gets called, but I am facing another issue: I am unable to get the output of the query. When I subscribe the observer, its type is StreamInputEvent<TPayloadType>.

     protected void PublishEvent(StreamInputEvent<TPayloadType> newEvent)
     {
       _observer.OnNext(newEvent);
     }

    But when I try to get the data through a sink like this:

    //Sink to get the data after calculation
    query.ToObservable().Subscribe(OnNext);

    The OnNext method uses the myObject class to receive the calculated data:

    public void OnNext(myObject value)
    {
     myObjectList.Add(value);
    }

    Is there anything I might have missed that would explain why I am not getting any output? Please suggest.

All Replies

  • Monday, February 11, 2013 3:40 PM
     

    Based on what you have posted in your other thread, you are not enqueueing CTI events. CTI events advance application time. Without them, you will not see any output because your events will never leave your input adapter/source. You can confirm this by tracing the workflow with the Event Flow Debugger. As a starting point, add AdvanceTimeSettings.IncreasingStartTime to your ToPointStreamable() call. This will enqueue one CTI event after each Insert event. The example code below was written for LINQPad, but with some modifications it will run just fine in a regular application.

    var startTime = new DateTimeOffset(DateTime.Today);
    
    var source = Application
    	.DefineEnumerable(()=> Enumerable.Range(0, 10))
    	.ToPointStreamable(e => PointEvent.CreateInsert(startTime.AddSeconds(e), e),
    	AdvanceTimeSettings.IncreasingStartTime);
    
    var query = from e in source
    	where e % 2 == 0
    	select e;
    
    query.ToObservable().Subscribe((e)=> e.Dump());

    • Proposed As Answer by TXPower125 Monday, February 11, 2013 3:42 PM
  • Tuesday, February 12, 2013 5:06 AM
     

    Thanks for your response.

    I am enqueuing a CTI event after each Insert event from my observable source, using the code below.

    this.PublishEvent(new StreamInputEvent<TCPDataEvent>(new TCPDataEvent() { Data = bytes, EnqueueTimestamp = DateTime.Now }));
    //Enqueue our CTI
    this.PublishEvent(new StreamInputEvent<TCPDataEvent>(DateTime.Now.AddTicks(1)));

    I also have a UDSO that converts the input payload into myObject, but it is being called just once. Is this an issue? I was expecting it to be called for each event. I am not sure what needs to be fixed (if my assumption is correct).

    //Use UDSO for data conversion (byte[] to myObject)
    var myObjectQuery = from d in data.Scan(() => new myObjectOperator())
    select d;
    

    After that, within the query, I have a UDO that calculates statistics for an interval (say 2 min). The UDO also gets called only once.

    //Calculation
    var query = from e in myObjectQuery.HoppingWindow(TimeSpan.FromSeconds(interval), TimeSpan.FromSeconds(interval))
    select new myObject { Value = e.StdDev(i => i.Value), Time = e.Max(i => i.Time) };
    

    The code below is used to get the output:

    query.ToObservable().Subscribe(OnNext);
    
    //Collecting  data into list for UI
    public void OnNext(myObject value)
    {
                myObjectDataList.Add(value);
    }

    Please suggest changes that would help me get output for each event rather than just the first one.

  • Tuesday, February 12, 2013 5:08 PM
     
     

    It still sounds to me like you aren't getting CTI events. Your hopping window will only return results every 2 minutes, because that is the window size you configured. Do your events span more than 2 minutes?
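
    To illustrate why a window stays silent until application time moves past its end, here is a rough sketch in plain C#. It is a conceptual simulation only and does not use the StreamInsight API; WindowSimulator, Insert, and Cti are hypothetical names made up for this example, and it assumes a simple window whose hop size equals its window size, as in the query above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual simulation only. This is NOT StreamInsight code;
// WindowSimulator and its members are hypothetical names.
public sealed class WindowSimulator
{
    private readonly TimeSpan _size;
    private readonly List<DateTime> _pending = new List<DateTime>();
    private DateTime _nextWindowEnd;

    public WindowSimulator(DateTime origin, TimeSpan size)
    {
        _size = size;
        _nextWindowEnd = origin + size;
    }

    // Insert events are only buffered; they never produce output by themselves.
    public void Insert(DateTime time)
    {
        _pending.Add(time);
    }

    // A CTI states that no earlier event will arrive, so every window that
    // ends at or before the CTI can be closed and its event count emitted.
    public List<KeyValuePair<DateTime, int>> Cti(DateTime cti)
    {
        var results = new List<KeyValuePair<DateTime, int>>();
        while (_nextWindowEnd <= cti)
        {
            DateTime start = _nextWindowEnd - _size;
            DateTime end = _nextWindowEnd;
            int count = _pending.Count(t => t >= start && t < end);
            if (count > 0)
            {
                results.Add(new KeyValuePair<DateTime, int>(end, count));
            }
            _pending.RemoveAll(t => t < end);
            _nextWindowEnd += _size;
        }
        return results;
    }
}

public static class Demo
{
    public static void Main()
    {
        var t0 = new DateTime(2013, 2, 12, 9, 0, 0);
        var sim = new WindowSimulator(t0, TimeSpan.FromMinutes(2));

        sim.Insert(t0.AddSeconds(10));
        sim.Insert(t0.AddSeconds(70));

        // CTI is still inside the first 2-minute window: nothing is released.
        Console.WriteLine(sim.Cti(t0.AddSeconds(71)).Count);  // prints 0

        // CTI reaches the end of the window: one result covering both events.
        Console.WriteLine(sim.Cti(t0.AddMinutes(2)).Count);   // prints 1
    }
}
```

    In this sketch, inserts alone never produce a result. Output appears only once a CTI reaches the end of a window, so if all of your events fall within the same 2 minutes, or no CTI ever passes the window end, the subscriber will see nothing.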

  • Friday, February 15, 2013 3:32 AM
     
     

    Thanks for your reply.

    I am developing this application with WPF; maybe that is what is causing the problem.

    I am now developing the WPF application using a WCF Observable and Observer.

    I will post an update on this problem soon.

  • Thursday, February 28, 2013 2:21 PM
     
     
    How is this going?

    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net



  • Tuesday, March 12, 2013 4:58 AM
     
     

    I implemented it using WCF services and it worked.