Using PPL to implement a parallel_remove_copy_if

  • Friday, July 31, 2009 5:25 PM, AshleysBrain
     
    I've been thinking about how a remove_copy_if might be parallelised using the PPL.

    The best I can come up with is something along these lines:
    - Call parallel_for_each on the source range.
    - Use a combinable<std::vector<T>> to store the elements that are to be copied, i.e. if(!pred(i)) c.local().push_back(i);
    - At the end, use combine_each to copy each thread-local vector to the output iterator.

    This has a number of problems:
    - The local vectors will keep reallocating, probably adding more overhead than the parallelism saves.
    - Reallocation can be prevented by using the combinable constructor that initialises thread-local vectors with reserved memory.  However, the signature of the function returns a copy of _Ty, invoking the copy constructor.  What if vector's copy constructor only allocates enough for size() instead of preserving capacity()?  The capacity is then lost, and the optimisation has no effect.
    - If the source data is sorted, the output is not guaranteed to be sorted.  Is it possible to guarantee the output would also be in sorted order?  I could imagine this being done if each thread operates on a contiguous range and the combinable object knows in which order the threads executed their ranges.
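
    As a rough sketch of the ordering idea in the last point: if each worker is handed one contiguous chunk and its own buffer, concatenating the buffers in chunk order keeps the output in source order. The sketch below uses std::thread rather than the PPL so it compiles without ppl.h; chunked_remove_copy_if and num_chunks are made-up names for illustration, and pred is assumed to be safe to call from several threads at once.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <thread>
#include <vector>

// Hypothetical helper (not part of the PPL or the standard library):
// copies every element of src for which pred is false to out,
// preserving the source order.
template <typename T, typename OutIt, typename Pred>
OutIt chunked_remove_copy_if(const std::vector<T>& src, OutIt out, Pred pred,
                             std::size_t num_chunks)
{
    if (num_chunks == 0) num_chunks = 1;

    // One private buffer per chunk, standing in for combinable<std::vector<T>>.
    std::vector<std::vector<T>> locals(num_chunks);
    std::vector<std::thread> workers;
    workers.reserve(num_chunks);

    const std::size_t n = src.size();
    for (std::size_t c = 0; c < num_chunks; ++c) {
        // Chunk c covers the half-open index range [first, last).
        const std::size_t first = n * c / num_chunks;
        const std::size_t last = n * (c + 1) / num_chunks;
        workers.emplace_back([&src, &locals, &pred, c, first, last] {
            std::vector<T>& local = locals[c];
            local.reserve(last - first);  // worst case: nothing is removed
            std::remove_copy_if(src.begin() + first, src.begin() + last,
                                std::back_inserter(local), pred);
        });
    }
    for (std::thread& w : workers) w.join();

    // Concatenate the buffers in chunk order, which is also source order.
    for (const std::vector<T>& local : locals)
        out = std::copy(local.begin(), local.end(), out);
    return out;
}
```

    Reserving the chunk length up front avoids reallocation at the cost of extra memory when many elements are removed, and the final concatenation is still a serial copy.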

    Any comments on how parallelisable remove_copy_if is?

Answers

  • Monday, August 03, 2009 8:06 PM, AshleysBrain
     
    I've been browsing bits and pieces of the source from the PPL and have come up with a parallel_remove_if:

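    #include <algorithm>   // std::remove_if
    #include <iterator>    // std::distance, std::advance
    #include <malloc.h>    // _alloca
    #include <windows.h>   // LONG, InterlockedIncrement
    #include <concrt.h>    // CurrentScheduler
    #include <ppl.h>       // task_handle, structured_task_group
    using namespace Concurrency;
    
    template<typename Iter>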
    struct iterator_range {
    	Iter begin;
    	Iter end;
    };
    
    template<typename Iter, typename Predicate>
    inline Iter parallel_remove_if(Iter begin, Iter end, Predicate pred)
    {
    	// Get number of processors; one iterator range per hardware processing unit is allocated on the stack below
    	static const auto num_processors = CurrentScheduler::Get()->GetNumberOfVirtualProcessors();
    
    	if (num_processors == 1) return std::remove_if(begin, end, pred);
    
    	iterator_range<Iter>* chunk_ranges = (iterator_range<Iter>*)_alloca(sizeof(iterator_range<Iter>) * num_processors);
    	iterator_range<Iter>* const chunk_ranges_end = chunk_ranges + num_processors;
    
    	//static vector<iterator_range<Iter>> chunk_ranges(num_processors);
    
    	// Calculate work ranges
    	auto elem_count = std::distance(begin, end);				// Number of elements to work on
    	auto elems_per_chunk = elem_count / num_processors;			// Elements per hardware processing unit
    	auto spare_elems = elem_count % num_processors;				// Remainder elements
    
    	Iter cursor = begin;
    
    	// Set the ranges to process for each core
    	auto range = chunk_ranges;
    
    	do {
    		// Start range from cursor
    		range->begin = cursor;
    
    		std::advance(cursor, elems_per_chunk);
    
    		// Add in remainder to initial threads without branching
    		decltype(elems_per_chunk) offset = (spare_elems > 0 ? 1 : 0);
    		std::advance(cursor, offset);
    		spare_elems -= offset;
    
    		// End the range at the cursor
    		range->end = cursor;
    
    		// Next range
    		++range;
    	} while (range != chunk_ranges_end);
    
    	// Start work
    	LONG volatile counter = -1;
    
    	auto f = [&counter, &pred, &chunk_ranges] {
    			// Get a range from chunk_ranges to work on
    			iterator_range<Iter>& range = chunk_ranges[InterlockedIncrement(&counter)];
    
    			// Perform standard remove_if over this range and store the return iterator back in to the range
    			range.end = std::remove_if(range.begin, range.end, pred);
    		};
    
    	typedef task_handle<decltype(f)> task_handle_t;
    
    	// Allocate stack memory for task handles
    	task_handle_t* tasks = (task_handle_t*)_alloca(sizeof(task_handle_t) * num_processors);
    	task_handle_t* const tasks_end = tasks + num_processors;
    
    	structured_task_group stg;
    
    	// Execute tasks for each hardware processing unit
    	do {
    		// Construct a task handle in the memory area (assume no destructor needs to be called?)
    		new (tasks) task_handle_t(f);
    
    		// Execute the current task
    		stg.run(*tasks);
    
    		// Advance memory pointer
    		++tasks;
    
    	} while (tasks != tasks_end);
    
    	// While processing get an iterator to the second chunk in preparation for copying down
    	range = chunk_ranges;
    	++range;
    
    	// Wait for work over ranges to end
    	stg.wait();
    
    	// Merge results in to contiguous stream
    	// Starting with the end of the first chunk's results, copy down each remaining chunk's results to a contiguous output
    	cursor = chunk_ranges->end;
    	
    	do {
    		if (cursor == range->begin) {
    			// This chunk's results are already contiguous with the output, so skip the copy,
    			// but still move the cursor past them or the next chunk would overwrite them
    			cursor = range->end;
    			++range;
    			continue;
    		}
    
    		// Copy down chunk
    		for ( ; range->begin != range->end; ++(range->begin), ++cursor)
    			*cursor = *(range->begin);
    		++range;
    	} while (range != chunk_ranges_end);
    
    	return cursor;
    }
    


    The performance seems very good: with a trivial predicate it is close to serial performance (on large data sets), and real gains are only made when a nontrivial predicate is used (which applies to my situation).  I've made a demo project to test the performance here:
    http://www.scirra.com/files/parallel_remove_if.zip
    My parallel_remove_if, like the standard version, has stable (ordered) output and results in a contiguous block.  It works by splitting the given range into chunks, one per hardware execution unit, calling the standard remove_if on each chunk (via the PPL), then copying down the result blocks to make them contiguous.  This copying-down is the main overhead.  It effectively negates the performance gains when the predicate is trivial, but is comparatively cheap when the predicate is expensive, so overall performance improves.
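
    To make the chunk-then-copy-down step concrete, here is a small serial sketch that can be traced by hand. It is only an illustration under stated assumptions: chunked_remove_if and chunk_size are made-up names, it works on a std::vector only, and the per-chunk filtering runs in a plain loop where the real code would hand each chunk to a task.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper for illustration: runs std::remove_if on each
// fixed-size chunk of v, then copies the surviving blocks down so they form
// one contiguous run. Returns the new logical end, like std::remove_if.
template <typename T, typename Pred>
typename std::vector<T>::iterator
chunked_remove_if(std::vector<T>& v, std::size_t chunk_size, Pred pred)
{
    typedef typename std::vector<T>::iterator Iter;
    if (chunk_size == 0) chunk_size = 1;
    std::vector<std::pair<Iter, Iter>> kept;  // [begin, new_end) per chunk

    // Phase 1: filter each chunk independently (the parallelisable part).
    for (std::size_t first = 0; first < v.size(); first += chunk_size) {
        const std::size_t last = std::min(first + chunk_size, v.size());
        Iter b = v.begin() + first;
        Iter e = std::remove_if(b, v.begin() + last, pred);
        kept.push_back(std::make_pair(b, e));
    }

    // Phase 2: move each surviving block down to the write cursor.
    Iter cursor = v.begin();
    for (const auto& r : kept) {
        if (cursor == r.first)
            cursor = r.second;  // block is already in place: just step past it
        else
            cursor = std::move(r.first, r.second, cursor);  // <algorithm> overload
    }
    return cursor;
}
```

    With the input 1 2 3 4 5 6 7 8, a chunk size of 4 and a predicate that removes even numbers, phase 1 leaves 1 3 at the front of the first chunk and 5 7 at the front of the second, and phase 2 moves 5 7 down so that the result begins 1 3 5 7.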

    Here's an image demonstrating the algorithm:
    http://www.scirra.com/files/parallel_remove_if.png

All Replies

  • Wednesday, August 05, 2009 1:16 AM, rickmolloy (MSFT, Owner)
     
    That's wonderful, Ashley. I have a solution almost done as well; I'll check this out and reply when I'm finished with mine.
    Rick Molloy, Parallel Computing Platform: http://blogs.msdn.com/nativeconcurrency