Why no TaskCompletionSource<classic>?
- I'm impressed with TaskCompletionSource - a very clever pattern!
I've implemented a producer/consumer queue using this class and all works well - apart from one minor problem. Because there's no nongeneric version of TaskCompletionSource, I need to use duck typing which feels a bit grubby:
public class PCQueue : IDisposable
{
class WorkItem { public dynamic TaskSource; public Delegate Func; }
Task [] workers;
BlockingCollection <WorkItem > taskQ = new BlockingCollection <WorkItem > ();
public PCQueue (int workerCount)
{
workers = new Task [workerCount];
for (int i = 0; i < workerCount; i++)
workers [i] = Task .Factory.StartNew (Consume);
}
public void Dispose ()
{
// Enqueue one null task per worker to make each exit.
foreach (Task worker in workers) EnqueueTask (null );
}
public Task EnqueueTask (Action action)
{
var tcs = new TaskCompletionSource <object >();
taskQ.Add (new WorkItem { TaskSource = tcs, Func = action } );
return tcs.Task;
}
public Task <TResult> EnqueueTask<TResult> (Func <TResult> func)
{
var tcs = new TaskCompletionSource <TResult>();
taskQ.Add (new WorkItem { TaskSource = tcs, Func = func } );
return tcs.Task;
}
void Consume ()
{
while (true )
{
WorkItem workItem = taskQ.Take ();
if (workItem == null ) return ;
try
{
if (workItem.TaskSource.Task.IsCancellationRequested)
workItem.TaskSource.Task.AcknowledgeCancellation ();
else
workItem.TaskSource.SetResult (((dynamic )workItem.Func).Invoke ());
}
catch (Exception ex)
{
workItem.TaskSource.SetException (ex);
}
}
}
}
(It goes without saying that the producer/consumer queue needs to hold hetrogeneously-typed tasks.)
I can live with the cast to dynamic in Consume (for the delegate) - it's really just an optimized way of calling DynamicInvoke and the WorkItem field is restricted to type Delegate. But WorkItem.TaskSource should really be of type TaskCompletionSource<classic>.
Perhaps I am missing something.
P.S. If I wanted to make this a bounded blocking queue, how would I ensure that a task's cancellation immediately unblocks a full queue?
Joe
Write LINQ queries interactively - www.linqpad.net- BearbeitetJoe AlbahariMVPFreitag, 3. Juli 2009 06:12
Antworten
- Hi Joe-
I would have expected something more like this:
This avoids the need for dynamic completely, at least to me feels like a cleaner design, and also highlights that a TaskCompletionSource (non-generic) isn't really necessary. We did consider adding one, but we were able to meet all of the scenarios we wanted to with the generic one and thus didn't see a great deal of value in providing a non-generic one. If you have scenarios that would really benefit from a non-generic one and that can't be accomplished with just the generic one, we'd love to hear about them.public class PCQueue : IDisposable { abstract class WorkItem { internal abstract void Invoke(); } class WorkItem<TResult> : WorkItem { public TaskCompletionSource<TResult> TaskSource; public Func<TResult> Func; internal override void Invoke() { try { TaskSource.SetResult(Func()); } catch (Exception exc) { TaskSource.SetException(exc); } } } Task[] workers; BlockingCollection<WorkItem> taskQ = new BlockingCollection<WorkItem>(); public PCQueue(int workerCount) { workers = (from i in Enumerable.Range(0, workerCount) select Task.Factory.StartNew(Consume)).ToArray(); } public void Dispose() { taskQ.CompleteAdding(); } public Task EnqueueTask(Action action) { return EnqueueTask<object>(() => { action(); return null; }); } public Task<TResult> EnqueueTask<TResult>(Func<TResult> func) { var tcs = new TaskCompletionSource<TResult>(); taskQ.Add(new WorkItem<TResult> { TaskSource = tcs, Func = func }); return tcs.Task; } void Consume() { foreach (var workItem in taskQ.GetConsumingEnumerable()) { workItem.Invoke(); } } }
You'll also notice I've taken advantage of a few others features, such as BlockingCollection's GetConsumingEnumerable and CompleteAdding methods, which clean up the Consume loop and the Dispose method.
Regarding the question around cancellation of a task unblocking a full queue, you can register a continuation on the Task (OnlyOnCanceled) if you want code executed when a Task transitions into a completed state, or you can Register with the Task's CancellationToken, which will notify you when cancellation is requested (the latter is what you'd want in this case). However, with BlockingCollection<T>, there's no way to remove items other than through its *Take methods, and with those you can't control which item is removed, as that's left up to the underlying IProducerConsumerCollection<T>'s TryTake method. Given that, one option would be to forego usage of BlockingCollection<T> and manage your own collection of tasks using a SemaphoreSlim as the gating mechanism (or more likely one for empty and one for full); this is what BlockingCollection is built on top of, and you'd have direct control of changing the semaphore's count, such that you could increase it when you removed an entry from your own data structure upon item cancellation.
-Steve- Als Antwort vorgeschlagenStephen Toub - MSFTMSFT, ModeratorFreitag, 3. Juli 2009 20:41
- Als Antwort markiertJoe AlbahariMVPSamstag, 4. Juli 2009 00:43
Alle Antworten
- Hi Joe-
I would have expected something more like this:
This avoids the need for dynamic completely, at least to me feels like a cleaner design, and also highlights that a TaskCompletionSource (non-generic) isn't really necessary. We did consider adding one, but we were able to meet all of the scenarios we wanted to with the generic one and thus didn't see a great deal of value in providing a non-generic one. If you have scenarios that would really benefit from a non-generic one and that can't be accomplished with just the generic one, we'd love to hear about them.public class PCQueue : IDisposable { abstract class WorkItem { internal abstract void Invoke(); } class WorkItem<TResult> : WorkItem { public TaskCompletionSource<TResult> TaskSource; public Func<TResult> Func; internal override void Invoke() { try { TaskSource.SetResult(Func()); } catch (Exception exc) { TaskSource.SetException(exc); } } } Task[] workers; BlockingCollection<WorkItem> taskQ = new BlockingCollection<WorkItem>(); public PCQueue(int workerCount) { workers = (from i in Enumerable.Range(0, workerCount) select Task.Factory.StartNew(Consume)).ToArray(); } public void Dispose() { taskQ.CompleteAdding(); } public Task EnqueueTask(Action action) { return EnqueueTask<object>(() => { action(); return null; }); } public Task<TResult> EnqueueTask<TResult>(Func<TResult> func) { var tcs = new TaskCompletionSource<TResult>(); taskQ.Add(new WorkItem<TResult> { TaskSource = tcs, Func = func }); return tcs.Task; } void Consume() { foreach (var workItem in taskQ.GetConsumingEnumerable()) { workItem.Invoke(); } } }
You'll also notice I've taken advantage of a few others features, such as BlockingCollection's GetConsumingEnumerable and CompleteAdding methods, which clean up the Consume loop and the Dispose method.
Regarding the question around cancellation of a task unblocking a full queue, you can register a continuation on the Task (OnlyOnCanceled) if you want code executed when a Task transitions into a completed state, or you can Register with the Task's CancellationToken, which will notify you when cancellation is requested (the latter is what you'd want in this case). However, with BlockingCollection<T>, there's no way to remove items other than through its *Take methods, and with those you can't control which item is removed, as that's left up to the underlying IProducerConsumerCollection<T>'s TryTake method. Given that, one option would be to forego usage of BlockingCollection<T> and manage your own collection of tasks using a SemaphoreSlim as the gating mechanism (or more likely one for empty and one for full); this is what BlockingCollection is built on top of, and you'd have direct control of changing the semaphore's count, such that you could increase it when you removed an entry from your own data structure upon item cancellation.
-Steve- Als Antwort vorgeschlagenStephen Toub - MSFTMSFT, ModeratorFreitag, 3. Juli 2009 20:41
- Als Antwort markiertJoe AlbahariMVPSamstag, 4. Juli 2009 00:43
- Thanks, Stephen.
Write LINQ queries interactively - www.linqpad.net

