locked
Best way to ignore errors RRS feed

  • Question

  • Hello,

    I've been trying to find a good way of ignoring errors and going on to the next event and have found two solutions so far. One of which I think only works with cold observables and the other being the following elegant/easy solution:

    .Materialize()
    .Where(o => o.Kind != NotificationKind.OnError)
    .Dematerialize())
    

    Now I am really not sure though if this is a good solution and whether it is efficient.

     

    Thanks for any input.

     

    David

    Saturday, November 13, 2010 1:37 PM

Answers

  • I've come to realize that even the solution above doesn't work. It only worked in a very specific scenario where this was applied to an Observable.Throw inside a SelectMany block.

     

    Besides, based on the Rx guidelines I have to assume that this is impossible as any properly implemented Observable will never fire any more OnNext after an OnError, whatever I may do with it.

    • Marked as answer by David Grenier Saturday, November 13, 2010 1:56 PM
    Saturday, November 13, 2010 1:56 PM

All replies

  • I've come to realize that even the solution above doesn't work. It only worked in a very specific scenario where this was applied to an Observable.Throw inside a SelectMany block.

     

    Besides, based on the Rx guidelines I have to assume that this is impossible as any properly implemented Observable will never fire any more OnNext after an OnError, whatever I may do with it.

    • Marked as answer by David Grenier Saturday, November 13, 2010 1:56 PM
    Saturday, November 13, 2010 1:56 PM
  • Hi David,

    you are right .. by design observable sources must not generate further messages after calling OnError(). The official error handling methods that allow you to retry cold obersables or to continue with a diffrent one are Retry() and Catch().

    These are some of my personal Rx extension methods that allow to rebind to a cold-observable source and to implement a back-off strategy:

    public static IObservable<TSource> CatchAndRetry<TSource, TException>(
    	this IObservable<TSource> source, 
    	Func<TException /* error */, IObservable<TSource> /* originalSource */, int /* failureCount */, IObservable<TSource>> errorHandler, 
    	int maxRetries
    )
    	where TException : Exception
    {
    	if (errorHandler == null)
    		throw new ArgumentException("errorHandler");
    
    	return source.Catch(Enumerable.Repeat(errorHandler, maxRetries));
    }
    
    public static IObservable<TSource> CatchAndRetry<TSource, TException>(
    	this IObservable<TSource> source, 
    	Func<TException /* error */, IObservable<TSource> /* originalSource */, int /* failureCount */, IObservable<TSource>> errorHandler
    )
    	where TException : Exception
    {
    	if (errorHandler == null)
    		throw new ArgumentException("errorHandler");
    
    	return source.Catch(EnumerableEx.Repeat(errorHandler));
    }
    
    private static IObservable<TSource> Catch<TSource, TException>(
    	this IObservable<TSource> source, IEnumerable<Func<TException /* error */, IObservable<TSource> /* originalSource */, int /* failureCount */, IObservable<TSource>>> retrySourceGenerator
    )
    	where TException : Exception
    {
    	if (source == null)
    		throw new ArgumentNullException("source");
    
    	return Observable.CreateWithDisposable<TSource>(observer =>
    	{
    		var e = retrySourceGenerator.GetEnumerator();
    
    		MutableDisposable subscription = new MutableDisposable();
    
    		int failureCount = 0;
    		IObservable<TSource> nextSource = source;
    		IDisposable recursiveCall = Scheduler.Immediate.Schedule(self =>
    		{
    			subscription.Disposable = nextSource.Subscribe(
    				onNext: observer.OnNext,
    				onError: error =>
    				{
    					if (error is TException)
    					{
    						try
    						{
    							if (e.MoveNext())
    							{
    								nextSource = e.Current((TException)error, source, ++failureCount);
    							}
    							else
    							{
    								nextSource = null;
    								e.Dispose();
    							}
    						}
    						catch (Exception exception)
    						{
    							observer.OnError(exception);
    							e.Dispose();
    							return;
    						}
    
    						if (nextSource != null)
    						{
    							self();
    						}
    						else
    						{
    							observer.OnError(error);
    						}
    					}
    					else
    					{
    						observer.OnError(error);
    					}
    				},
    				onCompleted: observer.OnCompleted
    			);
    		});
    
    		return new CompositeDisposable(subscription, recursiveCall);
    	});
    }
    
    public static IObservable<TSource> RetryWithBackOff<TSource, TException>(
    	this IObservable<TSource> source,
    	Func<TException /* error */, int /* failureCount */, TimeSpan?> backOffStrategy
    )
    where TException : Exception
    {
    	return source.CatchAndRetry<TSource, TException>((error, originalSource, failureCount) =>
    	{
    		TimeSpan? delay = backOffStrategy(error, failureCount);
    		if (!delay.HasValue)
    			return null;
    
    		return Observable.Timer(delay.Value).SelectMany(x => originalSource);
    	});
    }

    Andreas

    Saturday, November 13, 2010 11:53 PM
  • Hi David,

    Take a look at Catch and OnErrorResumeNext.  Although you're correct that once an observable calls OnError it should never call any On* method again, you can still use the aforementioned combinators to continue the observable when OnError is called.

    One approach is to use Publish to make the source hot and then return it as the new observable when OnError is called.  Thus, if the source doesn't call OnError itself, but instead some combinator between the source and Catch / OnErrorResumeNext does, then you can continue the query by attempting to reapply it to the original source.  However, if the source calls OnError then the result will probably be like Observable.Never.

    In the following code I've attempted to generalize this into a couple of extension methods.  The first ignores exceptions of a specific type, while the latter allows the caller to handle exceptions by optionally continuing with the source observable.

    Edit: I've renamed the methods yet again; this time to Try.  Seems like it could be a useful pattern, similar to Observable.Using.

    (not compiled / tested)

    public static IObservable<TSource> Try<TSource, TException>(
    	this IObservable<TSource> source, 
    	Func<IObservable<TSource>, IObservable<Tsource>> selector, 
    )
    	where TException : Exception
    {
    	return Try<TSource, TException>(source, selector, (shared, _) => selector(shared));
    }
    
    public static IObservable<TSource> Try<TSource, TException>(
    	this IObservable<TSource> source, 
    	Func<IObservable<TSource>, IObservable<Tsource>> trySelector, 
    	Func<IObservable<TSource>, TException, IObservable<Tsource>> catchSelector, 
    )
    	where TException : Exception
    {
    	return source.Publish(shared => 
    	{
    		return trySelector(shared).Catch<TSource, TException>(
    			ex => catchSelector(shared, ex));
    	});
    }
    
    

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Sunday, November 14, 2010 12:06 AM Renamed the methods yet again, this time to "Try".
    Saturday, November 13, 2010 11:55 PM
  • Have a look at this post on flow control and handling errors.

    http://social.msdn.microsoft.com/Forums/en-US/rx/thread/ca063e21-4138-4932-b4e6-32bfb18f37cf

    It covers the use of Retry, OnErrorResumeNext, Catch, Materialize & Dematerialize.

    It appears that you probably want to use Catch and then provide a continuation stream.

    HTH

    Lee

    Wednesday, December 8, 2010 2:20 PM