How to retrieve a published stream?
-
February 7, 2012, 9:12 AM
Hello,
I created a data flow with intermediate queries connected through published streams. To connect the flow, I used this method when creating it:
BindOutputToPublishedStream(Uri publishedStreamName, EventShape eventShape, StreamEventOrder streamEventOrder);
Now I would like to retrieve a published stream so that I can connect it to another query later in my application. Is there a way to get the stream if I know its URI?
Thank you for your help,
- Edited by qerluisdhivdufhwxcs, February 7, 2012, 12:58 PM
All replies
-
February 7, 2012, 1:39 PM
Have you tried to use BindProducer()?
You could also use the PublishedStreamInputAdapter - the configuration takes the query's URI.
DevBiker (aka J Sawyer)
-
February 7, 2012, 3:04 PM
This works for me. Based on what you said, I am assuming that you have a published stream with a URI like the one shown below:
private static string s_UriTemplate = @"cep:/Server/Application/MyApp/PublishedStream/MyStream";
private Application m_App;
private QueryBinder m_QueryBinder;

Uri uri = new Uri(String.Format(s_UriTemplate,
    "VantagePoint",
    m_EventDefinition.FullyQualifiedName));
m_QueryBinder.AddConsumer("StreamConsumerName",
    m_App.App.GetPublishedStreamOutputAdapter(),
    new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = uri },
    inputEventShape,
    StreamEventOrder.FullyOrdered);
