using Observable.FromAsyncPattern with a NetworkStream?
-
Freitag, 18. Juni 2010 21:11
Using the following code:
TcpListener listener = new TcpListener(IPAddress.Any, 7777);
listener.Start();
Console.WriteLine("Waiting for connection.");
TcpClient client = listener.AcceptTcpClient();
Console.WriteLine("Client connected, reading data...");
NetworkStream ns = client.GetStream();
byte[] buffer = new byte[1024];
Func<byte[], int, int, IObservable<int>> readerFunc = Observable.FromAsyncPattern<byte[], int, int, int>(ns.BeginRead, ns.EndRead);
IObservable<int> observableStream = readerFunc(buffer, 0, 1024); observableStream.Subscribe( bytesRead => Console.WriteLine("Recieved {0} bytes.", bytesRead), ex => Console.WriteLine("Exception: {0}", ex.ToString()), () => Console.WriteLine("Completed.") ); Console.WriteLine("Press return to quit"); Console.ReadLine(); listener.Stop();I'm only getting one packet of data then the subscription is completed. How can I continue receiving data, or this not the intention of the "FromAsyncPattern"? Cheers!
Alle Antworten
-
Freitag, 18. Juni 2010 23:00
Hi,
Streams must be read into a buffer incrementally. See the example in the documentation.
NetworkStream.EndRead Method
http://msdn.microsoft.com/en-us/library/system.net.sockets.networkstream.endread.aspxI recommend using WebClient instead, if it meets your requirements. The Download* methods are especially useful.
System.Net.WebClient
http://msdn.microsoft.com/en-us/library/system.net.webclient.aspx- Dave
http://davesexton.com/blog -
Samstag, 19. Juni 2010 06:38
Thanks Dave for taking your time to answer, but no, I can't use WebClient since I'm trying to use it as a Tcp SERVER. :)
Actually what I'm getting at, but can't grasp since I'm a total newbie to Rx, is that it confuses me that I can't do a "observableStream.Repeat().Subscribe(...)" in this case, as the "FromAsyncPattern" seems to be aimed at invoke-once scenarios. What I woud like to do is to auto re-subscribe and continue receiving more data as it comes in.
Cheers!
-
Samstag, 19. Juni 2010 06:40
I did manage to get this to work by utilizing "Observable.Defer" and creating the observableStream like so:
var observableStream = Observable.Defer(() => {
Func<byte[], int, int, IObservable<int>> readerFunc = Observable.FromAsyncPattern<byte[], int, int, int>(ns.BeginRead, ns.EndRead);
return readerFunc(buffer, 0, 1024);
}); -
Samstag, 19. Juni 2010 08:55
Hi Ted,
Yea, I ignored the TcpListener part, sorry :)
- Dave
http://davesexton.com/blog -
Samstag, 19. Juni 2010 09:22
Hi Ted,
The problem with your original example, as I tried to point out, was that you were only reading from the stream into a buffer once, which is not the correct pattern for reading from streams.
You can try using Observable.While to repeatedly read from the stream. Defer can also be used, as you've discovered, to allow side-effects to occur upon each subscription (while repeating). In this case, the side-effect is invoking the function returned by FromAsyncPattern.
byte[] buffer = new byte[1024]; var readAsync = Observable.FromAsyncPattern<byte[], int, int, int>(ns.BeginRead, ns.EndRead); var whenRead = Observable.While(() => ns.DataAvailable, Observable.Defer(() => readAsync(buffer, 0, 1024)));
- Dave
http://davesexton.com/blog -
Samstag, 19. Juni 2010 10:21
Hi Ted,
Here's a more complete example of what Rx can do. Note that I haven't actually tested it though (I have to go catch a train now :).
using System; using System.Collections.Generic; using System.Concurrency; using System.Disposables; using System.Linq; using System.Net; using System.Net.Sockets; namespace ReactiveProgrammingConsole { class TcpListenerLab { static void Main() { Console.WriteLine("Start listening..."); using ( IPAddress.Any.Listen(port: 7777) .Subscribe( request => Console.WriteLine("Recieved {0} bytes from {1}.", request.ReceivedBytes.Count, request.EndPoint), ex => Console.WriteLine("Exception: {0}", ex.ToString()), () => Console.WriteLine("Completed."))) { Console.ReadKey(); } Console.WriteLine("Listener stopped."); Console.ReadKey(); } } public sealed class TcpRequest { public IPEndPoint EndPoint { get { return endPoint; } } public ICollection<byte> ReceivedBytes { get { return receivedBytes; } } private readonly IPEndPoint endPoint; private readonly ICollection<byte> receivedBytes; public TcpRequest(IPEndPoint endPoint, ICollection<byte> receivedBytes) { this.endPoint = endPoint; this.receivedBytes = receivedBytes; } } public static class NetworkExtensions { public static IObservable<TcpRequest> Listen(this IPAddress address, int port) { return Listen(address, port, Scheduler.CurrentThread); } public static IObservable<TcpRequest> Listen(this IPAddress address, int port, IScheduler scheduler) { return Observable.CreateWithDisposable<TcpRequest>( observer => { var listener = new TcpListener(address, port); Func<IObservable<TcpClient>> listenAsync = Observable.FromAsyncPattern<TcpClient>( listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient); IObservable<TcpClient> whenClientConnected = Observable.Defer(() => listenAsync()).Repeat(); var whenBeginReceivingData = from client in whenClientConnected let buffer = new byte[1024] from read in Observable.Using( () => client.GetStream(), stream => { var readAsync = Observable.FromAsyncPattern<byte[], int, int, int>( stream.BeginRead, stream.EndRead); return Observable.While(() => stream.DataAvailable, Observable.Defer(() => readAsync(buffer, 0, buffer.Length))); }) group buffer.Take(read).ToArray() by client into g select g; CompositeDisposable disposables = null; disposables = new CompositeDisposable( whenBeginReceivingData.Subscribe( client => disposables.Add( client.Aggregate(new List<byte>(1024), (list, buffer) => { list.AddRange(buffer); return list; }) .Select(data => new TcpRequest( (IPEndPoint) client.Key.Client.RemoteEndPoint, data)) .Subscribe( request => observer.OnNext(request), ex => { var socketEx = ex as SocketException; if (socketEx != null && socketEx.SocketErrorCode == SocketError.Shutdown) { observer.OnCompleted(); } else observer.OnError(ex); })), observer.OnError, observer.OnCompleted), Disposable.Create(() => listener.Stop()), scheduler.Schedule(() => { try { listener.Start(); } catch (Exception ex) { observer.OnError(ex); } })); return disposables; }); } } }- Dave
http://davesexton.com/blog- Als Antwort vorgeschlagen James Miles Dienstag, 22. Juni 2010 09:51
-
Sonntag, 20. Juni 2010 07:59
Thanks Dave!
Now, that was one elaborate example! Much appreciated! Now I have something to digest, and play with. Rx really seems to be lots of fun digging into, but also somewhat of a brain twister... :) Thanks again!
Cheers,
Ted- Als Antwort markiert Jeffrey van Gogh - MS Dienstag, 29. Juni 2010 22:05
-
Dienstag, 29. Juni 2010 22:05Another option here is to use the Iterate pattern: http://social.msdn.microsoft.com/Forums/en/rx/thread/74592668-bbad-45a3-b115-72ae14064a29
-
Montag, 25. Juli 2011 19:29
Thanks a TON. Really. Rx seems so much awesome and sucha beauty but very tricky..atleast for people new to linq.
I was trying to figure out how to iterate the collection to get connected sockets every time it happens but failed to do so. Finally i found this reply. I am glad to read this. Thanks again. God Bless
-
Donnerstag, 19. Januar 2012 00:25Hi,I have to implement a proxy which forwards messages from a central server to multiple clients.In the current version this is done with many threads.The current proxy creates for each client 3 threads (one for establishing the connection, one for sending data to the central server and one to send data from the central server to the client)I'm absolutely new to RX and LINQ isn't my best friend (yet)...After watching multiple videos on MSDN and reading multiple articles and HowTo's I found this post.Unfortunately the code above is not running correctly.Problem 1: Observable.CreateWithDisposable is not existing anymore. I found one article which told me that I should use just Oberveable.Create instead.Problem 2: "System.InvalidOperationException: Not listening. You must call the Start() method before calling this method.\r\n at System.Net.Sockets.TcpListener.BeginAcceptTcpClient(AsyncCallback callback, Object state)\r\n at System.Reactive.Linq.Observable.<>c__DisplayClass2`1.<FromAsyncPattern>b__0()"So I added:listener.Start();after:var listener = new TcpListener(address, port);now i don't get this exception and I getProblem 3:Nothing is written to the Console. But when is set a breakpoint I can see that my simple test client is connected and has send some data.any help is appreciated,Stefan
-
Donnerstag, 19. Januar 2012 06:49
Hi Stefan,
Take a look at the Rxx project on CodePlex. It provides reactive wrappers for various System.Net types. We also have a few labs that illustrate their usage. For example:
Socket Lab
http://rxx.codeplex.com/SourceControl/changeset/view/64579#1055777We don't have a wrapper for TcpListener, but I'll consider adding one. We do have an HTTP implementation though:
ObservableHttpListener Class
http://rxx.codeplex.com/SourceControl/changeset/view/64579#973572- Dave
http://davesexton.com/blog -
Donnerstag, 19. Januar 2012 10:47
Hi Dave,
I'm glad that you found my post ;-)
Thx for the quick response!
Doesn't your sample above provide pretty much from the code I need?
I have to rewrite an existing proxy server and i guess that i can replace much (maybe all) the thread logic (endless while loops, Thread.Sleep(), "Data-pulling"...) with RX.
- I need to connect my proxy server to two ports on the central server (one for sending data, one for receiving data). This has to be a TcpConnection since there is a custom protocol transmitted. In current implementation therefor are two threads used.
- I need to parse an ID from this custom protocol to find the right client to forward the data.
- I need to accept new clients. In current implementation this is also done in a separate thread
- I need to forward messages from clients to the central server. In current implementation this is also done in a separate thread(for each client)
Am I right that this is pretty hard stuff to implement for an RX newbie?^^
-
Donnerstag, 19. Januar 2012 12:19
Hi Stefan,
> Doesn't your sample above provide pretty much from the code I need?
I don't know, you haven't provided enough information about your requirements. And anyway there may be better ways of doing this, depending upon your requirements, which is why I linked you to Rxx. For example, we have various Stream extensions and I've recently checked in support for binary parsers, which will be included in the next release.
> I have to rewrite an existing proxy server and i guess that i can replace much (maybe all) the thread logic
> (endless while loops, Thread.Sleep(), "Data-pulling"...) with RX.It sounds like you're on the right track. If your application is using "data-pulling" though, then you'll have to convert it into a reactive model (data-pushing) to use Rx. That means you'll probably have to invert much of your code into continuations, although you can also use LINQ whenever possible to keep your code cleaner.
Also consider using one of the Rx-Experimental overloads of Observable.Create that accepts an iterator method (this is what Jeffrey was referring to in his response), which enables you to write reactive iterators that contain "endless while loops" using imperative-style coding. It's similar to the upcoming async/await feature in C# 5.0. If your proxy is a really complicated asynchronous mess, then this might be the best approach for a quick conversion into Rx.
> I need to connect my proxy server to two ports on the central server (one for sending data, one for receiving data). [snip]
Create a TcpClient for sending data and a TcpListener for receiving data, (edit:) unless you need to initiate both connections to the server; in that case, you can use two different TcpClient objects or even the same object if the endpoint is the same. Observable.FromAsyncPattern can be used to convert them to observables, similar to my old code in this thread.
> I need to parse an ID from this custom protocol to find the right client to forward the data.
You may want to consider using Rxx binary parsers for this (to be included in the next release; already checked into source control).
Note that parsers can have a substantial negative impact on performance in terms of both speed and memory usage, so if you're writing a high-performance proxy then Rxx parsers are probably not the best approach. They can make parsing much simpler though, so if you're interested then it may be worth trying it out and running some benchmarks to see whether they're acceptable for your particular performance goals.
Also note that the interactive parsers can be substantially faster than the reactive parsers, so if you find that the reactive parsers just aren't cutting it then you can try converting the source to IEnumerable<byte>; e.g., Stream.ToEnumerable() - another Rxx extension. The parser query should require little to no modifications.
Binary Parser Lab
http://rxx.codeplex.com/SourceControl/changeset/view/64579#1118356> I need to accept new clients. In current implementation this is also done in a separate thread
If they share the same local endpoint, then perhaps just use a single TcpListener. My old code should still work:
var acceptTcpClient = Observable.FromAsyncPattern<TcpClient>( listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient); var clients = Observable.Defer(acceptTcpClient).Repeat();
> I need to forward messages from clients to the central server. In current implementation this is also done in
> a separate thread(for each client)Perhaps you could write a SelectMany query and call Subscribe() without any parameters to invoke the query for its side-effects.
For example: (Untested)
var outgoing = new TcpClient(); var connectToServer = Observable.FromAsyncPattern<string, int>(outgoing.BeginConnect, outgoing.EndConnect); var messageForwardingProxy = connectToServer(address, port).SelectMany(Observable.Using( outgoing.GetStream, outgoingStream => from incoming in clients from _ in Observable.Using( incoming.GetStream, incomingStream => from bytes in incomingStream.ReadToEndObservable() // Rxx extension from _ in outgoingStream.WriteObservable(bytes, 0, bytes.Length) // Rxx Extension select Unit.Default) .Finally(incoming.Close) select Unit.Default)) .Finally(outgoing.Close); var running = messageForwardingProxy.Subscribe();
> Am I right that this is pretty hard stuff to implement for an RX newbie?^^
It's definitely not entry level :)
- Dave
http://davesexton.com/blog- Bearbeitet Dave Sexton Donnerstag, 19. Januar 2012 12:23 Additional comment about TcpClient vs. TcpListener
- Als Antwort vorgeschlagen My.self Donnerstag, 19. Januar 2012 15:38
-
Donnerstag, 19. Januar 2012 13:45
Ok,
thx so far.I guess this is a great chance to collect RX skills since i don't know any other technology which meets my requirements.
Maybe you can tell me if there is something better?
As mentioned before there is a central server which provides two network ports to send and receive data. This server is maintained by an external company so there is no possibility to change anything there. The data is transferred binary in a custom protocol.
On the other side we have multiple Windows CE5 (Compact Framework 3.5) clients which have to send and receive this data in real time. I know that RX is not supported on Windows CE and so i guess the clients have to stay with the old code.
A proxy (or maybe is gateway the correct term) is needed since the central server does not support direct connections (as mentioned before he only opens two ports and the proxy has to parse the device ID from the protocol and forwards the data to the correct client).
Now the customer has many change requests and I think this is a great chance to do some code optimization since I'm not a big fan of starting too many threads and do observation within endless while loops in combination with Thread.Sleep...
So I started to look for some alternative technologies.
Actually I like .NET Remoting or WCF which provide a Duplex binding but this is not supported on Windows CE5.
So,
was this a good decision to try it with RX?Cheers,
Stefan
-
Donnerstag, 19. Januar 2012 15:02
Hi Stefan,
Sorry if I wasn't clear, but I didn't mean that Rx wasn't an appropriate choice or that there are other technologies that can meet your requirements (although perhaps there are). What I meant was that Rx offers more than one way to meet your general requirements and so I don't know whether my old code in this thread is actually what you need.
Would you say that your current non-Rx solution is the only way to meet your particular requirements without using Rx? Probably not. There are many different ways that you could tie together your client sockets, event handlers, while loops, etc. to achieve the same results, without using Rx.
Rx doesn't provide a solution for connecting sockets asynchronously. It's not a "socket library". Rx provides a means to author asynchronous computations (reactive code) in a declarative style via LINQ. So just like your existing solution that uses an imperative-style of coding, there are many ways in which you can use Rx to meet your requirements in a declarative style. How exactly to do that depends on your specific requirements.
Basically, Rx is an appropriate framework for converting synchronous code into asynchronous code. It's also appropriate for re-authoring asynchronous imperative-style code as asynchronous declarative-style code through LINQ, which is apparently what you're trying to do. So yes, it seems to be an appropriate framework for you. Whether or not my original solution in this thread is appropriate for you, I can't say for sure, but it doesn't seem likely to fit as is.
What you've stated so far is simply that you need to receive socket data, process it, and send it out asynchronously. You haven't given us a complete specification though. Rx can certainly help you to accomplish that and I've shown a few different ways in this thread already.
> was this a good decision to try it with RX?
Whether or not using Rx is a good decision is up to you. Obviously it's subjective whether you like LINQ or not. There's other factors as well, such as whether the learning curve is worth it. I personally believe that Rx was certainly worth learning for myself, but I can't say whether you'll agree. :)
- Dave
http://davesexton.com/blog -
Donnerstag, 19. Januar 2012 15:37
Actually it was clear for me that your response was connected to your old code in this thread. I also know that there are always several solutions for a specific requirement but I always want to find the best.
I guess this is the wrong location to place a general technology question... Sorry!
After intensive investigation I believe that RX is the best choice for this requirement. I already did some coding and it dramatically reduced the source code :)
Thx for support,
Cheers,
Stefan
-
Donnerstag, 19. Januar 2012 16:34
I have just been looking at this thing myself, Stefan.
Jeffrey van Gogh's post on how to use the iterator is very interesting, however it referes to quite an old version of Rx. I have quite a lot of experience in Rx, and I would strongly suggest that you use "Rxx" library for what you need. This library further extends Rx (Rx Extensions!) and has a brilliant set of functionality for turning APM/Async/BEginEnd methods into Rx.
To write the code yourself would require days invested in Duffy, Albahari and Richter books and then days further learning Rx and Linq. James and Dave have done an amazing job and should save you hours or days of frustration.
HTH
Lee
Lee Campbell http://LeeCampbell.blogspot.com -
Samstag, 21. Januar 2012 02:41
I guess that the sample postet below should accept an TcpClient and output the data which was sent by the client to the console...
but it doesn't :(
Can anybody point me to the problen?
using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Reactive.Disposables; namespace ReactiveProgrammingConsole { class TcpListenerLab { static void Main2() { Console.WriteLine( "Start listening..." ); using( IPAddress.Parse( "192.168.1.7" ).Listen( port: 7777 ) .Subscribe( request => Console.WriteLine( "Recieved {0} bytes from {1}.", request.ReceivedBytes.Count, request.EndPoint ), ex => Console.WriteLine( "Exception: {0}", ex.ToString() ), () => Console.WriteLine( "Completed." ) ) ) { Console.ReadKey(); } Console.WriteLine( "Listener stopped." ); Console.ReadKey(); } } public sealed class TcpRequest { public IPEndPoint EndPoint { get { return endPoint; } } public ICollection<byte> ReceivedBytes { get { return receivedBytes; } } private readonly IPEndPoint endPoint; private readonly ICollection<byte> receivedBytes; public TcpRequest( IPEndPoint endPoint, ICollection<byte> receivedBytes ) { this.endPoint = endPoint; this.receivedBytes = receivedBytes; } } public static class NetworkExtensions { public static IObservable<TcpRequest> Listen( this IPAddress address, int port ) { return Listen( address, port, Scheduler.CurrentThread ); } public static IObservable<TcpRequest> Listen( this IPAddress address, int port, IScheduler scheduler ) { return Observable.Create<TcpRequest>( observer => { var listener = new TcpListener( address, port ); listener.Start(); Func<IObservable<TcpClient>> listenAsync = Observable.FromAsyncPattern<TcpClient>( listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient ); IObservable<TcpClient> whenClientConnected = Observable.Defer( () => listenAsync() ).Repeat(); var whenBeginReceivingData = from client in whenClientConnected let buffer = new byte[ 1024 ] from read in Observable.Using( () => client.GetStream(), stream => { var readAsync = Observable.FromAsyncPattern<byte[], int, int, int>( stream.BeginRead, stream.EndRead ); return Observable.While( () => stream.DataAvailable, Observable.Defer( () => readAsync( buffer, 0, buffer.Length ) ) ); } ) group buffer.Take( read ).ToArray() by client into g select g; CompositeDisposable disposables = null; disposables = new CompositeDisposable( whenBeginReceivingData.Subscribe( client => disposables.Add( client.Aggregate( new List<byte>( 1024 ), ( list, buffer ) => { list.AddRange( buffer ); return list; } ) .Select( data => new TcpRequest( ( IPEndPoint ) client.Key.Client.RemoteEndPoint, data ) ) .Subscribe( request => observer.OnNext( request ), ex => { var socketEx = ex as SocketException; if( socketEx != null && socketEx.SocketErrorCode == SocketError.Shutdown ) { observer.OnCompleted(); } else observer.OnError( ex ); } ) ), observer.OnError, observer.OnCompleted ), Disposable.Create( () => listener.Stop() ), scheduler.Schedule( () => { try { listener.Start(); } catch( Exception ex ) { observer.OnError( ex ); } } ) ); return disposables; } ); } } }
-
Samstag, 21. Januar 2012 04:19
Hi,
Is it because you're calling listener.Start() twice?
Are you getting an exception? Have you tried adding breakpoints to see what code is executed, if any?
- Dave
http://davesexton.com/blog -
Samstag, 21. Januar 2012 10:50
I had to add this line because otherwise the following exceptions is thrown:
"System.InvalidOperationException: Not listening. You must call the Start() method before calling this method.\r\n at System.Net.Sockets.TcpListener.BeginAcceptTcpClient(AsyncCallback callback, Object state)\r\n at System.Reactive.Linq.Observable.<>c__DisplayClass2`1.<FromAsyncPattern>b__0()"after adding this line the sample is running without any exception.
When I debug this sample I can see that the data was received by adding a breakpoint to the aggregate function.
I guess I already refactored this sample a 1000 times but I wasn't able to output the received data to the console :(
One of my first tires with this code is posted below.
First thing I made was to remove the duplicate Listener.Start call.
public static IObservable<TcpRequest> Listen( this IPAddress address, int port, IScheduler scheduler ) { return Observable.Create<TcpRequest>( observer => { var listener = new TcpListener( address, port ); listener.Start(); Func<IObservable<TcpClient>> listenAsync = Observable.FromAsyncPattern<TcpClient>( listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient ); IObservable<TcpClient> whenClientConnected = Observable.Defer( () => listenAsync() ).Repeat(); var whenBeginReceivingData = from client in whenClientConnected let buffer = new byte[ 1024 ] from read in Observable.Using( () => client.GetStream(), stream => { var readAsync = Observable.FromAsyncPattern<byte[], int, int, int>( stream.BeginRead, stream.EndRead ); return Observable.While( () => stream.DataAvailable, Observable.Defer( () => readAsync( buffer, 0, buffer.Length ) ) ); } ) group buffer.Take( read ).ToArray() by client into g select g; CompositeDisposable disposables = null; disposables = new CompositeDisposable( whenBeginReceivingData.Subscribe( client => disposables.Add( client.Aggregate( new List<byte>( 1024 ), ( list, buffer ) => { list.AddRange( buffer ); return list; } ) .Select( data => new TcpRequest( ( IPEndPoint ) client.Key.Client.RemoteEndPoint, data ) ) .Subscribe( request => observer.OnNext( request )) ) ) ); return disposables; } ); }
-
Samstag, 21. Januar 2012 11:35
Hi Stefan,
Well as I mentioned in my original post, I didn't actually test it :)
I guess moving the Start call was correct. But your new code causes a race condition in Subscribe that may cause a NullReferenceException to be thrown. That can be avoided by initializing the disposables variable before calling Subscribe.
Though perhaps there are other issues with this code. If you're hitting the breakpoint in Aggregate, then are you hitting a breakpoint in the following Select that creates a new TcpRequest object? If not, then I suspect it's because client is not calling OnCompleted. Is your client closing the connection after it's done sending data?
Also, you probably shouldn't remove observer.OnError and observer.OnCompleted from the call to Subscribe. Forwarding error and completion notifications can be quite important.
- Dave
http://davesexton.com/blog -
Samstag, 21. Januar 2012 11:38
Hi Stefan,
Actually, it looks like the reason that OnCompleted is not being called is because GroupBy is used. Try using GroupByUntil instead.
- Dave
http://davesexton.com/blog -
Samstag, 21. Januar 2012 14:04
HI Dave!
It is no problem that the sample is not running. I'm learning quite a lot while I'm staring at your code and try to make it run.But it seems that there is quite a lot more to understand since I have to ask the next (maybe silly) question:
Which "GroupBy" should i replace by "GroupByUntil"?
Cheers,
stefan -
Samstag, 21. Januar 2012 18:17
Hi Stefan,
I just checked in TcpListener and TcpClient wrappers to Rxx:
http://rxx.codeplex.com/SourceControl/changeset/changes/64713
And here's the corresponding lab:
http://rxx.codeplex.com/SourceControl/changeset/view/64713#1118888
The lab shows a much better approach to converting TcpListener to an observable than the code in this thread. No grouping necessary :)
--
To answer your last question, the group keyword indicates where GroupBy is being called, but to change it to GroupByUntil you'll have to break out of query comprehension syntax. You can do that by removing the group..by..into statement and leave just the select statement, but you'll also have to change the data in the projection; e.g., select new { client, read, buffer }. Then you can wrap the entire query in parenthesis and call GroupByUntil.- Dave
http://davesexton.com/blog- Als Antwort vorgeschlagen My.self Mittwoch, 25. Januar 2012 00:11
-
Dienstag, 24. Januar 2012 03:10
Thank you a 1000 times!
ObservableTcpListener.Start( new IPEndPoint( IPAddress.Loopback, 15005 ) ) .Subscribe( client => client.Client.ReceiveUntilCompleted( SocketFlags.None ) .Subscribe( message => Console.WriteLine( System.Text.ASCIIEncoding.ASCII.GetString( message ) ) ));I guess the code posted above provides all functionality I need (one direction -> forward messages from multiple clients to one central server).
I really like RX since the snippet posted above causes a code reduction of 99.9% compared to the old implementation :P
Cheers,
Stefan
-
Dienstag, 24. Januar 2012 04:31
Hi Stefan,
A couple things may be worth noting about your code:
- Nesting calls to Subscribe in this way means that inner subscriptions aren't cancellable. This might be alright for your particular requirements, but it's something to consider. Normally, I'd solve this problem by using a SelectMany query, but since ObservableTcpListener does not produce a well-behaved observable (as documented), you'll either have to call Synchronize as well (as I do in the lab) or manage subscriptions yourself; e.g., with CompositeDisposable.
- The observable that ReceiveUntilCompleted returns contains zero or more buffers as byte[]. So it isn't necessarily safe to assume that you'll get a complete message with each notification. There are several ways to work around this. Which you choose may depend upon your performance goals. In the lab, I've chosen to flatten and then aggregate by calling .SelectMany(b => b).ToArray(). This might not be the most performant solution but it may be acceptable.
- Dave
http://davesexton.com/blog -
Dienstag, 24. Januar 2012 15:21
I thought I'm already skilled enough to optimize your code :P
Thx for correction!

