AMP crashes using completion_future for continuations with copy_async

  • Question

  • I'm trying to maintain static buffers on the accelerator to avoid repeated allocations, since the equivalent approach gives a substantial performance improvement in my OpenCL code. So I have a setup with an array<float, 1> for each GPU buffer, plus a staging array to help speed up copies. To chain the copy operations (CPU -> staging -> GPU buffer), I'm using continuations with completion_future. However, my code crashes with a memory access error on the copy back (GPU buffer -> staging -> CPU memory). Moreover, the crashes are not deterministic: they happen at varying iterations. I've checked that the queued operations are trying to access the correct memory address and that sufficient memory is allocated for it. Here's my code:

    #include <amp.h>
    #include <amp_math.h>
    #include <iostream>
    #include <vector>
    #include <cstdlib>
    
    using namespace std;
    using namespace concurrency;
    
    #define FRAND (static_cast<float>(rand()) / RAND_MAX)
    #define FSRAND (FRAND - 0.5f)
    
    #define USE_COMPLETIONS
    
    void testfunc () {
    	const int n = 10000, allocn = 20000;
    	static array<float, 1> staging[] = {array<float, 1>(n, accelerator(accelerator::cpu_accelerator).default_view, accelerator(accelerator::default_accelerator).default_view), 
    		array<float, 1>(n, accelerator(accelerator::cpu_accelerator).default_view, accelerator(accelerator::default_accelerator).default_view), 
    		array<float, 1>(n, accelerator(accelerator::cpu_accelerator).default_view, accelerator(accelerator::default_accelerator).default_view), 
    	};
    	static array<float, 1> gpumem[] = {array<float, 1>(allocn), array<float, 1>(allocn), array<float, 1>(allocn)};
    	static bool isinit = false;
    	static vector<float> a(n), b(n), c(n), d(n);
    	if (!isinit) {
    		a.resize(n);
    		b.resize(n);
    		c.resize(n);
    		for (int i = 0; i < n; ++i) {a[i] = FSRAND;	b[i] = FSRAND; c[i] = FSRAND;}
    	}
    	vector<float>::iterator worklists[] = {a.begin(), b.begin(), c.begin(), a.end(), b.end(), c.end()};
    	vector<float> *wlptrs[] = {&a, &b, &c};
    	cout << "init complete" << endl;
    #ifdef USE_COMPLETIONS
    	for (int i = 0; i < 3; ++i) try {
    		completion_future cf = copy_async (worklists[i], worklists[3+i], staging[i]);
    		cf.then([&, i] {
    			cout << "Continuing copyinit " << i << endl;
    			cout.flush();
    			copy_async(staging[i], gpumem[i]).wait();
    		});
    		//cf.get();
    	} catch (std::exception ex) {cout << "Caught exception in copyinit: " << ex.what() << endl;cout.flush();}
    #else
    	shared_future<void> waitlists[3];
    	for (int i = 0; i < 3; ++i) try {
    		waitlists[i] = copy_async (worklists[i], worklists[3+i], staging[i]);
    	} catch (std::exception ex) {cout << "Caught exception in copyinit: " << ex.what() << endl;}
    	for (int i = 0; i < 3; ++i) waitlists[i].wait();
    	for (int i = 0; i < 3; ++i) try {
    		waitlists[i] = copy_async (staging[i], gpumem[i]);
    	} catch (std::exception ex) {cout << "Caught exception in copyinit: " << ex.what() << endl;}
    	for (int i = 0; i < 3; ++i) waitlists[i].wait();
    #endif
    	array<float, 1> &ga = gpumem[0], &gb = gpumem[1], &gc = gpumem[2];
    	cout << "transfers to gpu scheduled" << endl; cout.flush();
    	try {
    		parallel_for_each (concurrency::extent<1>(n),
    			[&](index<1> idx) restrict(amp) {
    				gc[idx] = ga[idx] + gb[idx];
    				ga[idx] = gb[idx];
    				gb[idx] = gc[idx];
    		}
    		);
    	} catch (std::exception ex) {
    		cout << "Caught exception in kernel: " << ex.what() << endl;cout.flush();
    	}
    	cout << "kernel scheduled" << endl;cout.flush();
    
    #ifdef USE_COMPLETIONS
    	for (int i = 0; i < 3; ++i) try {
    		completion_future cf = copy_async (gpumem[i].section(0, n), staging[i]);
    		cf.then([&, i] {
    			cout << "Continuing copyback " << i << " trying to copy to vector at " << (void*)wlptrs[i] << " of size " << wlptrs[i]->size() << " with ptr range: " << &(*wlptrs[i])[0] << ", " << &(wlptrs[i]->back()) << endl;
    			copy_async(staging[i], wlptrs[i]->begin()).wait();
    		});
    		//cf.get();
    	} catch (std::exception ex) {cout << "Caught exception in copyback: " << ex.what() << endl;cout.flush();}
    #else
    	for (int i = 0; i < 3; ++i) try {
    		waitlists[i] = copy_async (gpumem[i].section(0,n), staging[i]);
    	} catch (std::exception ex) {cout << "Caught exception in copyback: " << ex.what() << endl;}
    	for (int i = 0; i < 3; ++i) waitlists[i].wait();
    	for (int i = 0; i < 3; ++i) try {
    		waitlists[i] = copy_async (staging[i], worklists[i]);
    	} catch (std::exception ex) {cout << "Caught exception in copyback: " << ex.what() << endl;}
    	for (int i = 0; i < 3; ++i) waitlists[i].wait();
    #endif
    	cout << "transfer back scheduled" << endl;cout.flush();
    }
    
    int main (int argc, char **argv) {
    	srand(0);
    	for (int i = 0; i < 10000; ++i) {
    		testfunc();
    	}
    	return 0;
    }
    
    Could someone help point out what I'm doing wrong in this example? I'm only using continuations for the copies, and I've kept the regular copy code in there too (under the #else branch) for comparison. This is a stripped-down version of what I'm actually doing; the real code has many more buffers and a more complicated kernel, which is why I'm trying to use loops for all the copies.


    Thursday, June 21, 2012 2:56 PM

All replies

  • Hi abhinavgolas,

    Your code has a possible race condition. After launching the copy-in of data from staging to gpumem asynchronously, you invoke parallel_for_each, which returns immediately, and then fire off the copy-out of data from gpumem to staging asynchronously. The copy-in and copy-out operations can therefore execute concurrently, resulting in undefined behavior.

    Before you launch parallel_for_each, you need to make sure that the asynchronous copy-in has finished executing. Also, because the copy-out runs asynchronously (i.e. on a separate thread), your main thread may exit before it is complete; the chained continuation will then try to access memory that no longer exists. So you need to wait in the main thread for the copy-out to finish before exiting. You should change your code to something like this:

    for (int i = 0; i < 3; ++i)
    {
        try
        {
            completion_future cf = copy_async (worklists[i], worklists[3+i], staging[i]);
            cf.to_task().then([&, i] {
                cout << "Continuing copyinit " << i << endl;
                cout.flush();
                copy_async(staging[i], gpumem[i]).wait();
            }).get();
        }
        catch (const std::exception& ex)
        {
            cout << "Caught exception in copyinit: " << ex.what() << endl;
            cout.flush();
        }
    }

    You can read more about using continuations on our blog.

    Thanks,
    Hasibur

    PS

    Also, the way you are using the staging array will not speed up your copies. You are allocating vectors on the CPU, copying them into the staging array, and then copying that to the GPU; what you are trying to do by hand is already taken care of by the C++ AMP runtime. To make the copy faster, you can create the staging array directly and fill it with data, then copy from it to the array on the GPU. Something like this:

    if (!isinit)
    {
        for (int i = 0; i < n; ++i)
        {
            staging[0][i] = FSRAND; 
            staging[1][i] = FSRAND;
            staging[2][i] = FSRAND;
        }
    }

    for (int i = 0; i < 3; ++i)
    {
        try
        {
            copy_async(staging[i], gpumem[i]).get();
        }
        catch (const std::exception& ex)
        {
            cout << "Caught exception in copyinit: " << ex.what() << endl;
            cout.flush();
        }
    }

    While copying the data out, you can copy from the array on the GPU to the staging array and then use the staging array directly; there is no need to copy from the staging array to a vector. You can read more about staging arrays on our blog.
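
    To illustrate, here is a minimal self-contained sketch of that copy-out path (with hypothetical buffer names, and assuming a kernel has already filled the GPU array):

    #include <amp.h>
    using namespace concurrency;
    
    int main() {
        const int n = 10000;
        // Staging array: CPU-accessible, associated with the default (GPU) accelerator.
        array<float, 1> staging(n, accelerator(accelerator::cpu_accelerator).default_view,
                                accelerator().default_view);
        array<float, 1> gpumem(n); // array on the default accelerator
        // ... a kernel would fill gpumem here ...
        copy(gpumem, staging);     // copy out: GPU -> staging
        // Read the staging array directly on the CPU; no copy into a vector needed.
        float sum = 0.0f;
        for (int i = 0; i < n; ++i)
            sum += staging[i];
        return 0;
    }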

    Friday, June 22, 2012 2:45 AM
  • Hi Hasibur,

    Thanks for the fix, that takes care of the crashes. The way I understand it, get() waits until the second asynchronous copy completes. Does get() block the host thread until both copies complete? Wouldn't that serialize the copies, i.e. return control to the host code only after each chain finishes, and only then initiate the next copy? If so, that would defeat the purpose of trying to use continuations. In that case, is there a way to schedule all the copies first and then wait on them?

    Also, I'm confused as to why parallel_for_each does not return a completion_future as well, since that would allow chaining all of the copy, kernel, and copy-back commands. This is similar to what OpenCL does, and it helps the flow of the program. Also, is a way to wait on multiple completion_futures planned?

    As for directly using a staging array, that may not be feasible for my code. The example I posted is representative of one small kernel; the STL vectors are components of a physics simulator with many other parts, and I wouldn't want to change the entire codebase to use staging arrays instead of STL vectors. What I've been trying to do is speed up CPU-GPU memory copies and match OpenCL's I/O performance: right now the OpenCL runtime is more than 2x faster, even though the kernel runtimes are very similar for AMP and OpenCL. In OpenCL I'm using static device buffers, and I was trying to replicate the same in AMP.

    Friday, June 22, 2012 3:38 PM
  • Hi abhinavgolas,

    The get() method on a completion_future or a concurrency::task object only waits for the associated asynchronous operation to finish. You are right that the code in the previous response will effectively serialize the copies. To issue all the copies asynchronously and collectively wait on all of them, you will have to store the task objects returned by the continuations in a container and wait on them using “when_all”, or use the “&&” operator to create composite tasks that simply join the operand tasks (see this blog post). The latter should only be used when joining a small number of tasks.
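
    For example, here is a minimal self-contained sketch of collecting tasks in a container and joining them with when_all from <ppltasks.h>; the placeholder lambdas below merely stand in for your copy_async(...).to_task().then(...) chains:

    #include <ppltasks.h>
    #include <iostream>
    #include <vector>
    
    int main() {
        using namespace concurrency;
        std::vector<task<void>> copies;
        for (int i = 0; i < 3; ++i) {
            // Placeholder task standing in for a copy_async continuation chain.
            copies.push_back(create_task([i] {
                std::cout << "copy " << i << " done" << std::endl;
            }));
        }
        // when_all yields a task<void> that completes once every element has;
        // a single wait() then joins all the copies collectively.
        when_all(copies.begin(), copies.end()).wait();
        return 0;
    }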

    Regarding why “parallel_for_each” (p_f_e) does not return a completion_future like other asynchronous operations: the p_f_e operation has “as-if-synchronous” semantics, which means it is safe to access the output of the p_f_e (such as by issuing a copy) immediately after the p_f_e returns. However, on Direct3D accelerators the actual execution of the p_f_e kernel is asynchronous, and the DirectX runtime takes care of synchronizing any subsequent kernel executions or copy commands that access the same resources as the previous kernel in a conflicting fashion (i.e. when at least one of the accesses writes to the common data resource). In short, since C++ AMP currently only supports Direct3D accelerators, there is no need for an asynchronous version of p_f_e: it semantically behaves as if it were synchronous but actually executes asynchronously. Adding a parallel_for_each_async that returns a future is a good idea, and we are considering it for future releases, when C++ AMP may have accelerators that by default DO execute synchronously. For this release it would have been redundant.
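
    As a small illustration of the as-if-synchronous semantics (a hypothetical, self-contained example), a copy issued immediately after p_f_e observes the kernel's results even though the kernel actually runs asynchronously:

    #include <amp.h>
    #include <vector>
    using namespace concurrency;
    
    int main() {
        const int n = 1024;
        array<float, 1> data(n);
        parallel_for_each(data.extent, [&data](index<1> idx) restrict(amp) {
            data[idx] = idx[0] * 2.0f;
        });
        // Safe immediately after parallel_for_each returns: the DirectX runtime
        // orders this copy behind the kernel that writes `data`.
        std::vector<float> result(n);
        copy(data, result.begin());
        return 0;
    }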

    Also, I am copying below a modified version of the code that issues multiple copies asynchronously and chains operations appropriately to avoid races.

    #include <amp.h>
    #include <amp_math.h>
    #include <iostream>
    #include <vector>
    #include <cstdlib>
    
    using namespace std;
    using namespace concurrency;
    
    #define FRAND (static_cast<float>(rand()) / RAND_MAX)
    #define FSRAND (FRAND - 0.5f)
    
    void testfunc () {
    	const int n = 10000, allocn = 20000;
        accelerator_view cpuAcclView(accelerator(accelerator::cpu_accelerator).default_view);
        accelerator_view defaultAcclView(accelerator().default_view);
    
    	static array<float, 1> staging[] = {
            array<float, 1>(n, cpuAcclView, defaultAcclView),
            array<float, 1>(n, cpuAcclView, defaultAcclView),
            array<float, 1>(n, cpuAcclView, defaultAcclView)
        };
    
    	static array<float, 1> gpumem[] = {array<float, 1>(allocn), array<float, 1>(allocn), array<float, 1>(allocn)};
    	static bool isinit = false;
    	static vector<float> a(n), b(n), c(n), d(n);
    	if (!isinit) {
    		a.resize(n);
    		b.resize(n);
    		c.resize(n);
    		for (int i = 0; i < n; ++i) {a[i] = FSRAND;	b[i] = FSRAND; c[i] = FSRAND;}
    
            isinit = true;
    	}
    	cout << "init complete" << endl;
    
    	vector<float>::iterator worklists[] = {a.begin(), b.begin(), c.begin(), a.end(), b.end(), c.end()};
    	vector<float> *wlptrs[] = {&a, &b, &c};
        auto asyncTask = task<void>([] {}); // Empty task
    	for (int i = 0; i < 3; ++i)
    	{
    		asyncTask = asyncTask && copy_async(worklists[i], worklists[3+i], staging[i]).to_task().then([&, i] {
    			cout << "Continuing copyinit " << i << endl;
    			cout.flush();
    			copy(staging[i], gpumem[i]);
    		});
    	}
    	array<float, 1> &ga = gpumem[0], &gb = gpumem[1], &gc = gpumem[2];
    	cout << "transfers to gpu scheduled" << endl; cout.flush();
        // Now let's chain the compute on the GPU behind the copy-in task
    
        asyncTask.then([&] {
    		parallel_for_each (concurrency::extent<1>(n),
    			[&](index<1> idx) restrict(amp) {
    				gc[idx] = ga[idx] + gb[idx];
    				ga[idx] = gb[idx];
    				gb[idx] = gc[idx];
    		});
    
        	cout << "kernel scheduled" << endl;cout.flush();
    
            auto tempTask = task<void>([]{}); // Empty task
    	    for (int i = 0; i < 3; ++i)
    	    {
    		    tempTask = tempTask && copy_async (gpumem[i].section(0, n), staging[i]).to_task().then([&, i] {
    			    cout << "Continuing copyback " << i << " trying to copy to vector at " << (void*)wlptrs[i] << " of size " << wlptrs[i]->size() << " with ptr range: " << &(*wlptrs[i])[0] << ", " << &(wlptrs[i]->back()) << endl;
    			    copy(staging[i], wlptrs[i]->begin());
    		    });
    	    }
    
        	cout << "transfer back scheduled" << endl;
            tempTask.get();
        }).get();
    
    	cout.flush();
    }
    
    int main (int argc, char **argv) {
    	srand(0);
    	for (int i = 0; i < 10000; ++i) {
    		testfunc();
    	}
    	return 0;
    }
    
    

    - Amit


    Amit K Agarwal

    • Marked as answer by abhinavgolas Monday, June 25, 2012 2:43 PM
    Saturday, June 23, 2012 12:53 AM
    Moderator
  • Thanks, this is exactly the kind of chaining I was looking for.
    Monday, June 25, 2012 2:44 PM