Parallel execution of Subscribe delegate?


  • Hi everyone,

    I'm very new to Rx, and I'm probably overlooking something obvious. I'm trying to take an IEnumerable, convert it to an IObservable, and attach a handler that runs in parallel for each of its items. The reason I'd like to use Rx instead of just going with the normal Parallel.ForEach approach is that the IEnumerable will continually have new items placed in it, but that's further down the line. For now, I'd be happy just getting the following code to actually execute in parallel. Any suggestions?

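    	using System;
    	using System.Linq;
    	using System.Reactive.Concurrency;
    	using System.Reactive.Linq;
    	using System.Threading;
    
    	public static class Program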
    	{
    		public static void Main(string[] args)
    		{
    			Enumerable.Range(0, 100)
    				.AsParallel()
    				.ToObservable(Scheduler.ThreadPool)
    				.Subscribe(i => DoWork(i));
    
    			Console.ReadKey();
    		}
    
    		private static void DoWork(int i)
    		{
    			Console.WriteLine("Executing for {0}...", i);
    
    			Thread.Sleep(TimeSpan.FromSeconds(1));
    		}
    	}
    



    Friday, July 1, 2011 9:47 PM

All replies

  • Hi Matt,

    PLINQ is actually more appropriate than Rx for this kind of parallelization: starting with an IEnumerable<T> and processing its elements as fast as possible with regard to the number of threads and processors available (and perhaps other system resources), using a partitioning scheme that fits the kind of data being enumerated. Rx doesn't take these things into consideration (AFAIK).

    > The reason I'd like to use Rx instead of just going with the normal Parallel.ForEach approach is that the IEnumerable will
    > continually have new items placed in it.

    That's alright, PLINQ can handle it just fine.  If you use an iterator block, then I think it'll use chunk partitioning.

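    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    
    public static class Program
    {
    	static void Main(string[] args)
    	{
    		// ForAll runs DoWork on the PLINQ workers; a plain foreach over
    		// AsParallel() would call DoWork sequentially on this thread.
    		MyRange().AsParallel().ForAll(DoWork);
    	}
    
    	// Never-ending source; static so that Main can call it.
    	private static IEnumerable<long> MyRange()
    	{
    		while (true)
    		{
    			yield return DateTime.Now.TimeOfDay.Ticks;
    		}
    	}
    
    	// The question's DoWork, taking the long values produced by MyRange.
    	private static void DoWork(long value)
    	{
    		Console.WriteLine("Executing for {0}...", value);
    		Thread.Sleep(TimeSpan.FromSeconds(1));
    	}
    }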
    

    In the future, if you start with an IObservable<T> instead of an IEnumerable<T>, you might even be better off calling .ToEnumerable().AsParallel(). Keep in mind, though, that you'll want to enumerate it on a different thread than the one pushing values into the observable; otherwise, it'll deadlock.
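    A minimal sketch of the .ToEnumerable().AsParallel() route, assuming the current System.Reactive NuGet package (namespaces System.Reactive.Linq and System.Reactive.Concurrency) rather than the 2011 Rx build used in this thread. The source is a cold Observable.Range scheduled on TaskPoolScheduler.Default, so values are produced on pool threads while the calling thread blocks inside ForAll. The class and method names are made up for illustration.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ObservableToPlinq
{
    // Hypothetical helper: drains an observable through PLINQ and sums the values.
    public static long SumInParallel(IObservable<int> source)
    {
        long total = 0;

        source
            .ToEnumerable()   // MoveNext blocks until the observable pushes a value or completes
            .AsParallel()     // PLINQ workers pull items from the blocking enumerable
            .ForAll(i => Interlocked.Add(ref total, i)); // body runs concurrently, so update atomically

        return total;
    }

    public static void Main()
    {
        // Cold source producing on task-pool threads, not on the thread blocked in ForAll.
        IObservable<int> source = Observable.Range(0, 100, TaskPoolScheduler.Default);

        Console.WriteLine(SumInParallel(source)); // prints 4950
    }
}
```

    With a hot source such as a Subject<int>, the same call would block forever if the only code calling OnNext runs on the thread that is sitting in ForAll, which is the deadlock mentioned above.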

    As an alternative, we have a Serve operator in Rxx that parallelizes calls to OnNext; however, it's not nearly as sophisticated as PLINQ.  It's more of a blunt object for servers, hence the name.
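    The Rxx Serve operator's API isn't reproduced here. Purely as a point of comparison, plain Rx can also run work concurrently if the concurrency is introduced inside the query rather than in the observer: start each item's work as its own inner observable and let SelectMany merge the results. This is a sketch assuming the System.Reactive package; SelectManyParallel, Run and this DoWork are hypothetical names. Unlike PLINQ it does no partitioning and nothing limits how many items are in flight at once.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SelectManyParallel
{
    // Stand-in for the question's DoWork, shortened to 100 ms.
    private static void DoWork(int i)
    {
        Console.WriteLine("Executing for {0} on thread {1}...", i, Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
    }

    // Runs DoWork for 0..count-1 and returns how many calls completed.
    public static int Run(int count)
    {
        return Observable.Range(0, count)
            // Observable.Start schedules DoWork(i) on the task pool and completes when it returns;
            // SelectMany subscribes to each inner sequence as items arrive, so the calls overlap.
            .SelectMany(i => Observable.Start(() => DoWork(i), TaskPoolScheduler.Default))
            // Count and Wait only let the sketch block until every call has finished.
            .Count()
            .Wait();
    }

    public static void Main()
    {
        Console.WriteLine("Completed {0} calls", Run(20));
    }
}
```

    SelectMany still serializes what it forwards downstream, so anything subscribed after it is called one notification at a time; only the DoWork calls themselves overlap.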

    http://rxx.codeplex.com/SourceControl/changeset/view/61716#1055694

    - Dave


    http://davesexton.com/blog
    Saturday, July 2, 2011 12:53 AM
  • Thanks for the feedback, Dave. I'll check out the Serve operator. The example I gave was pretty contrived; long-term, the collection of things to process could actually be changed by the callbacks that are subscribed to it. So my example should really have looked like this:
    public static class Program
    {
    	private static IEnumerable<int> _workItems;
    	
    	public static void Main(string[] args)
    	{
    		//TODO: Use one of the thread-safe collections from .NET 4.0
    		_workItems = Enumerable.Range(0, 100);
    		
    		_workItems.AsParallel()
    			.ToObservable(Scheduler.ThreadPool)
    			.Subscribe(i => DoWork(i));
    
    		Console.ReadKey();
    	}
    
    	private static void DoWork(int i)
    	{
    		Console.WriteLine("Executing for {0}...", i);
    		
    		if (/* some criteria to determine if we need to add to the queue */)
    		{
    			_workItems.Add(/* some new number to process */);
    		}
    
    		Thread.Sleep(TimeSpan.FromSeconds(1));
    	}
    }
    

    This is where I thought Rx would be a better fit.  The _workItems would be growing over time as the result of the method subscribed to the _workItems.  It could also be modified from other places as well, so it's not like I can just do a Parallel.ForEach over _workItems once and be done.  The collection could become non-empty again later on. 

    Thanks again for the feedback!

    Saturday, July 2, 2011 4:51 PM
  • Hi,

    The example iterator block in my previous response still applies; however, instead of returning the current date-time you'd have to block to wait for more data from somewhere else.

    The producer-consumer pattern seems applicable, so you could probably use a BlockingCollection<T>. Then you can form your query as follows (untested):

    private static BlockingCollection<int> _workItems = new BlockingCollection<int>();
    	
    public static void Main(string[] args)
    {
    	foreach (var value in Enumerable.Range(0, 100))
    	{
    		_workItems.Add(value);
    	}
    	
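    	// ForAll runs DoWork on the PLINQ worker threads (a foreach over AsParallel()
    	// would call it sequentially on this thread). It returns only after
    	// _workItems.CompleteAdding() has been called and the collection is empty.
    	_workItems.GetConsumingEnumerable().AsParallel().ForAll(DoWork);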
    
    	Console.ReadKey();
    }
    
    private static void DoWork(int i)
    {
    	Console.WriteLine("Executing for {0}...", i);
    	
    	if (/* some criteria to determine if we need to add to the queue */)
    	{
    		_workItems.Add(/* some new number to process */);
    	}
    }
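    One detail the snippet above leaves open is when to call CompleteAdding, since DoWork itself can add items and the consuming enumeration never ends otherwise. A sketch of one way to handle it, assuming .NET 4.5 or later: count the items that have been added but not yet finished, and complete the collection when that count drops to zero. The source is wrapped with Partitioner.Create(..., EnumerablePartitionerOptions.NoBuffering) because PLINQ's default chunking could hold an unprocessed item in a buffer while it waits for more input, and then the count would never reach zero. The WorkQueue class and the "items below 5 spawn one follow-up" rule are hypothetical stand-ins for the real criteria.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public sealed class WorkQueue
{
    private readonly BlockingCollection<int> _workItems = new BlockingCollection<int>();
    private int _pending;    // items added but not yet fully processed
    private int _processed;  // total DoWork calls, returned for checking

    // Processes the initial items plus anything DoWork enqueues; returns the number processed.
    public int Run(IEnumerable<int> initialItems)
    {
        foreach (var item in initialItems)
        {
            Enqueue(item);
        }

        // Nothing to do at all: complete now so the consuming enumerable ends immediately.
        if (_pending == 0)
        {
            _workItems.CompleteAdding();
        }

        // NoBuffering hands items to the workers one at a time instead of in chunks.
        Partitioner.Create(_workItems.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering)
            .AsParallel()
            .ForAll(DoWork);

        return _processed;
    }

    private void Enqueue(int item)
    {
        // Count the item before adding it so the count can't reach zero while it is queued.
        Interlocked.Increment(ref _pending);
        _workItems.Add(item);
    }

    private void DoWork(int i)
    {
        Console.WriteLine("Executing for {0}...", i);
        Interlocked.Increment(ref _processed);

        // Hypothetical criterion: items below 5 each enqueue one follow-up item.
        if (i < 5)
        {
            Enqueue(i + 100);
        }

        // This item is done; if nothing else is pending, no more work can arrive.
        if (Interlocked.Decrement(ref _pending) == 0)
        {
            _workItems.CompleteAdding();
        }
    }

    public static void Main()
    {
        Console.WriteLine("Processed {0} items", new WorkQueue().Run(Enumerable.Range(0, 10)));
    }
}
```

    With ten initial items, five follow-ups are added, so Run returns 15. Because each follow-up is counted before the item that spawned it is marked done, the pending count only reaches zero once no DoWork call is running, which is the only point at which no further Add can happen.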
    

    - Dave


    Saturday, July 2, 2011 6:42 PM
  • Hi,

    > [snip] This is where I thought Rx would be a better fit. The _workItems would be growing over time as the result
    > of the method subscribed to the _workItems.

    Your requirement is definitely reactive in nature, although if you're looking for parallelization then you'll need to use PLINQ.  Rx isn't about parallelizing work across multiple threads; i.e., introducing concurrency into observations.  It's about composing queries reactively over data, and one of its strongest guarantees is that your observers will never be called in parallel.

    In your particular scenario, it seems that parallelization is what's important to you - having multiple threads "DoWork" concurrently - rather than simply reacting to new data on a single thread like Rx typically does (albeit a scheduled thread that is often concurrent with respect to the subscriber).
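    To see that guarantee in action, here is a sketch that runs the question's AsParallel().ToObservable(...) pipeline through a Do callback and records how many callbacks were ever running at the same time. It assumes the System.Reactive package, with TaskPoolScheduler.Default standing in for the old Scheduler.ThreadPool; SerializationCheck and PeakConcurrentCalls are made-up names. It should report a peak of 1.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SerializationCheck
{
    // Returns the largest number of observer callbacks that overlapped in time.
    public static int PeakConcurrentCalls(int count)
    {
        int current = 0;
        int peak = 0;
        var gate = new object();

        Enumerable.Range(0, count)
            .AsParallel()                          // parallel source, as in the question
            .ToObservable(TaskPoolScheduler.Default)
            .Do(i =>
            {
                int now = Interlocked.Increment(ref current);
                lock (gate) { peak = Math.Max(peak, now); }
                Thread.Sleep(10);                  // simulate work inside the callback
                Interlocked.Decrement(ref current);
            })
            .Count()                               // block until the sequence completes
            .Wait();

        return peak;
    }

    public static void Main()
    {
        // ToObservable enumerates the source and delivers items one after another,
        // so the callbacks never overlap even though the source is a ParallelQuery.
        Console.WriteLine("Peak concurrent callbacks: {0}", PeakConcurrentCalls(50));
    }
}
```

    The guarantee assumes a well-behaved source: calling OnNext on a Subject<T> from several threads at once is outside the Rx contract, which is what the Synchronize operator exists for.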

    - Dave


    Saturday, July 2, 2011 6:51 PM
  • Hi,

    Just found this interesting article:

    ParallelExtensionsExtras Tour - #4 - BlockingCollectionExtensions
    http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

    - Dave


    Saturday, July 2, 2011 6:55 PM