Using BeginInvoke and designing a concurrent task processor

  • Question

  • Hello all,

    I am designing a concurrent task processor class. Its responsibility is to accept a task, where each task contains n atomic work units that need to be processed concurrently on separate threads. The class will then use BeginInvoke to start each work unit in the task on its own thread.

    1. The task needs to know when all work units have finished. I implemented a callback for each BeginInvoke call, which increments a class-level static variable under a lock. Is this the correct way?
    2. BeginInvoke takes its thread from the thread pool. What happens if the thread pool is starved of threads?
    3. When I call BeginInvoke, is the work queued to the thread pool if no thread is currently free and the pool has already reached its maximum thread limit? If yes, what is the timeout of that request?

    Can anyone also suggest a different approach for this concurrent task processor? I don't want to create threads manually; I want to stick to the thread pool.

    Thanks in advance.
    Amandeep
    Saturday, January 2, 2010 8:02 PM

Answers

  • 1. No.  Store your IAsyncResult objects in a List<IAsyncResult> and the delegates you're using to run BeginInvoke in another list, typed to your own delegate (EndInvoke is defined on the specific delegate type, not on System.Delegate).  After you've queued up each task using BeginInvoke, use a second loop to go through each delegate and call EndInvoke on it, passing in the matching IAsyncResult. EndInvoke blocks until its call has completed, so once you're past that loop, everything has finished.

    2. If all of the pool's threads are busy, the pool waits briefly (roughly half a second in current .NET versions) before adding another thread, and it keeps adding threads at that pace until it reaches its maximum. That being said, this shouldn't be an issue.  For CPU-bound work, one thread per processor is plenty, and anything more than that will simply start to starve out other processes on the system.

    3. There's no timeout.  The work item simply waits in the queue until a thread is available, and then executes.
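
    To see where the pool stands with respect to points 2 and 3, something like the following could be used. It is only a minimal sketch: the class name ThreadPoolLimits and the method WorkerLimits are made up for illustration, and only the ThreadPool.Get* calls are real API.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the .NET Framework.
public static class ThreadPoolLimits
{
    // Returns { minimum, maximum, currently available } worker-thread counts.
    public static int[] WorkerLimits()
    {
        int minWorkers, maxWorkers, freeWorkers, ioUnused;
        ThreadPool.GetMinThreads(out minWorkers, out ioUnused);
        ThreadPool.GetMaxThreads(out maxWorkers, out ioUnused);
        ThreadPool.GetAvailableThreads(out freeWorkers, out ioUnused);
        return new int[] { minWorkers, maxWorkers, freeWorkers };
    }

    public static void Main()
    {
        int[] limits = WorkerLimits();
        Console.WriteLine("worker threads: min={0}, max={1}, available={2}",
            limits[0], limits[1], limits[2]);
        if (limits[2] == 0)
        {
            // At the maximum with nothing free: new work items wait in the queue.
            Console.WriteLine("Pool is saturated; queued work will wait for a free thread.");
        }
    }
}
```

    The minimum is the number of threads the pool creates on demand without delay; beyond that, thread creation is throttled as described in point 2.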

    For more information on this, check out this link and this link.
    Coding Light - Illuminated Ideas and Algorithms in Software
    Monday, January 4, 2010 2:30 PM

All replies

  • Thanks for your reply, David. I implemented the approach you described and it served my purpose. I do have a few more questions:
    1. WaitHandle has a member called WaitTimeout. Is its value taken into account when I use WaitHandle.WaitAll? If yes, does it apply to one handle or to all the handles?

    2. Is there a maximum number of WaitHandles that the WaitHandle class can wait on?

    3. Below is the code I implemented. It is specific to a task named task1, and I need to generalize it, perhaps by moving common functionality to a base class. I am not able to generalize it, though, because every task could have a different DoTask function with different parameters, so each task needs its own delegate. Can you please suggest something better?

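    using System;
    using System.Collections.Generic;
    using System.Threading;

    public class Task1BatchProcessor
        {
            // One delegate type per task signature; the compiler generates BeginInvoke/EndInvoke for it.
            public delegate Task1Response DlgDoTask1(int workId, Task1RequestEntry Task1Request);
            private readonly List<Task1RequestEntry> Task1Entries;
    
            public Task1BatchProcessor(List<Task1RequestEntry> lstTask1s)
            {
                Task1Entries = lstTask1s;
            }
    
            static Task1BatchProcessor()
            {
                // Process-wide: the pool creates up to 5 worker and 5 I/O threads on demand without delay.
                ThreadPool.SetMinThreads(5, 5);
            }
            public List<Task1Response> DoBatchProcessing()
            {
                // Locals rather than fields, so a second call does not reuse results that were already ended.
                List<Task1Response> Task1Responses = new List<Task1Response>();
                List<DlgDoTask1> delegates = new List<DlgDoTask1>();
                List<IAsyncResult> results = new List<IAsyncResult>();
                List<WaitHandle> waitHandles = new List<WaitHandle>();
    
                if (Task1Entries != null && Task1Entries.Count > 0)
                {
                    for (int i = 0; i < Task1Entries.Count; i++)
                    {
                        Task1RequestEntry dispReq = Task1Entries[i];
                        DlgDoTask1 dlg = new DlgDoTask1(new Task1().DoWork);
                        // Pass the loop index as the work id (the original passed 0 for every unit).
                        IAsyncResult result = dlg.BeginInvoke(i, dispReq, null, null);
                        results.Add(result);
                        waitHandles.Add(result.AsyncWaitHandle);
                        delegates.Add(dlg);
                    }
                }
    
                // WaitAll throws on an empty array and accepts at most 64 handles.
                // The EndInvoke loop below blocks on each call anyway, so this wait is optional.
                if (waitHandles.Count > 0 && waitHandles.Count <= 64)
                {
                    WaitHandle.WaitAll(waitHandles.ToArray());
                }
    
                for (int i = 0; i < delegates.Count; i++)
                {
                    Task1Responses.Add(delegates[i].EndInvoke(results[i]));
                }
                return Task1Responses;
            }
        }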
     
    Thanks,
    Aman
    Wednesday, January 6, 2010 10:05 AM
  • Can you please review this and provide any suggestions?

    Thanks in Advance.
    Aman.
    Thursday, January 7, 2010 11:38 AM
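
    On the generalization question above (one delegate type per task), one possible direction is to make the processor generic over the request and response types and to pass the work in as a Func. The following is a sketch under assumptions, not a definitive design: BatchProcessor, Process and Demo are hypothetical names, and the sketch swaps BeginInvoke for ThreadPool.QueueUserWorkItem with an Interlocked countdown and a single ManualResetEvent, which stays on the thread pool and avoids the 64-handle limit of WaitHandle.WaitAll.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical generic replacement for the per-task processor classes.
public class BatchProcessor<TRequest, TResponse>
{
    private readonly Func<int, TRequest, TResponse> work;

    public BatchProcessor(Func<int, TRequest, TResponse> work)
    {
        if (work == null) throw new ArgumentNullException("work");
        this.work = work;
    }

    // Runs one pool work item per request and returns responses in request order.
    public List<TResponse> Process(IList<TRequest> requests)
    {
        int count = requests == null ? 0 : requests.Count;
        if (count == 0) return new List<TResponse>();

        TResponse[] responses = new TResponse[count];
        Exception firstError = null;
        int pending = count;
        using (ManualResetEvent allDone = new ManualResetEvent(false))
        {
            for (int i = 0; i < count; i++)
            {
                int index = i; // copy: each closure needs its own index
                ThreadPool.QueueUserWorkItem(delegate
                {
                    try
                    {
                        responses[index] = work(index, requests[index]);
                    }
                    catch (Exception ex)
                    {
                        // Record only the first failure instead of letting it escape the pool thread.
                        Interlocked.CompareExchange(ref firstError, ex, null);
                    }
                    finally
                    {
                        // The last unit to finish releases the waiting caller.
                        if (Interlocked.Decrement(ref pending) == 0) allDone.Set();
                    }
                });
            }
            allDone.WaitOne(); // no timeout: wait until every unit has run
        }
        if (firstError != null)
            throw new InvalidOperationException("A work unit failed.", firstError);
        return new List<TResponse>(responses);
    }
}

public static class Demo
{
    public static void Main()
    {
        BatchProcessor<int, int> squares = new BatchProcessor<int, int>(
            delegate(int workId, int n) { return n * n; });
        List<int> result = squares.Process(new List<int> { 1, 2, 3 });
        Console.WriteLine(result[2]); // prints 9
    }
}
```

    With the types from the post, the same class would presumably be constructed as new BatchProcessor<Task1RequestEntry, Task1Response>((id, req) => new Task1().DoWork(id, req)), so each new task only supplies a lambda rather than a new delegate type and processor class.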