Forwarding / Dropping Events

    Question

  • Hi,

    I have written a sensor model that plugs into WCF as a transport channel.  Each sensor that is 'plugged' in via configuration can emit events up through the channel stack (either as individual events or in a stream).  When an event is emitted it is evaluated to determine whether the event should be forwarded or dropped (filtered), in other words, the events are effectively being filtered at the edge.  My current simple interface for evaluation passes the 'rules definition' and the event instance to an evaluator.  It then returns true/false as to whether the event is forwarded or dropped.
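
    A minimal sketch of what such an evaluator contract might look like (the interface and member names here are illustrative only, not the actual code):

    // Hypothetical evaluator contract: returns true to forward the event,
    // false to drop it, given the rules definition and the event instance.
    public interface IEventEvaluator
    {
        bool Evaluate(string rulesDefinition, object sensorEvent);
    }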

    I'm trying to work out how I can use the StreamInsight engine as an evaluator in my system such that the rules definition is a StreamInsight LINQ query that performs the filtering of the event.  Looking at the architecture of StreamInsight the events that appear in the output adapter are as a result of the query being applied to the events enqueued in the input adapter.

    I would ideally like to mark the event as either to be forwarded or dropped and I'm wondering whether I can do this.  Alternatively, I'm guessing that I could have two output adapters and two queries: one query that is for forwarded events and the other query for dropped events.  I'm assuming this is possible with StreamInsight?

    Just as an aside, the events may be of different types and each event emission passes the rules definition, so the definition is not known in advance.  I'm also assuming that I can dynamically add queries (each one representing a different rules definition in effect) and then work out at runtime which query to apply to which event based on the rules definition.

    Finally, I am assuming again that I need to create an untyped input and output adapter to get the events into and out of the CEP engine and to interface with my sensor model?

    Sorry for what might seem basic questions but I'm trying to work out how I can utilise StreamInsight in my scenario.
    Wednesday, September 23, 2009 1:12 PM

Answers

  • Hi Paul,

    StreamInsight requires that all events on an input stream have the same payload type. The input adapters can “normalize” the original events by transforming them to a set of properties that are inspected by the filtering queries and serializing the rest as a binary blob (byte[]) or an xml string to carry along with the event payload.
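
    For illustration only, a "normalized" payload along those lines might look like the following sketch; the type and property names are hypothetical:

    // Hypothetical "normalized" payload: a few well-known properties that the
    // filtering queries can inspect, plus the rest of the original event
    // serialized and carried along as a blob.
    public class NormalizedEvent
    {
        public string SensorId;
        public double Reading;
        public DateTime SourceTime;
        public byte[] RawPayload;   // remainder of the original event
    }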

    You can use a “where” statement in the LINQ queries to filter out (drop) events and they will not reach the output adapter. Alternatively, you can write the filtering query such that it outputs a new event which contains all the fields of the input event plus one computed boolean field which indicates whether the event must be dropped or forwarded.

    You can dynamically add queries and write untyped input and output adapters that work with events of different types. The limitation is that each input adapter instance must only produce one type of events matching the expected type of the input stream (and the query) connected to the adapter.
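
    As a rough illustration of dynamically adding a query, a sketch assuming the later StreamInsight 1.x object model (the adapter factory types, configuration objects and payload fields below are placeholders, and the exact overloads differed in early CTPs):

    // Rough sketch only (namespaces Microsoft.ComplexEventProcessing and
    // Microsoft.ComplexEventProcessing.Linq); SensorInputAdapterFactory,
    // SensorOutputAdapterFactory, inputConfig, outputConfig, application and
    // the NormalizedEvent payload are hypothetical placeholders.
    var input = CepStream<NormalizedEvent>.Create(
        "sensorInput", typeof(SensorInputAdapterFactory), inputConfig, EventShape.Point);

    var forwarded = from e in input
                    where e.Reading > 100   // the "rules definition" expressed in LINQ
                    select e;

    Query query = forwarded.ToQuery(
        application, "forwardQuery", "Forwards matching sensor events",
        typeof(SensorOutputAdapterFactory), outputConfig,
        EventShape.Point, StreamEventOrder.FullyOrdered);

    query.Start();   // later: query.Stop() to retire this rules definition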


    Regards,
    Tiho
    MS StreamInsight Team

    Disclaimer: This posting is provided "AS IS" with no warranties, and confers no rights
    • Marked as answer by Paul.Brown Monday, September 28, 2009 3:13 PM
    Friday, September 25, 2009 2:01 AM

All replies

  • Hi Tiho,

    Thanks for the response again!

    It mostly makes sense to me.  So a stream of events is specifically typed, but I can join across event types by joining the streams.  I could then emit events and determine which input adapter should receive each event based on its type (as each input adapter instance must only handle events of one type).

    I guess that computing the flag in the query (forward or drop) would be more efficient than running two separate queries over the same event, so that seems like the way to go.  As it seems I might need a wrapper for my base event class anyway (nested types and arrays), I could add the flag to this wrapper and populate it in the query, along the lines of the sketch below.
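
    For example, such a wrapper might look something like this sketch (illustrative names only, not the actual event model):

    // Hypothetical wrapper payload carrying the forward/drop decision.
    public class EvaluatedEvent
    {
        public byte[] OriginalEvent;   // serialized base event (nested types/arrays)
        public bool ShouldForward;     // populated by the StreamInsight query
    }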

    Does this seem like a reasonable approach?

    Cheers
    Friday, September 25, 2009 7:17 AM
  • Yes, you should determine which input adapter should receive the event based on its type.

    You only have to add the flag to the output event if you want to read this flag and drop/forward the event in the output adapter. If you don’t want the output adapter to even see the dropped event then you can use a filter in the query and you don’t need the flag in the event. E.g.

    from e in input
    where e.f1 > e.f2
    select e

    Also, you don’t have to add the flag to your incoming event type. You can add it later in the query – e.g.:

    from e in input
    select new { f1 = e.f1, f2 = e.f2, …, shouldDrop = (e.f1 <= e.f2) }

    This example uses an anonymous output type but you can create an explicit type if you want to write a typed output adapter.
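
    For example, an explicit type mirroring the projection above might look like this sketch (the additional fields elided with "…" are omitted here):

    // A hypothetical explicit output payload equivalent to the anonymous type above.
    public class FlaggedEvent
    {
        public int f1;
        public int f2;
        public bool shouldDrop;
    }

    // The query would then project into it instead of the anonymous type:
    // select new FlaggedEvent { f1 = e.f1, f2 = e.f2, shouldDrop = (e.f1 <= e.f2) }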

    Both options are reasonable and your choice depends on what you want to do in the output adapter.
    Regards,
    Tiho
    MS StreamInsight Team

    Disclaimer: This posting is provided "AS IS" with no warranties, and confers no rights
    Friday, September 25, 2009 7:16 PM
  • Hi Tiho,

    Thanks again for the reply.  Makes perfect sense.

    The only question I have left (for now!) is about the CepStream class itself.  While I can have an untyped adapter, it looks like I will need typed streams (naturally, I guess).  However, the CepStream class is generic, which means I need to know the type at design time rather than at runtime.  What if I genuinely do not know what type of payload I will receive at design time and will only know at runtime?  How can I create a CepStream object for a payload type at runtime?

    I guess this comes back to the fact that if you've got to compile the LINQ query into the application, you're probably going to have to know what the payload type is!  But if I wanted to provide an almost generic 'host' of the CEP engine and allow people to dynamically specify a query at runtime for an arbitrary event type, I'm going to struggle.  Would you agree?

    Cheers
    Friday, September 25, 2009 8:27 PM
  • CepStream takes a type because you use it to write a query template and you have to know the type of the input events of a query to write it. You can later bind it to generic adapters but you have to know the type when writing the query template.

    You can still provide generic adapters and “generic host” but people would have to know the types of the events when they write a specific query. Once the query is written, it can be bound to the generic adapters and it will work as long as the adapters produce events of the correct type.

    There’s no way to write a query without knowing the types of the input events. This goes back to “normalizing” the original events to a set of well known properties and a blob. Otherwise people will have to know the exact type of the inputs before they write a new query. The “host” will also have to know what queries can process what events.

    To summarize, you can have a “generic host” and dynamically send different queries to it but (1) the authors of the queries would have to know to what input events the queries will be applied and (2) the host would have to know what queries can process what events to correctly forward the input events to the queries for them.

    Regards,
    Tiho
    MS StreamInsight Team

    Disclaimer: This posting is provided "AS IS" with no warranties, and confers no rights
    Monday, September 28, 2009 4:31 AM
  • Hi Tiho,

    Thanks again.  I understand now and it makes sense.  I guess it would be nice if there were some way to dynamically update the queries that are used (even with a static type for the event stream), but it seems that is not possible with LINQ at this time without some custom expression tree serialization approach, hooked in at runtime via some remote management interface.  An interesting problem to solve, I think :)

    Cheers
    Paul
    Monday, September 28, 2009 3:12 PM
    Paul, that's a real challenge at present (expression trees on the fly) due to dynamic assemblies staying loaded and a host of other issues.  A pluggable LINQ provider (and an easier approach to writing LINQ providers!) might mitigate this.  Alternatively, I would like to propose a model for a truly dynamic approach that could do runtime parsing/interpretation of queries (which could then be modified on the fly).  It need not be LINQ-based - maybe LINQ is just one of the "query" options.  It would be nice to allow a pluggable model for query processors.  By definition, LINQ is focused more on "Language Integration" - design-time/compile-time capability - rather than runtime capability.

    In our case, we have a "generic" data structure (similar to an ExpandoObject or a C# 4.0 dynamic) that supports data binding and custom type descriptors, but LINQ can't be dynamic enough to do what we need.  We've built our own query parser/processor for a custom application but I'd like to be able to plug it into the StreamInsight infrastructure.

    Friday, October 16, 2009 2:35 PM
  • Yes, in my case it would be nice to be able to change a query using some remote UI that 'packages' the query, sends it to the endpoint hosting the StreamInsight engine, and automatically applies it to a running query instance so that all new events are evaluated against the updated query.  Hey ho - plan B then :)
    Friday, October 16, 2009 4:06 PM
  • Hi,

    I am facing a similar requirement.

    I have an application that receives data/events from different sources, so the event type is different for each source.  For example, from source s1 I may receive a structure

    class s1Event
    {
        public int field1;
        public int field2;
    }

    and from source s2 I may receive

    class s2Event
    {
        public int field1;
        public int field3;
        public int field5;
    }

    Each event emission passes the rules definition, so the definition is not known in advance.  I also need to add queries dynamically and decide at runtime which queries need to be applied to which events.

    Is there a way to dynamically handle event types at runtime and add queries dynamically?

    Any help or links will be appreciated.

    Thanks,

    smita.

    Wednesday, July 24, 2013 1:07 PM
  • As the original replies indicated, a stream can only have one event type. So you'd need to have multiple input streams that you can union together and/or join. You can use the same adapter/source - for adapters, you'd use an untyped adapter and for a source you could use reflection or a builder method on an interface common to each item. Alternatively, you could have fields as nullable types and simply mark them null for the events where they aren't appropriate.
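
    To illustrate the nullable-fields option, a merged payload covering both example event types might look like this sketch (illustrative only; the union/join plumbing itself is omitted):

    // Hypothetical merged payload for s1Event and s2Event; fields that a given
    // source does not supply are left null (the nullable-fields option above).
    public class MergedEvent
    {
        public string Source;   // e.g. "s1" or "s2"
        public int? field1;     // present in both
        public int? field2;     // s1 only
        public int? field3;     // s2 only
        public int? field5;     // s2 only
    }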

    I'm not sure what you mean by "passes the rules definition".

    Adding queries dynamically simply requires some code to handle this. There are many different ways you could do this, but I don't know enough about your use case to even suggest one. That said, in 2.1, using subjects for publishing source data streams and then subscribing to them from your analytic streams would be the way to do this ... the differences will come in how you want to determine those additional queries (i.e. what your use cases are for when they need to be loaded). Subjects allow you to have both publishers and subscribers come and go at runtime.


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Wednesday, July 24, 2013 4:57 PM