none
Review my extended retry operator with result and error predicates, not only on error

    Question

  • Hi All ,

    I have written a small retry operator that can retry based on a check of the onNext result as well as a check of the error, not only on an error check. Could you please review it and let me know what the possible issues with it are, as you are the experts here :) :

    /**
     */
    
    import java.util.concurrent.atomic.AtomicInteger;
    import rx.*;
    import rx.functions.*;
    import rx.schedulers.Schedulers;
    import rx.subscriptions.SerialSubscription;
    /**
     * Retry operator that re-subscribes to a nested source when the supplied
     * {@code ServiceRetryPolicy} asks for it, either because the source signalled
     * an error or because the first value it emitted was rejected by the policy.
     *
     * <p>The operator is applied to an {@code Observable<Observable<T>>} and expects
     * a single nested source. For every attempt only the first signal is examined:
     * an accepted value is forwarded to the child followed by {@code onCompleted},
     * a rejected value or error triggers another subscription to the same source.
     *
     * @param <T> the value type emitted by the nested source
     */
    public class OperatorFullRetryWithPredicate <T> implements Observable.Operator<T, Observable<T>> {
        private final ServiceRetryPolicy<T> resultPredicate;

        /**
         * @param resultPredicate policy consulted after each attempt to decide whether
         *                        to retry and how the retry should be scheduled
         */
        public OperatorFullRetryWithPredicate(ServiceRetryPolicy<T> resultPredicate) {
            this.resultPredicate = resultPredicate;
        }

        /**
         * Creates the subscriber that receives the nested source and drives the retries.
         *
         * @param child the downstream subscriber that receives the accepted value or the final error
         * @return the subscriber to attach to the outer {@code Observable<Observable<T>>}
         */
        @Override
        public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
            // NOTE(review): delayed retries are scheduled on this trampoline worker;
            // confirm that a delayed action on it does not block the calling thread
            // for the whole back-off period.
            final Scheduler.Worker inner = Schedulers.trampoline().createWorker();
            child.add(inner);

            final SerialSubscription serialSubscription = new SerialSubscription();
            // add serialSubscription so it gets un-subscribed if child is un-subscribed
            child.add(serialSubscription);
            return new SourceSubscriber<T>(child, resultPredicate, inner, serialSubscription);
        }

        /**
         * Receives the nested source and runs one subscription per attempt on the worker.
         */
        static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
            final Subscriber<? super T> child;
            final ServiceRetryPolicy<T>  resultPredicate;
            final Scheduler.Worker inner;
            final SerialSubscription serialSubscription;
            /** Number of attempts started so far; incremented at the start of every attempt. */
            final AtomicInteger attempts = new AtomicInteger();

            public SourceSubscriber(Subscriber<? super T> child,
                                    final ServiceRetryPolicy<T>  resultPredicate,
                                    Scheduler.Worker inner,
                                    SerialSubscription serialSubscription) {
                this.child = child;
                this.resultPredicate = resultPredicate;
                this.inner = inner;
                this.serialSubscription = serialSubscription;
            }

            @Override
            public void onCompleted() {
                // ignore as we expect a single nested Observable<T>
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Action0 _self = this;
                        attempts.incrementAndGet();

                        // new subscription each time so if it unsubscribes itself it does not prevent retries
                        // by unsubscribing the child subscription
                        Subscriber<T> subscriber = new Subscriber<T>() {
                            /** Set once the first signal of this attempt has been handled; later signals are ignored. */
                            boolean done;

                            @Override
                            public void onCompleted() {
                                if (!done) {
                                    done = true;
                                    child.onCompleted();
                                }
                            }

                            @Override
                            public void onError(Throwable e) {
                                if (done) {
                                    return;
                                }
                                done = true;
                                boolean retry;
                                try {
                                    retry = resultPredicate.shouldRetry(e, attempts.get()) && !inner.isUnsubscribed();
                                } catch (RuntimeException policyFailure) {
                                    // done is already set, so any later signal of this attempt is
                                    // ignored; report the policy failure here or the child would
                                    // never receive a terminal event.
                                    child.onError(policyFailure);
                                    return;
                                }
                                if (retry) {
                                    scheduleRetry(_self);
                                } else {
                                    // give up and pass the failure
                                    child.onError(e);
                                }
                            }

                            @Override
                            public void onNext(T v) {
                                if (done) {
                                    return;
                                }
                                done = true;
                                boolean retry;
                                try {
                                    retry = resultPredicate.shouldRetry(v, attempts.get()) && !inner.isUnsubscribed();
                                } catch (RuntimeException policyFailure) {
                                    // done is already set, so an onError raised for this exception
                                    // by the upstream would be dropped; deliver it to the child directly.
                                    child.onError(policyFailure);
                                    return;
                                }
                                if (retry) {
                                    scheduleRetry(_self);
                                } else {
                                    // correct result or too many retries so give up and pass the result and end the subscription
                                    child.onNext(v);
                                    child.onCompleted();
                                }
                            }
                        };
                        // register this Subscription (and unsubscribe previous if exists)
                        serialSubscription.set(subscriber);
                        o.unsafeSubscribe(subscriber);
                    }
                });
            }

            /**
             * Queues another run of the given attempt on the worker. With the
             * {@code EXPONENTIAL} strategy the run is delayed by
             * {@code getDelay()} raised to the power of the current attempt count,
             * expressed in {@code getUnit()}; any other strategy retries without delay.
             *
             * @param attempt the action that performs one subscription attempt
             */
            private void scheduleRetry(Action0 attempt) {
                if (resultPredicate.getRetryStrategy().equals(RetryStrategy.EXPONENTIAL)) {
                    // NOTE(review): a base delay of 0 or 1 never grows with this formula;
                    // confirm that delay^attempts (rather than delay * 2^attempts) is intended.
                    long delay = (long) Math.pow(resultPredicate.getDelay(), attempts.get());
                    inner.schedule(attempt, delay, resultPredicate.getUnit());
                } else {
                    inner.schedule(attempt);
                }
            }
        }
    }
    

    Tuesday, February 07, 2017 12:29 PM