Buffering variable-size frames whose length is stored inside the content
-
Thursday, December 06, 2012 7:49 AM
Hi,
I have the following setup:
A class reads certain frames from a network connection. These frames are sent to a processing service, where they are translated. I want to do this in a reactive fashion, so I read from the network stream in an observable (1 KB at a time) and pass this observable to the processing service. The problem is that I don't know how large the frames are. The good thing is that the frames do: the size of each frame is given in its header. I can't process a frame until it is complete, so I want to use Buffer, but Buffer needs either a length or a function that decides how to buffer. I don't know the length, and inside the function I can't seem to access the content. How can I buffer this?
Here is some code that I have:
```csharp
public IObservable<byte[]> Read()
{
    // DoWhile wraps the whole Using, so each iteration opens a NetworkStream and reads one chunk.
    return Observable.Using(
            () => new NetworkStream(_connection),
            ns => Observable.FromAsync<byte[]>(async _ =>
            {
                byte[] buffer = new byte[BUFFER_SIZE];
                // Up to BUFFER_SIZE bytes per read; 0 means the remote side closed the connection.
                int bytesRead = await ns.ReadAsync(buffer, 0, BUFFER_SIZE, _token.Token);
                return buffer.Take(bytesRead).ToArray();
            }))
        .DoWhile(() => _connection.Connected);
}
```

This is how I read from the network stream; BUFFER_SIZE is set to 1024.
The frames sent over Ethernet follow a specified protocol and are, in essence:
TypeString (10 bytes)
#measurements (2 bytes)
someMoreBoringsStufffThatIsNotImportant (5 bytes)
Measurements (#measurements * 6 bytes)
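With this layout, the total frame length is known as soon as the first 12 bytes (TypeString plus #measurements) have arrived: 10 + 2 + 5 + 6 * #measurements. Below is a minimal C# sketch of that calculation. The FrameLayout class and GetFrameLength method are hypothetical names, and the sketch assumes #measurements is an unsigned 16-bit big-endian (network byte order) value, which the spec above doesn't state.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper (not part of Rx or Rxx) describing the frame layout:
// TypeString (10 bytes), #measurements (2 bytes), 5 unimportant bytes,
// then #measurements * 6 bytes of measurement data.
public static class FrameLayout
{
    public const int TypeStringLength = 10;
    public const int CountLength = 2;
    public const int ReservedLength = 5;
    public const int MeasurementSize = 6;

    // Bytes that precede the measurements: 10 + 2 + 5 = 17.
    public const int FixedPartLength = TypeStringLength + CountLength + ReservedLength;

    // Returns the total frame length, or -1 if the 12 bytes needed to
    // read #measurements haven't arrived yet.
    // ASSUMPTION: #measurements is an unsigned big-endian 16-bit integer.
    public static int GetFrameLength(ReadOnlySpan<byte> data)
    {
        if (data.Length < TypeStringLength + CountLength)
        {
            return -1;
        }

        ushort count = BinaryPrimitives.ReadUInt16BigEndian(
            data.Slice(TypeStringLength, CountLength));
        return FixedPartLength + count * MeasurementSize;
    }
}
```

If the device actually sends little-endian values, BinaryPrimitives.ReadUInt16LittleEndian is the drop-in replacement.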
Can anyone give any ideas on how I should do this?
Sincerely, Brecht
All Replies
-
Thursday, December 06, 2012 4:03 PM
Hi Brecht,
Take a look at Rxx Parsers. Here's a related discussion for parsing a binary stream. (There are also related labs in the Rxx source, but CodePlex won't let me access them in the browser at the moment.)
Sorry for the brevity but I'm stepping out now. If you need more info let me know and I'll follow up when I get back later.
- Dave
-
Tuesday, December 18, 2012 7:27 AM
Hi Dave,
This looks very helpful, but I can't really see how to apply it to my code.
```csharp
public IObservable<byte[]> Read()
{
    return Observable.Using(
            () => new NetworkStream(_connection),
            ns => Observable.FromAsync<byte[]>(async _ =>
            {
                byte[] buffer = new byte[BUFFER_SIZE];
                int bytesRead = await ns.ReadAsync(buffer, 0, BUFFER_SIZE, _token.Token);
                return buffer.Take(bytesRead).ToArray();
            }))
        .DoWhile(() => _connection.Connected);
}
```

BUFFER_SIZE is 1024 (just a constant). I don't know how large a "package" is going to be; it could easily be larger than 1024 bytes.
Say the package is 1504 bytes. I should be able to read the first 1024 bytes, read the header (12 bytes), and take its last 2 bytes to see how large the package is (multiply by six, but I think I've got that :-)). Then I'd wait for the remainder of the package. How would I handle this with a parser?
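The steps described above (keep reading, inspect the header once it's available, then wait for the rest) can also be done without a parser library by carrying leftover bytes from one chunk to the next. Here is a minimal sketch of that alternative; it is not the Rxx parser approach suggested in this thread. FrameAssembler and Push are hypothetical names, and the 2-byte count is assumed to be big-endian.

```csharp
using System.Collections.Generic;

// Hypothetical frame assembler (not part of Rx or Rxx). Feed it the
// variable-size chunks produced by Read(); it keeps leftover bytes between
// calls and returns every frame completed so far.
// ASSUMPTION: #measurements (bytes 10-11) is big-endian.
public sealed class FrameAssembler
{
    private const int CountOffset = 10;     // right after the 10-byte TypeString
    private const int FixedPartLength = 17; // 10 + 2 + 5
    private const int MeasurementSize = 6;

    private readonly List<byte> _pending = new List<byte>();

    public IReadOnlyList<byte[]> Push(byte[] chunk)
    {
        _pending.AddRange(chunk);
        var frames = new List<byte[]>();

        while (_pending.Count >= CountOffset + 2)
        {
            // The header is available, so the full frame length is known.
            int count = (_pending[CountOffset] << 8) | _pending[CountOffset + 1];
            int frameLength = FixedPartLength + count * MeasurementSize;

            // Not enough bytes yet: wait for the next chunk.
            if (_pending.Count < frameLength)
            {
                break;
            }

            frames.Add(_pending.GetRange(0, frameLength).ToArray());
            _pending.RemoveRange(0, frameLength);
        }

        return frames;
    }
}
```

Because the assembler is stateful, with Rx it should be created once per subscription, for example inside Observable.Defer, followed by Read().SelectMany(chunk => assembler.Push(chunk)). A frame that straddles two 1 KB reads is handled because its first part stays in the pending buffer until the rest arrives.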
Sincerely, Brecht
-
Tuesday, December 18, 2012 9:51 AM
Hi Brecht,
In a nutshell, use the SelectMany operator (i.e., multiple from clauses in query syntax) to combine rules sequentially.
The first thing you'll need to do, though, is flatten the sequence of byte arrays into a sequence of bytes. It's difficult (impossible?) to parse buffers of varying size.
(Note: I didn't try building or running this code. There may be typos and other bugs.)
```csharp
IObservable<byte> source = Read().SelectMany(bytes => bytes);
```
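Flattening removes the arbitrary 1 KB chunk boundaries, so a frame that straddles two reads becomes one contiguous run of bytes. The same SelectMany(bytes => bytes) shape works in LINQ to Objects, which makes it easy to try without a socket. The ChunkFlattening class is a hypothetical illustration, not part of Rx or Rxx.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration: LINQ to Objects' SelectMany flattens a
// sequence of byte[] chunks into one sequence of bytes, just as Rx's
// SelectMany does for IObservable<byte[]>.
public static class ChunkFlattening
{
    public static IEnumerable<byte> Flatten(IEnumerable<byte[]> chunks)
    {
        return chunks.SelectMany(bytes => bytes);
    }
}
```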
The next step is to set up an identity parser, as follows. Completing the grammar isn't strictly necessary at this point, but it aids IntelliSense a bit: sometimes IntelliSense gets confused and doesn't show any completions until it has seen a complete grammar, perhaps due to the underlying complexity that parsers abstract, or simply because VS is slow on my really old computer :)
```csharp
IObservable<byte> parsed = source.ParseBinary(parser =>
    from next in parser
    select next);
```

The above query does nothing special. It simply passes every byte through the parser without changing them. Hence, it's the identity parser.
Next, you'll want to define rules based on your specification. Here's your spec:
TypeString (10b) - Header part 1
#measurements(2b) - Header part 2
someMoreBoringsStufffThatIsNotImportant(5b)
Measurements (#measurements*6b)

The first rule will represent TypeString:
```csharp
IObservable<byte> parsed = source.ParseBinary(parser =>
    from next in parser
    let typeString = next.Exactly(10)
    select next);
```

Notice that the typeString rule is quite similar to the original spec, though it's not being used in the grammar yet.
However, if you know that the type of the data will be a string, then instead of using the Exactly operator to get raw bytes, you could use a specialized conversion method on the parser object to get a string:
```csharp
let typeString = parser.String(Encoding.ASCII, 10)
```
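Outside of Rxx, the equivalent conversion for an already complete frame is a single Encoding.GetString call. A minimal sketch, assuming the TypeString occupies the first 10 bytes of the frame and is plain ASCII (the spec doesn't say so); TypeStringReader is a hypothetical name.

```csharp
using System.Text;

// Hypothetical helper: decodes the 10-byte TypeString at the start of a
// complete frame. ASSUMPTION: the TypeString is plain ASCII.
public static class TypeStringReader
{
    public const int TypeStringLength = 10;

    public static string Read(byte[] frame)
    {
        return Encoding.ASCII.GetString(frame, 0, TypeStringLength);
    }
}
```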
But since that's not part of your spec, I'll continue to use the first example that gets raw bytes.
Here is the query with the final header rule:
```csharp
IObservable<byte> parsed = source.ParseBinary(parser =>
    from next in parser
    let typeString = next.Exactly(10)
    let measurementCount = parser.Int16()
    let header = from ts in typeString
                 from count in measurementCount
                 select count
    select next);
```

Notice that the SelectMany query (consecutive from statements) defines the header as a sub-grammar that waits for typeString followed by measurementCount and projects the actual count as the result, ignoring the typeString data completely.
Keep in mind that we aren't using the header grammar yet, we've only defined it.
Here's the query with the remaining rules defined:
```csharp
IObservable<byte> parsed = source.ParseBinary(parser =>
    from next in parser
    let typeString = next.Exactly(10)
    let measurementCount = parser.Int16()
    let header = from ts in typeString
                 from count in measurementCount
                 select count
    let someMoreBoringsStufffThatIsNotImportant = next.Exactly(5)
    select next);
```

Notice that I haven't defined the measurements rule yet. I could define it as an inline function that takes an int and returns a grammar that matches 6 * int bytes, and in practice I sometimes do. For simplicity here, though, I won't define measurements as a discrete rule. Instead, I'll define the final grammar and embed measurements inside it. Since it's the last rule of the grammar, it should still look very close to the original spec.
```csharp
IObservable<byte> parsed = source.ParseBinary(parser =>
    from next in parser
    let typeString = next.Exactly(10)
    let measurementCount = parser.Int16()
    let header = from ts in typeString
                 from count in measurementCount
                 select count
    let someMoreBoringsStufffThatIsNotImportant = next.Exactly(5)
    select from count in header
           from _ in someMoreBoringsStufffThatIsNotImportant
           from measurements in next.Exactly(count * 6)
           // TODO: Parse measurements
           select measurements);
```

The return value is a sequence of bytes spanning the entire block of measurements. The header and other data are dropped since they're not used in the final projection.
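For the TODO, one option is to split the measurement block into #measurements records of 6 bytes each and decode them later, since the thread doesn't describe what the 6 bytes contain. A minimal sketch; MeasurementSplitter is a hypothetical name, and it works on a plain byte[] rather than on whatever type Rxx's Exactly produces.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: splits the measurement block (count * 6 bytes)
// into one raw 6-byte record per measurement. The internal layout of a
// measurement is unknown here, so records are returned undecoded.
public static class MeasurementSplitter
{
    public const int MeasurementSize = 6;

    public static List<byte[]> Split(byte[] block)
    {
        if (block.Length % MeasurementSize != 0)
        {
            throw new ArgumentException(
                "Block length must be a multiple of " + MeasurementSize, nameof(block));
        }

        var records = new List<byte[]>(block.Length / MeasurementSize);
        for (int offset = 0; offset < block.Length; offset += MeasurementSize)
        {
            var record = new byte[MeasurementSize];
            Array.Copy(block, offset, record, 0, MeasurementSize);
            records.Add(record);
        }
        return records;
    }
}
```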
- Dave
- Marked As Answer by BrechtVsk Wednesday, December 19, 2012 11:01 AM
-
Wednesday, December 19, 2012 8:15 AM
Hi Dave,
Thanks again for the reply. I just have one more problem. When trying this out, there seems to be no extension method on IObservable for the parser. Digging a little into the Rxx source code, I found that there is only an extension for IEnumerable. Or am I missing something?
Thanks in advance.
UPDATE: I found the problem: I was using the wrong namespace (Rxx.Parsers.Linq instead of Rxx.Parsers.Reactive.Linq). Seems I didn't dig deep enough into the source code :D
Sincerely, Brecht
- Edited by BrechtVsk Wednesday, December 19, 2012 8:46 AM

