Error -2146233088 Could not find the corresponding event for an incoming retraction or expansion event. This can be caused by the usage of a non-deterministic user-defined function, aggregate or operator.
-
Wednesday, February 20, 2013 4:09 PM
Simple alarm, Send, Ack idea here. Stuck with no way to debug. Not sure if this is an error with Stream Insight or my brain not working right.
Everything works great, popping events for every alarm every 20 minutes, until I get an Ack message. Then the query dies and I get the error on the next checkpoint.
I think my problem is with the CreateSendEvent query event being removed but I am not sure why.
static void Main(string[] args)
{
//
// Add Check Points to recover from a start and Stop
//
var metaConfig = new SqlCeMetadataProviderConfiguration
{
DataSource = "C:\\SI\\CAA\\SI_Critical_Alarm_Alert.sdf",
CreateDataSourceIfMissing = true
};
// Set up checkpointing. This needs a location to place the log files.
var chkConfig = new CheckpointConfiguration
{
LogPath = "C:\\SI\\CAA",
CreateLogPathIfMissing = true
};
using (var server = Server.Create("Default", metaConfig, chkConfig))
//
// Add Check Points to recover from a start and Stop
//
//
//using (var server = Server.Create("Default")) // if just embedded SI
{
var host = new ServiceHost(server.CreateManagementService());
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "http://localhost/SI_Critical_Alarm_Alert");
host.Open();
//
Application myApp;
if (!server.Applications.TryGetValue("SI_CAA", out myApp))
{
myApp = server.CreateApplication("SI_CAA");
}
CepProcess procAll;
if (myApp.Processes.TryGetValue("CAA_All", out procAll))
{
Console.WriteLine("Resuming process...");
procAll.Resume();
}
else
{
Console.WriteLine("Creating process...");
//
// AJS 01/10/2013 Add Check Points to recover from a start and Stop
//
// SOURCE
// QUEUE Critical Alarm Alert Event (677) SQL
var mySourceMessages = myApp.DefineObservable<CAA_Message>(() => new QEventCreator_CriticalAlarmAlert(677)).ToPointStreamable(
e => PointEvent.CreateInsert<CAA_Message>(DateTime.UtcNow, e),
AdvanceTimeSettings.StrictlyIncreasingStartTime);
//QUERY
var FirstFilter = from r in mySourceMessages
where r.ActivityType != -1
select r;
var Opens = from r in FirstFilter
where r.ActivityType == 1
select r;
var Alarms = from r in FirstFilter
where r.ActivityType == 6
select r;
var ACKs = from r in FirstFilter
where r.ActivityType == 3
select r;
var Sents = from r in FirstFilter
where r.ActivityType == 4
select r;
Alarms = Alarms.AlterEventDuration(e => TimeSpan.MaxValue);
// all events ending dates will be set to the starting date of the ACK event. If no ACK event has yet happend, the ending date will stay open forever.
var OpenAlarms = Alarms.ClipEventDuration(ACKs, (e1, e2) => e1.Alarm_ID == e2.Alarm_ID);
// Sents events lasts 20 minutes
Sents = Sents.AlterEventDuration(e => TimeSpan.FromMinutes(20));
// If no Sent event has happend yet, or the 20 minutes is up then a new one needs to happen.
var OpensWithNothingSent = OpenAlarms.LeftAntiJoin(Sents, (e1, e2) => e1.Alarm_ID == e2.Alarm_ID).AlterEventDuration(e => TimeSpan.FromTicks(1));
//
var CreateSendEvent = from r in OpensWithNothingSent
select new CAA_Message
{
Alarm_ID = r.Alarm_ID,
MessBody = r.MessBody,
ENCNTR_ID = r.ENCNTR_ID,
PRSNL_ID = r.PRSNL_ID,
ActivityType = 4,
Tree_Level = FindTreeLevel(r.Alarm_ID),
AlarmDTTM = DateTime.Now
};
//Point to Signal Conversion http://msdn.microsoft.com/en-us/library/ee362414.aspx "fold pairs" http://www.devbiker.net/post/Cool-StreamInsight-querye28093Point-input-to-Edge-Output.aspx
var QryDone = Opens.Union(CreateSendEvent).Union(ACKs);
//SINK
var mySink_NewAlert = myApp.DefineObserver(() => Observer.Create<PointEvent<CAA_Message>>(OneSinktoRuleThemAll));
//BIND
procAll = QryDone.Bind(mySink_NewAlert).RunCheckpointable("CAA_All");
} //if (myApp.Processes.TryGetValue("LabNotify", out proc))
//RUN
using (CheckpointLoop(server, myApp.CheckpointableProcesses["CAA_All"], TimeSpan.FromSeconds(1)))
{
Console.WriteLine("Started checkpointing... Press enter to shut down normally...");
Console.ReadLine(); //keep thread alive
}
//
Console.WriteLine("Stopped query.");
//
host.Close();
} //using (var server = Server.Create("Default", metaConfig, chkConfig))
}
System.AggregateException was caught
HResult=-2146233088
Message=One or more errors occurred.
Source=mscorlib
StackTrace:
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
at System.Threading.Tasks.Task.Wait()
at SI_Critical_Alarm_Alert.Program.<>c__DisplayClass9.<CheckpointLoop>b__6(Int64 i) in c:\TFS\Datapumps\StreamInsight\SI_Critical_Alarm_Alert\SI_Critical_Alarm_Alert\Program.cs:line 375
InnerException: Microsoft.ComplexEventProcessing.ManagementException
HResult=-2146233088
Message=Could not find the corresponding event for an incoming retraction or expansion event. This can be caused by the usage of a non-deterministic user-defined function, aggregate or operator.
Source=Microsoft.ComplexEventProcessing.Diagnostics
StackTrace:
at Microsoft.ComplexEventProcessing.Diagnostics.Exceptions.Throw(Exception exception)
at Microsoft.ComplexEventProcessing.EmbeddedServerProxy.EndCheckpoint(IAsyncResult asyncResult)
at Microsoft.ComplexEventProcessing.CepCheckpointableProcess.<>c__DisplayClass7.<CheckpointAsync>b__3(IAsyncResult iar)
InnerException: Microsoft.ComplexEventProcessing.Engine.OperatorExecutionException
HResult=-2146233088
Message=Could not find the corresponding event for an incoming retraction or expansion event. This can be caused by the usage of a non-deterministic user-defined function, aggregate or operator.
Source=Microsoft.ComplexEventProcessing.Diagnostics
StackTrace:
at Microsoft.ComplexEventProcessing.Diagnostics.Exceptions.Throw(Exception exception)
at Microsoft.ComplexEventProcessing.Engine.AsyncResult`1.EndInvoke()
at Microsoft.ComplexEventProcessing.CommandDispatcher.CommandDispatcher.EndCheckpoint(IAsyncResult asyncResult)
at Microsoft.ComplexEventProcessing.EmbeddedServerProxy.EndCheckpoint(IAsyncResult asyncResult)
InnerException:
Any help will be appreciated,
Andrew
All Replies
-
Wednesday, February 20, 2013 5:10 PM
Here's the key: non-deterministic user-defined function, aggregate or operator.
Using DateTime.Now() is a non-deterministic UDF. You should use the event headers (StartTime/EndTime) instead of trying to add a timestamp into your payload as a part of a query.
DevBiker (aka J Sawyer)
Microsoft MVP - Sql Server (StreamInsight)
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful. -
Thursday, February 21, 2013 6:55 PM
Spot on. A thousand points for Gryffindor!
//
var CreateSendEvent = from r in OpensWithNothingSent
select new CAA_Message
{
Alarm_ID = r.Alarm_ID,
MessBody = r.MessBody,
ENCNTR_ID = r.ENCNTR_ID,
PRSNL_ID = r.PRSNL_ID,
ActivityType = 4,
Tree_Level = FindTreeLevel(r.Alarm_ID),
AlarmDTTM = r.AlarmDTTM,
EventDTTM = r.EventDTTM
};
Thanks for your help,
Andrew
- Marked As Answer by Andrew Slaughter Thursday, February 21, 2013 6:57 PM


