Multi-threading or batch processing - Understanding code

  • Question

  • I am trying to figure out whether this function is a blocking call or a non-blocking one, i.e. whether it performs sequential batch processing or concurrent processing in multi-threaded code.

    I receive a fast stream of data in callbacks, a couple of hundred times or more every second.
    I look up that data in a queue, which returns a list of relevant objects.
    In a for loop, I call a function StartProcessing() for each of these objects. I am trying to find out whether StartProcessing() processes one item at a time in a sequential batch mode, or runs in multi-threaded mode with multiple items being processed at a time.

    private void InfoUpdateRcvd(string arg1, object arg2) // callback - data received
    {
        // Look up the table to get a list of relevant objects
        ArrayList mylist = (ArrayList)MyLookUpTable[arg1];

        // StartProcessing() for each object
        foreach (MyClass myobject in mylist)
            myobject.StartProcessing(arg2);

        // StartProcessing() calls Start() using ThreadPool.QueueUserWorkItem(...), which in turn
        // calls BusinessLogic() in a child class. If mylist has 3 items, is item #1 finished
        // processing before item #2 is processed, OR are items #1, #2 and #3 processed
        // concurrently in a multi-threaded way?
    }

    StartProcessing() has the following implementation, and I am not sure whether it processes each object sequentially or in a multi-threaded, non-blocking way.

    public class MyClass
    {
        public bool bInProcessing;
        private Queue myQueue;

        internal void StartProcessing(object arg2) // Is this a blocking function or multi-threaded?
        {
            lock (myQueue)
            {
                myQueue.Enqueue(arg2);
                if (!bInProcessing)
                {
                    // Call Start() - does the following line make processing multi-threaded?
                    bInProcessing = ThreadPool.QueueUserWorkItem(new WaitCallback(this.Start), m_EventQueue.Dequeue());
                }
            } // lock ends
        } // function ends

        private void Start(object obj) // Performs the processing
        {
            lock (this)
            {
                BusinessLogic(); // This function is implemented in child classes of MyClass
            } // lock ends

            lock (myQueue)
            {
                if (myQueue.Count == 0)
                    m_bInProcessing = false;
                else
                    m_bInProcessing = ThreadPool.QueueUserWorkItem(new WaitCallback(this.Start), m_EventQueue.Dequeue());
            }
        }
    } // class ends
    • Edited by bsobaid Monday, January 18, 2010 7:39 PM mistakenly didn't add argument in Start() definition
    Friday, January 15, 2010 7:50 PM

All replies

  • This is doing something fairly odd.

    Right now, the way this works is that a background thread is launched for the work in StartProcessing when the FIRST item is added.

    When that item has finished processing, Start queues the next item on a NEW background thread, then exits.

    This basically pushes all of your business logic into the background, so the call:

    foreach( MyClass myobject in mylist )
       myobject.StartProcessing(arg2); 

    is asynchronous.  However, the work is processed in such a way that only one work item runs at a time (pseudo-synchronously), even though it uses multiple threads.

    What do you want this code to do?  This seems like a very inefficient way to work...

    Reed Copsey, Jr. - http://reedcopsey.com
    Friday, January 15, 2010 8:17 PM
    Moderator
  • Thanks,
    This is legacy code I inherited. Its performance isn't too bad and it uses very little CPU, but I had trouble understanding it.
    Inside BusinessLogic(), there are no more lock statements. However, the data structures it accesses outside of the class that contains BusinessLogic() (i.e. MyClass) are wrapped in lock statements, which suggests that only one thread executes BusinessLogic() at a time even though a new thread is launched for every item.

    So, basically, you are saying that in StartProcessing()
    ThreadPool.QueueUserWorkItem( new WaitCallback(this.Start), m_EventQueue.Dequeue() );
    launches a new background thread, and Start() for every item is executed on a NEW thread. However, even though it launches multiple threads using QueueUserWorkItem, something keeps each thread waiting until the former thread has finished processing?

    Which line of code makes it wait until the former thread is done processing BusinessLogic()?

    Is it the bInProcessing flag in StartProcessing()?
    Monday, January 18, 2010 2:42 PM
  • When you call QueueUserWorkItem, you push the delegate off onto a background thread.

    In your case, you're adding all of your work into a queue.  (StartProcessing adds the item to the queue.)

    Then, StartProcessing pops the first item off the queue and runs it in a background thread, but only if it's not already processing an item [if (!bInProcessing)].

    If it's processing an item, it just leaves the new item in the queue and exits.

    The item processes in the background thread. Then, at the end of its processing (Start method), it checks the queue and, if there are items, starts the next item running (second QueueUserWorkItem call).  The first item is complete at this point, and its work item ends.

    This basically starts new threads regularly, but only allows one item to run at a time (in the background thread).
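
    The steps above can be reproduced in a small, self-contained sketch. SerialWorker and SerialWorkerDemo are hypothetical names, Thread.Sleep stands in for BusinessLogic(), and a single queue field is used throughout, so this is an illustration of the pattern rather than the legacy code itself. It records how many work items are ever inside the processing section at once: StartProcessing returns immediately, yet that number should never exceed one. Since the queue and flag are instance fields, the one-at-a-time behaviour is per worker object.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for MyClass: same enqueue-then-pump shape as the question.
public class SerialWorker
{
    private readonly Queue<object> queue = new Queue<object>();
    private readonly object stats = new object();
    private bool inProcessing;   // guarded by lock (queue)
    private int running;         // guarded by lock (stats)
    private int maxConcurrent;   // guarded by lock (stats)
    private int processed;       // guarded by lock (stats)

    public int MaxConcurrent { get { lock (stats) return maxConcurrent; } }
    public int Processed { get { lock (stats) return processed; } }

    // Returns right away: it only enqueues, and starts the pump if it is idle.
    public void StartProcessing(object item)
    {
        lock (queue)
        {
            queue.Enqueue(item);
            if (!inProcessing)
                inProcessing = ThreadPool.QueueUserWorkItem(new WaitCallback(Start), queue.Dequeue());
        }
    }

    private void Start(object item)
    {
        lock (stats) { running++; if (running > maxConcurrent) maxConcurrent = running; }
        Thread.Sleep(2); // assumption: placeholder for BusinessLogic()
        lock (stats) { running--; processed++; }

        // Only now is the next item handed to the thread pool, one at a time.
        lock (queue)
        {
            if (queue.Count == 0)
                inProcessing = false;
            else
                inProcessing = ThreadPool.QueueUserWorkItem(new WaitCallback(Start), queue.Dequeue());
        }
    }
}

public static class SerialWorkerDemo
{
    // Enqueues itemCount items and returns the highest concurrency observed.
    public static int Run(int itemCount)
    {
        var worker = new SerialWorker();
        for (int i = 0; i < itemCount; i++)
            worker.StartProcessing(i);          // does not wait for processing
        while (worker.Processed < itemCount)
            Thread.Sleep(1);                    // simple polling, fine for a demo
        return worker.MaxConcurrent;
    }
}
```

    SerialWorkerDemo.Run(20) should return 1: the work hops between thread pool threads, but the next item is only queued after the previous one has finished.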


    Reed Copsey, Jr. - http://reedcopsey.com
    Monday, January 18, 2010 5:49 PM
    Moderator
  • Thank you Reed,

    It probably runs it in a background thread because the main thread is left for GUI processing.
    To make it truly multi-threaded and multi-processing I probably need to get rid of the bInProcessing flag, but then I'd have to make BusinessLogic(), which has all the logic, thread safe.

    Any suggestions?
    Monday, January 18, 2010 5:59 PM
  • I'd strongly recommend taking a look at the Task Parallel Library (blogged about here), coming in .NET 4.  There is a backported version available as part of the Rx Extensions which you can install and use in .NET 3.5sp1.

    The TPL has many built-in features that make this type of work trivial.  Basically, with what you're doing, you could just start a new task for each work item, and you'd be done.  Most of this code could be simplified to a couple of lines of pure framework code as needed.
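
    As a rough sketch of "a new task for each work item": TaskPerItemSketch and Process are hypothetical names, and Process stands in for BusinessLogic(), which would have to be thread safe once items run concurrently. Task.Factory.StartNew and Task.WaitAll are the .NET 4 TPL calls; that the Rx backport exposes the same surface on .NET 3.5sp1 is an assumption here.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class TaskPerItemSketch
{
    // Assumption: placeholder for BusinessLogic(); it touches no shared state,
    // so it is safe to run for several items at the same time.
    private static int Process(int item)
    {
        Thread.Sleep(5);
        return item * 2;
    }

    // Starts one task per item, waits for all of them, returns results in input order.
    public static int[] ProcessAll(IList<int> items)
    {
        var tasks = new Task<int>[items.Count];
        for (int i = 0; i < items.Count; i++)
        {
            int item = items[i]; // local copy so each lambda captures its own value
            tasks[i] = Task.Factory.StartNew<int>(() => Process(item));
        }

        Task.WaitAll(tasks); // blocks until every task has completed

        var results = new int[tasks.Length];
        for (int i = 0; i < tasks.Length; i++)
            results[i] = tasks[i].Result;
        return results;
    }
}
```

    In a callback such as InfoUpdateRcvd you would probably not call Task.WaitAll at all, since the existing code is fire and forget; it is here only so the sketch can return results.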


    Reed Copsey, Jr. - http://reedcopsey.com
    Monday, January 18, 2010 6:49 PM
    Moderator
  • It's difficult to tell exactly what the original coder intended.  In bInProcessing = ThreadPool.QueueUserWorkItem( new WaitCallback(this.Start), m_EventQueue.Dequeue() ); the result of m_EventQueue.Dequeue() is supposed to be passed to Start(), but Start() accepts no parameters.  I guess he did this to keep the queue functional.  The if (!bInProcessing) check looks redundant and would result in skipped threads if it weren't for the lock.  Then he's got locks which do not appear to be keeping things thread safe so much as attempting to force the ThreadPool to run synchronously.

    One of the major problems with this design is that the myQueue object count will only show that m_EventQueue.Dequeue has been called.  It will not tell you whether QueueUserWorkItem has completed its asynchronous operation.

    I recommend you review the original requirements and throw this code out the window.
    BrianMackey.NET
    Monday, January 18, 2010 6:59 PM
  • @Brian,

    Apologies. There is an overloaded Start(Object obj) function which gets called and accepts an argument, as you pointed out. Its definition is otherwise the same as Start().

    There is no way I can find the original requirements or the original developer, and I was told this piece has been running for years with no major issues.


    When Start() is invoked, the intention is to finish processing it before doing anything else. Please keep in mind that the InfoUpdateRcvd() callback that invokes all this gets called a couple of hundred times per second, and BusinessLogic() is only concerned with the latest invocation of InfoUpdateRcvd(). It doesn't mind if it misses some of them while it is processing in Start() and BusinessLogic().

    I want to be able to process all items in 

    foreach( MyClass myobject in mylist )
       myobject.StartProcessing(arg2); 

    at the same time, so item #3 doesn't have to wait for items #1 and #2 to finish processing. Essentially, I want to speed this whole thing up. What is the best way to go about it?
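
    One possible reading of "only concerned about the latest invocation" is a single latest-value slot per object instead of a queue, so updates that arrive while the object is busy overwrite each other. This is a swapped-in technique, not what the posted code does. LatestOnlyWorker and LatestOnlyDemo are hypothetical names, and the Action<object> passed to the constructor stands in for BusinessLogic().

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class LatestOnlyWorker
{
    private readonly object gate = new object();
    private readonly Action<object> businessLogic; // assumption: stand-in for BusinessLogic()
    private object latest;
    private bool hasLatest;
    private bool inProcessing;

    public LatestOnlyWorker(Action<object> businessLogic)
    {
        this.businessLogic = businessLogic;
    }

    // Non-blocking: stores the update, replacing any update still waiting.
    public void StartProcessing(object update)
    {
        lock (gate)
        {
            latest = update;
            hasLatest = true;
            if (inProcessing) return; // the running pump will pick it up
            inProcessing = true;
        }
        ThreadPool.QueueUserWorkItem(new WaitCallback(Pump));
    }

    private void Pump(object unused)
    {
        while (true)
        {
            object update;
            lock (gate)
            {
                if (!hasLatest) { inProcessing = false; return; }
                update = latest;
                hasLatest = false;
            }
            businessLogic(update); // runs outside the lock
        }
    }
}

public static class LatestOnlyDemo
{
    // Sends update 1, then 2..5 while 1 is still being processed.
    public static List<int> Run()
    {
        var seen = new List<int>();
        var started = new ManualResetEvent(false);
        var release = new ManualResetEvent(false);
        var worker = new LatestOnlyWorker(update =>
        {
            lock (seen) { seen.Add((int)update); }
            started.Set();
            release.WaitOne(); // holds the first update until the demo releases it
        });

        worker.StartProcessing(1);
        started.WaitOne();                      // update 1 is now in progress
        for (int i = 2; i <= 5; i++)
            worker.StartProcessing(i);          // 2, 3 and 4 are overwritten
        release.Set();

        while (true)
        {
            lock (seen) { if (seen.Count == 2) break; }
            Thread.Sleep(1);
        }
        lock (seen) { return new List<int>(seen); }
    }
}
```

    LatestOnlyDemo.Run() should return the list 1, 5: the update in flight completes, the intermediate ones are dropped, and only the newest pending update is processed. Each object still handles one update at a time, but separate objects do not wait on each other.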


    Monday, January 18, 2010 7:27 PM
  • I'd strongly recommend taking a look at the Task Parallel Library (blogged about here), coming in .NET 4.  There is a backported version available as part of the Rx Extensions which you can install and use in .NET 3.5sp1.

    The TPL has many built-in features that make this type of work trivial.  Basically, with what you're doing, you could just start a new task for each work item, and you'd be done.  Most of this code could be simplified to a couple of lines of pure framework code as needed.


    Reed Copsey, Jr. - http://reedcopsey.com
    .NET 4.0 won't get approval because it hasn't been out long enough to be used in production.
    Monday, January 18, 2010 7:50 PM
  • As I said, you can get this functionality for .NET 3.5sp1 by installing the Rx Extensions.  This lets it be used in .NET 3.5, today.
    Reed Copsey, Jr. - http://reedcopsey.com
    • Marked as answer by Harry Zhu Monday, January 25, 2010 2:56 AM
    Monday, January 18, 2010 8:03 PM
    Moderator
  • Thanks,

    Is there any context-switching going on in this code?

    The Rx Extensions are an easy solution; I am wondering whether they are also the fastest?
    Wednesday, January 20, 2010 7:19 PM
  • Thanks,

    Is there any context-switching going on in this code?

    The Rx Extensions are an easy solution; I am wondering whether they are also the fastest?

    Yes, there will be some context switching, but it's minimal.  The context switches here happen between threads as things get launched, but this is pretty much a "fire and forget" situation (you're pushing everything into background threads, and not getting info back out, at least in the code shown).

    As for performance, using the TPL constructs available via Rx will most likely do a better job than this.  The current code uses quite a bit of locking and waiting, whereas the TPL would let you take better advantage (if you wanted) of multiple processors, with very little code.

    Reed Copsey, Jr. - http://reedcopsey.com
    • Marked as answer by Harry Zhu Monday, January 25, 2010 2:56 AM
    Wednesday, January 20, 2010 7:27 PM
    Moderator