locked
DataReader.LoadAsync does not detect closed sockets (with InputStreamOptions::Partial) [C++]

    Question

  • [this is on Windows 8 Consumer Preview with Visual Studio 11 Beta, using C++ in a Metro app]

    When reading data from a TCP socket (StreamSocket) with a DataReader object where DataReader.InputStreamOptions = InputStreamOptions::Partial, the 'Completed' callback of LoadAsync() will keep returning the same data over and over again after the socket has been closed by the remote endpoint.

    The use case is a simple one: the client has a TCP connection to a server, and keeps looping on: calling LoadAsync(4096) to ask to load up to 4096 bytes, then calling DataReader::ReadBytes() in the 'Completed' callback, reading whatever is available in the buffer, then back to LoadAsync(4096) for more data. When the remote end closes the socket, this client loop never terminates: the last received buffer keeps coming back in the 'Completed' callback, and there's no way to detect the socket has been closed.

    There is a very simple way to reproduce this, with the StreamSocket (http://code.msdn.microsoft.com/windowsapps/StreamSocket-Sample-8c573931) sample app:

    modify the reader to use the 'Partial mode', and modify the ReceiveStringLoop like this:

    void ScenarioInput1::OnConnection(StreamSocketListener^ listener, StreamSocketListenerConnectionReceivedEventArgs^ object)
    {
        DataReader^ reader = ref new DataReader(object->Socket->InputStream);
        reader->InputStreamOptions = InputStreamOptions::Partial; // added in this example
    
        ReceiveStringLoop(reader, object->Socket);
    }
    void ScenarioInput1::ReceiveStringLoop(DataReader^ reader, StreamSocket^ socket)
    {
        task<unsigned int> loadSize(reader->LoadAsync(4096));
        loadSize.then([this, reader, socket] (unsigned int size)
        {
    	String^ str = reader->ReadString(size);
            rootPage->NotifyUserFromAsyncThread("Received data: \"" + str + "\"", NotifyType::StatusMessage);
        }).then([this, reader, socket] (task<void> previousTask)
        {
            try
            {
                previousTask.get();
                ReceiveStringLoop(reader, socket);
            }
            catch (Exception^ exception)
            {
                rootPage->NotifyUserFromAsyncThread("Read stream failed with error: " + exception->Message, NotifyType::ErrorMessage);
    
                // Explicitly close the socket.
                delete socket;
            }
            catch (StreamClosedException&)
            {
                // Do not print anything here - this will usually happen because user closed the client socket.
    
                delete socket;
            }
        });
    }

    The rest of the sample app is left otherwise unmodified. We can observe that sending and receiving of data still works as before. But after the socket is closed (the 'close' button on the sample app), the loadSize.then() handler will be called repeatedly, with the same size, and the same data keeps coming back, as if the DataReader object was reusing whatever was previously left in the buffer, over and over again. The 'size' parameter is never 0 as one would expect it to be after the socket is closed. So there's no way to detect that the remote endpoint has closed the connection and get out of the reading loop sequence.

    Friday, April 06, 2012 8:48 AM

All replies

  • I'm also seeing the same problem described by gboccongibod. I would expect a StreamClosedException or a Timeout depending on how the socket connection was terminated if in the completion handle calling get()/getResult().

    Trying to provide cover for this problem by doing a protocol change isn't possible for our legacy application.

    Monday, April 09, 2012 6:53 PM
  • Microsoft person here (and from the team that owns this API). We're not allowed to tell you about anything that isn't released, but I can tell you that what you're seeing is a bug, and that we consider it to be very serious.  The #1 goal of any network API is to give you the bytes that were sent, without changes, and without (artificially) dropping or duplicating any data.

    So, please, stay tuned, and watch for upcoming releases.


    Network Developer Experience Team (Microsoft)

    Wednesday, April 11, 2012 1:35 AM
  • DataReader reader->InputStreamOptions = InputStreamOptions::Partial;
    auto task1 = make_task([=] {
    	try
    	{
    		bool load_complete = false;
    		unsigned int cpreaddata = 0;
    		unsigned int loaddata = size;
    		while (!load_complete)
    		{
    			DataReaderLoadOperation^ load_data = reader->LoadAsync(loaddata);
    			WaitForSingleObjectEx(smi_event_wait,100,false);
    			AsyncStatus loadstarus = smi_load_data->Status;
    			if(loadstarus == AsyncStatus::Completed)
    			{
    				unsigned int unreaddata = smi_reader->UnconsumedBufferLength;
    				auto data = ref new Platform::Array<unsigned char>(unreaddata);
    				smi_reader->ReadBytes(data);		
    				memcpy(buf+cpreaddata,data->Data,unreaddata);
    				cpreaddata += unreaddata;
    				loaddata -= unreaddata;
    				if(0 == loaddata)
    				{
    					load_complete = true;
    				}
    			} 
    		}
    	}
    	catch (Exception^ e)
    	{
    		e->Message;
    	}
    });
    
    structured_task_group tasks;
    tasks.run_and_wait(task1);

    hi

    you can use this method to load data completed before close socket and it's verified.

    WaitForSingleObjectEx() is used to promise load data enough.

    the loaded data is saved in buf

    Wednesday, April 11, 2012 9:31 AM

  • Hi, Is it solved yet or not? I am facing the same problem as gboccongibod.
    Tuesday, August 20, 2013 7:40 AM
  • Hi

    You can detect and bypass the above issue something like following way

    unsigned int availableBytes = bytesLoaded.get();
    if (availableBytes	> 0 && nullptr != dataReader)
    				{   
    							// address here
    				}
    				else
    				{
    					cancel_current_task();
    				}

    Wednesday, August 21, 2013 3:44 AM
  • DataReader reader->InputStreamOptions = InputStreamOptions::Partial;
    auto task1 = make_task([=] {
    	try
    	{
    		bool load_complete = false;
    		unsigned int cpreaddata = 0;
    		unsigned int loaddata = size;
    		while (!load_complete)
    		{
    			DataReaderLoadOperation^ load_data = reader->LoadAsync(loaddata);
    			WaitForSingleObjectEx(smi_event_wait,100,false);
    			AsyncStatus loadstarus = smi_load_data->Status;
    			if(loadstarus == AsyncStatus::Completed)
    			{
    				unsigned int unreaddata = smi_reader->UnconsumedBufferLength;
    				auto data = ref new Platform::Array<unsigned char>(unreaddata);
    				smi_reader->ReadBytes(data);		
    				memcpy(buf+cpreaddata,data->Data,unreaddata);
    				cpreaddata += unreaddata;
    				loaddata -= unreaddata;
    				if(0 == loaddata)
    				{
    					load_complete = true;
    				}
    			} 
    		}
    	}
    	catch (Exception^ e)
    	{
    		e->Message;
    	}
    });
    
    structured_task_group tasks;
    tasks.run_and_wait(task1);

    hi

    you can use this method to load data completed before close socket and it's verified.

    WaitForSingleObjectEx() is used to promise load data enough.

    the loaded data is saved in buf

    Hi oishixixi,

      Please can you tell me how to define  the smi_event_wait, smi_load_data, smi_reader ?

    I have a function to send some data to bluetooth device and get some data back, so here is my code, and i want to modify it to loop receiving,  please can you teach me how to do it ?

    void CBTWinRTDLL::SendReceiveData()
    {
    	Platform::Array<unsigned char>^ sendArray = ref new Platform::Array<unsigned char, 1>(10);
    	sendArray[0] = 0x65;
    	sendArray[1] = 0x00;
    	sendArray[2] = 0x00;
    	sendArray[3] = 0x00;
    	sendArray[4] = 0x00;
    	sendArray[5] = 0x00;
    	sendArray[6] = 0x00;
    	sendArray[7] = 0x00;
    	sendArray[8] = 0x00;
    	sendArray[9] = 0x00;
    
    	_writer->WriteBytes(sendArray);
    
    	create_task(_writer->StoreAsync()).then([this](task<unsigned int> writeTask)
    	{
    		try
    		{
    			// Try getting an exception.
    			writeTask.get();
    
    			_reader->InputStreamOptions = InputStreamOptions::Partial;
    
    			create_task(_reader->LoadAsync(10)).then(
    				[this](unsigned int readSize)
    			{
    
    				Platform::Array<unsigned char>^ recvArray = ref new Platform::Array<unsigned char, 1>(readSize);
    				_reader->ReadBytes(recvArray);
    
    				Platform::String^ recvStr;
    				for (int i = 0; i <  recvArray->Length; i++)
    				{
    					recvStr += recvArray[i].ToString() + " ";
    				}
    				MessageDialog^ md = ref new MessageDialog("received: " + recvStr, "WinRT DLL");
    				md->ShowAsync();
    			});
    		}
    		catch (Exception^ exception)
    		{
    		}
    	});
    }


    nio

    Thursday, December 12, 2013 9:45 AM
  • Hi

    I've not been able to get DataReader to read more than a few hundred bytes at a time using this code.  Is this something you have seen?

    Would you mind sharing a full working sample (to dissect)?

    Thanks!
    G

    Sunday, March 16, 2014 4:15 PM