How to bind multiple queries to one process

  • Thursday, October 11, 2012 9:39 AM
     
     

    Hi,

    I have one stream of data that has to be processed by multiple queries and I would like to do that in one process.
    Is this possible and if it is, can you provide some sample code?

    Thanks in advance,

    Henry

All Replies

  • Thursday, October 11, 2012 1:10 PM
     

    Henry,

    Are you using StreamInsight 2.1 with the new Sources/Sinks model? If so, this should get you started.

    // Source: emits an incrementing long once per second.
    var observable = Application.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1)));
    // Wrap each value in a point event stamped with the current time.
    var streamable = observable.ToPointStreamable(
        e => PointEvent.CreateInsert(DateTimeOffset.UtcNow, e),
        AdvanceTimeSettings.IncreasingStartTime);

    // Two independent queries over the same stream.
    var odd = from e in streamable
              where e % 2 != 0
              select e;
    var even = from e in streamable
               where e % 2 == 0
               select e;

    // One sink per query; Dump() is the LINQPad output helper.
    var oddSink = Application.DefineObserver(() => Observer.Create<long>(x => x.Dump("Odd")));
    var evenSink = Application.DefineObserver(() => Observer.Create<long>(x => x.Dump("Even")));

    // Combine both bindings with With() and run them as a single process.
    var binding = odd.Bind(oddSink)
                     .With(even.Bind(evenSink));
    binding.Run("MyProcess");

    The trick to getting all of your query logic into the same process is the With() method: it lets you combine two or more bound queries into a single process.
    • Marked As Answer by HHoud Friday, October 12, 2012 7:52 AM
  • Thursday, October 11, 2012 2:47 PM
     
     

    This is great!

    The only problem is that I can't call RunCheckpointable on it, which I will need later for resiliency.
    Or can I add resiliency another way, for example with server.BeginCheckpoint() and server.EndCheckpoint()?

  • Friday, October 12, 2012 1:25 AM
     

    That query should be fine with RunCheckpointable(). Have you tried it? If so, are you getting an exception?

    Keep in mind that you have to create the server as checkpointable and specify the checkpoint configuration, so there is a little extra work to do at startup. See below:

    // Metadata store: a SQL Server Compact file named after the entry assembly.
    SqlCeMetadataProviderConfiguration metadataProviderConfiguration =
        new SqlCeMetadataProviderConfiguration()
            {
                CreateDataSourceIfMissing = true,
                DataSource = System.Reflection.Assembly.GetEntryAssembly().GetName().Name + ".sdf"
            };

    // Checkpoint log location, created on first use if it does not exist.
    CheckpointConfiguration checkpointConfiguration =
        new CheckpointConfiguration()
            {
                CreateLogPathIfMissing = true,
                LogPath = System.Reflection.Assembly.GetEntryAssembly().GetName().Name + "_log.sdf"
            };

    // Create the server with both configurations so that it supports checkpointing.
    server = Server.Create(instanceName, metadataProviderConfiguration,
        checkpointConfiguration);


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)



  • Friday, October 12, 2012 2:50 PM
     
     

    Thanks, I already had the configuration part for the server.

    I can't use RunCheckpointable on that query; the error I get is the following:

    Error 5: 'Microsoft.ComplexEventProcessing.Linq.IRemoteBinding' does not contain a definition for 'RunCheckpointable' and the best extension method overload 'Microsoft.ComplexEventProcessing.Linq.CepCheckpointableProcessExtensions.RunCheckpointable(Microsoft.ComplexEventProcessing.Linq.IRemoteStreamableBinding, string)' has some invalid arguments
    File: C:\Users\Administrator\Documents\Visual Studio 2012\Projects\WCF\StreamInsight WCF Demo\StreamInsightAppWCF.cs, line 224, column 21, project: StreamInsight WCF Demo