diff --git a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java index e9ff5482f4..96a3c5c170 100644 --- a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java +++ b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java @@ -16,6 +16,7 @@ package rx.internal.operators; import rx.Observable; +import rx.Producer; import rx.Observable.Operator; import rx.Subscriber; import rx.exceptions.Exceptions; @@ -87,6 +88,16 @@ public void onNext(T t) { } child.onNext(t); } + + @Override + public void setProducer(final Producer producer) { + child.setProducer(new Producer() { + @Override + public void request(long n) { + producer.request(n); + } + }); + } }; child.add(parent); diff --git a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java index 7a14a65a06..3e8afcea00 100644 --- a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java +++ b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java @@ -16,6 +16,7 @@ package rx.internal.operators; import rx.Observable; +import rx.Producer; import rx.Observable.Operator; import rx.Subscriber; import rx.exceptions.Exceptions; @@ -84,6 +85,16 @@ public void onCompleted() { child.onCompleted(); } + @Override + public void setProducer(final Producer producer) { + child.setProducer(new Producer() { + @Override + public void request(long n) { + producer.request(n); + } + }); + } + }; child.add(s);