Dataflow & recursion
-
April 12, 2012 5:44
Hi,
Let's say I have millions of records to process, and each record needs to pass through two TransformBlocks.
I can't process all the records at once because it would kill the system.
So I have to do it in chunks (in batches).
Here are the phases I see:
- Read some data
- Post all records
- Wait for completion
- Restart again
It is the restart that bothers me, because after the Complete method is called the TransformBlock doesn't accept any more messages.
How can I avoid this behaviour?
Here is my code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static TransformBlock<int, string> m_trb1 = new TransformBlock<int, string>(i =>
    {
        var msg = i.ToString() + " trb1";
        Console.WriteLine(msg);
        return msg;
    });

    private static TransformBlock<string, string> m_trb2 = new TransformBlock<string, string>(s =>
    {
        var msg = s + " trb2";
        Console.WriteLine(msg);
        return msg;
    });

    static void Main(string[] args)
    {
        m_trb1.LinkTo(m_trb2);
        Task.Factory.StartNew(() => Producer());
        Console.Read();
    }

    private static void Producer()
    {
        long y = 0;
        while (y < 1000000)
        {
            foreach (var i in GetData())
            {
                m_trb1.Post(i);
            }
            // After this call the block declines every further message.
            m_trb1.Complete();
            m_trb1.Completion.Wait();
            y++;
        }
    }

    private static IEnumerable<int> GetData()
    {
        return Enumerable.Range(0, 10);
    }
}
Thanks
philippe
All replies
-
April 12, 2012 13:22
I don't think “restarting” the processing is the right solution. Explicitly limiting the amount of data in the network and making Producer() asynchronous is.
To limit the number of items that can be inside your first TransformBlock, you can do:
new TransformBlock<int, string>(…, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
And to make Producer() asynchronous:
private static async Task Producer()
{
    long y = 0;
    while (y < 1000000)
    {
        foreach (var i in GetData())
            await m_trb1.SendAsync(i);
        y++;
    }
    // handle completion here
}
This way, if the first TransformBlock can't accept more data for now, Producer() will asynchronously wait until it can.
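Putting the two pieces together, a minimal end-to-end sketch could look like the following. It is not the original poster's program: the class and method names (BoundedPipelineSketch, RunAsync), the ActionBlock at the end, and the use of DataflowLinkOptions with PropagateCompletion (available in the released System.Threading.Tasks.Dataflow library) are assumptions made for illustration. Complete is called once, after the last batch, rather than once per batch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedPipelineSketch
{
    // Sends `batches` batches of 10 integers through two bounded blocks and
    // returns how many items reached the end of the pipeline.
    public static async Task<int> RunAsync(int batches)
    {
        // Each block buffers at most 10 items, so the producer is throttled.
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 10 };

        var trb1 = new TransformBlock<int, string>(i => i + " trb1", options);
        var trb2 = new TransformBlock<string, string>(s => s + " trb2", options);

        // Hypothetical terminal block: it only counts the results. With the
        // default degree of parallelism it handles one message at a time.
        int processed = 0;
        var sink = new ActionBlock<string>(s => { processed++; }, options);

        // Let completion flow from trb1 to trb2 to sink.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        trb1.LinkTo(trb2, link);
        trb2.LinkTo(sink, link);

        for (int y = 0; y < batches; y++)
        {
            foreach (var i in GetData())
            {
                // Waits asynchronously while trb1 is full.
                await trb1.SendAsync(i);
            }
        }

        // Signal completion once, after the last batch, then wait for the
        // whole pipeline to drain.
        trb1.Complete();
        await sink.Completion;
        return processed;
    }

    private static IEnumerable<int> GetData()
    {
        return Enumerable.Range(0, 10);
    }

    public static void Main()
    {
        // 1000 batches of 10 items each.
        Console.WriteLine(RunAsync(1000).GetAwaiter().GetResult());
    }
}
```

Because every block is bounded, at most a few dozen items are in flight at any moment, no matter how many batches the producer loops over.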
- Edited by svick April 12, 2012 13:23
- Edited by svick April 12, 2012 13:24
- Proposed as answer by Stephen Toub - MSFT (Microsoft Employee, Owner) April 16, 2012 16:27
-
April 13, 2012 10:38
Thanks.
BTW: when I run this, memory usage keeps rising.
How can I avoid that?
philippe
-
April 13, 2012 15:01
That's because your second TransformBlock is producing items, but nobody is consuming them, so they are piling up in the output queue of the TransformBlock. To fix that, either link it to some block that will consume the items (like an ActionBlock), or change it to an ActionBlock, so that it doesn't produce anything.
- Proposed as answer by Stephen Toub - MSFT (Microsoft Employee, Owner) April 16, 2012 16:28
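The two fixes for the growing output queue can be sketched as follows. This is an illustration under assumptions, not code from the thread: the class name ConsumeOutputSketch, the method names, and the counters are hypothetical, and Post is used without bounding because the item counts are small.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ConsumeOutputSketch
{
    // Option 1: keep the second TransformBlock and link it to an ActionBlock
    // that takes every result out of its output queue.
    public static async Task<int> LinkToConsumerAsync(int count)
    {
        var trb2 = new TransformBlock<string, string>(s => s + " trb2");

        int consumed = 0;
        var consumer = new ActionBlock<string>(s => { consumed++; });
        trb2.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            trb2.Post(i.ToString());
        }

        trb2.Complete();
        await consumer.Completion; // finishes once every result was consumed
        return consumed;
    }

    // Option 2: make the last stage an ActionBlock, which has no output
    // queue, so nothing can accumulate behind it.
    public static async Task<int> ActionBlockAsLastStageAsync(int count)
    {
        int handled = 0;
        var last = new ActionBlock<string>(s =>
        {
            Console.WriteLine(s + " trb2");
            handled++;
        });

        for (int i = 0; i < count; i++)
        {
            last.Post(i.ToString());
        }

        last.Complete();
        await last.Completion;
        return handled;
    }

    public static void Main()
    {
        Console.WriteLine(LinkToConsumerAsync(10).GetAwaiter().GetResult());
        Console.WriteLine(ActionBlockAsLastStageAsync(10).GetAwaiter().GetResult());
    }
}
```

In both variants nothing is left waiting in an output queue, so the Completion task of the last block can finish and memory use stays flat.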
-
April 13, 2012 19:14
Thanks.
There should be a way to monitor those queues (ETW, perf counter, ...).
philippe
-
April 16, 2012 16:27 Owner
Hi Phillippe-
With the DLL included in .NET 4.5 Beta, there are ETW events available from System.Threading.Tasks.Dataflow.dll. The events fired when a dataflow block spins up a task to process messages will include how many messages were in the queue.
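Separately from the ETW events, and as a different technique, a block's queues can also be sampled in-process through the InputCount and OutputCount properties that TransformBlock exposes. The sketch below only illustrates that polling approach; the class name QueueMonitorSketch, the method name and the 10 ms polling interval are assumptions. The block deliberately has no consumer, so its output queue fills up, as in the memory question earlier in the thread.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class QueueMonitorSketch
{
    // Posts `count` items to a TransformBlock that has no consumer, prints
    // both queue lengths while it works, and returns the final
    // (InputCount, OutputCount) pair.
    public static async Task<Tuple<int, int>> SampleAsync(int count)
    {
        var block = new TransformBlock<int, string>(i => i.ToString());

        for (int i = 0; i < count; i++)
        {
            block.Post(i);
        }

        // Poll until every item has moved to the output queue.
        while (block.OutputCount < count)
        {
            Console.WriteLine("input: {0}, output: {1}", block.InputCount, block.OutputCount);
            await Task.Delay(10);
        }

        return Tuple.Create(block.InputCount, block.OutputCount);
    }

    public static void Main()
    {
        var counts = SampleAsync(100000).GetAwaiter().GetResult();
        Console.WriteLine("final input: {0}, final output: {1}", counts.Item1, counts.Item2);
    }
}
```

A steadily growing OutputCount on the last block of a network is the symptom described earlier in the thread: results are produced but never consumed.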
- Proposed as answer by Stephen Toub - MSFT (Microsoft Employee, Owner) April 16, 2012 16:27
- Marked as answer by Stephen Toub - MSFT (Microsoft Employee, Owner) April 27, 2012 10:28