Event / Subscriber Process using Service Broker in SQL Server 2005
-
Saturday, January 19, 2013 12:16 PM
Hi All,
I am new to Service Broker and need to accomplish the task below using an event/subscriber process in Service Broker on SQL Server 2005.
For example:
Whenever a change is made to a table in the TestDB database, we need to insert records into two other tables in the DemoDB database, using an event/subscriber process. Both databases reside on the same server. I was told to create the stored procedures event_subscribe (an SP that subscribes to an event) and event_unsubscribe (an SP that unsubscribes from an event), plus tables, messages, queues, services, etc. to achieve this. I searched online and did not find any articles on the event/subscriber model using Service Broker. It seems we need to write "Event Notifications" for the event/subscriber process (not sure). But can event notifications be written only on DDL operations and not on DML? Please clarify this for me.
Can we achieve the above task without "Event Notifications" using the event/subscriber model?
Please guide me through the steps involved (how many stored procedures, triggers, messages, queues, services, etc.) to achieve the above task.
Thanks in Advance
All Replies
-
Sunday, January 20, 2013 3:20 PM
Can we achieve the above task without "Event Notifications" using the event/subscriber model?
Please guide me through the steps involved (how many stored procedures, triggers, messages, queues, services, etc.) to achieve the above task.
Below is an example script of one method to accomplish the publish/subscribe pattern, which uses a trigger on the TestDB table to generate the event message. The trigger in this example simply publishes insert, update and delete events, but you can easily add other data to the event message, such as data from the modified rows. A summary of the process:
- Trigger calls proc to send message to each subscriber
- The activated event subscription queue proc in the subscriber database performs any needed actions (inserts event into table1 and table2)
The event_subscribe proc creates a queue, service and route for the subscriber. This proc uses dynamic SQL to create the objects so I prepend the subscription name to the objects to guarantee uniqueness and clearly identify their purpose. Note that you'll need to create the event subscription stored procedure before creating the subscription.
USE master;
DROP DATABASE TestDB;
DROP DATABASE DemoDB;
GO
USE master;
GO
CREATE DATABASE TestDB;
ALTER AUTHORIZATION ON DATABASE::TestDB TO sa;
ALTER DATABASE TestDB SET ENABLE_BROKER;
ALTER DATABASE TestDB SET TRUSTWORTHY ON; --can use certificate instead of TRUSTWORTHY
GO
CREATE DATABASE DemoDB;
ALTER AUTHORIZATION ON DATABASE::DemoDB TO sa;
ALTER DATABASE DemoDB SET ENABLE_BROKER;
ALTER DATABASE DemoDB SET TRUSTWORTHY ON; --can use certificate instead of TRUSTWORTHY
GO

--create publication SB objects in publishing database
USE TestDB;
CREATE MESSAGE TYPE EventMessageType VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT EventContract( EventMessageType SENT BY INITIATOR );

--create table to manage event publications
CREATE TABLE dbo.event_publication(
    event_publication_name nvarchar(50)
        CONSTRAINT pk_event_publication PRIMARY KEY
    );

--create table to manage event subscriptions
CREATE TABLE dbo.event_subscription(
     event_publication_name nvarchar(50) NOT NULL
    ,event_subsciption_name nvarchar(50) NOT NULL
    ,conversation_handle uniqueidentifier NULL
    ,CONSTRAINT pk_event_subscription
        PRIMARY KEY(event_publication_name, event_subsciption_name)
    ,CONSTRAINT fk_event_subscription_event_publication
        FOREIGN KEY (event_publication_name)
        REFERENCES dbo.event_publication(event_publication_name)
    );
GO

--create proc to create subscription
CREATE PROC dbo.event_subscribe
     @EventPublicationName nvarchar(50)
    ,@EventSubscriptionDatabaseName sysname
    ,@EventSubscriptionSchemaName sysname
    ,@EventSubscriptionName nvarchar(50)
    ,@EventSubscriptionProcedureName nvarchar(261)
AS
DECLARE
     @Sql nvarchar(MAX)
    ,@ConversationHandle uniqueidentifier
    ,@EventSubscriptionTargetQueueName nvarchar(130)
    ,@EventSubscriptionServiceName nvarchar(130)
    ,@EventSubscriptionRouteName nvarchar(130)
    ,@EventPublicationServiceName nvarchar(130);

SELECT
     @EventSubscriptionTargetQueueName = @EventSubscriptionSchemaName + N'.' + @EventSubscriptionName + N'_SubscriptionTargetQueue'
    ,@EventSubscriptionServiceName = @EventSubscriptionName + N'_SubscriptionService'
    ,@EventSubscriptionRouteName = @EventSubscriptionName + N'_SubscriptionRoute'
    ,@EventPublicationServiceName = @EventPublicationName + N'_PublicationService';

BEGIN TRY

    --create Service Broker queue for this subscriber
    SET @Sql = N'USE ' + @EventSubscriptionDatabaseName
        + N';CREATE QUEUE ' + @EventSubscriptionTargetQueueName
        + N' WITH ACTIVATION ( PROCEDURE_NAME = ' + @EventSubscriptionProcedureName
        + N', MAX_QUEUE_READERS = 1, EXECUTE AS OWNER);';
    EXEC sp_executesql @Sql;

    --create Service Broker service for this subscriber
    SET @Sql = N'USE ' + @EventSubscriptionDatabaseName
        + N';CREATE SERVICE ' + @EventSubscriptionServiceName
        + N' ON QUEUE ' + @EventSubscriptionTargetQueueName
        + N' (EventContract);';
    EXEC sp_executesql @Sql;

    --create Service Broker route for this subscriber
    SET @Sql = N'CREATE ROUTE ' + @EventSubscriptionRouteName
        + N' WITH SERVICE_NAME = N''' + @EventSubscriptionServiceName
        + N''', ADDRESS = ''LOCAL'';';
    EXEC sp_executesql @Sql;

    --begin conversation for this subscriber
    SET @Sql = N'BEGIN DIALOG @ConversationHandle FROM SERVICE ' + @EventPublicationServiceName
        + N' TO SERVICE ''' + @EventSubscriptionServiceName + N''''
        + N' ON CONTRACT EventContract WITH ENCRYPTION = OFF;';
    EXEC sp_executesql @Sql
        ,N'@ConversationHandle uniqueidentifier OUT'
        ,@ConversationHandle = @ConversationHandle OUT;

    --insert subscriber information
    INSERT INTO dbo.event_subscription
        (event_publication_name, event_subsciption_name, conversation_handle)
    VALUES
        (@EventPublicationName, @EventSubscriptionName, @ConversationHandle);

END TRY
BEGIN CATCH

    DECLARE
         @ErrorNumber int
        ,@ErrorMessage nvarchar(2048)
        ,@ErrorSeverity int
        ,@ErrorState int
        ,@ErrorLine int;

    SELECT
         @ErrorNumber = ERROR_NUMBER()
        ,@ErrorMessage = ERROR_MESSAGE()
        ,@ErrorSeverity = ERROR_SEVERITY()
        ,@ErrorState = ERROR_STATE()
        ,@ErrorLine = ERROR_LINE();

    RAISERROR('Error %d caught at line %d: %s'
        ,@ErrorSeverity
        ,@ErrorState
        ,@ErrorNumber
        ,@ErrorLine
        ,@ErrorMessage);

END CATCH;

RETURN @@ERROR;
GO

--create proc to remove subscription
CREATE PROC dbo.event_unsubscribe
     @EventPublicationName nvarchar(50)
    ,@EventSubscriptionDatabaseName sysname
    ,@EventSubscriptionSchemaName sysname
    ,@EventSubscriptionName nvarchar(50)
AS
DECLARE
     @Sql nvarchar(MAX)
    ,@ConversationHandle uniqueidentifier
    ,@EventSubscriptionTargetQueueName nvarchar(130)
    ,@EventSubscriptionServiceName nvarchar(130)
    ,@EventSubscriptionRouteName nvarchar(130)
    ,@EventPublicationServiceName nvarchar(130);

SELECT
     @EventSubscriptionTargetQueueName = @EventSubscriptionSchemaName + N'.' + @EventSubscriptionName + N'_SubscriptionTargetQueue'
    ,@EventSubscriptionServiceName = @EventSubscriptionName + N'_SubscriptionService'
    ,@EventSubscriptionRouteName = @EventSubscriptionName + N'_SubscriptionRoute'
    ,@EventPublicationServiceName = @EventPublicationName + N'_PublicationService';

BEGIN TRY

    --drop subscription information from publisher
    DELETE FROM TestDB.dbo.event_subscription
    WHERE event_publication_name = @EventPublicationName
        AND event_subsciption_name = @EventSubscriptionName;

    --drop Service Broker subscriber route
    SET @Sql = N'DROP ROUTE ' + @EventSubscriptionRouteName + N';';
    EXEC sp_executesql @Sql;

    --drop Service Broker subscriber service
    SET @Sql = N'USE ' + @EventSubscriptionDatabaseName
        + N';DROP SERVICE ' + @EventSubscriptionServiceName + N';';
    EXEC sp_executesql @Sql;

    --drop Service Broker subscriber queue
    SET @Sql = N'USE ' + @EventSubscriptionDatabaseName
        + N';DROP QUEUE ' + @EventSubscriptionTargetQueueName + N';';
    EXEC sp_executesql @Sql;

END TRY
BEGIN CATCH

    DECLARE
         @ErrorNumber int
        ,@ErrorMessage nvarchar(2048)
        ,@ErrorSeverity int
        ,@ErrorState int
        ,@ErrorLine int;

    SELECT
         @ErrorNumber = ERROR_NUMBER()
        ,@ErrorMessage = ERROR_MESSAGE()
        ,@ErrorSeverity = ERROR_SEVERITY()
        ,@ErrorState = ERROR_STATE()
        ,@ErrorLine = ERROR_LINE();

    RAISERROR('Error %d caught at line %d: %s'
        ,@ErrorSeverity
        ,@ErrorState
        ,@ErrorNumber
        ,@ErrorLine
        ,@ErrorMessage);

END CATCH;

RETURN @@ERROR;
GO

--create proc to send message to each subscriber
CREATE PROC dbo.multicast_message_to_subscribers
     @EventPublicationName nvarchar(50)
    ,@EventPublicationMessage xml
AS
DECLARE
     @ConversationHandle uniqueidentifier
    ,@EventSubscriptionName nvarchar(50)
    ,@Sql nvarchar(MAX);

DECLARE Subscriptions CURSOR LOCAL FAST_FORWARD FOR
    SELECT
         conversation_handle
        ,event_subsciption_name
    FROM dbo.event_subscription
    WHERE event_publication_name = @EventPublicationName;

OPEN Subscriptions;
WHILE 1 = 1
BEGIN
    FETCH NEXT FROM Subscriptions
        INTO @ConversationHandle, @EventSubscriptionName;
    IF @@FETCH_STATUS = -1 BREAK;

    SEND ON CONVERSATION @ConversationHandle
        MESSAGE TYPE EventMessageType
        (@EventPublicationMessage);
END;
CLOSE Subscriptions;
DEALLOCATE Subscriptions;
--COMMIT;
GO

--user table
CREATE TABLE dbo.table1(
     col1 int NOT NULL IDENTITY
        CONSTRAINT pk_table1 PRIMARY KEY
    ,col2 int
    );
GO

--trigger to generate insert, update and delete events
CREATE TRIGGER tr_table1
ON dbo.table1
FOR INSERT, UPDATE, DELETE
AS
DECLARE @EventPublicationMessage xml;

IF EXISTS(SELECT * FROM inserted)
BEGIN
    IF EXISTS(SELECT * FROM deleted)
    BEGIN
        SET @EventPublicationMessage = '<event>update</event>';
    END
    ELSE
    BEGIN
        SET @EventPublicationMessage = '<event>insert</event>';
    END;
END
ELSE
BEGIN
    IF EXISTS(SELECT * FROM deleted)
    BEGIN
        SET @EventPublicationMessage = '<event>delete</event>';
    END
END;

IF @EventPublicationMessage IS NOT NULL
BEGIN
    EXEC dbo.multicast_message_to_subscribers
         @EventPublicationName = N'TestDB_table1'
        ,@EventPublicationMessage = @EventPublicationMessage;
END
GO

--create SB queue and service for publication
CREATE QUEUE dbo.TestDB_table1_PublicationInitiatorQueue;
CREATE SERVICE TestDB_table1_PublicationService
    ON QUEUE dbo.TestDB_table1_PublicationInitiatorQueue (EventContract);

--insert row for publication
INSERT INTO dbo.event_publication VALUES(N'TestDB_table1');
GO

USE DemoDB;

--create SB message type and contract for subscriptions
CREATE MESSAGE TYPE EventMessageType VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT EventContract( EventMessageType SENT BY INITIATOR );
GO

CREATE TABLE dbo.table1(
     event_name char(7) NOT NULL
    ,insert_timestamp datetime NOT NULL
        CONSTRAINT df_table1_insert_timestamp DEFAULT GETDATE()
    );

CREATE TABLE dbo.table2(
     event_name char(7) NOT NULL
    ,insert_timestamp datetime NOT NULL
        CONSTRAINT df_table2_insert_timestamp DEFAULT GETDATE()
    );
GO

CREATE PROC dbo.insert_table1
AS
DECLARE
     @EventMessage xml
    ,@EventName char(7);

SET XACT_ABORT ON;

WHILE 1 = 1
BEGIN
    BEGIN TRAN;
    WAITFOR(
        RECEIVE TOP (1) @EventMessage = CAST(message_body AS xml)
        FROM dbo.DemoDB_table1_SubscriptionTargetQueue
        ), TIMEOUT 1000;
    IF @@ROWCOUNT = 0
    BEGIN
        COMMIT;
        BREAK;
    END;
    SET @EventName = @EventMessage.value('/event[1]', 'char(7)');
    INSERT INTO dbo.table1(event_name) VALUES (@EventName);
    COMMIT;
END;
RETURN @@ERROR;
GO

CREATE PROC dbo.insert_table2
AS
DECLARE
     @EventMessage xml
    ,@EventName char(7);

SET XACT_ABORT ON;

WHILE 1 = 1
BEGIN
    BEGIN TRAN;
    WAITFOR(
        RECEIVE TOP (1) @EventMessage = CAST(message_body AS xml)
        FROM dbo.DemoDB_table2_SubscriptionTargetQueue
        ), TIMEOUT 1000;
    IF @@ROWCOUNT = 0
    BEGIN
        COMMIT;
        BREAK;
    END;
    SET @EventName = @EventMessage.value('/event[1]', 'char(7)');
    INSERT INTO dbo.table2(event_name) VALUES (@EventName);
    COMMIT;
END;
RETURN @@ERROR;
GO

USE TestDB;

--create event subscription 1
EXEC TestDB.dbo.event_subscribe
     @EventPublicationName = N'TestDB_table1'
    ,@EventSubscriptionDatabaseName = N'DemoDB'
    ,@EventSubscriptionSchemaName = N'dbo'
    ,@EventSubscriptionName = N'DemoDB_table1'
    ,@EventSubscriptionProcedureName = N'dbo.insert_table1';

--create event subscription 2
EXEC TestDB.dbo.event_subscribe
     @EventPublicationName = N'TestDB_table1'
    ,@EventSubscriptionDatabaseName = N'DemoDB'
    ,@EventSubscriptionSchemaName = N'dbo'
    ,@EventSubscriptionName = N'DemoDB_table2'
    ,@EventSubscriptionProcedureName = N'dbo.insert_table2';

--make some changes
INSERT INTO dbo.table1 VALUES(1);
INSERT INTO dbo.table1 VALUES(2);
UPDATE dbo.table1 SET col2 = 2;
DELETE FROM dbo.table1;

--changes should be in subscriber tables after a brief delay
WAITFOR DELAY '00:00:01';
SELECT * FROM DemoDB.dbo.table1;
SELECT * FROM DemoDB.dbo.table2;

--remove event subscriptions
EXEC TestDB.dbo.event_unsubscribe
     @EventPublicationName = N'TestDB_table1'
    ,@EventSubscriptionDatabaseName = N'DemoDB'
    ,@EventSubscriptionSchemaName = N'dbo'
    ,@EventSubscriptionName = N'DemoDB_table1';

EXEC TestDB.dbo.event_unsubscribe
     @EventPublicationName = N'TestDB_table1'
    ,@EventSubscriptionDatabaseName = N'DemoDB'
    ,@EventSubscriptionSchemaName = N'dbo'
    ,@EventSubscriptionName = N'DemoDB_table2';
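If the subscriber tables stay empty after the delay, a few catalog-view queries can help locate where the messages stopped. The following is only a troubleshooting sketch, not part of the original script: it assumes the TestDB and DemoDB databases from the example above and must be run before the event_unsubscribe calls drop the subscriber objects.

```sql
-- Troubleshooting sketch (assumes the TestDB/DemoDB example objects still exist).

USE TestDB;
-- Messages that were sent but not yet delivered; transmission_status
-- holds the reason when delivery is failing.
SELECT to_service_name
      ,message_type_name
      ,enqueue_time
      ,transmission_status
FROM sys.transmission_queue;

USE DemoDB;
-- Subscriber queues: a queue that is disabled for receive or enqueue
-- will not be processed by its activation procedure.
SELECT name
      ,activation_procedure
      ,is_activation_enabled
      ,is_receive_enabled
      ,is_enqueue_enabled
FROM sys.service_queues;

-- Conversation state as seen by the subscriber database.
SELECT conversation_handle
      ,state_desc
      ,far_service
FROM sys.conversation_endpoints;
```

Rows left in sys.transmission_queue usually point to a routing or security problem, while a disabled queue points to an activation procedure that failed repeatedly.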
Dan Guzman, SQL Server MVP, http://www.dbdelta.com
- Edited by Dan GuzmanMVP Sunday, January 20, 2013 3:25 PM corrected description
- Marked As Answer by ksd1234 Sunday, January 20, 2013 4:21 PM
-
Sunday, January 20, 2013 4:20 PM
Hi Dan,
Thank you so much for your valuable inputs on this task. I really appreciate your help.
I will walk through the steps above and start working on this task, and will let you know if I need any help.
Thank you once again.
Thanks,
ksd123
-
Sunday, February 17, 2013 10:56 AM
Hi Dan,
There are some minor changes, and I was asked to achieve this task in a more generic way. So instead of tables subscribing to an event, applications would subscribe to an event.
Process: Trigger -> Throw (SP) -> Distribute (SP) -> Execute (SP) -> Acknowledge (SP)
event_subscribe (application, event, procedure): lets an application subscribe to an event or change the target of an existing subscription.
event_unsubscribe (application, event): lets an application unsubscribe from an event.
Throw: This SP will be invoked by the application that is throwing the event.
Distribute: This SP takes the thrown event and its arguments, merges the arguments, and then starts a conversation with each subscriber to the event.
Execute: This SP will be invoked by Service Broker when there are events in the queue, and will then invoke the designated subscriber procedure. After the called procedure completes, it sends back an acknowledgement.
Acknowledge: This SP will be invoked by Service Broker when there are responses to events in the queue from the designated subscriber procedure. After the called procedure completes, the event is closed.
event (table):
Event_name                 Defaultxml
created/updated/deleted    values of all parameters
event_subscriptions (table): This table contains each event that an application wants to be notified about, along with the name of the target service (SP) that is to be invoked when the event occurs.
application      Event_name                 Target_service (SP)
application_1    created/updated/deleted    Target_service_SP_1
Here is the detailed process flow:
1) The trigger calls Throw (SP), which starts a conversation with Distribute (SP), passing it the event name and the provided XML. Throw then terminates.
2) Service Broker starts the Distribute procedure in response to a conversation being started with it.
Distribute (SP) merges the provided parameters with the default parameters for the event to ensure all parameters are populated.
For each application that has subscribed to the event, Distribute begins a conversation with the Execute procedure, giving it the name of the procedure specified in that application's subscription and the merged parameters as XML.
Distribute (SP) completes the conversation with Throw and terminates.
3) Service Broker starts the Execute procedure in response to a conversation being started with it.
The Execute procedure invokes the specified procedure, passing it the provided XML parameters.
When the procedure returns, Execute completes the conversation with Distribute and terminates.
I am a little bit confused and need example scripts for the Throw, Distribute, Execute and Acknowledge procedures and the trigger (for updates I need the old and new values).
Thanks in Advance
-
Monday, February 18, 2013 7:34 PM
I am a little bit confused and need example scripts for the Throw, Distribute, Execute and Acknowledge procedures and the trigger (for updates I need the old and new values).
I took a stab at some scripts based on your requirements as I understand them. I assumed you wanted asynchronous processing throughout, so I added the table pending_executor to keep track of the executor requests. The conversation from the distributor to the publisher is ended once all executors are complete. You'll need to implement the Defaultxml merge (in the Distribute proc, where the TODO comment marks the spot) as well as the stored procedure that the executor invokes.
I recommend you add a logging table to facilitate troubleshooting, monitoring and auditing.
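A logging table along those lines could look like the sketch below. The table dbo.event_log, the proc dbo.log_event and all of their columns and parameters are hypothetical names chosen for illustration; they are not part of the scripts in this thread, and the columns should be adapted to whatever you need to audit.

```sql
-- Hypothetical logging objects; create them in the database that hosts
-- the procs you want to trace (for example TestDB).
CREATE TABLE dbo.event_log(
     event_log_id int NOT NULL IDENTITY
        CONSTRAINT PK_event_log PRIMARY KEY
    ,log_time datetime NOT NULL
        CONSTRAINT DF_event_log_log_time DEFAULT GETDATE()
    ,source_proc sysname NOT NULL              -- proc that wrote the entry
    ,conversation_handle uniqueidentifier NULL -- conversation being processed, if any
    ,message_type_name sysname NULL            -- type of the received message, if any
    ,log_message nvarchar(2048) NULL           -- free-form text or error message
    );
GO

CREATE PROC dbo.log_event
     @source_proc sysname
    ,@conversation_handle uniqueidentifier = NULL
    ,@message_type_name sysname = NULL
    ,@log_message nvarchar(2048) = NULL
AS
SET NOCOUNT ON;
INSERT INTO dbo.event_log
    (source_proc, conversation_handle, message_type_name, log_message)
VALUES
    (@source_proc, @conversation_handle, @message_type_name, @log_message);
RETURN @@ERROR;
GO

-- Example call, e.g. from inside an activation proc after a RECEIVE
EXEC dbo.log_event
     @source_proc = N'Distribute'
    ,@log_message = N'Received event message';
```

Calling the proc right after each RECEIVE and inside each CATCH block gives a simple trail of which conversation reached which step.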
CREATE DATABASE TestDB;
ALTER AUTHORIZATION ON DATABASE::TestDB TO sa;
ALTER DATABASE TestDB SET ENABLE_BROKER;
ALTER DATABASE TestDB SET TRUSTWORTHY ON; --can use certificate instead of TRUSTWORTHY
GO
CREATE DATABASE DemoDB;
ALTER AUTHORIZATION ON DATABASE::DemoDB TO sa;
ALTER DATABASE DemoDB SET ENABLE_BROKER;
ALTER DATABASE DemoDB SET TRUSTWORTHY ON; --can use certificate instead of TRUSTWORTHY
GO

USE TestDB;
GO

CREATE TABLE dbo.event(
     Event_name nvarchar(50) NOT NULL
        CONSTRAINT PK_Event PRIMARY KEY
    ,Defaultxml xml NOT NULL
    );
INSERT INTO dbo.event VALUES ('created', '<default />');

CREATE TABLE dbo.event_subscription(
     application sysname NOT NULL
    ,Event_name nvarchar(50) NOT NULL
        CONSTRAINT FK_event_subscription_event
        FOREIGN KEY REFERENCES dbo.event(Event_name)
    ,Target_service_sp_name nvarchar(392) NOT NULL
    ,CONSTRAINT PK_event_subscription
        PRIMARY KEY CLUSTERED (application, Event_name)
    );

CREATE TABLE dbo.pending_executor(
     publisher_conversation_handle uniqueidentifier NOT NULL
    ,executor_conversation_handle uniqueidentifier NOT NULL
    ,CONSTRAINT PK_pending_executor
        PRIMARY KEY CLUSTERED (publisher_conversation_handle, executor_conversation_handle)
    );
GO

CREATE PROC dbo.AcknowledgeDistributorMessages
AS
DECLARE
     @conversation_handle uniqueidentifier
    ,@message_type_name sysname
    ,@message_body varbinary(MAX);

WHILE 1 = 1
BEGIN
    WAITFOR (
        RECEIVE TOP(1)
             @conversation_handle = conversation_handle
            ,@message_type_name = message_type_name
            ,@message_body = message_body
        FROM dbo.PublisherQueue
        ), timeout 1000;
    IF @@ROWCOUNT = 0 BREAK;

    IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
        END CONVERSATION @conversation_handle;
    END
    ELSE
    BEGIN
        END CONVERSATION @conversation_handle
            WITH ERROR = 1 DESCRIPTION = 'Unexpected message type';
    END;
END;
RETURN @@ERROR;
GO

CREATE PROC dbo.AcknowledgeExecutorMessages
AS
DECLARE
     @conversation_handle uniqueidentifier
    ,@publisher_conversation_handle uniqueidentifier
    ,@message_type_name sysname
    ,@message_body xml;

WHILE 1 = 1
BEGIN
    WAITFOR (
        RECEIVE TOP(1)
             @conversation_handle = conversation_handle
            ,@message_type_name = message_type_name
            ,@message_body = CAST(message_body AS xml)
        FROM dbo.DistributorInitiatorQueue
        ), timeout 1000;
    IF @@ROWCOUNT = 0 BREAK;

    --get related publisher conversation
    --(looked up before the IF so that both branches below have it)
    SET @publisher_conversation_handle = NULL;
    SELECT @publisher_conversation_handle = publisher_conversation_handle
    FROM dbo.pending_executor
    WHERE executor_conversation_handle = @conversation_handle;

    IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
        END CONVERSATION @conversation_handle;
    END
    ELSE
    BEGIN
        END CONVERSATION @conversation_handle
            WITH ERROR = 1 DESCRIPTION = 'Unexpected message type';
    END;

    --remove completed executor
    DELETE FROM dbo.pending_executor
    WHERE publisher_conversation_handle = @publisher_conversation_handle
        AND executor_conversation_handle = @conversation_handle;

    IF @publisher_conversation_handle IS NOT NULL
        AND NOT EXISTS(SELECT *
            FROM dbo.pending_executor
            WHERE publisher_conversation_handle = @publisher_conversation_handle
            )
    BEGIN
        --end publisher conversation after all executors have completed
        END CONVERSATION @publisher_conversation_handle;
    END;
END;
RETURN @@ERROR;
GO

--create proc to send message to distributor
CREATE PROC dbo.throw_to_distributor
     @Event_name nvarchar(50)
    ,@event_xml xml
AS
DECLARE @conversation_handle uniqueidentifier;

BEGIN DIALOG CONVERSATION @conversation_handle
    FROM SERVICE PublisherService
    TO SERVICE 'DistributorTargetService'
    ON CONTRACT EventContract
    WITH ENCRYPTION = OFF;

--add EventName to message
SET @event_xml.modify('insert <EventName>{ sql:variable("@Event_name") }</EventName> as last into (/)');

SEND ON CONVERSATION @conversation_handle
    MESSAGE TYPE EventMessageType
    (@event_xml);
RETURN @@ERROR;
GO

--user table
CREATE TABLE dbo.table1(
     col1 int NOT NULL IDENTITY
        CONSTRAINT pk_table1 PRIMARY KEY
    ,col2 int
    );
GO

--trigger to generate insert, update and delete events
CREATE TRIGGER tr_table1
ON dbo.table1
FOR INSERT, UPDATE, DELETE
AS
DECLARE
     @Event_name nvarchar(50)
    ,@event_xml xml;

IF EXISTS(SELECT * FROM inserted)
BEGIN
    IF EXISTS(SELECT * FROM deleted)
    BEGIN
        SET @Event_name = 'updated';
        SET @event_xml = (
            SELECT *
            FROM inserted
            FOR XML PATH('')
            );
    END
    ELSE
    BEGIN
        SET @Event_name = 'created';
        SET @event_xml = (
            SELECT *
            FROM inserted
            FOR XML PATH('')
            );
    END;
END
ELSE
BEGIN
    IF EXISTS(SELECT * FROM deleted)
    BEGIN
        SET @Event_name = 'deleted';
        SET @event_xml = (
            SELECT *
            FROM deleted
            FOR XML PATH('')
            );
    END
END;

IF @Event_name IS NOT NULL
BEGIN
    EXEC dbo.throw_to_distributor
         @Event_name = @Event_name
        ,@event_xml = @event_xml;
END
GO

CREATE PROC dbo.Distribute
AS
DECLARE
     @conversation_handle uniqueidentifier
    ,@executor_conversation_handle uniqueidentifier
    ,@message_body xml
    ,@executor_message_body xml
    ,@message_type_name sysname
    ,@Event_name nvarchar(50)
    ,@Target_service_sp_name nvarchar(392)
    ,@Defaultxml xml;

WHILE 1 = 1
BEGIN
    WAITFOR (
        RECEIVE TOP(1)
             @conversation_handle = conversation_handle
            ,@message_body = CAST(message_body AS xml)
            ,@message_type_name = message_type_name
        FROM dbo.DistributorTargetQueue
        ), timeout 1000;
    IF @@ROWCOUNT = 0 BREAK;

    IF @message_type_name = 'EventMessageType'
    BEGIN

        --extract event name from message
        SELECT @Event_name = @message_body.query('/EventName').value('.', 'nvarchar(50)');

        --get default parameters for the event
        --(assign to the variable; a bare SELECT @Defaultxml would only return a result set)
        SELECT @Defaultxml = Defaultxml
        FROM dbo.event
        WHERE Event_name = @Event_name;

        ---------------------------------------------------------
        --TODO: merge @Defaultxml into @message_body and validate
        ---------------------------------------------------------

        --invoke executor for each subscriber
        DECLARE Subscriptions CURSOR LOCAL STATIC FOR
            SELECT Target_service_sp_name
            FROM dbo.event_subscription
            WHERE Event_name = @Event_name;
        OPEN Subscriptions;
        WHILE 1 = 1
        BEGIN
            FETCH NEXT FROM Subscriptions INTO @Target_service_sp_name;
            IF @@FETCH_STATUS = -1 BREAK;

            BEGIN DIALOG CONVERSATION @executor_conversation_handle
                FROM SERVICE DistributorInitiatorService
                TO SERVICE 'ExecutorService'
                ON CONTRACT EventContract
                WITH ENCRYPTION = OFF;

            --add ExecutorProcName and PublisherConversationHandle to message
            SET @executor_message_body = @message_body;
            SET @executor_message_body.modify('insert <ExecutorProcName>{ sql:variable("@Target_service_sp_name") }</ExecutorProcName> as last into (/)');
            SET @executor_message_body.modify('insert <PublisherConversationHandle>{ sql:variable("@conversation_handle") }</PublisherConversationHandle> as last into (/)');

            SEND ON CONVERSATION @executor_conversation_handle
                MESSAGE TYPE EventMessageType
                (@executor_message_body);

            INSERT INTO dbo.pending_executor(
                 publisher_conversation_handle
                ,executor_conversation_handle
                )
            VALUES(
                 @conversation_handle
                ,@executor_conversation_handle
                );
        END; --next subscription
        CLOSE Subscriptions;
        DEALLOCATE Subscriptions;
    END
    ELSE
    BEGIN
        END CONVERSATION @conversation_handle
            WITH ERROR = 1 DESCRIPTION = 'Unexpected message type';
    END;
END;
RETURN @@ERROR;
GO

--create SB message type and contract for published events
CREATE MESSAGE TYPE EventMessageType VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT EventContract( EventMessageType SENT BY INITIATOR );
GO

--create SB queue and service for publisher
CREATE QUEUE dbo.PublisherQueue
    WITH ACTIVATION (
        PROCEDURE_NAME = dbo.AcknowledgeDistributorMessages,
        MAX_QUEUE_READERS = 1,
        EXECUTE AS OWNER);
CREATE SERVICE PublisherService
    ON QUEUE dbo.PublisherQueue (EventContract);
GO

--create SB queue and service for distributor target
CREATE QUEUE dbo.DistributorTargetQueue
    WITH ACTIVATION (
        PROCEDURE_NAME = dbo.Distribute,
        MAX_QUEUE_READERS = 1,
        EXECUTE AS OWNER);
CREATE SERVICE DistributorTargetService
    ON QUEUE dbo.DistributorTargetQueue (EventContract);
GO

--create SB queue and service for distributor initiator
CREATE QUEUE dbo.DistributorInitiatorQueue
    WITH ACTIVATION (
        PROCEDURE_NAME = dbo.AcknowledgeExecutorMessages,
        MAX_QUEUE_READERS = 1,
        EXECUTE AS OWNER);
CREATE SERVICE DistributorInitiatorService
    ON QUEUE dbo.DistributorInitiatorQueue (EventContract);
GO

--create proc to create subscription
CREATE PROC dbo.event_subscribe
     @Event_name nvarchar(50)
    ,@application sysname
    ,@Target_service_sp_name nvarchar(392)
AS
INSERT INTO dbo.event_subscription
    (Event_name, application, Target_service_sp_name)
VALUES
    (@Event_name, @application, @Target_service_sp_name);
RETURN @@ERROR;
GO

--create proc to remove subscription
CREATE PROC dbo.event_unsubscribe
     @Event_name nvarchar(50)
    ,@application sysname
AS
DELETE FROM dbo.event_subscription
WHERE Event_name = @Event_name
    AND application = @application;
RETURN @@ERROR;
GO

EXEC dbo.event_subscribe
     @Event_name = 'created'
    ,@application = 'app1'
    ,@Target_service_sp_name = 'Target_service_SP_1';
GO

USE DemoDB;
GO

--create SB message type and contract for events
CREATE MESSAGE TYPE EventMessageType VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT EventContract( EventMessageType SENT BY INITIATOR );
GO

CREATE PROC dbo.Executor
AS
DECLARE
     @conversation_handle uniqueidentifier
    ,@service_name sysname
    ,@message_type_name sysname
    ,@message_body xml
    ,@Target_service_sp_name nvarchar(392);

WHILE 1 = 1
BEGIN
    WAITFOR (
        RECEIVE TOP(1)
             @conversation_handle = conversation_handle
            ,@service_name = service_name
            ,@message_type_name = message_type_name
            ,@message_body = CAST(message_body AS xml)
        FROM dbo.ExecutorQueue
        ), timeout 1000;
    IF @@ROWCOUNT = 0 BREAK;

    SET @Target_service_sp_name = @message_body.query('/ExecutorProcName').value('.', 'nvarchar(261)');

    BEGIN TRY
        EXEC @Target_service_sp_name @message_body;
        END CONVERSATION @conversation_handle;
    END TRY
    BEGIN CATCH
        DECLARE
             @ErrorNumber int
            ,@ErrorMessage nvarchar(2048)
            ,@ErrorSeverity int
            ,@ErrorState int
            ,@ErrorLine int;

        SELECT
             @ErrorNumber = ERROR_NUMBER()
            ,@ErrorMessage = ERROR_MESSAGE()
            ,@ErrorSeverity = ERROR_SEVERITY()
            ,@ErrorState = ERROR_STATE()
            ,@ErrorLine = ERROR_LINE();

        SET @ErrorMessage = 'Error ' + CAST(@ErrorNumber AS varchar(10))
            + ' caught at line ' + CAST(@ErrorLine AS varchar(10))
            + ': ' + @ErrorMessage;

        END CONVERSATION @conversation_handle
            WITH ERROR = 1 DESCRIPTION = @ErrorMessage;
    END CATCH;
END;
RETURN @@ERROR;
GO

--create SB queue and service for executor
CREATE QUEUE dbo.ExecutorQueue
    WITH ACTIVATION (
        PROCEDURE_NAME = dbo.Executor,
        MAX_QUEUE_READERS = 1,
        EXECUTE AS OWNER);
CREATE SERVICE ExecutorService
    ON QUEUE dbo.ExecutorQueue (EventContract);
GO

CREATE PROC dbo.Target_service_SP_1
    @event_message xml
AS
--TODO: implement @event_message processing
RETURN @@ERROR;
GO
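The trigger above sends only the new row values for an update. One way to include both old and new values, as the question asks, is to nest the deleted and inserted rows under separate elements. This is a sketch only: the element names OldValues, NewValues and row are assumptions chosen for illustration, and the subscriber procedures would need to parse the message accordingly.

```sql
USE TestDB;
GO
-- Variation of tr_table1: for updates, the message carries both the old
-- (deleted) and new (inserted) row images. Element names are illustrative.
ALTER TRIGGER tr_table1
ON dbo.table1
FOR INSERT, UPDATE, DELETE
AS
SET NOCOUNT ON;
DECLARE
     @Event_name nvarchar(50)
    ,@event_xml xml;

IF EXISTS(SELECT * FROM inserted) AND EXISTS(SELECT * FROM deleted)
BEGIN
    SET @Event_name = 'updated';
    SET @event_xml = (
        SELECT
             (SELECT * FROM deleted FOR XML PATH('row'), TYPE) AS OldValues
            ,(SELECT * FROM inserted FOR XML PATH('row'), TYPE) AS NewValues
        FOR XML PATH(''), TYPE
        );
END
ELSE IF EXISTS(SELECT * FROM inserted)
BEGIN
    SET @Event_name = 'created';
    SET @event_xml = (SELECT * FROM inserted FOR XML PATH(''));
END
ELSE IF EXISTS(SELECT * FROM deleted)
BEGIN
    SET @Event_name = 'deleted';
    SET @event_xml = (SELECT * FROM deleted FOR XML PATH(''));
END;

IF @Event_name IS NOT NULL
BEGIN
    EXEC dbo.throw_to_distributor
         @Event_name = @Event_name
        ,@event_xml = @event_xml;
END
GO
```

Because a single UPDATE statement can touch many rows, each image may contain several row elements; matching old to new rows relies on a key column such as col1 being present in both.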
Dan Guzman, SQL Server MVP, http://www.dbdelta.com
- Marked As Answer by ksd1234 Monday, February 25, 2013 6:02 PM
-
Tuesday, February 19, 2013 7:43 PM
Hi Dan,
Thank you for your time. Could you please give me more insight into the statements below from a syntax and declaration perspective (.modify, sql:variable, as last into, .query)?
1)throw_to_distributor
SET @event_xml.modify('insert <EventName>{ sql:variable("@Event_name") }</EventName> as last into (/)');
2)Executor
SET @Target_service_sp_name = @message_body.query('/ExecutorProcName').value('.', 'nvarchar(261)');
3)Distribute
IF @message_type_name = 'EventMessageType'
BEGIN
--extract event name from message
SELECT @Event_name = @message_body.query('/EventName').value('.', 'nvarchar(50)');
-
Wednesday, February 20, 2013 3:24 AM
Thank you for your time. Could you please give me more insight into the statements below from a syntax and declaration perspective (.modify, sql:variable, as last into, .query)?
For syntax and examples of the XML modify method, see http://msdn.microsoft.com/en-us/library/ms175466.aspx.
#1 adds the specified event name parameter to the message with the element name EventName. The EventName is needed by the distributor to determine the subscribers and may be used by the target proc too.
#2 extracts the proc name from the ExecutorProcName element of the message into a local variable. Distribute adds that element to the message from the event_subscription table, and the Executor needs it to know which procedure to invoke.
#3 extracts the EventName from the message into a local variable, which is used to determine the event subscribers.
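The three statements can be tried in isolation, without any Service Broker objects. The snippet below is a minimal sketch that reuses the modify and query/value expressions from the scripts. The sample XML and the variable @extracted_name are made up for illustration.

```sql
-- Standalone demo of .modify() with sql:variable() and of .query()/.value().
-- SQL Server 2005 does not allow initializing a variable in DECLARE,
-- so the values are assigned with SET.
DECLARE
     @Event_name nvarchar(50)
    ,@event_xml xml
    ,@extracted_name nvarchar(50);

SET @Event_name = N'updated';
SET @event_xml = N'<col1>1</col1><col2>2</col2>'; -- sample payload (illustrative)

-- modify(): XML DML. "insert ... as last into (/)" appends a new element
-- after the existing top-level nodes. sql:variable() copies the value of
-- a T-SQL variable into the XQuery expression; the curly braces mark it as
-- an expression to evaluate rather than literal text.
SET @event_xml.modify('insert <EventName>{ sql:variable("@Event_name") }</EventName> as last into (/)');

-- query(): returns the matching nodes as an xml fragment.
-- value(): converts a single value to the requested SQL type.
SET @extracted_name = @event_xml.query('/EventName').value('.', 'nvarchar(50)');

-- A commonly used equivalent that skips the intermediate query() call:
-- SET @extracted_name = @event_xml.value('(/EventName)[1]', 'nvarchar(50)');

SELECT @event_xml AS modified_xml, @extracted_name AS extracted_name;
```

The [1] in the commented alternative is required because value() needs a path that is statically guaranteed to return at most one node.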
Dan Guzman, SQL Server MVP, http://www.dbdelta.com
-
Monday, February 25, 2013 6:06 PM
Hi Dan,
I sincerely appreciate the time you spent on these scripts.
Again, thank you so much for your help.

