One Producer with two Consumers that both get all posts
-
Monday, January 14, 2013 8:40 PM
I need to receive a continuous stream of double values from the USB port, save the values to a database and store them in memory to be displayed. Previously I've used a rather elaborate Producer-Consumer type setup. I was thinking of trying if I can use TPL Dataflow, but can't figure out how to set up two consumers that both receive all values. (It is imperative that all values are stored in the database.) I tried to do a demonstrator by adapting the MSDN sample here. The problem is that each post is received by only one consumer.
class Program { static void Main(string[] args) { var buffer = new BufferBlock<double>(); var consumers = new Task<int>[2]; consumers[0] = ConsumeAsync(buffer); consumers[1] = ConsumeAsync(buffer); Produce(buffer); Task.WaitAll(consumers); Console.WriteLine("Processed {0}/{1} values.", consumers[0].Result, consumers[1].Result); } static void Produce(ITargetBlock<double> target) { Random rand = new Random(); for (int i = 0; i < 100; i++) { target.Post(rand.NextDouble()); } target.Complete(); } static async Task<int> ConsumeAsync(IReceivableSourceBlock<double> source) { var list = new List<double>(); int valuesProcessed = 0; double value; while (await source.OutputAvailableAsync()) { while (source.TryReceive(out value)) { list.Add(value); valuesProcessed++; } } return valuesProcessed; } }
All Replies
-
Monday, January 14, 2013 9:51 PM
First, if you want to somehow process data from a source block, you don't need all the complicated code you have in ConsumeAsync(). Instead, link your source block to an ActionBlock, which will execute your code.
But BufferBlock (and most other blocks) always sends each message only to one consumer (whether you use TryReceive(), LinkTo(), or some other method). The exception is BroadcastBlock, which is exactly what you need: it sends each message to each of its linked targets.
One thing you have to be careful about BroadcastBlock is that it can drop a message if the consumer decides to postpone it. To avoid this issue, don't set BoundedCapacity on the targets of BroadcastBlock.
- Edited by svick Monday, January 14, 2013 9:51 PM
-
Tuesday, January 15, 2013 7:09 AM
svick,
The BroadcastBlock, by name, seemed to me the most obvious choice, but the MSDN doc says the following, "The BroadcastBlock<T> class is useful when you must pass multiple messages to another component, but that component needs only the most recent value." Well, I don't need the most recent value, I absolutely need all values. Maybe a dataflow pipeline would be more suitable for my needs. One receiver block would store the value to the database and another would save it to the in-memory structure and display it. Also, I'm not sure what you mean by postponing a message.
At first look the TPL Dataflow seemed awesome, but I'm surprised that a what I'm looking for here, essentially an async event mechanism I guess, isn't readily available. (And once again I find the documentation seriously lacking.)
-
Tuesday, January 15, 2013 2:53 PM
If a block is full, it can postpone a message, so it can process it at a later time. But if your target block doesn't have BoundedCapacity set, it will never postpone a message.
With BroadcastBlock, postponing a message means you may miss it. But if you never postpone a message, you can be sure that you will receive all messages. So, what you want is “readily available”. And I agree that the documentation for dataflow blocks is quite lacking.
-
Tuesday, January 15, 2013 6:56 PM
svick,
I keep reading and reading the docs but can't figure it out. To me it's not readily available if it isn't document so that I can use it. Maybe I'm just stupid. I'll stick to my old implementation. Thanks anyway.
-
Tuesday, January 15, 2013 8:35 PMIf what I say doesn't convince you that BroadcastBlock actually works that way, maybe Introduction to TPL Dataflow (written by Stephen Toub, a member of the Parallel Computing Platform team at Microsoft) will. On page 10, it explains how BroadcastBlock operates in more detail than the documentation.
- Marked As Answer by Stephen Toub - MSFTMicrosoft Employee, Owner Wednesday, January 16, 2013 5:55 PM
- Unmarked As Answer by Rubio Thursday, January 17, 2013 4:48 PM
-
Wednesday, January 16, 2013 8:50 AM
svick,
I don't need convincing, I need understanding. Thanks for the link. Haven't seen this before.
-
Wednesday, January 16, 2013 7:54 PM
svick,
OK, here's what I've got. Is this what you had in mind? Seems to be working. What bothers me, if I've understood the BroadcastBlock correctly, is that it doesn't buffer. The producer waits until both consumers have handled, or at least received, the message and only then proceeds to post the next message. This is not ideal for my purpose.
class Program { static List<double> _produced = new List<double>(); static List<double> _consumed1 = new List<double>(); static List<double> _consumed2 = new List<double>(); static void Main(string[] args) { var broadcast = new BroadcastBlock<double>(d => d, new DataflowBlockOptions { BoundedCapacity = DataflowBlockOptions.Unbounded }); var consumer1 = new ActionBlock<double>(new Action<double>(Consume1)); var consumer2 = new ActionBlock<double>(new Action<double>(Consume2)); broadcast.LinkTo(consumer1); broadcast.LinkTo(consumer2); Produce(broadcast); broadcast.Completion.Wait(); } private static void Consume1(double val) { _consumed1.Add(val); } private static void Consume2(double val) { _consumed2.Add(val); } static void Produce(ITargetBlock<double> target) { Random rand = new Random(); double value; for (int i = 0; i < 10000; i++) { value = rand.NextDouble(); target.Post(value); _produced.Add(value); } target.Complete(); } }
- Marked As Answer by Rubio Thursday, January 17, 2013 4:48 PM

