none
How to add or associate context to each ObservableTcpListener Connection/Session

    Question

  • Hi Everyone,

    I am using the following simple code (which I thought was neat), BUT... It has a problem in that I need a way to associate each incoming connected client with it's own corresponding outgoing pipeline. In the example it is shown as 'TcpClient otherChannel', eventually I will replace it with a TPL Dataflow BufferedBlock<T> otherPipeline. Problem is that I have no way of injecting or associating each unique session unless I keep a separate Dictionary mapping TcpClients with their respective 'otherChannels'. Would be nice if I could extend ObservableTcpListener to cater for my own extended TcpClient that has it's related other context attached to it.

    Am I using the wrong pattern? Should I revert to old-style techniques, or is there a way for me to associate 'otherChannel' with each incoming connection that is aluding me?

    Code is:

    public sealed class SmartRouter
    {
        private IPEndPoint  thisEndPoint;
        private string      otherHost;
        private int         otherPort;
        private int         maxConcurrency;
        private TcpClient   otherChannel;
    
        public SmartRouter(IPEndPoint thisEndPoint, string otherHost, int otherPort, int maxConcurrency = 10)
        {
            this.otherHost= otherHost;
            this.otherPort = otherPort;
            this.thisEndPoint = thisEndPoint;
            this.maxConcurrency = maxConcurrency;
        }
    
        public void Start()
        {
            IObservable<TcpClient> clients = ObservableTcpListener.Start(thisEndPoint,maxConcurrency);
            clients.Subscribe ( client => OnConnect(client)
                                .Subscribe
                                (
                                    message => OnMessage(client, message),
                                    ex      => OnException(client,ex),
                                    ()      => OnCompleted(client)
                                )
                                );
        }
    
        private IObservable<byte[]> OnConnect(TcpClient client)
        {
            if (otherChannel == null)
            {
                otherChannel = new TcpClient(otherHost,otherPort);
            }
            return client.Client.ReceiveUntilCompleted(SocketFlags.None);
        }
    
        private void OnMessage(TcpClient client, byte[] message)
        {
            client.GetStream().Write(message, 0, message.Length);
            otherChannel.GetStream().Write(message, 0, message.Length);
        }
    
        private void OnCompleted(TcpClient client)
        {
            System.Diagnostics.Debug.WriteLine("Completed.");
        }
    
        private void OnException(TcpClient client, Exception ex)
        {
            System.Diagnostics.Debug.WriteLine("Exception: {0}", ex.ToString());
        }
    
    }
    

    What I need is:

    Any ideas or suggestions are most welcome.

    Sunday, May 06, 2012 2:48 PM

