Error -2146233088 Could not find the corresponding event for an incoming retraction or expansion event. This can be caused by the usage of a non-deterministic user-defined function, aggregate or operator.

Answered Error -2146233088 Could not find the corresponding event for an incoming retraction or expansion event. This can be caused by the usage of a non-deterministic user-defined function, aggregate or operator.

  • Wednesday, February 20, 2013 4:09 PM
     
     

    Simple alarm, Send, Ack idea here. Stuck with no way to debug. Not sure if this is an error with Stream Insight or my brain not working right.

    Everything works great, popping events for every alarm every 20 minutes, until I get an Ack message. Then the query dies and I get the error on the next checkpoint.

    I think my problem is with the CreateSendEvent query event being removed  but I am not sure why.

    static void Main(string[] args)

            {

                //

                //  Add Check Points to recover from a start and Stop

                //

                var metaConfig = new SqlCeMetadataProviderConfiguration

                {

                    DataSource = "C:\\SI\\CAA\\SI_Critical_Alarm_Alert.sdf",

                    CreateDataSourceIfMissing = true

                };

                // Set up checkpointing. This needs a location to place the log files.

                var chkConfig = new CheckpointConfiguration

                {

                    LogPath = "C:\\SI\\CAA",

                    CreateLogPathIfMissing = true

                };

                using (var server = Server.Create("Default", metaConfig, chkConfig))

                //

                //  Add Check Points to recover from a start and Stop

                //

                //

                //using (var server = Server.Create("Default")) // if just embedded SI

                {

                    var host = new ServiceHost(server.CreateManagementService());

                    host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "http://localhost/SI_Critical_Alarm_Alert");

                    host.Open();

                    //

                    Application myApp;

                    if (!server.Applications.TryGetValue("SI_CAA", out myApp))

                    {

                        myApp = server.CreateApplication("SI_CAA");

                    }

                    CepProcess procAll;

                    if (myApp.Processes.TryGetValue("CAA_All", out procAll))

                    {

                        Console.WriteLine("Resuming process...");

                        procAll.Resume(); 

                    }

                    else

                    {

                        Console.WriteLine("Creating process...");

                        //

                        // AJS 01/10/2013 Add Check Points to recover from a start and Stop

                        //

                        // SOURCE

                        // QUEUE Critical Alarm Alert Event (677) SQL 

                        var mySourceMessages = myApp.DefineObservable<CAA_Message>(() => new QEventCreator_CriticalAlarmAlert(677)).ToPointStreamable(

                                        e => PointEvent.CreateInsert<CAA_Message>(DateTime.UtcNow, e),

                                        AdvanceTimeSettings.StrictlyIncreasingStartTime);

                        //QUERY

                        var FirstFilter = from r in mySourceMessages

                                          where r.ActivityType != -1

                                          select r;

                        var Opens = from r in FirstFilter

                                    where r.ActivityType == 1

                                    select r;

                        var Alarms = from r in FirstFilter

                                    where r.ActivityType == 6

                                    select r;

                        var ACKs = from r in FirstFilter

                                   where r.ActivityType == 3

                                   select r;

                        var Sents = from r in FirstFilter

                                    where r.ActivityType == 4

                                    select r;

                        Alarms = Alarms.AlterEventDuration(e => TimeSpan.MaxValue);

                        //  all events ending dates will be set to the starting date of the ACK event. If no ACK  event has yet happend, the ending date will stay open forever.

                        var OpenAlarms = Alarms.ClipEventDuration(ACKs, (e1, e2) => e1.Alarm_ID == e2.Alarm_ID);

                        // Sents events lasts 20 minutes

                        Sents = Sents.AlterEventDuration(e => TimeSpan.FromMinutes(20));

                        // If no Sent event has happend yet, or the 20 minutes is up then a new one needs to happen.

                        var OpensWithNothingSent = OpenAlarms.LeftAntiJoin(Sents, (e1, e2) => e1.Alarm_ID == e2.Alarm_ID).AlterEventDuration(e => TimeSpan.FromTicks(1));

                        //

                        var CreateSendEvent = from r in OpensWithNothingSent

                                               select new CAA_Message

                                         {

                                             Alarm_ID = r.Alarm_ID,

                                             MessBody = r.MessBody,

                                             ENCNTR_ID = r.ENCNTR_ID,

                                             PRSNL_ID = r.PRSNL_ID,

                                             ActivityType = 4,

                                             Tree_Level = FindTreeLevel(r.Alarm_ID),

                                             AlarmDTTM = DateTime.Now

                                         };

                        //Point to Signal Conversion http://msdn.microsoft.com/en-us/library/ee362414.aspx "fold pairs" http://www.devbiker.net/post/Cool-StreamInsight-querye28093Point-input-to-Edge-Output.aspx

                        var QryDone = Opens.Union(CreateSendEvent).Union(ACKs);

                        //SINK

                        var mySink_NewAlert = myApp.DefineObserver(() => Observer.Create<PointEvent<CAA_Message>>(OneSinktoRuleThemAll));

                

                        //BIND

                        procAll = QryDone.Bind(mySink_NewAlert).RunCheckpointable("CAA_All");

                    } //if (myApp.Processes.TryGetValue("LabNotify", out proc))

                    //RUN

                    using (CheckpointLoop(server, myApp.CheckpointableProcesses["CAA_All"], TimeSpan.FromSeconds(1)))

                    {

                                Console.WriteLine("Started checkpointing... Press enter to shut down normally...");

                                Console.ReadLine(); //keep thread alive

                    }

                    //

                    Console.WriteLine("Stopped query.");

                    //

                    host.Close();

                } //using (var server = Server.Create("Default", metaConfig, chkConfig))

            }

    System.AggregateException was caught

      HResult=-2146233088

      Message=One or more errors occurred.

      Source=mscorlib

      StackTrace:

           at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)

           at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)

           at System.Threading.Tasks.Task.Wait()

           at SI_Critical_Alarm_Alert.Program.<>c__DisplayClass9.<CheckpointLoop>b__6(Int64 i) in c:\TFS\Datapumps\StreamInsight\SI_Critical_Alarm_Alert\SI_Critical_Alarm_Alert\Program.cs:line 375

      InnerException: Microsoft.ComplexEventProcessing.ManagementException

           HResult=-2146233088

           Message=Could not find the corresponding event for an incoming retraction or expansion event. This can be caused by the usage of a non-deterministic user-defined function, aggregate or operator.

           Source=Microsoft.ComplexEventProcessing.Diagnostics

           StackTrace:

                at Microsoft.ComplexEventProcessing.Diagnostics.Exceptions.Throw(Exception exception)

                at Microsoft.ComplexEventProcessing.EmbeddedServerProxy.EndCheckpoint(IAsyncResult asyncResult)

                at Microsoft.ComplexEventProcessing.CepCheckpointableProcess.<>c__DisplayClass7.<CheckpointAsync>b__3(IAsyncResult iar)

           InnerException: Microsoft.ComplexEventProcessing.Engine.OperatorExecutionException

                HResult=-2146233088

                Message=Could not find the corresponding event for an incoming retraction or expansion event. This can be caused by the usage of a non-deterministic user-defined function, aggregate or operator.

                Source=Microsoft.ComplexEventProcessing.Diagnostics

                StackTrace:

                     at Microsoft.ComplexEventProcessing.Diagnostics.Exceptions.Throw(Exception exception)

                     at Microsoft.ComplexEventProcessing.Engine.AsyncResult`1.EndInvoke()

                     at Microsoft.ComplexEventProcessing.CommandDispatcher.CommandDispatcher.EndCheckpoint(IAsyncResult asyncResult)

                     at Microsoft.ComplexEventProcessing.EmbeddedServerProxy.EndCheckpoint(IAsyncResult asyncResult)

                InnerException:

    Any help will be appreciated,

    Andrew

All Replies

  • Wednesday, February 20, 2013 5:10 PM
     
     

    Here's the key: non-deterministic user-defined function, aggregate or operator.

    Using DateTime.Now() is a non-deterministic UDF. You should use the event headers (StartTime/EndTime) instead of trying to add a timestamp into your payload as a part of a query.


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

  • Thursday, February 21, 2013 6:55 PM
     
     Answered

    Spot on. A thousand points for Gryffindor!                

      //

                        var CreateSendEvent = from r in OpensWithNothingSent

                                               select new CAA_Message

                                         {

                                             Alarm_ID = r.Alarm_ID,

                                             MessBody = r.MessBody,

                                             ENCNTR_ID = r.ENCNTR_ID,

                                             PRSNL_ID = r.PRSNL_ID,

                                             ActivityType = 4,

                                             Tree_Level = FindTreeLevel(r.Alarm_ID),

                                             AlarmDTTM = r.AlarmDTTM,

                                             EventDTTM = r.EventDTTM

                                         };

    Thanks for your help,

    Andrew