none
Join Patterns in Rx

    General discussion

  • One of the most difficult tasks using standard .NET events is defining complex synchronization patterns between multiple event sources where new composite event are created as a result of the occurrence of certain combinations of other events.

    The Joins class in Rx provides a basic implementation of the Join calculus over observable collections, which allows for concise descriptions of complex patterns that match notifications on multiple inputs. For instance, let’s looks that the JoCaml example of a concurrent counter whose value can be increment or read by multiple threads. The example uses a common technique of employing a private channel that contains the exclusive state of the counter, plus two bidirectional channels that represent that Inc and Get methods on the counter.

    We model unidirectional channels with values of type T using an ISubject<T> whose IObservable<out T> part represents the output port of the channel and the IObserver<in T> part represents the input port of the channel. In this case, the channel contains an integer value, so we declare _counter as follows:

    readonly ISubject<int> _counter = new Subject<int>();

    We model bidirectional or duplex channels with values of type T using a subject that contains a subject whose “continuation port” of type IObserver<T> is used to send back the return messages, and whose IObservable<T> part is used to receive the returned messaged.

    readonly ISubject<ISubject<Unit>> _inc = new Subject<ISubject<Unit>>();
           readonly ISubject<ISubject<int>>  _get = new Subject<ISubject<int>>();

    We could tighten up the typing by declaring separate variables for input and output ports, for instance, the static type of the output port of the _get channel only needs to be exposed as an input port carrying an output port IObservable<IObserver<int>>, but insisting on the most strict typing would only hide the inherent symmetry between unidirectional and bidirectional channels.

    The three channels are wired up using a join pattern to guarantee that the counter is exclusively used to read the current value, or to increment the current value.

    ·        In the first case the join pattern _counter.And(_get) becomes active when both the _counter and the _get channels contain a message,  in which case the unchanged value n of the counter is send back to the _counter channel via _counter.OnNext(n) and the current value n is also sent to the input port result.OnNext(n) carried by _get.

    ·        In the second case,  the join pattern _counter.And(_inc) becomes active when both the _counter and the _inc channels contain a message, in which case the value of the counter n is incremented and sent back to the _counter channel via _counter.OnNext(n + 1), and the result channel of _inc is notified using result.OnNext().

    Observable.Join(

       _counter.And(_get).Then((n, result) =>

       {

          _counter.OnNext(n);

          result.OnNext(n);

       }),

       _counter.And(_inc).Then((n, result) =>

       {

          _counter.OnNext(n + 1);

          result.OnNext();

       }))

    The join pattern above is created and started in the constructor of the counter, which also hooks up the disposable to the IDisposable implementation of the counter itself, and sends the initial value of the counter on the _counter channel (see complete code below, which also contains a few simple helper functions to make the use of the Unit type a little more palatable).

    The Inc and Get methods of the counter class are now rather straightforward. Both create a return channel using an AsyncSubject, notify their respective channels of the state machine with that channel and synchronously wait for the result to come back using First().

    public void Inc()

         {

            var result = new AsyncSubject<Unit>();

            _inc.OnNext(result);

            result.First();

         }

         public int Get()

         {

            var result = new AsyncSubject<int>();

            _get.OnNext(result);

            return result.First();

         }

    The complete implementation of a concurrent counter using Rx join patterns then looks like this.

    using System;

    using System.Collections.Generic;

    using System.Linq;

    namespace Counter

    {

        class Counter: IDisposable

        {

            readonly ISubject<ISubject<Unit>> _inc = new Subject<ISubject<Unit>>();

            readonly ISubject<ISubject<int>> _get = new Subject<ISubject<int>>();

            readonly ISubject<int> _counter = new Subject<int>();

            readonly IDisposable _dispose;

     

            public void Inc()

            {

                var result = new AsyncSubject<Unit>();

                _inc.OnNext(result);

                result.First();

            }

     

            public int Get()

            {

                var result = new AsyncSubject<int>();

                _get.OnNext(result);

                return result.First();

            }

     

            public Counter(int init)

            {

                _dispose = Observable.Join(

                            _counter.And(_get).Then((n, result) =>

                            {

                                _counter.OnNext(n);

                                result.OnNext(n);

                            }),

                            _counter.And(_inc).Then((n, result) =>

                            {

                                _counter.OnNext(n + 1);

                                result.OnNext();

                            })).Subscribe();

                _counter.OnNext(init);

     

            }

     

            public void Dispose()

            {

                _dispose.Dispose();

            }

        }

     

        public static class Helpers

        {

            public static void OnNext(this IObserver<Unit> src)

            {

                src.OnNext(new Unit());

            }

     

            public static Plan<Unit> Then<S, T>(this Pattern<S, T> p, Action<S, T> selector)

            {

                return p.Then((first, second) =>

                {

                    selector(first, second);

                    return new Unit();

                });

            }

        }

    }

    Monday, December 28, 2009 9:26 PM
    Owner

