Is using Reactive Extensions (Rx) for socket programming practical?

    General discussion

  • What is the most succinct way of writing the GetMessages function with Rx?

     static void Main()
     {
     Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    
     var messages = GetMessages(socket, IPAddress.Loopback, 4000);
     messages.Subscribe(x => Console.WriteLine(x));
    
     Console.ReadKey();
     }
    
     static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
     {
     var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
    
     // now will receive a stream of messages
     // each message is prefixed with a 4-byte Int32 indicating its length.
     // the rest of the message is a string
    
     // ????????????? Now What ????????????? 
     }
    

    A simple server as a driver for the above sample: http://gist.github.com/452893#file_program.cs
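
    For reference, the wire format described in the comments above (a 4-byte Int32 length followed by the string) can be sketched without any sockets. The names MessageFraming, Frame and Unframe are hypothetical, and the sketch assumes UTF-8 payloads and the machine's native byte order, as the code later in this thread does.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of Rx or the original post.
static class MessageFraming
{
    // Builds one wire message: a 4-byte length header followed by the UTF-8 payload.
    // BitConverter uses the machine's byte order (an assumption about the protocol).
    public static byte[] Frame(string message)
    {
        byte[] payload = Encoding.UTF8.GetBytes(message);
        byte[] framed = new byte[4 + payload.Length];
        BitConverter.GetBytes(payload.Length).CopyTo(framed, 0);
        payload.CopyTo(framed, 4);
        return framed;
    }

    // Reads the header, then decodes exactly that many payload bytes.
    public static string Unframe(byte[] framed)
    {
        int length = BitConverter.ToInt32(framed, 0);
        return Encoding.UTF8.GetString(framed, 4, length);
    }
}
```

    A receiver cannot assume that one read returns one whole frame, which is why the replies below read the header and the body as two separate fixed-size reads.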

    On Using Rx For Socket Programming

    I've been investigating using Reactive Extensions for some socket programming work I am doing.  My motivation is that it would somehow make the code "simpler", whether that means less code, less nesting, or something along those lines.

    However so far that does not seem to be the case:

    1.  I haven't found very many examples of using Rx with sockets.
    2.  The examples I have found don't seem less complicated than my existing BeginXXXX/EndXXXX code.
    3.  Although `Observable` has extension methods for FromAsyncPattern, this does not cover the `SocketAsyncEventArgs` async API.

    Current Non-Working Solution

    Here is what I have so far. This doesn't work; it fails with a stack overflow (heh). I haven't figured out the semantics for creating an `IObservable` that will read a specified number of bytes.

     static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
     {
      var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
    
      // keep reading until we get the first 4 bytes
      byte[] buffer = new byte[1024];
      var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
      
      IObservable<int> readBytes = null;
      var temp = from totalRead in Observable.Defer(() => readBytes)
       where totalRead < 4
       select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
      readBytes = temp.SelectMany(x => x).Sum();
    
      var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
     }
    
    

    Cross Posted from: http://stackoverflow.com/questions/3118289/using-reactive-extensions-rx-for-socket-programming-practical

    • Edited by Joe K Friday, June 25, 2010 7:20 PM added cross-post reference
    • Changed type Jeffrey van Gogh - MS Tuesday, June 29, 2010 10:28 PM Forum based pair programming
    Friday, June 25, 2010 7:19 PM

All replies

  • Hi Joe,

    Your goal using Rx should be to make something like the following work, because it's semantically correct according to your specification.  Not to mention, it reads almost like the specification as well.

    (Note that I haven't actually tested this code, except to ensure that it compiles.)

    var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    
    var query = from _ in socket.WhenConnected(IPAddress.Loopback, 4000)
    	from value in
    		(from first in socket.WhenDataReceived(4)
    		let length = BitConverter.ToInt32(first, 0)
    		from second in socket.WhenDataReceived(length)
    		select Encoding.UTF8.GetString(second, 0, length))
    		.Repeat()
    	select value;
    
    using (query.Subscribe(Console.WriteLine))
    {
    	Console.ReadKey();
    }

    The first thing that is required is the WhenConnected method.

    public static class NetworkExtensions
    {
    	public static IObservable<Unit> WhenConnected(this Socket socket, IPAddress address, int port)
    	{
    		return Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(address, port);
    	}
    }

    That's pretty straightforward.  The next requirement is the WhenDataReceived method.  This one's a bit more complicated, but note that I made it more complicated than may be necessary in order to handle shutdowns gracefully (converting OnError to OnCompleted):

    (Edit: Posted the wrong implementation of WhenDataReceived.  The required version is below, followed by the original.  The difference is that the original returns buffers of 1024 until the socket is closed, whereas the required overload returns a single buffer of the specified length.)

    public static IObservable<byte[]> WhenDataReceived(this Socket socket, int byteCount, SocketFlags flags = SocketFlags.None)
    {
    	return Observable.CreateWithDisposable<byte[]>(
    		observer =>
    		{
    			var whenDataReceived = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(
    				socket.BeginReceive,
    				socket.EndReceive);
    
    			byte[] buffer = new byte[byteCount];
    			int remainder = byteCount;
    
    			return Observable.While(() => remainder > 0,
    				Observable.Defer(() => whenDataReceived(buffer, buffer.Length - remainder, remainder, flags)))
    				.Prune(whenCompleted => whenCompleted.Select(_ => buffer))
    				.Subscribe(
    					observer.OnNext,
    					ex =>
    					{
    						var socketError = ex as SocketException;
    
    						if (socketError != null && socketError.SocketErrorCode == SocketError.Shutdown)
    							observer.OnCompleted();
    						else
    							observer.OnError(ex);
    					},
    					observer.OnCompleted);
    		});
    }

    public static IObservable<byte[]> WhenDataReceived(this Socket socket, SocketFlags flags = SocketFlags.None)
    {
    	return Observable.CreateWithDisposable<byte[]>(
    		observer =>
    		{
    			var whenDataReceived = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(
    				socket.BeginReceive,
    				socket.EndReceive);
    
    			byte[] buffer = new byte[1024];
    
    			return Observable.While(() => socket.Connected, 
    				Observable.Defer(() => whenDataReceived(buffer, 0, buffer.Length, flags)))
    				.Select(read => buffer.Take(read).ToArray())
    				.Subscribe(
    					observer.OnNext,
    					ex =>
    					{
    						var socketError = ex as SocketException;
    
    						if (socketError != null && socketError.SocketErrorCode == SocketError.Shutdown)
    							observer.OnCompleted();
    						else
    							observer.OnError(ex);
    					},
    					observer.OnCompleted);
    		});
    }

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Friday, June 25, 2010 10:58 PM Posted wrong implementation of WhenDataReceived
    • Marked as answer by Joe K Monday, June 28, 2010 12:02 PM
    • Unmarked as answer by Joe K Monday, June 28, 2010 6:10 PM
    Friday, June 25, 2010 10:54 PM
  • Another way to wrap networkstream in an observable.

    http://staceyw.posterous.com/observe-a-network-stream-using-rx

    Monday, June 28, 2010 5:51 AM
  • This doesn't work yet. One thing I noticed is that in "WhenDataReceived", you never assign to "remainder". Also need to handle when read returns 0 indicating shutdown.
    Monday, June 28, 2010 6:50 PM
  • Joe, check out my post above.  Handles shutdown also.
    Monday, June 28, 2010 8:51 PM
  • > Another way to wrap networkstream in an observable.
    >
    > http://staceyw.posterous.com/observe-a-network-stream-using-rx

    This seems to be missing a key component: "ObservableStreamAsync". Also, I'm looking to use Rx to parse some socket traffic in Silverlight, where my only option is ReceiveAsync/SocketAsyncEventArgs. But I'm starting by trying to get it working with the BeginXXX/EndXXX methods first.

    Joe

    Monday, June 28, 2010 9:08 PM
  • Hi Joe,

    As noted, I didn't fully test my implementation - only that it compiled.

    > One thing I noticed is that in "WhenDataReceived", you never assign to "remainder".

    Good observation ;)

    That should be an easy fix.  Just add Do:

    return Observable.While(() => remainder > 0,
    	Observable.Defer(() => 
    		whenDataReceived(buffer, buffer.Length - remainder, remainder, flags)
    		.Do(read => remainder -= read)))
    	.Prune(whenCompleted => whenCompleted.Select(_ => buffer))
    	.Subscribe(...)

    > Also need to handle when read returns 0 indicating shutdown.

    That's not in your original spec.  My intention was to handle shutdown via a closed socket, because I assumed that the client would close the socket when there was no more data to be sent.  Although, I think my implementation won't work for this without checking for the Disconnecting error as well.  The following should work better:

    (socketError.SocketErrorCode == SocketError.Shutdown || socketError.SocketErrorCode == SocketError.Disconnecting)

    Regardless, to stop the query from repeating when the length is 0 as well, one option is to use Observable.If and TakeWhile, with a null message to indicate the end of the stream: 

    var query =
    	from _ in socket.WhenConnected(IPAddress.Loopback, 4000)
    	from message in
    		(from first in socket.WhenDataReceived(4)
    		 let length = BitConverter.ToInt32(first, 0)
    		 from message in Observable.If(
    			condition: () => length > 0,
    			thenSource: from second in socket.WhenDataReceived(length)
    				  select Encoding.UTF8.GetString(second, 0, length),
    			elseSource: Observable.Return<string>(null))
    		 select message)
    		.Repeat()
    		.TakeWhile(message => message != null)
    	select message;

    - Dave


    http://davesexton.com/blog
    Monday, June 28, 2010 10:17 PM
  • Sorry for the omission.  Here it is:

    // Returns an observable stream. Publishes byte[] after each read completes.
    // byte[].Length <= bufferSize and >= 1
    // After reading completes, the Observable will signal OnCompleted.
    public static IObservable<byte[]> ObservableStreamAsync(this Stream stream, int bufferSize)
    {
     if (stream == null)
      throw new ArgumentNullException("stream");
     Subject<byte[]> sub = new Subject<byte[]>();
    
     ReadHelper(stream, bufferSize, sub);
    
     return sub;
    }
    
    private static void ReadHelper(Stream stream, int bufferSize, Subject<byte[]> sub)
    {
     byte[] buf = new byte[bufferSize];
    
     try
     {
      stream.BeginRead(buf, 0, buf.Length, iar =>
      {
       try
       {
        int bytesRead = stream.EndRead(iar);
        if (bytesRead > 0)
        {
         // If we don't read bufferSize bytes, copy array to smaller size array.
         // This way, the consumer of the service always sees only full arrays of data.
         // This could have a resource tax if stream is constantly returning < bufferSize bytes.
         // This will normally only happen on the last read, but if happening more, you can tune the bufferSize.
         // Alternatively, we could return a class/struct containing actual read Buffer and BytesRead
         // properties and let the consumer inspect BytesRead properties.
         if (bytesRead < bufferSize)
         {
          byte[] newBuf = new byte[bytesRead];
          Array.Copy(buf, newBuf, bytesRead);
          buf = newBuf;
         }
         sub.OnNext(buf); // Post result before next read to prevent any overlap.
         ReadHelper(stream, bufferSize, sub);
        }
        else
        {
         sub.OnCompleted();
        }
       }
       catch (Exception ex)
       {
        sub.OnError(ex);
        return;
       }
      }, null);
     }
     catch (Exception ex)
     {
      Console.WriteLine("Error.");
      sub.OnError(ex);
     }
    }
    Tuesday, June 29, 2010 1:46 AM
  • > Also need to handle when read returns 0 indicating shutdown.

    > That's not in your original spec.  My intention was to handle shutdown via a closed socket, because I assumed that the client would close the socket when there was no more data to be sent.  Although, I think my implementation won't work for this without checking for the Disconnecting error as well.  The following should work better:

    I should have been more clear. What I mean by "read returning zero" is:

    > If the remote host shuts down the Socket connection with the Shutdown method, and all available data has been received, the EndReceive method will complete immediately and return zero bytes.

    http://msdn.microsoft.com/en-us/library/w7wtt64b.aspx

    By handling the zero bytes from EndReceive, the graceful shutdown case is taken care of without an exception.
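
    To illustrate the zero-byte case in isolation, here is a minimal synchronous sketch over a plain Stream. It is not the Rx implementation discussed in this thread; the names StreamReading and ReadFully are hypothetical, and returning null to mean "the stream ended first" is just one possible convention.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of Rx or the original posts.
static class StreamReading
{
    // Returns a buffer of exactly `count` bytes, or null if the stream
    // ended (a read returned zero) before `count` bytes arrived.
    public static byte[] ReadFully(Stream stream, int count)
    {
        byte[] buffer = new byte[count];
        int offset = 0;

        while (offset < count)
        {
            // A single read may return fewer bytes than requested.
            int read = stream.Read(buffer, offset, count - offset);

            if (read == 0)
                return null; // graceful end of stream, no exception involved

            offset += read;
        }

        return buffer;
    }
}
```

    The loop has the same two exit conditions as the While/Do combination above: the remaining count reaches zero, or a read returns zero.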

    Tuesday, June 29, 2010 2:34 AM
  • Here's what I have now:

       var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    
       var messageQuery = from first in socket.WhenDataReceived(4)
            let length = first == null ? 0 : BitConverter.ToInt32(first, 0)
            from second in Observable.If(
             () => length > 0, 
             socket.WhenDataReceived(length),
             Observable.Return<byte[]>(null))
            select second == null ? null : Encoding.UTF8.GetString(second, 0, length);
    
       var query = from _ in socket.WhenConnected(IPAddress.Loopback, 4000)
          from value in messageQuery.Repeat().TakeWhile(value => value != null)
          select value;
    
       using (query.Subscribe(Console.WriteLine, () => Console.WriteLine("Done")))
       {
        Console.ReadKey();
       }


      and

     public static class NetworkExtensions
     {
      public static IObservable<Unit> WhenConnected(this Socket socket, IPAddress address, int port)
      {
       return Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(address, port);
      }
    
      public static IObservable<byte[]> WhenDataReceived(this Socket socket, int byteCount, SocketFlags flags = SocketFlags.None)
      {
    
       return Observable.CreateWithDisposable<byte[]>(
        observer =>
        {
         var whenDataReceived = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(
          socket.BeginReceive,
          socket.EndReceive);
    
         byte[] buffer = new byte[byteCount];
         bool zeroBytesRead = false;
         int remainder = byteCount;
    
         return Observable.While(
          () => remainder > 0 && !zeroBytesRead,
          Observable.Defer(() => 
           whenDataReceived(buffer, buffer.Length - remainder, remainder, flags)
            .Do(read => { remainder = remainder - read; zeroBytesRead = read == 0; })))
          .Prune(whenCompleted => whenCompleted.Select(_ => zeroBytesRead ? null : buffer))
          .Subscribe(
           observer.OnNext,
           observer.OnError,
           observer.OnCompleted);
        });
      }
     }
    

    Seems to work, but I'm not sure if I can make it simpler than this. See any issues?

    I was hoping to change "WhenDataReceived" to return an "Empty" observable (as in, complete with no results) and then do something like "RepeatUntilEmpty", but I couldn't figure out how to write "RepeatUntilEmpty".

    Joe

    Tuesday, June 29, 2010 2:53 AM
  • Hi Joe,

    > What I mean by "read returning zero" is: [snip]

    Ok, thanks, that makes more sense now :)

    I'll write up a test case for my examples to ensure that they work the way that I expected.

    - Dave


    http://davesexton.com/blog
    Tuesday, June 29, 2010 3:14 AM
  • Already have a test case: http://gist.github.com/452893

    Also, I think I've got "RepeatUntilEmpty" figured out, though it seems "hacky". I'm not sure if it behaves well with all ISchedulers.

        public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> o)
        {
          var complete = false;
          Func<bool> completeCheck = () =>
            {
              var result = !complete;
              complete = true;
              return result;
            };
          return Observable.While(completeCheck, o.Do(_ => complete = false));
        }

    So that changes it to:

          var messageQuery = from first in socket.WhenDataReceived(4)
                    let length = BitConverter.ToInt32(first, 0)
                    from second in socket.WhenDataReceived(length)
                    select Encoding.UTF8.GetString(second, 0, length);
    
          var query = from _ in socket.WhenConnected(IPAddress.Loopback, 4000)
                from value in messageQuery.RepeatUntilEmpty()
                select value;

    Tuesday, June 29, 2010 3:21 AM
  • Hi Joe,

    Following is the complete console app that exercises my examples.  I noticed a couple of issues so I've modified a few things.  I've also added support for converting read = 0 to OnCompleted, as per your request.

    As for the length, this is probably about as small as it's going to get.  Compared to an imperative version that has the same exact behavior, I think this is pretty good.  It's semantically correct using declarative-style programming, so it reads very close to the spec, and it's actually quite small, especially if you consider that NetworkExtensions is general code that's reusable.
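
    For comparison, a rough imperative counterpart of the client's read loop might look like the sketch below. It is synchronous and blocking, so it is not equivalent to the asynchronous Rx query; it only mirrors the protocol logic (4-byte length, UTF-8 body, stop on a zero length or end of stream). The names ImperativeClient, ReadMessages and ReadFully are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical blocking reader, shown only for comparison with the Rx query.
static class ImperativeClient
{
    // Yields each message until a zero-length header or the end of the stream.
    public static IEnumerable<string> ReadMessages(Stream stream)
    {
        while (true)
        {
            byte[] header = ReadFully(stream, 4);
            if (header == null)
                yield break; // stream ended before a full header arrived

            int length = BitConverter.ToInt32(header, 0);
            if (length <= 0)
                yield break; // empty message signals shutdown

            byte[] body = ReadFully(stream, length);
            if (body == null)
                yield break; // stream ended in the middle of a message

            yield return Encoding.UTF8.GetString(body, 0, length);
        }
    }

    // Reads exactly `count` bytes, or returns null if a read returns zero first.
    static byte[] ReadFully(Stream stream, int count)
    {
        byte[] buffer = new byte[count];
        int offset = 0;

        while (offset < count)
        {
            int read = stream.Read(buffer, offset, count - offset);
            if (read == 0)
                return null;
            offset += read;
        }

        return buffer;
    }
}
```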

    First, here's some example output in server shutdown mode (.TakeWhile is uncommented):

    The server is running.
    
    The client is listening.
    
    Enter messages to be sent to all clients, one per line.
    Enter an empty message to shutdown.
    This is a test
    Message from server: This is a test
    Test message #2
    Message from server: Test message #2
    This is the last message, then shutdown.
    Message from server: This is the last message, then shutdown.
    
    Server shutdown.
    Client shutdown.
    
    All sockets closed.

     

    Complete Source Code

    using System;
    using System.Diagnostics.Contracts;
    using System.Linq;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    
    namespace ReactiveProgrammingConsole
    {
    	class ReactiveSocketsLab
    	{
    		static void Main()
    		{
    			var ip = IPAddress.Loopback;
    			var port = 4000;
    
    			var serverCompleted = new ManualResetEvent(false);
    
    			StartServer(ip, port)
    				.Subscribe(_ => serverCompleted.Set());
    
    			Console.WriteLine("The server is running.");
    
    			var clientCompleted = new ManualResetEvent(false);
    
    			StartClient(ip, port)
    				.Subscribe(_ => clientCompleted.Set());
    
    			Console.WriteLine();
    			Console.WriteLine("The client is listening.");
    			Console.WriteLine();
    			Console.WriteLine("Enter messages to be sent to all clients, one per line.");
    			Console.WriteLine("Enter an empty message to shutdown.");
    
    			WaitHandle.WaitAll(new[] { serverCompleted, clientCompleted });
    
    			Console.WriteLine();
    			Console.WriteLine("All sockets closed.");
    			Console.ReadKey();
    		}
    
    		static IObservable<Unit> StartServer(IPAddress ip, int port)
    		{
    			var server =
    				from listener in Observable.Return(new TcpListener(ip, port))
    					.Do(listener => listener.Start())
    				from client in Observable.FromAsyncPattern<TcpClient>(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient)()
    				from response in
    					Observable.Using(() => client, _ =>
    					Observable.Using(client.GetStream, stream =>
    						from message in Observable
    							.Defer(() => Observable
    							.Start(() => Console.ReadLine()))
    							.Repeat()
    							// Comment TakeWhile to test the client's ability to shutdown itself upon receiving an empty message.
    							// Uncomment TakeWhile to test the server's ability to gracefully shutdown all clients and itself.
    							.TakeWhile(message => message.Length > 0)
    						select new { Stream = stream, Message = Encoding.UTF8.GetBytes(message) }))
    				select response;
    
    			return
    				server.Do(
    					response =>
    					{
    						response.Stream.Write(BitConverter.GetBytes(response.Message.Length), 0, 4);
    						response.Stream.Write(response.Message, 0, response.Message.Length);
    					},
    					ex => Console.WriteLine("Server error: {0}", ex),
    					() => Console.WriteLine("Server shutdown."))
    				.Prune(whenCompleted => whenCompleted.Select(_ => new Unit()));
    		}
    
    		static IObservable<Unit> StartClient(IPAddress ip, int port)
    		{
    			var client = Observable.Using(
    				() => new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp),
    				socket =>
    				from _ in socket.WhenConnected(ip, port)
    				from message in
    					(from first in socket.WhenDataReceived(4)
    					 let length = BitConverter.ToInt32(first, 0)
    					 from message in
    						 Observable.If(
    							 condition: () => length > 0,
    							 thenSource: from second in socket.WhenDataReceived(length)
    								  select Encoding.UTF8.GetString(second, 0, length),
    							 elseSource: Observable.Return<string>(null))
    					 select message)
    					.Repeat()
    					.TakeWhile(message => message != null)
    				select message);
    
    			return
    				client.Do(
    					message => Console.WriteLine("Message from server: {0}", message),
    					ex => Console.WriteLine("Client error: {0}", ex),
    					() => Console.WriteLine("Client shutdown."))
    				.Prune(whenCompleted => whenCompleted.Select(_ => new Unit()));
    		}
    	}
    
    	public static partial class NetworkExtensions
    	{
    		public static IObservable<Unit> WhenConnected(this Socket socket, IPAddress address, int port)
    		{
    			return Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(address, port);
    		}
    
    		public static IObservable<byte[]> WhenDataReceived(this Socket socket, int byteCount, SocketFlags flags = SocketFlags.None)
    		{
    			Contract.Requires(byteCount > 0);
    
    			return Observable.CreateWithDisposable<byte[]>(
    				observer =>
    				{
    					var whenDataReceived = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(
    						socket.BeginReceive,
    						socket.EndReceive);
    
    					byte[] buffer = new byte[byteCount];
    					int remainder = byteCount;
    					bool shutdown = false;
    
    					return Observable.While(() => remainder > 0 && !shutdown,
    						Observable.Defer(() =>
    							whenDataReceived(buffer, buffer.Length - remainder, remainder, flags)
    							.Do(read =>
    							{
    								remainder -= read;
    
    								if (read == 0)
    									shutdown = true;
    							})))
    						.Prune(whenCompleted => whenCompleted.Select(_ => buffer))
    						.Subscribe(
    							observer.OnNext,
    							ex =>
    							{
    								var socketError = ex as SocketException;
    
    								if (socketError != null &&
    										(socketError.SocketErrorCode == SocketError.Shutdown
    									|| socketError.SocketErrorCode == SocketError.Disconnecting))
    								{
    									observer.OnCompleted();
    								}
    								else
    									observer.OnError(ex);
    							},
    							observer.OnCompleted);
    				});
    		}
    	}
    }
    

    - Dave


    http://davesexton.com/blog
    Tuesday, June 29, 2010 3:36 AM
  • Tuesday, June 29, 2010 10:27 PM
  • Following this thread, I am starting to wonder out loud whether I would use Rx for this kind of code.  Even as an old network guy, I find this code difficult to follow and reason about.  I am reminded of "If you have a new hammer, everything starts looking like a nail".  On the whole, we want to make readable and maintainable code, and I'm not sure this is either.  It's probably also a bear to debug.  I think the real gold of Rx is not so much "inside" your libraries, but more consumer facing.  Expose your outputs as Rx so the consumer can filter, join, etc. on your streams.  Then again, maybe I'm wrong on all points. In any event, it's always interesting to push the edges.

    - William

    Wednesday, June 30, 2010 5:01 AM
  • Hi William,

    If you were in charge of writing the networking APIs in .NET and one of your requirements was to add methods/events that support asynchronous invocations and callbacks, wouldn't you want to use Rx for the reasons that you mentioned?  It would be a consumer-facing (public), asynchronous, reactive API, and thus Rx is a perfect fit, according to your point.

    Just because this particular example may be internal code, doesn't mean that it's any less fit for use with Rx.  Perhaps my StartClient and StartServer methods could be direct implementations for some public APIs that return observables.  If they were, would you then consider using Rx in the manner that I have?  What's the difference whether the API is public or internal?

    I also tend to disagree with your comment, "I find this code difficult to follow and reason about".  Once you learn about all of these combinators and how to use Rx to provide the output that you'd expect, it all becomes much clearer.  Note, however, that I don't think it's perfect - there are still things that feel slightly wrong and aren't as clear as they probably could be.  Rx still seems to be under active development though, so hopefully it will improve with time - toward perfection ;)

    Furthermore, most of the code actually reads just like the spec that you might have written for it.  It really just doesn't get any clearer than that.  The imperative version of this stuff is certainly more complicated and difficult to understand if one has equal experience with each.  A declarative, semantic approach to writing complicated algorithms is something LINQ provides for us and Rx is just taking advantage of that feature.  Ultimately, LINQ and Rx together allow us to specify what we'd like to happen as executable code, easily control side-effects and also take advantage of lazy execution.  Therefore, it's probably just a matter of experience whether it's easily understood and preferred over a similar-behaving imperative version.

    - Dave


    http://davesexton.com/blog
    Wednesday, June 30, 2010 2:44 PM
  • Check out this post, where I've created two classes with extension methods on Socket that:

    1. Wrap each async method into a Task.

    2. Read async data and return an IObservable<byte>.

    Finally, I've used these classes like so:

     

    var dataRead = socket.ReadToObservable(exceptionHandler).ObserveOn(Scheduler.CurrentThread);

    var messages = from headerBytes in dataRead.Buffer(4)
                   let headerLength = BitConverter.ToInt32(headerBytes.Reverse().ToArray(), 0)
                   let message = dataRead.Take(headerLength)
                   select new Tuple<int,byte[]>(headerLength,message.ToEnumerable().ToArray());

    messages.Subscribe(x =>
    {
        Console.WriteLine("message received length: " + x.Item1);
        Console.Write(Encoding.Unicode.GetChars(x.Item2));
    });
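
    The Reverse() call on the header bytes suggests that this protocol sends the length in big-endian (network) order and that the code runs on a little-endian machine. A sketch that avoids depending on the host's byte order, using IPAddress.HostToNetworkOrder and IPAddress.NetworkToHostOrder, is shown below; the names HeaderByteOrder, EncodeLength and DecodeLength are hypothetical.

```csharp
using System;
using System.Net;

// Hypothetical helper, not taken from the linked post.
static class HeaderByteOrder
{
    // Encodes a length as 4 bytes in network (big-endian) order,
    // regardless of the byte order of the machine running the code.
    public static byte[] EncodeLength(int length)
    {
        return BitConverter.GetBytes(IPAddress.HostToNetworkOrder(length));
    }

    // Decodes a 4-byte network-order header back into a host-order Int32.
    public static int DecodeLength(byte[] header)
    {
        return IPAddress.NetworkToHostOrder(BitConverter.ToInt32(header, 0));
    }
}
```

    The earlier examples in this thread use BitConverter directly on both ends, which works as long as the client and server share the same byte order.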

    Sunday, October 30, 2011 10:36 PM
  • This project might interest you.

    http://rxx.codeplex.com/SourceControl/changeset/view/63605


    James Miles http://enumeratethis.com
    Monday, October 31, 2011 7:15 AM