Inconsistent Message behavior with custom pipeline components

  • Question

  • Hello Experts,

    We have custom pipeline components on both the receive and send ports. The WCF-SQL adapter at the receive location polls for messages every 30 seconds. The custom receive pipeline component promotes a few fields. The send port subscribes to these messages and does some processing in its own custom pipeline component. I monitor the activity of both components (process start and process end) by writing to the event log from the component code.

    I am seeing inconsistent behavior when different messages are processed in the BizTalk send pipeline. For example, suppose the receive location polls 3 different messages (message1, message2, message3) from the database, and message3 takes longer to process than message1 and message2.

    Sometimes message1 and message2 are processed and their service instances show as completed, each with its own end time, while message3 is still in process. But the behavior is not consistent: at other times the service instances for message1 and message2 still show as active even though both messages have already been processed in the send pipeline. Their status changes to completed ONLY after message3 finishes processing, and all 3 messages get the same end time, namely that of the message3 service instance.

    I have debugged both components individually with various breakpoints. I assume that the receive pipeline component sometimes does not disassemble the messages, or does not process them in the default parallel delivery mode.

    Any input on how to handle this?

    Thanks,

    Sindhu

    BizTalk Developer


    Friday, October 4, 2019 8:38 PM

Answers

  • To be honest, this looks like a bit of a mess, and solving issues the way you are doing now should be a last resort. But I will suggest a solution anyway.

    In your send port component, you call 'pInMsg.BodyPart.GetOriginalDataStream' but never keep the result. I'd advise you to store that stream in a variable and add the variable to the resource tracker (just as you do in your receive port component). As you might have learned from the docs:

    'If the pipeline component wants the lifetime of the objects to last until the end of pipeline execution, then you must add such objects to the resource tracker that your pipeline may fetch from the pipeline context'

    And that's exactly what you want: you get the data stream and do nothing with it, so BizTalk assumes that you're done with it. But clearly you're only done with it after you have called 'ProcessDemoProcessCategory'.

    Next, take the variable you just added to the resource tracker and pass it as the parameter to the ProcessDemoProcessCategory method, instead of the 'pInMsg' variable.

    If these changes still don't work, please post the code of the ProcessDemoProcessCategory method.

    Finally, if you don't want to deal with issues like this anymore, try to rely more on built-in features such as orchestrations, dynamic ports and property promotion. Because, you know... they exist for a reason.

    • Marked as answer by SindhuM Friday, November 15, 2019 9:12 PM
    Friday, October 18, 2019 6:57 PM
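
The advice above (hold the body stream in a variable, register it for the lifetime of the pipeline, then process from that variable) could be sketched roughly as follows. This is a minimal sketch that avoids the BizTalk assemblies, not the poster's actual code: `SendBodyReader`, `QueueItem` and the `track` callback are hypothetical names, and `track` stands in for `pContext.ResourceTracker.AddResource`. It assumes the body is UTF-8 XML containing a `ProcessCategory` element, as in the send component posted in this thread.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Text;
using System.Xml.Linq;

public static class SendBodyReader
{
    // Hypothetical holder for the values the send component reads from the body.
    public sealed class QueueItem
    {
        public string Xml;
        public int ProcessId;
        public int HistoryHeaderId;
    }

    // 'track' is an assumption: in a real component it would be
    // pContext.ResourceTracker.AddResource.
    public static QueueItem Read(Stream body, Action<object> track)
    {
        if (body == null) throw new ArgumentNullException(nameof(body));

        // Register the stream before using it so it outlives this method.
        track?.Invoke(body);

        string xml;
        // leaveOpen: true, so disposing the reader does not close the tracked stream.
        using (var reader = new StreamReader(body, Encoding.UTF8, true, 4096, leaveOpen: true))
        {
            xml = reader.ReadToEnd();
        }

        // Rewind so later consumers can read the body again.
        if (body.CanSeek) body.Position = 0;

        XElement category = XDocument.Parse(xml).Descendants("ProcessCategory").FirstOrDefault();
        var item = new QueueItem { Xml = xml };

        // A missing element yields null, TryParse then fails and leaves the value at 0.
        int.TryParse((string)category?.Element("ProcessID"), out item.ProcessId);
        int.TryParse((string)category?.Element("HistoryHeaderID"), out item.HistoryHeaderId);
        return item;
    }
}
```

Inside `Execute`, a call might look like `SendBodyReader.Read(bodyStream, pContext.ResourceTracker.AddResource)`, where `bodyStream` is the variable holding the result of `GetOriginalDataStream()`. The null-safe element lookups also avoid the exception the posted code would hit when `ProcessCategory` is absent.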

All replies

  • It sounds like there is something in your pipeline component that isn't thread safe.  But without knowing what is in the code, we would just be guessing. 
    • Proposed as answer by Leo Erlandsson Tuesday, October 8, 2019 7:01 AM
    Sunday, October 6, 2019 8:32 PM
  • Hi,

    I have the following code in my pipeline components.

    Receive Pipeline Component:

        public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
        {
            try
            {
                IBaseMessagePart bodyPart = pInMsg.BodyPart;
                if (bodyPart != null)
                {
                    Stream originalStream = bodyPart.GetOriginalDataStream();
                    if (originalStream != null)
                    {
                        // Load the whole body so fields can be read with XPath
                        XDocument xDoc = XDocument.Load(originalStream);

                        // Promote CategoryClassification if the element is present
                        string categoryClassification = string.Empty;
                        if (xDoc.XPathSelectElement("//ProcessCategory[1]/CategoryClassification") != null)
                        {
                            categoryClassification = xDoc.XPathSelectElement("//ProcessCategory[1]/CategoryClassification").Value;
                            pInMsg.Context.Promote("Category_Classification", "https://Demo.Schemas.Demo_PropertySchema", categoryClassification);
                        }

                        // Promote CategoryType if the element is present
                        string categoryType = string.Empty;
                        if (xDoc.XPathSelectElement("//ProcessCategory[1]/CategoryType") != null)
                        {
                            categoryType = xDoc.XPathSelectElement("//ProcessCategory[1]/CategoryType").Value;
                            pInMsg.Context.Promote("Category_Type", "https://Demo.Schemas.Demo_PropertySchema", categoryType);
                        }

                        // Write the document back as a new, rewound body stream
                        byte[] outBytes = System.Text.Encoding.ASCII.GetBytes(xDoc.ToString());
                        MemoryStream memStream = new MemoryStream();
                        memStream.Write(outBytes, 0, outBytes.Length);
                        memStream.Position = 0;
                        bodyPart.Data = memStream;

                        // Keep the stream alive until the pipeline finishes
                        pContext.ResourceTracker.AddResource(memStream);
                    }
                }
            }
            catch (Exception)
            {
                // Exceptions are swallowed here
            }
            return pInMsg;
        }

    Send Pipeline Component:

        public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
        {
            try
            {
                int ProcessID = 0;
                int histHeaderId = 0;
                ProcessInstruction processInstr = new ProcessInstruction();
                processInstr.pContext = pContext;
                processInstr.pInMsg = pInMsg;
                try
                {
                    // && rather than ||, so a null BodyPart is never dereferenced
                    if (pInMsg.BodyPart != null && pInMsg.BodyPart.GetOriginalDataStream() != null)
                    {
                        // Read the whole body into a string
                        Stream sourceStream = StreamUtility.EnsureSeekableStream(pInMsg, pContext);
                        using (StreamReader sr = new StreamReader(sourceStream))
                        {
                            processInstr.ProcessingXml = sr.ReadToEnd();
                        }
                        processInstr.BizTalkUserID = BizTalkUserID;

                        // Pull the IDs out of the first ProcessCategory element
                        XDocument queueItem = XDocument.Parse(processInstr.ProcessingXml);
                        Int32.TryParse(queueItem.Descendants("ProcessCategory").FirstOrDefault().Element("ProcessID").Value, out ProcessID);
                        Int32.TryParse(queueItem.Descendants("ProcessCategory").FirstOrDefault().Element("HistoryHeaderID").Value, out histHeaderId);
                        processInstr.HistoryID = histHeaderId;
                        processInstr.ProcessID = ProcessID;
                    }
                }
                catch
                {
                    // Parsing failures are swallowed here
                }
                try
                {
                    Demo.DemClassLibraries.ProcessHelper.ProcessDemoProcessCategory(ref processInstr, pContext, pInMsg);
                }
                catch (Exception)
                {
                    throw;
                }
            }
            catch
            {
                // Anything rethrown above is swallowed here
            }
            return null;
        }

    Please advise me in solving this issue.

    Wednesday, October 9, 2019 3:16 PM
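
The promotion step in the receive component above boils down to three things: load the body, read two elements by XPath, and hand back a rewound copy of the body. A minimal sketch of that step without the BizTalk types follows. `CategoryReader` and `Result` are hypothetical names. The sketch encodes with UTF-8 where the posted code uses `Encoding.ASCII`, because ASCII encoding replaces every non-ASCII character with `?`; that swap is an assumption about what the messages need to preserve, not something confirmed in the thread.

```csharp
using System;
using System.IO;
using System.Text;
using System.Xml.Linq;
using System.Xml.XPath;

public static class CategoryReader
{
    // Hypothetical result type: the two values to promote plus the new body.
    public sealed class Result
    {
        public string Classification;
        public string Type;
        public MemoryStream Body;
    }

    public static Result Read(Stream original)
    {
        if (original == null) throw new ArgumentNullException(nameof(original));

        XDocument doc = XDocument.Load(original);

        // Select each element once; a null element means "not present".
        XElement cls = doc.XPathSelectElement("//ProcessCategory[1]/CategoryClassification");
        XElement type = doc.XPathSelectElement("//ProcessCategory[1]/CategoryType");

        // UTF-8 without a byte order mark keeps non-ASCII text intact.
        byte[] bytes = new UTF8Encoding(false).GetBytes(doc.ToString());

        return new Result
        {
            Classification = (string)cls ?? string.Empty,
            Type = (string)type ?? string.Empty,
            // A MemoryStream built from a byte array starts at position 0.
            Body = new MemoryStream(bytes)
        };
    }
}
```

In a component's `Execute`, the returned `Body` would be assigned to `bodyPart.Data` and passed to `pContext.ResourceTracker.AddResource`, as the posted code already does with its `memStream`.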
  • Implemented using orchestration. It worked. Thanks

    Friday, November 15, 2019 9:12 PM