none
Service Broker: "Waitfor" und "wie Daten aus dem well formed XML wieder auslesen" RRS feed

  • Frage

  • Hallo Experts,

    künftig möchte ich Service Broker nutzen um das DWH in einer anderen Instanz zu befüllen und auch für asynchrone Datenverarbeitung innerhalb einer Instanz.

    Um anzufangen habe ich ein kleines Beispiel nachgebaut mit der Vorlage von Olaf Helper:

    http://olafhelper.over-blog.de/article-26077104.html

    Die kleine Aufgabe besteht darin, die neue Mitarbeiter 60 Tage nach ihren Neueintritt aus der Tabelle Neueintritt zu löschen (selbstverstendlich gibt es auch andere Möglichkeiten dies zu gewährleisten, mir geht es darum den Service Broker zum laufen zu bringen). So habe ich 60 Tage durch 5 Sekunden ersetzt und warte dass sie 5 Sek. nach dem Insert wieder gelöscht werden.

    Meine Fragen:

    - Die Daten werden nicht gelöscht weil ich nicht sicher bin, wie ich den Per_Nr aus dem well formed XML auslesen kann. In diesem kleinen Fall könnte ich mit SUBSTR oder INSTR, LEFT oder RIGHT arbeiten, aber das scheint mir unbequem zu sein, theoretisch für grossen Datenmengen.

    - in der Prozedur funktioniert die zweite Teil der Verarbeitung nach dem WAITFOR TIME nicht - was habe ich hier falsch gemacht?

    - in meinem Beispiel verarbeite ich immer eine Zeile, welche von dem Trigger losgeschickt wird. Welche SB-Konzepte gibt es für einen Massendaten transfer (z. DWH befüllen)

    Hier ist die Prozedur, bitte schaut Euch an:

    USE [TEST DB]
    GO
    /****** Objekt:  StoredProcedure [dbo].[proc_WaitForNeueintrittBeendet]    Skriptdatum: 12/17/2014 12:09:02 ******/
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO




    ALTER PROCEDURE [dbo].[proc_WaitForNeueintrittBeendet]

     AS

     BEGIN

      DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;
      DECLARE @RecvReqMsg NVARCHAR(1000);
      DECLARE @RecvReqMsgName sysname;
      declare @Zeitpunkt datetime;

      -- Erste Nachricht abholen und je Lauf
      -- auch nur eine abarbeiten (alternative WHILE Schleife)

      RECEIVE TOP(1)

      @RecvReqDlgHandle = conversation_handle,

      @RecvReqMsg = message_body,

      @RecvReqMsgName = message_type_name

      FROM dbo.WaitForNeueintrittBeendet

      --Nachricht auch für mich gedacht?

      IF @RecvReqMsgName = N'//TEST DB/WaitForNeueintrittBeendet/SubmitBeendet'

      BEGIN

      --Zeitpunkt für Action bestimmen  

      --select @Zeitpunkt = convert(datetime,dateadd(day, +60, getdate()));
    select @Zeitpunkt = convert(datetime,dateadd(SECOND, +5, getdate()));

      -- Aktivität protokollieren

      INSERT INTO  dbo.Async_Procedures
      (LogDate, LogAction, LogMessage, LogBody)
      VALUES (GetDate(), 'Inserted', @RecvReqMsgName, @RecvReqMsg);

      -- 2 min so tun, als sein man mächtig beschäftigt

      WAITFOR time @Zeitpunkt;  
     
      -- Action ausführen

      delete from [Neueeintritt]
      where PER_NUMMER = 88883;    

       -- Aktivität protokollieren

      INSERT INTO dbo.Async_Procedures
      (LogDate, LogAction, LogMessage, LogBody)
      VALUES (GetDate(), 'Deleted', @RecvReqMsgName, @RecvReqMsg);

      -- Jetzt könnte man eine Nachricht zurück senden

      END

     END;


    Irina



    Mittwoch, 17. Dezember 2014 13:14

Antworten

  • Hallo Irina,

    das WAITFOR kannst Du komplett entfernen, siehe Code Kommentar, das diente nur zu Demozwecken, das man damit auch gut langlaufende Prozesse abarbeiten kann.

    Die Daten aus der XML Nachricht kannst Du per XQuery auslesen, siehe XQuery Language Reference (SQL Server). Hier ein Teil aus einer meiner Service Broker Lösung als Beispiel:

    .....
        DECLARE @msg AS XML (dbo.XSD_MasterIndexSync);
    
        -- Erste Nachricht abholen und je Lauf
        -- auch nur eine abarbeiten (alternative WHILE Schleife)
        RECEIVE TOP(1)
            @RecvReqDlgHandle = conversation_handle,
            @msg = message_body,
            @RecvReqMsgName = message_type_name
        FROM dbo.SAZ_MasterIndexSync_TargetQueue;
    
        -- Werte überprüfen
        IF NOT @msg IS NULL
        BEGIN
            SELECT @adrnr = node.row.value('adrnr[1]', 'bigint')
                  ,@hasErpTransaction = node.row.value('haserptransactions[1]', 'bit')
                  ,@sageContent = node.row.value('sagecontent[1]', 'nvarchar(4000)')
            FROM  @msg.nodes('//masterindexsync') AS node(row);
    ....

    Was konkret heisst "... funktioniert die zweite Teil der Verarbeitung nach dem WAITFOR TIME nicht"? Gibt es eine Fehlermeldung, passiert sich nichts oder was geschieht?

    In die XML Nachricht kannst Du durchaus mehrere Datensätze einfügen & so übertragen, Dein T-SQL Code muss nur entsprechend ausgelegt sein.


    Olaf Helper

    [ Blog] [ Xing] [ MVP]

    Mittwoch, 17. Dezember 2014 13:36

