none
What does the new ManySelect operator do?

    Question

  • public static IObservable<TResult> ManySelect<TSource, TResult>(
    	IObservable<TSource> source,
    	Func<IObservable<TSource>, TResult> selector,
    	IScheduler scheduler
    )
    
    What does the new ManySelect operator do? (It is called the comonadic bind operator, but I'm not much into monads or into Haskell samples.)
    Sunday, March 20, 2011 4:21 PM

Answers

  • Yes, First is the comonadic extract.  No, it doesn't work for any scheduler.  Note that comonadic extract doesn't take a scheduler argument and the equivalent overload of ManySelect does the right thing.
    Wednesday, March 23, 2011 3:11 PM

All replies

  • Hi,

    SelectMany calls a user-defined selector function that projects a new observable for each value, and it flattens all of the observables into a new observable sequence.

    ManySelect internally projects a new observable for each value, and it pushes each observable into a user-defined selector function that projects each observable into some result.

    The observable that ManySelect projects into the selector function is actually the same observable sequence as the source, only it starts from the current value. Thus, the first value is projected into an observable sequence that starts with itself, followed by all values that follow it in the source observable. The second value is projected into an observable sequence that starts with itself (not the first value), followed by all values that follow it in the source observable. The third value is projected into an observable sequence that starts with itself (not the first or second values), followed by all values that follow it in the source observable. Etc.

    In other words, ManySelect selects many queries, one for each value in the source; whereas SelectMany selects many projections, one for each value in the source.

    To elaborate: ManySelect provides a view of the sequence starting from the current value, for each value, and expects the selector function to return a query.  SelectMany simply provides each value to the selector function and expects it to return the view.

    I haven't thought of a scenario where ManySelect would be useful. Anyone got an idea?

    Here's a lab to illustrate ManySelect's behavior:

    (Keep in mind that the default scheduler for the projected observables is ThreadPool, but I'm using CurrentThread here to make the output predictable.)

    using System;
    using System.Concurrency;
    using System.Linq;
    
    namespace ReactiveProgrammingConsole
    {
    	class ManySelectLab
    	{
    		static void Main()
    		{
    			var xs = Observable.Range(1, 3).Do(x => Console.WriteLine("Generated: {0}", x));
    
    			var projection = new[] { "A", "B", "C" };
    			int counter = 0;
    
    			var manySelect = xs.ManySelect(ys =>
    				{
    					var x = ++counter;
    
    					return ys.Select(y => new { x, y = projection[y - 1] });
    				},
    				Scheduler.CurrentThread);
    
    			using (manySelect.Concat().Subscribe(
    				x => Console.WriteLine("Observed: {0}", x),
    				() => Console.WriteLine("Completed")))
    			{
    				Console.ReadKey();
    			}
    		}
    	}
    }
    

    Results:

    Generated: 1
    Generated: 2
    Observed: { x = 1, y = A }
    Generated: 3
    Observed: { x = 1, y = B }
    Observed: { x = 1, y = C }
    Observed: { x = 2, y = B }
    Observed: { x = 2, y = C }
    Observed: { x = 3, y = C }
    Completed

    - Dave

    • Edited by Dave Sexton Sunday, March 20, 2011 9:17 PM Additional clarity
    Sunday, March 20, 2011 9:13 PM
  • A simple example would be if you wanted many "aggregations" starting from each notification. 

    Observable.Range(1,10).ManySelect(xs => xs.Sum(), Scheduler.CurrentThread)
    
    

    OUTPUT

    55
    54
    52
    49
    45
    40
    34
    27
    19
    10

    Or you might want to create a rolling window (like buffer with count).

    Observable.Range(1,10).ManySelect(xs => xs.Take(3).ToList(), Scheduler.CurrentThread)

    OUTPUT

    List<Int32> (3 items) 
    1
    2
    3

    List<Int32> (3 items) 
    2
    3
    4

    List<Int32> (3 items) 
    3
    4
    5

    List<Int32> (3 items) 
    4
    5
    6

    List<Int32> (3 items) 
    5
    6
    7

    List<Int32> (3 items) 
    6
    7
    8

    List<Int32> (3 items) 
    7
    8
    9

    List<Int32> (3 items) 
    8
    9
    10

    List<Int32> (2 items) 
    9
    10

    List<Int32> (1 item) 
    10

    Mmmm c o m o n a d..... ;)

    @Wes / Bart - I think default scheduler of thread pool is probably a mistake. I don't think the default behaviour should be to introduce concurrency. (You already know that - I'm just regurgitating your own guidelines)


    James Miles http://enumeratethis.com
    Monday, March 21, 2011 9:36 AM
  • Good question by the way!
    James Miles http://enumeratethis.com
    Monday, March 21, 2011 9:40 AM
  • A "responsive" WindowWithTime operator, however the window opening is in response to a notification on the source rather than a forever ticking timer.

    *update* there is a flaw with my design, concurrency issue.

    public static IObservable<IObservable<T>> ResponsiveWindowWithTime<T>(this IObservable<T> source, TimeSpan interval)
    {
      var published = 0;
      var count = 0;
      return source.ManySelect(xs =>
      {
        count++;
        if (count > published)
        {
          var window = new Subject<T>();
          xs.TakeUntil(Observable.Timer(interval)).Do(x => published++).Subscribe(window);
          return window;
        }
        return null;
      }, Scheduler.CurrentThread).Where(w => w != null);
    }

    James Miles http://enumeratethis.com

    Monday, March 21, 2011 2:36 PM
  • The ManySelect operator is perfect for doing blocking operations.  If you have some operation that takes an observable and blocks then use the ManySelect operator to do it and have the whole thing look asynchronous.  Note, that the blocking still takes place, but on a context where hopefully it is ok to do it.  This is why ThreadPool is the default scheduler for ManySelect.

    • Proposed as answer by Dave Sexton Tuesday, March 22, 2011 8:23 AM
    Tuesday, March 22, 2011 1:26 AM
  • Hi Wes,

    Would you consider "blocking operations" as the main (or only?) use case for ManySelect?


    James Miles http://enumeratethis.com
    Tuesday, March 22, 2011 9:53 AM
  • Hi Wes,

    If ManySelect is comonadic bind then what is comonadic extract? Is that Observable.First?

    It seems to me that if it's Observable.First then it should be possible to call First inside selector of ManySelect on any scheduler. And such call should not lead to deadlock.

    Currently the following piece of code results in deadlock:

    Observable.Range(1, 10).ManySelect(obs => obs.First(), Scheduler.CurrentThread).Subscribe(Console.WriteLine);      
    

    Regards,

    Gregory

    Tuesday, March 22, 2011 10:26 AM
  • Yes, First is the comonadic extract.  No, it doesn't work for any scheduler.  Note that comonadic extract doesn't take a scheduler argument and the equivalent overload of ManySelect does the right thing.
    Wednesday, March 23, 2011 3:11 PM