-
February 7, 2012, 5:33 PM
Well, actually, I need to get the stream itself in order to use it inside a query (in a where clause). So I can't use a function like AddConsumer, because I need to get hold of the published stream explicitly.
For example, this is the query I need (quite similar to the one in the Hitchhiker's Guide):
var tReferenceStream = from e in tStream1.ToPointEventStream()
                       select e;
var tObservedStream = from e in tStream2.ToPointEventStream()
                      where 0 != e.Tag.Length
                      select e;
var filteredStream = from left in tReferenceStream
                     where (from right in tObservedStream
                            select right).IsEmpty()
                     select left;
QueryTemplate queryQT = application.CreateQueryTemplate("queryName", "description", filteredStream);
QueryBinder queryBinder = new QueryBinder(queryQT);
So I need valid streams tStream1 and tStream2. These are two streams that I created earlier in the application using published streams. I know both streams' URIs, but I don't know how to retrieve them so that I can write something like this:
// RetrieveStreamByUri is hypothetical; it stands for the lookup I am asking about
CepStream<InputEvent> tStream1 = RetrieveStreamByUri("cep:/Server/Application/SqlApplication/PublishedStream/ps1");
CepStream<InputEvent> tStream2 = RetrieveStreamByUri("cep:/Server/Application/SqlApplication/PublishedStream/ps2");
var tReferenceStream = from e in tStream1.ToPointEventStream()
                       select e;
var tObservedStream = from e in tStream2.ToPointEventStream()
                      where 0 != e.Tag.Length
                      select e;
var filteredStream = from left in tReferenceStream
                     where (from right in tObservedStream
                            select right).IsEmpty()
                     select left;
QueryTemplate queryQT = application.CreateQueryTemplate("queryName", "description", filteredStream);
QueryBinder queryBinder = new QueryBinder(queryQT);
Is it possible?
Also, the PublishedStreamInputAdapter gives me an input adapter, but I need a stream :/
Thank you,
- Edited by qerluisdhivdufhwxcs, February 7, 2012, 5:35 PM
-
February 7, 2012, 5:56 PM
How about this? It creates your streams by consuming the published streams:
var consumeStream = CepStream<MyPayload>.Create("consumeStream",
typeof(PublishedStreamAdapterFactory),
new PublishedStreamSourceConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApplication/PublishedStream/MyStream") },
EventShape.Point);
- Marked as answer by qerluisdhivdufhwxcs, February 9, 2012, 9:08 AM
-
February 8, 2012, 9:16 AM
Hmm, that sounds like a good idea, but I guess it should be PublishedStreamInputAdapterConfiguration instead of PublishedStreamSourceConfiguration, shouldn't it? I don't see any class called PublishedStreamSourceConfiguration.
I will give it a try and keep you posted. Thank you.
- Marked as answer by qerluisdhivdufhwxcs, February 9, 2012, 9:08 AM
- Unmarked as answer by qerluisdhivdufhwxcs, February 9, 2012, 9:08 AM
-
February 8, 2012, 10:32 AM
Yes, PublishedStreamInputAdapterConfiguration. Sorry for the typo.
-
February 8, 2012, 1:54 PM
OK, I tested it, but I get an error when launching the application. An exception is raised when I create the query from the query binder with application.CreateQuery("myQuery", "myDescription", queryBinder). It says that the query "myQuery" cannot be created because the input stream "observedStream" is not bound to a valid data source :/
Here is a sample of the code I used:
var observedStream = CepStream<MyPayload>.Create("observedStream",
    typeof(PublishedStreamAdapterFactory),
    new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0") },
    EventShape.Point);

var referenceStream = from ev in tempStream
                      select ev; // tempStream is a valid stream I created before, so I don't have to retrieve it

var filteredStream = from left in referenceStream
                     where (from right in observedStream
                            select right).IsEmpty<MyPayload>()
                     select left;

QueryTemplate queryQT = application.CreateQueryTemplate("myQueryTemplate", "myDescription", filteredStream);
queryBinder = new QueryBinder(queryQT);

queryBinder.BindProducer("myInputStream",
    new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0"),
    eventShape,
    inputAdvaceTimeSettings);

queryBinder.AddConsumer("myOutputStream",
    outputAdpater,    // valid and previously defined
    outputConfigInfo, // valid and previously defined
    eventShape,
    StreamEventOrder.FullyOrdered);

application.CreateQuery("myQuery", "myDescription", queryBinder); // exception raised here!
Thank you,
- Edited by qerluisdhivdufhwxcs, February 8, 2012, 1:54 PM
-
February 8, 2012, 3:46 PM
I don't follow the structure of all your streams too well, but you could start by setting up a simple publishing server and then consuming it with a client that just binds to the published stream, with no other stream at first. The client would then turn that stream into a query that binds itself to an output adapter.
This snippet shows how I bind to a published stream and then use ToQuery on the stream to create a query. That method takes the config info and the adapter factory type of your output adapter and uses them to set up an output adapter for the query.
var consumeStream = CepStream<VQPayload>.Create("consumeStream",
typeof(cepa.PublishedStreamAdapterFactory),
new cepa.PublishedStreamSourceConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/PublishedSI/PublishedStream/SimplePassThroughStream") },
EventShape.Point);
var consumingQuery = consumeStream.ToQuery(myApp,
"ConsumingQuery",
"",
typeof(MyOutputAdapterFactoryType),
MyOutputAdapterConfigInfo,
EventShape.Point,
StreamEventOrder.FullyOrdered);
consumingQuery.Start();
- Marked as answer by qerluisdhivdufhwxcs, February 9, 2012, 9:08 AM
-
February 9, 2012, 9:06 AM
You don't follow the structure because there really was something wrong ^^
Well, I tried to create a simple publishing server and had the same problem :/ After some hours searching for the cause, I finally found that I had missed something. It turned out that my new consuming stream was not bound to the published stream, so I have to bind it manually:
var observedStream = CepStream<MyPayload>.Create("observedStream",
    typeof(PublishedStreamAdapterFactory),
    new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0") },
    EventShape.Point);
// Here I believed that the stream was automatically bound to the published stream, but it wasn't.

var referenceStream = from ev in tempStream
                      select ev; // tempStream is a valid stream I created before, so I don't have to retrieve it

var filteredStream = from left in referenceStream
                     where (from right in observedStream
                            select right).IsEmpty()
                     select left;

QueryTemplate queryQT = application.CreateQueryTemplate("myQueryTemplate", "myDescription", filteredStream);
queryBinder = new QueryBinder(queryQT);

// Here was my mistake: I have to bind the stream to the published stream manually,
// using the same stream name ("observedStream") that was passed to Create above.
queryBinder.BindProducer("observedStream",
    new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps0"),
    eventShape,
    inputAdvaceTimeSettings);

queryBinder.AddConsumer("myOutputStream",
    outputAdpater,    // valid and previously defined
    outputConfigInfo, // valid and previously defined
    eventShape,
    StreamEventOrder.FullyOrdered);

application.CreateQuery("myQuery", "myDescription", queryBinder);
And now the flow is working.
Thank you for your help!
- Edited by qerluisdhivdufhwxcs, February 9, 2012, 9:07 AM

