Calling an async method in Rx2

  • Question

  • Very quick question.

    Is the optimal way (performance-wise) of calling an async method on an observed value -

    observable.SelectMany(async (value, token) => await DoSomething(value, token));

    However, in performance testing this is ~5 times slower than creating all the tasks and using a Task.WhenAny() loop.  I know the two differ: the above streams its processing, whilst the latter doesn't start processing results until everything is queued up. I'm trying to make the case for Rx, though, and while passing in an observable to get out an observable is elegant, it doesn't come close to using TPL throughout.
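
For reference, the Task.WhenAny() loop described above might look roughly like the sketch below. It is only an illustration: DoSomething, its int payload, and the ProcessAll helper are hypothetical stand-ins, since the thread does not show the real method or the benchmark code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAnyLoop
{
    // Hypothetical stand-in for the DoSomething method in the question.
    public static async Task<int> DoSomething(int value, CancellationToken token)
    {
        await Task.Delay(10, token); // placeholder for real asynchronous work
        return value * 2;
    }

    // Queue every task up front, then handle results in completion order.
    public static async Task<List<int>> ProcessAll(IEnumerable<int> values, CancellationToken token)
    {
        List<Task<int>> pending = values.Select(v => DoSomething(v, token)).ToList();
        var results = new List<int>(pending.Count);

        while (pending.Count > 0)
        {
            Task<int> finished = await Task.WhenAny(pending);
            pending.Remove(finished);
            results.Add(await finished); // awaiting rethrows if the task faulted
        }

        return results;
    }
}
```

Note that nothing is handed to the consumer until the whole input has been enumerated and queued, which is the behavioural difference from the streaming SelectMany version mentioned above. Removing from the list on every iteration is also linear, so this simple form does quadratic work for large inputs.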

    Is there a better/faster way than the above that I'm missing?  For example, I could possibly cheat and use Buffer before creating a Task[] and using WhenAny() on it, but that seems inelegant.

    In short, the question is just: is there a more recommended way than SelectMany?  I'm trying not to start a TPL/Rx flame war, as I think both solve different problems and are great. In fact, I'm trying to support both, e.g.

    Task<Result> Get(Request)

    for singular requests and

    IObservable<Result> Get(IObservable<Request>)

    for multiple requests, in the same interfaces, which gives my API consumers the ability to use the best paradigm for their problem...  So far I'm largely having to duplicate code paths (one in TPL and one in Rx for each branch) to get optimal performance, as interchanging (i.e. using only Rx or TPL under the hood) seems to kill performance.
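
A minimal sketch of such a dual interface, with the observable overload written as a thin wrapper over the Task overload, could look like the following. The Request and Result types, the IService and Service names, and the CancellationToken parameter on the Task overload are all assumptions made for illustration; the sketch also assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical request/result types; the thread does not define them.
public sealed class Request { public int Id; }
public sealed class Result { public int Id; }

public interface IService
{
    // Single request: plain TPL.
    Task<Result> Get(Request request, CancellationToken token);

    // Many requests: observable in, observable out.
    IObservable<Result> Get(IObservable<Request> requests);
}

public sealed class Service : IService
{
    public async Task<Result> Get(Request request, CancellationToken token)
    {
        await Task.Delay(10, token); // placeholder for real work
        return new Result { Id = request.Id };
    }

    public IObservable<Result> Get(IObservable<Request> requests)
    {
        // Uses the SelectMany overload that takes a Task-returning selector
        // with a CancellationToken; results arrive in completion order.
        return requests.SelectMany((request, token) => Get(request, token));
    }
}
```

This is the "only TPL under the hood" variant, so it carries the per-item wrapping overhead discussed in this thread rather than avoiding it.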

    I'll stop there before I make my usual mistake of over complicating the question!!!


    thargy

    Craig Dean (MCPD)
    Chief Executive
    www.webappuk.com

    Please consider marking this as the answer if you have been genuinely helped!

    Wednesday, October 3, 2012 8:21 PM

Answers

  • It's the most optimal way, but it hasn't been optimized :-). Overloads for SelectMany and Merge that operate on a Task<T> are merely tiny wrappers that use FromAsync or ToObservable to convert the Task<T> to an IObservable<T>. There's significant wrapping involved, creation of disposables to support cancellation, and use of AsyncSubject<T> objects. This implementation choice was made based on implementation and test cost (with a compositional approach allowing for reused testing of the building blocks), and because the single-value case for an inner stream is not a primary design point for operators like SelectMany and Merge (or, more broadly speaking, a single-value use of an observable stream is considered the edge case where the overhead of Subscribe/Dispose may overshadow the single OnNext call).

    In post v2.0 RTM builds, we've made some optimizations for those operators to handle the received Task<T> objects directly. IIRC - it's been a while - this provides a 2-3x speedup because we can reuse cancellation token sources across all inner streams, and avoid allocation of extra disposables (in favor of a simple count-based scheme). A refresh build for v2.0 is expected later this fall, primarily aimed at Windows Phone 8 support, but also including a small number of fixes and performance improvements.

    This said, you shouldn't expect the two worlds to get on par for single-value cases. Most of the cost for SelectMany and Merge comes from their internal synchronization over the downstream observer to retain the observer contract and its sequential nature (no concurrent callbacks). In a Task-only world, multiple tasks can go into a completed state simultaneously, with pull-based result retrieval schemes allowing for lock-free completion of multiple tasks (each time one task completes, scan the list - even if only partially - for others that may have completed too). In our world, we have to guard against the result callbacks coming from different sources, each of which contributes to the output in a 1-to-1 correspondence, requiring a pessimistic approach of grabbing a lock. (Note that there are hypothetical ways around this for limited parts of the pipeline; e.g. a Merge followed by a Where could allow for reordering of operators, therefore allowing concurrent filtering by Where and reducing the lock contention in the Merge phase that's been moved downstream. But, as I said, this doesn't reflect today's implementation and would more likely be a strategy layered on top as part of a query pre-processor/optimizer. Alternatively, a queue-based merge could be implemented, but that opens up a whole different set of potential issues, such as queue pumping ownership and potentially unbounded queuing behavior.)
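
To illustrate the pessimistic locking described above, here is a small sketch of an observer wrapper that serializes callbacks from several sources onto one downstream observer. It is not the Rx source code; SerializedObserver and ListObserver are hypothetical names, and only the BCL IObserver<T> interface is used.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Forwards notifications to a downstream observer under a single lock, so the
// downstream never sees overlapping calls and sees nothing after termination.
public sealed class SerializedObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _downstream;
    private readonly object _gate = new object();
    private bool _stopped;

    public SerializedObserver(IObserver<T> downstream)
    {
        _downstream = downstream;
    }

    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (!_stopped) _downstream.OnNext(value);
        }
    }

    public void OnError(Exception error)
    {
        lock (_gate)
        {
            if (_stopped) return;
            _stopped = true;
            _downstream.OnError(error);
        }
    }

    public void OnCompleted()
    {
        lock (_gate)
        {
            if (_stopped) return;
            _stopped = true;
            _downstream.OnCompleted();
        }
    }
}

// Deliberately non-thread-safe sink, standing in for an arbitrary downstream observer.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Every inner source that completes has to pass through the same lock before its single value reaches the output, which is the per-item cost a pull-based Task loop does not pay.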


    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }

    • Edited by Bart De Smet [MSFT] Thursday, October 4, 2012 5:40 AM
    • Marked as answer by thargy Thursday, October 4, 2012 8:15 AM
    Thursday, October 4, 2012 5:33 AM

All replies

  • Hi,

    Well, the async lambda's unnecessary so you could probably avoid having the compiler create an additional method, but other than that I'd say it's about as simple, and perhaps as "performant", as a sequential query is going to get in Rx.  Note that I haven't done any performance testing on this myself, I'm just assuming based on the fact that SelectMany is a core operator and this is pretty succinct:

    observable.SelectMany(DoSomething);

    - Dave


    http://davesexton.com/blog

    Wednesday, October 3, 2012 11:20 PM
  • I'm actually doing more than just calling the method (I have to wrap the result and so on), otherwise I would have just used that syntax. However, if DoSomething returns a Task<> and takes a request and token, this is just a shortcut for writing (request,token) => DoSomething(request,token) anyway.  I believe that is identical to the async/await form, async (request, token) => await DoSomething(request,token), in the simple case, but I'd have to double-check the IL.

    thargy

    Thursday, October 4, 2012 8:15 AM
  • Thanks Bart,

    That makes complete sense (and I had figured as much by looking into the source code), it's fantastic news to hear you've already revisited this and improved the performance!

    We believe the single-call case will be the majority case, so it makes sense for us to use TPL for that; as such, the Observable version is best implemented as a wrapper.  Although I fully accept what you're saying, with TPL being quite 'established' it is not going to be an uncommon scenario for devs to take this approach - at least initially.  For now I'll await the next refresh, as our live deployment isn't until Q2 '13. It's probable that a 2-3x improvement will be enough for us; if not, I'll just have to write separate code paths at that time.


    thargy

    Thursday, October 4, 2012 8:18 AM
  • Btw, the "simple" case of a single await inside an async method doesn't get compiled away. For one thing, the DoSomething method may throw synchronously during its invocation (which would not conform to the new Task-based Asynchronous Pattern method guidelines). In an asynchronous method, this error gets routed into the returned Task<T>. If this were to get compiled away to a plain return of DoSomething, the behavior would change, hence the async method builder dance is kept under all circumstances. (Notice there's also no way to hijack a returned task to force an error into it; a new TaskCompletionSource has to be created to perform this wrapping.)
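
The behavioural difference can be seen in a small sketch like the one below. The DoSomething body and the Direct and Wrapped helper names are hypothetical, chosen only to show a synchronous throw passing through each form.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncWrapping
{
    // Hypothetical method that validates its argument synchronously,
    // before any Task has been created.
    public static Task<int> DoSomething(int value)
    {
        if (value < 0) throw new ArgumentOutOfRangeException("value");
        return Task.FromResult(value * 2);
    }

    // Plain pass-through: a synchronous throw escapes at the call site.
    public static Task<int> Direct(int value)
    {
        return DoSomething(value);
    }

    // Async wrapper: the same throw is captured into the returned task.
    public static async Task<int> Wrapped(int value)
    {
        return await DoSomething(value);
    }
}
```

Calling Direct(-1) throws ArgumentOutOfRangeException immediately, whereas Wrapped(-1) returns normally with a faulted task whose exception surfaces only when the task is awaited or inspected.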

    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }

    Thursday, October 4, 2012 9:51 PM
  • That's a pain.  To be fair, we were largely removing async/awaits all over the place anyway and falling back on TPL.  It seems that for really performance-critical work that's always the best option, but it devalues the elegance and simplicity of the code.  I'm a big fan of the new async features in terms of maintainability and readability - especially for larger teams - but there are a lot of compromises if you're after ultimate performance.

    thargy

    Friday, October 5, 2012 9:41 AM