Still puzzled by Select vs. SelectMany

    Question

  • Hello,

    I am working my way through the book "Programming Reactive Extensions and LINQ". In chapter 4 there is an example that shows how to pipe the result of one asynchronous method into another. The code I am experimenting with looks like this:

    using System;
    using System.Reactive.Linq;

    class Program
    {
        static void Main(string[] args)
        {
            Program p = new Program();
            var test = p.AddTwoNumbersAsync(5, 10)
                .Select(aPlusB => p.MultiplyByFIveObservable(aPlusB)) // the book uses SelectMany here
                .Subscribe((j) =>
                {
                    Console.WriteLine(j);
                });
            // Keep the process alive while Observable.Start runs on a background thread.
            Console.Read();
        }

        IObservable<int> AddTwoNumbersAsync(int a, int b)
        {
            return Observable.Start(() => AddTwoNumbers(a, b));
        }

        IObservable<int> MultiplyByFIveObservable(int x)
        {
            Console.WriteLine(x);
            return Observable.Return(MultiplyByFive(x));
        }

        int MultiplyByFive(int x)
        {
            return x * 5;
        }

        int AddTwoNumbers(int a, int b)
        {
            return a + b;
        }
    }

    In the book, the example is more or less the same, except that the third line of the Main method uses "SelectMany" instead of "Select". With SelectMany, 75 is written to the console as expected. However, I fail to see why this doesn't work with plain "Select". When I use "Select", "System.Reactive.Linq.Observable.Return`1[System.Int32]" is written to the console.

    Moreover, to get a better understanding of Select, I tried to make this example work using Select, but so far without success. For example, I changed the Main method as follows:

    p.AddTwoNumbersAsync(5, 10)
       .Select(aPlusB => p.MultiplyByFIveObservable(aPlusB))  
       .ForEach(bla => Console.WriteLine(bla));

    So my question is twofold:
    a) Why is it essential here that "SelectMany" is used instead of "Select"?
    b) How could I get the same result by using "Select"?

    Wednesday, June 04, 2014 3:35 PM

Answers

  • As a colleague of mine once said, "Think of SelectMany as 'from one, select many'". Something else that may help: in most other functional languages and APIs this operator is called FlatMap.

    The difference is quite simple. Select takes one value and transforms it into a single value, potentially of another type. SelectMany takes a single value and transforms it into a range of values (potentially of another type), then flattens all of those ranges into one output sequence.
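    A minimal sketch of that difference using plain LINQ to Objects, so it needs nothing beyond the standard library. The class and method names are hypothetical, chosen for illustration; each input number is turned into a two-element sequence with Enumerable.Repeat.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SelectVsSelectMany
{
    // Select: one input value -> exactly one output value.
    // Here each output value happens to be a sequence, so the result is nested.
    public static IEnumerable<IEnumerable<int>> WithSelect(IEnumerable<int> source) =>
        source.Select(x => Enumerable.Repeat(x, 2));

    // SelectMany: one input value -> zero or more output values,
    // all flattened into a single output sequence.
    public static IEnumerable<int> WithSelectMany(IEnumerable<int> source) =>
        source.SelectMany(x => Enumerable.Repeat(x, 2));

    public static void Main()
    {
        var source = new[] { 1, 2, 3 };

        // Three elements, each of which is an IEnumerable<int>.
        Console.WriteLine(WithSelect(source).Count());                // 3
        // Six plain ints.
        Console.WriteLine(string.Join(", ", WithSelectMany(source))); // 1, 1, 2, 2, 3, 3
    }
}
```

    The same shape change (one level of nesting with Select, none with SelectMany) is what happens with IObservable<T> in the question.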

    This is covered in detail here http://introtorx.com/Content/v1.0.10621.0/08_Transformation.html#SelectMany

    To directly answer your question:

    a) Because your method 'MultiplyByFIveObservable' returns an IObservable<int>. If you swap the method call for the scalar version 'MultiplyByFive', then Select would be fine. However, by using Select with 'MultiplyByFIveObservable' you are effectively saying "take an int and turn it into an IObservable<int>" for each value. This transforms your sequence from IObservable<int> into IObservable<IObservable<int>>, which is why Console.WriteLine prints the type name of the inner observable instead of 75. If you use SelectMany, you are saying "take a value of type int, get an IObservable<int> sequence for it, and project the values of that sequence as the result of this operator".

    b) Swap your call from 'MultiplyByFIveObservable' to 'MultiplyByFive'.
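    To make a) and b) concrete, here is a minimal sketch that mirrors the question's methods (made static for brevity) and assumes a reference to the Rx package (System.Reactive). Nested shows the IObservable<IObservable<int>> that Select produces with the observable-returning method; WithSelect is answer b). Wait() is used only so the console sample can block for the single result.

```csharp
using System;
using System.Reactive.Linq;

public static class AnswerB
{
    static int AddTwoNumbers(int a, int b) => a + b;
    static int MultiplyByFive(int x) => x * 5;

    static IObservable<int> AddTwoNumbersAsync(int a, int b) =>
        Observable.Start(() => AddTwoNumbers(a, b));

    static IObservable<int> MultiplyByFIveObservable(int x) =>
        Observable.Return(MultiplyByFive(x));

    // a) Select + observable-returning method: each element is itself an observable.
    public static IObservable<IObservable<int>> Nested() =>
        AddTwoNumbersAsync(5, 10).Select(aPlusB => MultiplyByFIveObservable(aPlusB));

    // b) Select + scalar method: int -> int, no nesting.
    public static int WithSelect() =>
        AddTwoNumbersAsync(5, 10).Select(aPlusB => MultiplyByFive(aPlusB)).Wait();

    public static void Main()
    {
        Console.WriteLine(WithSelect()); // 75
    }
}
```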

    I think the problem here is that the example is overly contrived and doesn't present a problem that actually justifies using SelectMany.

    Consider these two examples, one with good old IEnumerable and the other with Observable sequences.

    This classic nested for loop can be converted to LINQ

    for (int i = 0; i < 5; i++)
    {
    	for (int j = 0; j < 3; j++)
    	{
    		Console.WriteLine (i*j);
    	}
    }

    Like this

    Enumerable.Range(0,5)
              .SelectMany(i => Enumerable.Range(0,3).Select(j => i*j))
              .ToList()
              .ForEach(Console.WriteLine);

    And the exact same thing written a different way

    var query = from i in Enumerable.Range(0,5)
                from j in Enumerable.Range(0,3)
                select i*j;
    query.ToList().ForEach(Console.WriteLine);

    Hopefully you can see here that for each value 'i', a new sequence of j's is returned. However, the result is not a sequence of IEnumerable<int>; the inner sequences are flattened back into a single sequence as they are produced.
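    A small sketch of what that flattening means, using only LINQ to Objects: Nested keeps one inner sequence per 'i' (what Select alone gives you), FlattenByHand unrolls it with the same two nested loops as the original for loop, and the result matches SelectMany element for element. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FlattenDemo
{
    // Select alone: one inner sequence per i, i.e. IEnumerable<IEnumerable<int>>.
    public static IEnumerable<IEnumerable<int>> Nested() =>
        Enumerable.Range(0, 5).Select(i => Enumerable.Range(0, 3).Select(j => i * j));

    // Flattening the nested result by hand, mirroring the nested for loops.
    public static List<int> FlattenByHand()
    {
        var result = new List<int>();
        foreach (var inner in Nested())
            foreach (var value in inner)
                result.Add(value);
        return result;
    }

    // SelectMany performs the same flattening for us.
    public static List<int> WithSelectMany() =>
        Enumerable.Range(0, 5).SelectMany(i => Enumerable.Range(0, 3).Select(j => i * j)).ToList();

    public static void Main()
    {
        Console.WriteLine(Nested().Count());                                // 5
        Console.WriteLine(FlattenByHand().SequenceEqual(WithSelectMany())); // True
        Console.WriteLine(WithSelectMany().Count);                          // 15
    }
}
```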

    For an Observable example, imagine you have two feeds. One is a feed of the Twitter accounts you follow; the other is a feed of tweets from a given account. As you receive a new value from the Following feed, you want to automatically add the tweets from that account into the tweets feed.

    You could write that as either:

    GetMyTwitterAccountsIFollow("@LeeRyanCampbell")
    	.SelectMany(account=>GetTweetsByAccount(account))
    	.Subscribe(tweet=>Console.WriteLine(tweet));
    		
    //Same as 
    var query = from account in GetMyTwitterAccountsIFollow("@LeeRyanCampbell")
    	    from tweet in GetTweetsByAccount(account)
    	    select tweet;
    query.Subscribe(tweet=>Console.WriteLine(tweet));

    Note that we get a single result sequence of all of the tweets, not a sequence of sequences. A sequence of sequences would be annoying to work with, although we could use Merge to solve that anyway.
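    A minimal sketch of the Merge alternative, assuming the Rx package (System.Reactive). FakeTweets is a hypothetical, synchronous stand-in for GetTweetsByAccount so the sample completes immediately. Select produces an IObservable<IObservable<string>> and Merge flattens it back; the results are compared after sorting, because interleaving order is not something to rely on in general.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class MergeDemo
{
    // Hypothetical stand-in for GetTweetsByAccount: two "tweets" per account.
    static IObservable<string> FakeTweets(string account) =>
        new[] { account + " - tweet 1", account + " - tweet 2" }.ToObservable();

    static IObservable<string> Accounts() =>
        new[] { "@ReactiveX", "@headinthebox", "@AdaptiveLimited" }.ToObservable();

    public static IList<string> WithSelectMany() =>
        Accounts().SelectMany(account => FakeTweets(account)).ToList().Wait();

    public static IList<string> WithSelectThenMerge() =>
        Accounts()
            .Select(account => FakeTweets(account)) // IObservable<IObservable<string>>
            .Merge()                                // flattened to IObservable<string>
            .ToList()
            .Wait();

    public static void Main()
    {
        var a = WithSelectMany().OrderBy(s => s);
        var b = WithSelectThenMerge().OrderBy(s => s);
        Console.WriteLine(a.SequenceEqual(b)); // True
    }
}
```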

    Here is the full LINQPad sample code, if it helps:

    void Main()
    {
    	GetMyTwitterAccountsIFollow("@LeeRyanCampbell")
    			.SelectMany(account=>GetTweetsByAccount(account))
    			.Subscribe(tweet=>Console.WriteLine(tweet));
    			
    	//Same as 
    	var query = from account in GetMyTwitterAccountsIFollow("@LeeRyanCampbell")
    				from tweet in GetTweetsByAccount(account)
    				select tweet;
    	query.Subscribe(tweet=>Console.WriteLine(tweet));
    }
    
    // Define other methods and classes here
    public IObservable<string> GetMyTwitterAccountsIFollow(string account)
    {
    	var accounts = new [] {"@ReactiveX", "@headinthebox", "@AdaptiveLimited"};
    	return accounts.ToObservable()
    				   .Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (acc,_)=>acc);
    }
    
    public IObservable<string> GetTweetsByAccount(string account)
    {
    	switch (account)
    	{
    		case "@ReactiveX":
    			return new []{"RxJS v2.2.25 is now updated on NuGet and shortly will be available on CDNJS and jsDelivr",
    						"RxJS v2.2.25 is now out and available on Bower, NPM and Jam: https://github.com/Reactive-Extensions/RxJS/releases/tag/v2.2.25 …",
    						"For those unable to make today's Rx talk at #ndcoslo, here are the slides and demos: https://github.com/Reactive-Extensions/NDC-Oslo-2014 …"
    			}.ToObservable()
    			 .Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (tweet,_)=>account + " - " + tweet);
    			
    		case "@headinthebox":
    			return new []{"You can sign up *now* for FP101x https://www.edx.org/course/delftx/delftx-fp101x-introduction-functional-2126#.U48mSJS1Z7Q …",
    						"Public Service Announcement: To sound sophisticated, from now I will start calling functions of type `Boolean => T`` *binomial functions*.",
    						"Last time I had a recruiter spamming me must have been > 2 years ago. Guess doing FP helps with that. so, learn it!"
    			}.ToObservable()
    			 .Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (tweet,_)=>account + " - " + tweet);
    		case "@AdaptiveLimited":
    			return new []{"Checkout our sample Reactive Web App using #TypeScript, #RxJS and #WebSocket, it's now live on #Azure: http://buff.ly/1mPv8PV  Please RT #OSS",
    						"Last early bird ticket left for @LeeRyanCampbell Reactive Extensions training in London July 7-8th! Please RT @ReactiveX",						
    						"Blogged: Everything is a Stream - http://buff.ly/RlZETY  - a deeper dive into our open sourced Reactive Trader - http://buff.ly/RlZETZ ",
    			}.ToObservable()
    			 .Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (tweet,_)=>account + " - " + tweet);		
    	}
    	return Observable.Empty<string>();
    }


    Lee Campbell http://LeeCampbell.blogspot.com

    • Marked as answer by DuffmanBE Thursday, June 05, 2014 11:29 AM
    Thursday, June 05, 2014 7:20 AM

All replies

  • Dear Lee,

    Thank you for your answer. The concepts are much clearer to me now.
    I also managed to make the example work while still using Select and MultiplyByFIveObservable:

    p.AddTwoNumbersAsync(5, 10)
        .Select(aPlusB => p.MultiplyByFIveObservable(aPlusB))
        // Each element is itself an IObservable<int>, so subscribe to it inside the outer subscription.
        .Subscribe((IObservable<int> x) => x.Subscribe(y => Console.WriteLine(y)));

    Thanks again for sharing your knowledge; now I can finally continue with the rest of the book.


    • Edited by DuffmanBE Thursday, June 05, 2014 11:53 AM formatting
    Thursday, June 05, 2014 11:52 AM
  • Great stuff, glad to help.

    I would suggest avoiding what you have done here. It is great that you found a way to make it work, and that is all part of the learning experience; however, avoid this subscribe-inside-a-subscriber pattern. You end up losing the ability to deterministically manage your resources (subscriptions in this case): the IDisposable returned by the inner Subscribe call is simply discarded, so disposing the outer subscription does not unsubscribe from the inner sequences. You can force a resource-management pattern in there, but you end up writing spaghetti code where an appropriate use of operators (such as SelectMany) is preferable.
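    A minimal sketch of the operator-based alternative, assuming the Rx package (System.Reactive) and reusing the question's method names (static, with bodies inlined). The whole pipeline yields a single IDisposable, and disposing it also tears down any inner subscription that SelectMany is holding. The ManualResetEventSlim is only there so the console sample can wait for completion instead of calling Console.Read().

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class SingleSubscription
{
    static IObservable<int> AddTwoNumbersAsync(int a, int b) =>
        Observable.Start(() => a + b);

    static IObservable<int> MultiplyByFIveObservable(int x) =>
        Observable.Return(x * 5);

    public static int Run()
    {
        int result = 0;
        Exception error = null;
        using (var done = new ManualResetEventSlim())
        {
            // One operator chain, one subscription, one IDisposable to manage.
            IDisposable subscription = AddTwoNumbersAsync(5, 10)
                .SelectMany(aPlusB => MultiplyByFIveObservable(aPlusB))
                .Subscribe(
                    value => result = value,
                    ex => { error = ex; done.Set(); },
                    () => done.Set());

            done.Wait();
            // Releases the outer and any inner subscriptions together.
            subscription.Dispose();
        }
        if (error != null) throw error;
        return result;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // 75
    }
}
```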

    Remember, when you have finished reading "Programming Reactive Extensions and LINQ", make sure to read the REAL book on Rx, "Introduction to Rx", you know, the 5-star rated one.

    ;-)

    Amazon - http://www.amazon.com/Introduction-Rx-Lee-Campbell-ebook/dp/B008GM3YPM/ref=sr_1_1?ie=UTF8&qid=1401971452&sr=8-1&keywords=Introduction+to+Rx

    Amazon UK - http://www.amazon.co.uk/Introduction-Rx-Lee-Campbell-ebook/dp/B008GM3YPM/ref=sr_1_1?ie=UTF8&qid=1401971429&sr=8-1&keywords=Introduction+to+rx

    Or the free website version at IntroToRx.com

    (Yes I am the author)

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

    Thursday, June 05, 2014 12:35 PM