One stream - Multiple Output Adapters

Answered One stream - Multiple Output Adapters

  • Tuesday, February 12, 2013 1:29 PM
     
      Has Code

    Hi,

    I found this thread:

    http://social.msdn.microsoft.com/Forums/en-US/streaminsight/thread/aae3c6cb-412b-44fa-8b80-cd433a7255fb/

    There JAhlen asked to "Allowing Events to be sent to multiple outputs"
    and Roman Schindlauer answered: "no, not yet".
    This was 2010.

    Are there any changes since then?

    Concrete issue:

    I createt a stream:

    var input = CepStream<Bean>.Create("twitter", typeof(TwitterFactory), inputConfig, EventShape.Point);


    Next I wrote a query:

    var expr = from e in input
       select new Bean
          {...}

    Now I want to use input to send results to Adapter A and I want to use expr to send results to Adapter B.

    query = input.ToQuery(...); query2 = expr2.ToQuery(...);

    query.Start();
    query2.Start();

    The Problem is that only one Adapter will receive results not both.

    Am I wrong or what is the fault?

    Thanks for advice!

    Edit:

    even if I add

     var input2 = CepStream<Tweet>.Create("twitter2", typeof(TwitterFactory), inputConfig, EventShape.Point);

    For the second adapter when executing the start-methods only one of them (it randomly changes) will receive data. o_O

    Edit2:

    I fokused out that executing query2.Start() (after a sleep of 10 seconds) the first query (which was still running within that 10 seconds) is killed by the second an up to now just the second query delivers data....

    • Edited by Kaspatoo Tuesday, February 12, 2013 2:44 PM
    •  

All Replies

  • Tuesday, February 12, 2013 8:05 PM
     
     Answered Has Code

    You need to use what's called Dynamic Query Composition (DQC). Take a look at Composing Queries at Runtime in the MSDN documentation for more information.

    Try this:

    // create source stream
    var input = CepStream<Bean>.Create("twitter", typeof (TwitterFactory), inputConfig, EventShape.Point);
    
    // bind the input stream to the first output adapter
    Query query1 = input.ToQuery(application, "Query1", "Query1 description",
                                    typeof (OutputAdapterFactory1), new OutputAdapterFactory1Config(),
                                    EventShape.Point, StreamEventOrder.FullyOrdered);
    
    // convert query1 to a stream
    CepStream<int> query2SourceStream = query1.ToStream<int>("Query1SourceStream");
    
    // query logic goes here
    CepStream<int> expr = from e in query2SourceStream
            select e;
    
    // bind the results from the query logic to 
    Query query2 = expr.ToQuery(application, "Query2", "Query2 description",
                                typeof (OutputAdapterFactory2), new OutputAdapterFactory2Config(),
                                EventShape.Point, StreamEventOrder.FullyOrdered);
    // start the first query
    query1.Start();
    
    // start the second query
    query2.Start();
    For what it's worth, you may want to look at the newer Rx-based StreamInsight 2.1 APIs. The adapter model is now considered legacy and the new event Sources/Sinks approach is the preferred method of using StreamInsight.
    • Proposed As Answer by TXPower125 Tuesday, February 12, 2013 8:05 PM
    • Marked As Answer by Kaspatoo Thursday, February 14, 2013 8:28 AM
    •  
  • Thursday, February 14, 2013 8:32 AM
     
     

    Hey thanks very much!

    But why does my last "workaround" (I know its not the way I should use) not function?
    I created two separate streams which should work independently, shouldnt they?
    What if I want to create two streams each goind to another source?

    The second one killed the first one, so there must be a resource both streams are going to.
    The only I could image is the Connection of the input adapter to (in my case Twitter).
    Sems it like that or do you probably know what/ where the problem is.

    Thanks again.

  • Thursday, February 14, 2013 1:39 PM
     
     Answered

    Well, you didn't post any input adapter code, so I can't see what you are doing there. Based on what you were doing though, you would end up with 2 instances of your Twitter input adapter. They must be stepping on each other.

    DQC allows us to share the streaming data through 1 Twitter input adapter instance with multiple queries. If you wanted to output the results of the query logic (expr) to more than one place, you would just call ToQuery() on the stream and bind it to whichever output adapter you need.

    Does that help?

    • Edited by TXPower125 Thursday, February 14, 2013 1:46 PM Added more detail
    • Marked As Answer by Kaspatoo Thursday, February 14, 2013 2:08 PM
    •  
  • Thursday, February 14, 2013 2:08 PM
     
     

    Hey,

    well if so you and me now think the problem lies within the InputAdapter (anything seems to run wrong there) then yes, you helped me.

    Thanks much.

  • Thursday, February 14, 2013 2:12 PM
     
     

    Without seeing the code, I can't tell for sure. However, given that you want to pull that data into StreamInsight and then write out the raw data to one output adapter and then do some other logic whose results are written to another output adapter, what I have shown you is the best way to go about it.