Issue on cold observable - Erlang ring problem

  • Question

  • I'm having an issue rewriting this for Rx 1.1. I was under the impression that a cold observable starts pushing values only on subscription, yet the code doesn't behave as I expect:

     

    Enumerable
    .Range(0, 10000)
    .Select(_ => new Subject<int>())
    .Memoize(nodes =>
    {
    	nodes.Buffer(2, 1).Where(o => o.Count == 2).ForEach(o => o[0].Subscribe(o[1]));
    	Observable.Range(0, 10000).Subscribe(nodes.First());
    	
    	return nodes;
    }).Last().Last().Dump();
    

     


    The above crashes with an InvalidOperationException: Sequence contains no elements.

    Introducing a .Delay, like so:

    Observable.Range(0, 10000).Delay(TimeSpan.FromMilliseconds(100)).Subscribe(nodes.First());
    

    fixes the problem, and I get the expected behavior.
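    One way to see the crash, consistent with David's own follow-up diagnosis in this thread: Subject<T> is hot, not cold. Only Observable.Range is cold, and it starts producing as soon as it is subscribed to nodes.First(), which happens inside the Memoize selector, before .Last() subscribes to the final subject. The BCL-only sketch that follows does not use Rx; it assumes two behaviours instead: that Range without an explicit scheduler pushes all its values synchronously on the subscribing thread, and that a completed subject immediately completes any late subscriber. HotSubject<T>, SyncRange, CountingObserver<T> and SubscriptionOrder are hypothetical names, not Rx types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Rx's Subject<T> (not its real implementation):
// a hot relay that forwards notifications only to current subscribers and,
// once completed, immediately completes any observer that subscribes late.
public sealed class HotSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private bool _completed;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (_completed) observer.OnCompleted(); // late: nothing left to see
        else _observers.Add(observer);
        return NoopDisposable.Instance;         // unsubscription omitted in this sketch
    }

    public void OnNext(T value)
    {
        if (_completed) return;                 // values after completion are ignored
        foreach (var o in _observers) o.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var o in _observers) o.OnError(error);
    }

    public void OnCompleted()
    {
        _completed = true;
        foreach (var o in _observers) o.OnCompleted();
    }
}

public sealed class NoopDisposable : IDisposable
{
    public static readonly NoopDisposable Instance = new NoopDisposable();
    public void Dispose() { }
}

// Cold source (assumed Range-like behaviour): every Subscribe call produces
// start..start+count-1 right away, on the caller's thread, then completes.
public sealed class SyncRange : IObservable<int>
{
    private readonly int _start, _count;
    public SyncRange(int start, int count) { _start = start; _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < _count; i++) observer.OnNext(_start + i);
        observer.OnCompleted();
        return NoopDisposable.Instance;
    }
}

public sealed class CountingObserver<T> : IObserver<T>
{
    public int Count { get; private set; }
    public bool Completed { get; private set; }
    public void OnNext(T value) => Count++;
    public void OnError(Exception error) => Completed = true;
    public void OnCompleted() => Completed = true;
}

public static class SubscriptionOrder
{
    // Counts the values an observer receives from the subject, depending on
    // whether it subscribes before or after the cold source is attached.
    public static int ValuesSeen(bool subscribeBeforeSource)
    {
        var subject = new HotSubject<int>();
        var observer = new CountingObserver<int>();

        if (subscribeBeforeSource) subject.Subscribe(observer);
        new SyncRange(0, 10000).Subscribe(subject); // all 10000 values flow right now
        if (!subscribeBeforeSource) subject.Subscribe(observer);

        return observer.Count;
    }
}
```

    SubscriptionOrder.ValuesSeen(false) returns 0: the late observer only ever receives OnCompleted, which is the situation in which Rx's Last() throws "Sequence contains no elements". ValuesSeen(true) returns 10000. Adding .Delay to the source presumably has the same effect as subscribing first, because production then starts only after the 100 ms delay, by which time .Last() is already subscribed.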

    Also, couldn't Memoize be rewritten to accept a Func<IEnumerable<T>, TResult> (or a Func<IObservable<T>, TResult>) instead? Wouldn't that still work with inline lambdas while being more flexible?

    Thanks,

    David


    • Edited by David Grenier Sunday, July 10, 2011 1:52 PM using experimental
    Sunday, July 10, 2011 1:48 PM

Answers

  • I figured out the problem: the last subscription (.Dump()) was only registered after all the messages were already en route across the ring.

    Enumerable
    	.Range(0, 100)
    	.Select(_ => new Subject<int>())
    	.Memoize(nodes =>
    	{
    		var finish = nodes.Last().Finally(() => Console.WriteLine("Done"));
    		nodes.Buffer(2, 1).Where(o => o.Count == 2).ForEach(o => o[0].Subscribe(o[1]));
    		Observable.Range(0, 90000).Subscribe(nodes.First());
    		
    		return EnumerableEx.Return(finish.IgnoreElements());
    	})
    	.Single()
    	.Dump();
    

    Note that I'd still like to know how to get this working across all cores.
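    The ordering described in this answer (attach the final observer to the last node before anything is pushed into the first) can be sketched without Rx. This is a minimal illustration under assumptions, not the code from the post: RingNode is a hypothetical stand-in for Subject<int>, a plain loop stands in for Observable.Range, and RingSink and Ring.Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for Rx's Subject<int>: a hot relay that forwards each
// notification to the observers subscribed at that moment.
public sealed class RingNode : IObservable<int>, IObserver<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return RingSubscription.None; // unsubscription omitted in this sketch
    }

    public void OnNext(int value) { foreach (var o in _observers) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in _observers) o.OnError(error); }
    public void OnCompleted() { foreach (var o in _observers) o.OnCompleted(); }
}

public sealed class RingSubscription : IDisposable
{
    public static readonly RingSubscription None = new RingSubscription();
    public void Dispose() { }
}

// Counts values and remembers the last one, loosely playing the role of Last().
public sealed class RingSink : IObserver<int>
{
    public int Count { get; private set; }
    public int LastValue { get; private set; }
    public bool Done { get; private set; }
    public void OnNext(int value) { Count++; LastValue = value; }
    public void OnError(Exception error) { Done = true; }
    public void OnCompleted() { Done = true; }
}

public static class Ring
{
    // Builds a chain of nodeCount relays, subscribes the sink to the LAST node
    // first, and only then pushes messageCount values into the first node.
    public static RingSink Run(int nodeCount, int messageCount)
    {
        var nodes = Enumerable.Range(0, nodeCount).Select(_ => new RingNode()).ToList();

        var sink = new RingSink();
        nodes[nodes.Count - 1].Subscribe(sink);    // 1. final subscriber first

        for (int i = 0; i + 1 < nodes.Count; i++)  // 2. link node i -> node i + 1
            nodes[i].Subscribe(nodes[i + 1]);

        for (int v = 0; v < messageCount; v++)     // 3. start the source last
            nodes[0].OnNext(v);
        nodes[0].OnCompleted();

        return sink;
    }
}
```

    Ring.Run(100, 90000) returns a sink with Count 90000, LastValue 89999 and Done set. In this sketch each value travels through the chain as nested calls, one stack frame per node, so stack depth grows with the size of the ring.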

    • Edited by David Grenier Sunday, July 10, 2011 4:40 PM Added code
    • Marked as answer by David Grenier Sunday, July 10, 2011 4:43 PM
    Sunday, July 10, 2011 4:04 PM

All replies

  • Another question: it seems the sample above always runs on a single thread. I tried to find a place where I could introduce parallelism by letting it run on the TaskPool, but couldn't find one.

    Before Rx 1.1, new Subject<_>() accepted a scheduler, but I found that passing one only made the sample much slower: it took more than a minute instead of 9 seconds on a single thread.

    Thanks,

    David


    Sunday, July 10, 2011 1:51 PM
  • Hi David,

    Take a look at §§4.2 and 6.7 in the Rx Design Guidelines. All Rx operators that generate sequences ensure that notifications are serialized.

    Subject<T> does not prevent concurrent notifications, though, unless you explicitly use the Subject.Synchronize method.

    Therefore, to introduce concurrency you'll have to create it yourself at the source. For example, if you want to support a maximum concurrency of 2 messages, then you'll have to call Observable.Range twice, with half of the values in each. You must also introduce the concurrency explicitly by passing in a scheduler. For example:

    for (int i = 0; i < maxConcurrency; i++)
    {
    	// each call produces its own slice of 0..9999 on the thread pool
    	Observable.Range(i * (10000 / maxConcurrency), 10000 / maxConcurrency, Scheduler.ThreadPool)...
    }
    

    Also keep in mind that using a Subject<T> for concurrent notifications might cause problems with Rx operators, since the guidelines do state that serializability is a valid assumption when writing operators.
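    Partitioning at the source and serializing at the consumer can be illustrated together in a short BCL-only sketch. Assumptions: Task.Run stands in for Scheduler.ThreadPool; SerializingObserver<T> plays the role that Subject.Synchronize plays in Rx (it is not that method's implementation); PartitionedSource.Run and SumObserver are hypothetical names. Each partition starts at i * chunk, so the partitions together cover 0..total-1 exactly once.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Every notification takes the same lock, so the inner observer never sees
// overlapping calls even though producers run on several threads.
public sealed class SerializingObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly object _gate = new object();
    public SerializingObserver(IObserver<T> inner) { _inner = inner; }

    public void OnNext(T value) { lock (_gate) _inner.OnNext(value); }
    public void OnError(Exception error) { lock (_gate) _inner.OnError(error); }
    public void OnCompleted() { lock (_gate) _inner.OnCompleted(); }
}

// Not thread-safe on its own: it relies on the serializing wrapper above.
public sealed class SumObserver : IObserver<int>
{
    public long Sum;
    public int Count;
    public int Completions;
    public Exception Error;

    public void OnNext(int value) { Sum += value; Count++; }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { Completions++; }
}

public static class PartitionedSource
{
    // Splits [0, total) into maxConcurrency contiguous chunks, each produced
    // on its own thread-pool task, all feeding one serialized observer.
    public static SumObserver Run(int total, int maxConcurrency)
    {
        var sink = new SumObserver();
        var observer = new SerializingObserver<int>(sink);
        int chunk = total / maxConcurrency;

        var tasks = Enumerable.Range(0, maxConcurrency).Select(i => Task.Run(() =>
        {
            int start = i * chunk;
            int count = i == maxConcurrency - 1 ? total - start : chunk; // last chunk takes the remainder
            for (int v = start; v < start + count; v++) observer.OnNext(v);
        })).ToArray();

        Task.WaitAll(tasks);
        observer.OnCompleted(); // a single completion, after every partition is done
        return sink;
    }
}
```

    PartitionedSource.Run(10000, 4) yields Count 10000 and Sum 49995000 however the tasks interleave; without the lock, concurrent updates to Sum and Count could be lost. The sketch sends one OnCompleted after Task.WaitAll; if each partition completed a shared Rx subject on its own, the first partition to finish would complete it, and the subject would then ignore the values still coming from the other partitions.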

    The general idea is that Rx doesn't appear to be designed for introducing parallelism; instead, it's designed for taming it.

    Edit: I just want to point out that, of course, Rx is used to model asynchronous queries, so in that respect it's really good at introducing concurrency; perhaps just not at introducing "observable parallelism", to make up a phrase ;)

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Sunday, July 10, 2011 7:57 PM Clarification to last sentence
    Sunday, July 10, 2011 7:53 PM