BizTalk streaming disassemble pipeline and throttling

  • Question

  • Hi,

    My requirements seem pretty simple but, after some research, I just can't figure out how to put this all together. I need to limit the number of orchestration instances spawned while debatching a large message in a streaming disassemble receive pipeline. Let's say a large XML comes in containing 100,000 separate "Order" messages. The receive pipeline would then debatch it and create 100,000 "ProcessOrder" orchestrations. That is too many, and I need to limit it. My two requirements are simple: the debatching needs to be done in a streaming manner, so that only one "Order" message is loaded into memory at a time before being sent to the MessageBox, and the debatching needs to be throttled based on the number of currently running "ProcessOrder" orchestration instances (say, if I already have 100 running instances, the debatching waits until one finishes before sending another "Order" message to the MessageBox).

    So far, I have the receive pipeline that does the debatching and functional modifications to my messages. It does what it should in a streaming manner and puts the individual messages in VirtualStreams. I also have an orchestration and helper methods that can limit the number of "ProcessOrder" orchestration instances. The problem is putting them together while keeping the streaming. I know I can run a receive pipeline inside an orchestration (and that would solve my problem, since on every "GetNext" call to the disassemble pipeline I could simply wait if there are too many running orchestration instances) BUT, digging into the BizTalk DLLs, I noticed that Microsoft.XLANGs.Pipeline.XLANGPipelineManager still loads all the messages into memory instead of enumerating them the way Microsoft.BizTalk.PipelineOM.PipelineManager does. That's a bummer. Yes, I know it puts every message in a VirtualStream (I do too in my receive pipeline), but this is still inadequate, memory-wise, for such a large number of messages.

    So my next step would be to run the receive pipeline directly in the receive port, without the orchestration that limits the number of "ProcessOrder" instances, but to do that I would need to add delay logic to my pipeline. I read online that this isn't a good idea (why?), so what other alternatives do I have? Any insight will be much appreciated.

    Thanks


    • Edited by chgag Monday, April 6, 2015 1:47 PM
    Monday, April 6, 2015 1:46 PM


All replies

  • For clarity, the Disassembly process works the same whether it runs in a Pipeline or an Orchestration, except that, as you've noticed, the Orchestration result is returned as a (sort of) in-memory collection, while the Receive Port results are streamed to the MessageBox and committed all at once.

    So, why do you need to limit the number of ProcessOrder Orchestrations?

    One option is a Resource Dispenser: http://social.technet.microsoft.com/wiki/contents/articles/23924.biztalk-resource-dispenser-send-port-edition.aspx

    Monday, April 6, 2015 2:41 PM
    Moderator
  • Hi John, thanks for your input! I need to limit the number of orchestrations because ProcessOrder, which is a legacy app, takes a lot of resources (working on that too). We share the BizTalk server farm with a lot of other teams, and the organisation established that some throttling should be done to prevent bogging down the servers (as it currently does).

    I saw the resource dispenser pattern, but I think it would spawn as many orchestrations as if I simply started the ProcessOrder ones?

    What do you think about the option of having a delay in the pipeline executing on the receive port?

    Thanks

    Monday, April 6, 2015 3:07 PM
  • A Delay in the Pipeline won't work because the entire batch is committed to the MessageBox at the same time. So, they would all 'appear' and be routed at the same time anyway.

    If you want to throttle the debatching, another solution is to wrap the base disassembler (XmlDasmComp in this case) and, instead of returning each message to the pipeline engine, push it to an MSMQ queue.

    Then use the MSMQ Adapter to pop them, which has the effect of publishing them to the MessageBox one by one. One thing you lose with this is knowing that the entire batch will be handled one way or another.
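    In sketch form, such a wrapper might look like the code below. This is a minimal illustration under stated assumptions, not a complete component: the IBaseComponent/IPersistPropertyBag plumbing a real pipeline component needs is omitted, and the queue path and class name are hypothetical.

```csharp
using System.IO;
using System.Messaging; // requires the MSMQ Windows feature
using Microsoft.BizTalk.Component;        // XmlDasmComp
using Microsoft.BizTalk.Component.Interop;
using Microsoft.BizTalk.Message.Interop;

// Hypothetical wrapper: runs the stock XML disassembler but, instead of
// handing each debatched message back to the pipeline engine, pushes its
// body to an MSMQ queue so the MSMQ receive location can publish the
// messages to the MessageBox one at a time.
public class MsmqStagingDisassembler : IDisassemblerComponent
{
    private readonly XmlDasmComp inner = new XmlDasmComp();
    private const string QueuePath = @".\private$\OrderStaging"; // assumed queue

    public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
    {
        inner.Disassemble(pContext, pInMsg);
    }

    public IBaseMessage GetNext(IPipelineContext pContext)
    {
        using (var queue = new MessageQueue(QueuePath))
        {
            IBaseMessage next;
            while ((next = inner.GetNext(pContext)) != null)
            {
                // Drain the disassembler and stage each body in MSMQ.
                Stream body = next.BodyPart.GetOriginalDataStream();
                queue.Send(new System.Messaging.Message { BodyStream = body });
            }
        }
        return null; // nothing is returned to the engine; the MSMQ adapter takes over
    }
}
```

    Note that with this design the pipeline itself publishes nothing, so the receive location's normal output is empty; whether your engine configuration tolerates a zero-message disassembly is something to verify.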

    If you must be able to process the whole batch, you can do something similar by staging the debatched messages in SQL Server then drain one by one with the SQL Adapter.

    • Marked as answer by chgag Wednesday, April 8, 2015 12:23 AM
    Monday, April 6, 2015 3:22 PM
    Moderator
  • Yes, first of all I would like to reiterate that a pipeline, if called within an orchestration, works the same way as in a receive port and does not load the complete message into memory.

    Now, coming to your problem: we also faced this issue, and we solved it in two ways.

    1) Debatching in an orchestration and passing a delay parameter to the child orchestration:

    We created a parent orchestration that debatches the complete batch and, in a loop, uses the Start Orchestration shape to spawn new orchestration instances. Here we passed an intDelayTime parameter (a custom int variable holding the delay value, initial value = 0).

    The first shape of the child orchestration will be a Delay shape with the expression below.

    new System.TimeSpan(0, 0, intDelayTime, 0)

    intDelayTime was calculated in the parent orchestration as below, incremented after every 5 loop iterations:

    intDelayTime = intDelayTime + intConfiguredDelay, where intConfiguredDelay was fetched from the BRE (it was equal to 5 in our case).

    So basically we were passing a delay parameter to the child orchestration, which delays its processing. The first 5 orchestration instances spawn immediately, the next 5 are delayed by 5 minutes, and so on.
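    Under this scheme, the delay assigned to each spawned instance is just a function of its index in the loop. A minimal sketch (the class, method, and parameter names are my own, not from the original post):

```csharp
using System;

public static class DelayCalculator
{
    // Instances are released in groups of `groupSize`; each successive
    // group waits `configuredDelay` more minutes than the previous one.
    // With groupSize = 5 and configuredDelay = 5 (the BRE value above),
    // instances 0-4 get 0 minutes, 5-9 get 5 minutes, 10-14 get 10, etc.
    public static int DelayMinutes(int instanceIndex, int groupSize, int configuredDelay)
    {
        return (instanceIndex / groupSize) * configuredDelay;
    }

    public static void Main()
    {
        for (int i = 0; i < 12; i++)
        {
            // Same TimeSpan shape as the child orchestration's Delay expression.
            TimeSpan delay = new TimeSpan(0, 0, DelayMinutes(i, 5, 5), 0);
            Console.WriteLine("instance " + i + ": delay " + delay.TotalMinutes + " min");
        }
    }
}
```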

    2) Performing the debatching in the receive pipeline, but keeping the orchestration in the Stopped state:

    Here we did the debatching in the receive pipeline, so each debatched message spawns a new orchestration instance; but since the orchestration was in the Stopped state, all the instances stay in a suspended-but-resumable state.

    We then wrote a C# exe that resumes the top 5 suspended orchestration instances; this exe was scheduled to run every 5 minutes.

    Note: as the orchestration is Stopped (not Unenlisted), it still subscribes to all the new messages, but the instances remain suspended. If you resume them, manually or programmatically, they process perfectly fine.
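    A sketch of such a resume exe against BizTalk's WMI provider might look like this. It assumes the standard root\MicrosoftBizTalkServer namespace, where a ServiceStatus of 4 means suspended (resumable); "MyApp.ProcessOrder" is a placeholder for the real orchestration name, not from the original post.

```csharp
using System;
using System.Management; // reference System.Management.dll

// Sketch of the scheduled resume exe: finds suspended-resumable instances
// of one orchestration via BizTalk's WMI provider and resumes the first N.
class ResumeTopInstances
{
    static void Main()
    {
        const int batchSize = 5;
        var scope = new ManagementScope(@"root\MicrosoftBizTalkServer");
        // ServiceStatus = 4 => suspended (resumable)
        var query = new SelectQuery(
            "SELECT * FROM MSBTS_ServiceInstance " +
            "WHERE ServiceStatus = 4 AND ServiceName = 'MyApp.ProcessOrder'");

        using (var searcher = new ManagementObjectSearcher(scope, query))
        {
            int resumed = 0;
            foreach (ManagementObject instance in searcher.Get())
            {
                if (resumed++ >= batchSize) break;
                instance.InvokeMethod("Resume", null);
            }
        }
    }
}
```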

    So, using the technique above, we allowed only 5 instances to run per 5-minute span.

    You can choose whichever of the above techniques you find easier.

    Let me know if you have any questions.


    Thanks,
    Prashant
    ----------------------------------------
    Please mark this post accordingly if it answers your query or is helpful.

    • Marked as answer by chgag Wednesday, April 8, 2015 12:23 AM
    Tuesday, April 7, 2015 9:43 AM
  • Hi Prashant, and thanks for your help. Well, in my case I think the entire message will end up in memory, since the threshold used by the VirtualStream into which the debatched messages are copied, when executing the pipeline in an orchestration, is 10 KB. All of my messages are under 10 KB, so the individual VirtualStreams would not overflow to disk and would stay in their MemoryStreams. Is this right?

    You mention that you use the Start Orchestration shape, which I guess is inside the atomic scope of the pipeline execution? If so, does the started orchestration start right away, or only at the end of the atomic scope (once all of the messages have been processed)?

    The problem with your approaches is that the delay is fixed. In my scenario I need to start a new orchestration as soon as another one completes, and their execution times are unknown and can vary by a large amount.

    Can you see another possibility, so that I could monitor the number of running orchestration instances (that part is fine, and I have a working test case) and start another one as soon as one finishes?


    • Edited by chgag Tuesday, April 7, 2015 2:18 PM
    Tuesday, April 7, 2015 1:47 PM
  • Hi John, and thanks again for your support. Well, I'm glad you clarified the delay-in-the-pipeline option. I thought each of the debatched messages was committed sequentially to the MessageBox, but I guess it makes more sense to commit them all at once.

    As for MSMQ, this really is a good idea but unfortunately, it isn't something used in my organisation (or installed on the servers). Trying to have that installed everywhere and have it tested, interfaced by common modules, etc. would take forever so I better find another alternative.

    I'll look into the database option, even though I had hoped BizTalk could do this kind of throttling natively, without having to put the messages back into a database.

    Tuesday, April 7, 2015 1:58 PM
  • You would only require MSMQ on the BizTalk Servers where this process would run.  It's probably already there, just not enabled in Windows.

    Have you tried a Singleton Pattern?  Even just enabling Ordered Delivery on the Send Port.

    BizTalk will manage all the Orchestration instances itself pretty well, though you will likely need to isolate this particular Orchestration to its own Host.

    • Marked as answer by chgag Wednesday, April 8, 2015 12:22 AM
    • Unmarked as answer by chgag Wednesday, April 8, 2015 12:22 AM
    Tuesday, April 7, 2015 3:12 PM
    Moderator
  • Hi Prashant, and thanks for your help. Well, in my case I think the entire message will end up in memory, since the threshold used by the VirtualStream into which the debatched messages are copied, when executing the pipeline in an orchestration, is 10 KB. All of my messages are under 10 KB, so the individual VirtualStreams would not overflow to disk and would stay in their MemoryStreams. Is this right?

    [Prashant]: As far as I know, the pipeline processes messages as streams and doesn't load the complete message into memory. Messages get loaded into memory when using XmlDocument, XPath, etc.

    You mention that you use the Start Orchestration shape, which I guess is inside the atomic scope of the pipeline execution? If so, does the started orchestration start right away, or only at the end of the atomic scope (once all of the messages have been processed)?

    [Prashant]: Yes, the child orchestrations will only start at the end of the parent orchestration, but each child orchestration will have its configured delay time. For example, the first 5 will have a delay time of 0, the next 5 a delay of 5 minutes, and so on.

    The problem with your approaches is that the delay is fixed. In my scenario I need to start a new orchestration as soon as another one completes, and their execution times are unknown and can vary by a large amount.

    [Prashant]: Yes, I agree that it uses a configured, fixed delay.

    Can you see another possibility, so that I could monitor the number of running orchestration instances (that part is fine, and I have a working test case) and start another one as soon as one finishes?

    [Prashant]: The other options I can think of are:

    1) Using ordered delivery (a singleton orchestration), as explained by John, but this will have a performance impact.

    I can't think of any other option for now :(



    Thanks,
    Prashant
    ----------------------------------------
    Please mark this post accordingly if it answers your query or is helpful.

    Tuesday, April 7, 2015 4:43 PM
  • I think the entire message will end up in memory, since the threshold used by the VirtualStream into which the debatched messages are copied, when executing the pipeline in an orchestration, is 10 KB. All of my messages are under 10 KB, so the individual VirtualStreams would not overflow to disk and would stay in their MemoryStreams. Is this right?

    To clarify this point, the location of a message's physical stream is very much up to the collective behavior of the various Pipeline Component authors.  It is very easy to force an entire message into memory by copying it to a MemoryStream (which could mean disk if that memory is paged, but that's a different issue).

    Conversely, you can create a VirtualStream with a very low threshold so that effectively, the entire message is on disk.

    The point is, you don't really know unless you test or look at all of the component implementations.

    Tuesday, April 7, 2015 7:25 PM
    Moderator
  • Hi John and Prashant. Thanks for your responses. I think I'm going to have to redesign how I split/create my messages. The main problem in my case is that I also need to "categorize" the pipeline output messages and prioritize some of them. This means that, even while I'm already processing 100,000 "unimportant" messages, I need to be able to give priority to an incoming "important" message (and make sure it is not natively throttled by BizTalk). With your help, I think I understood that the exact same throttling and prioritizing I was previously doing from an orchestration (splitting the messages manually with XPath and checking priority with the BRE) is not possible using a pipeline. To make a long story short, the current system adequately throttles the incoming messages as described, but we get OutOfMemoryExceptions because of the XPath calls and maps that load the large message into memory, hence the need for a streaming pipeline that disassembles it.

    As for running a pipeline in an orchestration, here is what I see in the BizTalk 2009 DLLs, so you understand why I bring this up:

    Microsoft.XLANGs.Pipeline.dll (XLANGPipelineManager)

    private static ReceivePipelineOutputMessages ExecutePipeline(ReceivePipeline p, XLANGMessage inMsg)
    {
    //(snip) here they call the "normal" pipeline manager (Microsoft.BizTalk.PipelineOM.PipelineManager)
        IBTMMessageSet btMsgSet = PipelineManager.ExecuteReceivePipeline(p, msg, out resourceTracker);
        result = new ReceivePipelineXMessages(btMsgSet);
    //(snip)
    }
    
    internal ReceivePipelineXMessages(IBTMMessageSet btMsgSet)
    {
        for (IBaseMessage next = btMsgSet.GetNext(); next != null; next = btMsgSet.GetNext())
        {
            XLANGPipelineManager.WrapMessageWithVirtualStream(next);
            this.bmsgList.Add(next);
        }
        this.be = this.bmsgList.GetEnumerator();
    }
    
    internal static void WrapMessageWithVirtualStream(IBaseMessage msg)
    {
    //(snip)
        partByIndex.Data = XLANGPipelineManager.ConvertStreamToVirtualStream(partByIndex.GetOriginalDataStream())
    //(snip)
    }
    
    private static VirtualStream ConvertStreamToVirtualStream(Stream inputStream)
    {
        VirtualStream virtualStream = null;
        if (inputStream != null)
        {
            virtualStream = new VirtualStream();
            byte[] array = new byte[1024];
            int count;
            while (0 < (count = inputStream.Read(array, 0, array.Length)))
            {
                virtualStream.Write(array, 0, count);
            }
            virtualStream.Position = 0L;
        }
        return virtualStream;
    }
    
    public VirtualStream() : this(10240, VirtualStream.MemoryFlag.AutoOverFlowToDisk, new MemoryStream())
    {
    }
    
    private VirtualStream(int bufferSize, VirtualStream.MemoryFlag flag, Stream dataStream) : this(bufferSize, bufferSize, flag, dataStream)
    {
    }
    
    private VirtualStream(int bufferSize, int thresholdSize, VirtualStream.MemoryFlag flag, Stream dataStream)
    {
    //(snip)
        this.ThreshholdSize = thresholdSize;
    //(snip)
    }
    
    As you can see, it seems that executing a pipeline from an orchestration copies each message coming back from the pipeline into a VirtualStream with a 10 KB threshold and then returns a list of these VirtualStreams. In my case, since my messages are all under 10 KB, I think they would all end up in the MemoryStream part of the VirtualStreams.

    Back to the drawing board, I guess. I'll let you know when I have another solution, and probably more questions :)

    Tuesday, April 7, 2015 11:29 PM
  • You can route 'Priority' messages to another Host Instance, where they will not be throttled by the 100K regular messages. It's simple enough with a custom context property.
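    Promoting such a property from a pipeline component is a one-line call against the message context. A sketch, assuming a hypothetical property schema namespace and omitting the component plumbing:

```csharp
using Microsoft.BizTalk.Component.Interop;
using Microsoft.BizTalk.Message.Interop;

// Sketch of the Execute method of a custom pipeline component that promotes
// a "Priority" context property so subscriptions (and a dedicated host's
// bindings) can route on it. The namespace is a placeholder for your
// property schema's target namespace.
public class PriorityPromoter // : IComponent, IBaseComponent, ... (plumbing omitted)
{
    private const string PropName = "Priority";
    private const string PropNamespace =
        "https://MyApp.Schemas.PropertySchema"; // hypothetical

    public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
    {
        // Decide the priority however fits your scenario; hard-coded here
        // purely for illustration.
        bool isImportant = true;
        pInMsg.Context.Promote(PropName, PropNamespace,
            isImportant ? "High" : "Normal");
        return pInMsg;
    }
}
```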

    It may be time to mark Answers or Helpful posts here and open a new thread for any different issues.

    Wednesday, April 8, 2015 12:12 AM
    Moderator