Memory Leak using TPL Dataflow
-
Wednesday, February 06, 2013 8:25 PM
using System; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; namespace TaskTesting { public static class Program { public static void Main(string[] args) { Test().Wait(); } private static async Task Test() { using (var ct = new CancellationTokenSource()) { for (;;) { const int tasks = 4; var b = new BufferBlock<int>[tasks]; var t = new Task[tasks]; for (var i = 0; i < tasks; ++i) { var index = i; b[index] = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = ct.Token, BoundedCapacity = 16*1024 }); t[index] = Task.Factory.StartNew(() => TestInt(b[index], ct.Token), ct.Token); } for (var i = 0;i<1024*16; ++i) { await b[i%tasks].SendAsync(i); } for (var i = 0; i < tasks; ++i) { await b[i].SendAsync(-1); b[i].Complete(); await t[i]; await b[i].Completion; } } } } private static async Task TestInt(BufferBlock<int> b, CancellationToken token) { while (!token.IsCancellationRequested) { var n = await b.ReceiveAsync(token); if (token.IsCancellationRequested) return; if (n < 0) return; var i = 0; for (var j = 0; j < n && !token.IsCancellationRequested; ++j) i += j; Console.Write(i + " "); } } } }
Appears to leak. (I can't attach the screen shot of ANTs because it won't let me include images.)
It looks like the BufferBlock is registering for the cancellation token callback but then not unregistering for the callback when it's complete.
I'm working around it by using an inner cancellation token source chained to the outer one:
using System; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; namespace TaskTesting { public static class Program { public static void Main(string[] args) { Test().Wait(); } private static async Task Test() { using (var ct = new CancellationTokenSource()) { for (;;) { using (var cti = new CancellationTokenSource()) { const int tasks = 4; var cancellationToken = cti.Token; using (ct.Token.Register(() => cti.Cancel())) { var b = new BufferBlock<int>[tasks]; var t = new Task[tasks]; for (var i = 0; i < tasks; ++i) { var index = i; b[index] = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = cancellationToken, BoundedCapacity = 16 }); t[index] = Task.Factory.StartNew(() => TestInt(b[index], cancellationToken), cancellationToken); } for (var i = 0; i < 16; ++i) { await b[i%tasks].SendAsync(i, cancellationToken); } for (var i = 0; i < tasks; ++i) { await b[i].SendAsync(-1); await t[i]; t[i].Dispose(); } } } Console.Write("*"); } } } private static async Task TestInt(BufferBlock<int> b, CancellationToken token) { while (!token.IsCancellationRequested) { var n = await b.ReceiveAsync(token); if (token.IsCancellationRequested) return; if (n < 0) return; var i = 0; for (var j = 0; j < n && !token.IsCancellationRequested; ++j) i += j; Debug.Write(i + " "); } } } }Is there a better way? (Code above is test code based on the real code.)
All Replies
-
Friday, February 15, 2013 3:49 PM
Hi Cliff,
I've been running your first piece of code for an hour and my memory usage stays between 22 and 24mb ram.
What version of Dataflow are you using? I was running 4.5.9, maybe it's been fixed already.
- Proposed As Answer by Stephen Toub - MSFTMicrosoft Employee, Owner Friday, February 22, 2013 5:15 PM
- Marked As Answer by Stephen Toub - MSFTMicrosoft Employee, Owner Thursday, February 28, 2013 3:12 AM