Alle Antworten

  • Hallo Irina,

    das WAITFOR kannst Du komplett entfernen, siehe Code Kommentar, das diente nur zu Demozwecken, das man damit auch gut langlaufende Prozesse abarbeiten kann.

    Die Daten aus der XML Nachricht kannst Du per XQuery auslesen, siehe XQuery Language Reference (SQL Server). Hier ein Teil aus einer meiner Service Broker Lösung als Beispiel:

    .....
        DECLARE @msg AS XML (dbo.XSD_MasterIndexSync);
    
        -- Erste Nachricht abholen und je Lauf
        -- auch nur eine abarbeiten (alternative WHILE Schleife)
        RECEIVE TOP(1)
            @RecvReqDlgHandle = conversation_handle,
            @msg = message_body,
            @RecvReqMsgName = message_type_name
        FROM dbo.SAZ_MasterIndexSync_TargetQueue;
    
        -- Werte überprüfen
        IF NOT @msg IS NULL
        BEGIN
            SELECT @adrnr = node.row.value('adrnr[1]', 'bigint')
                  ,@hasErpTransaction = node.row.value('haserptransactions[1]', 'bit')
                  ,@sageContent = node.row.value('sagecontent[1]', 'nvarchar(4000)')
            FROM  @msg.nodes('//masterindexsync') AS node(row);
    ....

    Was konkret heisst "... funktioniert die zweite Teil der Verarbeitung nach dem WAITFOR TIME nicht"? Gibt es eine Fehlermeldung, passiert sich nichts oder was geschieht?

    In die XML Nachricht kannst Du durchaus mehrere Datensätze einfügen & so übertragen, Dein T-SQL Code muss nur entsprechend ausgelegt sein.


    Olaf Helper

    [ Blog] [ Xing] [ MVP]

    Mittwoch, 17. Dezember 2014 13:36
  • Hallo Olaf, ich werde gleich dein Beispiel genau anschauen. Wollte nur schnell zweiten Teil der Verarbeitung verdäutlichen. Es passiert nichts. Ich wolte eigentlich tatsächlich die Neueeintritt nach 60 Tage löschen:

    -- 2 min so tun, als sein man mächtig beschäftigt
    select @Zeitpunkt = convert(datetime,dateadd(SECOND, +5, getdate()));
    ...

    ...

      WAITFOR time @Zeitpunkt;  
     
      -- Action ausführen

      delete from Neue_Mitarbeiter
      where PER_NUMMER = 88883;    

       -- Aktivität protokollieren

      INSERT INTO dbo.Async_Procedures
      (LogDate, LogAction, LogMessage, LogBody)
      VALUES (GetDate(), 'Deleted', @RecvReqMsgName, @RecvReqMsg);


    Irina

    Mittwoch, 17. Dezember 2014 13:43
  • ...wenn man die "dbo.Async_Procedures " anguckt - werden da nicht nach 5 sek. deleted Sätze eingefügt.


    Irina

    Mittwoch, 17. Dezember 2014 13:47
  • Kommen die Nachrichten in der Queue an und werden sie auch abgearbeitet? Kannst Du mit

    SELECT *
    FROM sys.transmission_queue

    prüfen, is_conversation_error sollte 0 und transmission_status leer sein.


    Olaf Helper

    [ Blog] [ Xing] [ MVP]

    Mittwoch, 17. Dezember 2014 13:59
  • seltsamerweise wenn ich die Daten einfüge:

    insert into dbo.GH_NEUEINTRITT
    values (88883, 22, getdate(), null)

    trigger spring ein (print ist im Trigger eingebaut):

    <PER_NUMMER>88883</PER_NUMMER>

    (1 Zeile(n) betroffen)

    SELECT *
    FROM sys.transmission_queue

    gibt kein ergebnis

    aber, wenn ich RECEIVE Teil in der Prozedur auskommentiere,

    und den insert Befehl dann ausführe, kann ich Ergebnisse sehen:

    RECEIVE * FROM dbo.WaitForNeueintrittBeendet

    1    0    54    2A284CA6-F685-E411-838F-005056B630F5    2B284CA6-F685-E411-838F-005056B630F5    0    //DB/WaitForNeueintrittBeendet/TargetService    65537    //DB/WaitForNeueintrittBeendet/myContract    65539    //DB/WaitForNeueintrittBeendet/SubmitBeendet    65539    X  

    Es bedeutet doch dass der Nachricht durch den Trigger versendet wurde - oder etwa nicht?


    Mittwoch, 17. Dezember 2014 14:15
  • allerdings

    SELECT *
    FROM sys.conversation_endpoints

    ergibt dass es keine is_initiator = 1 gibt

    und State_Desc: DISCONNECTED_INBOUND

    ???


    Irina

    Mittwoch, 17. Dezember 2014 14:34
  • Hallo Irina,

    Siehe sys.conversation_endpoints (Transact-SQL) => "state": "DO   Disconnected Outbound (Ausgehend getrennt).  Die lokale Seite der Konversation hat END CONVERSATION ausgegeben. Die Konversation bleibt so lange in diesem Status, bis die Remoteseite der Konversation END CONVERSATION anerkennt. Eine Anwendung kann keine Nachrichten für die Konversation senden oder empfangen. Wenn die Remoteseite der Konversation die END CONVERSATION-Anweisung bestätigt, geht die Konversation in den CD-Zustand (Geschlossen) über. "

    Kann es evtl. sein, das die Konversation vorzeitig per END CONVERSATION beendet wird?


    Olaf Helper

    [ Blog] [ Xing] [ MVP]


    Donnerstag, 18. Dezember 2014 10:59