bounded_buffer bug? - Why do I need to recreate "join" for every receive?

답변됨 bounded_buffer bug? - Why do I need to recreate "join" for every receive?

  • 2011년 10월 20일 목요일 오후 10:18
     
      코드 있음

    I'm using make_join to create a join which I want to use inside by agent, however, I seem to have to re-create the join for every call to receive, why?

    virtual void run()
    {		
            //auto join = make_join(&video_, &audio_); // DOES NOT WORK?
    	while(true)
    	{
    		auto join = make_join(&video_, &audio_); // WORKS?
    		auto item = Concurrency::receive(join);
    		std::get<0>(item)->audio_data() = std::move(*std::get<1>(item));
    		Concurrency::send(target_, std::get<0>(item));
    	}
    }
    

    EDIT:

    I should probably mention that video_ and audio_ are bounded_buffers.

    It works fine with unbounded_buffers.

    This problem seems to be related to: http://social.msdn.microsoft.com/Forums/en-US/parallelcppnative/thread/42a55562-1380-47e6-a8cc-839e7db13941
    • 편집됨 Dragon89 2011년 10월 21일 금요일 오전 10:20
    •  

모든 응답

  • 2011년 10월 21일 금요일 오후 10:32
     
     

    then use the unbounded buffers. I make it a practice to use pointers all the time so I am aware of varying sizes a lot.

     


    Windows MVP 2010-11, XP, Vista, 7. Expanding into Windows Server 2008 R2, SQL Server, SharePoint, Cloud, Virtualization etc. etc.

    Hardcore Games, Legendary is the only Way to Play

    Developer | Windows IT | Chess | Economics | Vegan Advocate | PC Reviews

  • 2011년 10월 22일 토요일 오전 10:18
     
     

    then use the unbounded buffers.

    If it would have been more appropriate to use unbounded buffer then I would have. BUt this is not the case in my situation.

    I make it a practice to use pointers all the time so I am aware of varying sizes a lot.

    I don't understand.
  • 2011년 10월 31일 월요일 오후 11:08
    소유자
     
     답변됨 코드 있음

    Hello,

    Thanks for reporting the bug in our sample pack.  The bounded_buffer is not really propagating messages on the resume propagation code path. The fix below addresses that issue.

    BEFORE:

    virtual void propagate_to_any_targets(message<_Type> * _PMessage)
            {
                // Enqueue pMessage to the internal message buffer if it is non-NULL.
                // pMessage can be NULL if this LWT was the result of a Repropagate call
                // out of a Consume or Release (where no new message is queued up, but
                // everything remaining in the bounded buffer needs to be propagated out)
                if (_PMessage != NULL)
                {
                    _M_messageBuffer.enqueue(_PMessage);
    
                    // If the incoming pMessage is not the head message, we can safely assume that
                    // the head message is blocked and waiting on Consume(), Release() or a new
                    // link_target() and cannot be propagated out.
                    if (_M_messageBuffer.is_head(_PMessage->msg_id()))
                    {
                        _Propagate_priority_order();
                    }
                }
                else
                {
                    // While current size is less than capacity try to consume
                    // any previously offered ids.
                    bool _ConsumedMsg = true;
                    while(_ConsumedMsg)
                    {
                        // Assume a message will be found to successfully consume in the
                        // saved ids, if not this will be decremented afterwards.
                        if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
                        {
                            break;
                        }
    
                        _ConsumedMsg = try_consume_msg();
                    }
    
                    // Decrement the current size, we broke out of the previous loop
                    // because we reached capacity or there were no more messages to consume.
                    _InterlockedDecrement(&_M_currentSize);
                }
            }
    
    

     After:
            virtual void propagate_to_any_targets(message<_Type> * _PMessage)
            {
                // Enqueue pMessage to the internal message buffer if it is non-NULL.
                // pMessage can be NULL if this LWT was the result of a Repropagate call
               // out of a Consume or Release (where no new message is queued up, but
                // everything remaining in the bounded buffer needs to be propagated out)
                if (_PMessage != NULL)
                {
                    _M_messageBuffer.enqueue(_PMessage);
    
                    // If the incoming pMessage is not the head message, we can safely assume that
                    // the head message is blocked and waiting on Consume(), Release() or a new
                    // link_target() and cannot be propagated out.
                    if (!_M_messageBuffer.is_head(_PMessage->msg_id()))
                    {
                        return;
                    }
                }
    
                _Propagate_priority_order();
                
                {
    
                    // While current size is less than capacity try to consume
                    // any previously offered ids.
                    bool _ConsumedMsg = true;
                    while(_ConsumedMsg)
                    {
                        // Assume a message will be found to successfully consume in the
                        // saved ids, if not this will be decremented afterwards.
                        if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
                        {
                            break;
                        }
    
                        _ConsumedMsg = try_consume_msg();
                    }
    
                    // Decrement the current size, we broke out of the previous loop
                    // because we reached capacity or there were no more messages to consume.
                    _InterlockedDecrement(&_M_currentSize);
                }
            }
    
    

    Thanks!

    Rahul

     


    Rahul V. Patil
    • 답변으로 제안됨 Rahul V. PatilModerator 2011년 10월 31일 월요일 오후 11:13
    • 답변으로 표시됨 Dragon89 2011년 11월 2일 수요일 오전 12:20
    •  
  • 2011년 10월 31일 월요일 오후 11:17
     
     

    good to see you figured it out

    feel free to post again if there are any other problems

     


    Windows MVP 2010-11, XP, Vista, 7. Expanding into Windows Server 2008 R2, SQL Server, SharePoint, Cloud, Virtualization etc. etc.

    Hardcore Games, Legendary is the only Way to Play

    Developer | Windows IT | Chess | Economics | Vegan Advocate | PC Reviews