Prioritized message buffer type suggestion
- After experimenting a bit with the PPL in the Visual Studio 2010 Beta, I came up with the following suggestion:
Why not introduce a new type of message buffer that lets senders change the default (FIFO) order of messages by attaching a priority to each message? Recipients that link their targets to such a buffer, or call receive on it, would get messages sorted by priority.
All Replies
- Can you let us know what kind of scenario you're thinking about here? It may help us either to better understand what you're looking for or to suggest how you may accomplish this.
With that said, the message blocks are intended to be extensible, and if what we have doesn't meet the need, it's possible to build additional message blocks to suit a particular purpose. So again, hearing a little more about precisely what you'd like to do would help here.
-Rick
Rick Molloy Parallel Computing Platform : http://blogs.msdn.com/nativeconcurrency
- I understand that message blocks are extensible. I just thought the built-in set of message block types could be extended with a prioritized one.
Consider the following scenario:
We are implementing a "find in files" program. Its goal is to scan several drives for matching files. For each matching file, we count the occurrences of a pattern, so the whole file must be scanned. To do this, we run several search tasks in parallel (assume the underlying hardware allows concurrent access to different drives). As matching files are found, they are added to a queue (an unbounded_buffer object).
Then we have a number of finder tasks, each of which calls receive on the queue and performs the (potentially lengthy) scan.
Now, if our goal is to get as many results as we can, as quickly as we can, we may want to replace the unbounded_buffer with a prioritized queue that makes smaller files get fetched before larger ones, possibly giving the user results sooner.
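The smallest-first policy described here comes down to choosing a comparator. A minimal single-threaded sketch with std::priority_queue, assuming a hypothetical MatchedFile record that carries the file's size (this is plain standard C++, not a PPL message block):

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>
#include <vector>

// Hypothetical record for a file that matched the search.
struct MatchedFile {
    std::string path;
    std::size_t size;  // bytes
};

// std::priority_queue hands out the element that compares greatest first.
// Saying "a is lower priority than b when a is larger" therefore makes the
// smallest file come out first.
struct LargerFileIsLowerPriority {
    bool operator()(const MatchedFile& a, const MatchedFile& b) const {
        return a.size > b.size;
    }
};

using SmallestFirstQueue =
    std::priority_queue<MatchedFile, std::vector<MatchedFile>, LargerFileIsLowerPriority>;

// Drains a copy of the queue and returns the paths in the order a finder
// task calling top()/pop() would see them.
std::vector<std::string> scanOrder(SmallestFirstQueue queue) {
    std::vector<std::string> order;
    while (!queue.empty()) {
        order.push_back(queue.top().path);
        queue.pop();
    }
    return order;
}
```

Pushing files of 4,000,000, 900 and 52,000 bytes yields them smallest first. On its own this is not thread-safe; several finder tasks sharing it would need a lock around it.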
Another problem I can't see an easy PPL solution for (or perhaps I haven't quite understood it) is how to determine the optimal number of finder tasks.
I have tested the following scenarios:
1. Replace the unbounded_buffer with a call object whose function calls the finder function directly.
Result: synchronous operation.
2. Replace the unbounded_buffer with a call object whose function calls task_group::run with the finder function.
Result: an enormous number of threads (I saw more than 3700, for example).
3. After peeking at the internals of parallel_for, I used the "internal" _Get_num_chunks function to choose the number of finder tasks. In further tests, multiplying that number by two and then by four gave better overall performance.
All tests were run on Windows 7 RC on a quad-core computer.
It is not clear to me whether PPL can solve this problem, or whether it only does so internally, in high-level functions such as parallel_for and parallel_for_each. If it is the latter, it would be great to expose some kind of hint to users of the other PPL components, as this information would be helpful.
But then again, I may not fully understand the concept, as the documentation is very scarce at this point...
- Thanks for sharing the detailed scenario; this helps a lot. If you're willing to share the code you're using, we'd certainly take a look at it. I'm particularly interested in seeing the > 3700 threads scenario.
When I built a 'find in files' example, the approach I took was to use a task_group to search for files recursively, spawning an additional task with task_group::run every time I found a directory. When I found a file of the right type to check, I used asend to send a message asynchronously (in your synchronous scenario, are you using asend or send?). As for the number of finder tasks to spawn, that may depend on the hardware. I would definitely use message blocks to pipeline the compute-intensive work here, though.
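A rough portable sketch of that recursive search, assuming an in-memory DirNode tree in place of the real file system and a mutex-protected MatchSink in place of the unbounded_buffer that receives the asend messages (both names are hypothetical). Here std::async stands in for task_group::run, and future::get for waiting on the child tasks (task_group::wait in the PPL version). One caveat: std::launch::async runs each call as if on a new thread, so a large tree creates many threads, whereas task_group schedules tasks onto the Concurrency Runtime's pool of worker threads.

```cpp
#include <cassert>
#include <future>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical in-memory directory (a real version would enumerate the disk).
// std::vector of an incomplete type is allowed from C++17 on.
struct DirNode {
    std::vector<std::string> files;
    std::vector<DirNode> subdirs;
};

// Thread-safe stand-in for the buffer that matched files are sent to.
class MatchSink {
public:
    void send(const std::string& path) {
        std::lock_guard<std::mutex> lock(m_);
        matches_.push_back(path);
    }
    std::vector<std::string> snapshot() {
        std::lock_guard<std::mutex> lock(m_);
        return matches_;
    }
private:
    std::mutex m_;
    std::vector<std::string> matches_;
};

bool hasSuffix(const std::string& s, const std::string& suffix) {
    return s.size() >= suffix.size() &&
           s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
}

// One task per subdirectory; matching files go to the sink as they are found.
void searchDir(const DirNode& dir, const std::string& suffix, MatchSink& sink) {
    std::vector<std::future<void>> children;
    for (const DirNode& sub : dir.subdirs) {
        children.push_back(std::async(std::launch::async, [&sub, &suffix, &sink] {
            searchDir(sub, suffix, sink);
        }));
    }
    for (const std::string& f : dir.files) {
        if (hasSuffix(f, suffix)) {
            sink.send(f);
        }
    }
    // Wait for the child searches (and rethrow any exception they raised).
    for (auto& child : children) {
        child.get();
    }
}
```

Because children finish in any order, the sink's contents are not in a fixed order; a consumer should not rely on it.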
Regarding _Get_num_chunks: this is an implementation detail, and we'd like the flexibility to remove it. There are supported APIs, such as Concurrency::GetProcessorCount(), declared in <concrtrm.h>.
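A small sketch of sizing the pool of finder tasks from the processor count. It uses the standard std::thread::hardware_concurrency() as a portable stand-in (in a Concurrency Runtime build, Concurrency::GetProcessorCount() would play that role). finderTaskCount is a hypothetical helper, and the oversubscription factor reflects the 2x and 4x experiments mentioned earlier in the thread; the right value has to be found by measurement.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: how many finder tasks to start.
// processors defaults to the number of hardware threads reported by the
// standard library; oversubscription (e.g. 2 or 4) presumably helps because
// some finders can run while others are blocked waiting on disk I/O.
unsigned finderTaskCount(unsigned oversubscription,
                         unsigned processors = std::thread::hardware_concurrency()) {
    assert(oversubscription > 0);
    // hardware_concurrency() returns 0 when the count is not computable.
    if (processors == 0) {
        processors = 1;
    }
    return processors * oversubscription;
}
```

On the quad-core machine from the tests above, finderTaskCount(2, 4) gives 8 and finderTaskCount(4, 4) gives 16.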
Regarding the priority queue, I think the "correct" solution would be to use a priority queue, as you've alluded to. If you wanted to build a custom prioritized unbounded_buffer, I think the easiest path would be to copy the unbounded_buffer code and replace its _M_messageBuffer member with a thread-safe priority queue that implicitly knows how to prioritize messages of type _Type. The member to replace is:
message_buffer<message<_Type>> _M_messageBuffer;
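As a rough illustration of the thread-safe priority queue part, here is a minimal standalone sketch in standard C++ (std::mutex, std::condition_variable, std::priority_queue). It is not the unbounded_buffer internals and does not implement the ISource/ITarget messaging protocol; blocking_priority_queue and its send/receive/try_receive members are hypothetical names chosen to echo the agents vocabulary. Following std::priority_queue's convention, the message that compares greatest is received first.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical blocking priority queue that could replace a FIFO store.
template <class T, class Compare = std::less<T>>
class blocking_priority_queue {
public:
    void send(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        ready_.notify_one();  // wake one waiting receiver
    }

    // Blocks until a message is available, then removes and returns the
    // highest-priority one.
    T receive() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        T top = queue_.top();  // top() returns const&, so copy before pop()
        queue_.pop();
        return top;
    }

    // Non-blocking variant: returns false if no message is waiting.
    bool try_receive(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        out = queue_.top();
        queue_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::priority_queue<T, std::vector<T>, Compare> queue_;
};
```

For the find-in-files case, T could be a record holding the file size with a comparator that ranks smaller files higher. Note that std::priority_queue is not stable: messages with equal priority may come out in any order, unlike the FIFO order of unbounded_buffer.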
There is also an alternative that is in the box: use the message block constructor overload that takes a filter_method functor, and link multiple targets to an unbounded_buffer. When multiple targets are linked to an unbounded_buffer, it offers each message to the targets in the order they were linked. Combined with a filter functor, this gives a 'chain of responsibility' style pattern: if the message block whose filter checks for priority is linked first, it effectively gets first pick of the high-priority messages. Here's what I mean. In this example I approximate sending four large "files" and one small file, and we'd like to start processing the small file as soon as is feasible.
#include <agents.h>
#include <string>
#include <iostream>
#include <windows.h>

using namespace ::std;
using namespace ::Concurrency;

//helper to connect message blocks
template <class Buf1Class, class Buf2Class>
void connect(Buf1Class& buf1, Buf2Class& buf2)
{
    buf1.link_target(&buf2);
}

class File
{
public:
    File(const string& filename, size_t filesize) : name(filename), size(filesize) {}
    const string name;
    const size_t size;
    bool openfile() const
    {
        cout << "loading file: " << name << endl;
        //simulate file load: 5ms per 4KB page
        Sleep(static_cast<DWORD>(5 * (size / 4096)));
        return true;
    }
};

int main()
{
    const size_t smallfilesize = 10000;

    //one message is sent here each time a file has been processed
    unbounded_buffer<bool> done;

    //a functor to simulate loading a file
    auto loadfile = [&done](const File& in)
    {
        in.openfile();
        asend(&done, true);
    };

    //a filter function that returns true if the file is small
    auto trueIfSmallFilter = [smallfilesize](const File& in) -> bool
    {
        return in.size < smallfilesize;
    };

    //a functor to create a file from a string
    auto createFileFromString = [](const string& in) -> File
    {
        return File(in, in.length() * 1024);
    };

    //here's a transformer for opening the files
    transformer<string, File> openFiles(createFileFromString);
    //here's a call block for looking up small files; note the filter function
    call<File> lookupSmallFiles(loadfile, trueIfSmallFilter);
    //here's a call block for looking up all other files
    call<File> lookupFiles(loadfile);
    //here's an unbounded_buffer of filenames
    unbounded_buffer<string> filenames;
    //here's an unbounded_buffer of files
    unbounded_buffer<File> files;

    connect(filenames, openFiles);
    connect(openFiles, files);
    //when multiple targets are hooked up to an unbounded_buffer
    //they are offered messages in the order they were linked
    //here the filter function is used to only process small files
    connect(files, lookupSmallFiles);
    connect(files, lookupFiles);

    asend(&filenames, string("very long file 1"));
    asend(&filenames, string("very long file 2"));
    asend(&filenames, string("very long file 3"));
    asend(&filenames, string("very long file 4"));
    //now send a short file
    asend(&filenames, string("smallfile"));

    //wait until all five files have been processed, rather than
    //sleeping and hoping the messages have propagated
    for (int i = 0; i < 5; ++i)
        receive(done);
    return 0;
}
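The link-order routing described above can also be modeled without agents.h. The following is a simplified, synchronous simulation, not the agents implementation: ordered_source_sketch and target_sketch are hypothetical names. Each message is offered to the linked targets in link order, and the first target whose filter accepts it consumes it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical target: an optional filter plus a handler.
template <class T>
struct target_sketch {
    std::function<bool(const T&)> filter;   // empty filter accepts everything
    std::function<void(const T&)> handler;
};

// Hypothetical source that offers messages to targets in link order.
template <class T>
class ordered_source_sketch {
public:
    void link_target(target_sketch<T> t) { targets_.push_back(std::move(t)); }

    // Returns the index of the target that accepted the message, or -1.
    int offer(const T& msg) const {
        for (std::size_t i = 0; i < targets_.size(); ++i) {
            const target_sketch<T>& t = targets_[i];
            if (!t.filter || t.filter(msg)) {
                t.handler(msg);
                return static_cast<int>(i);
            }
        }
        return -1;  // declined by every target; a real buffer would keep it
    }

private:
    std::vector<target_sketch<T>> targets_;
};
```

Linking a "size < 10000" target first and an unfiltered target second sends a 9216-byte file to the first and a 16384-byte file to the second. In the agents version each call block should run its function asynchronously on its own queue of accepted messages, so the small file is not stuck behind the large files queued at lookupFiles; this sketch captures only the routing decision.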
Rick Molloy Parallel Computing Platform : http://blogs.msdn.com/nativeconcurrency


