Answered Memory Leak using TPL Dataflow

  • Wednesday, February 06, 2013 8:25 PM
     
      Has Code
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    
    namespace TaskTesting
    {
        public static class Program
        {
            public static void Main(string[] args)
            {
                Test().Wait();
            }
    
            private static async Task Test()
            {
                using (var ct = new CancellationTokenSource())
                {
                    for (;;)
                    {
                        const int tasks = 4;
                        var b = new BufferBlock<int>[tasks];
                        var t = new Task[tasks];
                        for (var i = 0; i < tasks; ++i)
                        {
                            var index = i;
                            b[index] =
                                new BufferBlock<int>(new DataflowBlockOptions
                                    {
                                        CancellationToken = ct.Token,
                                        BoundedCapacity = 16*1024
                                    });
                            t[index] = Task.Factory.StartNew(() => TestInt(b[index], ct.Token), ct.Token);
                        }
                        for (var i = 0;i<1024*16; ++i)
                        {
                            await b[i%tasks].SendAsync(i);
                        }
                        for (var i = 0; i < tasks; ++i)
                        {
                            await b[i].SendAsync(-1);
                            b[i].Complete();
                            await t[i];
                            await b[i].Completion;
                        }
                    }
                }
            }
    
            private static async Task TestInt(BufferBlock<int> b, CancellationToken token)
            {
                while (!token.IsCancellationRequested)
                {
                    var n = await b.ReceiveAsync(token);
                    if (token.IsCancellationRequested) return;
                    if (n < 0) return;
                    var i = 0;
                    for (var j = 0; j < n && !token.IsCancellationRequested; ++j) i += j;
                    Console.Write(i + " ");
                }
            }
        }
    }



    Appears to leak. (I can't attach the screen shot of ANTs because it won't let me include images.)


    It looks like the BufferBlock is registering for the cancellation token callback but then not unregistering for the callback when it's complete.

    I'm working around it by using an inner cancellation token source chained to the outer one:

    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    
    namespace TaskTesting
    {
        public static class Program
        {
            public static void Main(string[] args)
            {
                Test().Wait();
            }
    
            private static async Task Test()
            {
                using (var ct = new CancellationTokenSource())
                {
                    for (;;)
                    {
                        using (var cti = new CancellationTokenSource())
                        {
                            const int tasks = 4;
                            var cancellationToken = cti.Token;
                            using (ct.Token.Register(() => cti.Cancel()))
                            {
                                var b = new BufferBlock<int>[tasks];
                                var t = new Task[tasks];
                                for (var i = 0; i < tasks; ++i)
                                {
                                    var index = i;
                                    b[index] =
                                        new BufferBlock<int>(new DataflowBlockOptions
                                            {
                                                CancellationToken = cancellationToken,
                                                BoundedCapacity = 16
                                            });
                                    t[index] = Task.Factory.StartNew(() => TestInt(b[index], cancellationToken),
                                                                     cancellationToken);
                                }
                                for (var i = 0; i < 16; ++i)
                                {
                                    await b[i%tasks].SendAsync(i, cancellationToken);
                                }
                                for (var i = 0; i < tasks; ++i)
                                {
                                    await b[i].SendAsync(-1);
                                    await t[i];
                                    t[i].Dispose();
                                }
                            }
                        }
                        Console.Write("*");
                    }
                }
            }
    
            private static async Task TestInt(BufferBlock<int> b, CancellationToken token)
            {
                while (!token.IsCancellationRequested)
                {
                    var n = await b.ReceiveAsync(token);
                    if (token.IsCancellationRequested) return;
                    if (n < 0) return;
                    var i = 0;
                    for (var j = 0; j < n && !token.IsCancellationRequested; ++j) i += j;
                    Debug.Write(i + " ");
                }
            }
        }
    }

    Is there a better way? (Code above is test code based on the real code.)


All Replies