How to add or associate context to each ObservableTcpListener Connection/Session
-
May 6, 2012, 14:48
Hi Everyone,
I am using the following simple code (which I thought was neat), but it has a problem: I need a way to associate each incoming connected client with its own corresponding outgoing pipeline. In the example this is shown as 'TcpClient otherChannel'; eventually I will replace it with a TPL Dataflow BufferBlock<T> otherPipeline. The problem is that I have no way of injecting or associating each unique session unless I keep a separate Dictionary mapping TcpClients to their respective 'otherChannels'. It would be nice if I could extend ObservableTcpListener to cater for my own extended TcpClient that has its related context attached to it.
Am I using the wrong pattern? Should I revert to old-style techniques, or is there a way to associate 'otherChannel' with each incoming connection that is eluding me?
Code is:
    public sealed class SmartRouter
    {
        private IPEndPoint thisEndPoint;
        private string otherHost;
        private int otherPort;
        private int maxConcurrency;

        // Shared by every incoming connection; this is the per-session context in question.
        private TcpClient otherChannel;

        public SmartRouter(IPEndPoint thisEndPoint, string otherHost, int otherPort, int maxConcurrency = 10)
        {
            this.otherHost = otherHost;
            this.otherPort = otherPort;
            this.thisEndPoint = thisEndPoint;
            this.maxConcurrency = maxConcurrency;
        }

        public void Start()
        {
            IObservable<TcpClient> clients = ObservableTcpListener.Start(thisEndPoint, maxConcurrency);

            // One inner subscription per accepted client.
            clients.Subscribe(
                client => OnConnect(client).Subscribe(
                    message => OnMessage(client, message),
                    ex => OnException(client, ex),
                    () => OnCompleted(client)));
        }

        private IObservable<byte[]> OnConnect(TcpClient client)
        {
            if (otherChannel == null)
            {
                otherChannel = new TcpClient(otherHost, otherPort);
            }

            return client.Client.ReceiveUntilCompleted(SocketFlags.None);
        }

        private void OnMessage(TcpClient client, byte[] message)
        {
            // Echo to the sender and forward to the other channel.
            client.GetStream().Write(message, 0, message.Length);
            otherChannel.GetStream().Write(message, 0, message.Length);
        }

        private void OnCompleted(TcpClient client)
        {
            System.Diagnostics.Debug.WriteLine("Completed.");
        }

        private void OnException(TcpClient client, Exception ex)
        {
            // Pass the exception itself: a string second argument would bind to the
            // WriteLine(message, category) overload instead of the format overload.
            System.Diagnostics.Debug.WriteLine("Exception: {0}", ex);
        }
    }

What I need is:
Any ideas or suggestions are most welcome.
All replies
-
May 6, 2012, 18:15
Hi,
I think you're referring to ObservableTcpListener from the Rxx project. It's actually a wrapper for the StartObservable extension method, which is a wrapper for the AcceptTcpClientAsync method (as of the latest Rxx 2.0 Beta source code).
Unfortunately, it looks like TcpListener doesn't accept a factory method for creating TcpClient objects; however, it does provide an AcceptSocketAsync method that Rxx wraps with StartSocketObservable. So you could call StartSocketObservable instead and then wrap the Socket objects that are returned with your own TcpClient objects.
- Dave
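A minimal sketch of the "wrap the accepted Socket in your own TcpClient" idea, using only the .NET base class library rather than Rxx. The names `RoutedTcpClient` and `Outbound` are hypothetical, a plain `TcpListener.AcceptSocket` call stands in for `StartSocketObservable`, and the queue is only a placeholder for whatever per-session pipeline is actually needed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;

// Hypothetical TcpClient subclass that adopts an accepted Socket and
// carries per-connection context alongside it.
public sealed class RoutedTcpClient : TcpClient
{
    public RoutedTcpClient(Socket acceptedSocket)
    {
        // The base constructor may already have created an unconnected socket;
        // close it before adopting the accepted one.
        Client?.Close();
        Client = acceptedSocket;
    }

    // Per-session context (assumption: stands in for 'otherChannel' / 'otherPipeline').
    public ConcurrentQueue<byte[]> Outbound { get; } = new ConcurrentQueue<byte[]>();
}

public static class WrapAcceptedSocketDemo
{
    public static void Main()
    {
        // Port 0 lets the OS pick a free loopback port for the demo.
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        int port = ((IPEndPoint)listener.LocalEndpoint).Port;

        using (var remote = new TcpClient())
        {
            remote.Connect(IPAddress.Loopback, port);
            Socket accepted = listener.AcceptSocket();

            using (var session = new RoutedTcpClient(accepted))
            {
                // The context travels with the client object itself.
                session.Outbound.Enqueue(new byte[] { 1, 2, 3 });
                Console.WriteLine("Connected: {0}, queued: {1}",
                    session.Connected, session.Outbound.Count);
            }
        }

        listener.Stop();
    }
}
```

Because the context lives on the client object, handlers such as OnMessage can reach it through their existing `client` parameter without a separate lookup table.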
-
May 6, 2012, 18:18
Hi,
You could also use the decorator pattern and simply wrap the TcpClient objects created by ObservableTcpListener with your own TcpClient objects.
- Dave
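One possible reading of the wrapping suggestion is plain composition: a small session object that holds the accepted TcpClient together with its own context. This is only a sketch; `RoutedSession` and `Outbound` are hypothetical names, and the unconnected `TcpClient` instances stand in for clients produced by the listener.

```csharp
using System;
using System.Collections.Concurrent;
using System.Net.Sockets;

// Hypothetical wrapper pairing an accepted TcpClient with per-session context.
public sealed class RoutedSession : IDisposable
{
    public RoutedSession(TcpClient inbound)
    {
        Inbound = inbound ?? throw new ArgumentNullException(nameof(inbound));
    }

    // The client handed out by the listener.
    public TcpClient Inbound { get; }

    // Context owned by this session only (assumption: placeholder for the outgoing pipeline).
    public ConcurrentQueue<byte[]> Outbound { get; } = new ConcurrentQueue<byte[]>();

    public void Dispose() => Inbound.Close();
}

public static class RoutedSessionDemo
{
    public static void Main()
    {
        using (var first = new RoutedSession(new TcpClient()))
        using (var second = new RoutedSession(new TcpClient()))
        {
            first.Outbound.Enqueue(new byte[] { 42 });

            Console.WriteLine(first.Outbound.Count);  // 1
            Console.WriteLine(second.Outbound.Count); // 0
        }
    }
}
```

Compared with subclassing TcpClient, composition leaves the listener's objects untouched, at the cost of handlers taking a `RoutedSession` instead of a `TcpClient`.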
-
May 6, 2012, 19:11
Hi Dave
Indeed, I was using your neat ObservableTcpListener from the Rxx project. I liked your suggestions, but using StartSocketObservable simply defers the same problem. Unfortunately, the root of the issue sits at:
    public SmartRouter(IPEndPoint thisEndPoint, string otherHost, int otherPort, int maxConcurrency = 10) { ... }

    public void Start()
    {
        IObservable<Socket> sockets = SmartRouter.Start(thisEndPoint, maxConcurrency);
        sockets.Subscribe(
            client => OnConnect(client).Subscribe(
                message => OnMessage(client, message),
                ex => OnException(client, ex),
                () => OnCompleted(client)));
    }

What I ultimately need, though, is:

    public void Start()
    {
        IObservable<TcpClientEx> clients = SmartRouter.Start(thisEndPoint, maxConcurrency);
        clients.Subscribe(
            client => OnConnect(client).Subscribe(
                message => OnMessage(client, message),
                ex => OnException(client, ex),
                () => OnCompleted(client)));
    }

Creating the derived TcpClientEx is simple:

    public class TcpClientEx : TcpClient
    {
        public TcpClientEx(Socket socket)
        {
            Client = socket;
        }

        public TcpClientEx(string hostname, int port)
            : base(hostname, port)
        {
        }
    }

So perhaps I should rather use listener.StartSocketObservable() and then have a TcpListenerEx with the relevant extension(s), which will ultimately give me my IObservable<TcpClientEx>?
BTW: Would be nice if TcpListener had a parameterized factory method.
Thanks for your timely response though ;)
- James
- Edited by Marineheiro, May 6, 2012, 19:33
- Edited by Marineheiro, May 6, 2012, 19:35
- Edited by Marineheiro, May 6, 2012, 19:38
-
May 6, 2012, 22:31
Hi James,
Why can't you decorate the sockets within your observer?
    var listener = new TcpListener(thisEndPoint);
    IObservable<Socket> sockets = listener
        .StartSocketObservable(maxConcurrency)
        .Finally(listener.Stop);

    sockets.Subscribe(socket =>
    {
        // Wrap each accepted socket before wiring up the per-client handlers.
        var client = new TcpClientEx(socket);
        OnConnect(client).Subscribe(
            message => OnMessage(client, message),
            ex => OnException(client, ex),
            () => OnCompleted(client));
    });

I understand if it's an encapsulation issue. That said, you could use a projection (with synchronization, or maxConcurrent: 1) if that's what you need.
> BTW: Would be nice if TcpListener had a parameterized factory method.
True, but the purpose of the wrappers in Rxx is to translate existing FCL functionality into observables via a single method call. Since TcpListener doesn't provide a parameterized factory method, I don't think Rxx should either. That said, there are perhaps a few cases in Rxx where I've added extra functionality in wrappers, but only for functionality that seemed to be common yet is difficult to do correctly (basically, the EAP wrappers).
- Dave
- Marked as answer by Marineheiro, May 6, 2012, 23:20
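The projection mentioned in this reply could look roughly like the sketch below, which turns a sequence of sockets into a sequence of wrapped clients with Rx's `Select`. It assumes the System.Reactive package is referenced; `AsClients` and `ProjectionDemo` are hypothetical names, and a `Subject<Socket>` stands in for the listener's socket sequence so the example runs without any network traffic.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Reactive.Linq;      // Select (System.Reactive package)
using System.Reactive.Subjects;  // Subject<T>

// Same shape as the TcpClientEx shown earlier in the thread.
public class TcpClientEx : TcpClient
{
    public TcpClientEx(Socket socket) { Client = socket; }
}

public static class ProjectionDemo
{
    // Hypothetical helper: projects each accepted socket into a TcpClientEx.
    public static IObservable<TcpClientEx> AsClients(this IObservable<Socket> sockets)
    {
        return sockets.Select(socket => new TcpClientEx(socket));
    }

    public static void Main()
    {
        // Assumption: the Subject replaces the real listener for demonstration.
        using (var sockets = new Subject<Socket>())
        {
            var seen = new List<TcpClientEx>();

            using (sockets.AsClients().Subscribe(seen.Add))
            {
                sockets.OnNext(new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp));
                sockets.OnNext(new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp));
            }

            Console.WriteLine("Wrapped clients: {0}", seen.Count);
            foreach (var client in seen) client.Close();
        }
    }
}
```

Note that `Select` runs its selector once per subscriber, so a second subscription to the projected sequence would create a second set of wrappers around the same sockets; sharing the projection (for example with `Publish`) would avoid that.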
-
May 6, 2012, 23:19
Hi Dave
You are truly on the ball!!! I was overcomplicating things for myself :( I guess my lame excuse is that I have missed out on the last 3 years of what's been going on in the .NET world... boy, has it changed and improved! I was really only exposed to all of this this past Thursday, and I have a deadline (yeah, I know it's risky if I want to deliver), but I really like the fluent and amazingly powerful frameworks/extensions/constructs. They make things so much more maintainable and decoupled.
You (and MS) are doing some awesome stuff and I really do appreciate your assistance! It works like magic! 1000 THANK YOUs!
- James

