Help me evangelize Rx: Select + async selector lambda
-
jueves, 12 de abril de 2012 18:38
Hi,
In this blog post by ayende, I tried to give an alternate solution to his problem using Rx.
However I need to use Rx in an async fashion that I'm unfamiliar with. I'd want to write this query:
var result = Observable .Range(0, 10 * 1000) .ObserveOn(Scheduler.TaskPool) .SelectAsync(async i => await fetchDataAsync(i)) // IO<R> SelectAsync<T,R>(IO<T> source, Func<T,Task<R>> asyncSelector) .Sum();
The missing piece is of course the SelectAsync operator.
How can I do this?
(Be sure to read Ayende's post, my comment to him and his reply)

Omer Mor
Todas las respuestas
-
viernes, 13 de abril de 2012 2:37
Hi Omer,
SelectMany is the async implementation of Select. Recall that IObservable<T> represents an asynchronous sequence, thus it can also represent a singleton sequence similar to Task<T>; therefore, an overload of SelectMany could easily be defined to accept a Task<T> instead of an IObservable<T>. And as a matter of fact, the Rx team already did that! You'll find this operator in Rx 2.0 Beta.
public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
Admittedly, I haven't read the blog post yet, but I'm about to now. This was just my first impression of your question.
- Dave
http://davesexton.com/blog
-
viernes, 13 de abril de 2012 3:03
Hi Omer,
It seems that one of Ayende's requirements is sequential processing instead of parallel processing. It's kind of a strange requirement though considering that the tasks are web requests. Fork-join may be a better approach. But perhaps the end point is limited to handling one request at a time, or something ;)
Regardless of the reason, the solution is simple in Rx. Recall that SelectMany behaves like Select -> Merge, so a slight modification to your query will change it to Select -> Concat, which meets Ayende's requirement that items must be processed sequentially, one-at-a-time.
But we're now back to the beginning. We're trying to use Select to project each item into a Task<T>, but we'll need some way of doing that without using SelectMany. Furthermore, we want each Task<T> to be cold, which is quite unlike Task<T>. We also want to wrap the Task<T> into an IObservable<T>.
We can do both simultaneously in Rx 2.0 Beta with the new FromAsync operator, which creates a cold observable that, when subscribed to, executes the specified function to generate a Task<T>.
(Note that if you wanted to convert an async function into a hot observable, you could use the new StartAsync operator.)
For example: (untested)
var result = Observable .Range(0, 10 * 1000) .ObserveOn(TaskPoolScheduler.Default) .Select(i => Observable.FromAsync(() => fetchDataAsync(i))) .Concat() .Sum();
- Dave
- Editado Dave Sexton viernes, 13 de abril de 2012 3:05 Small correction/clarification about StartAsync
- Editado Dave Sexton viernes, 13 de abril de 2012 3:13 Removed unnecessary async/await keywords
- Editado Dave Sexton viernes, 13 de abril de 2012 3:13 Small correction/clarification about StartAsync. Removed unnecessary async/await keywords.
- Marcado como respuesta Omer Mor sábado, 14 de abril de 2012 11:41
-
sábado, 14 de abril de 2012 11:41
Thanks Dave!
I can confirm this query works great.
I posted a new comment on his blog and referred him to this thread for explanations.
I also wrote the following operator which someone might find useful:
public IObservable<R> SelectAsync<T,R>(IObservable<T> source, Func<T,Task<R>> asyncSelector) { return source .Select(value => Observable.FromAsync(() => asyncSelector(value))) .Concat(); }
Omer Mor

