Help me evangelize Rx: Select + async selector lambda

  • Question

  • Hi,

    On this blog post by Ayende, I tried to offer an alternative solution to his problem using Rx.

    However, I need to use Rx in an async fashion that I'm unfamiliar with. I'd like to write this query:

    var result = Observable
      .Range(0, 10 * 1000)
      .ObserveOn(Scheduler.TaskPool)
      .SelectAsync(async i => await fetchDataAsync(i)) // IO<R> SelectAsync<T,R>(IO<T> source, Func<T,Task<R>> asyncSelector)
      .Sum();

    The missing piece is of course the SelectAsync operator. 

    How can I do this?

    (Be sure to read Ayende's post, my comment to him and his reply)


    Omer Mor

    Thursday, April 12, 2012 6:38 PM

All replies

  • Hi Omer,

    SelectMany is the async implementation of Select.  Recall that IObservable<T> represents an asynchronous sequence, thus it can also represent a singleton sequence similar to Task<T>; therefore, an overload of SelectMany could easily be defined to accept a Task<T> instead of an IObservable<T>.  And as a matter of fact, the Rx team already did that!  You'll find this operator in Rx 2.0 Beta.

    public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
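
    As a rough sketch of how that overload might be used (assuming the System.Reactive package is referenced; FetchDataAsync is a hypothetical stand-in for the fetchDataAsync in the question, not the real method), the query can be written directly against a Task-returning selector. Note that SelectMany starts a task for each item as it arrives, so the fetches overlap and results are merged in completion order:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SelectManyTaskSketch
{
    // Hypothetical stand-in for fetchDataAsync: waits briefly, then returns a value.
    static async Task<int> FetchDataAsync(int i)
    {
        await Task.Delay(10);
        return i * 2;
    }

    // SelectMany with a Task-returning selector: each item is projected to a task
    // and the task results are merged into a single sequence, which Sum aggregates.
    public static async Task<int> SumAsync()
    {
        return await Observable
            .Range(0, 100)
            .SelectMany(i => FetchDataAsync(i))
            .Sum();
    }

    public static async Task Main()
    {
        Console.WriteLine(await SumAsync());
    }
}
```

    Because Sum does not depend on ordering, the total is the same however the tasks interleave; an order-sensitive aggregate would need a different operator.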

    Admittedly, I haven't read the blog post yet, but I'm about to now.  This was just my first impression of your question.

    - Dave


    http://davesexton.com/blog

    Friday, April 13, 2012 2:37 AM
  • Hi Omer,

    It seems that one of Ayende's requirements is sequential processing instead of parallel processing.  That's kind of a strange requirement, though, considering that the tasks are web requests.  Fork-join may be a better approach.  But perhaps the endpoint is limited to handling one request at a time, or something ;)

    Regardless of the reason, the solution is simple in Rx.  Recall that SelectMany behaves like Select -> Merge, so a slight modification to your query will change it to Select -> Concat, which meets Ayende's requirement that items must be processed sequentially, one-at-a-time.

    But we're now back to the beginning.  We're trying to use Select to project each item into a Task<T>, but we need some way of doing that without using SelectMany.  Furthermore, we want each operation to be cold, starting only when it's subscribed to, which is quite unlike Task<T>: a task returned by an async method is typically already running.  We also want to wrap the Task<T> in an IObservable<T>.

    We can do both simultaneously in Rx 2.0 Beta with the new FromAsync operator, which creates a cold observable that, when subscribed to, executes the specified function to generate a Task<T>.

    (Note that if you wanted to convert an async function into a hot observable, you could use the new StartAsync operator.)

    For example: (untested)

    var result = Observable
    	.Range(0, 10 * 1000)
    	.ObserveOn(TaskPoolScheduler.Default)
    	.Select(i => Observable.FromAsync(() => fetchDataAsync(i)))
    	.Concat()
    	.Sum();

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Friday, April 13, 2012 3:13 AM Small correction/clarification about StartAsync. Removed unnecessary async/await keywords.
    • Marked as answer by Omer Mor Saturday, April 14, 2012 11:41 AM
    Friday, April 13, 2012 3:03 AM
  • Thanks Dave!

    I can confirm this query works great.

    I posted a new comment on his blog and referred him to this thread for explanations.

    I also wrote the following operator which someone might find useful:

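    public static class ObservableAsyncExtensions
    {
        // Projects each value through asyncSelector, one at a time and in source order.
        public static IObservable<R> SelectAsync<T, R>(this IObservable<T> source, Func<T, Task<R>> asyncSelector)
        {
            return source
                // FromAsync defers the call until Concat subscribes, so only one task runs at a time.
                .Select(value => Observable.FromAsync(() => asyncSelector(value)))
                .Concat();
        }
    }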
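
    A possible usage sketch follows, assuming the System.Reactive package is referenced. It restates the operator as an extension method so the example is self-contained; FetchDataAsync and the class names are hypothetical and only for illustration. Earlier items are given longer delays to show that results still arrive in source order:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ObservableAsyncExtensions
{
    // Same idea as the operator above: defer each task with FromAsync, then Concat.
    public static IObservable<R> SelectAsync<T, R>(
        this IObservable<T> source, Func<T, Task<R>> asyncSelector)
    {
        return source
            .Select(value => Observable.FromAsync(() => asyncSelector(value)))
            .Concat();
    }
}

public static class SelectAsyncUsage
{
    // Hypothetical fetch: earlier items take longer than later ones.
    static async Task<int> FetchDataAsync(int i)
    {
        await Task.Delay((5 - i) * 10);
        return i * i;
    }

    // Collects all results into a list once the sequence completes.
    public static async Task<IList<int>> RunAsync()
    {
        return await Observable
            .Range(0, 5)
            .SelectAsync(i => FetchDataAsync(i))
            .ToList();
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync()));
    }
}
```

    Since each fetch starts only after the previous one has completed, the slower early items cannot be overtaken by the faster later ones.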


    Omer Mor

    Saturday, April 14, 2012 11:41 AM
  • Is there a difference between this and just calling this overload of Concat()?

    public static IObservable<TSource> Concat<TSource>(this IObservable<Task<TSource>> sources);


    Tuesday, January 20, 2015 1:24 PM
    I think the only difference is in how you get the tasks. Remember that Task<T> is hot, so every task starts as soon as the selector creates it, whether or not Concat has reached it yet. In terms of concurrency, Concat over plain tasks is therefore effectively the same as Merge; the only difference is that Concat still yields the results in the original order. To run the tasks one at a time, you'll probably want to wrap them in FromAsync, and you'll end up with the code I've shown anyway.
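
    One way to see the difference is to count how many fetches are in flight at once. The following is only a sketch, assuming the System.Reactive package is referenced; FetchDataAsync, the counters, and the class name are hypothetical and exist just to record concurrency:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class HotVersusColdSketch
{
    static readonly object Gate = new object();
    static int running;     // fetches currently in flight
    static int maxRunning;  // highest value of `running` seen so far

    // Hypothetical stand-in for fetchDataAsync that records concurrency.
    static async Task<int> FetchDataAsync(int i)
    {
        lock (Gate) { running++; maxRunning = Math.Max(maxRunning, running); }
        await Task.Delay(20);
        lock (Gate) { running--; }
        return i;
    }

    // Runs the query and returns the highest number of overlapping fetches.
    public static async Task<int> MaxConcurrencyAsync(bool deferWithFromAsync)
    {
        lock (Gate) { running = 0; maxRunning = 0; }
        var source = Observable.Range(0, 20);

        var results = deferWithFromAsync
            // Cold: each fetch starts only when Concat subscribes to it.
            ? source.Select(i => Observable.FromAsync(() => FetchDataAsync(i))).Concat()
            // Hot: each fetch starts as soon as Select calls FetchDataAsync.
            : source.Select(i => FetchDataAsync(i)).Concat();

        await results.Sum();
        lock (Gate) { return maxRunning; }
    }

    public static async Task Main()
    {
        Console.WriteLine("Concat over tasks:     " + await MaxConcurrencyAsync(false));
        Console.WriteLine("Concat over FromAsync: " + await MaxConcurrencyAsync(true));
    }
}
```

    With FromAsync the recorded maximum should be 1, whereas the plain-task version should report more than one overlapping fetch, since the tasks are already running before Concat gets to them.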

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Tuesday, January 20, 2015 4:14 PM
    Tuesday, January 20, 2015 4:13 PM