How to retrieve a published stream?

Answered: How to retrieve a published stream?

  • February 7, 2012, 9:12 AM
     
     

    Hello,

    I created a data flow with intermediate queries connected through published streams. To connect the flow, I used this function when creating the queries:

    BindOutputToPublishedStream(Uri publishedStreamName, EventShape eventShape, StreamEventOrder streamEventOrder);

    Now I would like to retrieve a published stream so that I can connect it to another query later in my application. Is there a way to get the stream given its URI?

    Thank you for your help,


All replies

  • February 7, 2012, 1:39 PM
     
     

    Have you tried to use BindProducer()?

    You could also use the PublishedStreamInputAdapter - the configuration takes the query's URI.


    DevBiker (aka J Sawyer)

  • February 7, 2012, 3:04 PM
     
     

    This works for me. Based on what you said, I am assuming that you have a published stream with a URI like the one shown below:

    // Assumed placeholders: {0} = application name, {1} = published stream name.
    private static string s_UriTemplate = @"cep:/Server/Application/{0}/PublishedStream/{1}";
    private Application m_App;
    private QueryBinder m_QueryBinder;

    // Build the URI of the published stream.
    Uri uri = new Uri(String.Format(s_UriTemplate,
                                "VantagePoint",
                                m_EventDefinition.FullyQualifiedName));
    // Add a consumer that sends the query output to that published stream.
    m_QueryBinder.AddConsumer("StreamConsumerName",
                            m_App.App.GetPublishedStreamOutputAdapter(),
                            new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = uri },
                            inputEventShape,
                            StreamEventOrder.FullyOrdered);
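
    For String.Format to substitute the application and stream names, the template needs {0} and {1} placeholders. Here is a minimal, self-contained sketch of such a helper, assuming the cep:/Server/Application/<application>/PublishedStream/<stream> layout seen in this thread. The names PublishedStreamUris and Build are hypothetical and not part of StreamInsight, and the sketch assumes the names contain no characters that need escaping.

```csharp
using System;

// Hypothetical helper, not part of the StreamInsight API.
static class PublishedStreamUris
{
    // {0} = application name, {1} = published stream name (assumed layout).
    private const string Template = "cep:/Server/Application/{0}/PublishedStream/{1}";

    public static Uri Build(string applicationName, string streamName)
    {
        if (string.IsNullOrEmpty(applicationName))
            throw new ArgumentException("Application name is required.", "applicationName");
        if (string.IsNullOrEmpty(streamName))
            throw new ArgumentException("Stream name is required.", "streamName");

        // Fill both placeholders and wrap the result in a Uri.
        return new Uri(string.Format(Template, applicationName, streamName));
    }
}

static class Program
{
    static void Main()
    {
        // Example values taken from the URIs used later in this thread.
        Uri uri = PublishedStreamUris.Build("SqlApplication", "ps0");
        Console.WriteLine(uri.OriginalString);
    }
}
```

    Building the URI in one place also makes it less likely that the producer and the consumer end up with slightly different strings.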

     

  • February 7, 2012, 5:33 PM
     
     

    Well, actually, I need to get the stream in order to use it in a query (in a where clause). So I can't use a function like AddConsumer, because I need to obtain the published stream explicitly.

    For example, this is the query I need (quite similar to the one in the Hitchhiker's Guide):

    var tReferenceStream = from e in tStream1.ToPointEventStream()
                           select e;

    var tObservedStream = from e in tStream2.ToPointEventStream()
                          where 0 != e.Tag.Length
                          select e;

    var filteredStream = from left in tReferenceStream
                         where (from right in tObservedStream
                                select right).IsEmpty()
                         select left;

    QueryTemplate queryQT = application.CreateQueryTemplate("queryName", "description", filteredStream);
    QueryBinder queryBinder = new QueryBinder(queryQT);

    So I need two valid streams, tStream1 and tStream2. These are two streams that I created earlier in the application using published streams. I know the URI of each stream, but I don't know how to retrieve them so that I can write something like this (RetrieveStreamByUri is a hypothetical function):

    CepStream<InputEvent> tStream1 = RetrieveStreamByUri("cep:/Server/Application/SqlApplication/PublishedStream/ps1");

    CepStream<InputEvent> tStream2 = RetrieveStreamByUri("cep:/Server/Application/SqlApplication/PublishedStream/ps2");

    var tReferenceStream = from e in tStream1.ToPointEventStream()
                           select e;

    var tObservedStream = from e in tStream2.ToPointEventStream()
                          where 0 != e.Tag.Length
                          select e;

    var filteredStream = from left in tReferenceStream
                         where (from right in tObservedStream
                                select right).IsEmpty()
                         select left;

    QueryTemplate queryQT = application.CreateQueryTemplate("queryName", "description", filteredStream);
    QueryBinder queryBinder = new QueryBinder(queryQT);

    Is it possible?

    Also, the PublishedStreamInputAdapter gives me an input adapter, but I need a stream :/

    Thank you,


  • February 7, 2012, 5:56 PM
     
     Answered

    How about this to create your streams by consuming the published streams?

                    var consumeStream = CepStream<MyPayload>.Create("consumeStream",
                        typeof(PublishedStreamAdapterFactory),
                        new PublishedStreamSourceConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApplication/PublishedStream/MyStream") },
                         EventShape.Point);

  • February 8, 2012, 9:16 AM
     
     

    Hmm, it sounds like a good idea, but I guess it should be PublishedStreamInputAdapterConfiguration instead of PublishedStreamSourceConfiguration, shouldn't it? I don't see any class called PublishedStreamSourceConfiguration.

    I will give it a try and keep you posted, thank you.

  • February 8, 2012, 10:32 AM
     
     
    Yes, PublishedStreamInputAdapterConfiguration. Sorry for the typo.
  • February 8, 2012, 1:54 PM
     
     

    OK, I tested it, but I get an error when I launch the application. An exception is raised when I create the query from the query binder with application.CreateQuery("myQuery", "myDescription", queryBinder). It says that the query "myQuery" cannot be created because the input stream "observedStream" is not bound to a valid data source :/

    Here is a sample of the code I used:

                    var observedStream = CepStream<MyPayload>.Create("observedStream",
                        typeof(PublishedStreamAdapterFactory),
                        new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0") },
                         EventShape.Point);

                     var referenceStream = from ev in tempStream
                                          select ev; // referenceStream is a valid stream i created before so i don't have to retrieve it

                     var filteredStream = from left in referenceStream
                                          where (from right in observedStream
                                                 select right).IsEmpty<MyPayload>()
                                          select left;

                     QueryTemplate queryQT = application.CreateQueryTemplate("myQueryTemplate", "myDescription",  filteredStream);
                     queryBinder = new QueryBinder(queryQT);

                    queryBinder.BindProducer("myInputStream",
                                              new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0"),
                                              eventShape,
                                              inputAdvaceTimeSettings);

                    queryBinder.AddConsumer("myOutputStream",
                        outputAdpater,                   //valid and previously defined
                        outputConfigInfo,              //valid and previously defined
                        eventShape,
                        StreamEventOrder.FullyOrdered);

                    application.CreateQuery("myQuery", "myDescription", queryBinder); // exception raised here!

    Thank you,


  • February 8, 2012, 3:46 PM
     
     

    I don't follow the structure of all your streams too well, but you could start by setting up a simple publishing server and then consuming it with a client that binds only to the published stream, with no other stream at first. The client would then turn that stream into a query that binds itself to an output adapter.

    This snippet shows how I bind to a published stream and then use Stream.ToQuery to create a query. This method takes the config info and the adapter factory type of your output adapter to set up an output adapter for the query.

                    var consumeStream = CepStream<VQPayload>.Create("consumeStream",
                        typeof(cepa.PublishedStreamAdapterFactory),
                        new cepa.PublishedStreamSourceConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/PublishedSI/PublishedStream/SimplePassThroughStream") },
                         EventShape.Point);
                    var consumingQuery = consumeStream.ToQuery(myApp,
                        "ConsumingQuery",
                        "",
                        typeof(MyOutputAdapterFactoryType),
                        MyOutputAdapterConfigInfo,
                        EventShape.Point,
                        StreamEventOrder.FullyOrdered);                 
                     consumingQuery.Start();

  • February 9, 2012, 9:06 AM
     
      Contains code

    You don't follow the structure because there really is something wrong ^^

    Well, I tried to create a simple publishing server and had the same problem :/ After some hours searching for the cause, I finally found what I had missed. My new consuming stream was not bound to the published stream, so I have to bind it manually:

    var observedStream = CepStream<MyPayload>.Create("observedStream",
        typeof(PublishedStreamAdapterFactory),
        new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0") },
        EventShape.Point); // I believed the stream was bound to the published stream automatically here, but it wasn't

    var referenceStream = from ev in tempStream
                          select ev; // tempStream is a valid stream I created before, so I don't have to retrieve it

    var filteredStream = from left in referenceStream
                         where (from right in observedStream
                                select right).IsEmpty()
                         select left;

    QueryTemplate queryQT = application.CreateQueryTemplate("myQueryTemplate", "myDescription", filteredStream);
    queryBinder = new QueryBinder(queryQT);

    // Here was my mistake: the stream has to be bound to the published stream manually,
    // using the same name ("observedStream") that was passed to CepStream.Create.
    queryBinder.BindProducer("observedStream",
        new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0"),
        eventShape,
        inputAdvaceTimeSettings);

    queryBinder.AddConsumer("myOutputStream",
        outputAdpater,      // valid and previously defined
        outputConfigInfo,   // valid and previously defined
        eventShape,
        StreamEventOrder.FullyOrdered);

    application.CreateQuery("myQuery", "myDescription", queryBinder);

    And now the flow is working.

    Thank you for your help!