Looking for a sample with multiple senders and multiple receivers

    Question

  • Hi,

    Is there a good sample that uses the StreamInsight service and supports multiple senders and multiple receivers (sinks)?

    All the samples I can find are limited to one sender and one receiver (with multiple sinks) on one box, running as a standalone service. I need something less tightly coupled and more open...

    The scenario is like this:

    I have 50 stores sending data, and they have to send it to some shared entry point. Let's say 10 stores send data to the same observable collection A, 10 others to another collection B, etc.

    Then I have different sinks. Some will be Windows services that send emails for alerts, others will gather stats for a dashboard, etc.

    I will have to create new observers in the future (using client/server apps, Silverlight apps, ASP.NET ...).

    So is there a complete example based on this model?

    Will StreamInsight keep the data sent by the stores in memory?

    If I start an observer at any time during the day, I want to be able to access the entire day's data. (I want to create a chart for the day with a custom aggregation of the events, such as every 10 minutes, every hour, etc.)

    (My events will also be stored in a SQL database, so I can read missing information from there... but that's another story, and I'll ask for another example for it: producing events from a historical database, then completing the data with real-time events.)

    Thanks for your guidance :)

    Sunday, July 28, 2013 8:51 PM

Answers

  • It occurred to me that I didn't convert the observables to streams. Not really sure how I missed that. That said, here's an updated sample that takes the observables and calls ToPointStreamable() to make sure that we have temporal streams:

    using (Server server = Server.Create("RealTimeDataMgmt21"))
    {
        Application cepApplication = server.CreateApplication("DemoApp");

        // The subject is the shared entry point: sources publish to it, sinks read from it.
        var mySubject = cepApplication.CreateSubject(
            "MySubject", () => new Subject<GeneratedEvent>());

        var firstObservable = cepApplication.DefineObservable<GeneratedEvent>(
            () => Observable.Interval(TimeSpan.FromSeconds(1)).Select(
                e => new GeneratedEvent {Source = "Source1", Value = e}));

        // Convert the observable to a temporal stream of point events.
        var firstStream = firstObservable.ToPointStreamable(e =>
            PointEvent<GeneratedEvent>.CreateInsert(DateTimeOffset.Now, e),
            AdvanceTimeSettings.IncreasingStartTime);

        firstStream.Bind(mySubject).Run("First Observable Process");

        Console.WriteLine("Press ENTER to bind the first observer to the subject");
        Console.ReadLine();

        // If this weren't in the same method, you could use Application.GetSubject
        // to get an already-defined subject:
        //cepApplication.GetSubject<GeneratedEvent, GeneratedEvent>("MySubject");

        var firstObserver = cepApplication.DefineObserver<GeneratedEvent>(() => Observer.Create<GeneratedEvent>(e =>
            Console.WriteLine("First Observer - Event Source=" + e.Source + " Value=" + e.Value)));

        mySubject.Bind(firstObserver).Run("First Observer Process");

        Console.WriteLine("Press ENTER to bind a new observable to the subject");
        Console.ReadLine();

        // Second event source, added while the first one is still running.
        var secondObservable = cepApplication.DefineObservable<GeneratedEvent>(
            () => Observable.Interval(TimeSpan.FromSeconds(1)).Select(
                e => new GeneratedEvent {Source = "Source2", Value = e * e}));

        var secondStream = secondObservable.ToPointStreamable(e =>
            PointEvent<GeneratedEvent>.CreateInsert(DateTimeOffset.Now, e),
            AdvanceTimeSettings.IncreasingStartTime);

        secondStream.Bind(mySubject).Run("Second Observable Process");

        Console.WriteLine("Press ENTER to bind a new observer to the subject");
        Console.ReadLine();

        // Second sink, also attached at runtime.
        var secondObserver = cepApplication.DefineObserver<GeneratedEvent>(() => Observer.Create<GeneratedEvent>(e =>
            Console.WriteLine("Second Observer - Event Source=" + e.Source + " Value=" + e.Value)));

        mySubject.Bind(secondObserver).Run("Second Observer Process");

        Console.WriteLine("Press ENTER to exit");
        Console.ReadLine();

        cepApplication.Delete();
    }

    The one thing that you will need to be very careful of with this, in the real world, is latency from any of the sources. Because these are temporal streams, CTI (current time increment) rules apply to the subject. So if one stream has more latency than the other, you will need to create a custom subject that handles some of the synchronization between the streams. I'm not sure how much of this StreamInsight will do for you.
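
    To make the latency concern concrete, here is a minimal sketch in plain C#. It does not use the StreamInsight API, and WatermarkMerger is a hypothetical helper invented for illustration. It assumes every source delivers its own events in non-decreasing timestamp order, and it only releases events up to the oldest "latest timestamp" seen across all sources, which is roughly the kind of synchronization a custom subject would have to do:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not a StreamInsight type): merges events from several
// named sources and releases them in timestamp order, holding back anything
// newer than the slowest source's latest timestamp (the "watermark").
public sealed class WatermarkMerger<T>
{
    private readonly Dictionary<string, DateTimeOffset> _latest =
        new Dictionary<string, DateTimeOffset>();
    private readonly List<KeyValuePair<DateTimeOffset, T>> _pending =
        new List<KeyValuePair<DateTimeOffset, T>>();

    public WatermarkMerger(IEnumerable<string> sources)
    {
        // A source that has not reported yet keeps the watermark at MinValue.
        foreach (var source in sources) _latest[source] = DateTimeOffset.MinValue;
    }

    // Buffers one event and returns the events that are now safe to release.
    // Throws KeyNotFoundException if the source was not registered.
    public IList<T> Push(string source, DateTimeOffset timestamp, T payload)
    {
        if (timestamp > _latest[source]) _latest[source] = timestamp;
        _pending.Add(new KeyValuePair<DateTimeOffset, T>(timestamp, payload));

        // No registered source can still send anything older than this.
        DateTimeOffset watermark = _latest.Values.Min();

        var ready = _pending
            .Where(p => p.Key <= watermark)
            .OrderBy(p => p.Key)          // stable sort: ties keep arrival order
            .Select(p => p.Value)
            .ToList();
        _pending.RemoveAll(p => p.Key <= watermark);
        return ready;
    }
}
```

    With two registered sources, events pushed by the first source stay buffered until the second source reports a timestamp at least as late. A slow source therefore stalls output for everyone, which is why latency differences need to be handled deliberately.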

    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Monday, August 05, 2013 5:36 PM
    Moderator

All replies

  • Hi Willgart1, 

    Thank you for your question.  

    I am trying to involve someone more familiar with this topic to take a further look at this issue. Some delay may be expected while the request is handed over. Your patience is greatly appreciated.

    Thank you for your understanding and support. 

    Allen Li
    TechNet Community Support

    Tuesday, July 30, 2013 2:59 AM
    Moderator
  • If I read your scenario correctly, you would use a subject for this. I don't have a sample handy that I can post, but here's the link to the docs: http://msdn.microsoft.com/en-us/library/jj136802(v=sql.111).aspx. You can also add and remove observers and observables to and from the subject dynamically at runtime.
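
    As a rough illustration of what a subject does, here is a sketch in plain C# that uses only the IObserver<T> and IObservable<T> interfaces from the System namespace. SimpleSubject and CollectingObserver are hypothetical names made up for this example, not StreamInsight or Rx types. The point is only to show fan-in from several senders, fan-out to several receivers, and attaching or detaching receivers at runtime:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a subject: senders call OnNext on it,
// and every currently subscribed receiver gets the event.
public sealed class SimpleSubject<T> : IObserver<T>, IObservable<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    // Attach a receiver; dispose the returned handle to detach it.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate) { _observers.Add(observer); }
        return new Unsubscriber(this, observer);
    }

    public void OnNext(T value)
    {
        foreach (var o in Snapshot()) o.OnNext(value);
    }

    // Note: with several senders, one sender failing or completing
    // ends the sequence for every receiver in this simple version.
    public void OnError(Exception error)
    {
        foreach (var o in Snapshot()) o.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var o in Snapshot()) o.OnCompleted();
    }

    // Copy the list so receivers can unsubscribe while events are delivered.
    private IObserver<T>[] Snapshot()
    {
        lock (_gate) { return _observers.ToArray(); }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly SimpleSubject<T> _subject;
        private readonly IObserver<T> _observer;

        public Unsubscriber(SimpleSubject<T> subject, IObserver<T> observer)
        {
            _subject = subject;
            _observer = observer;
        }

        public void Dispose()
        {
            lock (_subject._gate) { _subject._observers.Remove(_observer); }
        }
    }
}

// Hypothetical receiver that just records what it sees.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

    A receiver subscribed late only sees events published after it subscribed; this sketch does not replay earlier events.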

    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)



    Tuesday, July 30, 2013 5:44 PM
    Moderator
  • Hi Willgart1,

    I could not find any existing sample for your scenario. We will have to write a new sample and from a support perspective this is really beyond what we can do here in the forums. If you cannot determine your answer here or on your own, consider opening a support case with us. Visit this link to see the various support options that are available to better meet your needs:  http://support.microsoft.com/default.aspx?id=fh;en-us;offerprophone.

    Thanks,

    Kunal (MSFT)

    Friday, August 02, 2013 7:53 PM
  • Hi Willgart,

    I finally had a chance to put together a quick sample for you; I was pretty bogged down prepping for SQL Saturday Baton Rouge. Now, this is pretty contrived, but it should give you the idea. The sample starts with a single observable and publishes it to a subject. From there, it binds an observer to the subject. Next, we add a new event source to the subject (another observable) and finally, a new sink. It all revolves around the subject.

    static void Main(string[] args)
    {
        using (Server server = Server.Create("RealTimeDataMgmt21"))
        {
            Application cepApplication = server.CreateApplication("DemoApp");
    
            var mySubject = cepApplication.CreateSubject(
                "MySubject", () => new Subject<GeneratedEvent>());
    
            var firstObservable = cepApplication.DefineObservable<GeneratedEvent>(
                () => Observable.Interval(TimeSpan.FromSeconds(1)).Select(
                    e => new GeneratedEvent {Source = "Source1", Value = e}));
    
            firstObservable.Bind(mySubject).Run("First Observable Process");
    
            Console.WriteLine("Press ENTER to bind the first observer to the subject");
            Console.ReadLine(); 
    
            //If this wasn't the same method, you can use Application.GetSubject to get an 
            //already-defined subject. 
            //cepApplication.GetSubject<GeneratedEvent, GeneratedEvent>("MySubject");
    
            var firstObserver = cepApplication.DefineObserver<GeneratedEvent>(() => Observer.Create<GeneratedEvent>(e => 
                    Console.WriteLine("First Observer - Event Source=" + e.Source + " Value=" + e.Value)));
    
            mySubject.Bind(firstObserver).Run("First Observer Process");
    
            Console.WriteLine("Press ENTER to bind a new observable to the subject");
            Console.ReadLine(); 
    
            var secondObservable = cepApplication.DefineObservable<GeneratedEvent>(
                () => Observable.Interval(TimeSpan.FromSeconds(1)).Select(
                    e => new GeneratedEvent {Source = "Source2", Value = e * e}));
    
            secondObservable.Bind(mySubject).Run("Second Observable Process"); 
    
            Console.WriteLine("Press ENTER to bind a new observer to the subject");
            Console.ReadLine(); 
            var secondObserver = cepApplication.DefineObserver<GeneratedEvent>(() => Observer.Create<GeneratedEvent>(e => 
                    Console.WriteLine("Second Observer - Event Source=" + e.Source + " Value=" + e.Value)));
    
            mySubject.Bind(secondObserver).Run("Second Observer Process"); 
    
            Console.WriteLine("Press ENTER to exit");
            Console.ReadLine(); 
    
            cepApplication.Delete(); 
                    
        }
    
    }

    The definition for the GeneratedEvent class is pretty basic ...

    public class GeneratedEvent
    {
         public string Source;
         public long Value; 
    }


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)



    Monday, August 05, 2013 3:28 PM
    Moderator