Event / Subscriber Process using Service Broker in Sql server 2005


  • Saturday, January 19, 2013 12:16 PM
     
     

     Hi All,

    I am new to Service Broker and need to accomplish the task below using an event/subscriber process in Service Broker on SQL Server 2005.

    For example:

    Whenever a change is made to a table in the TestDB database, we need to insert records into two other tables in the DemoDB database, using an event/subscriber process. Both databases reside on the same server. I was told to create stored procedures for event_subscribe (an SP that subscribes to an event) and event_unsubscribe (an SP that unsubscribes from an event), plus the tables, messages, queues, services, etc. needed to achieve this.

    I searched and did not find any articles on an event/subscriber model using Service Broker. It seems we need to write "Event Notifications" for the event/subscribe process (I am not sure). But can event notifications be written only on DDL operations and not on DML? Please clarify this for me.

    Can we achieve the above task without "Event Notifications", using an event/subscriber model?

    Please guide me through the steps involved (how many stored procedures, triggers, messages, queues, services, etc.) to achieve the above task.

    Thanks in advance

All Replies

  • Sunday, January 20, 2013 3:20 PM
     

    Can we achieve the above task without "Event Notifications", using an event/subscriber model?

    Please guide me through the steps involved (how many stored procedures, triggers, messages, queues, services, etc.) to achieve the above task.

    Below is an example script of one method to accomplish the publish/subscribe pattern, which uses a trigger on the TestDB table to generate the event message.  The trigger in this example simply publishes insert, update and delete events, but you can easily add other data to the event message, such as data from the modified rows.  A summary of the process:

    1. Trigger calls proc to send message to each subscriber
    2. The activated event subscription queue proc in the subscriber database performs any needed actions (inserts event into table1 and table2)

    The event_subscribe proc creates a queue, service and route for the subscriber.  This proc uses dynamic SQL to create the objects so I prepend the subscription name to the objects to guarantee uniqueness and clearly identify their purpose.  Note that you'll need to create the event subscription stored procedure before creating the subscription.

     

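    USE master;
    --drop the databases only if they exist from a previous run
    IF DB_ID(N'TestDB') IS NOT NULL DROP DATABASE TestDB;
    IF DB_ID(N'DemoDB') IS NOT NULL DROP DATABASE DemoDB;
    GO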
    
    USE master;
    GO
    CREATE DATABASE TestDB;
    ALTER AUTHORIZATION ON DATABASE::TestDB TO sa;
    ALTER DATABASE TestDB SET ENABLE_BROKER;
    ALTER DATABASE TestDB SET TRUSTWORTHY ON; --can use certificate instead of TRUSTWORTHY
    GO
    CREATE DATABASE DemoDB;
    ALTER AUTHORIZATION ON DATABASE::DemoDB TO sa;
    ALTER DATABASE DemoDB SET ENABLE_BROKER;
    ALTER DATABASE DemoDB SET TRUSTWORTHY ON; --can use certificate instead of TRUSTWORTHY
    GO
    
    --create publication SB objects in publishing database
    USE TestDB;
    CREATE MESSAGE TYPE EventMessageType VALIDATION = WELL_FORMED_XML;
    CREATE CONTRACT EventContract(
    	EventMessageType SENT BY INITIATOR
    	);
    
    --create table to manage event publications
    CREATE TABLE dbo.event_publication(
    	event_publication_name nvarchar(50)
    		CONSTRAINT pk_event_publication PRIMARY KEY
    	);
    
    --create table to manage event subscriptions
    CREATE TABLE dbo.event_subscription(
    	event_publication_name nvarchar(50) NOT NULL
    	,event_subsciption_name nvarchar(50) NOT NULL
    	,conversation_handle uniqueidentifier NULL
    	,CONSTRAINT pk_event_subscription
    		PRIMARY KEY(event_publication_name, event_subsciption_name)
    	,CONSTRAINT fk_event_subscription_event_publication 
    		FOREIGN KEY (event_publication_name) 
    		REFERENCES dbo.event_publication(event_publication_name)
    	);
    GO
    
    --create proc to create subscription
    CREATE PROC dbo.event_subscribe
    	@EventPublicationName nvarchar(50)
    	,@EventSubscriptionDatabaseName sysname
    	,@EventSubscriptionSchemaName sysname
    	,@EventSubscriptionName nvarchar(50)
    	,@EventSubscriptionProcedureName nvarchar(261)
    AS
    
    DECLARE
    	@Sql nvarchar(MAX)
    	,@ConversationHandle uniqueidentifier
    	,@EventSubscriptionTargetQueueName nvarchar(130)
    	,@EventSubscriptionServiceName nvarchar(130)
    	,@EventSubscriptionRouteName nvarchar(130)
    	,@EventPublicationServiceName nvarchar(130);
    
    SELECT 
    	@EventSubscriptionTargetQueueName = @EventSubscriptionSchemaName + N'.' + @EventSubscriptionName + N'_SubscriptionTargetQueue'
    	,@EventSubscriptionServiceName = @EventSubscriptionName + N'_SubscriptionService'
    	,@EventSubscriptionRouteName = @EventSubscriptionName + N'_SubscriptionRoute'
    	,@EventPublicationServiceName = @EventPublicationName + N'_PublicationService';
    
    BEGIN TRY
    	--create Service Broker queue for this subscriber
    	SET @Sql = N'USE ' + @EventSubscriptionDatabaseName
    		+ N';CREATE QUEUE ' + @EventSubscriptionTargetQueueName
    		+ N' WITH ACTIVATION ( PROCEDURE_NAME = ' + @EventSubscriptionProcedureName + N', MAX_QUEUE_READERS = 1, EXECUTE AS OWNER);';
    	EXEC sp_executesql @Sql;
    
    	--create Service Broker service for this subscriber
    	SET @Sql = N'USE ' + @EventSubscriptionDatabaseName
    	    + N';CREATE SERVICE ' + @EventSubscriptionServiceName
    		+ N' ON QUEUE ' + @EventSubscriptionTargetQueueName
    		+ N' (EventContract);';
    	EXEC sp_executesql @Sql;
    
    	--create Service Broker route for this subscriber
    	SET @Sql = N'CREATE ROUTE ' + @EventSubscriptionRouteName
    		+ N' WITH
    			SERVICE_NAME = N''' + @EventSubscriptionServiceName
    			+ N''', ADDRESS = ''LOCAL'';';
    	EXEC sp_executesql @Sql;
    
    	--begin conversation for this subscriber
    	SET @Sql = N'BEGIN DIALOG @ConversationHandle
    		FROM SERVICE ' + @EventPublicationServiceName
    		+ N' TO SERVICE ''' + @EventSubscriptionServiceName + N''''
    		+ N' ON CONTRACT EventContract
    		WITH ENCRYPTION = OFF;';
    	EXEC sp_executesql 
    		@Sql
    		,N'@ConversationHandle uniqueidentifier OUT'
    		,@ConversationHandle = @ConversationHandle OUT;
    
    	--insert subscriber information
    	INSERT INTO dbo.event_subscription (event_publication_name, event_subsciption_name, conversation_handle) 
    		VALUES (@EventPublicationName, @EventSubscriptionName, @ConversationHandle);
    
    END TRY
    BEGIN CATCH
    	DECLARE
    		@ErrorNumber int
    		,@ErrorMessage nvarchar(2048)
    		,@ErrorSeverity int
    		,@ErrorState int
    		,@ErrorLine int;
    
    	SELECT
    		@ErrorNumber =ERROR_NUMBER()
    		,@ErrorMessage =ERROR_MESSAGE()
    		,@ErrorSeverity = ERROR_SEVERITY()
    		,@ErrorState =ERROR_STATE()
    		,@ErrorLine =ERROR_LINE();
    
    	RAISERROR('Error %d caught at line %d: %s'
    		,@ErrorSeverity
    		,@ErrorState
    		,@ErrorNumber
    		,@ErrorLine
    		,@ErrorMessage);
    END CATCH;
    
    RETURN @@ERROR;
    GO
    
    --create proc to remove subscription
    CREATE PROC dbo.event_unsubscribe
    	@EventPublicationName nvarchar(50)
    	,@EventSubscriptionDatabaseName sysname
    	,@EventSubscriptionSchemaName sysname
    	,@EventSubscriptionName nvarchar(50)
    AS
    
    DECLARE
    	@Sql nvarchar(MAX)
    	,@ConversationHandle uniqueidentifier
    	,@EventSubscriptionTargetQueueName nvarchar(130)
    	,@EventSubscriptionServiceName nvarchar(130)
    	,@EventSubscriptionRouteName nvarchar(130)
    	,@EventPublicationServiceName nvarchar(130);
    
    SELECT 
    	@EventSubscriptionTargetQueueName = @EventSubscriptionSchemaName + N'.' + @EventSubscriptionName + N'_SubscriptionTargetQueue'
    	,@EventSubscriptionServiceName = @EventSubscriptionName + N'_SubscriptionService'
    	,@EventSubscriptionRouteName = @EventSubscriptionName + N'_SubscriptionRoute'
    	,@EventPublicationServiceName = @EventPublicationName + N'_PublicationService';
    
    BEGIN TRY
    	--drop subscription information from publisher
    	DELETE FROM TestDB.dbo.event_subscription
    	WHERE 
    		event_publication_name = @EventPublicationName
    		AND event_subsciption_name = @EventSubscriptionName;
    
    	--drop Service Broker subscriber route
    	SET @Sql = N'DROP ROUTE ' + @EventSubscriptionRouteName + N';';
    	EXEC sp_executesql @Sql;
    
    	--drop Service Broker subscriber service
    	SET @Sql = N'USE ' + @EventSubscriptionDatabaseName
    		+ N';DROP SERVICE ' + @EventSubscriptionServiceName + N';';
    	EXEC sp_executesql @Sql;
    
    	--drop Service Broker subscriber queue
    	SET @Sql = N'USE ' + @EventSubscriptionDatabaseName
    		+ N';DROP QUEUE ' + @EventSubscriptionTargetQueueName + N';';
    	EXEC sp_executesql @Sql;
    
    END TRY
    BEGIN CATCH
    	DECLARE
    		@ErrorNumber int
    		,@ErrorMessage nvarchar(2048)
    		,@ErrorSeverity int
    		,@ErrorState int
    		,@ErrorLine int;
    
    	SELECT
    		@ErrorNumber =ERROR_NUMBER()
    		,@ErrorMessage =ERROR_MESSAGE()
    		,@ErrorSeverity = ERROR_SEVERITY()
    		,@ErrorState =ERROR_STATE()
    		,@ErrorLine =ERROR_LINE();
    
    	RAISERROR('Error %d caught at line %d: %s'
    		,@ErrorSeverity
    		,@ErrorState
    		,@ErrorNumber
    		,@ErrorLine
    		,@ErrorMessage);
    END CATCH;
    
    RETURN @@ERROR;
    GO
    
    --create proc to send message to each subscriber
    CREATE PROC dbo.multicast_message_to_subscribers
    	@EventPublicationName nvarchar(50)
    	,@EventPublicationMessage xml
    AS
    DECLARE 
    	@ConversationHandle uniqueidentifier
    	,@EventSubscriptionName nvarchar(50)
    	,@Sql nvarchar(MAX);
    
    DECLARE Subscriptions CURSOR LOCAL FAST_FORWARD FOR
    	SELECT
    		conversation_handle
    		,event_subsciption_name
    	FROM dbo.event_subscription
    	WHERE event_publication_name = @EventPublicationName;
    OPEN Subscriptions;
    
    WHILE 1 = 1
    BEGIN
    
    	FETCH NEXT FROM Subscriptions 
    		INTO @ConversationHandle, @EventSubscriptionName;
    
    	IF @@FETCH_STATUS = -1 BREAK;
    
    	SEND ON CONVERSATION @ConversationHandle
    		MESSAGE TYPE EventMessageType
    		(@EventPublicationMessage);
    
    END;
    
    CLOSE Subscriptions;
    DEALLOCATE Subscriptions;
    --COMMIT;
    GO
    
    --user table
    CREATE TABLE dbo.table1(
    	col1 int NOT NULL IDENTITY
    		CONSTRAINT pk_table1 PRIMARY KEY
    	,col2 int
    	);
    GO
    
    --trigger to generate insert, update and delete events
    CREATE TRIGGER tr_table1
    	ON dbo.table1
    	FOR INSERT, UPDATE, DELETE
    AS
    DECLARE @EventPublicationMessage xml;
    
    IF EXISTS(SELECT * FROM inserted)
    BEGIN
    	IF EXISTS(SELECT * FROM deleted)
    	BEGIN
    		SET @EventPublicationMessage = '<event>update</event>';
    	END
    	ELSE
    	BEGIN
    		SET @EventPublicationMessage = '<event>insert</event>';
    	END;
    END
    ELSE
    BEGIN
    	IF EXISTS(SELECT * FROM deleted)
    	BEGIN
    		SET @EventPublicationMessage = '<event>delete</event>';
    	END
    END;
    
    IF @EventPublicationMessage IS NOT NULL
    BEGIN
    	EXEC dbo.multicast_message_to_subscribers
    		@EventPublicationName = N'TestDB_table1'
    		, @EventPublicationMessage = @EventPublicationMessage;
    END
    GO
    
    --create SB queue and service for publication
    CREATE QUEUE dbo.TestDB_table1_PublicationInitiatorQueue;
    CREATE SERVICE TestDB_table1_PublicationService 
    	ON QUEUE dbo.TestDB_table1_PublicationInitiatorQueue (EventContract);
    --insert row for publication
    INSERT INTO dbo.event_publication VALUES(N'TestDB_table1');
    GO
    
    USE DemoDB;
    --create SB message type and contract for subscriptions
    CREATE MESSAGE TYPE EventMessageType VALIDATION = WELL_FORMED_XML;
    CREATE CONTRACT EventContract(
    	EventMessageType SENT BY INITIATOR
    	);
    GO
    
    CREATE TABLE dbo.table1(
    	event_name char(7) NOT NULL
    	,insert_timestamp datetime NOT NULL
    		CONSTRAINT df_table1_insert_timestamp DEFAULT GETDATE()
    	);
    CREATE TABLE dbo.table2(
    	event_name char(7) NOT NULL
    	,insert_timestamp datetime NOT NULL
    		CONSTRAINT df_table2_insert_timestamp DEFAULT GETDATE()
    	);
    GO
    
    CREATE PROC dbo.insert_table1
    AS
    DECLARE 
    	@EventMessage xml
    	,@EventName char(7);
    
    SET XACT_ABORT ON;
    
    WHILE 1 = 1
    BEGIN
    
    	BEGIN TRAN;
    
    	WAITFOR(
    		RECEIVE TOP (1)
    			@EventMessage = CAST(message_body AS xml)
    		FROM dbo.DemoDB_table1_SubscriptionTargetQueue
    		), TIMEOUT 1000;
    	IF @@ROWCOUNT = 0
    	BEGIN
    		COMMIT;
    		BREAK;
    	END;
    
    	SET @EventName = @EventMessage.value('/event[1]', 'char(7)');
    
    	INSERT INTO dbo.table1(event_name)
    		VALUES (@EventName);
    
    	COMMIT;
    
    END;
    
    RETURN @@ERROR;
    GO
    
    CREATE PROC dbo.insert_table2
    AS
    DECLARE 
    	@EventMessage xml
    	,@EventName char(7);
    
    SET XACT_ABORT ON;
    
    WHILE 1 = 1
    BEGIN
    
    	BEGIN TRAN;
    
    	WAITFOR(
    		RECEIVE TOP (1)
    			@EventMessage = CAST(message_body AS xml)
    		FROM dbo.DemoDB_table2_SubscriptionTargetQueue
    		), TIMEOUT 1000;
    	IF @@ROWCOUNT = 0
    	BEGIN
    		COMMIT;
    		BREAK;
    	END;
    
    	SET @EventName = @EventMessage.value('/event[1]', 'char(7)');
    
    	INSERT INTO dbo.table2(event_name) 
    		VALUES (@EventName);
    
    	COMMIT;
    
    END;
    
    RETURN @@ERROR;
    GO
    
    USE TestDB;
    --create event subscription 1
    EXEC TestDB.dbo.event_subscribe
    	@EventPublicationName = N'TestDB_table1'
    	,@EventSubscriptionDatabaseName = N'DemoDB'
    	,@EventSubscriptionSchemaName = N'dbo'
    	,@EventSubscriptionName = N'DemoDB_table1'
    	,@EventSubscriptionProcedureName = N'dbo.insert_table1';
    
    --create event subscription 2
    EXEC TestDB.dbo.event_subscribe
    	@EventPublicationName = N'TestDB_table1'
    	,@EventSubscriptionDatabaseName = N'DemoDB'
    	,@EventSubscriptionSchemaName = N'dbo'
    	,@EventSubscriptionName = N'DemoDB_table2'
    	,@EventSubscriptionProcedureName = N'dbo.insert_table2';
    
    --make some changes
    INSERT INTO dbo.table1 VALUES(1);
    INSERT INTO dbo.table1 VALUES(2);
    UPDATE dbo.table1 SET col2 = 2;
    DELETE FROM dbo.table1;
    
    --changes should be in subscriber tables after a brief delay
    WAITFOR DELAY '00:00:01';
    SELECT * FROM DemoDB.dbo.table1;
    SELECT * FROM DemoDB.dbo.table2;
    
    --remove event subscriptions
    EXEC TestDB.dbo.event_unsubscribe
    	@EventPublicationName = N'TestDB_table1'
    	,@EventSubscriptionDatabaseName = N'DemoDB'
    	,@EventSubscriptionSchemaName = N'dbo'
    	,@EventSubscriptionName = N'DemoDB_table1';
    
    EXEC TestDB.dbo.event_unsubscribe
    	@EventPublicationName = N'TestDB_table1'
    	,@EventSubscriptionDatabaseName = N'DemoDB'
    	,@EventSubscriptionSchemaName = N'dbo'
    	,@EventSubscriptionName = N'DemoDB_table2';
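
    If the rows do not show up in the subscriber tables, the Service Broker catalog views are the first place to look. The queries below are a minimal diagnostic sketch, assuming the TestDB and DemoDB names from the script above; run them before the event_unsubscribe calls, since those drop the subscriber queues.

```sql
USE TestDB;
--messages that could not be delivered stay here; transmission_status holds the reason
SELECT conversation_handle, to_service_name, message_type_name, transmission_status
FROM sys.transmission_queue;

--state of each conversation endpoint in the publishing database
SELECT conversation_handle, far_service, state_desc
FROM sys.conversation_endpoints;

USE DemoDB;
--is_receive_enabled = 0 means the queue is disabled,
--for example after repeated rollbacks in the activation proc
SELECT name, is_receive_enabled, is_activation_enabled, activation_procedure
FROM sys.service_queues;
```

    An empty sys.transmission_queue together with enabled queues suggests the messages were delivered and the problem, if any, is inside the activation proc.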


    Dan Guzman, SQL Server MVP, http://www.dbdelta.com


    • Edited by Dan GuzmanMVP Sunday, January 20, 2013 3:25 PM corrected description
    • Marked As Answer by ksd1234 Sunday, January 20, 2013 4:21 PM
  • Sunday, January 20, 2013 4:20 PM
     
     

    Hi Dan,

    Thank you so much for your valuable inputs on this task. I really appreciate your help.

    I will walk through the steps above and start working on this task, and will let you know if I need any help.

    Thank you once again.

    Thanks,

    ksd123

  • Sunday, February 17, 2013 10:56 AM
     
     

    Hi Dan,

    There are some minor changes, and I was asked to achieve this task in a more generic way. So instead of tables subscribing to an event, applications would subscribe to an event.

    Process: Trigger -> Throw (SP) -> Distribute (SP) -> Execute (SP) -> Acknowledge (SP)

    event_subscribe (application, event, procedure): lets an application subscribe to an event or change the target of an existing subscription.

    event_unsubscribe (application, event): lets an application unsubscribe from an event.

    Throw: This SP is invoked by the application that is throwing the event.

    Distribute: This SP takes the thrown event and its arguments, merges the arguments with the event defaults, and then starts a conversation with each subscriber to the event.

    Execute: This SP is invoked by Service Broker when there are events in the queue; it then invokes the designated subscriber procedure. After the called procedure completes, it sends back an acknowledgement.

    Acknowledge: This SP is invoked by Service Broker when there are responses in the queue from the designated subscriber procedure. After the called procedure completes, the event is closed.

    event (table):

    Event_name                  Defaultxml
    created/updated/deleted     values of all parameters

    event_subscriptions (table): This table contains each event that an application wants to be notified about, along with the name of the target service (SP) to invoke when the event occurs.

    application      Event_name                  Target_service (SP)
    application_1    created/updated/deleted     Target_service_SP_1

    Here is the detailed process flow:

    1) The trigger calls Throw (SP), which starts a conversation with Distribute (SP), passing it the event name and the provided XML; Throw then terminates.

    2) Service Broker starts the Distribute procedure in response to a conversation being started with it.
    Distribute (SP) merges the provided parameters with the default parameters for the event to ensure all parameters are populated.

    For each application that has subscribed to the event, Distribute begins a conversation with the Execute procedure, giving it the name of the procedure specified in that application's subscription and the merged parameters as XML.
    Distribute (SP) then completes the conversation with Throw and terminates.

    3) Service Broker starts the Execute procedure in response to a conversation being started with it.

    The Execute procedure invokes the specified procedure, passing it the provided XML parameters.
    When the procedure returns, Execute completes the conversation with Distribute and terminates.

    I am a little confused and need example scripts for the Throw, Distribute, Execute and Acknowledge procedures and the trigger (for updates I need both the old and new values).

    Thanks in Advance

  • Monday, February 18, 2013 7:34 PM
     

    I am a little confused and need example scripts for the Throw, Distribute, Execute and Acknowledge procedures and the trigger (for updates I need both the old and new values).

    I took a stab at some scripts based on your requirements as I understand them.  I assumed you wanted asynchronous processing throughout, so I added the table pending_executor to keep track of the executor requests.  The conversation from the distributor to the publisher is ended once all executors are complete.  You'll need to implement the defaultxml merge (in the Distribute proc) as well as the stored procedure that the Executor invokes (Target_service_SP_1 in this example).

    I recommend you add a logging table to facilitate troubleshooting, monitoring and auditing.
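
    As a starting point for such a logging table, here is a minimal sketch. The table dbo.event_log, the proc dbo.write_event_log and all of their columns and parameters are hypothetical names chosen for illustration; they are not part of the scripts below. It assumes TestDB already exists, so run it after the CREATE DATABASE step.

```sql
USE TestDB;
GO
--hypothetical logging table; names and columns are illustrative only
CREATE TABLE dbo.event_log(
	event_log_id int NOT NULL IDENTITY
		CONSTRAINT PK_event_log PRIMARY KEY
	,log_time datetime NOT NULL
		CONSTRAINT DF_event_log_log_time DEFAULT GETDATE()
	,source_proc sysname NOT NULL
	,conversation_handle uniqueidentifier NULL
	,message_type_name sysname NULL
	,log_text nvarchar(2048) NULL
	);
GO

--hypothetical helper that the activation procs could call
CREATE PROC dbo.write_event_log
	@source_proc sysname
	,@conversation_handle uniqueidentifier = NULL
	,@message_type_name sysname = NULL
	,@log_text nvarchar(2048) = NULL
AS
INSERT INTO dbo.event_log (source_proc, conversation_handle, message_type_name, log_text)
	VALUES (@source_proc, @conversation_handle, @message_type_name, @log_text);
RETURN @@ERROR;
GO

--example call, e.g. from the "Unexpected message type" branch of a proc
EXEC dbo.write_event_log
	@source_proc = N'Distribute'
	,@log_text = N'Unexpected message type';
```

    Calling the helper at the start of each RECEIVE loop iteration and in each error branch would give a per-conversation trail that can be joined on conversation_handle.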

    CREATE DATABASE TestDB;
    ALTER AUTHORIZATION ON DATABASE::TestDB TO sa;
    ALTER DATABASE TestDB SET ENABLE_BROKER;
    ALTER DATABASE TestDB SET TRUSTWORTHY ON; --can use certificate instead of TRUSTWORTHY
    GO
    
    CREATE DATABASE DemoDB;
    ALTER AUTHORIZATION ON DATABASE::DemoDB TO sa;
    ALTER DATABASE DemoDB SET ENABLE_BROKER;
    ALTER DATABASE DemoDB SET TRUSTWORTHY ON; --can use certificate instead of TRUSTWORTHY
    GO
    
    USE TestDB;
    GO
    CREATE TABLE dbo.event(
    	Event_name nvarchar(50) NOT NULL 
    		CONSTRAINT PK_Event PRIMARY KEY
    	,Defaultxml xml NOT NULL
    	);
    INSERT INTO dbo.event VALUES
    	('created', '<default />');
     
    CREATE TABLE dbo.event_subscription(
    	application sysname NOT NULL
    	,Event_name nvarchar(50) NOT NULL
    		CONSTRAINT FK_event_subscription_event 
    		FOREIGN KEY REFERENCES dbo.event(Event_name)
    	,Target_service_sp_name nvarchar(392) NOT NULL
    	,CONSTRAINT PK_event_subscription PRIMARY KEY CLUSTERED
    		(application, Event_name)
    	);
    
    CREATE TABLE dbo.pending_executor(
    	publisher_conversation_handle uniqueidentifier NOT NULL
    	,executor_conversation_handle uniqueidentifier NOT NULL
    	,CONSTRAINT PK_pending_executor PRIMARY KEY CLUSTERED
    		(publisher_conversation_handle, executor_conversation_handle)
    	);
    GO
    
    CREATE PROC dbo.AcknowledgeDistributorMessages
    AS
    DECLARE
    	@conversation_handle uniqueidentifier
    	,@message_type_name sysname
    	,@message_body varbinary(MAX);
    
    WHILE 1 = 1
    BEGIN
    	WAITFOR (
    		RECEIVE TOP(1)
    			 @conversation_handle = conversation_handle
    			,@message_type_name = message_type_name
    			,@message_body = message_body
    		FROM dbo.PublisherQueue
    		), timeout 1000;
    	IF @@ROWCOUNT = 0 BREAK;
    
    	IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    	BEGIN
    		END CONVERSATION @conversation_handle;
    	END
    	ELSE
    	BEGIN
    		END CONVERSATION @conversation_handle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type';
    	END;
    
    END;
    
    RETURN @@ERROR;
    GO
    
    CREATE PROC dbo.AcknowledgeExecutorMessages
    AS
    DECLARE
    	@conversation_handle uniqueidentifier
    	,@publisher_conversation_handle uniqueidentifier
    	,@message_type_name sysname
    	,@message_body xml;
    
    WHILE 1 = 1
    BEGIN
    	WAITFOR (
    		RECEIVE TOP(1)
    			@conversation_handle = conversation_handle
    			,@message_type_name = message_type_name
    			,@message_body = CAST(message_body AS xml)
    		FROM dbo.DistributorInitiatorQueue
    		), timeout 1000;
    	IF @@ROWCOUNT = 0 BREAK;
    
    	--get related publisher conversation (reset first so a value from a
    	--previous loop iteration is never reused)
    	SET @publisher_conversation_handle = NULL;
    	SELECT @publisher_conversation_handle = publisher_conversation_handle
    	FROM dbo.pending_executor
    	WHERE executor_conversation_handle = @conversation_handle;
    
    	IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    	BEGIN
    		END CONVERSATION @conversation_handle;
    	END
    	ELSE
    	BEGIN
    		END CONVERSATION @conversation_handle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type';
    	END;
    
    	--remove completed executor
    	DELETE FROM dbo.pending_executor
    	WHERE
    		publisher_conversation_handle = @publisher_conversation_handle
    		AND executor_conversation_handle = @conversation_handle;
    
    	IF @publisher_conversation_handle IS NOT NULL
    		AND NOT EXISTS(SELECT *
    		FROM dbo.pending_executor
    		WHERE publisher_conversation_handle = @publisher_conversation_handle
    		)
    	BEGIN
    		--end publisher conversation after all executors have completed
    		END CONVERSATION @publisher_conversation_handle;
    	END;
    
    END;
    
    RETURN @@ERROR;
    GO
    
    --create proc to send message to distributor
    CREATE PROC dbo.throw_to_distributor
    	@Event_name nvarchar(50)
    	,@event_xml xml
    AS
    
    DECLARE @conversation_handle uniqueidentifier;
    
    BEGIN DIALOG CONVERSATION @conversation_handle
    	FROM SERVICE PublisherService
    	TO SERVICE 'DistributorTargetService'
    	ON CONTRACT EventContract
    	WITH ENCRYPTION = OFF;
    
    	--add EventName to message
    	SET @event_xml.modify('insert <EventName>{ sql:variable("@Event_name")  }</EventName> as last into (/)');
    
    	SEND ON CONVERSATION @conversation_handle
    		MESSAGE TYPE EventMessageType
    		(@event_xml);
    
    RETURN @@ERROR;
    GO
    
    --user table
    CREATE TABLE dbo.table1(
    	col1 int NOT NULL IDENTITY
    		CONSTRAINT pk_table1 PRIMARY KEY
    	,col2 int
    	);
    GO
    
    --trigger to generate insert, update and delete events
    CREATE TRIGGER tr_table1
    	ON dbo.table1
    	FOR INSERT, UPDATE, DELETE
    AS
    DECLARE
    	@Event_name nvarchar(50)
    	,@event_xml xml;
    
    IF EXISTS(SELECT * FROM inserted)
    BEGIN
    	IF EXISTS(SELECT * FROM deleted)
    	BEGIN
    		SET @Event_name = 'updated';
    		SET @event_xml = (
    			SELECT *
    			FROM inserted
    			FOR XML PATH('')
    			)
    	END
    	ELSE
    	BEGIN
    		SET @Event_name = 'created';
    		SET @event_xml = (
    			SELECT *
    			FROM inserted
    			FOR XML PATH('')
    			)
    	END;
    END
    ELSE
    BEGIN
    	IF EXISTS(SELECT * FROM deleted)
    	BEGIN
    		SET @Event_name = 'deleted';
    		SET @event_xml = (
    			SELECT *
    			FROM deleted
    			FOR XML PATH('')
    			)
    	END
    END;
    
    IF @Event_name IS NOT NULL
    BEGIN
    	
    	EXEC dbo.throw_to_distributor
    		@Event_name = @Event_name
    		, @event_xml = @event_xml;
    
    END
    GO
    
    CREATE PROC dbo.Distribute
    AS
    DECLARE 
    	@conversation_handle uniqueidentifier
    	,@executor_conversation_handle uniqueidentifier
    	,@message_body xml
    	,@executor_message_body xml
    	,@message_type_name sysname
    	,@Event_name nvarchar(50)
    	,@Target_service_sp_name nvarchar(392)
    	,@Defaultxml xml;
    
    WHILE 1 = 1
    BEGIN
    
    	WAITFOR (
    		RECEIVE TOP(1)
    			@conversation_handle = conversation_handle
    			,@message_body = CAST(message_body AS xml)
    			,@message_type_name = message_type_name
    		FROM dbo.DistributorTargetQueue
    		), timeout 1000;
    	IF @@ROWCOUNT = 0 BREAK;
    
    	IF @message_type_name = 'EventMessageType'
    	BEGIN
    	
    		--extract event name from message
    		SELECT @Event_name = @message_body.query('/EventName').value('.', 'nvarchar(50)');
    
    		--assign the event's default XML to the variable
    		SELECT @Defaultxml = Defaultxml
    		FROM dbo.event
    		WHERE Event_name = @Event_name;
    
    		---------------------------------------------------------
    		--TODO: merge @Defaultxml into @message_body and validate
    		---------------------------------------------------------
    
    		--invoke executor for each subscriber
    		DECLARE Subscriptions CURSOR LOCAL STATIC FOR
    			SELECT
    				Target_service_sp_name
    			FROM dbo.event_subscription
    			WHERE Event_name = @Event_name;
    
    		OPEN Subscriptions;
    
    		WHILE 1 = 1
    		BEGIN
    
    			FETCH NEXT FROM Subscriptions 
    				INTO @Target_service_sp_name;
    
    			IF @@FETCH_STATUS = -1 BREAK;
    
    			BEGIN DIALOG CONVERSATION @executor_conversation_handle
    				FROM SERVICE DistributorInitiatorService
    				TO SERVICE 'ExecutorService'
    				ON CONTRACT EventContract
    				WITH ENCRYPTION = OFF;
    
    			--add ExecutorProcName and PublisherConversationHandle to message
    			SET @executor_message_body = @message_body;
    			SET @executor_message_body.modify('insert <ExecutorProcName>{ sql:variable("@Target_service_sp_name")  }</ExecutorProcName> as last into (/)');
    			SET @executor_message_body.modify('insert <PublisherConversationHandle>{ sql:variable("@conversation_handle")  }</PublisherConversationHandle> as last into (/)');
    
    			SEND ON CONVERSATION @executor_conversation_handle
    				MESSAGE TYPE EventMessageType
    				(@executor_message_body);
    
    			INSERT INTO dbo.pending_executor(
    				publisher_conversation_handle
    				,executor_conversation_handle
    				)
    			VALUES(
    				@conversation_handle
    				,@executor_conversation_handle
    				);
    				
    		END; --next subscription
    
    		CLOSE Subscriptions;
    		DEALLOCATE Subscriptions;
    
    	END
    	ELSE
    	BEGIN
    		END CONVERSATION @conversation_handle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type';
    	END;
    
    END;
    
    RETURN @@ERROR;
    GO
    
    --create SB message type and contract for published events
    CREATE MESSAGE TYPE EventMessageType VALIDATION = WELL_FORMED_XML;
    CREATE CONTRACT EventContract(
    	EventMessageType SENT BY INITIATOR
    	);
    GO
    
    --create SB queue and service for publisher
    CREATE QUEUE dbo.PublisherQueue
    	WITH ACTIVATION ( PROCEDURE_NAME = dbo.AcknowledgeDistributorMessages, MAX_QUEUE_READERS = 1, EXECUTE AS OWNER);
    CREATE SERVICE PublisherService 
    	ON QUEUE dbo.PublisherQueue (EventContract);
    GO
    
    --create SB queue and service for distributor target
    CREATE QUEUE dbo.DistributorTargetQueue
    	WITH ACTIVATION ( PROCEDURE_NAME = dbo.Distribute, MAX_QUEUE_READERS = 1, EXECUTE AS OWNER);
    CREATE SERVICE DistributorTargetService 
    	ON QUEUE dbo.DistributorTargetQueue (EventContract);
    GO
    
    --create SB queue and service for distributor initiator
    CREATE QUEUE dbo.DistributorInitiatorQueue
    	WITH ACTIVATION ( PROCEDURE_NAME = dbo.AcknowledgeExecutorMessages, MAX_QUEUE_READERS = 1, EXECUTE AS OWNER);
    CREATE SERVICE DistributorInitiatorService 
    	ON QUEUE dbo.DistributorInitiatorQueue (EventContract);
    GO
    
    --create proc to create subscription
    CREATE PROC dbo.event_subscribe
    	@Event_name nvarchar(50)
    	,@application sysname
    	,@Target_service_sp_name nvarchar(392)
    AS
    
    INSERT INTO dbo.event_subscription (Event_name, application, Target_service_sp_name) 
    	VALUES (@Event_name, @application, @Target_service_sp_name);
    
    RETURN @@ERROR;
    GO
    
    --create proc to remove subscription
    CREATE PROC dbo.event_unsubscribe
    	@Event_name nvarchar(50)
    	,@application sysname
    AS
    
    DELETE FROM dbo.event_subscription
    WHERE
    	Event_name = @Event_name
    	AND application = @application;
    
    RETURN @@ERROR;
    GO
    
    EXEC dbo.event_subscribe
    	@Event_name = 'created'
    	,@application = 'app1'
    	,@Target_service_sp_name = 'Target_service_SP_1'
    GO
    
    USE DemoDB;
    GO
    
    --create SB message type and contract for events
    CREATE MESSAGE TYPE EventMessageType VALIDATION = WELL_FORMED_XML;
    CREATE CONTRACT EventContract(
    	EventMessageType SENT BY INITIATOR
    	);
    GO
    
    CREATE PROC dbo.Executor
    AS
    DECLARE
    	@conversation_handle uniqueidentifier
    	,@service_name sysname
    	,@message_type_name sysname
    	,@message_body xml
    	,@Target_service_sp_name nvarchar(392);
    
    WHILE 1 = 1
    BEGIN
    	WAITFOR (
    		RECEIVE TOP(1)
    			@conversation_handle = conversation_handle
    			,@service_name = service_name
    			,@message_type_name = message_type_name
    			,@message_body = CAST(message_body AS xml)
    		FROM dbo.ExecutorQueue
    		), timeout 1000;
    	IF @@ROWCOUNT = 0 BREAK;
    
    	SET @Target_service_sp_name = @message_body.query('/ExecutorProcName').value('.', 'nvarchar(392)');
    
    	BEGIN TRY
    		EXEC @Target_service_sp_name @message_body;
    		END CONVERSATION @conversation_handle;
    	END TRY
    	BEGIN CATCH
    
    		DECLARE
    			@ErrorNumber int
    			,@ErrorMessage nvarchar(2048)
    			,@ErrorSeverity int
    			,@ErrorState int
    			,@ErrorLine int;
    
    		SELECT
    			@ErrorNumber =ERROR_NUMBER()
    			,@ErrorMessage =ERROR_MESSAGE()
    			,@ErrorSeverity = ERROR_SEVERITY()
    			,@ErrorState =ERROR_STATE()
    			,@ErrorLine =ERROR_LINE();
    
    		SET @ErrorMessage = 'Error '
    			+ CAST(@ErrorNumber AS varchar(10))
    			+ ' caught at line '
    			+ CAST(@ErrorLine AS varchar(10))
    			+ ': ' + @ErrorMessage;
    
    		END CONVERSATION @conversation_handle WITH ERROR = 1 DESCRIPTION = @ErrorMessage;
    
    	END CATCH;
    
    END;
    
    RETURN @@ERROR;
    GO
    
    --create SB queue and service for executor
    CREATE QUEUE dbo.ExecutorQueue
    	WITH ACTIVATION ( PROCEDURE_NAME = dbo.Executor, MAX_QUEUE_READERS = 1, EXECUTE AS OWNER);
    CREATE SERVICE ExecutorService 
    	ON QUEUE dbo.ExecutorQueue (EventContract);
    GO
    
    CREATE PROC dbo.Target_service_SP_1
    	@event_message xml
    AS
    
    --TODO: implement @event_message processing
    
    RETURN @@ERROR;
    GO
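
    On the old and new values for updates: the trigger above sends only the rows from inserted. Below is a minimal sketch of one way to carry both sets of values in the message. The table variables @deleted and @inserted are hypothetical stand-ins so the snippet runs outside a trigger; inside tr_table1 the same SELECT would read from the deleted and inserted pseudo-tables instead. The <old> and <new> element names are also just an assumption.

```sql
--table variables stand in for the trigger's deleted and inserted pseudo-tables
DECLARE @deleted TABLE (col1 int NOT NULL, col2 int NULL);
DECLARE @inserted TABLE (col1 int NOT NULL, col2 int NULL);
DECLARE @event_xml xml;

--simulate an update that changes col2 from 1 to 2 for the row with col1 = 1
INSERT INTO @deleted (col1, col2) VALUES (1, 1);
INSERT INTO @inserted (col1, col2) VALUES (1, 2);

--one <old> element per row before the update, one <new> element per row after it
SET @event_xml = (
	SELECT
		(SELECT col1, col2 FROM @deleted FOR XML PATH('old'), TYPE)
		,(SELECT col1, col2 FROM @inserted FOR XML PATH('new'), TYPE)
	FOR XML PATH(''), TYPE
	);

SELECT @event_xml AS event_xml;
```

    The target proc can then match <old> and <new> elements on the key column (col1 here) to see what changed.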


    Dan Guzman, SQL Server MVP, http://www.dbdelta.com

    • Marked As Answer by ksd1234 Monday, February 25, 2013 6:02 PM
  • Tuesday, February 19, 2013 7:43 PM
     
     

    Hi Dan,

    Thank you for your time. Could you please give me more insight into the statements below from a syntax and declaration perspective (.modify, sql:variable, as last into, .query)?

    1)throw_to_distributor

    SET @event_xml.modify('insert <EventName>{ sql:variable("@Event_name")  }</EventName> as last into (/)');

    2)Executor

    SET @Target_service_sp_name = @message_body.query('/ExecutorProcName').value('.', 'nvarchar(261)');

    3)Distribute

    IF @message_type_name = 'EventMessageType'
    BEGIN
        --extract event name from message
        SELECT @Event_name = @message_body.query('/EventName').value('.', 'nvarchar(50)');

            

  • Wednesday, February 20, 2013 3:24 AM
     
     

    Thank you for your time. Could you please give me more insight into the statements below from a syntax and declaration perspective (.modify, sql:variable, as last into, .query)?

    For syntax and examples of the XML modify method, see http://msdn.microsoft.com/en-us/library/ms175466.aspx.

    #1 adds the specified event name parameter to the message as an element named EventName.  The EventName is needed by the distributor to determine the subscribers and may be used by the target proc too.

    #2 extracts the proc name from the ExecutorProcName element of the message into a local variable.  Distribute adds that element to the message from the event_subscription table, and the Executor needs it to know which proc to invoke.

    #3 extracts the EventName from the message into a local variable, which is used to determine the event subscribers.
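
    To try the three XML methods in isolation, here is a small standalone sketch that can be run in any database. The sample XML and the @extracted_name variable are made up for illustration; the modify and query/value expressions are the same ones used in the scripts.

```sql
DECLARE
	@Event_name nvarchar(50)
	,@event_xml xml
	,@extracted_name nvarchar(50);

SET @Event_name = N'created';
--sample payload, similar in shape to what the trigger builds with FOR XML PATH('')
SET @event_xml = N'<col1>1</col1><col2>2</col2>';

--modify() changes the xml variable in place:
--  insert ... as last into (/) appends a new element after the existing top-level content
--  { sql:variable("@Event_name") } substitutes the value of the T-SQL variable
SET @event_xml.modify('insert <EventName>{ sql:variable("@Event_name") }</EventName> as last into (/)');

--query() returns the matching <EventName> node as xml;
--value('.', ...) converts that node's text to the requested SQL type
SET @extracted_name = @event_xml.query('/EventName').value('.', 'nvarchar(50)');

SELECT @event_xml AS event_xml, @extracted_name AS extracted_name;
```

    The second column of the result should contain the event name that was inserted into the message one step earlier.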


    Dan Guzman, SQL Server MVP, http://www.dbdelta.com

  • Monday, February 25, 2013 6:06 PM
     
     

    Hi Dan,

    I sincerely appreciate the time you spent on these scripts.

    Again, thank you so much for your help.