diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 73a11e421c..4a92aa8673 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -77,6 +77,7 @@ import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; +import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; @@ -2197,6 +2198,19 @@ public static Iterable mostRecent(Observable source, T initialValue) { return OperationMostRecent.mostRecent(source, initialValue); } + /** + * Returns a connectable observable sequence that upon connection causes the source sequence to push results to any connected subscribers. + * + * @param source + * the source sequence whose elements will be pushed to all connected subscribers. + * @param + * source (and result) type + * @return a connectable observable sequence that upon connection causes the source sequence to push results to any connected subscribers. + */ + public static ConnectableObservable publish(Observable source) { + return OperationMulticast.multicast(source, PublishSubject.create()); + } + /** * Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. * diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java index ad312fa1dd..bb4585d331 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java @@ -50,6 +50,7 @@ public Subscription call(Observer observer) { this.subject = subject; } + @Override public Subscription connect() { synchronized (lock) { if (subscription == null) {