bounded_buffer bug? - Why do I need to recreate "join" for every receive?
-
Thursday, October 20, 2011 10:18 PM
I'm using make_join to create a join which I want to use inside my agent. However, I seem to have to re-create the join for every call to receive. Why?
    virtual void run()
    {
        //auto join = make_join(&video_, &audio_); // DOES NOT WORK?
        while(true)
        {
            auto join = make_join(&video_, &audio_); // WORKS?
            auto item = Concurrency::receive(join);
            std::get<0>(item)->audio_data() = std::move(*std::get<1>(item));
            Concurrency::send(target_, std::get<0>(item));
        }
    }
EDIT:
I should probably mention that video_ and audio_ are bounded_buffers.
It works fine with unbounded_buffers.
This problem seems to be related to: http://social.msdn.microsoft.com/Forums/en-US/parallelcppnative/thread/42a55562-1380-47e6-a8cc-839e7db13941
- Edited by Dragon89, Friday, October 21, 2011 10:20 AM
All replies
-
Friday, October 21, 2011 10:32 PM
then use the unbounded buffers. I make it a practice to use pointers all the time so I am aware of varying sizes a lot.
Windows MVP 2010-11, XP, Vista, 7. Expanding into Windows Server 2008 R2, SQL Server, SharePoint, Cloud, Virtualization etc. etc.
Hardcore Games, Legendary is the only Way to Play
Developer | Windows IT | Chess | Economics | Vegan Advocate | PC Reviews
-
Saturday, October 22, 2011 10:18 AM
"then use the unbounded buffers."
If an unbounded buffer had been more appropriate, I would have used one. But that is not the case in my situation.
"I make it a practice to use pointers all the time so I am aware of varying sizes a lot."
I don't understand.
-
Monday, October 31, 2011 11:08 PM, Owner
Hello,
Thanks for reporting the bug in our sample pack. The bounded_buffer is not really propagating messages on the resume propagation code path. The fix below addresses that issue.
BEFORE:
    virtual void propagate_to_any_targets(message<_Type> * _PMessage)
    {
        // Enqueue pMessage to the internal message buffer if it is non-NULL.
        // pMessage can be NULL if this LWT was the result of a Repropagate call
        // out of a Consume or Release (where no new message is queued up, but
        // everything remaining in the bounded buffer needs to be propagated out)
        if (_PMessage != NULL)
        {
            _M_messageBuffer.enqueue(_PMessage);

            // If the incoming pMessage is not the head message, we can safely assume that
            // the head message is blocked and waiting on Consume(), Release() or a new
            // link_target() and cannot be propagated out.
            if (_M_messageBuffer.is_head(_PMessage->msg_id()))
            {
                _Propagate_priority_order();
            }
        }
        else
        {
            // While current size is less than capacity try to consume
            // any previously offered ids.
            bool _ConsumedMsg = true;
            while(_ConsumedMsg)
            {
                // Assume a message will be found to successfully consume in the
                // saved ids, if not this will be decremented afterwards.
                if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
                {
                    break;
                }

                _ConsumedMsg = try_consume_msg();
            }

            // Decrement the current size, we broke out of the previous loop
            // because we reached capacity or there were no more messages to consume.
            _InterlockedDecrement(&_M_currentSize);
        }
    }
AFTER:
    virtual void propagate_to_any_targets(message<_Type> * _PMessage)
    {
        // Enqueue pMessage to the internal message buffer if it is non-NULL.
        // pMessage can be NULL if this LWT was the result of a Repropagate call
        // out of a Consume or Release (where no new message is queued up, but
        // everything remaining in the bounded buffer needs to be propagated out)
        if (_PMessage != NULL)
        {
            _M_messageBuffer.enqueue(_PMessage);

            // If the incoming pMessage is not the head message, we can safely assume that
            // the head message is blocked and waiting on Consume(), Release() or a new
            // link_target() and cannot be propagated out.
            if (!_M_messageBuffer.is_head(_PMessage->msg_id()))
            {
                return;
            }
        }

        _Propagate_priority_order();

        {
            // While current size is less than capacity try to consume
            // any previously offered ids.
            bool _ConsumedMsg = true;
            while(_ConsumedMsg)
            {
                // Assume a message will be found to successfully consume in the
                // saved ids, if not this will be decremented afterwards.
                if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
                {
                    break;
                }

                _ConsumedMsg = try_consume_msg();
            }

            // Decrement the current size, we broke out of the previous loop
            // because we reached capacity or there were no more messages to consume.
            _InterlockedDecrement(&_M_currentSize);
        }
    }
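To see the control-flow difference in isolation, here is a rough, single-threaded sketch in standard C++. It is not the sample pack code: ToyBuffer, target_ready and delivered_after_repropagate are hypothetical names made up for illustration, the capacity accounting and try_consume_msg loop are left out, and the "target" is just a flag that accepts or declines offers. It only models the point that a NULL message (the repropagate path) never reached _Propagate_priority_order() before the change.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical, simplified stand-in for bounded_buffer (assumption: no
// threads, no capacity accounting, one target that accepts or declines).
struct ToyBuffer {
    std::deque<int> queue;        // messages held inside the buffer
    std::vector<int> delivered;   // messages the target has accepted
    bool target_ready = false;    // whether the target accepts offers right now
    bool fixed;                   // true: "after" control flow, false: "before"

    explicit ToyBuffer(bool use_fix) : fixed(use_fix) {}

    // Offer queued messages to the target, oldest first, until it declines.
    void propagate_priority_order() {
        while (!queue.empty() && target_ready) {
            delivered.push_back(queue.front());
            queue.pop_front();
        }
    }

    // msg == nullptr models the repropagate call after a consume or release.
    void propagate_to_any_targets(const int* msg) {
        if (msg != nullptr) {
            queue.push_back(*msg);
            const bool is_head = (queue.size() == 1);
            if (!is_head) {
                return;               // head is still blocked, nothing to offer
            }
            propagate_priority_order();
            return;
        }
        if (fixed) {
            propagate_priority_order();   // "after": flush what is queued
        }
        // "before": the repropagate path offers nothing to the target.
    }
};

// Scenario: two messages arrive while the target is busy, then the target
// frees up and the buffer is asked to repropagate with a null message.
std::size_t delivered_after_repropagate(bool use_fix) {
    ToyBuffer buf(use_fix);
    int first = 1;
    int second = 2;
    buf.propagate_to_any_targets(&first);
    buf.propagate_to_any_targets(&second);
    buf.target_ready = true;
    buf.propagate_to_any_targets(nullptr);
    return buf.delivered.size();
}
```

Calling delivered_after_repropagate(false) leaves both messages stuck in the toy queue, while delivered_after_repropagate(true) hands both to the target. In this model, that is the kind of stall a long-lived join linked to a bounded_buffer could run into.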
Thanks!
Rahul
Rahul V. Patil
- Proposed as answer by Rahul V. Patil (Moderator), Monday, October 31, 2011 11:13 PM
- Marked as answer by Dragon89, Wednesday, November 2, 2011 12:20 AM
-
Monday, October 31, 2011 11:17 PM
good to see you figured it out
feel free to post again if there are any other problems

