Two questions about GetPartitions() and GetDynamicPartitions()


  • Thursday, February 23, 2012 18:47
     

    Hi

    Q1: I've noticed that implementations of GetPartitions() usually contain code like this:

    // Member of a class deriving from System.Collections.Concurrent.Partitioner<T>
    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
    	var dynamicPartitions = GetDynamicPartitions();
    	var partitions = new IEnumerator<T>[partitionCount];
    
    	for (int i = 0; i < partitionCount; i++)
    	{
    		partitions[i] = dynamicPartitions.GetEnumerator();
    	}
    	return partitions;
    }

    That is, we use GetDynamicPartitions() and the enumerators of the object it returns to fill the fixed-length IList<IEnumerator<T>>. But this way, every enumerator stored in partitions[] is able to enumerate all of the items in the source collection, not just the items in one partition. Wouldn't it be better to write a separate enumerator, distinct from the one provided by the object that GetDynamicPartitions() returns, that enumerates only the items of a specific partition?

    Q2: What are the correct guidelines for using GetPartitions()/GetDynamicPartitions() in a custom class (not through PLINQ or Parallel.ForEach)? What number should be passed to GetPartitions()? If we pass a number greater than the logical processor count, how are we expected to create the tasks? And how are we expected to use GetDynamicPartitions()? It seems we should keep the returned object and create tasks based on the number of logical processors, like this:

    // Needs: using System; using System.Collections.Generic; using System.Threading;
    public void MyParallelForEach<T>(IEnumerable<T> source, Action<T> act)
    {
        var partitioner = new MyPartitioner<T>(source);   // my own Partitioner<T> subclass
        var dynamicPartitions = partitioner.GetDynamicPartitions();
        int numProcs = Environment.ProcessorCount;
        int taskCount = numProcs;

        using (var mre = new ManualResetEvent(false))
        {
            for (int i = 0; i < numProcs; i++)
            {
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    // foreach calls dynamicPartitions.GetEnumerator() once per worker
                    foreach (var item in dynamicPartitions)
                    {
                        act(item);
                    }
                    // The last worker to finish releases the caller
                    if (Interlocked.Decrement(ref taskCount) == 0)
                        mre.Set();
                });
            }
            mre.WaitOne();
        }
    }

    If this is right, then how are Parallel.ForEach() and Parallel.For() able to inject more tasks?

    Thanks for your help


All Replies

  • Tuesday, February 28, 2012 20:09
    Owner

     Answered

    re: Q1

    It depends on your needs.  The implementation you've shown is a good default.  If you wanted a purely static partitioning scheme for both number of partitions and the data in each partition, then sure, you could modify the implementation of GetPartitions to provide that.
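    For the purely static scheme mentioned above, GetPartitions can give each partition its own contiguous slice of the data instead of drawing from shared dynamic partitions. What follows is a minimal sketch, assuming the source is an IList<T> so it can be indexed. StaticRangePartitioner<T> is a hypothetical name, not a framework type:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical partitioner: each partition gets a fixed, contiguous slice of the list.
public sealed class StaticRangePartitioner<T> : Partitioner<T>
{
    private readonly IList<T> _source;

    public StaticRangePartitioner(IList<T> source)
    {
        _source = source ?? throw new ArgumentNullException(nameof(source));
    }

    // Partitions are fixed up front, so dynamic partitioning is not offered.
    public override bool SupportsDynamicPartitions => false;

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));

        var partitions = new IEnumerator<T>[partitionCount];
        int count = _source.Count;
        for (int i = 0; i < partitionCount; i++)
        {
            // Split [0, count) into partitionCount nearly equal ranges.
            int start = (int)((long)count * i / partitionCount);
            int end = (int)((long)count * (i + 1) / partitionCount);
            partitions[i] = Range(start, end);
        }
        return partitions;
    }

    // Iterator over one partition's slice only.
    private IEnumerator<T> Range(int start, int end)
    {
        for (int i = start; i < end; i++)
            yield return _source[i];
    }
}
```

    With 10 items and 3 partitions, this sketch yields slices of 3, 3 and 4 items. Because SupportsDynamicPartitions is false, such a partitioner can be consumed by PLINQ (partitioner.AsParallel()), but Parallel.ForEach rejects it with an InvalidOperationException, since it requires dynamic partitions.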

    re: Q2

    If you're implementing your own parallelism construct, then presumably you have a good reason to do so, and your scenarios would drive the behavior here.  PLINQ defaults to using Environment.ProcessorCount as the default number of partitions to request of GetPartitions, but allows that to be overridden with WithDegreeOfParallelism.  GetDynamicPartitions would only be relevant to you if you didn't know up front how many tasks you were creating; each task you did create then would request its own partition.
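    As a rough sketch of the "known number of tasks" case, a hand-written loop can request Environment.ProcessorCount partitions (or a caller-supplied count, in the spirit of WithDegreeOfParallelism) from GetPartitions and start one task per partition. StaticForEach and its degreeOfParallelism parameter are hypothetical names, not part of the framework:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StaticForEach
{
    // Hypothetical consumer: asks for a fixed number of partitions up front,
    // runs exactly one task per partition, then waits for all of them.
    public static void Run<T>(Partitioner<T> partitioner, Action<T> body, int degreeOfParallelism = 0)
    {
        // Default to one partition per logical processor, as PLINQ does.
        if (degreeOfParallelism <= 0)
            degreeOfParallelism = Environment.ProcessorCount;

        IList<IEnumerator<T>> partitions = partitioner.GetPartitions(degreeOfParallelism);
        var tasks = new Task[partitions.Count];
        for (int i = 0; i < partitions.Count; i++)
        {
            IEnumerator<T> partition = partitions[i];   // per-iteration copy for the closure
            tasks[i] = Task.Factory.StartNew(() =>
            {
                using (partition)
                {
                    while (partition.MoveNext())
                        body(partition.Current);
                }
            });
        }
        // Blocks until every partition is drained; exceptions from body surface as an AggregateException.
        Task.WaitAll(tasks);
    }
}
```

    In this sketch, passing a degreeOfParallelism larger than Environment.ProcessorCount simply creates more tasks than cores, and the thread pool decides how many of them actually run at the same time.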

  • Wednesday, February 29, 2012 09:26
     

    Thank you, Stephen.

    So I guess that, when using GetDynamicPartitions(), it is up to the consumer how many tasks to create. Because the method returns an IEnumerable<T>, it will not really differ how many Tasks you create. Either you create a fixed number of tasks, or you add tasks during processing; GetDynamicPartitions() doesn't care. Its enumerators just enumerate, no matter how many tasks are using them.

    I will be happy however if you can guide me to a sample in which we can increase the number of Tasks after calling GetDynamicPartitions, presumably the way Parallel.For/ForEach does it. I read in another post of yours that each task created by Parallel.For/ForEach creates a replica of itself, and the new task then does the same. I would like to know how this is implemented.

    I wrote my own version of Parallel.For, named MyParallelFor, which uses a dynamic number of tasks: a timer periodically checks the iteration progress, and if fewer iterations have completed than expected, it starts another task. The implementation is not perfect and might be somehow problematic. But I think it can be improved. I'll be happy to have your opinion.

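    // Needs: using System; using System.Threading; using System.Threading.Tasks;
    void MyParallelFor(int fromInclusive, int toExclusive, Action<int> body)
    {
        int totalItems = toExclusive - fromInclusive;
        if (totalItems <= 0)
            return;

        int numProcs = Environment.ProcessorCount;
        long index = fromInclusive - 1;     // last index handed out to any task
        long lastIndex = index;             // value of index at the previous timer tick
        int completed = 0;                  // iterations whose body has returned
        int totalTasks = 0;
        int done = 0;                       // becomes 1 once every iteration has completed
        const int interval = 100;           // There's no reason we set the timer period to this fixed number,
                                            // but I had no idea how to make it dynamic
        using (var mre = new ManualResetEvent(false))
        {
            // Declared before the timer callback that starts it (using it earlier does not compile).
            Action act = () =>
            {
                Interlocked.Increment(ref totalTasks);
                long currentIndex;
                while ((currentIndex = Interlocked.Increment(ref index)) < toExclusive)
                {
                    body((int)currentIndex);
                    // Signal when the last iteration finishes, not when the first task runs out of indices.
                    // (An exception thrown by body is not handled here and would leave WaitOne blocked.)
                    if (Interlocked.Increment(ref completed) == totalItems)
                    {
                        Volatile.Write(ref done, 1);
                        mre.Set();
                    }
                }
            };

            // Periodic timer; disposed (and ignored via 'done') once the loop has finished.
            using (var timer = new Timer(_ =>
            {
                if (Volatile.Read(ref done) == 1 || Volatile.Read(ref totalTasks) == 0)
                    return;
                long currentIndex = Interlocked.Read(ref index);
                long progress = currentIndex - Interlocked.Exchange(ref lastIndex, currentIndex);
                // Less progress than expected since the last tick: inject one more task.
                if (progress < totalItems / (Volatile.Read(ref totalTasks) + 1))
                    Task.Factory.StartNew(act);
            }, null, interval, interval))
            {
                for (int i = 0; i < numProcs; i++)
                    Task.Factory.StartNew(act);

                mre.WaitOne();
            }
        }
    }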
    Kind Regards
  • Thursday, March 1, 2012 02:44
    Owner

     Answered

    re: "Because the method returns an IEnumerable<T>, it will not really differ how many Tasks you create."

    Don't let the type "IEnumerable<T>" fool you.  GetDynamicPartitions needs to return something that can hand out an additional partition every time one is asked for.  We represent partitions as IEnumerator<T> instances, and thus we need to give back something that has a GetPartition or GetEnumerator method.  Turns out we already have something of that exact shape: IEnumerable<T>.  So GetDynamicPartitions returns an IEnumerable<T>, where every call to GetEnumerator returns an IEnumerator<T> representing an additional partition cooperating with every other partition previously returned from the partitioner instance.  How the actual partitioning happens is entirely up to the implementation of the Partitioner<T>-derived type.
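    To make the "cooperating partitions" point concrete, here is a minimal sketch of a Partitioner<T> whose GetDynamicPartitions returns an IEnumerable<T> backed by a single shared index: each GetEnumerator call creates one more partition, and all partitions claim items from the same counter with Interlocked.Increment. SharedIndexPartitioner<T> is hypothetical and assumes an IList<T> source; the framework's own partitioners hand out items in chunks rather than one at a time:

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical partitioner: every partition handed out by GetDynamicPartitions()
// draws indices from one shared counter, so each item is produced exactly once overall.
public sealed class SharedIndexPartitioner<T> : Partitioner<T>
{
    private readonly IList<T> _source;

    public SharedIndexPartitioner(IList<T> source)
    {
        _source = source ?? throw new ArgumentNullException(nameof(source));
    }

    public override bool SupportsDynamicPartitions => true;

    // Same default pattern as in the question: a fixed count of dynamic partitions.
    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        IEnumerable<T> dynamicPartitions = GetDynamicPartitions();
        var partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
            partitions[i] = dynamicPartitions.GetEnumerator();
        return partitions;
    }

    public override IEnumerable<T> GetDynamicPartitions() => new DynamicPartitions(_source);

    // One instance per GetDynamicPartitions() call; it owns the shared counter.
    private sealed class DynamicPartitions : IEnumerable<T>
    {
        private readonly IList<T> _source;
        private int _nextIndex = -1;   // last index claimed by any partition

        public DynamicPartitions(IList<T> source) { _source = source; }

        // Each call creates one more partition that shares _nextIndex with all the others.
        public IEnumerator<T> GetEnumerator()
        {
            while (true)
            {
                int i = Interlocked.Increment(ref _nextIndex);
                if (i >= _source.Count) yield break;
                yield return _source[i];
            }
        }

        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    }
}
```

    Draining three such partitions round-robin on one thread over seven items gives partition 0 the items 0, 3 and 6, partition 1 the items 1 and 4, and partition 2 the items 2 and 5; no item is produced twice. So the GetPartitions pattern from the question does not duplicate work, as long as GetDynamicPartitions is implemented along these lines.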

    re: "I will be happy however if you can guide me to a sample in which we can increase the number of Tasks after calling GetDynamicPartitions"

    Here's a naive example.  Note that I'm NOT advocating you write your own parallelized ForEach implementation, I'm simply answering your question about how GetDynamicPartitions can be applied:

    static void NaiveParallelForEach<T>(IEnumerable<T> source, Action<T> body)
    {
        var dp = Partitioner.Create(source).GetDynamicPartitions();
        Action worker = null;
        worker = () =>
        {
            bool replicaCreated = false;
            using (var p = dp.GetEnumerator())
            {
                while (p.MoveNext())
                {
                    if (!replicaCreated)
                    {
                        Task.Factory.StartNew(worker, CancellationToken.None, TaskCreationOptions.AttachedToParent, TaskScheduler.Default);
                        replicaCreated = true;
                    }
                    body(p.Current);
                }
            }
        };
        var t = new Task(worker);
        t.RunSynchronously(TaskScheduler.Default);
        t.Wait();
    }

    re: "The implementation is not perfect and might be somehow problematic. But I think it can be improved. I'll be happy to have your opinion"

    Why are you creating your own?  Is this just a learning exercise for you?

  • Thursday, March 8, 2012 07:11
     
     

    Thank you, Stephen.

    The code doesn't show how ForEach() controls the degree of parallelism (if body(p.Current) takes a long time, many concurrent tasks will be created unless the TaskScheduler also limits the degree of parallelism), but I understand the details were left out for brevity. Now I have some idea of what happens inside the For()/ForEach() loops.
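    One way to bound the replication in the NaiveParallelForEach example is to let only the first maxDegree - 1 replication attempts start a task. The following is a minimal sketch under that assumption; BoundedReplication.ForEach and maxDegree are hypothetical names, and this is not necessarily how Parallel.ForEach limits its workers:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedReplication
{
    // Hypothetical variant of NaiveParallelForEach: replication stops once
    // maxDegree workers have been started, so a slow body cannot cause unbounded task creation.
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body, int maxDegree)
    {
        if (maxDegree < 1) throw new ArgumentOutOfRangeException(nameof(maxDegree));
        var dp = Partitioner.Create(source).GetDynamicPartitions();
        int workers = 1;   // counts the initial worker; never decremented
        Action worker = null;
        worker = () =>
        {
            bool replicaAttempted = false;
            using (var p = dp.GetEnumerator())
            {
                while (p.MoveNext())
                {
                    if (!replicaAttempted)
                    {
                        replicaAttempted = true;
                        // Only the first maxDegree - 1 attempts succeed, so at most maxDegree workers exist.
                        if (Interlocked.Increment(ref workers) <= maxDegree)
                            Task.Factory.StartNew(worker, CancellationToken.None,
                                TaskCreationOptions.AttachedToParent, TaskScheduler.Default);
                    }
                    body(p.Current);
                }
            }
        };
        // The root task waits for all attached replicas before it completes.
        var t = new Task(worker);
        t.RunSynchronously(TaskScheduler.Default);
        t.Wait();
    }
}
```

    Because each worker processes its partition sequentially on one thread, at most maxDegree bodies run concurrently. The trade-off in this sketch is that a worker which finishes early is never replaced.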

    Yes, I'm implementing For()/ForEach() myself purely as an exercise. I fully understand your explanation of GetDynamicPartitions(), but I'm still not convinced by the partitioner model. That discussion would be long, so I'll write up my views as an analytical article, hoping they will be helpful.

    Kind Regards