How to run Insert/Update statement to Linked server via SQL Server Broker

Answered How to run Insert/Update statement to Linked server via SQL Server Broker

  • jueves, 26 de julio de 2012 12:10
     
     

    Hi
    I currently have a trigger that  runs a INSERT and UPDATE statement to a Linked Server. I would like to run the trigger (or just the UPDATE/INSERT statement) via SQL Server broker in order to improve performance.

    Can someone give me a complete script to do this?

    Current Trigger:
    CREATE TRIGGER MyTrigger
    ON SampleTable
    FOR UPDATE
    AS 
    BEGIN
    UPDATE [192.xxx.xxx.xx].Northwind.dbo.Products
    SET ProductName = Inserted.ProductName
    FROM Inserted, Products

    INSERT INTO [192.xxx.xxx.xx].Northwind.dbo.Products (ID, ProductName)
    VALUES ('a', 'b')

    END

Todas las respuestas

  • jueves, 26 de julio de 2012 12:48
     
      Tiene código

    Hi
    I currently have a trigger that  runs a INSERT and UPDATE statement to a Linked Server. I would like to run the trigger (or just the UPDATE/INSERT statement) via SQL Server broker in order to improve performance.

    Can someone give me a complete script to do this?

    Current Trigger:
    CREATE TRIGGER MyTrigger
    ON SampleTable
    FOR UPDATE
    AS 
    BEGIN
    UPDATE [192.xxx.xxx.xx].Northwind.dbo.Products
    SET ProductName = Inserted.ProductName
    FROM Inserted, Products

    INSERT INTO [192.xxx.xxx.xx].Northwind.dbo.Products (ID, ProductName)
    VALUES ('a', 'b')

    END

    Before you go down the Service Broker path, I want to point out that the current trigger has some serious errors.  Every row in the remote table will be updated, which may be the root cause of current performance problems as well as data issues.  Also, the trigger is declared only for UPDATE statements so it will not fire after INSERTs.  Below is an example that uses separate triggers:

    CREATE TRIGGER TR_SampleTable_Update
    ON dbo.SampleTable
    FOR UPDATE
    AS 
    SET NOCOUNT ON;
    BEGIN
    	UPDATE remote_table
    	SET ProductName = Inserted.ProductName
    	FROM inserted
    	JOIN [192.xxx.xxx.xx].Northwind.dbo.Products AS remote_table ON
    		inserted.ID = remote_table.ID;
    END
    GO
    
    CREATE TRIGGER TR_SampleTable_Insert
    ON dbo.SampleTable
    FOR INSERT
    AS 
    SET NOCOUNT ON;
    BEGIN
    	INSERT INTO [192.xxx.xxx.xx].Northwind.dbo.Products(ID, ProductName)
    	SELECT ID, ProductName
    	FROM inserted;
    END
    GO
    

    Let me know if you still need a complete Service Broker sample script.  Is the remote table read-only?  You might also consider replication for your requirement.


    Dan Guzman, SQL Server MVP, http://weblogs.sqlteam.com/dang/

  • jueves, 26 de julio de 2012 13:15
     
     

    Thanks

    Yes, I did a bit of copy and paste and strip down to get the code siplified, so there migth be a couple of mistakes.

    But the issue still is: how do I run the 2 triggers via SQL Service Broker?

  • jueves, 26 de julio de 2012 13:26
     
     

    But the issue still is: how do I run the 2 triggers via SQL Service Broker?

    Do you need to apply the remote changes synchronously or asynchronously, so that the local INSERT/UPDATE statement doesn't need to wait for the remote changes to be applied?  This will affect the design.

    I'm off to my day job now so I won't be able to provide a sample script until later.  I'll need to know the answer to the synchronous/asynchronous question since the design and code will differ.


    Dan Guzman, SQL Server MVP, http://weblogs.sqlteam.com/dang/

  • jueves, 26 de julio de 2012 13:29
     
     

    Thanks for the attention Dan.
    It needs to be asynchronous for performance.  It currently works as normal triggers (synchronous) on table edit but it's slow.

  • domingo, 29 de julio de 2012 3:10
     
     Respondida Tiene código

    Below are example service broker scripts to replicate changes to an identical table in a remote database.  You'll need to customize the scripts with your actual server names and file paths.  Also, you'll need to make sure the endpoint ports (I used 12345 here) are allowed through the firewall.

    Run this script on the source server, copy the ServiceBrokerCertificate.cer and ServiceBrokerCertificate.key files to the destination server and run the second script on the destination server.

    USE master;
    CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'example p@ssw0Rd';
    CREATE CERTIFICATE ServiceBrokerCertificate
    WITH 
        SUBJECT = 'Service Broker Cert',
        START_DATE = '01/01/2012',
        EXPIRY_DATE = '01/01/2030';
    BACKUP CERTIFICATE ServiceBrokerCertificate
    	TO FILE = 'C:\temp\ServiceBrokerCertificate.cer'
    	WITH PRIVATE KEY ( FILE = 'C:\temp\ServiceBrokerCertificate.key'
    		,ENCRYPTION BY PASSWORD = 'example p@ssw0Rd' );
    CREATE ENDPOINT ServiceBrokerEndPoint
    	STATE=STARTED
    	AS TCP (LISTENER_PORT = 12345)
    	FOR SERVICE_BROKER 
    	(
    		AUTHENTICATION = CERTIFICATE ServiceBrokerCertificate,
    		ENCRYPTION = SUPPORTED
    	);
    CREATE LOGIN ServiceBrokerCertificateLogin
    	FROM CERTIFICATE ServiceBrokerCertificate;
    GRANT CONNECT ON ENDPOINT::ServiceBrokerEndPoint To ServiceBrokerCertificateLogin;
    CREATE DATABASE SourceProductDatabase;
    ALTER AUTHORIZATION ON DATABASE::SourceProductDatabase TO sa;
    ALTER DATABASE SourceProductDatabase SET ENABLE_BROKER;
    GO
    
    USE SourceProductDatabase;
    CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'example p@ssw0Rd';
    CREATE USER ServiceBrokerUser WITHOUT LOGIN;
    GRANT CONNECT TO ServiceBrokerUser;
    CREATE CERTIFICATE ServiceBrokerCertificate
    	FROM FILE = 'C:\temp\ServiceBrokerCertificate.cer'
    	WITH PRIVATE KEY ( FILE = 'C:\temp\ServiceBrokerCertificate.key'
    		,DECRYPTION BY PASSWORD = 'example p@ssw0Rd' );
    CREATE MESSAGE TYPE [//example.com/InsertProduct] VALIDATION = WELL_FORMED_XML;
    CREATE MESSAGE TYPE [//example.com/UpdateProduct] VALIDATION = WELL_FORMED_XML;
    CREATE MESSAGE TYPE [//example.com/DeleteProduct] VALIDATION = WELL_FORMED_XML;
    CREATE CONTRACT [//example.com/ProductReplicationContract](
    	[//example.com/InsertProduct] SENT BY INITIATOR
    	,[//example.com/UpdateProduct] SENT BY INITIATOR
    	,[//example.com/DeleteProduct] SENT BY INITIATOR
    	);
    CREATE TABLE dbo.ProductReplicationErrors(
    	ErrorTime datetime NOT NULL
    	,conversation_handle uniqueidentifier NOT NULL
    	,message_type_name nvarchar(256) NOT NULL
    	,message_body varbinary(MAX) NULL
    	);
    GO
    CREATE PROCEDURE dbo.usp_CleanupProductReplicationInitiatorQueue
    AS 
    SET NOCOUNT ON;
    DECLARE 
    	@conversation_handle uniqueidentifier = '00000000-0000-0000-0000-000000000000'
    	,@message_type_name nvarchar(256)
    	,@message_body varbinary(MAX)
    	,@description nvarchar(3000);
    
    WHILE @conversation_handle IS NOT NULL
    BEGIN
    	SET @conversation_handle = NULL;
    	WAITFOR (
    		RECEIVE TOP (1)
    			@conversation_handle = conversation_handle
    			,@message_type_name = message_type_name
    			,@message_body = message_body
    		FROM dbo.ProductReplicationInitiatorQueue)
    		,TIMEOUT 1000;
    	
    	IF @conversation_handle IS NOT NULL
    	BEGIN
    		IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    		BEGIN
    			END CONVERSATION @conversation_handle;
    		END
    		ELSE
    		BEGIN
    			INSERT INTO dbo.ProductReplicationErrors(
    				ErrorTime
    				,conversation_handle
    				,message_type_name
    				,message_body)
    			VALUES(
    				GETDATE()
    				,@conversation_handle
    				,@message_type_name
    				,@message_body)
    			SET @description = 'Unexpected message type ' + @message_type_name + ' received';
    			END CONVERSATION @conversation_handle WITH ERROR = 1 DESCRIPTION = @description;
    		END;
    	END;
    END;
    GO
    
    CREATE QUEUE ProductReplicationInitiatorQueue
    	WITH ACTIVATION (
              PROCEDURE_NAME = dbo.usp_CleanupProductReplicationInitiatorQueue,
              MAX_QUEUE_READERS = 1,
              EXECUTE AS SELF);
    CREATE SERVICE [//source-server.example.com/ProductReplicationInitiatorService] 
    	ON QUEUE ProductReplicationInitiatorQueue ([//example.com/ProductReplicationContract]);
    GRANT SEND ON SERVICE::[//source-server.example.com/ProductReplicationInitiatorService] To ServiceBrokerUser;
    CREATE ROUTE ProductReplicationTargetRoute
    WITH
    	SERVICE_NAME = '//destination-server.example.com/ProductReplicationTargetService',
    	ADDRESS = 'TCP://destination-server:12345';
    CREATE REMOTE SERVICE BINDING ProductReplicationTargetBinding
    	TO SERVICE '//destination-server.example.com/ProductReplicationTargetService'
    	WITH USER = dbo;
    GO
    
    CREATE TABLE dbo.Product(
    	ID varchar(50) NOT NULL
    		CONSTRAINT PK_Product PRIMARY KEY
    	, ProductName varchar(50)
    	);
    GO
    
    CREATE TRIGGER TR_Product_Insert
    ON dbo.Product
    FOR INSERT
    AS
    DECLARE @ConversationHandle uniqueidentifier;
    DECLARE @InsertedRows xml;
    
    IF EXISTS(SELECT * FROM inserted)
    BEGIN
    	SELECT @InsertedRows = InsertedRows
    	FROM (
    		SELECT ID, ProductName
    		FROM inserted
    		FOR XML PATH, TYPE) AS InsertedRows(InsertedRows);
    
    	BEGIN Dialog @ConversationHandle
    		FROM SERVICE [//source-server.example.com/ProductReplicationInitiatorService] 
    		TO SERVICE '//destination-server.example.com/ProductReplicationTargetService'
    		ON CONTRACT [//example.com/ProductReplicationContract]
    		WITH ENCRYPTION = OFF;
    
    	SEND ON CONVERSATION @ConversationHandle
    		MESSAGE TYPE [//example.com/InsertProduct]
    		(@InsertedRows);
    END
    GO
    
    CREATE TRIGGER TR_Product_Update
    ON dbo.Product
    FOR UPDATE
    AS
    DECLARE @ConversationHandle uniqueidentifier;
    DECLARE @ChangedRows xml;
    IF EXISTS(SELECT * FROM inserted)
    BEGIN
    	SELECT @ChangedRows = ChangedRows
    	FROM (
    		SELECT ID, ProductName
    		FROM inserted
    		FOR XML PATH, TYPE) AS ChangedRows(ChangedRows);
    
    	BEGIN Dialog @ConversationHandle
    		FROM SERVICE [//source-server.example.com/ProductReplicationInitiatorService] 
    		TO SERVICE '//destination-server.example.com/ProductReplicationTargetService'
    		ON CONTRACT [//example.com/ProductReplicationContract]
    		WITH ENCRYPTION = OFF;
    
    	SEND ON CONVERSATION @ConversationHandle
    		MESSAGE TYPE [//example.com/UpdateProduct]
    		(@ChangedRows);
    END
    GO
    
    CREATE TRIGGER TR_Product_Delete
    ON dbo.Product
    FOR DELETE
    AS
    DECLARE @ConversationHandle uniqueidentifier;
    DECLARE @DeletedRows xml;
    IF EXISTS(SELECT * FROM deleted)
    BEGIN
    	SELECT @DeletedRows = DeletedRows
    	FROM (
    		SELECT ID, ProductName
    		FROM deleted
    		FOR XML PATH, TYPE) AS DeletedRows(DeletedRows);
    
    	BEGIN Dialog @ConversationHandle
    		FROM SERVICE [//source-server.example.com/ProductReplicationInitiatorService] 
    		TO SERVICE '//destination-server.example.com/ProductReplicationTargetService'
    		ON CONTRACT [//example.com/ProductReplicationContract]
    		WITH ENCRYPTION = OFF;
    
    	SEND ON CONVERSATION @ConversationHandle
    		MESSAGE TYPE [//example.com/DeleteProduct]
    		(@DeletedRows);
    END
    GO

    Run the script below on the destination server after copying the certificate files:

    USE master;
    CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'example p@ssw0Rd';
    CREATE CERTIFICATE ServiceBrokerCertificate
    	FROM FILE = 'C:\temp\ServiceBrokerCertificate.cer'
    	WITH PRIVATE KEY ( FILE = 'C:\temp\ServiceBrokerCertificate.key'
    		,DECRYPTION BY PASSWORD = 'example p@ssw0Rd' );
    CREATE ENDPOINT ServiceBrokerEndPoint
    	STATE=STARTED
    	AS TCP (LISTENER_PORT = 12345)
    	FOR SERVICE_BROKER 
    	(
    		AUTHENTICATION = CERTIFICATE ServiceBrokerCertificate,
    		ENCRYPTION = SUPPORTED
    		);
    CREATE LOGIN ServiceBrokerCertificateLogin
    	FROM CERTIFICATE ServiceBrokerCertificate;
    GRANT CONNECT ON ENDPOINT::ServiceBrokerEndPoint To ServiceBrokerCertificateLogin;
    CREATE DATABASE DestinationProductDatabase;
    ALTER AUTHORIZATION ON DATABASE::DestinationProductDatabase TO sa;
    ALTER DATABASE DestinationProductDatabase SET ENABLE_BROKER;
    GO
    
    USE DestinationProductDatabase;
    CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'example p@ssw0Rd';
    CREATE USER ServiceBrokerUser WITHOUT LOGIN;
    GRANT CONNECT TO ServiceBrokerUser;
    CREATE CERTIFICATE ServiceBrokerCertificate
    	FROM FILE = 'C:\temp\ServiceBrokerCertificate.cer'
    	WITH PRIVATE KEY ( FILE = 'C:\temp\ServiceBrokerCertificate.key'
    		,DECRYPTION BY PASSWORD = 'example p@ssw0Rd' );
    CREATE MESSAGE TYPE [//example.com/InsertProduct] VALIDATION = WELL_FORMED_XML;
    CREATE MESSAGE TYPE [//example.com/UpdateProduct] VALIDATION = WELL_FORMED_XML;
    CREATE MESSAGE TYPE [//example.com/DeleteProduct] VALIDATION = WELL_FORMED_XML;
    CREATE CONTRACT [//example.com/ProductReplicationContract](
    	[//example.com/InsertProduct] SENT BY INITIATOR
    	,[//example.com/UpdateProduct] SENT BY INITIATOR
    	,[//example.com/DeleteProduct] SENT BY INITIATOR
    	);
    CREATE TABLE dbo.ProductReplicationErrors(
    	ErrorTime datetime NOT NULL
    	,conversation_handle uniqueidentifier NOT NULL
    	,message_type_name nvarchar(256) NOT NULL
    	,message_body varbinary(MAX) NULL
    	);
    CREATE TABLE dbo.Product(
    	ID varchar(50) NOT NULL
    		CONSTRAINT PK_Product PRIMARY KEY
    	, ProductName varchar(50)
    	);
    GO
    
    CREATE PROCEDURE dbo.usp_ProductReplicationTargetService
    AS 
    SET NOCOUNT ON;
    SET XACT_ABORT ON;
    DECLARE 
    	@conversation_handle uniqueidentifier = '00000000-0000-0000-0000-000000000000'
    	,@message_type_name nvarchar(256)
    	,@message_body varbinary(MAX)
    	,@description nvarchar(3000)
    	,@xml xml;
    
    WHILE @conversation_handle IS NOT NULL
    BEGIN
    	SET @conversation_handle = NULL;
    
    	BEGIN TRAN;
    
    	WAITFOR (
    		RECEIVE TOP (1)
    			@conversation_handle = conversation_handle
    			,@message_type_name = message_type_name
    			,@message_body = message_body
    		FROM dbo.ProductReplicationTargetQueue)
    		,TIMEOUT 1000;
    	
    	IF @conversation_handle IS NOT NULL
    	BEGIN
    		IF @message_type_name = N'//example.com/InsertProduct'
    		BEGIN
    			SET @xml = CAST(@message_body AS xml);
    			INSERT INTO dbo.Product(ID, ProductName)
    				SELECT
    					inserts.row.query('./ID').value('.', 'varchar(50)') AS ID
    					,inserts.row.query('./ProductName').value('.', 'varchar(50)') AS ProductName
    				FROM @xml.nodes('/row') AS inserts(row);
    			END CONVERSATION @conversation_handle;
    		END
    		ELSE
    		BEGIN
    			IF @message_type_name = N'//example.com/UpdateProduct'
    			BEGIN
    				SET @xml = CAST(@message_body AS xml);
    				UPDATE p
    				SET ProductName = updates.ProductName
    				FROM dbo.Product AS p
    				JOIN (
    					SELECT
    						rows.row.query('./ID').value('.', 'varchar(50)') AS ID
    						,rows.row.query('./ProductName').value('.', 'varchar(50)') AS ProductName
    					FROM @xml.nodes('/row') AS rows(row)) AS updates ON
    						p.ID = updates.ID;
    				END CONVERSATION @conversation_handle;
    			END
    			ELSE
    			IF @message_type_name = N'//example.com/DeleteProduct'
    			BEGIN
    				SET @xml = CAST(@message_body AS xml);
    				DELETE dbo.Product
    				WHERE ID IN(
    					SELECT rows.row.query('./ID').value('.', 'varchar(50)')
    					FROM @xml.nodes('/row') AS rows(row)
    					);
    				END CONVERSATION @conversation_handle;
    			END
    			ELSE
    			BEGIN
    				INSERT INTO dbo.ProductReplicationErrors(
    					ErrorTime
    					,conversation_handle
    					,message_type_name
    					,message_body)
    				VALUES(
    					GETDATE()
    					,@conversation_handle
    					,@message_type_name
    					,@message_body)
    				SET @description = 'Unexpected message type ' + @message_type_name + ' received';
    				END CONVERSATION @conversation_handle WITH ERROR = 1 DESCRIPTION = @description;
    			END;
    		END;
    	END;
    	COMMIT;
    END;
    GO
    
    CREATE QUEUE ProductReplicationTargetQueue
    	WITH ACTIVATION (
              PROCEDURE_NAME = dbo.usp_ProductReplicationTargetService,
              MAX_QUEUE_READERS = 1,
              EXECUTE AS SELF);
    CREATE SERVICE [//destination-server.example.com/ProductReplicationTargetService]
    	ON QUEUE ProductReplicationTargetQueue ([//example.com/ProductReplicationContract]);
    GRANT SEND ON SERVICE::[//destination-server.example.com/ProductReplicationTargetService] To ServiceBrokerUser;
    CREATE ROUTE ProductReplicationInitiatorRoute
    WITH
    	SERVICE_NAME = '//source-server.example.com/ProductReplicationInitiatorService',
    	ADDRESS = 'TCP://source-server:12345'
    CREATE REMOTE SERVICE BINDING ProductReplicationInitiatorBinding
    	TO SERVICE '//source-server.example.com/ProductReplicationInitiatorService'
    	WITH USER = dbo;
    GO


    Dan Guzman, SQL Server MVP, http://weblogs.sqlteam.com/dang/


    • Editado Dan GuzmanMVP domingo, 29 de julio de 2012 3:15 added firewall note
    • Propuesto como respuesta Kowalski77 jueves, 02 de agosto de 2012 7:19
    • Marcado como respuesta Iric WenModerator viernes, 03 de agosto de 2012 5:49
    •  
  • jueves, 02 de agosto de 2012 7:20
     
     

    Thanks. It's hard to imagine that Microsoft could make it more complex.  Mine is not working yet, but it might be the port that is closed.
    But at least I can execute the script - thanks.

  • jueves, 02 de agosto de 2012 12:21
     
     Respondida Tiene código

    The remote Service Broker method has a lot of moving parts, which adds complexity.  Another method is a local Service Broker implementation that uses your existing linked server to perform the remote update.  This will avoid the separate endpoint and firewall rule.

    USE master;
    
    CREATE DATABASE SourceProductDatabase;
    ALTER AUTHORIZATION ON DATABASE::SourceProductDatabase TO sa;
    ALTER DATABASE SourceProductDatabase SET ENABLE_BROKER;
    GO
    
    USE SourceProductDatabase;
    
    CREATE MESSAGE TYPE [//example.com/InsertProduct] VALIDATION = WELL_FORMED_XML;
    CREATE MESSAGE TYPE [//example.com/UpdateProduct] VALIDATION = WELL_FORMED_XML;
    CREATE MESSAGE TYPE [//example.com/DeleteProduct] VALIDATION = WELL_FORMED_XML;
    CREATE CONTRACT [//example.com/ProductReplicationContract](
    	[//example.com/InsertProduct] SENT BY INITIATOR
    	,[//example.com/UpdateProduct] SENT BY INITIATOR
    	,[//example.com/DeleteProduct] SENT BY INITIATOR
    	);
    
    CREATE TABLE dbo.ProductReplicationErrors(
    	ErrorTime datetime NOT NULL
    	,conversation_handle uniqueidentifier NOT NULL
    	,service_name sysname
    	,message_type_name nvarchar(256) NOT NULL
    	,message_body varbinary(MAX) NULL
    	);
    GO
    
    CREATE PROCEDURE dbo.usp_CleanupProductReplicationInitiatorQueue
    AS 
    SET NOCOUNT ON;
    DECLARE 
    	@conversation_handle uniqueidentifier = '00000000-0000-0000-0000-000000000000'
    	,@service_name sysname
    	,@message_type_name nvarchar(256)
    	,@message_body varbinary(MAX)
    	,@description nvarchar(3000);
    
    WHILE @conversation_handle IS NOT NULL
    BEGIN
    	SET @conversation_handle = NULL;
    	WAITFOR (
    		RECEIVE TOP (1)
    			@conversation_handle = conversation_handle
    			,@service_name = service_name
    			,@message_type_name = message_type_name
    			,@message_body = message_body
    		FROM dbo.ProductReplicationInitiatorQueue)
    		,TIMEOUT 1000;
    	
    	IF @conversation_handle IS NOT NULL
    	BEGIN
    		IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    		BEGIN
    			END CONVERSATION @conversation_handle;
    		END
    		ELSE
    		BEGIN
    			INSERT INTO dbo.ProductReplicationErrors(
    				ErrorTime
    				,conversation_handle
    				,service_name
    				,message_type_name
    				,message_body)
    			VALUES(
    				GETDATE()
    				,@conversation_handle
    				,@message_type_name
    				,@service_name
    				,@message_body)
    			SET @description = 'Unexpected message type ' + @message_type_name + ' received';
    			END CONVERSATION @conversation_handle WITH ERROR = 1 DESCRIPTION = @description;
    		END;
    	END;
    END;
    GO
    
    CREATE QUEUE ProductReplicationInitiatorQueue
    	WITH ACTIVATION (
              PROCEDURE_NAME = usp_CleanupProductReplicationInitiatorQueue,
              MAX_QUEUE_READERS = 1,
              EXECUTE AS SELF);
    CREATE SERVICE [//source-server.example.com/ProductReplicationInitiatorService] 
    	ON QUEUE ProductReplicationInitiatorQueue ([//example.com/ProductReplicationContract]);
    GO
    
    CREATE TABLE dbo.Product(
    	ID varchar(50) NOT NULL
    		CONSTRAINT PK_Product PRIMARY KEY
    	, ProductName varchar(50)
    	);
    GO
    
    
    CREATE TRIGGER TR_Product_Insert
    ON dbo.Product
    FOR INSERT
    AS
    DECLARE @ConversationHandle uniqueidentifier;
    DECLARE @InsertedRows xml;
    
    IF EXISTS(SELECT * FROM inserted)
    BEGIN
    	SELECT @InsertedRows = InsertedRows
    	FROM (
    		SELECT ID, ProductName
    		FROM inserted
    		FOR XML PATH, TYPE) AS InsertedRows(InsertedRows);
    
    	BEGIN Dialog @ConversationHandle
    		FROM SERVICE [//source-server.example.com/ProductReplicationInitiatorService] 
    		TO SERVICE '//source-server.example.com/ProductReplicationTargetService'
    		ON CONTRACT [//example.com/ProductReplicationContract]
    		WITH ENCRYPTION = OFF;
    
    	SEND ON CONVERSATION @ConversationHandle
    		MESSAGE TYPE [//example.com/InsertProduct]
    		(@InsertedRows);
    END
    GO
    
    CREATE TRIGGER TR_Product_Update
    ON dbo.Product
    FOR UPDATE
    AS
    DECLARE @ConversationHandle uniqueidentifier;
    DECLARE @ChangedRows xml;
    IF EXISTS(SELECT * FROM inserted)
    BEGIN
    	SELECT @ChangedRows = ChangedRows
    	FROM (
    		SELECT ID, ProductName
    		FROM inserted
    		FOR XML PATH, TYPE) AS ChangedRows(ChangedRows);
    
    	BEGIN Dialog @ConversationHandle
    		FROM SERVICE [//source-server.example.com/ProductReplicationInitiatorService] 
    		TO SERVICE '//source-server.example.com/ProductReplicationTargetService'
    		ON CONTRACT [//example.com/ProductReplicationContract]
    		WITH ENCRYPTION = OFF;
    
    	SEND ON CONVERSATION @ConversationHandle
    		MESSAGE TYPE [//example.com/UpdateProduct]
    		(@ChangedRows);
    END
    GO
    
    CREATE TRIGGER TR_Product_Delete
    ON dbo.Product
    FOR DELETE
    AS
    DECLARE @ConversationHandle uniqueidentifier;
    DECLARE @DeletedRows xml;
    IF EXISTS(SELECT * FROM deleted)
    BEGIN
    	SELECT @DeletedRows = DeletedRows
    	FROM (
    		SELECT ID, ProductName
    		FROM deleted
    		FOR XML PATH, TYPE) AS DeletedRows(DeletedRows);
    
    	BEGIN Dialog @ConversationHandle
    		FROM SERVICE [//source-server.example.com/ProductReplicationInitiatorService] 
    		TO SERVICE '//source-server.example.com/ProductReplicationTargetService'
    		ON CONTRACT [//example.com/ProductReplicationContract]
    		WITH ENCRYPTION = OFF;
    
    	SEND ON CONVERSATION @ConversationHandle
    		MESSAGE TYPE [//example.com/DeleteProduct]
    		(@DeletedRows);
    END
    GO
    
    
    CREATE PROCEDURE dbo.usp_ProductReplicationTargetService
    AS 
    SET NOCOUNT ON;
    SET XACT_ABORT ON;
    DECLARE 
    	@conversation_handle uniqueidentifier = '00000000-0000-0000-0000-000000000000'
    	,@message_type_name nvarchar(256)
    	,@message_body varbinary(MAX)
    	,@description nvarchar(3000)
    	,@xml xml;
    
    WHILE @conversation_handle IS NOT NULL
    BEGIN
    	SET @conversation_handle = NULL;
    
    	BEGIN TRAN;
    
    	WAITFOR (
    		RECEIVE TOP (1)
    			@conversation_handle = conversation_handle
    			,@message_type_name = message_type_name
    			,@message_body = message_body
    		FROM dbo.ProductReplicationTargetQueue)
    		,TIMEOUT 1000;
    	
    	IF @conversation_handle IS NOT NULL
    	BEGIN
    		IF @message_type_name = N'//example.com/InsertProduct'
    		BEGIN
    			SET @xml = CAST(@message_body AS xml);
    			INSERT INTO [LinkedServer].DestinationProductDatabase.dbo.Product(ID, ProductName)
    				SELECT
    					inserts.row.query('./ID').value('.', 'varchar(50)') AS ID
    					,inserts.row.query('./ProductName').value('.', 'varchar(50)') AS ProductName
    				FROM @xml.nodes('/row') AS inserts(row);
    			END CONVERSATION @conversation_handle;
    		END
    		ELSE
    		BEGIN
    			IF @message_type_name = N'//example.com/UpdateProduct'
    			BEGIN
    				SET @xml = CAST(@message_body AS xml);
    				UPDATE p
    				SET ProductName = updates.ProductName
    				FROM [LinkedServer].DestinationProductDatabase.dbo.Product AS p
    				JOIN (
    					SELECT
    						rows.row.query('./ID').value('.', 'varchar(50)') AS ID
    						,rows.row.query('./ProductName').value('.', 'varchar(50)') AS ProductName
    					FROM @xml.nodes('/row') AS rows(row)) AS updates ON
    						p.ID = updates.ID;
    				END CONVERSATION @conversation_handle;
    			END
    			ELSE
    			IF @message_type_name = N'//example.com/DeleteProduct'
    			BEGIN
    				SET @xml = CAST(@message_body AS xml);
    				DELETE [LinkedServer].DestinationProductDatabase.dbo.Product
    				WHERE ID IN(
    					SELECT rows.row.query('./ID').value('.', 'varchar(50)')
    					FROM @xml.nodes('/row') AS rows(row)
    					);
    				END CONVERSATION @conversation_handle;
    			END
    			ELSE
    			BEGIN
    				INSERT INTO dbo.ProductReplicationErrors(
    					ErrorTime
    					,conversation_handle
    					,message_type_name
    					,message_body)
    				VALUES(
    					GETDATE()
    					,@conversation_handle
    					,@message_type_name
    					,@message_body)
    				SET @description = 'Unexpected message type ' + @message_type_name + ' received';
    				END CONVERSATION @conversation_handle WITH ERROR = 1 DESCRIPTION = @description;
    			END;
    		END;
    	END;
    	COMMIT;
    END;
    GO
    
    CREATE QUEUE ProductReplicationTargetQueue
    	WITH ACTIVATION (
              PROCEDURE_NAME = usp_ProductReplicationTargetService,
              MAX_QUEUE_READERS = 1,
              EXECUTE AS SELF);
    CREATE SERVICE [//source-server.example.com/ProductReplicationTargetService]
    	ON QUEUE ProductReplicationTargetQueue ([//example.com/ProductReplicationContract]);
    GO
    


    Dan Guzman, SQL Server MVP, http://weblogs.sqlteam.com/dang/