Skip to content

Commit

Permalink
Merge pull request #3065 from davidmoten/subscribers-wrap
Browse files Browse the repository at this point in the history
add Subscribers.wrap
  • Loading branch information
benjchristensen committed Jul 14, 2015
2 parents 6e3e344 + a0998b9 commit bd8fb30
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 87 deletions.
16 changes: 2 additions & 14 deletions src/main/java/rx/internal/operators/OnSubscribeDefer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func0;
import rx.observers.Subscribers;

/**
* Do not create the Observable until an Observer subscribes; create a fresh Observable on each
Expand Down Expand Up @@ -46,20 +47,7 @@ public void call(final Subscriber<? super T> s) {
s.onError(t);
return;
}
o.unsafeSubscribe(new Subscriber<T>(s) {
@Override
public void onNext(T t) {
s.onNext(t);
}
@Override
public void onError(Throwable e) {
s.onError(e);
}
@Override
public void onCompleted() {
s.onCompleted();
}
});
o.unsafeSubscribe(Subscribers.wrap(s));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import rx.Observable.OnSubscribe;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.observers.Subscribers;

/**
* Delays the subscription to the source by the given amount, running on the given scheduler.
Expand Down Expand Up @@ -49,20 +50,7 @@ public void call(final Subscriber<? super T> s) {
@Override
public void call() {
if (!s.isUnsubscribed()) {
source.unsafeSubscribe(new Subscriber<T>(s) {
@Override
public void onNext(T t) {
s.onNext(t);
}
@Override
public void onError(Throwable e) {
s.onError(e);
}
@Override
public void onCompleted() {
s.onCompleted();
}
});
source.unsafeSubscribe(Subscribers.wrap(s));
}
}
}, time, unit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.*;
import rx.Observable.OnSubscribe;
import rx.functions.Func0;
import rx.observers.Subscribers;

/**
* Delays the subscription until the Observable<U> emits an event.
Expand All @@ -42,20 +43,7 @@ public void call(final Subscriber<? super T> child) {
@Override
public void onCompleted() {
// subscribe to actual source
source.unsafeSubscribe(new Subscriber<T>(child) {
@Override
public void onNext(T t) {
child.onNext(t);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
child.onCompleted();
}
});
source.unsafeSubscribe(Subscribers.wrap(child));
}

@Override
Expand Down
16 changes: 2 additions & 14 deletions src/main/java/rx/internal/operators/OnSubscribeUsing.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import rx.Observable.OnSubscribe;
import rx.exceptions.CompositeException;
import rx.functions.*;
import rx.observers.Subscribers;

/**
* Constructs an observable sequence that depends on a resource object.
Expand Down Expand Up @@ -68,20 +69,7 @@ public void call(final Subscriber<? super T> subscriber) {
observable = source;
try {
// start
observable.unsafeSubscribe(new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
});
observable.unsafeSubscribe(Subscribers.wrap(subscriber));
} catch (Throwable e) {
Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
if (disposeError != null)
Expand Down
16 changes: 2 additions & 14 deletions src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.Subscribers;

/**
* This operator modifies an {@link rx.Observable} so a given action is invoked when the {@link rx.Observable} is subscribed.
Expand All @@ -39,19 +40,6 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
subscribe.call();
// Pass through since this operator is for notification only, there is
// no change to the stream whatsoever.
return new Subscriber<T>(child) {
@Override
public void onNext(T t) {
child.onNext(t);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
child.onCompleted();
}
};
return Subscribers.wrap(child);
}
}
19 changes: 2 additions & 17 deletions src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable.Operator;
import rx.*;
import rx.functions.Action0;
import rx.observers.Subscribers;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -41,22 +42,6 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {

// Pass through since this operator is for notification only, there is
// no change to the stream whatsoever.
return new Subscriber<T>(child) {
@Override
public void onStart() {
}
@Override
public void onNext(T t) {
child.onNext(t);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
child.onCompleted();
}
};
return Subscribers.wrap(child);
}
}
38 changes: 38 additions & 0 deletions src/main/java/rx/observers/Subscribers.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import rx.Observer;
import rx.Subscriber;
import rx.annotations.Experimental;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -198,4 +199,41 @@ public final void onNext(T args) {
};
}

/**
* Returns a new {@link Subscriber} that passes all events to
* <code>subscriber</code>, has backpressure controlled by
* <code>subscriber</code> and uses the subscription list of
* <code>subscriber</code> when {@link Subscriber#add(rx.Subscription)} is
* called.
*
* @param subscriber
* the Subscriber to wrap.
*
* @return a new Subscriber that passes all events to
* <code>subscriber</code>, has backpressure controlled by
* <code>subscriber</code> and uses <code>subscriber</code> to
* manage unsubscription.
*
*/
@Experimental
public static <T> Subscriber<T> wrap(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}

};
}
}

0 comments on commit bd8fb30

Please sign in to comment.