Answered Dataflow & recursion

  • April 12, 2012, 5:44 AM
     

    Hi,

    Let's say I have millions of records to process, and each record needs to pass through two TransformBlocks.

    I can't process all records at once because it would kill the system.

    So I have to do it in chunks (in batches).

    Here are the phases I see:

    • Read some data
    • Post all records
    • Wait for completion
    • Restart again

    It is the restart that bothers me, because after calling the Complete method the TransformBlock doesn't accept any more messages.

    How can I avoid this behaviour?

    Here is my code:

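        using System;
        using System.Collections.Generic;
        using System.Linq;
        using System.Threading.Tasks;
        using System.Threading.Tasks.Dataflow;

        class Program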
        {
            private static TransformBlock<int, string> m_trb1 = new TransformBlock<int, string>(i =>
                {
                    var msg = i.ToString() + " trb1";
                    Console.WriteLine(msg);
                    return msg;
                });
    
            private static TransformBlock<string, string> m_trb2 = new TransformBlock<string, string>(s =>
            {
                var msg = s + " trb2";
                Console.WriteLine(msg);
                return msg;
            });
    
            static void Main(string[] args)
            {
                m_trb1.LinkTo(m_trb2);
    
                Task.Factory.StartNew(() => Producer());
    
                Console.Read();
            }
    
            private static void Producer()
            {
                long y = 0;
    
                while (y < 1000000)
                {
                    foreach (var i in GetData())
                    {
                        m_trb1.Post(i);
                    }
    
                    m_trb1.Complete();
                    m_trb1.Completion.Wait();
    
                    y++;
                }
            }
    
            private static IEnumerable<int> GetData()
            {
                return Enumerable.Range(0, 10);
            }
        }
    

    Thanks

    philippe

All replies

  • April 12, 2012, 1:22 PM
     
     Proposed answer

    I don't think “restarting” the processing is the right solution. Explicitly limiting the amount of data in the network and making Producer() asynchronous is.

    To limit the number of items that can be inside your first TransformBlock, you can do:

    new TransformBlock<int, string>(…, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

    And to make Producer() asynchronous:

    private static async Task Producer()
    {
        long y = 0;
    
        while (y < 1000000)
        {
            foreach (var i in GetData())
                await m_trb1.SendAsync(i);
    
            y++;
        }
    
        // handle completion here
    }

    This way, if the first TransformBlock can't accept more data for now, Producer() will asynchronously wait until it can.
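
    The "handle completion here" step could look roughly like the sketch below. It assumes completion is signalled once, after the last batch, and flows down the chain through PropagateCompletion. The names BoundedPipeline, RunAsync and batches are made up for illustration, and the NullTarget sink is my addition so that the bounded second block can drain its output.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper class; not part of the original code.
public static class BoundedPipeline
{
    // Sends `batches` batches of ten integers through two bounded blocks
    // and returns how many items the second block processed.
    public static async Task<int> RunAsync(int batches)
    {
        var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 10 };
        int processed = 0;

        var trb1 = new TransformBlock<int, string>(i => i + " trb1", bounded);
        var trb2 = new TransformBlock<string, string>(s =>
        {
            // Safe without locking: a block handles one message at a time by default.
            processed++;
            return s + " trb2";
        }, bounded);

        // Completing trb1 also completes trb2 once trb1 has drained.
        trb1.LinkTo(trb2, new DataflowLinkOptions { PropagateCompletion = true });
        // Discard trb2's output so its bounded buffer never fills up and stalls the chain.
        trb2.LinkTo(DataflowBlock.NullTarget<string>());

        for (int y = 0; y < batches; y++)
        {
            foreach (var i in Enumerable.Range(0, 10))
                await trb1.SendAsync(i); // waits asynchronously while trb1 is full
        }

        // Signal completion exactly once, after the last batch, then wait for the tail.
        trb1.Complete();
        await trb2.Completion;
        return processed;
    }
}
```

    Calling BoundedPipeline.RunAsync(3) pushes 30 items through both blocks; Complete() is never called between batches, so no restart is needed.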

  • April 13, 2012, 10:38 AM
     
     

    Thanks.

    BTW: when I run this, memory usage keeps rising.

    How can I avoid that?


    philippe

  • April 13, 2012, 3:01 PM
     
     Proposed answer

    That's because your second TransformBlock is producing items, but nobody is consuming them, so they pile up in the output queue of the TransformBlock. To fix that, either link it to some block that will consume the items (like an ActionBlock), or change it to an ActionBlock, so that it doesn't produce anything.
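
    A minimal sketch of the first option, linking the block to an ActionBlock that consumes every result. ConsumedPipeline, RunAsync and sink are hypothetical names, and the counter only stands in for whatever real work the consumer would do.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper class; not part of the original code.
public static class ConsumedPipeline
{
    // Returns the number of results the terminal ActionBlock consumed.
    public static async Task<int> RunAsync(IEnumerable<int> input)
    {
        int consumed = 0;

        var trb2 = new TransformBlock<int, string>(i => i + " trb2");
        // The ActionBlock takes each result out of trb2's output queue,
        // so nothing is left behind to accumulate in memory.
        var sink = new ActionBlock<string>(s => { consumed++; });

        trb2.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var i in input)
            await trb2.SendAsync(i);

        trb2.Complete();
        await sink.Completion;
        return consumed;
    }
}
```

    If the results are not needed at all, linking to DataflowBlock.NullTarget<string>() instead of an ActionBlock discards them in the same way.
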
  • April 13, 2012, 7:14 PM
     
     

    Thanks.

    There should be a way to monitor those queues (ETW, perf counter, ...).


    philippe

  • April 16, 2012, 4:27 PM
    Owner
     
     Answered

    Hi Philippe,

    With the DLL included in .NET 4.5 Beta, there are ETW events available from System.Threading.Tasks.Dataflow.dll. The event fired when a dataflow block spins up a task to process messages includes how many messages were in the queue.
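
    Separately from ETW, the blocks expose their current queue lengths in-process through the InputCount and OutputCount properties, which may be enough for a quick check. The sketch below is only an illustration: QueueMonitor, Snapshot and Demo are hypothetical names, and the demo deliberately leaves a TransformBlock unconsumed to show its output queue filling up.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class; not part of the library.
public static class QueueMonitor
{
    // Formats a point-in-time snapshot of a TransformBlock's queue lengths.
    public static string Snapshot<TIn, TOut>(string name, TransformBlock<TIn, TOut> block)
    {
        return name + ": input=" + block.InputCount + ", output=" + block.OutputCount;
    }

    // Posts five items to a block with no consumer and reports its queues.
    public static string Demo()
    {
        var unconsumed = new TransformBlock<int, string>(i => i + " trb2");

        for (int i = 0; i < 5; i++)
            unconsumed.Post(i);

        // Wait (up to five seconds) until all five results sit in the output queue.
        SpinWait.SpinUntil(() => unconsumed.OutputCount == 5, TimeSpan.FromSeconds(5));

        return Snapshot("trb2", unconsumed);
    }
}
```

    Both counts are snapshots, so in a running pipeline they can change between the read and the moment they are logged.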