Help me "think in Rx". :) Workflow Design


  • Hello,

    I am relatively new to Rx and it seems to be the solution to my async woes. I'm hoping you can help me to "think in Rx".

    Using C#, I am trying to create an asynchronous data processing workflow.

    Currently I am using thread pools and background workers... you can only imagine the horror.

    Rx looks like the end of all that.

    So I would like to start a discussion on how I would implement my model with Rx. Links to other examples would be appreciated, but advice is all that is required at present.

    The workflow I have in mind has three components: a Data Source, a Network Node and a Node Process. Node processes can be different on each node, making each "network node" a "node process" factory, where each "node process" runs on its own thread and pushes its data to the next node in the chain.

    So a data source generates data, which I believe should be an IObservable. The first node in the network is subscribed to this IObservable and creates a node process when data is pushed to it. The node process is async, so it does not stop the node receiving further data and creating new processes; its data is pushed to the next node once it completes.

    The second node can observe the first node's "processFinished" event, which pushes the original data plus any data added by the previous node's process. If this node has all that it needs to continue, it creates a process and the chain continues until it hits the final output node. Otherwise no processing is required.

    Here is an idealised ASCII diagram, where O represents a node and dashes represent connections.

                   O
                 /   \
    Input----O         O-----Output
                 \   /
                   O

    Input - A data source (lines of text, a feed of numbers, a list of classes, etc.) whose items we will call the ProcessData class.

    The first node will be given the data as it is pushed from the Input node. The node will create a Node Process for this chunk of data, provided that it has what the process requires to continue.

    Nodes 2 and 3 are subscribed to node 1 and will receive the pushed data at the same time. They may not finish at the same time, so node 4 may have to wait until node 2 and node 3 are both ready.
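
    To make the diamond concrete, here is a rough sketch of the shape it might take with Rx operators. It assumes the System.Reactive NuGet package; DiamondSketch, Through, Build and the three NodeNWork functions are hypothetical names with placeholder arithmetic, not anything from an existing design. Publish shares node 1's output between nodes 2 and 3, and Zip makes node 4 wait until both have produced a result for the same input.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class DiamondSketch
{
    // Hypothetical stand-ins for the per-node processes.
    static int Node1Work(int x) { return x + 1; }
    static int Node2Work(int x) { return x * 10; }
    static int Node3Work(int x) { return x * 100; }

    // Starts each item's work on the thread pool as soon as it arrives,
    // but emits the results in arrival order, so that Zip later pairs
    // results that belong to the same input.
    static IObservable<int> Through(this IObservable<int> source, Func<int, int> work)
    {
        return source.Select(x => Task.Run(() => work(x))).Concat();
    }

    public static IObservable<int> Build(IObservable<int> input)
    {
        // Publish shares a single subscription to node 1 between both branches.
        return input.Through(Node1Work).Publish(node1 =>
            Observable.Zip(
                node1.Through(Node2Work),   // node 2
                node1.Through(Node3Work),   // node 3
                (a, b) => a + b));          // node 4 combines both results
    }
}
```

    For example, DiamondSketch.Build(Observable.Range(1, 3)) would push 220, 330 and 440 to whatever subscribes as the Output. Keeping results in arrival order is a simplification; if nodes may skip data they cannot process, pairing by an explicit key would be needed instead of Zip.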

    Rx can help me achieve this from what I can tell. I am just uncertain how the interfaces would relate to one another.

    Would someone be willing to discuss how to go about implementing this?

    My thoughts.

    1) Nodes are IObservables and IObservers. How do I inherit from these and create the events doWork and processFinished?
    2) With a list of node types and a connection matrix, we create node instances, each with the same interface, then subscribe each node to its partners.
    3) Load the data into the input node, causing it to push to the first node.
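
    On point 1, here is a minimal sketch of a class that is both an observer and an observable, using only the IObserver<T> and IObservable<T> interfaces from the System namespace. RelayNode, CollectingObserver and the doWork delegate are hypothetical names. The work runs synchronously here to keep the sketch short, and error handling inside doWork is left out. As far as I can tell, Rx's Subject types already cover this role, so this is only meant to show how the two interfaces relate.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical node: observes upstream data (IObserver) and exposes its
// results to downstream nodes (IObservable).
public sealed class RelayNode<TIn, TOut> : IObserver<TIn>, IObservable<TOut>
{
    private readonly object gate = new object();
    private readonly List<IObserver<TOut>> observers = new List<IObserver<TOut>>();
    private readonly Func<TIn, TOut> doWork;

    public RelayNode(Func<TIn, TOut> doWork) { this.doWork = doWork; }

    // Downstream nodes attach here; disposing the result detaches them.
    public IDisposable Subscribe(IObserver<TOut> observer)
    {
        lock (gate) observers.Add(observer);
        return new Subscription(() => { lock (gate) observers.Remove(observer); });
    }

    // "doWork" runs when data arrives; "processFinished" is the push downstream.
    public void OnNext(TIn value)
    {
        TOut result = doWork(value); // synchronous in this sketch
        foreach (var o in Snapshot()) o.OnNext(result);
    }

    public void OnError(Exception error) { foreach (var o in Snapshot()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in Snapshot()) o.OnCompleted(); }

    // Copy the list under the lock so observers can unsubscribe during a push.
    private IObserver<TOut>[] Snapshot() { lock (gate) return observers.ToArray(); }

    private sealed class Subscription : IDisposable
    {
        private readonly Action onDispose;
        public Subscription(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose(); }
    }
}

// Hypothetical sink standing in for the final Output node.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public bool Completed;
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}
```

    Point 2 then becomes a matter of calling upstream.Subscribe(downstream) for each connection, and point 3 a matter of calling OnNext on the first node for each piece of input data.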

    I have been through the Channel9 tutorials to get a feel for this new way of doing things, but I don't want to run before I can walk. So what thoughts and ideas do you guys have for making these different parts work together using Rx?

    Ultimately we want to open a data source and have the network react as the data flows through.



    Sunday, November 10, 2013 6:54 AM

All replies

  • Let's be a little bit more explicit with some hypothetical code.

    This is the workflow manager, which has a list of nodes and all of the connections.

    public class WorkFlow
    {
        // Container for nodes.
        public List<Node> nodes = new List<Node>();
        // Which nodes connect to which nodes.
        public float[,] connections;
        // True until all processes have finished.
        public bool running;

        public WorkFlow(List<Data> data)
        {
            // Check whether work is still being done.
        }

        public void GenerateNetwork()
        {
            // Go through all of the nodes and connections, subscribing.
        }

        public void AddData(List<Data> data)
        {
            // Add input data and push it to node 1.
        }
    }

    This is the node class, which should be IObservable so that other nodes can have data pushed to them.

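    public class Node : IObservable<Data>
    {
        public List<Process> processes = new List<Process>();
        // Still to be designed: how other nodes subscribe to this one.
        public IDisposable Subscribe(IObserver<Data> observer) { throw new NotImplementedException(); }
        public void GotNewData(List<Data> data)
        {
            foreach (var process in processes)
                if (process.hasData(data))
                    process.DoWork(); // Async
        }
    }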

    This is the class for the node process, with a DoWork function. This is just an interface, as the DoWork process could be anything. We could chain classes that represent electrical components, etc.

    public abstract class Process
    {
        // The work here varies depending on what type of data is required.
        // When finished, we push the data to the next node.
        public abstract void DoWork();

        // Checks that all of the data needed to run the new process is present.
        public abstract bool hasData(List<Data> data);
    }

    From the above I cannot picture how to do this with the beauty of Rx.

    Sunday, November 10, 2013 7:41 AM