locked
Splitting StreamSocketListner stream into discreet packets based on BYTE pattern?

    Question

  • I've got a streamsocketlistener.   As soon as it connects a GStreamer video streamer starts streaming JPEGs in a TCP Stream.  I've got something going right now that looks like this:

    List<Byte[]> frames = new List<Byte[]>(); MemoryStream image = new MemoryStream(); bool FirstJPEG = true; reader.InputStreamOptions = InputStreamOptions.Partial; await reader.LoadAsync(6000); while (reader.UnconsumedBufferLength > 0) { Byte[] message = new Byte[reader.UnconsumedBufferLength]; reader.ReadBytes(message); if (message[0] == (Byte)(0xFF) && message[1] == (Byte)(0xD8)) {

    // the bmp part is done in a Dispatcher thread but this is a reduced slice of the code for illustration.

    bmp.SetSourceAsync(image.AsRandomAccessStream()); image.SetLength(0); await image.WriteAsync(message, 0, message.Length); } else { await image.WriteAsync(message, 0, message.Length); string stopper = "debugstopper"; } await reader.LoadAsync(6000); }

    So if the beginning of message is the beginning of  JPEG header (0xFF,0xD8) it writes it to the bitmap.  Otherwise it keeps writing.  The problem is that instead of firing off a "message" every time it receives a packet the buffer grows very very quickly and by the second call of LoadAsync(6000) it's already filled to 6000 and there is no longer any separation to my knowledge between the different packets.  So then it keeps streaming data in until it's amassed a huge amoutn of data when a LoadAsync() randomly generates a message with a JPEG at the beginning.  

    How can I ensure that I look at each packet coming in discreetly?   Or is there a better way to split the stream without relying on the new-image header [0xFF,0xD8] being the first 2 bytes? 
    Tuesday, March 24, 2015 1:19 AM

Answers

  • Found the problem a couple hours ago.  I think the only reason the buffer was so large was because I was attempting to look at it in the debugger. Good old Heisenbug.  Here is the new code for the sake of future generations:

    using (DataReader reader = new DataReader(socket.InputStream))
                {
                    MemoryStream jpg = new MemoryStream();
                    bool FirstJPEG = true;
                    reader.InputStreamOptions = InputStreamOptions.Partial;
                    await reader.LoadAsync(6144);  //read up to 6144 bytes (somewhat arbitrary since the .Partial will read out data instantaneously when data is written to the buffer).  Pretty much just "wait for next packet received". 
                    while (reader.UnconsumedBufferLength > 0)
                    {
                        //Create a new Byte Array to hold the latest data.
                        Byte[] message = new Byte[reader.UnconsumedBufferLength];
                        reader.ReadBytes(message); // Fill our byte array with the new data.
    
                        List<int> imageHeaders = SearchBytePattern(new byte[] { 0xFF, 0xD8 }, message); // Find the byte offset positions of each frame header. 
    
                        if (imageHeaders.Count > 0)  // Somewhere in this packet is the beginning of a new frame and therefore an end to the last frame. 
                        {
                            for (int i = 0; i < imageHeaders.Count; i++)  // Just in case we have some very small frames that fit entirely within a packet we'll iterate through all of the headers we found.   We could reduce latency slightly if we wanted to assume that no two frames exist in anyone packet.
                            {
                                int j;
                                if (i == 0) j = 0;
                                else j = imageHeaders[(i - 1)];
    
    
                                await jpg.WriteAsync(message, j, (imageHeaders[i] - j));   // If this is the first loop write from position 0 (start) for pos[i]-j bytes.  This should finish off the current jpg memory with everything right up to the start of the next frame. 
                                // If this isn't the first loop then write from the start of the previous header to the current one. 
    
                                // If this is our first frame header, we don't know if any junk had been sent previously, so just ignore the first frame and proceed. 
    
                                if (!FirstJPEG)
                                {
                                    //   getJPEG(jpg.ToArray());
    
                                    await _dispatcher.RunAsync(CoreDispatcherPriority.High, async () =>
                                    {
                                        JPEG2Bitmap(jpg.ToArray());
                                    });
                                }
                                else { FirstJPEG = false; }
    
                                // We've sucessfully written a JPEG to the BitmapDecoder so clear the JPEG. 
                                jpg.SetLength(0);
                            }
                            await jpg.WriteAsync(message, imageHeaders.Last(), (message.Length - imageHeaders.Last()));
                        }
                        else
                        {
                            await jpg.WriteAsync(message, 0, message.Length); // We found a "Middle" packet with no headers.   Appending the entire message to our JPEG MemoryStream.
                        }
    
                        await reader.LoadAsync(6144); // Refill the reader before the while loop finishes so that unless the network disconnects we go around and around and around in this thread...
                    }
    
                    reader.DetachStream(); // Clear the reader memory.
                }

    And the Byte array search code from another thread:

    static private List<int> SearchBytePattern(byte[] pattern, byte[] bytes)
            {
                List<int> positions = new List<int>();
                int patternLength = pattern.Length;
                int totalLength = bytes.Length;
                byte firstMatchByte = pattern[0];
                for (int i = 0; i < totalLength; i++)
                {
                    if (firstMatchByte == bytes[i] && totalLength - i >= patternLength)
                    {
                        byte[] match = new byte[patternLength];
                        Array.Copy(bytes, i, match, 0, patternLength);
                        if (match.SequenceEqual<byte>(pattern))
                        {
                            positions.Add(i);
                            i += patternLength - 1;
                        }
                    }
                }
                return positions;
            }

    Wednesday, March 25, 2015 3:03 AM