All replies

  • Here is Erik's example in F#.  Taking a slightly more functional approach, the "constructor" mkCounter is a higher-order function that creates a counter process and exposes the relevant "methods" as a tuple of functions.  For those new to F#, the helper methods aren't necessary because F# has a unit type whose sole value is denoted ().

    module JoinExample 
      open System
      open System.Collections.Generic
      open System.Linq
    
      let mkCounter init =
        let _inc = new Subject<ISubject<unit>>()
        let _get = new Subject<ISubject<int>>()
        let _counter = new Subject<int>()
    
        let inc () =
          let result = new AsyncSubject<unit>()
          _inc.OnNext(result)
          result.First()
    
        let get () =
          let result = new AsyncSubject<int>()
          _get.OnNext(result)
          result.First()
    
        let _disp = 
          Observable.Join(
            _counter.And(_get).Then(fun n result -> _counter.OnNext(n)
                                                    result.OnNext(n)),
            _counter.And(_inc).Then(fun n result -> _counter.OnNext(n + 1)
                                                    result.OnNext())).Subscribe()
        _counter.OnNext(init)
        (inc, get, _disp.Dispose)


    The following F# Interactive session demonstrates simple (synchronous) usage. 

    open JoinExample;;
    let (inc, get, disp) = mkCounter 0;;
    
    val inc : (unit -> unit)
    val get : (unit -> int)
    val disp : (unit -> unit)
    
    > get ();;
    val it : int = 0
    > inc ();;
    val it : unit = ()
    > get ();;
    val it : int = 1
    > disp ();;
    val it : unit = ()
    Sunday, February 21, 2010 9:48 PM
  • I don't *think* this example works with the new AsyncSubject<T>, as it doesn't yield notifications until OnCompleted() is called.

    Tuesday, April 20, 2010 11:03 AM
  • new AsyncSubject<T>, as it doesn't yield notifications until OnCompleted() is called.

    That is correct. 

     

    BTW Matthew's latest blog post is also about Join patterns.

    Tuesday, April 20, 2010 2:07 PM
    Owner
  • Hi Afra Herring,

    The previous AsyncSubject implementation would yield the first value that it received (OnNext) and would then call OnCompleted itself.  The latest AsyncSubject implementation skips all values except for the last value (OnNext, followed immediately by OnCompleted from the source).

    The sample above uses an AsyncSubject as the notification channel for each operation, but they don't call OnCompleted after calling OnNext, thus it will never notify observers of the value and First() will block indefinitely.

    - Dave


    http://davesexton.com/blog
    Monday, August 30, 2010 1:00 AM
  • Hi Afra Herring,

    The previous AsyncSubject implementation would yield the first value that it received (OnNext) and would then call OnCompleted itself.  The latest AsyncSubject implementation skips all values except for the last value (OnNext, followed immediately by OnCompleted from the source).

    The sample above uses an AsyncSubject as the notification channel for each operation, but they don't call OnCompleted after calling OnNext, thus it will never notify observers of the value and First() will block indefinitely.

    - Dave


    http://davesexton.com/blog

    That's a good explanation ... but the question was spam.  Like a bunch of posts that are bumping old threads.  I have no idea why anyone would waste their time spamming this way, or if they write bots to do it.  PITA.
    Monday, August 30, 2010 3:38 AM
  • Haha, thanks Richard.  I believe you're correct.  (Doh!)
    http://davesexton.com/blog
    Monday, August 30, 2010 4:56 AM
  • LOL!
    Monday, August 30, 2010 9:32 AM