Ensuring that the order of messages in a pipeline is preserved
As per the Programmer's Guide, the following code ensures that the order of messages in the pipeline is preserved:
var numbers = new OrderedInteractionPoint<int>();
// Create pipeline:
numbers ==> Fibonacci ==> ProcessResult;
// Send messages to numbers:
for( int i=0; i<numCount; i++ )
numbers <-- 42-i;
My question is: how does it happen? The variable numbers is of type OrderedInteractionPoint<int>. It means that the messages are queued up in the order they arrive at the node numbers and leave it in the same order. But how can this "ordered" node affect the order of the messages in the whole pipeline ("ProcessResult will receive the results in the same order that the corresponding inputs have entered the numbers interaction point")? It works and it is really great, but I am interested in how it was implemented and what its impact on performance is.
Thanks,
Andrei
All Replies
Andrei,
The Fibonacci node takes all the elements that are available in numbers at the time and executes the function (and only if it is declared as a function!) using Parallel.For from the Parallel Extensions Library. The results are accumulated in an array. Then, after all the elements have cleared the functional node, they are sent to the next node in the same order in which they arrived.
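The batch behavior described above can be modeled outside Axum. The following is a minimal Python sketch, not the Axum implementation: `run_batch`, `work`, and `fib` are hypothetical names, and a thread pool stands in for Parallel.For. Each worker writes into the slot that matches its input's arrival position, so the results come out in arrival order regardless of which computation finishes first.

```python
from concurrent.futures import ThreadPoolExecutor


def fib(n):
    """Naive Fibonacci, standing in for the sample's Fibonacci function."""
    return n if n < 2 else fib(n - 1) + fib(n - 2)


def run_batch(func, inputs):
    """Apply func to every queued input in parallel; return results in arrival order."""
    inputs = list(inputs)
    results = [None] * len(inputs)  # one slot per input, indexed by arrival position

    def work(index):
        results[index] = func(inputs[index])

    with ThreadPoolExecutor() as pool:
        # list() drains the iterator, which also re-raises any worker exception.
        list(pool.map(work, range(len(inputs))))
    # Leaving the with-block acts as the join: every slot is filled at this point.
    return results


# Smaller arguments than the sample's 42 - i (an assumption, to keep the run short).
queued = [20 - i for i in range(10)]
print(run_batch(fib, queued))
```

The join at the end of `run_batch` is what makes the slowest element dominate: nothing is handed to the next node until the whole array is full.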
Regarding the impact on performance: there are some unrealized opportunities in our current implementation. We always wait for all elements to complete the Parallel.For, even though it would be safe to pass the first elements along to the next node in the pipeline without waiting for the other elements.
To demonstrate this, rewrite the last line of the sample like this:
for( int i=0; i<numCount; i++ )
//numbers <-- 42-i;
numbers <-- 33+i;
Now the program calculates the same 10 numbers in a different order. One might expect Fibonacci(33) to come out first very quickly, followed by Fibonacci(34), etc. However, the pipeline waits for all numbers to come out of the Fibonacci node before starting the next node, ProcessResult. As a result, the element that takes the longest to calculate dominates, even if it is the last in the pipeline.
For a more involved example, imagine a multi-stage pipeline with functional nodes f1, f2, ... fN:
numbers ==> f1 ==> f2 ==> ... fN ==> ProcessResult;
Since the execution of the functional nodes has no side-effects and can happen independently of each other, there is no need to join the results after each node -- all that matters is that they arrive at the last node in the right order. Here again our current implementation results in less than optimal throughput.
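For comparison, here is a rough Python sketch of the alternative hinted at above: each element runs through all stages on its own, and order is restored only at the end. This is an illustration, not how Axum works today; `chain` and `ordered_stream` are hypothetical names. Results are released in arrival order as soon as each one (and everything before it) is ready, with no join between stages.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def chain(*stages):
    """Compose side-effect-free stages f1, f2, ... fN into one per-element function."""
    return lambda x: reduce(lambda value, stage: stage(value), stages, x)


def ordered_stream(func, inputs, max_workers=4):
    """Yield func(x) for each input in arrival order.

    A result is released as soon as it and all earlier results are ready,
    so early elements do not wait for the slowest one in the batch.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, x) for x in inputs]
        for future in futures:  # submission order, not completion order
            yield future.result()


# Two placeholder stages standing in for f1 and f2.
pipeline = chain(lambda n: n + 1, lambda n: n * n)
for result in ordered_stream(pipeline, range(5)):
    print(result)  # plays the role of ProcessResult
```

A slow element still holds back everything queued behind it, because order must be preserved, but elements ahead of it can already be processed by the final node.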
Thanks for your question!
Artur Laksberg - MSFT
Artur,
Thanks for your answer. It elucidates some subtle things. I have a few more questions. When does a functional node start executing? How long does it wait while items accumulate in its queue? For example, if we have two loops executed in succession like this
for (int i = 0; i < numCount; i++)
numbers <-- (42 - i);
for (int i = 0; i < numCount; i++)
numbers <-- (30 - i);
does it mean that all the elements will be queued before the functional node starts executing?
Andrei
Andrei,
The dataflow network doesn't wait for the elements (that would be impossible -- it would need to know something's coming ahead of time). It kicks in as soon as a) an element appears on the source and b) it has the "permission" (we call it "token") to run.
In the Fibonacci case the pipeline doesn't start before the agent constructor has finished executing. If it were allowed to start, it would be able to observe a change in the agent state performed by the agent constructor -- because the agent constructor can modify agent state and a function can read agent state.
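The two conditions above (an element on the source, plus the token) can be pictured with a small Python model. This is only a toy illustration, not Axum's scheduler: the `Node` class, `post`, and `grant_token` are hypothetical names, and it assumes the node keeps the token once granted. Posting while the constructor still holds the token only queues elements; the node runs once the token is handed over, and from then on it runs whenever something arrives.

```python
from collections import deque


class Node:
    """Toy functional node: runs only when it has input AND holds the token."""

    def __init__(self, func):
        self.func = func
        self.source = deque()   # elements waiting on the source
        self.has_token = False  # the "permission" to run
        self.output = []

    def post(self, item):
        self.source.append(item)
        self._try_run()

    def grant_token(self):
        self.has_token = True
        self._try_run()

    def _try_run(self):
        if self.has_token and self.source:
            batch = list(self.source)  # everything available at this moment
            self.source.clear()
            self.output.extend(self.func(x) for x in batch)


node = Node(lambda n: n * n)  # placeholder for the Fibonacci function

# "Constructor" phase: both loops run while the node has no token.
for i in range(3):
    node.post(42 - i)
for i in range(3):
    node.post(30 - i)
print(len(node.source), node.output)

# Constructor finished: the token is released and the queued batch runs.
node.grant_token()
print(node.output)
```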
Artur Laksberg - MSFT
Marked As Answer by Artur Laksberg (MSFT, Moderator), Thursday, July 09, 2009 8:27 PM
Hi Andrei/Artur,
I'm kind of interested in this one as well. Let me see if my understanding is correct -
Data networks composed of functions residing in the same agent (as in this example) don't execute as independent agents. As such, the current agent must yield its execution (after generating the sequences in the numbers source) to allow the next node in the network to execute, because it is the agent's thread that needs to kick this off.
So in Andrei's final example, the following should occur:
- The agent thread will first complete both for loops in sequence, populating the numbers source from both loops. Nothing else will be happening in parallel.
- Once the agent's constructor exits, it will yield the logical thread, which will allow the data network to start receiving messages from the numbers source.
- Axum uses a parallel for to partition the sequence into chunks to be executed on separate threads in parallel.
- The entire sequence will go through the Fibonacci function and then yield back to the agent thread, because ProcessResult is not a function and can't run in parallel.
- Finally, the agent thread executes ProcessResult sequentially for each Fibonacci number.
Is that correct?
Thanks,
Brandon
Brandon,
I think you got it right. The overriding principle is actually quite simple: no two threads should be able to (read and write) or (write and write) to the same partition of state.
There are two kinds of state -- the domain and the agent. The ability of a method to read or write that state is determined by a) for agent-level methods, whether the enclosing agent is a reader, a writer, or a no-access agent and b) whether the method is declared as a function.
For example, a function defined in a reader agent cannot mutate domain state because it's a part of a reader agent; it cannot mutate agent state because it declares itself as a function.
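The overriding principle can be written down as a small check. The Python sketch below is only a model of the rule as stated here, not Axum's actual analysis; `Access`, `conflicts`, and `can_run_together` are hypothetical names, and it assumes both methods touch the same domain and the same agent.

```python
from enum import Enum


class Access(Enum):
    NONE = 0
    READ = 1
    WRITE = 2


def conflicts(a, b):
    """True for (read, write) or (write, write) on the same partition of state."""
    return Access.WRITE in (a, b) and Access.NONE not in (a, b)


def can_run_together(first, second):
    """Two methods may overlap only if no partition of state is in conflict."""
    return not any(conflicts(first[part], second[part]) for part in ("domain", "agent"))


# A function in a reader agent: reads domain state, reads agent state.
reader_function = {"domain": Access.READ, "agent": Access.READ}
# A non-function method in the same reader agent: may also write agent state.
reader_method = {"domain": Access.READ, "agent": Access.WRITE}

print(can_run_together(reader_function, reader_function))
print(can_run_together(reader_function, reader_method))
```

Under this model two copies of the function can safely overlap, which is consistent with the Fibonacci node being run in parallel, while a method that writes agent state cannot overlap with anything that reads it.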
Artur Laksberg - MSFT


