From a0998b9fb566395f77bffec666690ed260806617 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Mon, 6 Jul 2015 08:48:56 +1000 Subject: [PATCH] add Subscribers.wrap --- .../internal/operators/OnSubscribeDefer.java | 16 +------- .../OnSubscribeDelaySubscription.java | 16 +------- ...ubscribeDelaySubscriptionWithSelector.java | 16 +------- .../internal/operators/OnSubscribeUsing.java | 16 +------- .../operators/OperatorDoOnSubscribe.java | 16 +------- .../operators/OperatorDoOnUnsubscribe.java | 19 +--------- src/main/java/rx/observers/Subscribers.java | 38 +++++++++++++++++++ 7 files changed, 50 insertions(+), 87 deletions(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeDefer.java b/src/main/java/rx/internal/operators/OnSubscribeDefer.java index 4a6434140c..23ee937145 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeDefer.java +++ b/src/main/java/rx/internal/operators/OnSubscribeDefer.java @@ -19,6 +19,7 @@ import rx.Observable.OnSubscribe; import rx.Subscriber; import rx.functions.Func0; +import rx.observers.Subscribers; /** * Do not create the Observable until an Observer subscribes; create a fresh Observable on each @@ -46,20 +47,7 @@ public void call(final Subscriber s) { s.onError(t); return; } - o.unsafeSubscribe(new Subscriber(s) { - @Override - public void onNext(T t) { - s.onNext(t); - } - @Override - public void onError(Throwable e) { - s.onError(e); - } - @Override - public void onCompleted() { - s.onCompleted(); - } - }); + o.unsafeSubscribe(Subscribers.wrap(s)); } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscription.java b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscription.java index 95036d399e..1876db73a3 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscription.java +++ b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscription.java @@ -21,6 +21,7 @@ import rx.Observable.OnSubscribe; import rx.Scheduler.Worker; import rx.functions.Action0; +import rx.observers.Subscribers; /** * Delays the subscription to the source by the given amount, running on the given scheduler. @@ -49,20 +50,7 @@ public void call(final Subscriber s) { @Override public void call() { if (!s.isUnsubscribed()) { - source.unsafeSubscribe(new Subscriber(s) { - @Override - public void onNext(T t) { - s.onNext(t); - } - @Override - public void onError(Throwable e) { - s.onError(e); - } - @Override - public void onCompleted() { - s.onCompleted(); - } - }); + source.unsafeSubscribe(Subscribers.wrap(s)); } } }, time, unit); diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java index d6b2f0ad2c..b32179b3f7 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java +++ b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java @@ -18,6 +18,7 @@ import rx.*; import rx.Observable.OnSubscribe; import rx.functions.Func0; +import rx.observers.Subscribers; /** * Delays the subscription until the Observable emits an event. @@ -42,20 +43,7 @@ public void call(final Subscriber child) { @Override public void onCompleted() { // subscribe to actual source - source.unsafeSubscribe(new Subscriber(child) { - @Override - public void onNext(T t) { - child.onNext(t); - } - @Override - public void onError(Throwable e) { - child.onError(e); - } - @Override - public void onCompleted() { - child.onCompleted(); - } - }); + source.unsafeSubscribe(Subscribers.wrap(child)); } @Override diff --git a/src/main/java/rx/internal/operators/OnSubscribeUsing.java b/src/main/java/rx/internal/operators/OnSubscribeUsing.java index 7470a65dc8..14d8d46b7b 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeUsing.java +++ b/src/main/java/rx/internal/operators/OnSubscribeUsing.java @@ -22,6 +22,7 @@ import rx.Observable.OnSubscribe; import rx.exceptions.CompositeException; import rx.functions.*; +import rx.observers.Subscribers; /** * Constructs an observable sequence that depends on a resource object. @@ -68,20 +69,7 @@ public void call(final Subscriber subscriber) { observable = source; try { // start - observable.unsafeSubscribe(new Subscriber(subscriber) { - @Override - public void onNext(T t) { - subscriber.onNext(t); - } - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } - @Override - public void onCompleted() { - subscriber.onCompleted(); - } - }); + observable.unsafeSubscribe(Subscribers.wrap(subscriber)); } catch (Throwable e) { Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly); if (disposeError != null) diff --git a/src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java b/src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java index b7999c2b5c..685cf9ae1b 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java @@ -18,6 +18,7 @@ import rx.Observable.Operator; import rx.Subscriber; import rx.functions.Action0; +import rx.observers.Subscribers; /** * This operator modifies an {@link rx.Observable} so a given action is invoked when the {@link rx.Observable} is subscribed. @@ -39,19 +40,6 @@ public Subscriber call(final Subscriber child) { subscribe.call(); // Pass through since this operator is for notification only, there is // no change to the stream whatsoever. - return new Subscriber(child) { - @Override - public void onNext(T t) { - child.onNext(t); - } - @Override - public void onError(Throwable e) { - child.onError(e); - } - @Override - public void onCompleted() { - child.onCompleted(); - } - }; + return Subscribers.wrap(child); } } diff --git a/src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java b/src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java index 396012c2eb..217c46977f 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java @@ -18,6 +18,7 @@ import rx.Observable.Operator; import rx.*; import rx.functions.Action0; +import rx.observers.Subscribers; import rx.subscriptions.Subscriptions; /** @@ -41,22 +42,6 @@ public Subscriber call(final Subscriber child) { // Pass through since this operator is for notification only, there is // no change to the stream whatsoever. - return new Subscriber(child) { - @Override - public void onStart() { - } - @Override - public void onNext(T t) { - child.onNext(t); - } - @Override - public void onError(Throwable e) { - child.onError(e); - } - @Override - public void onCompleted() { - child.onCompleted(); - } - }; + return Subscribers.wrap(child); } } diff --git a/src/main/java/rx/observers/Subscribers.java b/src/main/java/rx/observers/Subscribers.java index c7cffdb46c..1c42aa4b68 100644 --- a/src/main/java/rx/observers/Subscribers.java +++ b/src/main/java/rx/observers/Subscribers.java @@ -17,6 +17,7 @@ import rx.Observer; import rx.Subscriber; +import rx.annotations.Experimental; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action0; import rx.functions.Action1; @@ -198,4 +199,41 @@ public final void onNext(T args) { }; } + /** + * Returns a new {@link Subscriber} that passes all events to + * subscriber, has backpressure controlled by + * subscriber and uses the subscription list of + * subscriber when {@link Subscriber#add(rx.Subscription)} is + * called. + * + * @param subscriber + * the Subscriber to wrap. + * + * @return a new Subscriber that passes all events to + * subscriber, has backpressure controlled by + * subscriber and uses subscriber to + * manage unsubscription. + * + */ + @Experimental + public static Subscriber wrap(final Subscriber subscriber) { + return new Subscriber(subscriber) { + + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + @Override + public void onNext(T t) { + subscriber.onNext(t); + } + + }; + } }