How to implement SerialPort parser in Rx
-
Saturday, December 26, 2009 7:38 AMHi,
I'm trying to understand Rx but it's hard to find any working examples. I thought it would be worth to try to implement a simple parser for data comming through a serial port. The protocol is similar to NMEA GPS data (http://www.gpsinformation.org/dale/nmea.htm).
So my idea was to first wrap the SerialPort as Observable that exposes byte[] packets. Then a parser would subscribe to it to parse the packet into messages - each message has a command header and data string. The parser is Observable as well. Then for each type of command there would be another subscription that filters out messages from parser for this command. In the end I can observe the whole thing for particular type of commands (like in case of GPS I might subscribe only for position change commands).
Please let me know if this sounds like this is something where Rx should be used and if so where should I begin.
Should I create my own IObservable wrapper for SerialPort or there is something that can help me with this?
Thanks,
-Szymon
Answers
-
Monday, December 28, 2009 7:10 AMOwner
Hi Szymon,
Wrapping a SerialPort object is definitely a possibility. While I don't have a serial port or a device here, I did do some basic hacking that may just work, using Observable.Create<T> which provides a convient way to create IObservable<T> objects. Here's what I came up with:
static IObservable<byte> CreatePortObservable(SerialPort port) { return Observable.Create<byte>(obs => { // Alternative Rx-driven approach on the inside // // var rcv = Observable.FromEvent<SerialDataReceivedEventArgs>(port, "DataReceived"); // var err = Observable.FromEvent<SerialErrorReceivedEventArgs>(port, "ErrorReceived"); var rcv = new SerialDataReceivedEventHandler((sender, e) => { if (e.EventType == SerialData.Eof) { obs.OnCompleted(); } else { var buf = new byte[port.BytesToRead]; for (int i = 0; i < port.Read(buf, 0, buf.Length); i++) obs.OnNext(buf[i]); } }); port.DataReceived += rcv; var err = new SerialErrorReceivedEventHandler((sender, e) => { obs.OnError(new Exception(e.EventType.ToString())); }); port.ErrorReceived += err; return () => { port.DataReceived -= rcv; port.ErrorReceived -= err; // depending on ownership of port, you could Dispose it here too }; }); }Obviously, as I didn't test this and took some shortcuts (e.g. in the OnError case), this should be seen merely as a hint for a potential approach, rather than a definite solution.
Hope this helps,
-Bart
Bart De Smet [MVP]- Proposed As Answer by Ryan RileyMVP Tuesday, December 29, 2009 4:03 PM
- Marked As Answer by head.in.the.boxMicrosoft Employee, Owner Tuesday, December 29, 2009 4:26 PM