Release Notes for Reactive Extensions 1.1.11111 (Experimental Release)


  • We're happy to announce a new release of Rx, with the nice binary version number v1.1.11111. The major focus of this release is performance-related work, but it includes a few other changes as well.
     
    Performance of query operators
    Various binary operators such as Amb, CombineLatest, SkipUntil, TakeUntil, and Zip were traditionally implemented using a common internal operator called Combine. This helper performed coarse-grained locking on the sequence, and materialized the message stream in order for derived operators to switch on the type of the notifications. For small streams, this is not much of a problem, but for high-velocity streams with lots of messages, the excessive materialization puts quite a load on the GC. In this release, we rewrote those operators to reduce allocation rates and to leverage operator-specific characteristics that allow for optimizations.
     
    Example
    var xs = Observable.Range(0, 1000000, Scheduler.ThreadPool);
    xs.Zip(xs, (x, y) => x + y).ForEach(_ => { });
    Console.WriteLine(GC.CollectionCount(0)); // 158 (v1.0.10621) --> 114 (v1.1.11111) 

    Optimization of query operator chains
     
    It's pretty common for queries to contain chains of consecutive uses of the same query operators. For example, one can write multiple where clauses in a LINQ expression, or write multiple adjacent projections by using let and select. In this release, we optimize such uses by coalescing the use of adjacent query operators in various cases. This includes minor rewrites of operators like Do to use a Select internally, hence enabling this optimization to have a wider effect across different types of queries.
     
    Example
    var xs = Observable.Range(0, 1000000);
    var ys = from x in xs
                let y = x + 1
                let z = x * y
                where x > 0
                where y > 0
                where z > 0
                select x + y + z;
    
    var sw = Stopwatch.StartNew();
    ys.ForEach(_ => { });
    sw.Stop();
    Console.WriteLine(sw.Elapsed); // 16.65 (v1.0.10621) --> 9.79 (v1.1.11111) 

    Faster subjects
    Over the years, we've optimized the subjects in Rx (also used behind the scenes by operators like Publish, GroupBy, etc.) in a variety of ways. Long-time followers of Rx will recall the introduction of "fast subjects" during this transition, which reduced the number of guarantees subjects make with regard to concurrency and scheduling. Ultimately, we settled on explicit opt-in synchronization for subjects and defaulted to the fast versions in the API. In this release, we optimize subjects further based on those relaxed guarantees.
     
    Example
    for (int n = 0; n < 10; n++)
    {
        var c = new CountdownEvent(n);
    
        var s = new Subject<int>();
        for (int i = 0; i < n; i++)
            s.Subscribe(_ => { }, () => c.Signal());
    
        var sw = Stopwatch.StartNew();
    
        Scheduler.ThreadPool.Schedule(() =>
        {
            for (int i = 0; i < 100000000; i++)
                s.OnNext(42);
            s.OnCompleted();
        });
    
        c.Wait();
        sw.Stop();
        Console.WriteLine(sw.Elapsed);
    
        // 1.0.10621  1.1.11111
        // ---------  ---------
        //    00.002     00.002
        //    04.459     01.560
        //    04.396     02.504
        //    05.840     03.976
        //    05.977     03.900
        //    07.581     04.489
        //    07.566     05.331
        //    08.434     05.998
        //    09.141     06.867
        //    09.993     08.089
    }
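    The explicit opt-in synchronization mentioned above is exposed through the Subject.Synchronize factory method; a minimal sketch (variable names are illustrative):

```csharp
// A plain Subject<T> no longer serializes concurrent callers itself.
var subject = new Subject<int>();

// Opt in to synchronization when multiple threads may call OnNext/OnError/
// OnCompleted concurrently; the wrapper serializes the notifications.
var safe = Subject.Synchronize(subject);

// An overload also lets you control where observers receive their callbacks.
var safeOnPool = Subject.Synchronize(subject, Scheduler.ThreadPool);
```

    Code that feeds a subject from a single thread can keep using the fast, unsynchronized default and pay no synchronization cost.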


    N-ary overloads of Zip and CombineLatest
    While one can build N-ary compositions of sequences using the binary versions of Zip and CombineLatest, this is rather inefficient due to the need for intermediate objects, typically anonymous types or tuples. It also reduces the readability of the query. In this release, we introduce N-ary overloads of those query operators (with N being the maximum arity of the Func<> delegate types in the targeted platform) that are optimized for N-ary use (rather than being implemented in terms of the binary version).
     
    Example
    var before = xs.Zip(xs, (a, b) => new { a, b })
                   .Zip(xs, (ab, c) => new { ab, c })
                   .Zip(xs, (abc, d) => new { abc, d })
                   .Select(abcd => abcd.abc.ab.a + abcd.abc.ab.b + abcd.abc.c + abcd.d);
    var after  = Observable.Zip(xs, xs, xs, xs, (a, b, c, d) => a + b + c + d);

    New DelaySubscription operator

    The original Delay family of operators in Rx doesn't delay the subscription to the underlying source; it records timestamped messages from the source immediately and delays their delivery to the subscriber. As a result, large internal queues can build up, and the materialization of each message adds potentially significant GC stress. While delaying a subscription is easy using a composition of the Timer operator and the SelectMany operator, we added a separate operator for this common scenario, which we encountered a number of times internally with partner teams.
      
    Example
    var xs = Observable.Defer(() =>
    {
        Console.WriteLine(DateTime.Now + " - Subscribed");
        return Observable.Return(42).Do(x => Console.WriteLine(DateTime.Now + " - Produced " + x));
    });
    
    var ys = Observable.Defer(() =>
    {
        Console.WriteLine(DateTime.Now + " - Started");
        return xs.Delay(TimeSpan.FromSeconds(5));
    });
    
    // 11/14/2011 1:00:12 PM - Started
    // 11/14/2011 1:00:12 PM - Subscribed
    // 11/14/2011 1:00:12 PM - Produced 42
    // 11/14/2011 1:00:17 PM - Received 42
    ys.ForEach(x => Console.WriteLine(DateTime.Now + " - Received " + x));
    
    
    var zs = Observable.Defer(() =>
    {
        Console.WriteLine(DateTime.Now + " - Started");
        return xs.DelaySubscription(TimeSpan.FromSeconds(5));
    });
    
    // 11/14/2011 1:00:20 PM - Started
    // 11/14/2011 1:00:25 PM - Subscribed
    // 11/14/2011 1:00:25 PM - Produced 42
    // 11/14/2011 1:00:25 PM - Received 42
    zs.ForEach(x => Console.WriteLine(DateTime.Now + " - Received " + x));
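    For comparison, the Timer-and-SelectMany composition mentioned above can be sketched as follows (equivalent in effect to DelaySubscription for this scenario):

```csharp
// Delay the subscription to xs by composing existing operators: subscribe to
// a single-shot timer first, then flat-map into the real source when it fires.
var delayedByHand = Observable.Timer(TimeSpan.FromSeconds(5))
                              .SelectMany(_ => xs);
```

    The dedicated DelaySubscription operator simply packages this pattern up.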

    Various bug fixes
     
    This release also includes a number of small bug fixes:
    • SkipUntil now correctly propagates error messages from the signaling "other" sequence. Before, such errors were swallowed, which conflicts with our "fail fast, exceptions are hard errors" behavior.
    • NewThreadScheduler now correctly uses the user-supplied thread factory. Before, this parameter had been ignored since a rewrite of the scheduler in terms of EventLoopScheduler.
    • TaskPoolScheduler now guarantees that concurrency is always introduced on the task pool. Before, its use of the ambient task scheduler (TaskScheduler.Current) could lead to work executing elsewhere (e.g. on the UI thread) when the scheduling happened in the context of an existing Task object. In case this fix leads to cross-thread violation exceptions in your code, use ObserveOn to control concurrency explicitly.
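    The SkipUntil fix can be observed with a small repro sketch (names are illustrative):

```csharp
// The "other" sequence terminates with an error before ever signaling.
var source = Observable.Never<int>();
var other  = Observable.Throw<int>(new InvalidOperationException("boom"));

// With this fix, the OnError handler below fires; previous releases
// swallowed the error from the "other" sequence.
source.SkipUntil(other).Subscribe(
    x  => Console.WriteLine("OnNext: " + x),
    ex => Console.WriteLine("OnError: " + ex.Message));
```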

    Monday, November 14, 2011 9:19 PM

All replies

  • Recursive scheduling using async/await in .NET 4.5

    ISchedulers are at the heart of the event processing execution engine that drives Rx. All concurrency gets introduced by schedulers, whether we need to produce messages (e.g. for a factory method on the Observable class) or run time-centric operations (e.g. Delay, Timeout, Window, etc.). The new recursive scheduling facility in this experimental release of Rx helps with the former case: writing IObservable<T>-producing factory methods where recursion is needed.

    Before the advent of this new facility, code to perform recursive scheduling looked as follows:

    static IObservable<int> Range(int start, int count, IScheduler scheduler)
    {
        return Observable.Create<int>(observer =>
        {
            return scheduler.Schedule(new { start, count }, (state, rec) =>
            {
                if (state.count > 0)
                {
                    observer.OnNext(state.start);
                    rec(new { start = state.start + 1, count = state.count - 1 });
                }
                else
                {
                    observer.OnCompleted();
                }
            });
        });
    }
    

    If you thought the implementation of a Range operator would look like a regular for loop with OnNext calls inside it, this clearly proves you wrong. This form of recursive scheduling allows for near-instant cancellation at any point in the recursion (controlled by the recursive Schedule extension method provided on IScheduler). Having fine-grained units of work on our schedulers allows for responsiveness (you don't want long-running work to end up on the UI loop scheduler, for instance) and great control over cancellation without having to poll cancellation flags. Unfortunately, the code doesn't look very nice.

    With the new async/await based recursive scheduling, one can now write the code much more like it was intended to look to begin with. Instead of having to invoke some obscure recursive action and having to thread state around (which could be avoided above by using closures, but I wanted to be explicit in illustrating what's really going on from a state passing perspective), you now get to write code using regular control flow primitives, without losing control over cancellation and without scheduling long-running work. The sample below shows how:

    static IObservable<int> Range(int start, int count, IScheduler scheduler)
    {
        return Observable.Create<int>(observer =>
        {
            return scheduler.ScheduleAsync(new { start, count }, async (self, state, ct) =>
            {
                for (int i = state.start; i < state.start + state.count; i++)
                {
                    observer.OnNext(i);
                    await self.Yield();
                }
                observer.OnCompleted();
    
                return Disposable.Empty;
            });
        });
    }
    

    Keep in mind this is early prototyping work, and things may change as .NET 4.5 evolves toward its final release. That said, the code above uses an async lambda with the await keyword and regular control flow structures (a for loop in this particular case). The call to Yield on the scheduler introduces a suspension point, taking the rest of the code as a continuation to be scheduled recursively. When a cancellation comes in, the continuation is prevented from running normally (we still guarantee that finally blocks run within the ScheduleAsync lambda, though). Cancellation can also be observed through the CancellationToken passed in to the async lambda (called ct in the sample), allowing for deep cancellation if asynchronous methods are called within the lambda.

    Time-based scheduling is equally easy to perform using this new technique, using an awaitable Sleep method:

    static IObservable<int> Timer(TimeSpan period, IScheduler scheduler)
    {
        return Observable.Create<int>(observer =>
        {
            return scheduler.ScheduleAsync(async (self, ct) =>
            {
                int i = 0;
                while (true)
                {
                    observer.OnNext(i++);
                    await self.Sleep(period);
                }
    
                return Disposable.Empty;
            });
        });
    }
    
    Note: The sample above uses a closure to capture the "period" variable, though it could use explicit state passing as well. The need to return an IDisposable object from the async lambda (here via an unreachable statement) is a restriction in the current experimental release and may be removed in the future; it allows other scheduling calls to be wired back to the root caller.

    The Sleep method returns an awaitable object used to schedule the continuation captured by the async method's state machine (i.e. the code beyond the await point) after the specified time elapses. If cancellation comes in, the sleep will be interrupted immediately, allowing the async method to be torn down through an exceptional code path (allowing finally blocks to run).


    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
    Tuesday, November 15, 2011 2:35 AM
  • Congratulations on the release, great job.

    The following F#+Rx snippet is 6x faster than the current stable release:

    #r @"System.Reactive.dll"
    
    open System
    open System.Reactive.Subjects
    open System.Reactive.Linq
    
    let run n m =
        let now = DateTime.Now
    
        let nodes = List.init n (fun _ -> new Subject<_>())
        nodes.[nodes.Length - 1]
            .Finally(fun () -> printfn "Completed in %f" (DateTime.Now - now).TotalSeconds)
            .Publish().Connect() |> ignore
    
        Seq.iter2 (fun l r -> (l :> IObservable<_>).Subscribe(r) |> ignore) nodes nodes.Tail
    
        Seq.init m id |> Seq.iter nodes.Head.OnNext
        nodes.Head.OnCompleted();;
    
    run 1000 100000
    

    Still getting a stack overflow at 100k nodes (1000 messages) though. Although I concede this would never be a realistic use-case.


    Neither my F# agent implementation nor CCR could rival that.

    Thursday, November 17, 2011 10:36 PM
  • I'm surprised you didn't release this on 11/11/11!


    James Miles http://enumeratethis.com
    Monday, November 21, 2011 4:51 AM
  • And you are going to need an extra bit for your next release!
    James Miles http://enumeratethis.com
    Monday, November 21, 2011 4:54 AM
  • Hi Bart,

    "In this release, we optimize subjects further based on those relaxed guarantees."

    I've noticed that Subject<T> now employs non-blocking synchronization.

    AsyncSubject<T>, ReplaySubject<T> & BehaviorSubject<T> do not.

    Time constraints?
    Technical difficulties?
    Not beneficial?

    *UPDATE* After thinking about this, I can see that the additional state poses a problem.


    James Miles http://enumeratethis.com
    • Edited by James Miles Monday, November 21, 2011 3:46 PM
    Monday, November 21, 2011 3:11 PM
  • We were planning to release on 11/11/11 but the download compliance checking process took longer than anticipated :-).
    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
    Tuesday, November 22, 2011 4:50 AM
  • Other subjects haven't been optimized, in part due to additional complexity (as you notice, the state is tricky). At the same time, the default use of e.g. AsyncSubject<T> typically involves a single value source (e.g. FromAsyncPattern), so message volume is typically lower.
    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
    Tuesday, November 22, 2011 4:53 AM
  • Hi Bart,

    I came to the same conclusion RE AsyncSubject.

    I've found that using ReplaySubject(1) is very common. In fact, in most applications I've worked on with Rx, we've created a ReplaySubject(1) or "LatestValueSubject" to increase throughput.

    NOTE: ReplaySubject(1) behaviour is different to that of BehaviorSubject.
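    For readers wondering about that difference, one observable distinction is their behavior toward late subscribers after completion (a sketch, not an exhaustive comparison):

```csharp
// ReplaySubject<T>(1) still replays the last buffered value to subscribers
// that arrive after OnCompleted...
var replay = new ReplaySubject<int>(1);
replay.OnNext(42);
replay.OnCompleted();
replay.Subscribe(x => Console.WriteLine("Replay: " + x)); // prints "Replay: 42"

// ...while BehaviorSubject<T> (which also requires an initial value) only
// signals completion to late subscribers once it has terminated.
var behavior = new BehaviorSubject<int>(0);
behavior.OnNext(42);
behavior.OnCompleted();
behavior.Subscribe(x => Console.WriteLine("Behavior: " + x)); // no OnNext
```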


    James Miles http://enumeratethis.com


    • Edited by James Miles Tuesday, November 22, 2011 5:03 AM
    Tuesday, November 22, 2011 5:01 AM