RXX TcpClient ConnectObservable with Retry

    Question

  • Hello,

    I'm trying to use the Retry mechanism in Rxx to retry a TCP connection. The issue I found, which looks like it has been addressed here, is that if you use Retry with the async pattern you only get one connection attempt, whose failure just gets replayed.

    My problem is that when I try to use Defer, nothing happens.

    This is what I have so far:

    messageStream =
        Observable.Defer(() => _tcpClient.ConnectObservable(Host, Port))
            .Retry(ReconnectRetries,
                (ex, i) =>
                {
                    Console.WriteLine("Reconnecting");
                    return TimeSpan.FromMilliseconds(ReconnectTimeoutMillis);
                })
            .SelectMany(either =>
            {
                // The right channel carries the exception from a failed attempt.
                if (!either.IsLeft)
                {
                    Console.WriteLine("Can not connect:{0}", either.Right.Message);
                    return Observable.Empty<byte[]>();
                }

                return _tcpClient.GetStream()
                                 .ReadToEndObservable()
                                 .DoSomeBufferStuffWithPublisAndPrime();
            });

    messageStream has multiple subscribers, and they should not have to handle OnError.


    Wednesday, April 11, 2012 8:34 PM

Answer

  • Hi, 

    Your query looks fine.  What exactly do you mean by "nothing happens"?

    Have you tried placing a breakpoint inside Defer to make sure that it's being called?  For example:

    Observable.Defer(() => 
    {
    	return _tcpClient.ConnectObservable(Host, Port);	// BREAK-POINT
    })

    This is unrelated, but you may want to use the Switch method to make Either<L,R> easier to consume.  For example:

    .SelectMany(either => either.Switch(
    	_ => _tcpClient.GetStream().ReadToEndObservable().DoSomeBufferStuffWithPublisAndPrime(),
    	ex =>
    	{
    		Console.WriteLine("Can not connect:{0}", ex.Message);
    		return Observable.Empty<byte[]>();
    	}));

    Alternatively, you can use TakeLeft to simplify it even further:

    .Retry(ReconnectRetries,
    	(ex, i) =>
    	{
    		Console.WriteLine("Can not connect:{0}", ex.Message);
    		Console.WriteLine("Reconnecting");
    		return TimeSpan.FromMilliseconds(ReconnectTimeoutMillis);
    	})
    .TakeLeft()
    .SelectMany(_ => _tcpClient.GetStream().ReadToEndObservable().DoSomeBufferStuffWithPublisAndPrime());

    - Dave


    http://davesexton.com/blog

    • Marked as answer by Dmitry Orlovsky Thursday, April 12, 2012 1:58 PM
    Wednesday, April 11, 2012 9:52 PM

All replies

  • Dave,

    Thanks for your suggestions, TakeLeft is awesome.

    By nothing happens I mean no connection attempt, no reconnection attempt, no failure. Like something is stuck, or not started.

    Which raises the question, and in retrospect it makes sense: Defer will not do anything until someone subscribes to its return value?

    Would an ObserveOn(Scheduler.TaskPool) later down the line interfere with the proper execution of the Defer?



    Thursday, April 12, 2012 12:29 PM
  • My bad, everything seems aok now
    Thursday, April 12, 2012 1:59 PM
  • Hi Dmitry,

    > Defer will not do anything until someone subscribes to its return value?

    Exactly.  Defer converts a hot observable into a cold observable: the factory function is only called when an observer subscribes, and it is called again for every new subscription.

    > Would an ObserveOn(Scheduler.TaskPool) later down the line interfere with the proper execution of the Defer?

    No.  ObserveOn won't have any effect on Defer.

    - Dave


    http://davesexton.com/blog

    Thursday, April 12, 2012 4:03 PM
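
The difference Defer makes under Retry can be seen in a minimal sketch that uses only plain Rx (`System.Reactive`), not Rxx. Everything here is an assumption for illustration: `Connect()` is a hypothetical stand-in for `_tcpClient.ConnectObservable(Host, Port)` that does its work when called and caches the outcome in an `AsyncSubject`, and the `Retry(int)` overload is the standard Rx one rather than the Rxx overload with a back-off selector used in the thread.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DeferRetryDemo
{
    public static int Attempts;

    // Hypothetical stand-in for _tcpClient.ConnectObservable(Host, Port).
    // It runs when invoked and caches its outcome (hot); in this sketch the
    // first two attempts fail and the third succeeds.
    public static IObservable<string> Connect()
    {
        Attempts++;
        var result = new AsyncSubject<string>();
        if (Attempts < 3)
        {
            result.OnError(new InvalidOperationException("attempt " + Attempts + " failed"));
        }
        else
        {
            result.OnNext("connected on attempt " + Attempts);
            result.OnCompleted();
        }
        return result;
    }

    // Builds the query, notes how many attempts ran before Subscribe,
    // then subscribes and records the notifications as text.
    private static List<string> Run(Func<IObservable<string>> build)
    {
        Attempts = 0;
        var log = new List<string>();
        var query = build();
        log.Add("attempts before Subscribe: " + Attempts);
        query.Subscribe(
            value => log.Add("OnNext: " + value),
            ex => log.Add("OnError: " + ex.Message));
        return log;
    }

    // Connect() is called once while the query is built; Retry resubscribes
    // to the same subject, which replays the cached failure every time.
    public static List<string> WithoutDefer()
    {
        return Run(() => Connect().Retry(3));
    }

    // Connect() is called by Defer on each (re)subscription, so every retry
    // is a fresh attempt.
    public static List<string> WithDefer()
    {
        return Run(() => Observable.Defer(() => Connect()).Retry(3));
    }

    public static void Main()
    {
        foreach (var line in WithoutDefer()) Console.WriteLine("no Defer | " + line);
        foreach (var line in WithDefer()) Console.WriteLine("Defer    | " + line);
    }
}
```

In this sketch the deferred version reports zero attempts before `Subscribe`, which matches the "nothing happens" symptom when no one has subscribed yet.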
  • Thanks again Dave,

    As a side note, if one wanted to implement some sort of behavior once the number of reconnects has been exhausted, should they do it in the OnError of the subscription to the deferred sequence, or is there another way to specify it in the sequence definition? Something like a Finally or RetryFinalFail?

    Thursday, April 12, 2012 4:08 PM
  • Hi, 

    You could add the Catch operator after Retry.  The Catch operator also listens for an OnError notification.

    The Retry operator prevents Catch from seeing the intermediate failures, because it reports them through the right channel of the Either in OnNext instead of through OnError.  Retry only sends an OnError notification when the retry count is exhausted.  Therefore, Catch will only catch the last exception, which Retry doesn't handle.

    - Dave


    http://davesexton.com/blog

    Thursday, April 12, 2012 4:22 PM
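
The ordering described above, Catch placed after Retry, might look like the following sketch. It is written against plain Rx (`System.Reactive`) with the standard `Retry(int)` overload, so the intermediate failures are simply swallowed rather than surfaced as Either values as in Rxx; the always-failing `connect` sequence and the log messages are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RetryThenCatchDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var attempts = 0;

        // Hypothetical connection that fails on every attempt.
        var connect = Observable.Defer(() =>
        {
            attempts++;
            log.Add("attempt " + attempts);
            return Observable.Throw<byte[]>(
                new TimeoutException("attempt " + attempts + " timed out"));
        });

        connect
            .Retry(3) // three attempts in total
            .Catch((Exception ex) =>
            {
                // Reached only after Retry gives up and forwards the final error.
                log.Add("giving up: " + ex.Message);
                return Observable.Empty<byte[]>();
            })
            .Subscribe(
                _ => { },
                ex => log.Add("OnError: " + ex.Message), // not reached in this sketch
                () => log.Add("OnCompleted"));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Because Catch replaces the final error with an empty sequence, subscribers see a normal completion and never need an OnError handler, which fits the requirement in the original question.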
  • Hi,

    You could also use Finally to do clean-up, if you don't require the exception information.

    - Dave


    http://davesexton.com/blog

    Thursday, April 12, 2012 4:23 PM
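
A small sketch of the Finally suggestion, again using plain Rx (`System.Reactive`) and hypothetical sources: the action passed to Finally receives no exception and runs once the sequence terminates, whether it fails or completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FinallyDemo
{
    public static List<string> Run(bool fail)
    {
        var log = new List<string>();

        // Hypothetical source: either fails immediately or yields one value.
        var source = fail
            ? Observable.Throw<int>(new InvalidOperationException("connection lost"))
            : Observable.Return(42);

        source
            .Finally(() => log.Add("Finally: clean up")) // no exception information here
            .Subscribe(
                value => log.Add("OnNext: " + value),
                ex => log.Add("OnError: " + ex.Message),
                () => log.Add("OnCompleted"));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run(true)) Console.WriteLine("failure | " + line);
        foreach (var line in Run(false)) Console.WriteLine("success | " + line);
    }
}
```

If the clean-up needs to know why the sequence ended, Catch is the better fit, since Finally is called the same way in both cases.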