Processing raw TCP data with StreamInsight
-
Thursday, January 31, 2013 6:02 AM
I have created a class that implements the IObservable<T> interface so that it can take advantage of Reactive Extensions (Rx).
In this class I have a method that reads data from TCP through the TcpClient class. Below is the code that reads the data, deserializes each record into a myObject instance, and publishes it so that it can be pushed into the StreamInsight engine.
tcpNetworkStream = tcpClient.GetStream();
byte[] bytes = new byte[tcpClient.ReceiveBufferSize];
int totalCount = tcpNetworkStream.Read(bytes, 0, tcpClient.ReceiveBufferSize);

// Note: this discards any incomplete record left over from the previous Read
stringBuilder.Clear();
for (int iterator = 0; iterator < totalCount; iterator++)
{
    char data = Convert.ToChar(bytes[iterator]);
    if (data == ';')
    {
        // A complete "time,value" record has been read
        string[] dataValues = stringBuilder.ToString().Split(',');
        if (dataValues.Length >= 2)   // both the time and the value field are needed
        {
            DateTime date = DateTime.Now;
            try
            {
                date = Convert.ToDateTime(dataValues[0]);
            }
            catch (FormatException)
            {
                // Skip records whose timestamp cannot be parsed
                stringBuilder.Clear();
                continue;
            }
            value = new myObject() { Time = date, Value = Convert.ToDouble(dataValues[1]) };
            OnNext(value);
        }
        stringBuilder.Clear();
    }
    else
    {
        stringBuilder.Append(data);
    }
}
} // closes the enclosing block (not shown)
Is there any way to push the TCP data into StreamInsight without first converting it into my own class (as the code above does)?
All Replies
-
Thursday, January 31, 2013 2:39 PM
Yes and no. No, because StreamInsight needs the payload type to determine the event's schema. Yes, because byte[] is a valid payload type. You still need to split the stream into the individual events, but you can enqueue the resulting byte[] payloads into StreamInsight and then defer converting the byte[] to myObject till later in your StreamInsight query.
- Proposed As Answer by TXPower125 Friday, February 01, 2013 6:20 AM
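To make the splitting step concrete, here is a minimal sketch of a source-side helper that turns raw TCP reads into one byte[] payload per ';'-terminated record. SemicolonFramer and its Feed method are hypothetical names (not part of StreamInsight or Rx), and the sketch assumes records are separated by ';' as in the question. Because a single Read on a NetworkStream can return any number of bytes, it keeps an incomplete record until the next Read supplies the rest, instead of clearing the buffer on every Read.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: splits a TCP byte stream into one byte[] per
// ';'-terminated record, carrying incomplete records across Read calls.
public sealed class SemicolonFramer
{
    private const byte Delimiter = (byte)';';
    private readonly List<byte> _pending = new List<byte>();

    // Feeds 'count' bytes from 'buffer' and returns every record they complete.
    public IList<byte[]> Feed(byte[] buffer, int count)
    {
        var records = new List<byte[]>();
        for (int i = 0; i < count; i++)
        {
            byte b = buffer[i];
            if (b == Delimiter)
            {
                // ToArray copies, so each emitted payload is independent of _pending
                records.Add(_pending.ToArray());
                _pending.Clear();
            }
            else
            {
                _pending.Add(b);
            }
        }
        return records; // any trailing partial record stays in _pending
    }
}
```

Inside the read loop, each byte[] returned by Feed(bytes, totalCount) would be published with OnNext and enqueued as a byte[] payload; the conversion to myObject then happens later, in the query.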
-
Friday, February 01, 2013 4:27 AM
Thanks for the response. Could you please clarify the following:
"converting the byte[] to myObject till later in your StreamInsight query".
I am using the below query in my current implementation:
var query = from e in stream.HoppingWindow(TimeSpan.FromSeconds(interval), TimeSpan.FromSeconds(interval))
            select new myObject
            {
                Value = e.StdDev(i => i.Value),
                Time = e.Max(i => i.Time)
            };

using (query.ToObservable().Subscribe(OnNext))
{
    source.GetTCPData();
}
In my source, after converting byte[] to myObject, we enqueue myObject into the StreamInsight engine and use the query above to calculate the standard deviation over a hopping window.
Do I need to convert the byte[] of each individual event into myObject before the calculation? Could you please clarify?
-
Friday, February 01, 2013 6:20 AM
Remember that in StreamInsight, your payload type specifies the schema for your event. To use the data in your hopping-window calculation, you will need to convert that TCP byte stream into a stream/streamable of myObjects.
In StreamInsight, your CepStream<T> or IQStreamable<T> can have only one payload type. What I am suggesting is that your source split the incoming TCP data stream into byte[] arrays (one per event). Then you can filter/convert/project the byte[] payloads to the actual payload types. This lets you send multiple kinds of events (payloads in a byte[]) through a single instance of the source.
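The deferred byte[]-to-myObject conversion could look like the following sketch. It assumes each payload holds the ASCII text "time,value" without the ';' terminator, and that myObject has settable Time (DateTime) and Value (double) properties; the thread never shows the class, so the definition below is a stand-in. PayloadConverter, TryConvert and ConvertAll are hypothetical names, and ConvertAll runs over a plain IEnumerable<byte[]> only to illustrate the filter/convert/project idea, not the StreamInsight API.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

// Assumed shape of the poster's payload class
public class myObject
{
    public DateTime Time { get; set; }
    public double Value { get; set; }
}

public static class PayloadConverter
{
    // Converts one "time,value" record to myObject; returns false for malformed input.
    public static bool TryConvert(byte[] payload, out myObject result)
    {
        result = null;
        string[] fields = Encoding.ASCII.GetString(payload).Split(',');
        if (fields.Length < 2) return false;

        DateTime time;
        double value;
        if (!DateTime.TryParse(fields[0], CultureInfo.InvariantCulture, DateTimeStyles.None, out time)) return false;
        if (!double.TryParse(fields[1], NumberStyles.Float, CultureInfo.InvariantCulture, out value)) return false;

        result = new myObject { Time = time, Value = value };
        return true;
    }

    // Filter + convert + project: malformed payloads are dropped, the rest become myObject.
    public static IEnumerable<myObject> ConvertAll(IEnumerable<byte[]> payloads)
    {
        foreach (byte[] payload in payloads)
        {
            myObject item;
            if (TryConvert(payload, out item))
                yield return item;
        }
    }
}
```

Parsing with CultureInfo.InvariantCulture keeps the machine's regional settings from changing how dates and decimals are read, which Convert.ToDateTime and Convert.ToDouble (used in the question) are subject to.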
-
Friday, February 01, 2013 9:01 AM
Thanks for the quick response.
As I understand it, instead of converting byte[] to myObject at the source level, I should enqueue only the byte[] of each individual event into StreamInsight, and convert it into myObject before querying.
Am I understanding this correctly?
Could you please share some sample code or a link for this?
-
Friday, February 01, 2013 2:03 PM
Correct. Your source should enqueue events whose payload type is byte[]. Then you can implement a User Defined Stream Operator (UDSO) to convert the byte[] payload to myObject. Now you can do your HoppingWindow aggregation before sending the output to a sink.
- Source - accepts TCP stream data and splits the TCP stream into events with payloads of byte[].
- UDSO - converts the events with the byte[] payload to events with a myObject payload
- HoppingWindow - performs the calculations you want to do over the window (standard deviation, Min, Max, etc.)
- Sink - outputs the results of the HoppingWindow calculation to whatever you wanted to output it to.
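To illustrate what the HoppingWindow stage computes, here is a batch sketch in LINQ to Objects (not StreamInsight) for the case used in this thread, HoppingWindow(interval, interval), where the window size equals the hop size so every event falls into exactly one window. WindowSketch and StdDevPerWindow are hypothetical names; the sketch assumes windows are aligned to a caller-supplied origin with no event before it, and it computes the population standard deviation, which may differ from the StdDev aggregate the poster uses.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for the poster's payload class
public class myObject
{
    public DateTime Time { get; set; }
    public double Value { get; set; }
}

public static class WindowSketch
{
    // Window size == hop size: each event belongs to window floor((Time - origin) / interval).
    // Emits one result per non-empty window: population std. dev. of Value and the max Time.
    public static IEnumerable<myObject> StdDevPerWindow(
        IEnumerable<myObject> events, DateTime origin, TimeSpan interval)
    {
        return events
            .GroupBy(e => (e.Time - origin).Ticks / interval.Ticks) // window index
            .OrderBy(g => g.Key)
            .Select(g =>
            {
                double mean = g.Average(e => e.Value);
                double variance = g.Average(e => (e.Value - mean) * (e.Value - mean));
                return new myObject { Value = Math.Sqrt(variance), Time = g.Max(e => e.Time) };
            });
    }
}
```

In the real pipeline the source and UDSO stages feed this step, and the sink is whatever observer consumes query.ToObservable().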
- Marked As Answer by Iric WenModerator Wednesday, February 06, 2013 8:21 AM
-
Wednesday, February 06, 2013 2:03 AM
One option is to return the raw byte array and process it later inside an Rx or StreamInsight query:
//...
byte semicolon = (byte)';';
List<byte> currentChunk = new List<byte>();
for (int iterator = 0; iterator < totalCount; iterator++)
{
    byte data = bytes[iterator];
    if (data == semicolon)
    {
        // Publish a copy: currentChunk is cleared and reused for the next record
        OnNext(currentChunk.ToArray());
        currentChunk.Clear();
    }
    else
    {
        currentChunk.Add(data);
    }
}
Thanks,
Firaz.
- Marked As Answer by Iric WenModerator Wednesday, February 06, 2013 8:21 AM
-
Wednesday, February 06, 2013 10:48 AM
Thanks for your response.
TXPower125 :
I have implemented my code the way you suggested. Below is the application code
using (Server server = Server.Create("Default"))
{
    Microsoft.ComplexEventProcessing.Application cepApplication = server.CreateApplication("cepapp");

    // Call to my source for data
    var data = cepApplication.DefineObservable(() => new TCPDataProducer(config))
                             .ToPointStreamable(e => e.GetPointEvent());

    // Use UDSO for data conversion (byte[] to myObject)
    var myObjectQuery = from d in data.Scan(() => new myObjectOperator())
                        select d;

    // Calculation
    var query = from e in myObjectQuery.HoppingWindow(TimeSpan.FromSeconds(interval), TimeSpan.FromSeconds(interval))
                select new myObject
                {
                    Value = e.Avg(i => i.Value),
                    Time = e.Max(i => i.Time)
                };

    // Sink to get the data after output
    query.ToObservable().Subscribe(OnNext);
}
Does this look correct? Please suggest any changes if needed.
One issue I am encountering is that the Subscribe method of my observable source is never called. I'm not sure what I am missing here. Please help.
I followed the link below for my source implementation:
http://www.devbiker.net/post/Dual-Mode-Data-Sourcese28093Part-I.aspx
- Edited by DevCode13 Wednesday, February 06, 2013 1:06 PM updates
-
Wednesday, February 06, 2013 2:16 PM
I don't see anything out of line. Are you enqueueing CTI events? If not, you may want to specify AdvanceTimeSettings.
-
Friday, February 08, 2013 1:51 PM
Thanks for your response. Yes, I am enqueueing CTI events through the GetPointEvent() method. The Subscribe method now gets called (I had a coding error), but I am facing another issue: I cannot get the output of the query. When I subscribe the observer, its type is StreamInputEvent<TPayloadType>:
protected void PublishEvent(StreamInputEvent<TPayloadType> newEvent)
{
    _observer.OnNext(newEvent);
}
But I try to get the data through the sink like this:
// Sink to get the data after calculation
query.ToObservable().Subscribe(OnNext);
The OnNext method uses the myObject class to receive the calculated data:
public void OnNext(myObject value)
{
    myObjectList.Add(value);
}
Is there anything I might have missed that would explain why I am not getting any output? Please suggest.
- Edited by DevCode13 Friday, February 08, 2013 3:16 PM
-
Monday, February 11, 2013 3:42 PM
You are not enqueueing CTI events. CTI events advance application time. Without them, you will not see any output, because your events never leave your input adapter/source. You can confirm this by tracing the workflow with the Event Flow Debugger. As a starting point, add AdvanceTimeSettings.IncreasingStartTime to your ToPointStreamable() call. This enqueues one CTI event after each Insert event. The example code below was written for LINQPad, but with some modifications it will run fine in a regular application.
var startTime = new DateTimeOffset(DateTime.Today);
var source = Application
    .DefineEnumerable(() => Enumerable.Range(0, 10))
    .ToPointStreamable(e => PointEvent.CreateInsert(startTime.AddSeconds(e), e),
                       AdvanceTimeSettings.IncreasingStartTime);
var query = from e in source
            where e % 2 == 0
            select e;
query.ToObservable().Subscribe((e) => e.Dump());
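A toy model may help show why missing CTIs means no output. The class below is not how the StreamInsight engine is implemented; ToyWindowOperator, Insert and Cti are hypothetical names. It buffers Insert events into fixed-size windows and releases a window's result only after a CTI at or beyond the window's end, because only then is it known that no further events can fall into that window.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model (not StreamInsight): a fixed-size window counter that only releases
// a window once a CTI (time punctuation) reaches the window's end.
public sealed class ToyWindowOperator
{
    private readonly TimeSpan _size;
    // window index -> number of Insert events buffered for it
    private readonly SortedDictionary<long, int> _counts = new SortedDictionary<long, int>();

    public List<string> Output { get; } = new List<string>();

    public ToyWindowOperator(TimeSpan size) { _size = size; }

    // An Insert event is only buffered; it never produces output by itself.
    public void Insert(TimeSpan time)
    {
        long index = time.Ticks / _size.Ticks;
        int count;
        _counts.TryGetValue(index, out count); // count is 0 if the window is new
        _counts[index] = count + 1;
    }

    // A CTI at 'time' promises no later event starts before 'time',
    // so every window ending at or before it is final and can be emitted.
    public void Cti(TimeSpan time)
    {
        var closed = _counts.Keys.Where(w => (w + 1) * _size.Ticks <= time.Ticks).ToList();
        foreach (long index in closed)
        {
            Output.Add(string.Format("window {0}: {1} event(s)", index, _counts[index]));
            _counts.Remove(index);
        }
    }
}
```

With AdvanceTimeSettings.IncreasingStartTime, each Insert is followed by a CTI at its start time, so in this model a window is released as soon as an event at or after the window's end arrives.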

