Multi-Threaded StreamSocket App with SSL and cancellation_token_source

    Question

  • I am running into an issue when I try to close a socket connection made in a multi-threaded WinRT application.

    The application uses a StreamSocket object that is shared by multiple threads.  All of the socket operations share a single cancellation_token_source.

    When I call cancel() on the cancellation_token_source and the StreamSocket is not using SSL, everything seems to work properly (all pending tasks are canceled).  However, when the socket has been upgraded to SSL via the UpgradeToSslAsync() method, I end up with an exception that crashes my app.  Note that this happens when cancellation_token_source.cancel() is called at the same time as LoadAsync() is called by a DataReader that is connected to the StreamSocket's input stream.

    Does anyone have any ideas on how to prevent this?

    Thanks

    Here is the call stack I see:

    >    MSVCR110D.dll!_invoke_watson(const wchar_t * pszExpression, const wchar_t * pszFunction, const wchar_t * pszFile, unsigned int nLine, unsigned int pReserved)  Line 131 + 0x5 bytes    C++
         vccorlib110d.DLL!__abi_FailFast()  Line 18 + 0x12 bytes    C++
         Core.dll!?__abi_Windows_Foundation_?$AsyncOperationCompletedHandler@I___abi_IDelegate____abi_Invoke@?Q__abi_IDelegate@?$AsyncOperationCompletedHandler@I@Foundation@Windows@@234@U$AAGJP$AAU?$IAsyncOperation@I@34@W4AsyncStatus@34@@Z(Windows::Foundation::IAsyncOperation<unsigned int> * __param0, Windows::Foundation::AsyncStatus __param1)  Line 65535 + 0x1f bytes    C++
         WinTypes.dll!Windows::Internal::AsyncBaseFTM<Windows::Foundation::IAsyncOperationCompletedHandler<unsigned int>,1>::FireCompletion()  Line 163 + 0xf bytes    C++
         WinTypes.dll!Windows::Internal::_AsyncOperationImpl<Windows::Internal::AsyncBaseFTM<Windows::Foundation::IAsyncOperationCompletedHandler<unsigned int>,1>,Windows::Foundation::IAsyncOperation<unsigned int>,Windows::Foundation::IAsyncOperationCompletedHandler<unsigned int>,unsigned int,Microsoft::WRL::Details::Nil,`Windows::Storage::Streams::DataReader::LoadAsync'::`41'::<lambda_CDF8E8D5FE3E316A>,Windows::Internal::CancelClosePassthrough<Windows::Internal::Promise<unsigned int,Microsoft::WRL::Details::Nil> > >::PromiseResultMethod(IInspectable * pThis, ABI::Windows::Foundation::AsyncStatus status, HRESULT hr, unsigned int result)  Line 816 + 0x3c bytes    C++
         WinTypes.dll!Windows::Storage::Streams::DataReader::StreamReadContinuation::operator()(HRESULT hr, Windows::Foundation::IAsyncOperationWithProgress<Windows::Storage::Streams::IBuffer *,unsigned int> * pAsyncOp, ABI::Windows::Foundation::AsyncStatus status)  Line 1146    C++
         WinTypes.dll!Microsoft::WRL::Details::InvokeHelper<Microsoft::WRL::Implements<Microsoft::WRL::RuntimeClassFlags<2>,Windows::Foundation::IAsyncOperationWithProgressCompletedHandler<Windows::Storage::Streams::IBuffer *,unsigned int>,Microsoft::WRL::FtmBase,Microsoft::WRL::Details::Nil,Microsoft::WRL::Details::Nil,Microsoft::WRL::Details::Nil,Microsoft::WRL::Details::Nil,Microsoft::WRL::Details::Nil,Microsoft::WRL::Details::Nil,Microsoft::WRL::Details::Nil>,Windows::Internal::Detail::ContinuationCallbackWrapper<Windows::Foundation::IAsyncOperationWithProgress<Windows::Storage::Streams::IBuffer *,unsigned int>,Windows::Storage::Streams::DataReader::StreamReadContinuation>,2>::Invoke(Windows::Foundation::IAsyncOperationWithProgress<Windows::Storage::Streams::IBuffer *,unsigned int> * arg1, ABI::Windows::Foundation::AsyncStatus arg2)  Line 212 + 0x17 bytes    C++
         Windows.Networking.dll!Windows::Internal::AsyncBaseFTM<Windows::Foundation::IAsyncOperationWithProgressCompletedHandler<Windows::Storage::Streams::IBuffer *,unsigned int>,1>::FireCompletion()  Line 163 + 0xf bytes    C++
         Windows.Networking.dll!SocketOperationWithProgressBase<Windows::Foundation::IAsyncOperationWithProgressCompletedHandler<Windows::Storage::Streams::IBuffer *,unsigned int>,Windows::Foundation::IAsyncOperationProgressHandler<Windows::Storage::Streams::IBuffer *,unsigned int> >::FireCompletion()  Line 170    C++
         Windows.Networking.dll!ReadOperationBase::FireCompletionAndReleaseOperation()  Line 38 + 0x7 bytes    C++
         Windows.Networking.dll!SerializedOperationQueue<StreamSocketReadOperationServer>::TryDequeueOperations()  Line 139    C++
         Windows.Networking.dll!StreamSocketReadOperationServer::CompleteOperation()  Line 127 + 0x5 bytes    C++
         Windows.Networking.dll!ReadOperationBase::IoCompleted(HRESULT ioResult, unsigned int numberOfBytesTransferred, unsigned char isEmptyRead)  Line 135    C++
         Windows.Networking.dll!StreamSocketReadOperationServer::IoCompleted(HRESULT ioResult, unsigned int numberOfBytesTransferred, IInspectable * context)  Line 116    C++
         Windows.Networking.dll!WebioFacade::WriteCompleteCallback(void * context, unsigned long error, unsigned long information)  Line 368    C++
         Windows.Networking.dll!WapSslCleanupReceiverContexts(_WA_SSL_CONNECTION * SslConn, unsigned long Error, unsigned char CloseNotify)  Line 491    C
         Windows.Networking.dll!WapSslCleanupReceiver(_WA_SSL_CONNECTION * SslConn)  Line 3429    C
         Windows.Networking.dll!WaSslProcessReceiver(_WA_SSL_CONNECTION * SslConn)  Line 3148    C
         Windows.Networking.dll!WaSslAbortReceiver(_WA_SSL_CONNECTION * SslConn, unsigned long Error)  Line 3527    C
         Windows.Networking.dll!WaSslAbortRequest(_WA_CONNECTION * Connection, unsigned long Error)  Line 999    C
         Windows.Networking.dll!StreamSocketServer::Close()  Line 583    C++
         Windows.Networking.dll!StreamSocketListenerImpl::Cancel()  Line 269    C++
         Windows.Networking.dll!ReadOperationBase::OnCancel()  Line 101 + 0xd bytes    C++
         WinTypes.dll!Windows::Internal::CancelClosePassthrough<Windows::Internal::Promise<Windows::Storage::Streams::IBuffer *,Microsoft::WRL::Details::Nil> >::operator()(Windows::Internal::Promise<Windows::Storage::Streams::IBuffer *,Microsoft::WRL::Details::Nil> promise, bool fClosed)  Line 210    C++
         WinTypes.dll!Windows::Internal::_AsyncOperationImpl<Windows::Internal::AsyncBaseFTM<Windows::Foundation::IAsyncOperationCompletedHandler<unsigned int>,1>,Windows::Foundation::IAsyncOperation<unsigned int>,Windows::Foundation::IAsyncOperationCompletedHandler<unsigned int>,unsigned int,Microsoft::WRL::Details::Nil,`Windows::Storage::Streams::DataReader::LoadAsync'::`41'::<lambda_CDF8E8D5FE3E316A>,Windows::Internal::CancelClosePassthrough<Windows::Internal::Promise<unsigned int,Microsoft::WRL::Details::Nil> > >::OnCancel()  Line 808    C++
         WinTypes.dll!Microsoft::WRL::AsyncBase<Windows::Foundation::IAsyncOperationWithProgressCompletedHandler<Windows::Storage::Streams::IBuffer *,unsigned int>,Microsoft::WRL::Details::Nil,1>::Cancel()  Line 234    C++
         Core.dll!Windows::Foundation::IAsyncInfo::Cancel()  Line 65535    C++
         Core.dll!Concurrency::details::_Task_impl<unsigned int>::_CancelAndRunContinuations(bool _SynchronousCancel, bool _UserException, bool _PropagatedFromAncestor, const std::shared_ptr<Concurrency::details::_ExceptionHolder> & _ExceptionHolder)  Line 2035 + 0x19 bytes    C++
         Core.dll!Concurrency::details::_Task_impl_base::_Cancel(bool _SynchronousCancel)  Line 1438 + 0x1f bytes    C++
         Core.dll!Concurrency::details::_Task_impl_base::_CancelViaToken(Concurrency::details::_Task_impl_base * _PImpl)  Line 1482    C++
         MSVCR110D.dll!CancellationTokenRegistration_TaskProc::_Exec()  Line 27 + 0xf bytes    C++
         MSVCR110D.dll!Concurrency::details::_CancellationTokenRegistration::_Invoke()  Line 196    C++
         MSVCR110D.dll!Concurrency::details::_CancellationTokenState::_Cancel()  Line 68    C++
         Core.dll!Concurrency::cancellation_token_source::cancel()  Line 770 + 0xd bytes    C++

    Thursday, August 23, 2012 8:00 PM

All replies

  • Hello,

    Which operating system and Visual Studio version are you using?

    Could you please provide sample code that reproduces this issue?

    Best regards,
    Jesse


    Jesse Jiang [MSFT]
    MSDN Community Support | Feedback to us

    Monday, August 27, 2012 6:29 AM
  • I am using RTM Win 8 / RTM VS 2012.

    Basically, I have a wrapper class called TcpSocket that wraps the operations of a StreamSocket.  Multiple threads call into these operations.  The exception occurs in the RecvAsync method if we call Close while the StreamSocket is using SSL.

    #include "TcpSocket.h"
    #include <ppltasks.h>
    
    using namespace Concurrency;
    
    TcpSocket::TcpSocket()
    { 
        _tcpStream = ref new StreamSocket();
        _inputReader = nullptr;
        _outputWriter = nullptr;
        _synchronizer = ref new AsyncSynchronizer();
    }
    
    TcpSocket::~TcpSocket()
    {
        _inputReader = nullptr;
        _outputWriter = nullptr;
        _tcpStream = nullptr;
    }
    
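    // Return type assumed to be void; the header is not shown in this post.
    void TcpSocket::Close()
    {
        // Cancels every pending operation created with this token source.
        _cancelTokenSource.cancel();
    }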
    
    IAsyncAction^ TcpSocket::ConnectAsync()
    {
        try
        {
    
            auto connectTask = create_task(_tcpStream->ConnectAsync(_endpoint), _cancelTokenSource.get_token());
            auto setupTask = connectTask.then([this](task<void> previousTask)
            {
                try
                {
                    previousTask.get();
                    this->_inputReader = ref new DataReader(_tcpStream->InputStream);
                    _inputReader->InputStreamOptions = InputStreamOptions::Partial;
                    _outputWriter = ref new DataWriter(_tcpStream->OutputStream);
    
                    // Set IsConnected to true if connect was successful
                    IsConnected = true;
                }
                catch (Exception^ exception)
                {
                    LastErrorMessage = exception->Message;
                    // Set IsConnected to false if there was an error
                    IsConnected = false;
                }
            });
    
            return (IAsyncAction^)create_async([setupTask]() { return setupTask; });
        }
        catch(Exception^)
        {
            return nullptr;
        }
    }
    
    IAsyncOperation<IBuffer^>^ TcpSocket::RecvAsync(unsigned int bufferLength)
    {
        try
        {
            auto loadTask = create_task(_inputReader->LoadAsync(bufferLength), _cancelTokenSource.get_token());
            task<IBuffer^> readTask = loadTask.then([this, bufferLength] (unsigned int bytesRead) -> IBuffer^
            {
    
                IBuffer^ buffer = _inputReader->ReadBuffer(bytesRead);
                if (buffer->Length != bytesRead)
                {
                    // These Should match
                   return nullptr;
                }
                return buffer;
            }).then([this](task<IBuffer^> readTask) -> IBuffer^
            {
                try
                {
                    // get() rethrows any exception from the antecedent task;
                    // on success, pass the buffer through to the caller.
                    return readTask.get();
                }
                catch (Exception^ exception)
                {
                    LastErrorMessage = exception->Message;
                    //cancel_current_task();
                    return nullptr;            
                }
            });
    
            return (IAsyncOperation<IBuffer^>^)create_async([readTask]() { return readTask; });
        }
        catch(Exception^)
        {
            return nullptr;
        }
    }
    
    IAsyncOperation<unsigned int>^ TcpSocket::SendAsync(const Array<unsigned char>^ sendData)
    {
        try
        {
            _outputWriter->WriteBytes(sendData);
            auto storeTask = create_task(_outputWriter->StoreAsync(), _cancelTokenSource.get_token());
            task<unsigned int> writeTask = storeTask.then([this] (task<unsigned int> writeTask) -> unsigned int
            {
                try
                {
                    // get() rethrows any exception from the antecedent task.
                    unsigned int bytes = writeTask.get();
                    
                    return bytes;
                }
                catch (Exception^ exception)
                {
                    LastErrorMessage = exception->Message;
                    return (unsigned int)exception->HResult;
                }
            });
            return (IAsyncOperation<unsigned int>^)create_async([writeTask]() {return writeTask; });
        }
        catch(Exception^)
        {
            return nullptr;
        }
    }
    
    
    bool TcpSocket::UpgradeToSsl(String^ validationHostName)
    {
        try
        {
            Concurrency::event synchronizer;
            bool retVal = false;
    
            HostName^ hostname = ref new HostName(validationHostName);
            auto upgradeTask = create_task(_tcpStream->UpgradeToSslAsync(SocketProtectionLevel::Ssl, hostname), _cancelTokenSource.get_token());
            upgradeTask.then([&](task<void> previous)
            {
                try
                {
                    previous.get();
                    retVal = true;
                    synchronizer.set();
                }
                catch (Exception^ ex)
                {
                    LastErrorMessage = ex->Message;
                    OutputDebugStringW(ex->Message->Data());
                    retVal = false;
                    synchronizer.set();
                }
    
            }, Concurrency::task_continuation_context::use_arbitrary() );
            synchronizer.wait();
            return retVal;
        }
        catch(Exception^ ex)
        {
            LastErrorMessage = ex->Message;
            OutputDebugStringW(ex->Message->Data());
            return false;
        }
    }
    

    Monday, August 27, 2012 5:04 PM
  • I do see the same problem. Any solutions?
    Wednesday, October 24, 2012 6:03 PM
  • Cancellation in the PPL
    ....
    This document explains the role of cancellation in the Parallel Patterns Library (PPL), how to cancel parallel work, and how to determine when parallel work is canceled.
    ....
    Key Points
    ....
    Cancellation does not occur immediately. Although new work is not started if a task or task group is cancelled, active work must check for and respond to cancellation.
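
    To illustrate the quoted point that active work must check for cancellation, here is a minimal sketch in portable standard C++. It deliberately does not use the PPL: CancelFlag and run_steps are hypothetical names standing in for a cancellation token and a task body, and the sketch says nothing about how StreamSocket or DataReader handle cancellation internally.

```cpp
#include <atomic>
#include <functional>

// Hypothetical stand-in for a cancellation token: a flag that work polls.
struct CancelFlag {
    std::atomic<bool> requested{false};

    void cancel() { requested.store(true); }
    bool is_canceled() const { return requested.load(); }
};

// Runs up to maxSteps units of work. The flag is checked only between
// steps, so a step that has already started always runs to completion.
// Returns the number of steps that completed.
int run_steps(const CancelFlag& flag, int maxSteps,
              const std::function<void(int)>& step) {
    int completed = 0;
    for (int i = 0; i < maxSteps; ++i) {
        if (flag.is_canceled()) {
            break;  // No new work starts once cancellation is observed.
        }
        step(i);
        ++completed;
    }
    return completed;
}
```

    If the flag is set while step 2 is running, that step still finishes and only the check before step 3 stops the loop, so run_steps reports three completed steps. With the PPL, the corresponding check inside a task body would be made against the task's cancellation token.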

    Friday, February 22, 2013 8:16 PM