How to Join Point and Edge Event

  • Question

  • Hello,

    I am developing with the Rx framework in StreamInsight V2.1.

    I am trying to join a point event stream with an edge event stream, but the join does not produce the results I expect.

    The code I wrote is as follows:

    var observableSource1 = app.DefineObservable(() => new Observable1()); // Source1 (edge event)
    var observableSource2 = app.DefineObservable(() => new Observable2()); // Source2 (point event)
    var observableSink = app.DefineObserver(() => new Observer());         // Sink

    var query = from item1 in observableSource1.ToEdgeStreamable(
                    input1 => EdgeEvent.CreateStart<SpecInputPayload>(DateTimeOffset.Now, input1),
                    AdvanceTimeSettings.IncreasingStartTime)
                join item2 in observableSource2.ToPointStreamable(
                    input2 => PointEvent.CreateInsert<InputPayload>(DateTimeOffset.Now, input2),
                    AdvanceTimeSettings.IncreasingStartTime)
                on item1.ID equals item2.ID
                select new OutputEvent { outID = item1.ID, outValue = item1.Value };

    query.Bind(observableSink).Run();

    I look forward to your answer.

    Thanks.




    • Edited by alojong Friday, February 15, 2013 8:05 AM
    Friday, February 15, 2013 3:44 AM

Answers

  • From the MSDN StreamInsight 2.1 docs on Joins: "A join operation compares each event from one input stream with each event from one or more other input streams. If their valid time intervals overlap and the join condition holds, the operation produces one output event."

    In a nutshell, the events in your point stream (a point event lasts 1 tick) must occur during the lifetime of the edge events you would like to join with. Remember that edge events have a known start time and an end time that arrives sometime in the future.

    Here's an example you can use as a starting point.

    // Assumes `server` (a StreamInsight Server) and `app` are declared elsewhere.
    app = server.CreateApplication("MyApplication");

    var startTime = new DateTimeOffset(DateTime.Today);

    // 100 point events, one per second, alternating between Id "A" and "B"
    var pointStreamable = app.DefineEnumerable(() => Enumerable.Range(0, 100))
        .ToPointStreamable(
            e => PointEvent.CreateInsert(
                startTime.AddSeconds(e),
                new PointPayload
                {
                    Id = (e % 2 == 0) ? "A" : "B",
                    Value = e,
                    Timestamp = startTime.AddSeconds(e).UtcDateTime
                }),
            AdvanceTimeSettings.IncreasingStartTime);

    // 10 start edges for Id "A", one every 3 seconds
    var startEdgeStreamable = app.DefineEnumerable(() => Enumerable.Range(0, 10))
        .ToEdgeStreamable(
            e => EdgeEvent.CreateStart(
                startTime.AddSeconds(e * 3),
                new EdgePayload
                {
                    Id = "A",
                    Value = e,
                    Timestamp = startTime.AddSeconds(e * 3).UtcDateTime
                }),
            AdvanceTimeSettings.IncreasingStartTime);

    // Add end edges by clipping the event duration of the start edges to the next event with the same Id
    var edgeStreamable = startEdgeStreamable.ClipEventDuration(startEdgeStreamable, (e1, e2) => e1.Id == e2.Id);

    // Join each point with the edge that shares its Id and is alive at the point's timestamp
    var query = from p in pointStreamable
                join e in edgeStreamable
                    on p.Id equals e.Id
                select new JoinedEvent
                {
                    EdgeId = e.Id,
                    EdgeValue = e.Value,
                    EdgeTimestamp = e.Timestamp,
                    PointId = p.Id,
                    PointValue = p.Value,
                    PointTimestamp = p.Timestamp
                };

    var observer = app.DefineObserver(() => Observer.Create<JoinedEvent>(OnNext, OnError, OnCompleted));
    using (IDisposable myProcess = query.Bind(observer).Run("MyProcess"))
    {
        Console.ReadLine(); // keep the process running until Enter is pressed
    }

    And here are my payload classes:

    public class JoinedEvent
    {
        public string EdgeId { get; set; }
        public int EdgeValue { get; set; }
        public string PointId { get; set; }
        public int PointValue { get; set; }
        public DateTime EdgeTimestamp { get; set; }
        public DateTime PointTimestamp { get; set; }
    
        public override string ToString()
        {
            return string.Format("Edge {{{0},{1},{2}}}, Point {{{3},{4},{5}}}", EdgeId, EdgeValue, EdgeTimestamp, PointId, PointValue, PointTimestamp);
        }
    }
    
    public class EdgePayload
    {
        public string Id { get; set; }
        public int Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
    
    public class PointPayload
    {
        public string Id { get; set; }
        public int Value { get; set; }
        public DateTime Timestamp { get; set; }
    }

    And here are the static methods I used for my Observer:

    private static void OnCompleted()
    {
        Console.WriteLine("OnCompleted");
    }
    
    private static void OnError(Exception ex)
    {
        Console.WriteLine("OnError - " + ex.Message);
    }
    
    private static void OnNext<T>(T item)
    {
        Console.WriteLine("OnNext - " + item);
    }
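
    To see the overlap rule in isolation, here is a minimal sketch in plain C# with no StreamInsight types. The names OverlapJoinModel, Interval, Point, Overlaps and Join are made up for illustration, and the sketch assumes half-open [start, end) lifetimes measured in ticks. It only models the rule quoted from the docs; it is not how the engine is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OverlapJoinModel
{
    // Half-open validity interval [Start, End), measured in ticks (hypothetical model type).
    public sealed class Interval
    {
        public string Id;
        public long Start;
        public long End;
    }

    // A point event is modeled as an interval that is valid for exactly one tick.
    public static Interval Point(string id, long t)
    {
        return new Interval { Id = id, Start = t, End = t + 1 };
    }

    // Two half-open intervals overlap when each one starts before the other ends.
    public static bool Overlaps(Interval a, Interval b)
    {
        return a.Start < b.End && b.Start < a.End;
    }

    // One output pair per (point, edge) combination that shares an Id and overlaps in time.
    public static List<Tuple<Interval, Interval>> Join(
        IEnumerable<Interval> points, IEnumerable<Interval> edges)
    {
        return (from p in points
                from e in edges
                where p.Id == e.Id && Overlaps(p, e)
                select Tuple.Create(p, e)).ToList();
    }

    public static void Main()
    {
        var edges = new[] { new Interval { Id = "A", Start = 0, End = 30 } };
        var points = new[] { Point("A", 10), Point("A", 30), Point("B", 10) };

        foreach (var pair in Join(points, edges))
        {
            Console.WriteLine("point@{0} joins edge [{1},{2})",
                pair.Item1.Start, pair.Item2.Start, pair.Item2.End);
        }
    }
}
```

    With the sample data in Main, only the point at tick 10 pairs with the edge. The point at tick 30 misses because the edge's lifetime ends exactly there, and the "B" point fails the Id condition.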

    • Marked as answer by alojong Monday, February 25, 2013 6:29 AM
    Sunday, February 17, 2013 2:47 PM

All replies

  • A common reason for joins between streams to fail is that the event lifetimes do not overlap. Are you sure that the point stream's events overlap with the edge stream's events?

    What does the data in your observables look like?

    Friday, February 15, 2013 2:34 PM
  • How are you creating the end edge events? Every start must have an end or you'll have "dangling edges" and every end must have a corresponding start (or you'll get exceptions).

    And Tony's right on ... if your join is failing it's because they aren't overlapping in time. Try using the Event Flow Debugger to see the events in the stream. You'll probably want to start the debugger recording with trace.cmd.


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Friday, February 15, 2013 3:07 PM
    Moderator
  • Hi TXPower125,

    thanks for your answer.

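    The payload classes for my observables look like this:

    public class InputPayload
    {
        public string ID { get; set; }
        public string Value { get; set; }
    }

    public class SpecInputPayload
    {
        public string ID { get; set; }
        public string Value { get; set; }
    }

    How do I keep the edge event's time? In other words, how do I make the point and edge events overlap in time?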



    • Edited by alojong Sunday, February 17, 2013 2:11 PM
    Sunday, February 17, 2013 1:54 PM
  • Hi DevBiker,

    thanks for your answer.

    I know that I need to create the end edge events, but it is not clear to me how to create them.

    How can I create both start and end edge events in a single query expression?


    • Edited by alojong Sunday, February 17, 2013 2:12 PM
    Sunday, February 17, 2013 2:09 PM
  • Since I don't know what your source observable looks like, I can't say. But it's really not hard ... your source would have some sort of "marker" property that tells you whether it's a start or an end. You then use either a function call or a conditional (immediate if) operator to select which kind of edge event to create.

    In many cases, however, you should think long and hard about whether edges are appropriate for input. They do present quite a few challenges. It's usually easier and better to create your edge through queries. But since I don't know what you are trying to accomplish or what your source is, I can't say if that's the case.

    But ... to the original question ... did you use the event flow debugger to see what's happening with your joins?
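
    The marker idea above, together with the earlier rule that every start needs an end and every end needs a start, can be sketched in plain C#. This is only a model: Signal, Edge, IsStart and Pair are hypothetical names, no StreamInsight types are involved, and it assumes at most one open edge per Id.

```csharp
using System;
using System.Collections.Generic;

public static class EdgePairingModel
{
    // Hypothetical source record; IsStart is the "marker" property.
    public sealed class Signal
    {
        public string Id;
        public long Time;
        public bool IsStart;
    }

    // A completed edge with a half-open lifetime [Start, End).
    public sealed class Edge
    {
        public string Id;
        public long Start;
        public long End;
    }

    // Pairs each end signal with the open start for the same Id.
    // Throws on an end without a start; Ids that never received an end
    // are returned through danglingIds.
    public static List<Edge> Pair(IEnumerable<Signal> signals, out List<string> danglingIds)
    {
        var open = new Dictionary<string, long>();
        var closed = new List<Edge>();

        foreach (var s in signals)
        {
            if (s.IsStart)
            {
                if (open.ContainsKey(s.Id))
                    throw new InvalidOperationException("Second start before end for Id " + s.Id);
                open[s.Id] = s.Time;
            }
            else
            {
                long start;
                if (!open.TryGetValue(s.Id, out start))
                    throw new InvalidOperationException("End without start for Id " + s.Id);
                closed.Add(new Edge { Id = s.Id, Start = start, End = s.Time });
                open.Remove(s.Id);
            }
        }

        danglingIds = new List<string>(open.Keys);
        return closed;
    }

    public static void Main()
    {
        var signals = new[]
        {
            new Signal { Id = "A", Time = 0, IsStart = true },
            new Signal { Id = "B", Time = 1, IsStart = true },
            new Signal { Id = "A", Time = 5, IsStart = false }
        };

        List<string> dangling;
        foreach (var e in Pair(signals, out dangling))
            Console.WriteLine("{0}: [{1},{2})", e.Id, e.Start, e.End);
        Console.WriteLine("dangling: " + string.Join(",", dangling));
    }
}
```

    In the sample, "A" gets a complete edge from tick 0 to tick 5, while "B" is reported as dangling because its end never arrives.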


    DevBiker (aka J Sawyer)

    Sunday, February 17, 2013 2:37 PM
    Moderator
  • Thanks.

    I have one more question.

    When multiple inputs with the same GUID are entered into the StreamInsight server, I want to enqueue the CTI only once, after the last input, and then count the events for that GUID.

    Here's an example (using StreamInsight V2.0 with an adapter).

    Creating the CepStream:

    var inputStream = CepStream<InputPayload>.Create(
        "InputStream",
        typeof(InputFactory),
        InputConfig,
        EventShape.Point,
        null); // no AdvanceTimeSettings

    ProduceEvents in the input adapter:

    DateTimeOffset now = DateTimeOffset.Now;
    Guid guid = Guid.NewGuid();
    Random r = new Random();
    int rnum = r.Next(0, 100);

    for (int idx = 0; idx < rnum; idx++)
    {
        // Produce INSERT event
        currEvent = CreateInsertEvent();
        currEvent.StartTime = now;
        currEvent.Payload = new InputPayload(guid, idx.ToString(), idx);

        EnqueueOperationResult ret = Enqueue(ref currEvent);
        if (ret == EnqueueOperationResult.Success)
        {
            Console.WriteLine("Success");
        }
    }

    // Enqueue a single CTI after the whole batch
    EnqueueCtiEvent(now.Add(TimeSpan.FromTicks(1)));


    Output query:

    var query = from input in inputStream
                group input by new { input.GUID } into inputGroup
                from win in inputGroup.TumblingWindow(TimeSpan.FromTicks(1))
                select new OutputPayload
                {
                    GUID = inputGroup.Key.GUID,
                    Count = win.Count()
                };


    I want to do the same thing using Rx (V2.1).

    How can I enqueue the CTI only once, after the last event, from Rx?









    • Edited by alojong Tuesday, February 19, 2013 5:56 AM
    Tuesday, February 19, 2013 4:53 AM
  • What are you trying to accomplish here?

    If you are trying to enqueue a batch of Insert events before a CTI is enqueued, there are settings for that; see the AdvanceTimeSettings class on MSDN. Unless you are replaying events, that is the way I would go.

    In 2.1, you can use the legacy adapter model with the new Rx APIs. Take a look at this extension method: RemoteProvider.DefineStreamableSink<TPayload> Method (IRemoteProvider, Type, Object, EventShape, StreamEventOrder). It will let you create an IQStreamable<InputPayload> directly from your existing adapter.
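
    For reference, the count the question is after can be modeled in plain C# without StreamInsight. WindowCountModel, Input and CountPerWindow are hypothetical names, and the sketch assumes non-negative tick values. It only shows why a batch that shares one start time and one GUID lands in the same one-tick window; it does not model CTIs, which decide when the engine releases that window's result.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class WindowCountModel
{
    // Hypothetical input record: a GUID plus the event start time in ticks.
    public sealed class Input
    {
        public Guid Id;
        public long Ticks;
    }

    // Counts events per (window start, Id). A window starting at w covers [w, w + windowTicks).
    public static Dictionary<Tuple<long, Guid>, int> CountPerWindow(
        IEnumerable<Input> events, long windowTicks)
    {
        return events
            .GroupBy(e => Tuple.Create(e.Ticks - e.Ticks % windowTicks, e.Id))
            .ToDictionary(g => g.Key, g => g.Count());
    }

    public static void Main()
    {
        var first = Guid.NewGuid();
        var second = Guid.NewGuid();

        // Five events for one GUID and two for another, all sharing start time 100,
        // plus one later event for the first GUID.
        var events = new List<Input>();
        for (int i = 0; i < 5; i++) events.Add(new Input { Id = first, Ticks = 100 });
        for (int i = 0; i < 2; i++) events.Add(new Input { Id = second, Ticks = 100 });
        events.Add(new Input { Id = first, Ticks = 101 });

        foreach (var entry in CountPerWindow(events, 1))
        {
            Console.WriteLine("window {0}, GUID {1}: {2}",
                entry.Key.Item1, entry.Key.Item2, entry.Value);
        }
    }
}
```

    In the sample, the five events that share tick 100 and the first GUID are counted together, while the event at tick 101 falls into its own window.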

    Tuesday, February 19, 2013 3:00 PM