Getting the IPEndPoint from Socket.EndReceive using FromAsyncPattern

    Question

  • Is there any way to do the following properly? I'm absolutely sure that the way I pass ep below and use it in the lambda is wrong, but I see no other way of doing this using Rx.

     IPEndPoint ep = null;
     Observable
     .Defer(() => Observable.FromAsyncPattern<byte[]>(FloorServerSocket.Socket.BeginReceive, (i) => FloorServerSocket.Socket.EndReceive(i, ref ep))())
                            .ObserveOn(Scheduler.ThreadPool)
                            .Retry()
                            .Repeat()
                            .Subscribe
                            (
                                (UDP_Packet) => ProcessPacketType(UDP_Packet, ep),
                                (OnError) => Debug.Print("Error: " + OnError.Message),
                                () => Debug.Print("Completed")
                            );

    Thursday, April 05, 2012 11:00 AM

Answers

  • Hi,

    In that case you must use ReceiveFrom, ReceiveFromAsync or ReceiveFromObservable (the latter is defined in Rxx).

    Retry().Repeat() is fine if you need the socket to remain open for the lifetime of the process.  Or you can simply add Retry to my proposed operator for Rxx.

    For example:

    var args = new SocketAsyncEventArgs();
    
    FloorServerSocket.Socket
    	.ReceiveFromUntilCompleted(args)
    	.Retry()
    	.Subscribe
    	(
    		e => ProcessPacketType(e.Buffer, e.RemoteEndPoint),
    		() => Debug.Print("Completed")
    	);

    Note that I removed the error handler because it will never execute; i.e., Retry prevents all exceptions from reaching it.
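
    To illustrate why the error handler is dead code after Retry, here is a minimal sketch that uses a simulated source rather than a socket. The failing source and the counters are made up for the demonstration; only Defer, Throw, Return, Retry and Subscribe come from Rx.

```csharp
using System;
using System.Reactive.Linq;

public static class RetryDemo
{
    // Runs a source that fails twice before succeeding and reports
    // (subscription attempts, whether onError ran, last value seen).
    public static Tuple<int, bool, int> Run()
    {
        int attempts = 0;
        bool errorHandlerRan = false;
        int lastValue = 0;

        // Simulated receive: the first two subscriptions fail, the third yields 42.
        IObservable<int> source = Observable.Defer(() =>
            ++attempts < 3
                ? Observable.Throw<int>(new InvalidOperationException("simulated failure"))
                : Observable.Return(42));

        source
            .Retry() // resubscribes on every error, so no error reaches Subscribe
            .Subscribe(
                value => lastValue = value,
                ex => errorHandlerRan = true,
                () => { });

        return Tuple.Create(attempts, errorHandlerRan, lastValue);
    }
}
```

    If errors need to be logged, one option is to place a Do(onNext, onError) before Retry so that each failure is still observed before the resubscription.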

    - Dave


    http://davesexton.com/blog

    Thursday, April 05, 2012 3:44 PM
  • Hi,

    Actually, the Finally operator that I'm using in my proposed operator may get in the way of Retry.  You'll probably want to drop it.

    public static IObservable<SocketAsyncEventArgs> ReceiveFromUntilCompleted(this Socket socket, SocketAsyncEventArgs eventArgs)
    {
    	return Observable
    		.Defer(() => socket.ReceiveFromObservable(eventArgs))
    		.Repeat()
    		.TakeWhile(e => e.BytesTransferred > 0);
    }
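
    The interaction between Finally and Retry can be seen without a socket. In this sketch the cleanup counter stands in for eventArgs.Dispose, and the failing source is simulated; both are assumptions made for illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class FinallyRetryDemo
{
    // Returns (subscription attempts, number of times the Finally action ran).
    public static Tuple<int, int> Run()
    {
        int attempts = 0;
        int cleanups = 0;

        IObservable<int> source = Observable
            .Defer(() =>
                ++attempts < 3
                    ? Observable.Throw<int>(new InvalidOperationException("simulated failure"))
                    : Observable.Return(1))
            // Stand-in for eventArgs.Dispose: runs when each subscription ends,
            // including the ones that end with an error.
            .Finally(() => cleanups++);

        // Retry sits downstream of Finally, so every failed attempt runs the cleanup
        // before (or while) the next attempt starts.
        source.Retry().Subscribe(_ => { });

        return Tuple.Create(attempts, cleanups);
    }
}
```

    Because the cleanup runs once per attempt, a real eventArgs.Dispose placed there would leave Retry resubscribing with an already disposed SocketAsyncEventArgs.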

    I'm also assuming that a persistent connectionless socket will never receive 0 bytes (which indicates that the socket is closed).  If I'm wrong, then you'll have to drop TakeWhile too.
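
    On the zero-byte assumption: UDP itself permits datagrams with an empty payload, so a connectionless socket can in principle report 0 bytes without being closed. A small loopback sketch for checking this on a given platform (the class and method names are illustrative only):

```csharp
using System;
using System.Net;
using System.Net.Sockets;

public static class EmptyDatagramDemo
{
    // Sends one zero-length datagram over loopback and returns the received length.
    public static int Run()
    {
        using (var receiver = new UdpClient(new IPEndPoint(IPAddress.Loopback, 0)))
        using (var sender = new UdpClient())
        {
            receiver.Client.ReceiveTimeout = 2000; // fail with an exception instead of hanging

            var target = (IPEndPoint)receiver.Client.LocalEndPoint;
            sender.Send(new byte[0], 0, target);

            IPEndPoint remote = new IPEndPoint(IPAddress.Any, 0);
            byte[] payload = receiver.Receive(ref remote);
            return payload.Length;
        }
    }
}
```

    If the receive succeeds with an empty payload, TakeWhile(e => e.BytesTransferred > 0) would end the sequence on such a packet, which is a reason to drop it for a long-lived UDP listener.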

    - Dave


    http://davesexton.com/blog

    Thursday, April 05, 2012 3:51 PM

All replies

  • Hi, 

    Unfortunately, ref parameters aren't supported in FromAsyncPattern so you must use a lambda as you're doing; however, you don't need a closure since you can change the return type inside the lambda to Tuple<byte[], IPEndPoint>.

    Though I don't see an overload of EndReceive in .NET that returns a byte[] and takes a ref parameter.  I see one that returns an int, and an EndReceiveFrom method that returns an int and takes a ref parameter, but both have corresponding BeginReceive/BeginReceiveFrom methods that require you to specify the buffer yourself.
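
    As a rough sketch of the Tuple idea using the standard Socket.BeginReceiveFrom/EndReceiveFrom pair: the helper name ReceiveFromOnce, the bufferSize parameter and the IPv4 placeholder end point are assumptions for illustration, not part of Rx or Rxx.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;

public static class SocketRxSketch
{
    // Hypothetical helper: one receive, yielding the payload together with the sender.
    // Assumes an IPv4 datagram socket that is already bound.
    public static IObservable<Tuple<byte[], IPEndPoint>> ReceiveFromOnce(Socket socket, int bufferSize)
    {
        return Observable.Defer(() =>
        {
            var buffer = new byte[bufferSize]; // fresh buffer per receive

            Func<IObservable<Tuple<byte[], IPEndPoint>>> receive =
                Observable.FromAsyncPattern<Tuple<byte[], IPEndPoint>>(
                    (callback, state) =>
                    {
                        // The ref argument is a local, so no outer variable is captured.
                        EndPoint any = new IPEndPoint(IPAddress.Any, 0);
                        return socket.BeginReceiveFrom(
                            buffer, 0, buffer.Length, SocketFlags.None, ref any, callback, state);
                    },
                    asyncResult =>
                    {
                        EndPoint remote = new IPEndPoint(IPAddress.Any, 0);
                        int count = socket.EndReceiveFrom(asyncResult, ref remote);

                        // Copy only the bytes that were actually received.
                        var data = new byte[count];
                        Buffer.BlockCopy(buffer, 0, data, 0, count);
                        return Tuple.Create(data, (IPEndPoint)remote);
                    });

            return receive();
        });
    }
}
```

    A subscriber could then call something like SocketRxSketch.ReceiveFromOnce(socket, 1500).Repeat().Subscribe(t => ProcessPacketType(t.Item1, t.Item2)), so the end point travels with each packet instead of living in a shared variable.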

    You can simplify the entire query with Rxx.  If you don't need the remote end point, then it's even simpler:

    FloorServerSocket.Socket
    	.ReceiveUntilCompleted()
    	.Subscribe
    	(
    		UDP_Packet => ProcessPacketType(UDP_Packet),
    		ex => Debug.Print("Error: " + ex.Message),
    		() => Debug.Print("Completed")
    	);

    However, if you do need the remote end point, then you'll have to do the looping yourself.  I'll consider adding an overload in Rxx for ReceiveUntilCompleted that accepts a SocketAsyncEventArgs object.  For example:

    public static IObservable<SocketAsyncEventArgs> ReceiveUntilCompleted(this Socket socket, SocketAsyncEventArgs eventArgs)
    {
    	return Observable
    		.Defer(() => socket.ReceiveObservable(eventArgs))
    		.Repeat()
    		.TakeWhile(e => e.BytesTransferred > 0)
    		.Finally(eventArgs.Dispose);
    }

    Furthermore, ObserveOn(Scheduler.ThreadPool) is redundant because the callback will be on a pooled thread already.  And unless you want a permanent connection to the socket, you don't want to use Retry and Repeat together, because they will literally resubscribe forever on a pooled thread after the socket is closed; i.e., this may result in ObjectDisposedException being thrown in an infinite loop on a pooled thread.
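
    One possible way to avoid that loop, sketched here with a simulated receive instead of a real socket, is to repeat without Retry and turn ObjectDisposedException into normal completion with Catch. The fake source that yields two packets and then throws is an assumption made for the demonstration.

```csharp
using System;
using System.Reactive.Linq;

public static class ClosedSocketDemo
{
    // Returns (packets received, whether the sequence completed, whether onError ran).
    public static Tuple<int, bool, bool> Run()
    {
        int calls = 0;

        // Simulated single receive: two packets arrive, then the "socket" is disposed.
        IObservable<byte[]> receiveOnce = Observable.Defer(() =>
        {
            calls++;
            return calls <= 2
                ? Observable.Return(new byte[] { (byte)calls })
                : Observable.Throw<byte[]>(new ObjectDisposedException("Socket"));
        });

        int packets = 0;
        bool completed = false;
        bool failed = false;

        receiveOnce
            .Repeat() // keep receiving while each receive succeeds
            // A disposed socket ends the sequence instead of being retried forever.
            .Catch<byte[], ObjectDisposedException>(_ => Observable.Empty<byte[]>())
            .Subscribe(
                packet => packets++,
                ex => failed = true,
                () => completed = true);

        return Tuple.Create(packets, completed, failed);
    }
}
```

    Other exception types still reach the error handler in this arrangement, so transient socket errors would need their own handling if the listener is meant to survive them.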

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Thursday, April 05, 2012 2:21 PM Removed preconditions from example
    Thursday, April 05, 2012 2:20 PM
  • Hi,

    Actually, to get the RemoteEndPoint for a connectionless socket you must call ReceiveFrom or ReceiveFromAsync.  See http://msdn.microsoft.com/en-us/library/system.net.sockets.socket.receivefromasync.aspx for more info.

    So the extension that I posted previously won't work for a connectionless socket unless it's modified slightly to call ReceiveFromObservable:

    public static IObservable<SocketAsyncEventArgs> ReceiveFromUntilCompleted(this Socket socket, SocketAsyncEventArgs eventArgs)
    {
    	return Observable
    		.Defer(() => socket.ReceiveFromObservable(eventArgs))
    		.Repeat()
    		.TakeWhile(e => e.BytesTransferred > 0)
    		.Finally(eventArgs.Dispose);
    }
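
    For readers without Rxx, here is a rough sketch of what a single-receive operator over Socket.ReceiveFromAsync could look like. It is a hypothetical stand-in, not Rxx's actual ReceiveFromObservable; the names ReceiveFromWithArgs and CreateArgs and the buffer size parameter are assumptions.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class SocketArgsSketch
{
    // ReceiveFromAsync needs a buffer and a non-null RemoteEndPoint before it is called.
    public static SocketAsyncEventArgs CreateArgs(int bufferSize)
    {
        var args = new SocketAsyncEventArgs();
        args.SetBuffer(new byte[bufferSize], 0, bufferSize);
        args.RemoteEndPoint = new IPEndPoint(IPAddress.Any, 0); // IPv4 placeholder
        return args;
    }

    // Hypothetical stand-in for a single receive; yields the args once, then completes.
    public static IObservable<SocketAsyncEventArgs> ReceiveFromWithArgs(
        this Socket socket, SocketAsyncEventArgs args)
    {
        return Observable.Create<SocketAsyncEventArgs>(observer =>
        {
            EventHandler<SocketAsyncEventArgs> handler = null;
            handler = (sender, e) =>
            {
                e.Completed -= handler;
                if (e.SocketError == SocketError.Success)
                {
                    observer.OnNext(e);
                    observer.OnCompleted();
                }
                else
                {
                    observer.OnError(new SocketException((int)e.SocketError));
                }
            };

            args.Completed += handler;
            try
            {
                // false means the operation finished synchronously and
                // the Completed event will not be raised.
                if (!socket.ReceiveFromAsync(args))
                    handler(socket, args);
            }
            catch (Exception ex)
            {
                args.Completed -= handler;
                observer.OnError(ex);
            }

            return Disposable.Create(() => args.Completed -= handler);
        });
    }
}
```

    Because the same buffer is reused for every receive, a subscriber would normally copy the first e.BytesTransferred bytes of e.Buffer before the next receive starts; e.RemoteEndPoint holds the sender of the datagram just received.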

    - Dave


    http://davesexton.com/blog

    Thursday, April 05, 2012 2:26 PM
  • Hi Dave,

    It's a UDP server DHCP program. It must stay open listening to broadcasts and not close.

    Also, the IP endpoint is required because we must send an IP number back to the calling client.

    Is there a better way to do this that doesn't use Repeat and Retry?

    Sorry, my phone is doing weird stuff on this forum. Hope my post makes sense :-)

    Thursday, April 05, 2012 3:18 PM
  • Thanks a Million! I'll give this a try immediately... 
    Thursday, April 05, 2012 4:40 PM