Answered How to retrieve a published stream?

  • February 7, 2012 9:12
     
     

    Hello,

    I created a data flow with intermediate queries connected through published streams. When I created it, I used this method:

    BindOutputToPublishedStream(Uri publishedStreamName, EventShape eventShape, StreamEventOrder streamEventOrder);

    to connect the flow. Now I would like to retrieve a published stream so that I can connect it to another query later in my application. Is there a way to get the stream if I know its URI?

    Thank you for your help,


All replies

  • February 7, 2012 13:39
     
     

    Have you tried to use BindProducer()?

    You could also use the PublishedStreamInputAdapter - the configuration takes the query's URI.


    DevBiker (aka J Sawyer)

  • February 7, 2012 15:04
     
     

    This works for me. Based on what you said, I am assuming you have a published stream with a URI like the one shown below:

    // {0} = application name, {1} = published stream name
    private static string s_UriTemplate = @"cep:/Server/Application/{0}/PublishedStream/{1}";
    private Application m_App;
    private QueryBinder m_QueryBinder;

    Uri uri = new Uri(String.Format(s_UriTemplate,
                                "VantagePoint",
                                m_EventDefinition.FullyQualifiedName));
    m_QueryBinder.AddConsumer("StreamConsumerName",
                            m_App.App.GetPublishedStreamOutputAdapter(),
                            new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = uri },
                            inputEventShape,
                            StreamEventOrder.FullyOrdered);

     

  • February 7, 2012 17:33
     
     

    Well, actually, I need to get the stream itself in order to use it in a query (in a where clause). So I can't use a method like AddConsumer, because I need to obtain the published stream explicitly.

    For example, this is the query I need (quite similar to the one in the Hitchhiker's Guide):

    var tReferenceStream = from e in tStream1.ToPointEventStream()
    select e;

    var tObservedStream = from e in tStream2.ToPointEventStream()
    where 0 != e.Tag.Length
    select e;

    var filteredStream= from left in tReferenceStream
    where (from right in tObservedStream
    select right).IsEmpty()
    select left;

    QueryTemplate queryQT = application.CreateQueryTemplate("queryName", "description", filteredStream);
    QueryBinder queryBinder = new QueryBinder(queryQT);

    So I need two valid streams, tStream1 and tStream2. These are two streams that I created earlier in the application using published streams. I know the URI of each stream, but I don't know how to retrieve the streams themselves in order to write something like this:

    CepStream<InputEvent> tStream1 = RetrieveStreamByUri("cep:/Server/Application/SqlApplication/PublishedStream/ps1");

    CepStream<InputEvent> tStream2 = RetrieveStreamByUri("cep:/Server/Application/SqlApplication/PublishedStream/ps2");

    var tReferenceStream = from e in tStream1.ToPointEventStream()
    select e;

    var tObservedStream = from e in tStream2.ToPointEventStream()
    where 0 != e.Tag.Length
    select e;

    var filteredStream= from left in tReferenceStream
    where (from right in tObservedStream
    select right).IsEmpty()
    select left;

    QueryTemplate queryQT = application.CreateQueryTemplate("queryName", "description", filteredStream);
    QueryBinder queryBinder = new QueryBinder(queryQT);

    Is it possible?

    Also, the PublishedStreamInputAdapter gives me an input adapter, but I need a stream :/

    Thank you,


  • February 7, 2012 17:56
     
     Answered

    How about creating your streams by consuming the published streams, like this:

                    var consumeStream = CepStream<MyPayload>.Create("consumeStream",
                        typeof(PublishedStreamAdapterFactory),
                        new PublishedStreamSourceConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApplication/PublishedStream/MyStream") },
                         EventShape.Point);

  • February 8, 2012 9:16
     
     

    Hmm, that sounds like a good idea, but I guess it should be PublishedStreamInputAdapterConfiguration instead of PublishedStreamSourceConfiguration, shouldn't it? I don't see any class called PublishedStreamSourceConfiguration.

    I will give it a try and keep you posted. Thank you.

  • February 8, 2012 10:32
     
     
    Yes, it is PublishedStreamInputAdapterConfiguration. Sorry for the typo.
  • February 8, 2012 13:54
     
     

    OK, I tested it, but I get an error when I launch the application. An exception is raised when I create the query from the query binder with application.CreateQuery("myQuery", "myDescription", queryBinder). It says that the query "myQuery" cannot be created because the input stream "observedStream" is not bound to a valid data source :/

    Here is a sample of the code I used:

    var observedStream = CepStream<MyPayload>.Create("observedStream",
        typeof(PublishedStreamAdapterFactory),
        new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0") },
        EventShape.Point);

    // tempStream is a valid stream I created earlier, so I don't have to retrieve it
    var referenceStream = from ev in tempStream
                          select ev;

    var filteredStream = from left in referenceStream
                         where (from right in observedStream
                                select right).IsEmpty<MyPayload>()
                         select left;

    QueryTemplate queryQT = application.CreateQueryTemplate("myQueryTemplate", "myDescription", filteredStream);
    queryBinder = new QueryBinder(queryQT);

    queryBinder.BindProducer("myInputStream",
        new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0"),
        eventShape,
        inputAdvanceTimeSettings);

    queryBinder.AddConsumer("myOutputStream",
        outputAdapter,      // valid and previously defined
        outputConfigInfo,   // valid and previously defined
        eventShape,
        StreamEventOrder.FullyOrdered);

    application.CreateQuery("myQuery", "myDescription", queryBinder); // exception raised here!

    Thank you,


  • February 8, 2012 16:15
     
     Answered, has code

    I don't follow the structure of all your streams too well, but you could start by setting up a simple publishing server and then consuming it with a client that just binds to the published stream, with no other stream initially. The client would then turn that stream into a query that binds itself to an output adapter.

    This snippet shows how I bind to a published stream and then call ToQuery on the stream to create a query. ToQuery takes the configuration object and the adapter factory type of your output adapter and uses them to set up an output adapter for the query.

    // cepa is a namespace alias defined elsewhere in my code
    var consumeStream = CepStream<VQPayload>.Create("consumeStream",
        typeof(cepa.PublishedStreamAdapterFactory),
        new cepa.PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/PublishedSI/PublishedStream/SimplePassThroughStream") },
        EventShape.Point);
    var consumingQuery = consumeStream.ToQuery(myApp,
        "ConsumingQuery",
        "",
        typeof(MyOutputAdapterFactoryType),
        MyOutputAdapterConfigInfo,
        EventShape.Point,
        StreamEventOrder.FullyOrdered);
    consumingQuery.Start();

  • February 9, 2012 9:06
     
      Has code

    You don't follow the structure because there really is something wrong ^^

    Well, I tried creating a simple publishing server and had the same problem :/ After some hours of searching for the cause, I finally found what I had missed. My new consuming stream was not bound to the published stream, so I have to bind it manually:

    // I believed this call bound the stream to the published stream automatically, but it didn't
    var observedStream = CepStream<MyPayload>.Create("observedStream",
        typeof(PublishedStreamAdapterFactory),
        new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0") },
        EventShape.Point);

    // tempStream is a valid stream I created earlier, so I don't have to retrieve it
    var referenceStream = from ev in tempStream
                          select ev;

    var filteredStream = from left in referenceStream
                         where (from right in observedStream
                                select right).IsEmpty<MyPayload>()
                         select left;

    QueryTemplate queryQT = application.CreateQueryTemplate("myQueryTemplate", "myDescription", filteredStream);
    queryBinder = new QueryBinder(queryQT);

    // Here was my mistake: I have to bind "observedStream" to the published stream manually
    queryBinder.BindProducer("observedStream",
        new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0"),
        eventShape,
        inputAdvanceTimeSettings);

    queryBinder.AddConsumer("myOutputStream",
        outputAdapter,      // valid and previously defined
        outputConfigInfo,   // valid and previously defined
        eventShape,
        StreamEventOrder.FullyOrdered);

    application.CreateQuery("myQuery", "myDescription", queryBinder);

    And now the flow is working.

    Thank you for your help!
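
The failure in this thread came down to two strings not lining up: the stream was created under the name "observedStream", but the producer was bound under "myInputStream". A small pre-flight check can catch that kind of mismatch before CreateQuery throws. The sketch below is plain C# and does not call StreamInsight at all; BindingCheck and Unbound are hypothetical names made up for this illustration, and it assumes you keep your own lists of the stream names you declared and the names you bound.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of StreamInsight: compares the input stream
// names used when creating streams with the names passed to BindProducer.
static class BindingCheck
{
    // Returns every declared stream name that has no producer bound under
    // exactly the same name (ordinal, case-sensitive comparison).
    public static List<string> Unbound(IEnumerable<string> declared, IEnumerable<string> bound)
    {
        var boundSet = new HashSet<string>(bound, StringComparer.Ordinal);
        var missing = new List<string>();
        foreach (var name in declared)
        {
            if (!boundSet.Contains(name))
            {
                missing.Add(name);
            }
        }
        return missing;
    }

    static void Main()
    {
        // Names taken from the failing post in this thread.
        var declared = new[] { "observedStream" };
        var bound = new[] { "myInputStream" };

        foreach (var name in Unbound(declared, bound))
        {
            Console.WriteLine("No producer bound for input stream: " + name);
        }
    }
}
```

With the names from the failing post, this reports "observedStream" as unbound; after the fix in the last post, the list comes back empty. Declaring each stream name and URI once as a constant and reusing it in both places may avoid the problem altogether.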