Parallel.For vs TaskFactory

    Question

  • Interesting that with this logic, all tasks get processed and the GUI gets updated for all ticker symbols

            public void UseTaskFactorRun()
            {
                taskList = new Task[ParentClass.mTickerSymb.Length];

                for (int i = 0; i < ParentClass.mTickerSymb.Length; i++)
                    taskList[i] = taskFactory.StartNew(listSI_UseTaskFactor[i].GetQuoteFromWebSite); // allocate & start

                taskFactory.ContinueWhenAll(taskList, completedTasks =>
                {
                    // Do something
                    foreach (StockItem SI in listSI_UseTaskFactor)
                        UpdateGUI(SI);
                });
            }

    But with this logic, about a third of the items never seem to go through the return statement. I put print statements in and after
    listSI[j].GetQuoteFromWebSite(); and just prior to calling Button4processStockList2, and this happens about a third of the time.

            public void Button4Run()
            {
                try
                {
                    Parallel.For<StockItem>(0, listSI.Length, () => null, (j, loopStateVariable, returnValue) =>
                    {
                        listSI[j].GetQuoteFromWebSite();
                        return listSI[j]; // this is coming back in the variable "returnValue"
                    },
                    (x) =>
                    {
                        Button4processStockList2(x);
                    } // to do after thread has finished
                    );
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
    
            private void Button4processStockList2(StockItem SI)
            {
                // to do after each thread has finished 
                UpdateGUI(SI);
            }

    Why would the task factory logic work but the Parallel.For not?
    Friday, March 9, 2012 22:21

All Replies

  • Most likely because you're calling UpdateGUI in parallel in the second case - 

    The last lambda in Parallel.For is used for aggregation - it's called by each task generated in the Parallel.For method concurrently in order to combine the results.  However, you're passing the results to a method to update the UI, which is likely not thread safe.

    The first version calls the Update method sequentially for each result, which is likely why it works.
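
    As a rough illustration (a throwaway console sketch, not your stock code), the final delegate can be entered by several tasks at once - so anything shared that it touches (a UI control, a common list) needs locking or marshaling:

        using System;
        using System.Threading;
        using System.Threading.Tasks;

        class LocalFinallyDemo
        {
            static void Main()
            {
                object sync = new object();

                Parallel.For<int>(0, 100,
                    () => 0,                                  // local init: runs once per task
                    (i, state, localCount) => localCount + 1, // body: runs once per item
                    localCount =>
                    {
                        // final delegate: runs once per task, possibly concurrently,
                        // so protect anything shared
                        lock (sync)
                        {
                            Console.WriteLine("task processed {0} items on thread {1}",
                                localCount, Thread.CurrentThread.ManagedThreadId);
                        }
                    });
            }
        }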


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".

    Friday, March 9, 2012 22:44
  • In UpdateGUI, I use BeginInvoke to update the grid. So wouldn't each "request" get queued up for the dispatcher (or did I misunderstand that part)?

    For the TaskFactory, true, the UpdateGUI is being done after all tasks are completed.

    Friday, March 9, 2012 23:06
  • Oh, also: in the 2nd logic, I put a print statement just before the call to UpdateGUI(SI), but the print statement never gets executed. So it does make it out of the statement listSI[j].GetQuoteFromWebSite();
    OK, but something does not happen at the
    return listSI[j]; statement or in the last lambda of the Parallel.For
    Friday, March 9, 2012 23:22
  • Oh, also: in the 2nd logic, I put a print statement just before the call to UpdateGUI(SI), but the print statement never gets executed. So it does make it out of the statement listSI[j].GetQuoteFromWebSite();
    OK, but something does not happen at the
    return listSI[j]; statement or in the last lambda of the Parallel.For

    I think you have a misunderstanding about how the second version works - the return value for each item in listSI is intended to be aggregated, not just used. In your case, you'll only get the last StockItem processed per task, so if your loop uses 6 tasks to complete, you'll only get 6 items, etc.

    You can make it work more like the first via:

      Parallel.For<List<StockItem>>(0, listSI.Length, () => new List<StockItem>(), (j, loopStateVariable, returnValue) =>
          {
              listSI[j].GetQuoteFromWebSite();
              returnValue.Add(listSI[j]);
              return returnValue;
          },
          (x) =>
          {
              foreach (var item in x) // x is now a List!
                  Button4processStockList2(item);
          } // runs once per task, after that task has finished
          );

    I wrote a detailed article showing how this overload of Parallel.For is intended to be used here: http://reedcopsey.com/2010/01/22/parallelism-in-net-part-4-imperative-data-parallelism-aggregation/
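
    The same overload also covers the classic reduction case. Here is a minimal sketch (a plain sum, not the article's code) of how the per-task local value and the final merge step fit together:

        using System;
        using System.Threading;
        using System.Threading.Tasks;

        class AggregationSketch
        {
            static void Main()
            {
                int[] data = new int[1000];
                for (int i = 0; i < data.Length; i++) data[i] = i;

                long total = 0;

                Parallel.For<long>(0, data.Length,
                    () => 0L,                                    // per-task local sum
                    (i, state, localSum) => localSum + data[i],  // accumulate once per item
                    localSum => Interlocked.Add(ref total, localSum)); // merge once per task

                Console.WriteLine(total); // 499500
            }
        }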



    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".

    Saturday, March 10, 2012 00:15
  • Ah, thanks.
    Saturday, March 10, 2012 16:31
  • I was trying to get each StockItem to run in its own thread, and then, after each thread has completed, to update the UI for that StockItem. Thus I was trying to parallel-loop over each listSI[j].GetQuoteFromWebSite();
    then return that individual stock item to be
    Button4processStockList2(item);

    Saturday, March 10, 2012 16:36
  • I tried the following, but I'm still not getting it to work for each thread.

    object SyncObj = new object();

    Parallel.For<StockItem>(0, listSI.Length, () => null, (j, loopStateVariable, returnValue) =>
    {
        listSI[j].GetQuoteFromWebSite();
        return listSI[j]; // this is coming back in the variable "returnValue"
    },
    (returnValue) =>
    {
        lock (SyncObj)
        {
            Console.WriteLine("in Lock: " + returnValue.mTickerSymbol);
            Button4processStockList2(returnValue);
        }
    } // to do after thread has finished
    );

    private void Button4processStockList2(StockItem SI)
    {
        // to do after each thread has finished
        UpdateGUI(SI);
    }

    private void UpdateGUI(StockItem SI)
    {
        if (dataGridView1.InvokeRequired)
        {
            // It's on a different thread, so use Invoke.
            SetGridDataCallback d = new SetGridDataCallback(UpdateGUIHelper);
            dataGridView1.BeginInvoke(d, SI); // BeginInvoke runs async, while Invoke runs sync
        }
        else
        {
            UpdateGUIHelper(SI);
        }
    }

    private void UpdateGUIHelper(StockItem SI)
    {
        //Console.WriteLine("UpdateGUI2 = " + SI.mTickerSymbol);
        dataGridView1.Rows[SI.GridRow].Cells[0].Value = SI.mTickerSymbol;
        dataGridView1.Rows[SI.GridRow].Cells[1].Value = SI.mCurrentQuote;
        dataGridView1.Rows[SI.GridRow].Cells[2].Value = SI.mChange.ToString("#0.00;(#0.00);Zero");
        dataGridView1.Rows[SI.GridRow].Cells[2].Style = SI.mChange > 0 ? PositiveValue : NegativeValue;
        dataGridView1.Rows[SI.GridRow].Cells[3].Value = SI.mPctChange.ToString("#0.00%;(#0.00%);Zero");
        dataGridView1.Rows[SI.GridRow].Cells[3].Style = SI.mChange > 0 ? PositiveValue : NegativeValue;
    }

    Inside the call to GetQuoteFromWebSite, I have a footprint that prints the data returned. I also have a footprint inside the lock statement to see if it actually gets there after each thread. So in this example I should see two footprints per stock ticker symbol, but as shown below, that is not true: about a third never get to the lambda statement.

    "CSCO",20.02,"3/13/2012","11:32am",+0.19,19.90,20.04,19.89,10511176,107.8B,19.83,"+0.96%",19.90,"13.30 - 20.49",1.288,15.40,"Cisco Systems, In"
    in Lock: CSCO
    "PFE",21.8684,"3/13/2012","11:32am",+0.3284,21.64,21.89,21.62,14138846,164.9B,21.54,"+1.52%",21.64,"16.63 - 22.17",1.272,16.93,"Pfizer, Inc. Comm"
    in Lock: PFE
    "COP",77.59,"3/13/2012","11:32am",+0.28,77.68,77.77,77.04,2570774,99.291B,77.31,"+0.36%",77.68,"58.65 - 81.80",8.97,8.62,"ConocoPhillips Co"
    in Lock: COP
    "F",12.57,"3/13/2012","11:31am",+0.14,12.48,12.59,12.40,14146157,47.775B,12.43,"+1.13%",12.48,"9.05 - 16.18",4.943,2.51,"Ford Motor Compan"
    "QQQ",65.61,"3/13/2012","11:32am",+0.56,65.40,65.62,65.27,18864836,N/A,65.05,"+0.86%",65.40,"49.93 - 65.14",0.00,N/A,"PowerShares QQQ T"
    "WFC",31.92,"3/13/2012","11:32am",+0.41,31.84,31.97,31.68,8553955,168.3B,31.51,"+1.30%",31.84,"22.58 - 32.63",2.82,11.17,"Wells Fargo & Com"
    "AEP",38.915,"3/13/2012","11:32am",+0.235,38.75,39.00,38.68,1536512,18.812B,38.68,"+0.61%",38.75,"33.09 - 41.98",4.023,9.61,"American Electric"
    "C",35.32,"3/13/2012","11:32am",+1.03,34.97,35.34,34.87,24165900,103.4B,34.29,"+3.00%",34.97,"21.40 - 46.90",3.625,9.46,"Citigroup, Inc. C"
    "VZ",39.245,"3/13/2012","11:31am",-0.075,39.33,39.40,39.22,2876608,111.3B,39.32,"-0.19%",39.33,"32.28 - 40.48",0.85,46.26,"Verizon Communica"
    "IBM",202.678,"3/13/2012","11:32am",+1.678,201.72,202.82,201.6107,1426910,234.8B,201.00,"+0.83%",201.72,"151.71 - 201.57",13.06,15.39,"International Bus"
    "MU",8.375,"3/13/2012","11:31am",+0.295,8.24,8.50,8.20,14759766,8.273B,8.08,"+3.65%",8.24,"3.97 - 11.89",-0.174,N/A,"Micron Technology"
    "INTC",27.405,"3/13/2012","11:32am",+0.42,27.13,27.43,27.12,12118570,136.9B,26.985,"+1.56%",27.13,"19.16 - 27.50",2.39,11.29,"Intel Corporation"
    "S",2.78,"3/13/2012","11:32am",+0.06,2.76,2.79,2.73,12246357,8.333B,2.72,"+2.21%",2.76,"2.10 - 6.45",-0.965,N/A,"Sprint  Nextel Co"
    in Lock: S
    "DSCO",3.43,"3/13/2012","11:31am",-0.04,3.43,3.55,3.35,889934,84.0M,3.47,"-1.15%",3.43,"1.44 - 5.39",-1.122,N/A,"Discovery Laborat"
    in Lock: DSCO
    "DUK",21.415,"3/13/2012","11:31am",+0.045,21.41,21.48,21.3501,3165719,28.610B,21.37,"+0.21%",21.41,"16.87 - 22.12",1.281,16.68,"Duke Energy Corpo"
    in Lock: DUK
    "ORCL",30.18,"3/13/2012","11:32am",+0.47,29.99,30.23,29.94,9673571,151.7B,29.71,"+1.58%",29.99,"24.72 - 36.50",1.818,16.34,"Oracle Corporatio"
    in Lock: ORCL

    Tuesday, March 13, 2012 15:55
  • This has the same problem as before - look at my post - I made a List<T> of items per task, and added each result into that list.  The final portion then processes all items in the list.

    Your code is returning listSI[j] - which is a single element.  You have the exact same issue as before - my previous post still applies...


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".

    Tuesday, March 13, 2012 16:07
  • I know, I'm trying to process each item by itself. I realize that the "proper" design would be to do it as a list, as I should not spin off 500 threads (if my list is that big), but as an exercise I was trying to get each StockItem to run in its own thread, and then, after each thread has completed, to update the UI for that StockItem. Thus I was trying to parallel-loop over each listSI[j].GetQuoteFromWebSite();
    then return that individual stock item to be
    Button4processStockList2(item);

    Tuesday, March 13, 2012 16:16
  • I know, I'm trying to process each item by itself. I realize that the "proper" design would be to do it as a list, as I should not spin off 500 threads (if my list is that big), but as an exercise I was trying to get each StockItem to run in its own thread, and then, after each thread has completed, to update the UI for that StockItem. Thus I was trying to parallel-loop over each listSI[j].GetQuoteFromWebSite();
    then return that individual stock item to be
    Button4processStockList2(item);

    In this case, don't use that overload - and just process the items within the Parallel.For loop itself:

    Parallel.For(0, listSI.Length, j =>
        {
            listSI[j].GetQuoteFromWebSite();
            lock (SyncObj)
            {
                Console.WriteLine("in Lock: " + listSI[j].mTickerSymbol);
                Button4processStockList2(listSI[j]);
            }
        });

    This will cause each quote to be fetched, then call the method to update the UI when it's done, once per item.
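
    If the index j isn't otherwise needed, the same per-item pattern could also be written with Parallel.ForEach over the collection itself (a sketch, assuming listSI is enumerable and SyncObj is the lock object from the earlier snippet):

        Parallel.ForEach(listSI, si =>
        {
            si.GetQuoteFromWebSite();
            lock (SyncObj)
            {
                Console.WriteLine("in Lock: " + si.mTickerSymbol);
                Button4processStockList2(si);
            }
        });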


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".

    Tuesday, March 13, 2012 16:31
  • OK, for future reference, any reason why that overload wouldn't/shouldn't work?

    Oh, a newbie question: is there one ThreadPool for the entire system/machine or one per application?

    Tuesday, March 13, 2012 17:38
  • ok, for future reference, any reason why that overload wouldn't/shouldn't work?

    oh, a newbie question: is there one ThreadPool for the entire system/machine or one per application?

    For the second - one ThreadPool per process (application).

    For the first - that overload works by giving you a "local initializer" that initializes some data that is used per task.  You are then supposed to accumulate your results per item into that initialized value.  The last delegate gets passed that local value (again, once per task), letting you do your final work.

    However, since the initializer is per task, not per item in the loop, you're effectively "throwing away" most of your results when using it the way you were before.  You can easily use that item, but you have to store multiple results into it, not just a single result (which is what I did in my earlier post using List<T> for the local state).
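
    A throwaway sketch of that effect (plain integers, not the stock code): with a single-item local value, only the last item each task processed reaches the final delegate, while a List<T> local value delivers everything:

        using System;
        using System.Collections.Concurrent;
        using System.Collections.Generic;
        using System.Threading.Tasks;

        class ThrowingAwayResults
        {
            static void Main()
            {
                var lastOnly = new ConcurrentBag<int>();

                // single-item local state: only the LAST item each task processed
                // ever reaches the final delegate
                Parallel.For<int>(0, 100,
                    () => -1,
                    (i, state, last) => i,          // overwrite instead of accumulate
                    last => lastOnly.Add(last));

                Console.WriteLine("single-item state delivered {0} items", lastOnly.Count);

                var everything = new ConcurrentBag<int>();

                // List<T> local state: every item reaches the final delegate
                Parallel.For<List<int>>(0, 100,
                    () => new List<int>(),
                    (i, state, list) => { list.Add(i); return list; },
                    list => { foreach (var i in list) everything.Add(i); });

                Console.WriteLine("list state delivered {0} items", everything.Count); // 100
            }
        }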


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".

    Tuesday, March 13, 2012 20:21
  • Question, from your web site (the article you referenced above): the localFinally argument, does this not get called after each iteration?

    From the MS description, the body delegate is invoked once per iteration,

    and the localFinally delegate performs a final action on the local state of each thread - which I read as: the localFinally delegate will be called after each iteration(?), and I'm passing into this localFinally delegate the returnValue item from the body delegate.


    If the localFinally delegate is called after each iteration, then in your example above (returning a list), the list will be processed (sent to the GUI) after each iteration...
    Tuesday, March 13, 2012 21:24
  • OK, after further reading: other than the fact that I'm misusing this overload, from the behavior I see it looks as if the final delegate is being called after each body delegate iteration.

    Thinking out loud here: each iteration runs its instance of the body and final delegates in a thread. So in my case, the body delegate builds a result list. This result list/returnValue is a feedback variable into the body itself. OK, and when all threads (each body iteration) are completed, the final delegate is called? Only once? This is the part I don't understand based on the description and the behavior. Please explain what I missed.

    ----- oh, geeze... rereading your comments from way above
    >> The return value for each item in listSI is intended to be aggregated, not just used. In your case, you'll only get the last StockItem processed per task, so if your loop uses 6 tasks to complete, you'll only get 6 items, etc.

    What I do not understand is: each iteration of the body delegate is pulled from the ThreadPool. So each iteration is a thread in its own right. Now, not all threads are created at the same time. A bunch are instantiated and executed; after each body delegate thread is finished, it calls the final delegate. This does not coincide with the behavior. I may have X number of iterations, but the final delegate is called only Y times.

    Tuesday, March 13, 2012 22:14

  • What I do not understand is: each iteration of the body delegate is pulled from the ThreadPool. So each iteration is a thread in its own right. Now, not all threads are created at the same time. A bunch are instantiated and executed; after each body delegate thread is finished, it calls the final delegate. This does not coincide with the behavior. I may have X number of iterations, but the final delegate is called only Y times.

    That's not quite true - 

    Each iteration of the body delegate is assigned a Task, but done in "groups" (this is the Partitioning in the TPL).  This task is reused - so, if you have 100 elements, and 4 cores in your system, you'll likely end up using 16 or so Tasks.  This will distribute the workload across the cores in your system well.

    What happens is, for each task (not each item in the body), your local initialization function will be called to initialize some local state that's used for that task.  This gets passed to each item in the body (third argument to the delegate), which you can use.  Then, for each task, that local state variable will get passed to the final delegate, where you can process it.  Again, the 1st and 3rd delegates are called once per task (not once per work item), where the 2nd is called once per work item.

    If you have 100 items in your collection, and 4 cores, you're likely to see the first and third delegate get called around 8-16 times, and the 2nd delegate will get called 100 times.
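
    If you want to see those counts for yourself, a throwaway sketch like this will print them (the exact numbers vary by machine and by run):

        using System;
        using System.Threading;
        using System.Threading.Tasks;

        class DelegateCallCounts
        {
            static void Main()
            {
                int initCalls = 0, bodyCalls = 0, finallyCalls = 0;

                Parallel.For<int>(0, 100,
                    () => { Interlocked.Increment(ref initCalls); return 0; },
                    (i, state, local) => { Interlocked.Increment(ref bodyCalls); return local; },
                    local => Interlocked.Increment(ref finallyCalls));

                // body runs once per work item; init/finally run once per task the loop used
                Console.WriteLine("init: {0}  body: {1}  finally: {2}",
                    initCalls, bodyCalls, finallyCalls);
            }
        }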

    It might make a bit more sense if you read up on making custom partitioners: http://msdn.microsoft.com/en-us/library/dd997411.aspx  This is effectively what is happening underneath - the Parallel.For method partitions your work up into "chunks" that get processed by a specific task.

     


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".

    • Marked as answer by Scruffy John, Wednesday, March 14, 2012 14:52
    Tuesday, March 13, 2012 23:21
  • Ah, I kept reading that Parallel.For and Parallel.ForEach split things into threads, not tasks. So what's the difference between a TaskFactory and a Parallel.For if both split into tasks?  Hmm, Parallel.For is managing the group of tasks, where TaskFactory allows the developer more control over how tasks are created, started, and what follow-on work runs... ok..

    Guess I need to find more info on the relationship between the ThreadPool, Tasks, and threads.

    Thanks Reed

    Wednesday, March 14, 2012 03:47
  • After further reading (including some old forum discussions) and listening to one of the Channel 9 videos, I have a little better understanding of what is going on under the covers. What I'm having trouble wrapping my head around is this reusing of task instances. It certainly would explain the behavior I see.

    Each iteration is a task. The 1st delegate (initial delegate) is probably processed once and held in memory. The result of the initial delegate is passed into each task instance. Now a range of these tasks (the partitioning effect) is given to a thread to be processed. The task scheduler comes into play somewhere: after the tasks are created, they are added to the task scheduler.

    Now if I follow you, after a task has been completed, the task instance is not thrown away, but is just "reused". Thus it indeed had a startup phase (getting the initial delegate results); the body delegate is the same except the local variables are reinitialized... Then at some point, the system deems it no longer needs the task instance, and prior to the task's destruction, the final delegate is called, passing into it the local state variable.

    What that means to me for this exercise is that the list being built in the body and passed to the final delegate is not the complete list the loop was based on, but a subset of that list, depending on how many items were actually processed within the task. So in theory the final delegate would have a chance to combine the results of all tasks into a complete whole, or, in the above case, I could just use the subset list to update the UI.

    Getting close?


    OK, after trying it out, some of it pans out. In the final delegate, in most cases, the list being passed into it has a length of two items (the subset described above). So I could have combined them into a final list if desired. But if each iteration is a task in its own right, I'm trying to figure out how the local variable ("returnValue" above, a list) gets passed to multiple tasks to be appended to... it almost seems a task is not one iteration, but could be one or more iterations (depending on the partitioning). So the partitioning algorithm deems the sequence of instructions short enough that a task could do, say, two iterations out of the 100, so I could end up with 50 tasks queued up. Yes?
    Thursday, March 15, 2012 14:15
  • A better mental model (at least IMO) is to think of it more like this -

    When you call Parallel.For, the system looks at your collection and how many cores your system has, and partitions the work items into chunks - kind of like collections of work items.  A Task gets assigned a chunk of work items to process.  At that point, the local init function sets up the data for that Task, which then gets used to process each item in its chunk, and then the finalization method is called for that task.
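
    The range partitioner makes that chunking visible directly. A small sketch (the chunk sizes Parallel.For picks internally will differ):

        using System;
        using System.Collections.Concurrent;
        using System.Threading;
        using System.Threading.Tasks;

        class ChunkingSketch
        {
            static void Main()
            {
                // split the index range 0..99 into chunks; each chunk is handed
                // to one worker as a unit, much as Parallel.For does internally
                var ranges = Partitioner.Create(0, 100);

                Parallel.ForEach(ranges, range =>
                {
                    Console.WriteLine("thread {0} got items {1}..{2}",
                        Thread.CurrentThread.ManagedThreadId,
                        range.Item1, range.Item2 - 1);
                });
            }
        }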

    Note that there is no mention of threads here - in fact, depending on the TaskScheduler being used (which can be set explicitly via ParallelOptions: http://msdn.microsoft.com/en-us/library/system.threading.tasks.paralleloptions.aspx ) this could actually never use threads at all  - the entire Parallel.For loop could run on one thread with the right TaskScheduler, or use the ThreadPool (which is what the default one does), or use some other completely different mechanism to schedule the work.
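
    As a rough sketch of that hook (not tied to the stock code), limiting the loop to one degree of parallelism makes it run effectively serially; a custom TaskScheduler could be plugged in the same way:

        using System;
        using System.Threading.Tasks;

        class ParallelOptionsSketch
        {
            static void Main()
            {
                var options = new ParallelOptions
                {
                    MaxDegreeOfParallelism = 1   // effectively serial
                    // TaskScheduler = ...       // a custom scheduler could be plugged in here
                };

                Parallel.For(0, 10, options, i =>
                {
                    Console.WriteLine("processing item {0}", i);
                });
            }
        }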

    I think the confusion is in the definition of "Task" - here, when I say "Task", I mean an actual System.Threading.Tasks.Task, not a work item (one item processed in the delegate).


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".

    Thursday, March 15, 2012 15:28
  • >> A Task gets assigned a chunk of work items to process.

    task = a chunk = initial delegate work + one or more iterations + the final delegate

    that I can work with (mentally)

    ParallelOptions: yep, already played around with that. Kind of slick - I can actually make everything execute serially or limit the parallelism.

    This exercise may not be a good use for this overload, but it got me to see better what's going on. The reason it's not a good use for this overload is the locking... it certainly slows things down.

    Thanks for your time Reed

    Thursday, March 15, 2012 16:03