diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 240018a2241..51fd53bd2a8 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -78,6 +78,7 @@ import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.subjects.PublishSubject; +import rx.subjects.ReplaySubject; import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; @@ -1666,6 +1667,17 @@ public static Observable onErrorReturn(final Observable that, Func1 ConnectableObservable replay(final Observable that) { + return OperationMulticast.multicast(that, ReplaySubject. create()); + } + /** * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. * @@ -3199,13 +3211,22 @@ public Observable reduce(Func2 accumulator) { return reduce(this, accumulator); } + /** + * Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications. + * + * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. + */ + public ConnectableObservable replay() { + return replay(this); + } + /** * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. * * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. */ public ConnectableObservable publish() { - return OperationMulticast.multicast(this, PublishSubject. create()); + return publish(this); } /** @@ -4174,10 +4195,10 @@ public Subscription call(final Observer observer) { @Override public void run() { + counter.incrementAndGet(); System.out.println("published observable being executed"); observer.onNext("one"); observer.onCompleted(); - counter.incrementAndGet(); } }).start(); return subscription; @@ -4219,6 +4240,66 @@ public void call(String v) { } } + @Test + public void testReplay() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + ConnectableObservable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + counter.incrementAndGet(); + } + }).start(); + return subscription; + } + }).replay(); + + // we connect immediately and it will emit the value + Subscription s = o.connect(); + try { + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } finally { + s.unsubscribe(); + } + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; }