diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 891e7fafaae..b33e33136ce 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -73,6 +73,7 @@ import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallel; import rx.operators.OperationParallelMerge; +import rx.operators.OperationReplay; import rx.operators.OperationRetry; import rx.operators.OperationSample; import rx.operators.OperationScan; @@ -526,6 +527,24 @@ public ConnectableObservable multicast(Subject su return OperationMulticast.multicast(this, subject); } + /** + * Returns an observable sequence that contains the elements of a sequence + * produced by multicasting the source sequence within a selector function. + * + * @param subjectFactory the subject factory + * @param selector The selector function which can use the multicasted + * source sequence subject to the policies enforced by the + * created subject. + * @return the Observable sequence that contains the elements of a sequence + * produced by multicasting the source sequence within a selector function. + * + * @see MSDN: Observable.Multicast + */ + public Observable multicast( + final Func0> subjectFactory, + final Func1, ? extends Observable> selector) { + return OperationMulticast.multicast(this, subjectFactory, selector); + } /** * Allow the {@link RxJavaErrorHandler} to receive the exception from * onError. @@ -4308,7 +4327,327 @@ public Observable> maxBy(Func1 selector, Comparator public ConnectableObservable replay() { return OperationMulticast.multicast(this, ReplaySubject. create()); } + + /** + * Returns a {@link ConnectableObservable} that shares a single subscription + * to the underlying Observable that will replay all of its items and + * notifications to any future {@link Observer} on the given scheduler. + * + * @param scheduler the scheduler where the Observers will receive the events + * @return a {@link ConnectableObservable} that shares a single subscription + * to the underlying Observable that will replay all of its items and + * notifications to any future {@link Observer} on the given scheduler + * + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(Scheduler scheduler) { + return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject(ReplaySubject.create(), scheduler)); + } + + /** + * Returns a connectable observable sequence that shares a single subscription + * to the underlying sequence replaying bufferSize notifications. + * + * @param bufferSize the buffer size + * @return a connectable observable sequence that shares a single subscription + * to the underlying sequence replaying bufferSize notifications + * + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(int bufferSize) { + return OperationMulticast.multicast(this, OperationReplay.replayBuffered(bufferSize)); + } + + /** + * Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications. + * + * @param bufferSize the buffer size + * @param scheduler the scheduler where the Observers will receive the events + * @return a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(int bufferSize, Scheduler scheduler) { + return OperationMulticast.multicast(this, + OperationReplay.createScheduledSubject( + OperationReplay.replayBuffered(bufferSize), scheduler)); + } + + /** + * Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window. + * + * @param time the window length + * @param unit the window length time unit + * @return a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(long time, TimeUnit unit) { + return replay(time, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window. + * + * @param time the window length + * @param unit the window length time unit + * @param scheduler the scheduler which is used as a time source for the window + * @return a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(long time, TimeUnit unit, Scheduler scheduler) { + return OperationMulticast.multicast(this, OperationReplay.replayWindowed(time, unit, -1, scheduler)); + } + + /** + * Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications within window. + * + * @param bufferSize the buffer size + * @param time the window length + * @param unit the window length time unit + * @return Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications within window + * + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(int bufferSize, long time, TimeUnit unit) { + return replay(bufferSize, time, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications within window. + * + * @param bufferSize the buffer size + * @param time the window length + * @param unit the window length time unit + * @param scheduler the scheduler which is used as a time source for the window + * @return a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications within window + * + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) { + if (bufferSize < 0) { + throw new IllegalArgumentException("bufferSize < 0"); + } + return OperationMulticast.multicast(this, OperationReplay.replayWindowed(time, unit, bufferSize, scheduler)); + } + + /** + * Returns an observable sequence that is the result of invoking the selector + * on a connectable observable sequence that shares a single subscription to + * the underlying sequence and starts with initial value. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @return an observable sequence that is the result of invoking the selector + * on a connectable observable sequence that shares a single subscription to + * the underlying sequence and starts with initial value + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector) { + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return ReplaySubject.create(); + } + }, selector); + } + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param scheduler the scheduler where the replay is observed + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, final Scheduler scheduler) { + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return OperationReplay.createScheduledSubject(ReplaySubject.create(), scheduler); + } + }, selector); + } + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param bufferSize the buffer size + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, final int bufferSize) { + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return OperationReplay.replayBuffered(bufferSize); + } + }, selector); + } + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param bufferSize the buffer size + * @param scheduler the scheduler where the replay is observed + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return OperationReplay.createScheduledSubject(OperationReplay.replayBuffered(bufferSize), scheduler); + } + }, selector); + } + + /** + * Returns an observable sequence that is the result of invoking + * the selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param time the window length + * @param unit the window length time unit + * @return an observable sequence that is the result of invoking + * the selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, long time, TimeUnit unit) { + return replay(selector, time, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param time the window length + * @param unit the window length time unit + * @param scheduler the scheduler which is used as a time source for the window + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, final long time, final TimeUnit unit, final Scheduler scheduler) { + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return OperationReplay.replayWindowed(time, unit, -1, scheduler); + } + }, selector); + } + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * within window. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param bufferSize the buffer size + * @param time the window length + * @param unit the window length time unit + * + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * within window + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, int bufferSize, long time, TimeUnit unit) { + return replay(selector, bufferSize, time, unit, Schedulers.threadPoolForComputation()); + } + + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * within window. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param bufferSize the buffer size + * @param time the window length + * @param unit the window length time unit + * @param scheduler the scheduler which is used as a time source for the window + * + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * within window + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { + if (bufferSize < 0) { + throw new IllegalArgumentException("bufferSize < 0"); + } + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler); + } + }, selector); + } + /** * Retry subscription to the source Observable when it calls * onError up to a certain number of retries. diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java index b634b2dbac0..ef310986f55 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java @@ -16,10 +16,15 @@ package rx.operators; import rx.Observable; +import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; import rx.observables.ConnectableObservable; import rx.subjects.Subject; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func0; +import rx.util.functions.Func1; public class OperationMulticast { public static ConnectableObservable multicast(Observable source, final Subject subject) { @@ -81,4 +86,57 @@ public void unsubscribe() { } } + /** + * Returns an observable sequence that contains the elements of a sequence + * produced by multicasting the source sequence within a selector function. + * + * @param source + * @param subjectFactory + * @param selector + * @return + * + * @see MSDN: Observable.Multicast + */ + public static Observable multicast( + final Observable source, + final Func0> subjectFactory, + final Func1, ? extends Observable> selector) { + return Observable.create(new MulticastSubscribeFunc(source, subjectFactory, selector)); + } + /** The multicast subscription function. */ + private static final class MulticastSubscribeFunc implements OnSubscribeFunc { + final Observable source; + final Func0> subjectFactory; + final Func1, ? extends Observable> resultSelector; + public MulticastSubscribeFunc(Observable source, + Func0> subjectFactory, + Func1, ? extends Observable> resultSelector) { + this.source = source; + this.subjectFactory = subjectFactory; + this.resultSelector = resultSelector; + } + @Override + public Subscription onSubscribe(Observer t1) { + Observable observable; + ConnectableObservable connectable; + try { + Subject subject = subjectFactory.call(); + + connectable = new MulticastConnectableObservable(source, subject); + + observable = resultSelector.call(connectable); + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + + CompositeSubscription csub = new CompositeSubscription(); + + csub.add(observable.subscribe(new SafeObserver( + new SafeObservableSubscription(csub), t1))); + csub.add(connectable.connect()); + + return csub; + } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationReplay.java b/rxjava-core/src/main/java/rx/operators/OperationReplay.java new file mode 100644 index 00000000000..1a89b3d2ac5 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationReplay.java @@ -0,0 +1,751 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.subjects.Subject; +import rx.subscriptions.Subscriptions; +import rx.util.Timestamped; +import rx.util.functions.Action0; +import rx.util.functions.Func1; +import rx.util.functions.Functions; + +/** + * Replay with limited buffer and/or time constraints. + * + * + * @see MSDN: Observable.Replay overloads + */ +public final class OperationReplay { + /** Utility class. */ + private OperationReplay() { throw new IllegalStateException("No instances!"); } + + /** + * Create a BoundedReplaySubject with the given buffer size. + */ + public static Subject replayBuffered(int bufferSize) { + return CustomReplaySubject.create(bufferSize); + } + /** + * Creates a subject whose client observers will observe events + * propagated through the given wrapped subject. + */ + public static Subject createScheduledSubject(Subject subject, Scheduler scheduler) { + Observable observedOn = subject.observeOn(scheduler); + SubjectWrapper s = new SubjectWrapper(subscriberOf(observedOn), subject); + return s; + } + + /** + * Create a CustomReplaySubject with the given time window length + * and optional buffer size. + * + * @param the source and return type + * @param time the length of the time window + * @param unit the unit of the time window length + * @param bufferSize the buffer size if >= 0, otherwise, the buffer will be unlimited + * @param scheduler the scheduler from where the current time is retrieved. The + * observers will not observe on this scheduler. + * @return a Subject with the required replay behavior + */ + public static Subject replayWindowed(long time, TimeUnit unit, int bufferSize, final Scheduler scheduler) { + final long ms = unit.toMillis(time); + if (ms <= 0) { + throw new IllegalArgumentException("The time window is less than 1 millisecond!"); + } + Func1> timestamp = new Func1>() { + @Override + public Timestamped call(T t1) { + return new Timestamped(scheduler.now(), t1); + } + }; + Func1, T> untimestamp = new Func1, T>() { + @Override + public T call(Timestamped t1) { + return t1.getValue(); + } + }; + + ReplayState, T> state; + + if (bufferSize >= 0) { + state = new ReplayState, T>(new VirtualBoundedList>(bufferSize), untimestamp); + } else { + state = new ReplayState, T>(new VirtualArrayList>(), untimestamp); + } + final ReplayState, T> fstate = state; + // time based eviction when a value is added + state.onValueAdded = new Action0() { + @Override + public void call() { + long now = scheduler.now(); + long before = now - ms; + for (int i = fstate.values.start(); i < fstate.values.end(); i++) { + Timestamped v = fstate.values.get(i); + if (v.getTimestampMillis() >= before) { + fstate.values.removeBefore(i); + break; + } + } + } + }; + // time based eviction when a client subscribes + state.onSubscription = state.onValueAdded; + + final CustomReplaySubject, T> brs = new CustomReplaySubject, T>( + new CustomReplaySubjectSubscribeFunc, T>(state), state, timestamp + ); + + return brs; + } + + /** + * Return an OnSubscribeFunc which delegates the subscription to the given observable. + */ + public static OnSubscribeFunc subscriberOf(final Observable target) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer t1) { + return target.subscribe(t1); + } + }; + } + + /** + * Subject that wraps another subject and uses a mapping function + * to transform the received values. + */ + public static final class MappingSubject extends Subject { + private final Subject subject; + private final Func1 selector; + public MappingSubject(OnSubscribeFunc func, Subject subject, Func1 selector) { + super(func); + this.subject = subject; + this.selector = selector; + } + + @Override + public void onNext(T args) { + subject.onNext(selector.call(args)); + } + + @Override + public void onError(Throwable e) { + subject.onError(e); + } + + @Override + public void onCompleted() { + subject.onCompleted(); + } + + } + + /** + * A subject that wraps another subject. + */ + public static final class SubjectWrapper extends Subject { + /** The wrapped subject. */ + final Subject subject; + public SubjectWrapper(OnSubscribeFunc func, Subject subject) { + super(func); + this.subject = subject; + } + + @Override + public void onNext(T args) { + subject.onNext(args); + } + + @Override + public void onError(Throwable e) { + subject.onError(e); + } + + @Override + public void onCompleted() { + subject.onCompleted(); + } + + } + + /** Base state with lock. */ + static class BaseState { + /** The lock to protect the other fields. */ + private final Lock lock = new ReentrantLock(); + /** Lock. */ + public void lock() { + lock.lock(); + } + /** Unlock. */ + public void unlock() { + lock.unlock(); + } + + } + /** + * Base interface for logically indexing a list. + * @param the value type + */ + public interface VirtualList { + /** @return the number of elements in this list */ + int size(); + /** + * Add an element to the list. + * @param value the value to add + */ + void add(T value); + /** + * Retrieve an element at the specified logical index. + * @param index + * @return + */ + T get(int index); + /** + * Remove elements up before the given logical index and move + * the start() to this index. + *

+ * For example, a list contains 3 items. Calling removeUntil 2 will + * remove the first two items. + * @param index + */ + void removeBefore(int index); + /** + * Clear the elements of this list and increase the + * start by the number of elements. + */ + void clear(); + /** + * Returns the current head index of this list. + * @return + */ + int start(); + /** + * Returns the current tail index of this list (where the next value would appear). + * @return + */ + int end(); + /** + * Clears and resets the indexes of the list. + */ + void reset(); + /** + * Returns the current content as a list. + * @return + */ + List toList(); + } + /** + * Behaves like a normal, unbounded ArrayList but with virtual index. + */ + public static final class VirtualArrayList implements VirtualList { + /** The backing list .*/ + final List list = new ArrayList(); + /** The virtual start index of the list. */ + int startIndex; + @Override + public int size() { + return list.size(); + } + @Override + public void add(T value) { + list.add(value); + } + + @Override + public T get(int index) { + return list.get(index - startIndex); + } + + @Override + public void removeBefore(int index) { + int j = index - startIndex; + if (j > 0 && j <= list.size()) { + list.subList(0, j).clear(); + } + startIndex = index; + } + + @Override + public void clear() { + startIndex += list.size(); + list.clear(); + } + + @Override + public int start() { + return startIndex; + } + + @Override + public int end() { + return startIndex + list.size(); + } + + @Override + public void reset() { + list.clear(); + startIndex = 0; + } + @Override + public List toList() { + return new ArrayList(list); + } + + } + /** + * A bounded list which increases its size up to a maximum capacity, then + * behaves like a circular buffer with virtual indexes. + */ + public static final class VirtualBoundedList implements VirtualList { + /** A list that grows up to maxSize. */ + private final List list = new ArrayList(); + /** The maximum allowed size. */ + private final int maxSize; + /** The logical start index of the list. */ + int startIndex; + /** The head index inside the list, where the first readable value sits. */ + int head; + /** The tail index inside the list, where the next value will be added. */ + int tail; + /** The number of items in the list. */ + int count; + /** + * Construct a VirtualBoundedList with the given maximum number of elements. + * @param maxSize + */ + public VirtualBoundedList(int maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("maxSize < 0"); + } + this.maxSize = maxSize; + } + @Override + public int start() { + return startIndex; + } + + @Override + public int end() { + return startIndex + count; + } + + @Override + public void clear() { + startIndex += count; + list.clear(); + head = 0; + tail = 0; + count = 0; + } + @Override + public int size() { + return count; + } + + @Override + public void add(T value) { + if (list.size() == maxSize) { + list.set(tail, value); + head = (head + 1) % maxSize; + tail = (tail + 1) % maxSize; + startIndex++; + } else { + list.add(value); + tail = (tail + 1) % maxSize; + count++; + } + } + + @Override + public T get(int index) { + if (index < start() || index >= end()) { + throw new ArrayIndexOutOfBoundsException(index); + } + int idx = (head + (index - startIndex)) % maxSize; + return list.get(idx); + } + + @Override + public void removeBefore(int index) { + if (index <= start()) { + return; + } + if (index >= end()) { + clear(); + startIndex = index; + return; + } + int rc = index - startIndex; + int head2 = head + rc; + for (int i = head; i < head2; i++) { + list.set(i % maxSize, null); + count--; + } + startIndex = index; + head = head2 % maxSize; + } + @Override + public List toList() { + List r = new ArrayList(list.size() + 1); + for (int i = head; i < head + count; i++) { + int idx = i % maxSize; + r.add(list.get(idx)); + } + return r; + } + + @Override + public void reset() { + list.clear(); + count = 0; + head = 0; + tail = 0; + } + + } + /** + * The state class. + * @param the intermediate type stored in the values buffer + * @param the result type transformed via the resultSelector + */ + static final class ReplayState extends BaseState { + /** The values observed so far. */ + final VirtualList values; + /** The result selector. */ + final Func1 resultSelector; + /** The received error. */ + Throwable error; + /** General completion indicator. */ + boolean done; + /** The map of replayers. */ + final Map replayers = new LinkedHashMap(); + /** + * Callback once a value has been added but before it is replayed + * (I.e, run a time based eviction policy). + *

+ * Called while holding the state lock. + */ + protected Action0 onValueAdded = new Action0() { + @Override + public void call() { + } + }; + /** + * Callback once an error has been called but before it is replayed + * (I.e, run a time based eviction policy). + *

+ * Called while holding the state lock. + */ + protected Action0 onErrorAdded = new Action0() { + @Override + public void call() { + } + }; + /** + * Callback once completed has been called but before it is replayed + * (I.e, run a time based eviction policy). + *

+ * Called while holding the state lock. + */ + protected Action0 onCompletedAdded = new Action0() { + @Override + public void call() { + } + }; + /** + * Callback to pre-manage the values if an observer unsubscribes + * (I.e, run a time based eviction policy). + *

+ * Called while holding the state lock. + */ + protected Action0 onSubscription = new Action0() { + @Override + public void call() { + } + }; + /** + * Construct a ReplayState with the supplied buffer and result selectors. + * @param values + * @param resultSelector + */ + public ReplayState(final VirtualList values, + final Func1 resultSelector) { + this.values = values; + this.resultSelector = resultSelector; + } + /** + * Returns a live collection of the observers. + *

+ * Caller should hold the lock. + * @return + */ + Collection replayers() { + return new ArrayList(replayers.values()); + } + /** + * Add a replayer to the replayers and create a Subscription for it. + *

+ * Caller should hold the lock. + * + * @param obs + * @return + */ + Subscription addReplayer(Observer obs) { + Subscription s = new Subscription() { + final AtomicBoolean once = new AtomicBoolean(); + @Override + public void unsubscribe() { + if (once.compareAndSet(false, true)) { + remove(this); + } + } + + }; + Replayer rp = new Replayer(obs, s); + replayers.put(s, rp); + rp.replayTill(values.start() + values.size()); + return s; + } + /** The replayer that holds a value where the given observer is currently at. */ + final class Replayer { + protected final Observer wrapped; + /** Where this replayer was in reading the list. */ + protected int index; + /** To cancel and unsubscribe this replayer and observer. */ + protected final Subscription cancel; + protected Replayer(Observer wrapped, Subscription cancel) { + this.wrapped = wrapped; + this.cancel = cancel; + } + /** + * Replay up to the given index + * @param limit + */ + void replayTill(int limit) { + int si = values.start(); + if (index < si) { + index = si; + } + while (index < limit) { + TIntermediate value = values.get(index); + index++; + try { + wrapped.onNext(resultSelector.call(value)); + } catch (Throwable t) { + replayers.remove(cancel); + wrapped.onError(t); + return; + } + } + if (done) { + if (error != null) { + wrapped.onError(error); + } else { + wrapped.onCompleted(); + } + } + } + } + /** + * Remove the subscription. + * @param s + */ + void remove(Subscription s) { + lock(); + try { + replayers.remove(s); + } finally { + unlock(); + } + } + /** + * Add a notification value and limit the size of values. + *

+ * Caller should hold the lock. + * @param value + */ + void add(TIntermediate value) { + values.add(value); + } + /** Clears the value list. */ + void clearValues() { + lock(); + try { + values.clear(); + } finally { + unlock(); + } + } + } + /** + * A customizable replay subject with support for transformations. + * + * @param the Observer side's value type + * @param the type of the elements in the replay buffer + * @param the value type of the observers subscribing to this subject + */ + public static final class CustomReplaySubject extends Subject { + /** + * Return a subject that retains all events and will replay them to an {@link Observer} that subscribes. + * @return a subject that retains all events and will replay them to an {@link Observer} that subscribes. + */ + public static CustomReplaySubject create() { + ReplayState state = new ReplayState(new VirtualArrayList(), Functions.identity()); + return new CustomReplaySubject( + new CustomReplaySubjectSubscribeFunc(state), state, + Functions.identity()); + } + /** + * Create a bounded replay subject with the given maximum buffer size. + * @param maxSize the maximum size in number of onNext notifications + * @return + */ + public static CustomReplaySubject create(int maxSize) { + ReplayState state = new ReplayState(new VirtualBoundedList(maxSize), Functions.identity()); + return new CustomReplaySubject( + new CustomReplaySubjectSubscribeFunc(state), state, + Functions.identity()); + } + /** The replay state. */ + protected final ReplayState state; + /** The result selector. */ + protected final Func1 intermediateSelector; + + private CustomReplaySubject( + Observable.OnSubscribeFunc onSubscribe, + ReplayState state, + Func1 intermediateSelector) { + super(onSubscribe); + this.state = state; + this.intermediateSelector = intermediateSelector; + } + + + @Override + public void onCompleted() { + state.lock(); + try { + if (state.done) { + return; + } + state.done = true; + state.onCompletedAdded.call(); + replayValues(); + } finally { + state.unlock(); + } + } + + @Override + public void onError(Throwable e) { + state.lock(); + try { + if (state.done) { + return; + } + state.done = true; + state.error = e; + state.onErrorAdded.call(); + replayValues(); + } finally { + state.unlock(); + } + } + + @Override + public void onNext(TInput args) { + state.lock(); + try { + if (state.done) { + return; + } + state.add(intermediateSelector.call(args)); + state.onValueAdded.call(); + replayValues(); + } finally { + state.unlock(); + } + } + /** + * Replay values up to the current index. + */ + protected void replayValues() { + int s = state.values.start() + state.values.size(); + for (ReplayState.Replayer rp : state.replayers()) { + rp.replayTill(s); + } + } + } + /** + * The subscription function. + * @param the type of the elements in the replay buffer + * @param the value type of the observers subscribing to this subject + */ + protected static final class CustomReplaySubjectSubscribeFunc + implements Observable.OnSubscribeFunc { + + private final ReplayState state; + protected CustomReplaySubjectSubscribeFunc(ReplayState state) { + this.state = state; + } + + @Override + public Subscription onSubscribe(Observer t1) { + VirtualList values; + Throwable error; + state.lock(); + try { + if (!state.done) { + state.onSubscription.call(); + return state.addReplayer(t1); + } + values = state.values; + error = state.error; + } finally { + state.unlock(); + } + // fully replay the subject + for (int i = values.start(); i < values.end(); i++) { + try { + t1.onNext(state.resultSelector.call(values.get(i))); + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + } + if (error != null) { + t1.onError(error); + } else { + t1.onCompleted(); + } + return Subscriptions.empty(); + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java b/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java new file mode 100644 index 00000000000..56f540d4660 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java @@ -0,0 +1,443 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.*; +import rx.Observable; +import rx.Observer; +import rx.observables.ConnectableObservable; +import rx.operators.OperationReplay.VirtualBoundedList; +import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; +import rx.util.functions.Func1; + +public class OperationReplayTest { + @Test + public void testBoundedList() { + VirtualBoundedList list = new VirtualBoundedList(3); + + list.add(1); // idx: 0 + list.add(2); // idx: 1 + list.add(3); // idx: 2 + + Assert.assertEquals(3, list.size()); + + list.add(4); // idx: 3 + + Assert.assertEquals(3, list.size()); + Assert.assertEquals(Arrays.asList(2, 3, 4), list.toList()); + + Assert.assertEquals(1, list.start()); + Assert.assertEquals(4, list.end()); + + list.removeBefore(3); + + Assert.assertEquals(1, list.size()); + + Assert.assertEquals(Arrays.asList(4), list.toList()); + + Assert.assertEquals(3, list.start()); + Assert.assertEquals(4, list.end()); + } + @Test(expected = ArrayIndexOutOfBoundsException.class) + public void testReadBefore() { + VirtualBoundedList list = new VirtualBoundedList(3); + + list.add(1); // idx: 0 + list.add(2); // idx: 1 + list.add(3); // idx: 2 + list.add(4); // idx: 3 + + list.get(0); + } + @Test(expected = ArrayIndexOutOfBoundsException.class) + public void testReadAfter() { + VirtualBoundedList list = new VirtualBoundedList(3); + + list.add(1); // idx: 0 + list.add(2); // idx: 1 + list.add(3); // idx: 2 + list.add(4); // idx: 3 + + list.get(4); + } + @Test + public void testBufferedReplay() { + PublishSubject source = PublishSubject.create(); + + ConnectableObservable co = source.replay(3); + co.connect(); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + inOrder.verify(observer1, times(1)).onNext(1); + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + + source.onNext(4); + source.onCompleted(); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + } + } + @Test + public void testWindowedReplay() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + ConnectableObservable co = source.replay(100, TimeUnit.MILLISECONDS, scheduler); + co.connect(); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(2); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(3); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onCompleted(); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + + inOrder.verify(observer1, times(1)).onNext(1); + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + inOrder.verify(observer1, times(1)).onNext(3); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + } + } + @Test + public void testReplaySelector() { + final Func1 dbl = new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * 2; + } + + }; + + Func1, Observable> selector = new Func1, Observable>() { + + @Override + public Observable call(Observable t1) { + return t1.map(dbl); + } + + }; + + PublishSubject source = PublishSubject.create(); + + Observable co = source.replay(selector); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onNext(6); + + source.onNext(4); + source.onCompleted(); + inOrder.verify(observer1, times(1)).onNext(8); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + + } + + @Test + public void testBufferedReplaySelector() { + + final Func1 dbl = new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * 2; + } + + }; + + Func1, Observable> selector = new Func1, Observable>() { + + @Override + public Observable call(Observable t1) { + return t1.map(dbl); + } + + }; + + PublishSubject source = PublishSubject.create(); + + Observable co = source.replay(selector, 3); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onNext(6); + + source.onNext(4); + source.onCompleted(); + inOrder.verify(observer1, times(1)).onNext(8); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + } + } + @Test + public void testWindowedReplaySelector() { + + final Func1 dbl = new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * 2; + } + + }; + + Func1, Observable> selector = new Func1, Observable>() { + + @Override + public Observable call(Observable t1) { + return t1.map(dbl); + } + + }; + + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable co = source.replay(selector, 100, TimeUnit.MILLISECONDS, scheduler); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(2); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(3); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onCompleted(); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onNext(6); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + } + } + @Test + public void testBufferedReplayError() { + PublishSubject source = PublishSubject.create(); + + ConnectableObservable co = source.replay(3); + co.connect(); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + inOrder.verify(observer1, times(1)).onNext(1); + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + + source.onNext(4); + source.onError(new RuntimeException("Forced failure")); + + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onCompleted(); + + } + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onCompleted(); + } + } + @Test + public void testWindowedReplayError() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + ConnectableObservable co = source.replay(100, TimeUnit.MILLISECONDS, scheduler); + co.connect(); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(2); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(3); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onError(new RuntimeException("Forced failure")); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + + inOrder.verify(observer1, times(1)).onNext(1); + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + + inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onCompleted(); + + } + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + inOrder.verify(observer1, times(1)).onNext(3); + + inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onCompleted(); + } + } +}