Answers

  • Hi James,

    Why can't you decorate the sockets within your observer?

    var listener = new TcpListener(thisEndPoint);
    
    IObservable<Socket> sockets = listener
    	.StartSocketObservable(maxConcurrency)
    	.Finally(listener.Stop);
    
    sockets.Subscribe(socket => 
    {
    	var client = new TcpClientEx(socket);
    
    	OnConnect(client).Subscribe(
    		message => OnMessage(client, message),
    		ex => OnException(client, ex),
    		() => OnCompleted(client);
    });

    I understand if it's an encapsulation issue.  Though you could use a projection (with synchronization, or maxConcurrent: 1), if that's what you need.

    > BTW: Would be nice if TcpListener had a parameterized factory method.

    True, but the purpose of the wrappers in Rxx are to translate existing FCL functionality into observables via a single method call.  Since TcpListener doesn't provide a parameterized factory method, I don't think Rxx should either.  However, there are perhaps a few cases in Rxx where I've added extra functionality in wrappers, but only for functionality that seemed to be common yet is difficult to do correctly (basically, the EAP wrappers).

    - Dave


    http://davesexton.com/blog

    • Marked as answer by Marineheiro Sunday, May 06, 2012 11:20 PM
    Sunday, May 06, 2012 10:31 PM

All replies

  • Hi,

    I think you're referring to ObservableTcpListener from the Rxx project.  It's actually a wrapper for the StartObservable extension method, which is a wrapper for the AcceptTcpClientAsync method (as of the latest Rxx 2.0 Beta source code).

    Unfortunately, it looks like TcpListener doesn't accept a factory method for creating TcpClient objects; however, it does provide an AcceptSocketAsync method that Rxx wraps with StartSocketObservable.  So you could call StartSocketObservable instead and then wrap the Socket objects that are returned with your own TcpClient objects.

    - Dave


    http://davesexton.com/blog

    Sunday, May 06, 2012 6:15 PM
  • Hi,

    You could also use the decorator pattern and simply wrap the TcpClient objects created by ObservableTcpListener with your own TcpClient objects.

    - Dave


    http://davesexton.com/blog

    Sunday, May 06, 2012 6:18 PM
  • Hi Dave

    Indeed I was using your neat  ObservableTcpListener from the Rxx project. Liked your suggestions... but using StartSocketObservable simply "defers" to the same problem... unfortunately the root of the issue sits at:

            public SmartRouter(IPEndPoint thisEndPoint, string otherHost, int otherPort, int maxConcurrency = 10)
            {
                ...
            }
            public void Start()
            {
                IObservable<Socket> sockets = SmartRouter.Start(thisEndPoint,maxConcurrency);
                sockets.Subscribe ( client => OnConnect(client)
                                    .Subscribe
                                    (
                                        message => OnMessage(client, message),
                                        ex      => OnException(client,ex),
                                        ()      => OnCompleted(client)
                                    )
                                    );
            }

    What I ultimately need though is:

            public void Start()
            {
                IObservable<TcpClientEx> clients = SmartRouter.Start(thisEndPoint,maxConcurrency);
                clients.Subscribe ( client => OnConnect(client)
                                    .Subscribe
                                    (
                                        message => OnMessage(client, message),
                                        ex      => OnException(client,ex),
                                        ()      => OnCompleted(client)
                                    )
                                    );
            }

    Creating the derived TcpClientEx is simple

        public class TcpClientEx : TcpClient
        {
            public TcpClientEx(Socket socket)
            {
                Client = socket;
            }
            public TcpClientEx(string hostname, int port)
                : base(hostname, port)
            {
            }
        }

    So perhaps I should rather use the listener.StartSocketObservable() and then have a TcpListenerEx with the relevant extension(s) which will ultimately give me my IObservable<TcpClientEx>?

    BTW: Would be nice if TcpListener had a parameterized factory method. 

    Thanks for your timely response though ;)

    - James




    Sunday, May 06, 2012 7:11 PM
  • Hi James,

    Why can't you decorate the sockets within your observer?

    var listener = new TcpListener(thisEndPoint);
    
    IObservable<Socket> sockets = listener
    	.StartSocketObservable(maxConcurrency)
    	.Finally(listener.Stop);
    
    sockets.Subscribe(socket => 
    {
    	var client = new TcpClientEx(socket);
    
    	OnConnect(client).Subscribe(
    		message => OnMessage(client, message),
    		ex => OnException(client, ex),
    		() => OnCompleted(client);
    });

    I understand if it's an encapsulation issue.  Though you could use a projection (with synchronization, or maxConcurrent: 1), if that's what you need.

    > BTW: Would be nice if TcpListener had a parameterized factory method.

    True, but the purpose of the wrappers in Rxx are to translate existing FCL functionality into observables via a single method call.  Since TcpListener doesn't provide a parameterized factory method, I don't think Rxx should either.  However, there are perhaps a few cases in Rxx where I've added extra functionality in wrappers, but only for functionality that seemed to be common yet is difficult to do correctly (basically, the EAP wrappers).

    - Dave


    http://davesexton.com/blog

    • Marked as answer by Marineheiro Sunday, May 06, 2012 11:20 PM
    Sunday, May 06, 2012 10:31 PM
  • Hi Dave

    You are truly on the ball !!! I was overcomplicating things for myself :( Guess my lame excuse being that I have missed out on the last 3 years of what's been going on in the .NET world... boy has it changed and improved ! So, I really was only exposed to all of this, this past Thursday and I have a deadline... (yeah I know it's risky if I want to deliver..)... but I really like the fluent and amazingly powerful frameworks/extensions/constructs... makes things so much more maintainable and decoupled etc.

    You (and MS) are doing some awesome stuff and I really do appreciate your assistance! It works like magic ! 1000 THANK YOU's !

    - James

    Sunday, May 06, 2012 11:19 PM