import cti's
-
Wednesday, December 19, 2012 6:00 PM
apologies if this has been asked and answered... but what is the best way to import cti's from one IQStreamable<T> interface to another (pref. without using DefineObservable )?
stream 1 (fast moving) ...
var stream1 = application.DefineObservable(() => new StreamProducer(config));//yields IQStreamable<PointEvent<TPayload>> depending on type from producer.
varctis = stream1.Where(e => e.EventKind == EventKind.Cti).Select(e => e.Start);//event info required for filtering out cti's
stream 2 (slower moving) ...
varstream2 = from t in someOtherQuery select t;//yields IQStreamable<TPayload> interface - perhaps from another query
Need to import cti's from stream 1 into stream 2. Since DefineObservable interface yields Event Info + Payload (IQStreamable<PointEvent<TPayload>>) and someOtherQuery query only TPayload a conversion needs to take place.
1) I assume I need to convert stream2 to observable -> IQStreamable<TPayload>
2) Create query for creating new event i.e. Event + Payload -> IQStreamable<PointEvent<TPayload>>
3) Import cti's
4) Create Query for further use -> TPayload only
5) Create Subject (*Optional) and bind
Any help with the above would be greatly appreciated.
Thanks.
All Replies
-
Thursday, December 20, 2012 12:45 PM
See my previous reply and this blog entry.
Now ... depending on how your producer is designed, it may be possible to get an observable directly from the producer's observable to import into the reference stream. If, however, you are declaring your advance time settings, you won't be able to do that. If you look at the event structure that I introduced in "Dual Mode Data Sources", there is a "marker" for a CTI and there's no reason that you couldn't use that for your source for CTIs.
DevBiker (aka J Sawyer)
Microsoft MVP - Sql Server (StreamInsight)
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful. -
Thursday, December 20, 2012 1:58 PM
Hi DevBiker - I looked at your previous blog and implemented part of your provide solution re Cti import - quite cool indeed! However, your example assumes that both the reference stream and the data stream are created from DefineObservable and are of the same type (PublishedEvent<TPayload>). It also assumes a creational preference which is fine since its an example.
In my case my reference stream is of producer type PublishedEvent<TPayload> whilst the data stream type is IQStreamable<TPayload>. As soon as I try to convert the data stream to IQStreamable<PublishedEvent<TPayload>> re the Cti merge, I get serialize errors. This is probably due to the fact that the query is no longer remotable/ bindable.
So the question remains - how does one convert a normal IQStreamable type (payload only) to point event IQStreamable<PointEvent<TPayload>> for ease of cti importation.
I hope this makes sense...
-
Thursday, December 20, 2012 3:00 PM
Your serialization errors probably come in to play because the DefineObservable() method isn't serializable ... with 2.1, the entire expression is serialized so there are some things that you need to do to make sure that you can serialize it, particularly if you are using configuration classes - that's why (you'll notice) that the configuration class in my latest series is marked as a DataContract. Without seeing more of your code, however, it's difficult to say exactly where the serialization issue is popping up but looking at the code above, I'd guess that it's the config that isn't serializing correctly. Now ... you won't get this exception until you run the process ... it's the whole "deferred execution" Linq paradigm. Your entire query tree is parsed and executed only when you run it.
As for converting IQStreamable<Payload> to IObservable<PointEvent<TPayload>> ... IQStreamable already has the temporal properties. You can't have an IQStreamable of PointEvent<TPayload> ... the temporal properties defined by the point event are a part of the event header in the StreamInsight engine (and you can't have a DateTimeOffset in your payload either ...) so it's not necessary anyway. How you define the sink then determines whether you get the full event (Point/Interval/Edge) or if you get just the payload. The different event shapes are really an expression of the events ... all events, in the temporal stream, have start and end times and the shape is a matter of how they are expressed to the output. You can take the same exact stream and express it as point, interval AND edge ... at the same time ... to different sinks.
Now ... your PublishedEvent class ... what does that look like? And your Observable "becomes" a IQStreamable via ToPointStreamable() ... that "brings it into the StreamInsight engine" and where you define the temporal properties.
I'm not sure if that clarifies anything ...
DevBiker (aka J Sawyer)
Microsoft MVP - Sql Server (StreamInsight)
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful. -
Thursday, December 20, 2012 3:34 PM
Ah I think the penny is dropping albeit slowly :) So serialization issues occur because my config is not marked serializable (this makes sense and I will check it out) and IQStreamable is in fact temporal - so here I am trying to convert something into a type which is basically wrong from the start - yep still learning :) I have posted some of my code to illustrate the problem further but I think I need to revise the interfaces. In my example below the first #region is a fast moving price stream (market data of some sorts) - the second #region the Stock price query which, in this example, does not take in the Price directly but a derivative of it via instrumentQuery; lastly the #region "Stock Query" is the Stock Class interface - which returns a type IQStreamable<StockEvent>. The published event class is from your example and the PriceProducer inherits IObservable<PublishedEvent<TPayloadType>>.
Lets agree this thread for further q&a. tx.c
#region Price Source var priceConfiguration = new priceConfig(){ ... }; var PriceSource = application.DefineObservable(() => new PriceProducer(PriceConfiguration)); var PriceCti = PriceSource.Where(e => e.EventKind == EventKind.Cti).Select(e => e.Start); var PriceStream = PriceSource.ToPointStreamable(e => e.GetPointEvent()); var PriceQuery = from m in PriceStream select m; var PriceSubject = Helpers.Subject.GetOrCreateSubject<PriceEvent>(application, "PriceSubject"); var PriceSink = application.DefineObserver( () => Observer.Create<PriceEvent>(x => Console.WriteLine(" Price Instrument {0}", x.Instrument))); PriceQuery.Bind(PriceSubject).With(PriceSubject.Bind(PriceSink)).Run("PriceProcess"); #endregion #region Stock Stream var stockStream = Stock.StockQuery(application, instrumentQuery, tradeConfigurationQuery); var stockQuery = from q in stockStream select q; var stockSubject = Helpers.Subject.GetOrCreateSubject<StockEvent>(application, "StockSubject"); var stockSink = application.DefineObserver( () => Observer.Create<StockEvent>(x => Console.WriteLine("Stock Id {0}", x.StockId))); stockQuery.Bind(stockSubject).With(stockSubject.Bind(stockSink)).Run("StockProcess"); #endregion #region Stock Query public class Stock { /// <summary> /// Stock. Calculates stock price. /// </summary> /// <returns>IQStreamable StockEvent.</returns> public static IQStreamable<StockEvent> StockQuery(Application application, IQStreamable<InstumentEvent> instrumentStream, IQStreamable<TradeConfigurationEvent> tradeConfigurationStream) { ///... shortened for brevity } } #endregion
-
Thursday, December 20, 2012 3:42 PMforgot to add to above ... its the Stock Stream that needs to import PriceCti from #Price Source.
-
Thursday, December 20, 2012 8:35 PM
OK ... you have an InstrumentEvent stream and a TradeConfigurationEvent stream ... these are both streams (IQStreamable) so they already have CTIs. Do you want to import the CTIs into these streams? What is the source for those streams? That may be where you want to do it - I'm not sure exactly just yet.
Better yet ... how are you doing this in the Query (as opposed to Reactive) model? That might help me point you in the right direction.
DevBiker (aka J Sawyer)
Microsoft MVP - Sql Server (StreamInsight)
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful. -
Thursday, December 20, 2012 9:45 PMThanks for your reply. In the above example I have used the tradeConfiguration stream, it's source being defined with DefineObserver interface, to import ctis via PriceCti. This only works because there is an indirect inheritance of both streams via Instrument and TradeConfiguration. The solution at the moment involves importing cti's via the configuration stream. There are other situations where there one or streams (via IQStreamable) not related, require to be joined with common cti source. In the above example, ignoring the Instrument and TradeConfiguration streams ( and their respective Cti's) how would you import PriceCti into StockStream/ StockQuery?
-
Friday, December 21, 2012 3:09 PM
Assuming that stock stream is a reference stream, I would import the CTIs from the data (PriceCti) stream. You would need to modify the CtiImporter from my blog (that's what I'd use). To make sure that all of the reference events match with the data events, I would make sure that the data stream enqueues a CTI on startup that is before any expected events (even if it's 1/1/2000) and use that as the start time for the reference events. The reference events would be enqueued as an interval with the aforementioned start time and an end time of DateTimeOffset.MaxValue. Once in a streamable, I would use the ToSignal pattern to make sure that I have the latest version of the reference data ... this would be refreshed occassionally. ToSignal will clip old reference events and drop them off so there's no worry about that end time. On refresh, the start time would be based on the last CTI processed through the stream.
This is a level of flexibility that you have with 2.1 ... you could do something like this in <=2.0 by importing CTIs with an AdvanceTimePolicy of Adjust but you didn't have the control and awareness of what was going on.
DevBiker (aka J Sawyer)
Microsoft MVP - Sql Server (StreamInsight)
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful.- Marked As Answer by 2deviate Friday, December 21, 2012 6:29 PM
-
Friday, December 21, 2012 6:29 PM
Thanks for looking into this DevBiker - I will battle the hatch a bit and try your suggestion re importer... on a whole i think your earlier comment re "expression to outputs" seems to be a bit more fitting now that I have understood why merging a payload stream with a temporal one is "wrong". I am going to mark your last comment as marked...tx for all the input.
Regards
c.