All replies

  • It seems the incoming package time is shorter than the handling image time, how about creating a temporary array to store the incoming package one by one, and then use another task to write to image?

    Wednesday, March 25, 2015 2:55 AM
  • Found the problem a couple hours ago.  I think the only reason the buffer was so large was because I was attempting to look at it in the debugger. Good old Heisenbug.  Here is the new code for the sake of future generations:

    using (DataReader reader = new DataReader(socket.InputStream))
                {
                    MemoryStream jpg = new MemoryStream();
                    bool FirstJPEG = true;
                    reader.InputStreamOptions = InputStreamOptions.Partial;
                    await reader.LoadAsync(6144);  //read up to 6144 bytes (somewhat arbitrary since the .Partial will read out data instantaneously when data is written to the buffer).  Pretty much just "wait for next packet received". 
                    while (reader.UnconsumedBufferLength > 0)
                    {
                        //Create a new Byte Array to hold the latest data.
                        Byte[] message = new Byte[reader.UnconsumedBufferLength];
                        reader.ReadBytes(message); // Fill our byte array with the new data.
    
                        List<int> imageHeaders = SearchBytePattern(new byte[] { 0xFF, 0xD8 }, message); // Find the byte offset positions of each frame header. 
    
                        if (imageHeaders.Count > 0)  // Somewhere in this packet is the beginning of a new frame and therefore an end to the last frame. 
                        {
                            for (int i = 0; i < imageHeaders.Count; i++)  // Just in case we have some very small frames that fit entirely within a packet we'll iterate through all of the headers we found.   We could reduce latency slightly if we wanted to assume that no two frames exist in anyone packet.
                            {
                                int j;
                                if (i == 0) j = 0;
                                else j = imageHeaders[(i - 1)];
    
    
                                await jpg.WriteAsync(message, j, (imageHeaders[i] - j));   // If this is the first loop write from position 0 (start) for pos[i]-j bytes.  This should finish off the current jpg memory with everything right up to the start of the next frame. 
                                // If this isn't the first loop then write from the start of the previous header to the current one. 
    
                                // If this is our first frame header, we don't know if any junk had been sent previously, so just ignore the first frame and proceed. 
    
                                if (!FirstJPEG)
                                {
                                    //   getJPEG(jpg.ToArray());
    
                                    await _dispatcher.RunAsync(CoreDispatcherPriority.High, async () =>
                                    {
                                        JPEG2Bitmap(jpg.ToArray());
                                    });
                                }
                                else { FirstJPEG = false; }
    
                                // We've sucessfully written a JPEG to the BitmapDecoder so clear the JPEG. 
                                jpg.SetLength(0);
                            }
                            await jpg.WriteAsync(message, imageHeaders.Last(), (message.Length - imageHeaders.Last()));
                        }
                        else
                        {
                            await jpg.WriteAsync(message, 0, message.Length); // We found a "Middle" packet with no headers.   Appending the entire message to our JPEG MemoryStream.
                        }
    
                        await reader.LoadAsync(6144); // Refill the reader before the while loop finishes so that unless the network disconnects we go around and around and around in this thread...
                    }
    
                    reader.DetachStream(); // Clear the reader memory.
                }

    And the Byte array search code from another thread:

    static private List<int> SearchBytePattern(byte[] pattern, byte[] bytes)
            {
                List<int> positions = new List<int>();
                int patternLength = pattern.Length;
                int totalLength = bytes.Length;
                byte firstMatchByte = pattern[0];
                for (int i = 0; i < totalLength; i++)
                {
                    if (firstMatchByte == bytes[i] && totalLength - i >= patternLength)
                    {
                        byte[] match = new byte[patternLength];
                        Array.Copy(bytes, i, match, 0, patternLength);
                        if (match.SequenceEqual<byte>(pattern))
                        {
                            positions.Add(i);
                            i += patternLength - 1;
                        }
                    }
                }
                return positions;
            }

    Wednesday, March 25, 2015 3:03 AM