Parallel iteration of observable using PLINQ ForAll

    Question

  • The following example locks up after 8 iterations:

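    using System.Diagnostics;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading;

    class Program
    {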
        static void Main(string[] args)
        {
            var subject = new BehaviorSubject<int>(0);

            subject
                .ToEnumerable()
                .AsParallel()
                .WithDegreeOfParallelism(1)
                .ForAll
                (
                    i =>
                    {
                        Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
                        subject.OnNext(i+1);
                    }
                );
        }
    }

    I am using System.Reactive v3.1.1. If I remove the PLINQ calls and replace them with a simple System.Interactive ForEach call, the hang disappears. The number of iterations also appears to be 8 times the degree of parallelism; e.g., with a degree of 3, the program iterates 24 times.

    Also, I am baffled as to why the recursive nature of the loop does not cause a stack overflow.

    I have tried a number of modifications to the code, and I think I can rule out any Reactive involvement in the bug. Below are two implementations that fail: one uses PLINQ and one uses TPL. Both hang, but after a different number of iterations per thread. Both examples use one of the common enumerables below.

    Common Enumerable (not thread-safe):

    public class SimpleEnumerable<T>: IEnumerable<T>
    {
    	private T _value;
    	private readonly AutoResetEvent _releaseValueEvent = new AutoResetEvent(false);
    
    	IEnumerator IEnumerable.GetEnumerator()
    	{
    		return GetEnumerator();
    	}
    
    	public IEnumerator<T> GetEnumerator()
    	{
    		while(true)
    		{
    			_releaseValueEvent.WaitOne();
    			yield return _value;
    		}
    	}
    
    	public void OnNext(T value)
    	{
    		_value = value;
    		_releaseValueEvent.Set();
    	}
    }

    Common Enumerable (thread-safe):

    public class SimpleEnumerable<T>: IEnumerable<T>
    {
    	private readonly BlockingCollection<T> _blockingCollection = new BlockingCollection<T>();
    
    	IEnumerator IEnumerable.GetEnumerator()
    	{
    		return GetEnumerator();
    	}
    
    	public IEnumerator<T> GetEnumerator()
    	{
    		while(true)
    		{
    			yield return _blockingCollection.Take();
    		}
    	}
    
    	public void OnNext(T value)
    	{
    		_blockingCollection.Add(value);
    	}
    }

    PLINQ Example:

    public static void Main(string[] args)
    {
    	var enumerable = new SimpleEnumerable<int>();
    	enumerable.OnNext(0);
    
    	enumerable
    		.Do(i => Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}"))
    		.AsParallel()
    		.WithDegreeOfParallelism(1)
    		.ForEach
    		(
    			i =>
    			{
    				Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
    				enumerable.OnNext(i+1);
    			}
    		);
    }

    TPL Example:

    public static void Main(string[] args)
    {
    	var enumerable = new SimpleEnumerable<int>();
    	enumerable.OnNext(0);
    
    	Parallel.ForEach
    	(
    		enumerable,
    		new ParallelOptions { MaxDegreeOfParallelism = 1 },
    		i =>
    		{
    			Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
    			enumerable.OnNext(i+1);
    		}
    	);
    }

    Based on my analysis of the call stack, it appears that a deadlock occurs in a partitioner-related method in both PLINQ and TPL, but I am not sure how to interpret this.
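
    A minimal sketch for testing the partitioner hypothesis. It assumes the hang comes from the default partitioner reading several elements ahead, which cannot succeed here because the next element is only produced once the current one has been processed. Partitioner.Create with EnumerablePartitionerOptions.NoBuffering hands out one element at a time instead. The class name NoBufferingSketch, the Run method and the limit parameter are hypothetical and exist only to make the example terminate.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class NoBufferingSketch
{
    // Processes 0, 1, 2, ... and feeds each value + 1 back into the source,
    // stopping once `limit` values have been produced (hypothetical cut-off).
    public static List<int> Run(int limit)
    {
        var queue = new BlockingCollection<int>();
        var seen = new List<int>();
        queue.Add(0);

        // Assumption under test: with NoBuffering the partitioner pulls a
        // single element per request rather than trying to fill a chunk.
        var partitioner = Partitioner.Create(
            queue.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(
            partitioner,
            new ParallelOptions { MaxDegreeOfParallelism = 1 },
            i =>
            {
                lock (seen)
                {
                    seen.Add(i);
                }

                if (i + 1 < limit)
                {
                    queue.Add(i + 1);       // feed the next value back in
                }
                else
                {
                    queue.CompleteAdding(); // lets the enumeration end
                }
            });

        return seen;
    }
}
```

    If this runs to completion where the versions above hang, that would point at chunk buffering in the partitioner rather than at the enumerable itself. The same partitioner can also be passed to PLINQ through AsParallel() to check the ForAll case.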


    • Edited by Tom Cap Thursday, November 24, 2016 5:11 PM
    Thursday, November 24, 2016 3:04 AM