RXX TcpClient ConnectObservable with Retry
-
11 April 2012 20:34
Hello,
I'm trying to utilize the Retry mechanism in rxx to retry a tcp connection. The issueI found and, looks like,has been addressed here is that if you use a Retry with async pattern you only get one connection attempt who's failuer just gets replayed.
My problem is that when I try and use Defer nothing happens?
This is what I have so far:
messageStream = Observable.Defer(() => _tcpClient.ConnectObservable(Host, Port)) .Retry(ReconnectRetries, (ex, i) => { Console.WriteLine("Reconnecting"); return TimeSpan.FromMilliseconds(ReconnectTimeoutMillis); }) .SelectMany(ether => { if(!ether.IsLeft) { Console.WriteLine("Can not connect:{0}", ether.Right.Message); return Observable.Empty<byte[]>(); } return _tcpClient.GetStream() .ReadToEndObservable() .DoSomeBufferStuffWithPublisAndPrime(); });
messageStream has multiple subscribers that should not handle the onError
- Diedit oleh Dmitry Orlovsky 11 April 2012 20:36 typos
Semua Balasan
-
11 April 2012 21:52
Hi,
Your query looks fine. What exactly do you mean by "nothing happens"?
Have you tried placing a breakpoint inside Defer to make sure that it's being called? For example:
Observable.Defer(() => { return _tcpClient.ConnectObservable(Host, Port); // BREAK-POINT })This is unrelated, but you may want to use the Switch method to make Either<L,R> easier to consume. For example:
.SelectMany(either => either.Switch( _ => _tcpClient.GetStream().ReadToEndObservable().DoSomeBufferStuffWithPublisAndPrime(), ex => { Console.WriteLine("Can not connect:{0}", ex.Message); return Observable.Empty<byte[]>(); }));Alternatively, you can use TakeLeft to simplify it even further:
.Retry(ReconnectRetries, (ex, i) => { Console.WriteLine("Can not connect:{0}", ex.Message); Console.WriteLine("Reconnecting"); return TimeSpan.FromMilliseconds(ReconnectTimeoutMillis); }) .TakeLeft() .SelectMany(_ => _tcpClient.GetStream().ReadToEndObservable().DoSomeBufferStuffWithPublisAndPrime());- Dave
http://davesexton.com/blog
- Ditandai sebagai Jawaban oleh Dmitry Orlovsky 12 April 2012 13:58
-
12 April 2012 12:29
Dave,
Thanks for your suggestions, TakeLeft is awesome.
By nothing happens I mean no connection attempt, no reconnection attempt, no failure. Like something is stuck, or not started.
Which begs the question, and in retrospect it makes sense. Defer will not do anything unit someone Subscribes to its return value?
Would a ObserveOn(Scheduler.TaskPool) later down the line interfere with a proper execution of the Defer?
- Diedit oleh Dmitry Orlovsky 12 April 2012 12:51
- Diedit oleh Dmitry Orlovsky 12 April 2012 12:53
-
12 April 2012 13:59My bad, everything seems aok now
-
12 April 2012 16:03
Hi Dmitry,
> Defer will not do anything unit someone Subscribes to its return value?
Exactly. Defer converts a hot observable into a cold observable.
> Would a ObserveOn(Scheduler.TaskPool) later down the line interfere with a proper execution of the Defer?
No. ObserveOn won't have any effect on Defer.
- Dave
http://davesexton.com/blog
-
12 April 2012 16:08
Thanks again Dave,
As a side note, if one wanted to implement some sort of behavior once the number of reconnects has been disabled they should do it in an onerror of the subscription to the deferred sequence, or is there another way to specify it the sequence definition? Something like a Finally or RetryFinalFail?
-
12 April 2012 16:22
Hi,
You could add the Catch operator after Retry. The Catch operator also listens for an OnError notification.
The Retry operator will prevent Catch from seeing an OnError notification by using the right notification channel in OnNext. Retry will only send an OnError notification when the retry count is exhausted. Therefore, Catch will only catch the last exception, which Retry doesn't handle.
- Dave
http://davesexton.com/blog
-
12 April 2012 16:23
Hi,
You could also use Finally to do clean-up, if you don't require the exception information.
- Dave
http://davesexton.com/blog