Using PPL to implement a parallel_remove_copy_if
- I've been thinking about how a remove_copy_if might be parallelised using the PPL.
The best I can come up with is something along these lines.
- Call parallel_for_each on the source range.
- Use a combinable<std::vector<T>> to store elements which are to be copied, i.e. if(!pred(i)) c.local().push_back(i);
- At the end, use combine_each to copy each thread local vector to the output iterator.
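The three steps above can be sketched in portable C++ as follows. This is only an illustration under stated assumptions: std::thread stands in for parallel_for_each so the code builds outside Visual Studio, and per_thread is a hypothetical stand-in for combinable (it is not the PPL class, and unordered_remove_copy_if is a made-up name). Workers pull indices from a shared counter, so the order of the result depends on scheduling.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for Concurrency::combinable<T>: one T per calling thread.
template <typename T>
class per_thread {
public:
    T& local() {
        std::lock_guard<std::mutex> lock(mutex_);
        // References into a std::map stay valid when other keys are inserted later.
        return slots_[std::this_thread::get_id()];
    }
    // Visits every thread's slot; call only after all workers have been joined.
    template <typename F>
    void combine_each(F f) {
        for (auto& slot : slots_) f(slot.second);
    }
private:
    std::mutex mutex_;
    std::map<std::thread::id, T> slots_;
};

// Returns the elements of src for which pred is false.
// The output order is NOT guaranteed to match the source order.
// pred is called from several threads at once, so it must be thread-safe.
template <typename T, typename Pred>
std::vector<T> unordered_remove_copy_if(const std::vector<T>& src, Pred pred,
                                        unsigned workers)
{
    if (workers == 0) workers = 1;
    per_thread<std::vector<T>> kept;
    std::atomic<std::size_t> next(0);   // next source index to hand out
    std::vector<std::thread> threads;
    for (unsigned w = 0; w < workers; ++w) {
        threads.emplace_back([&] {
            std::vector<T>& mine = kept.local();   // step 2: thread-local buffer
            for (std::size_t i = next.fetch_add(1); i < src.size(); i = next.fetch_add(1))
                if (!pred(src[i])) mine.push_back(src[i]);
        });
    }
    for (std::thread& t : threads) t.join();

    std::vector<T> out;
    kept.combine_each([&out](const std::vector<T>& v) {   // step 3: merge the buffers
        out.insert(out.end(), v.begin(), v.end());
    });
    return out;
}
```

Because the buffers are merged in whatever order the slots are visited, a caller that needs the source order has to sort the result or partition the work differently.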
This has a number of problems:
- The local vectors will keep reallocating, probably adding more overhead than is saved
- Reallocation can be prevented by using the combinable constructor that initialises thread-local vectors with reserved memory. However, the initialisation function passed to that constructor returns a _Ty by value, which invokes the copy constructor. What if vector's copy constructor only allocates enough for size() rather than preserving capacity()? The reserved capacity is lost, and the optimisation will have no effect.
- If the source data is sorted, the output is not guaranteed to be sorted. Is it possible to guarantee the output would also be in sorted order? I could imagine this being done if each thread is operating on serial ranges, then the combinable object knowing in which order the threads executed their ranges.
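One way to address the reallocation and ordering concerns together is to give each worker one contiguous chunk and its own buffer, reserve inside the worker, and concatenate the buffers in chunk order. The sketch below assumes random-access iterators and uses std::thread rather than the PPL so that it is portable; ordered_remove_copy_if is a hypothetical name, not a PPL function. Since the C++ standard does not require a copied vector to keep its capacity(), reserving in place avoids relying on that.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <thread>
#include <vector>

// Hypothetical helper (not a PPL function): copies the elements of [first, last)
// for which pred is false to out, preserving source order.
// pred is called concurrently, so it must be safe to call from several threads.
template <typename RandomIt, typename OutIt, typename Pred>
OutIt ordered_remove_copy_if(RandomIt first, RandomIt last, OutIt out,
                             Pred pred, std::size_t workers)
{
    typedef typename std::iterator_traits<RandomIt>::value_type T;
    typedef typename std::iterator_traits<RandomIt>::difference_type Diff;

    if (workers == 0) workers = 1;
    const std::size_t n = static_cast<std::size_t>(last - first);
    const std::size_t base = n / workers;    // elements every chunk gets
    const std::size_t extra = n % workers;   // the first `extra` chunks get one more

    std::vector<std::vector<T>> buffers(workers);   // buffers[w] belongs to chunk w
    std::vector<std::thread> threads;
    threads.reserve(workers);

    RandomIt chunk_begin = first;
    for (std::size_t w = 0; w < workers; ++w) {
        const std::size_t len = base + (w < extra ? 1 : 0);
        const RandomIt chunk_end = chunk_begin + static_cast<Diff>(len);
        std::vector<T>* buf = &buffers[w];
        threads.emplace_back([chunk_begin, chunk_end, buf, len, &pred] {
            buf->reserve(len);   // reserved in place, so no copy can drop the capacity
            std::remove_copy_if(chunk_begin, chunk_end, std::back_inserter(*buf), pred);
        });
        chunk_begin = chunk_end;
    }
    for (std::thread& t : threads) t.join();

    // Concatenate in chunk order, which is also source order.
    for (const std::vector<T>& buf : buffers)
        out = std::copy(buf.begin(), buf.end(), out);
    return out;
}
```

The trade-off is memory: each buffer reserves room for its whole chunk even if most elements are removed, and every surviving element is copied twice.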
Any comments on how parallelisable remove_copy_if is?
- Changed Type: Stephen Toub - MSFT (MSFT, Owner), Friday, July 31, 2009 5:57 PM
Answers
- I've been browsing bits and pieces of the source from the PPL and have come up with a parallel_remove_if:
    #include <algorithm>   // std::remove_if
    #include <iterator>    // std::distance, std::advance
    #include <malloc.h>    // _alloca
    #include <windows.h>   // LONG, InterlockedIncrement
    #include <ppl.h>       // structured_task_group, task_handle, CurrentScheduler

    using namespace Concurrency;

    template<typename Iter>
    struct iterator_range
    {
        Iter begin;
        Iter end;
    };

    template<typename Iter, typename Predicate>
    inline Iter parallel_remove_if(Iter begin, Iter end, Predicate pred)
    {
        // Get number of processors; one iterator range is created per hardware processing unit
        static const auto num_processors = CurrentScheduler::Get()->GetNumberOfVirtualProcessors();

        if (num_processors == 1)
            return std::remove_if(begin, end, pred);

        iterator_range<Iter>* chunk_ranges = (iterator_range<Iter>*)_alloca(sizeof(iterator_range<Iter>) * num_processors);
        iterator_range<Iter>* const chunk_ranges_end = chunk_ranges + num_processors;
        //static vector<iterator_range<Iter>> chunk_ranges(num_processors);

        // Calculate work ranges
        auto elem_count = std::distance(begin, end);        // Number of elements to work on
        auto elems_per_chunk = elem_count / num_processors; // Elements per hardware processing unit
        auto spare_elems = elem_count % num_processors;     // Remainder elements
        Iter cursor = begin;

        // Set the ranges to process for each core
        auto range = chunk_ranges;
        do {
            // Start range from cursor
            range->begin = cursor;
            std::advance(cursor, elems_per_chunk);

            // Hand one remainder element to each of the first chunks until none are left
            decltype(elems_per_chunk) offset = (spare_elems > 0 ? 1 : 0);
            std::advance(cursor, offset);
            spare_elems -= offset;

            // End the range at the cursor
            range->end = cursor;

            // Next range
            ++range;
        } while (range != chunk_ranges_end);

        // Start work
        LONG volatile counter = -1;
        auto f = [&counter, &pred, &chunk_ranges] {
            // Get a range from chunk_ranges to work on
            iterator_range<Iter>& range = chunk_ranges[InterlockedIncrement(&counter)];

            // Perform standard remove_if over this range and store the return iterator back in to the range
            range.end = std::remove_if(range.begin, range.end, pred);
        };
        typedef task_handle<decltype(f)> task_handle_t;

        // Allocate stack memory for task handles
        task_handle_t* tasks = (task_handle_t*)_alloca(sizeof(task_handle_t) * num_processors);
        task_handle_t* const tasks_end = tasks + num_processors;
        structured_task_group stg;

        // Execute tasks for each hardware processing unit
        do {
            // Construct a task handle in the memory area (assume no destructor needs to be called?)
            new (tasks) task_handle_t(f);

            // Execute the current task
            stg.run(*tasks);

            // Advance memory pointer
            ++tasks;
        } while (tasks != tasks_end);

        // While processing get an iterator to the second chunk in preparation for copying down
        range = chunk_ranges;
        ++range;

        // Wait for work over ranges to end
        stg.wait();

        // Merge results in to contiguous stream
        // Starting with the end of the first chunk's results, copy down each remaining chunk's results to a contiguous output
        cursor = chunk_ranges->end;
        do {
            // If this chunk is already contiguous we can skip copying,
            // but the cursor must still move past the chunk's results
            if (cursor == range->begin) {
                cursor = range->end;
                ++range;
                continue;
            }

            // Copy down chunk
            for ( ; range->begin != range->end; ++(range->begin), ++cursor)
                *cursor = *(range->begin);

            ++range;
        } while (range != chunk_ranges_end);

        return cursor;
    }
The performance seems very good - close to serial performance when using a trivial predicate (with large data sets), but real performance gains are only made when a nontrivial predicate is used (which applies to my situation). I've made a demo project to test the performance here:
http://www.scirra.com/files/parallel_remove_if.zip
My parallel_remove_if, like the standard version, has stable (ordered) output, and results in a contiguous block. It works by splitting the given range into chunks, one for each hardware execution unit, calling the standard remove_if on each chunk (via the PPL), then copying down the result blocks to make them contiguous. This copying-down is the main overhead. It effectively negates the performance gains when the predicate is trivial, but is comparatively cheap when the predicate is expensive, improving performance.
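The copy-down step on its own can be illustrated with a small, single-threaded sketch. The names copy_down and remove_even_in_chunks are hypothetical, and the per-chunk remove_if calls run serially here purely so the example stays portable; in the parallel version each call would be one task. The point to note is that a chunk which is already in place needs no copying, but the cursor still has to move past it.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

typedef std::vector<int>::iterator IntIter;

// Hypothetical helper: `kept` lists, in source order, the [begin, end) range of
// survivors that remove_if left at the front of each chunk. Packs them together
// starting at `first` and returns the new logical end.
template <typename Iter>
Iter copy_down(Iter first, const std::vector<std::pair<Iter, Iter>>& kept)
{
    Iter cursor = first;
    for (std::size_t c = 0; c < kept.size(); ++c) {
        if (cursor == kept[c].first)
            cursor = kept[c].second;   // already in place: no copy, but step past it
        else
            cursor = std::move(kept[c].first, kept[c].second, cursor);
    }
    return cursor;
}

// Removes even numbers by filtering `chunks` sub-ranges separately, then compacting.
std::vector<int> remove_even_in_chunks(std::vector<int> v, std::size_t chunks)
{
    if (chunks == 0) chunks = 1;
    const std::size_t len = v.size() / chunks;
    std::vector<std::pair<IntIter, IntIter>> kept;
    for (std::size_t c = 0; c < chunks; ++c) {
        IntIter b = v.begin() + static_cast<std::ptrdiff_t>(c * len);
        // The last chunk also takes any remainder elements.
        IntIter e = (c + 1 == chunks) ? v.end() : b + static_cast<std::ptrdiff_t>(len);
        // Serial stand-in: in the parallel version each of these calls is one task.
        kept.push_back(std::make_pair(b, std::remove_if(b, e, [](int x) { return x % 2 == 0; })));
    }
    v.erase(copy_down(v.begin(), kept), v.end());
    return v;
}
```

For the input 1 to 12 split into three chunks, the survivors 1 3, 5 7 and 9 11 sit at the front of their chunks after filtering, and copy_down packs them into 1 3 5 7 9 11.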
Here's an image demonstrating the algorithm:
http://www.scirra.com/files/parallel_remove_if.png
- Marked As Answer by rickmolloy (MSFT, Owner), Wednesday, August 05, 2009 1:15 AM
- Edited by AshleysBrain, Monday, August 03, 2009 8:09 PM: forgot iterator_range struct from code block
- Edited by AshleysBrain, Monday, August 03, 2009 8:10 PM
All Replies
- That's wonderful, Ashley. I have a solution almost done as well; I'll check this out and reply when I'm finished with mine.
Rick Molloy Parallel Computing Platform : http://blogs.msdn.com/nativeconcurrency


