How to replace ForkJoin for more than 2 IObservables

    Question

  • Hello,

    I was using ForkJoin with 4 or 5 IObservables. Since ForkJoin is no longer in the latest version, how can I implement it?

     

    Thanks.

     

    Friday, June 24, 2011 6:25 PM

Answers

  • Hi,

    ForkJoin is available in the Rx experimental release.  If you can only depend upon the stable release, nest Zip calls instead.  For each Zip projection, you can create a new array, anonymous type or Tuple, depending upon your needs.

    In the next version of Rxx there will be a Zip operator that works on Tuple, which makes this kind of query easier to write.

    - Dave


    http://davesexton.com/blog
    Friday, June 24, 2011 7:52 PM
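
As an illustration of the nested Zip suggestion, here is a minimal sketch. It assumes the System.Reactive package; the class `NestedZipSketch`, the helper `JoinLast` and the four sample sources are hypothetical names used only for this example. The `TakeLast(1)` calls are an assumption of the sketch: Zip pairs values by position, so each source is reduced to its final value first to approximate ForkJoin.

```csharp
using System;
using System.Reactive.Linq;

public static class NestedZipSketch
{
    // Hypothetical helper: combines the final value of four sources by nesting Zip.
    // Each Zip projection carries the values gathered so far in an anonymous type.
    public static IObservable<Tuple<int, string, double, bool>> JoinLast(
        IObservable<int> a,
        IObservable<string> b,
        IObservable<double> c,
        IObservable<bool> d)
    {
        return a.TakeLast(1)
            .Zip(b.TakeLast(1), (x, y) => new { x, y })
            .Zip(c.TakeLast(1), (xy, z) => new { xy.x, xy.y, z })
            .Zip(d.TakeLast(1), (xyz, w) => Tuple.Create(xyz.x, xyz.y, xyz.z, w));
    }

    public static void Main()
    {
        var joined = JoinLast(
            Observable.Range(1, 3),      // last value is 3
            Observable.Return("done"),
            Observable.Return(2.5),
            Observable.Return(true));

        // Prints a single tuple once all four sources have completed.
        joined.Subscribe(t => Console.WriteLine(t));
    }
}
```

The nesting grows by one Zip per extra source, which is why the array-based versions later in the thread scale better when all sources share one element type.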

All replies

  • Thanks a lot.
    Friday, June 24, 2011 8:28 PM
  • If you wanted to use the existing stable operators, here is one potential implementation of ForkJoin(params IObservable<T>) using the current stable release (minimally tested):

            private static IObservable<TSource[]> ForkJoinWithStable<TSource>(params IObservable<TSource>[] sources)
            {
                return Observable.Create<TSource[]>(
                    observer =>
                    {
                        var results = new TSource[sources.Length];
                        return sources.Select((o, i) => o.TakeLast(1).Do(v => results[i] = v))
                                      .Merge() // Subscribe to each observable
                                      .Count() // Wait until end of all observables
                                      .Where(c => c == sources.Length) // Ensure all observables returned a result
                                      .Select(i => results) // output the results array
                                      .Subscribe(observer);
                    });
            }
    
    

    Martin

    Saturday, June 25, 2011 6:49 AM
  • Hi Martin,

    Nice query.  Here's an alternative that doesn't require Observable.Defer or Create because it has no closure: 

     

    private static IObservable<TSource[]> ForkJoinWithStable<TSource>(params IObservable<TSource>[] sources)
    {
    	return sources
    		.Select((o, i) => o.TakeLast(1).Select(value => new { i, value }))
    		.Merge()
    		.Aggregate(
    			new { array = new TSource[sources.Length], count = 0 },
    			(results, result) =>
    			{
    				results.array[result.i] = result.value;
    				return new { results.array, count = results.count + 1 };
    			})
    		.Where(results => results.count == sources.Length)
    		.Select(results => results.array);
    }
    

     

    - Dave


    http://davesexton.com/blog
    Saturday, June 25, 2011 9:59 AM
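
To show what the `Where(results => results.count == sources.Length)` guard does, the following usage sketch copies the Aggregate-based query into a hypothetical `ForkJoinDemo` class (made public so it can be called) and runs it twice. It assumes the System.Reactive package; the sample sources are made up for the example.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class ForkJoinDemo
{
    // Same query as in the post above, copied here so the example is self-contained.
    public static IObservable<TSource[]> ForkJoinWithStable<TSource>(params IObservable<TSource>[] sources)
    {
        return sources
            .Select((o, i) => o.TakeLast(1).Select(value => new { i, value }))
            .Merge()
            .Aggregate(
                new { array = new TSource[sources.Length], count = 0 },
                (results, result) =>
                {
                    results.array[result.i] = result.value;
                    return new { results.array, count = results.count + 1 };
                })
            .Where(results => results.count == sources.Length)
            .Select(results => results.array);
    }

    public static void Main()
    {
        // Every source produces at least one value, so one array of last values is emitted.
        ForkJoinWithStable(
                Observable.Range(1, 3),
                Observable.Return(10),
                Observable.Range(5, 2))
            .Subscribe(arr => Console.WriteLine(string.Join(", ", arr))); // 3, 10, 6

        // One source completes without a value, so the count never reaches
        // sources.Length and the guard filters the partial array out.
        ForkJoinWithStable(
                Observable.Range(1, 3),
                Observable.Empty<int>())
            .Subscribe(
                arr => Console.WriteLine("partial result leaked"),
                () => Console.WriteLine("completed without a result"));
    }
}
```

Note that the seed array is created when the query is built, not per subscription, so subscribing to the same returned observable more than once would reuse it.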
  • Hi Dave,

    That's a nicer approach.

    For a slightly more esoteric version (although I'm cheating a little by including one of my workhorse operators):

            /// <summary>
            /// Creates an observable that fires immediately on subscription with the source 
            /// observable as the value, and then connects to the source observable. It completes
            /// once the source observable completes, and then disconnects the source.
            /// </summary>
            /// <typeparam name="T">The element type of the source observable</typeparam>
            /// <param name="source">The source observable</param>
            /// <returns>An observable that returns the source observable on subscription, but only completes when the source completes.</returns>
            public static IObservable<IObservable<T>> ConnectAndWrap<T>(this IConnectableObservable<T> source)
            {
                return Observable.Create<IObservable<T>>(o =>
                {
                    o.OnNext(source);
    
                    var disp = new CompositeDisposable(
                        source.Subscribe(_ => { }, o.OnError, o.OnCompleted),
                        source.Connect());
    
                    return disp;
                });
            }
    
            private static IObservable<TSource[]> ForkJoinWithStable2<TSource>(params IObservable<TSource>[] sources)
            {
                return sources.Select(o => o.PublishLast().ConnectAndWrap())
                              .Merge() // Subscribe to all the wrapped observables, which internally connects the PublishLast
                              .Concat() // Subscribe to the un-wrapped AsyncSubject in order, to output the results
                              .ToArray()
                              .Where(a => a.Length == sources.Length);                
            }
    
    

     

    Martin

     

     

    Saturday, June 25, 2011 11:37 AM
  • Below is what I think is a better approach than my previous post, although it is very similar in nature. The MulticastEmbedded method is useful because, combined with SelectMany, it separates the moment a subject is connected from the consumption of its results. I would appreciate feedback on any issues you can see, and on whether the naming is sensible given that this subscribes to the source each time the outer observable is "connected", instead of just once (as in the normal Multicast method):

    /// <summary>
    /// Multicasts the source observable using the provided subject factory. 
    /// Each subscription to the outer observable will start a new subscription to the source.
    /// The outer return observable controls the lifetime of the subscription to the source.
    /// (Similar to the Connect method on an IConnectableObservable, except with multiple source subscriptions)
    /// The inner observable is the subject that has been created. 
    /// (Equivalent to the Subscribe method on an IConnectableObservable)
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <typeparam name="TResult">The type of the result.</typeparam>
    /// <param name="source">The source observable.</param>
    /// <param name="subjectFactory">The subject factory.</param>
    /// <returns>An observable whose lifetime is the same as the original source, and a subject which is immediately returned on subscription.</returns>
    public static IObservable<IObservable<TResult>> MulticastEmbedded<TSource, TResult>(
        this IObservable<TSource> source, 
        Func<ISubject<TSource, TResult>> subjectFactory)
    {
        return Observable.Create<IObservable<TResult>>(
            o =>
                {
                    var s = subjectFactory();
    
                    // Ensure that the subject is sent out prior to subscribing to the source
                    o.OnNext(s);
    
                    return source.Subscribe(
                        s.OnNext,
                        ex =>
                            {
                                // Ensure inner observable completes before us
                                s.OnError(ex);
                                o.OnError(ex);
                            },
                        () =>
                            {
                                // Ensure inner observable completes before us
                                s.OnCompleted();
                                o.OnCompleted();
                            });
                });
    }
    
    private static IObservable<TSource[]> ForkJoinWithStable3<TSource>(params IObservable<TSource>[] sources)
    {
        return sources.Select(o => o.MulticastEmbedded(() => new AsyncSubject<TSource>()))
                        .Merge() // Subscribe to all the wrapped observables, which internally connects to the source
                        .Concat() // Subscribe to the AsyncSubject in order, to output the results
                        .ToArray()
                        .Where(a => a.Length == sources.Length);
    }
    


    Martin



    Saturday, June 25, 2011 10:30 PM
  • Hi Martin,

    Brain benders!

    The first one is cool.  The Merge().Concat() threw me at first, until I realized that it's actually using an enumerable of doubly-nested observables :)

    The second one is also cool.  It's quite interesting how you're using the Create observer's OnNext to send out subjects in the order that the observables are specified, and then Concat them to maintain the same order in the resulting array.

    How about calling it MulticastMany?  Perhaps you're correct though that usage of the name Multicast is misleading, because subscription side-effects are not being shared like the normal Multicast.  What about ConnectMany?  That sort of cuts out the middle between Multicast and IConnectableObservable, which is kind of what the operator is doing.

    - Dave


    http://davesexton.com/blog
    Monday, June 27, 2011 7:54 PM
  • Hi Martin,

    Actually, the name ConnectMany made me wonder whether you could make the operator hot to begin with, which is more like the semantics of Multicast/Connect.  In other words, drop the Observable.Create?

    Edit: Oops, never mind.  Then OnNext wouldn't be available!

    - Dave


    http://davesexton.com/blog
    Monday, June 27, 2011 8:00 PM
  • Hi Dave,

    How about ReturnConnected? The "Return" implies that a single item is produced per subscription, and the "Connected" implies that the source is connected to the new subject. It also allows creating specialized versions, e.g. ReturnConnectedAsyncSubject, ReturnConnectedReplaySubject.

    I'm already finding this operator useful for Linq syntax:

     

    var source = 
     Observable.Generate(0, i => i < 10, i => i + 1, i => i, i => TimeSpan.FromSeconds(i));
    
    var rollingSum =
     from sourceSubject in source.ReturnConnected(() => new Subject<int>())
     from sum in sourceSubject.Merge(sourceSubject.Delay(TimeSpan.FromSeconds(20))
                      .Select(i => -i))
              .Scan((a, b) => a + b)
     select sum;
     
    
    Martin
    Tuesday, June 28, 2011 11:20 PM
  • Hi Martin,

    I think Many might be more accurate though, because it returns a new subject for each subscription.  Although I can see how it could be confused with "many calls to OnNext per subscription", which of course wouldn't be accurate.  Actually, Multicast seems more like return semantics to me, because it's hot (Edit: Sorry, don't know what I was thinking - it's actually cold because it returns IConnectableObservable) and returns a single connectable that shares subscription side-effects through a single subject.  It also seems like Connected implies that the outer observable is hot, when actually a subscription is still required to perform what would normally be the Connect action.

    I'm still trying to wrap my head around this, so let me know if I'm off here :)

    It seems like we're lacking a good term to describe a Cold Multicast.  In Rx, when I think of cold I think of Defer.  And it seems that the Connect method is the best way to imply a connection to the subject.  So how about DeferConnection?  It can be understood as, "Connects the source to a new subject for each subscription to the returned observable."

    Perhaps DeferConnection also makes sense because Defer defers the call to Subscribe, and in your operator Connect from IConnectableObservable is replaced by a normal call to Subscribe, thus the underlying connection to the source is deferred until each call to Subscribe.

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Wednesday, June 29, 2011 1:19 AM Fix
    Wednesday, June 29, 2011 12:43 AM
  • Hi Martin,

    Just noticed that you may want to call AsObservable before pushing the subject, to prevent unsolicited casting.

    - Dave


    http://davesexton.com/blog
    Wednesday, June 29, 2011 12:51 AM
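
The AsObservable suggestion can be sketched as follows. This is a variant of the MulticastEmbedded operator from earlier in the thread, assuming the System.Reactive package; the class name `AsObservableSketch` is hypothetical, and the only intended change is that the subject is wrapped before it is pushed to the observer.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AsObservableSketch
{
    // Variant of MulticastEmbedded: the subject is hidden behind AsObservable()
    // so that consumers cannot cast the inner observable back to a subject.
    public static IObservable<IObservable<TResult>> MulticastEmbedded<TSource, TResult>(
        this IObservable<TSource> source,
        Func<ISubject<TSource, TResult>> subjectFactory)
    {
        return Observable.Create<IObservable<TResult>>(o =>
        {
            var s = subjectFactory();

            // Send out the wrapped subject before subscribing to the source.
            o.OnNext(s.AsObservable());

            return source.Subscribe(
                s.OnNext,
                ex => { s.OnError(ex); o.OnError(ex); },
                () => { s.OnCompleted(); o.OnCompleted(); });
        });
    }

    public static void Main()
    {
        Observable.Range(1, 3)
            .MulticastEmbedded(() => new Subject<int>())
            // The wrapper does not implement IObserver<int>, so a consumer
            // cannot call OnNext on it and inject values into the subject.
            .Subscribe(inner => Console.WriteLine(inner is IObserver<int>));
    }
}
```

Without the `AsObservable()` call, the inner value would be the `Subject<int>` itself and the same check would succeed.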
  • Actually, what about just calling it Defer?  As an overload that accepts a subject, the connection part should be implied :)

    - Dave


    http://davesexton.com/blog
    Wednesday, June 29, 2011 12:59 AM
  • This also might fall into the Prime family of operators (2) that we have in Rxx.  Prime is essentially the term that we use to describe a Cold Multicast replacing Connect with Subscribe.  So perhaps this would make a fine overload of Prime?

    Speaking of Rxx, would you consider allowing us to include this operator?  I wasn't sure of its usage until your last LINQ example, and now I'm starting to see the benefits.

    - Dave


    http://davesexton.com/blog
    Wednesday, June 29, 2011 1:13 AM
  • Hi Dave,

    Yes to all the above. I think it matches the behavior of the Prime operators, but out of the names above I think I like the idea of DeferConnect the best, as it includes the OnSubscribe behavior and the Connect behavior. However if you put it in Rxx it might make more sense to keep with the existing Prime family.

    Happy for it to be included in Rxx.

    I agree AsObservable should be included.

    Yours

    Martin

     

    Wednesday, June 29, 2011 4:01 AM
  • Hi Martin,

    Created a work item in Rxx:

    http://rxx.codeplex.com/workitem/21044

    - Dave


    http://davesexton.com/blog
    Wednesday, June 29, 2011 10:58 AM
  • Interesting...

    Correct me if I'm wrong, but isn't the rolling sum the same as:

    	var rollingSum = source.Publish(xs =>
    		from sum in xs.Merge(xs.Delay(TimeSpan.FromSeconds(20))
    			.Select(i => -i))
    			.Scan((a, b) => a + b)
    	 	select sum);
    


    James Miles http://enumeratethis.com
    Wednesday, June 29, 2011 3:16 PM
  • Hi James,

    Good point.  I wonder if there's any reason to use this operator then outside of the specified example.

    - Dave


    http://davesexton.com/blog
    Wednesday, June 29, 2011 3:22 PM
  • It still seems to make sense in the ForkJoinWithStable2 example, although the alternatives to that also seem a bit easier to understand.

    - Dave


    http://davesexton.com/blog
    Wednesday, June 29, 2011 3:24 PM
  • It's basically an inversion: Publish requires the query to be specified as a function parameter, whereas DeferConnect returns the primed source as a nested observable so that it may be connected through LINQ.

    - Dave


    http://davesexton.com/blog
    Wednesday, June 29, 2011 3:37 PM
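
The inversion described above can be seen by writing one query both ways. This is a sketch under stated assumptions: it uses the System.Reactive package, the name `DeferConnect` is only one of the names floated in this thread (its body follows MulticastEmbedded with AsObservable added), and the pairwise-difference query and the class `InversionSketch` are made up to keep the example short and free of timers.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class InversionSketch
{
    // Hypothetical operator: pushes a fresh subject to each subscriber,
    // then subscribes that subject to the source.
    public static IObservable<IObservable<T>> DeferConnect<T>(
        this IObservable<T> source,
        Func<ISubject<T>> subjectFactory)
    {
        return Observable.Create<IObservable<T>>(o =>
        {
            var s = subjectFactory();
            o.OnNext(s.AsObservable());
            return source.Subscribe(
                s.OnNext,
                ex => { s.OnError(ex); o.OnError(ex); },
                () => { s.OnCompleted(); o.OnCompleted(); });
        });
    }

    // Publish form: the query over the shared source is passed in as a function.
    public static IObservable<int> DiffsWithPublish(IObservable<int> source)
    {
        return source.Publish(xs => xs.Zip(xs.Skip(1), (a, b) => b - a));
    }

    // Nested form: the shared source arrives as an inner observable,
    // so the same query can be written with LINQ query syntax.
    public static IObservable<int> DiffsWithDeferConnect(IObservable<int> source)
    {
        return from xs in source.DeferConnect(() => new Subject<int>())
               from d in xs.Zip(xs.Skip(1), (a, b) => b - a)
               select d;
    }

    public static void Main()
    {
        var squares = Observable.Range(1, 5).Select(i => i * i); // 1, 4, 9, 16, 25

        // Both forms print the differences between consecutive squares.
        DiffsWithPublish(squares).Subscribe(d => Console.WriteLine("Publish: " + d));
        DiffsWithDeferConnect(squares).Subscribe(d => Console.WriteLine("DeferConnect: " + d));
    }
}
```

In both forms the source is subscribed once per subscription to the resulting query, and both branches of the Zip share that single subscription.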