Dataflow & recursion

    Question

  • Hi,

    Let's say I have millions of records to process, and each record needs to go through two TransformBlocks.

    I can't process all records at once because it would kill the system.

    So I have to do it in chunks (in batches).

    Here are the different phases I see:

    • Read some data
    • Post all records
    • Wait for completion
    • Restart again

    It is the restart that bothers me, because after calling the Complete method the TransformBlock doesn't accept any more messages.

    How can I avoid this behaviour?

    Here is my code:

        using System;
        using System.Collections.Generic;
        using System.Linq;
        using System.Threading.Tasks;
        using System.Threading.Tasks.Dataflow;

        class Program
        {
            // First stage: tag each record.
            private static TransformBlock<int, string> m_trb1 = new TransformBlock<int, string>(i =>
            {
                var msg = i.ToString() + " trb1";
                Console.WriteLine(msg);
                return msg;
            });

            // Second stage: tag the result again.
            private static TransformBlock<string, string> m_trb2 = new TransformBlock<string, string>(s =>
            {
                var msg = s + " trb2";
                Console.WriteLine(msg);
                return msg;
            });

            static void Main(string[] args)
            {
                m_trb1.LinkTo(m_trb2);

                Task.Factory.StartNew(() => Producer());

                Console.Read();
            }

            private static void Producer()
            {
                long y = 0;

                while (y < 1000000)
                {
                    foreach (var i in GetData())
                    {
                        m_trb1.Post(i);
                    }

                    // Complete is called at the end of every batch, so from the
                    // second iteration on the Posts above are rejected.
                    m_trb1.Complete();
                    m_trb1.Completion.Wait();

                    y++;
                }
            }

            private static IEnumerable<int> GetData()
            {
                // Stand-in for reading one batch of records.
                return Enumerable.Range(0, 10);
            }
        }
    

    Thanks

    philippe

    Thursday, April 12, 2012 05:44

Answers

All replies

  • I think “restarting” the processing is not the right solution; explicitly limiting the amount of data in the network and making Producer() asynchronous is.

    To limit the number of items that can be buffered inside your first TransformBlock, you can do:

    new TransformBlock<int, string>(…, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

    And to make Producer() asynchronous:

    private static async Task Producer()
    {
        long y = 0;
    
        while (y < 1000000)
        {
            foreach (var i in GetData())
                await m_trb1.SendAsync(i);
    
            y++;
        }
    
        // handle completion here
    }

    This way, if the first TransformBlock can't accept more data for now, Producer() will asynchronously wait until it can.
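
    For completeness, here is a rough sketch of how the "handle completion here" part is often written. It reuses m_trb1, m_trb2 and GetData() from the original code and assumes the blocks are linked with completion propagation, e.g. m_trb1.LinkTo(m_trb2, new DataflowLinkOptions { PropagateCompletion = true }); it is not necessarily what was intended above:

    private static async Task Producer()
    {
        for (long y = 0; y < 1000000; y++)
        {
            foreach (var i in GetData())
                await m_trb1.SendAsync(i);   // waits asynchronously while the bounded block is full
        }

        // Completion is signalled once, after all batches, instead of once per batch.
        m_trb1.Complete();
        await m_trb2.Completion;             // finishes once the whole pipeline has drained
    }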

    Thursday, April 12, 2012 13:22
  • Thanks.

    BTW: When I run this, the memory usage keeps rising.

    How can I avoid that?


    philippe

    Friday, April 13, 2012 10:38
  • That's because your second TransformBlock is producing items, but nobody is consuming them, so they are piling up in the output queue of that TransformBlock. To fix that, either link it to some block that will consume the items (like an ActionBlock), or change it to an ActionBlock, so that it doesn't produce anything.
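
    For example, a small sketch along those lines, reusing the block names from the original code; the m_consumer field and its delegate body are only illustrative placeholders:

    // Terminal consumer: an ActionBlock produces no output, so nothing piles up after it.
    private static ActionBlock<string> m_consumer = new ActionBlock<string>(s =>
    {
        // placeholder for whatever should happen to each processed record
        Console.WriteLine(s + " consumed");
    });

    static void Main(string[] args)
    {
        m_trb1.LinkTo(m_trb2);
        m_trb2.LinkTo(m_consumer);   // drains m_trb2's output queue

        Task.Factory.StartNew(() => Producer());

        Console.Read();
    }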
    Friday, April 13, 2012 15:01
  • Thanks.

    There should be a way to monitor those queues (ETW, perf counters, ...).


    philippe

    Friday, April 13, 2012 19:14
  • Hi Philippe,

    With the DLL included in .NET 4.5 Beta, there are ETW events available from System.Threading.Tasks.Dataflow.dll.  The events fired when a dataflow block spins up a task to process messages will include how many messages were in the queue.
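
    If you want to observe those events in-process rather than with an external ETW tool, a minimal sketch using an EventListener (available in .NET 4.5) could look like the following. The exact event source name and payload layout are assumptions here, so check eventSource.Name and the payload against what the library actually emits:

    using System;
    using System.Diagnostics.Tracing;

    // Minimal in-process listener for the dataflow ETW events.
    class DataflowEventListener : EventListener
    {
        protected override void OnEventSourceCreated(EventSource eventSource)
        {
            // Assumption: the TPL Dataflow event source name contains "Dataflow".
            if (eventSource.Name.Contains("Dataflow"))
                EnableEvents(eventSource, EventLevel.Verbose);
        }

        protected override void OnEventWritten(EventWrittenEventArgs eventData)
        {
            // Dump each event's id and payload (the task-launch events carry the queued-message count).
            var payload = eventData.Payload == null
                ? ""
                : string.Join(", ", eventData.Payload);
            Console.WriteLine("{0} event {1}: {2}",
                eventData.EventSource.Name, eventData.EventId, payload);
        }
    }

    Creating an instance of this listener early in Main and keeping it alive for the lifetime of the process would print each dataflow event with its payload, including the message counts mentioned above.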

    Monday, April 16, 2012 16:27
    Owner