From a0ebbc77c22db31e7bada84dd6983be8ab3697ea Mon Sep 17 00:00:00 2001 From: David Gross Date: Sun, 9 Mar 2014 13:48:55 -0700 Subject: [PATCH] fixing a variety of errors in javadoc generation (syntax errors, misnamed params, etc.) --- rxjava-core/src/main/java/rx/Observable.java | 52 ++- rxjava-core/src/main/java/rx/Observer.java | 18 +- rxjava-core/src/main/java/rx/Scheduler.java | 53 +-- .../rx/observers/SynchronizedSubscriber.java | 2 +- .../java/rx/operators/OperationBuffer.java | 250 +++++++------- .../java/rx/operators/OperationWindow.java | 315 ++++++++++-------- .../main/java/rx/operators/OperatorScan.java | 40 +-- .../main/java/rx/operators/SafeObserver.java | 9 +- .../rx/plugins/RxJavaDefaultSchedulers.java | 13 +- .../RxJavaObservableExecutionHook.java | 67 ++-- .../java/rx/schedulers/ExecutorScheduler.java | 8 +- .../main/java/rx/schedulers/Schedulers.java | 17 +- .../java/rx/subjects/BehaviorSubject.java | 20 +- 13 files changed, 453 insertions(+), 411 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 42da02f62b..5bfb5ad278 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -259,7 +259,7 @@ public static interface OnSubscribeFunc extends Function { * observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe() * } * - * @param bind + * @param lift * @return an Observable that emits values that are the result of applying the bind function to the values * of the current Observable */ @@ -1552,7 +1552,7 @@ public final static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T *

* * - * @param items + * @param t1 * the source Array * @param * the type of items in the Array and the type of items to be emitted by the resulting Observable @@ -1680,7 +1680,7 @@ public final static Observable just(T value, Scheduler scheduler) { * if the source is empty * @see RxJava Wiki: max() * @see MSDN: Observable.Max - * @deprecated Use rxjava-math module instead + * @deprecated use rxjava-math module instead */ public final static > Observable max(Observable source) { return OperationMinMax.max(source); @@ -2680,7 +2680,7 @@ public final static Observable switchDo(ObservableRxJava Wiki: switchOnNext() - * @see {@link #switchOnNext(Observable)} + * @see #switchOnNext(Observable) */ public final static Observable switchLatest(Observable> sequenceOfSequences) { return create(OperationSwitch.switchDo(sequenceOfSequences)); @@ -4308,7 +4308,7 @@ public final void onNext(T args) { *

* * - * @param observer + * @param onNotification * the action to invoke for each item emitted by the source Observable * @return the source Observable with the side-effecting behavior applied * @see RxJava Wiki: doOnEach() @@ -4503,7 +4503,7 @@ public final Observable elementAtOrDefault(int index, T defaultValue) { * @return an Observable that emits a Boolean that indicates whether any item emitted by the source * Observable satisfies the {@code predicate} * @see RxJava Wiki: exists() - * @see MSDN: Observable.Any Note: the description in this page was wrong at the time of this writing. + * @see MSDN: Observable.Any (Note: the description in this page was wrong at the time of this writing) */ public final Observable exists(Func1 predicate) { return create(OperationAny.exists(this, predicate)); @@ -4551,7 +4551,7 @@ public final Observable finallyDo(Action0 action) { * @return an Observable that emits only the very first item emitted by the source Observable, or raises an * {@code IllegalArgumentException} if the source Observable is empty * @see RxJava Wiki: first() - * @see MSDN: {@code Observable.firstAsync()} + * @see "MSDN: Observable.firstAsync()" */ public final Observable first() { return take(1).single(); @@ -4568,7 +4568,7 @@ public final Observable first() { * @return an Observable that emits only the very first item emitted by the source Observable that satisfies * the {@code predicate}, or raises an {@code IllegalArgumentException} if no such items are emitted * @see RxJava Wiki: first() - * @see MSDN: {@code Observable.firstAsync()} + * @see "MSDN: Observable.firstAsync()" */ public final Observable first(Func1 predicate) { return takeFirst(predicate).single(); @@ -4585,7 +4585,7 @@ public final Observable first(Func1 predicate) { * @return an Observable that emits only the very first item from the source, or a default item if the * source Observable completes without emitting any items * @see RxJava Wiki: firstOrDefault() - * @see MSDN: {@code Observable.firstOrDefaultAsync()} + * @see "MSDN: Observable.firstOrDefaultAsync()" */ public final Observable firstOrDefault(T defaultValue) { return take(1).singleOrDefault(defaultValue); @@ -4605,7 +4605,7 @@ public final Observable firstOrDefault(T defaultValue) { * @return an Observable that emits only the very first item emitted by the source Observable that satisfies * the {@code predicate}, or a default item if the source Observable emits no such items * @see RxJava Wiki: firstOrDefault() - * @see MSDN: {@code Observable.firstOrDefaultAsync()} + * @see "MSDN: Observable.firstOrDefaultAsync()" */ public final Observable firstOrDefault(T defaultValue, Func1 predicate) { return takeFirst(predicate).singleOrDefault(defaultValue); @@ -4784,7 +4784,7 @@ public final Observable join(Obser * @return an Observable that emits the last item from the source Observable or notifies observers of an * error * @see RxJava Wiki: last() - * @see MSDN: {@code Observable.lastAsync()} + * @see "MSDN: Observable.lastAsync()" */ public final Observable last() { return takeLast(1).single(); @@ -4803,7 +4803,7 @@ public final Observable last() { * @throws IllegalArgumentException * if no items that match the predicate are emitted by the source Observable * @see RxJava Wiki: last() - * @see MSDN: {@code Observable.lastAsync()} + * @see "MSDN: Observable.lastAsync()" */ public final Observable last(Func1 predicate) { return filter(predicate).takeLast(1).single(); @@ -4820,7 +4820,7 @@ public final Observable last(Func1 predicate) { * @return an Observable that emits only the last item emitted by the source Observable, or a default item * if the source Observable is empty * @see RxJava Wiki: lastOrDefault() - * @see MSDN: {@code Observable.lastOrDefaultAsync()} + * @see "MSDN: Observable.lastOrDefaultAsync()" */ public final Observable lastOrDefault(T defaultValue) { return takeLast(1).singleOrDefault(defaultValue); @@ -4840,7 +4840,7 @@ public final Observable lastOrDefault(T defaultValue) { * @return an Observable that emits only the last item emitted by the source Observable that satisfies the * given condition, or a default item if no such item is emitted by the source Observable * @see RxJava Wiki: lastOrDefault() - * @see MSDN: {@code Observable.lastOrDefaultAsync()} + * @see "MSDN: Observable.lastOrDefaultAsync()" */ public final Observable lastOrDefault(T defaultValue, Func1 predicate) { return filter(predicate).takeLast(1).singleOrDefault(defaultValue); @@ -5355,7 +5355,7 @@ public final Observable onExceptionResumeNext(final Observable r * @param f * a {@link Func1} that applies Observable Observers to {@code Observable} in parallel and * returns an {@code Observable} - * @return an Observable that emits the results of applying {@link f} to the items emitted by the source + * @return an Observable that emits the results of applying {@code f} to the items emitted by the source * Observable * @see RxJava Wiki: parallel() */ @@ -5376,7 +5376,7 @@ public final Observable parallel(Func1, Observable> f) { * returns an {@code Observable} * @param s * a {@link Scheduler} to perform the work on - * @return an Observable that emits the results of applying {@link f} to the items emitted by the source + * @return an Observable that emits the results of applying {@code f} to the items emitted by the source * Observable * @see RxJava Wiki: parallel() */ @@ -5450,7 +5450,7 @@ public final Subject call() { * @param initialValue * the initial value of the underlying {@link BehaviorSubject} * @return an Observable that emits {@code initialValue} followed by the results of invoking the selector - * on a {@ConnectableObservable} that shares a single subscription to the underlying Observable + * on a {@link ConnectableObservable} that shares a single subscription to the underlying Observable */ public final Observable publish(Func1, ? extends Observable> selector, final T initialValue) { return multicast(new Func0>() { @@ -6208,7 +6208,7 @@ public final Observable scan(R initialValue, Func2 accum * @throws IllegalArgumentException * if the source emits more than one item or no items * @see RxJava Wiki: single() - * @see MSDN: {@code Observable.singleAsync()} + * @see "MSDN: Observable.singleAsync()" */ public final Observable single() { return create(OperationSingle. single(this)); @@ -6229,7 +6229,7 @@ public final Observable single() { * if the source Observable emits either more than one item that matches the predicate or no * items that match the predicate * @see RxJava Wiki: single() - * @see MSDN: {@code Observable.singleAsync()} + * @see "MSDN: Observable.singleAsync()" */ public final Observable single(Func1 predicate) { return filter(predicate).single(); @@ -6249,7 +6249,7 @@ public final Observable single(Func1 predicate) { * @throws IllegalArgumentException * if the source Observable emits more than one item * @see RxJava Wiki: single() - * @see MSDN: {@code Observable.singleOrDefaultAsync()} + * @see "MSDN: Observable.singleOrDefaultAsync()" */ public final Observable singleOrDefault(T defaultValue) { return create(OperationSingle. singleOrDefault(this, defaultValue)); @@ -6272,7 +6272,7 @@ public final Observable singleOrDefault(T defaultValue) { * @throws IllegalArgumentException * if the source Observable emits more than one item that matches the predicate * @see RxJava Wiki: single() - * @see MSDN: {@code Observable.singleOrDefaultAsync()} + * @see "MSDN: Observable.singleOrDefaultAsync()" */ public final Observable singleOrDefault(T defaultValue, Func1 predicate) { return filter(predicate).singleOrDefault(defaultValue); @@ -7140,8 +7140,7 @@ public final Subscription subscribe(Subscriber observer, Scheduler sc } /** - * Asynchronously subscribes Observers to this Observable on the specified - * {@link Scheduler}. + * Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}. *

* * @@ -7150,7 +7149,6 @@ public final Subscription subscribe(Subscriber observer, Scheduler sc * @return the source Observable modified so that its subscriptions happen on the * specified {@link Scheduler} * @see RxJava Wiki: subscribeOn() - * @see #subscribeOn(rx.Scheduler, int) */ public final Observable subscribeOn(Scheduler scheduler) { return nest().lift(new OperatorSubscribeOn(scheduler)); @@ -7354,7 +7352,7 @@ public final Observable take(long time, TimeUnit unit, Scheduler scheduler) { * @return an Observable that emits only the very first item emitted by the source Observable, or an empty * Observable if the source Observable completes without emitting a single item * @see RxJava Wiki: first() - * @see MSDN: {@code Observable.firstAsync()} + * @see "MSDN: Observable.firstAsync()" * @deprecated use {@code take(1)} directly */ @Deprecated @@ -7374,7 +7372,7 @@ public final Observable takeFirst() { * the given condition, or that completes without emitting anything if the source Observable * completes without emitting a single condition-satisfying item * @see RxJava Wiki: first() - * @see MSDN: {@code Observable.firstAsync()} + * @see "MSDN: Observable.firstAsync()" */ public final Observable takeFirst(Func1 predicate) { return filter(predicate).take(1); @@ -7638,7 +7636,7 @@ public final Observable takeWhileWithIndex(final Func2 - * After an Observer calls an {@link Observable}'s Observable.subscribe method, the {@link Observable} calls the - * Observer's onNext method to provide notifications. A well-behaved {@link Observable} will call an Observer's - * onCompleted closure exactly once or the Observer's onError closure exactly once. + * After an Observer calls an {@link Observable}'s Observable.subscribe method, the + * {@link Observable} calls the Observer's onNext method to provide notifications. A well-behaved + * {@link Observable} will call an Observer's onCompleted closure exactly once or the Observer's + * onError closure exactly once. *

* For more information see the RxJava Wiki * @@ -38,7 +39,8 @@ public interface Observer { /** * Notifies the Observer that the {@link Observable} has experienced an error condition. *

- * If the {@link Observable} calls this closure, it will not thereafter call onNext or onCompleted. + * If the {@link Observable} calls this closure, it will not thereafter call onNext or + * onCompleted. * * @param e */ @@ -47,11 +49,13 @@ public interface Observer { /** * Provides the Observer with new data. *

- * The {@link Observable} calls this closure 1 or more times, unless it calls onError in which case this closure may never be called. + * The {@link Observable} calls this closure 1 or more times, unless it calls onError in which + * case this closure may never be called. *

- * The {@link Observable} will not call this closure again after it calls either onCompleted or onError. + * The {@link Observable} will not call this closure again after it calls either onCompleted or + * onError. * - * @param args + * @param t */ public abstract void onNext(T t); diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 21d4791f44..8203ec567b 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -31,11 +31,15 @@ * Why is this an abstract class instead of an interface? *

*

    - *
  1. Java doesn't support extension methods and there are many overload methods needing default implementations.
  2. - *
  3. Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for a long time.
  4. - *
  5. If only an interface were used Scheduler implementations would then need to extend from an AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the - * functionality.
  6. - *
  7. Without virtual extension methods even additive changes are breaking and thus severely impede library maintenance.
  8. + *
  9. Java doesn't support extension methods and there are many overload methods needing default + * implementations.
  10. + *
  11. Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for + * a long time.
  12. + *
  13. If only an interface were used Scheduler implementations would then need to extend from an + * AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the + * functionality.
  14. + *
  15. Without virtual extension methods even additive changes are breaking and thus severely impede library + * maintenance.
  16. *
*/ public abstract class Scheduler { @@ -44,38 +48,40 @@ public abstract class Scheduler { * Schedules an Action on a new Scheduler instance (typically another thread) for execution. * * @param action - * Action to schedule. - * @return a subscription to be able to unsubscribe from action. + * Action to schedule + * @return a subscription to be able to unsubscribe from action */ public abstract Subscription schedule(Action1 action); /** - * Schedules an Action on a new Scheduler instance (typically another thread) for execution at some point in the future. + * Schedules an Action on a new Scheduler instance (typically another thread) for execution at some point + * in the future. * * @param action + * the Action to schedule * @param delayTime + * time to wait before executing the action * @param unit - * @return + * the time unit the delay time is given in + * @return a subscription to be able to unsubscribe from action */ public abstract Subscription schedule(final Action1 action, final long delayTime, final TimeUnit unit); /** - * Schedules a cancelable action to be executed periodically. - * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing - * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this. + * Schedules a cancelable action to be executed periodically. This default implementation schedules + * recursively and waits for actions to complete (instead of potentially executing long-running actions + * concurrently). Each scheduler that can do periodic scheduling in a better way should override this. * - * @param state - * State to pass into the action. * @param action - * The action to execute periodically. + * the Action to execute periodically * @param initialDelay - * Time to wait before executing the action for the first time. + * time to wait before executing the action for the first time * @param period - * The time interval to wait each time in between executing the action. + * the time interval to wait each time in between executing the action * @param unit - * The time unit the interval above is given in. - * @return A subscription to be able to unsubscribe from action. + * the time unit the interval above is given in + * @return a subscription to be able to unsubscribe from action */ public Subscription schedulePeriodically(final Action1 action, long initialDelay, long period, TimeUnit unit) { final long periodInNanos = unit.toNanos(period); @@ -151,9 +157,9 @@ public abstract static class Inner implements Subscription { * Schedules an action to be executed in delayTime. * * @param delayTime - * Time the action is to be delayed before executing. + * time the action is to be delayed before executing * @param unit - * Time unit of the delay time. + * time unit of the delay time */ public abstract void schedule(Action1 action, long delayTime, TimeUnit unit); @@ -174,9 +180,10 @@ public long now() { /** * Parallelism available to a Scheduler. *

- * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster. + * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases + * such as scheduling work on a computer cluster. * - * @return the scheduler's available degree of parallelism. + * @return the scheduler's available degree of parallelism */ public int degreeOfParallelism() { return Runtime.getRuntime().availableProcessors(); diff --git a/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java b/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java index 8c01f506e1..8dbf1b20f5 100644 --- a/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java @@ -42,7 +42,7 @@ public SynchronizedSubscriber(Subscriber subscriber, Object lock) { /** * Used when synchronizing an Subscriber without access to the subscription. * - * @param Observer + * @param subscriber */ public SynchronizedSubscriber(Subscriber subscriber) { this(subscriber, new Object()); diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index b3dd33400b..d4181504ba 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -42,24 +42,25 @@ public Buffer call() { } /** - *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer until the {@link Observable} constructed using the {@link Func0} argument, produces a - * value. The buffer is then - * emitted, and a new buffer is created to replace it. A new {@link Observable} will be constructed using the - * provided {@link Func0} object, which will determine when this new buffer is emitted. When the source {@link Observable} completes or produces an error, the current buffer is emitted, and the - * event is propagated - * to all subscribed {@link Observer}s.

- * - *

Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values.

+ * This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in a buffer until the + * {@link Observable} constructed using the {@link Func0} argument, produces a value. The buffer is then + * emitted, and a new buffer is created to replace it. A new {@link Observable} will be constructed using + * the provided {@link Func0} object, which will determine when this new buffer is emitted. When the source + * {@link Observable} completes or produces an error, the current buffer is emitted, and the event is + * propagated to all subscribed {@link Observer}s. + *

+ * Note that this operation only produces non-overlapping chunks. At all times there is + * exactly one buffer actively storing values. + *

* * @param source - * The {@link Observable} which produces values. + * the {@link Observable} which produces values * @param bufferClosingSelector - * A {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine when a buffer is emitted and replaced by simply - * producing an object. + * a {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine + * when a buffer is emitted and replaced by simply producing an object. * @return - * the {@link Func1} object representing the specified buffer operation. + * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(final Observable source, final Func0> bufferClosingSelector) { return new OnSubscribeFunc>() { @@ -76,29 +77,31 @@ public Subscription onSubscribe(Observer> observer) { } /** - *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in the currently active chunks. Initially - * there are no chunks active.

- * - *

Chunks can be created by pushing a {@link rx.util.Opening} value to the "bufferOpenings" {@link Observable}. - * This creates a new buffer which will then start recording values which are produced by the "source" {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an - * {@link Observable} which can produce values. When it does so it will close this (and only this) newly created - * buffer. When the source {@link Observable} completes or produces an error, all chunks are emitted, and the - * event is propagated to all subscribed {@link Observer}s.

- * - *

Note that when using this operation multiple overlapping chunks - * could be active at any one point.

+ * This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * values from the specified {@link Observable} source and stores them in the currently active chunks. + * Initially there are no chunks active. + *

+ * Chunks can be created by pushing a {@link rx.util.TOpening} value to the "bufferOpenings" + * {@link Observable}. This creates a new buffer which will then start recording values which are produced + * by the "source" {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an + * {@link Observable} which can produce values. When it does so it will close this (and only this) newly + * created buffer. When the source {@link Observable} completes or produces an error, all chunks are + * emitted, and the event is propagated to all subscribed {@link Observer}s. + *

+ * Note that when using this operation multiple overlapping chunks could be active at any + * one point. + *

* * @param source - * The {@link Observable} which produces values. + * the {@link Observable} which produces values * @param bufferOpenings - * An {@link Observable} which when it produces a {@link rx.util.Opening} value will - * create a new buffer which instantly starts recording the "source" {@link Observable}. + * an {@link Observable} which when it produces a {@link rx.util.TOpening} value will create a + * new buffer which instantly starts recording the "source" {@link Observable} * @param bufferClosingSelector - * A {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine when a buffer is emitted and replaced by simply - * producing an object. + * a {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine + * when a buffer is emitted and replaced by simply producing an object. * @return - * the {@link Func1} object representing the specified buffer operation. + * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(final Observable source, final Observable bufferOpenings, final Func1> bufferClosingSelector) { return new OnSubscribeFunc>() { @@ -114,48 +117,50 @@ public Subscription onSubscribe(final Observer> observer) { } /** - *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * This method creates a {@link Func1} object which represents the buffer operation. This operation takes * values from the specified {@link Observable} source and stores them in a buffer until the buffer contains * a specified number of elements. The buffer is then emitted, and a new buffer is created to replace it. * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s.

- * - *

Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values.

+ * the event is propagated to all subscribed {@link Observer}s. + *

+ * Note that this operation only produces non-overlapping chunks. At all times there is + * exactly one buffer actively storing values. + *

* * @param source - * The {@link Observable} which produces values. + * the {@link Observable} which produces values * @param count - * The number of elements a buffer should have before being emitted and replaced. + * the number of elements a buffer should have before being emitted and replaced * @return - * the {@link Func1} object representing the specified buffer operation. + * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(Observable source, int count) { return buffer(source, count, count); } /** - *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * This method creates a {@link Func1} object which represents the buffer operation. This operation takes * values from the specified {@link Observable} source and stores them in all active chunks until the buffer * contains a specified number of elements. The buffer is then emitted. Chunks are created after a certain - * amount of values have been received. When the source {@link Observable} completes or produces an error, the - * currently active chunks are emitted, and the event is propagated to all subscribed {@link Observer}s.

- * - *

Note that this operation can produce non-connected, connected non-overlapping, or overlapping - * chunks depending on the input parameters.

+ * amount of values have been received. When the source {@link Observable} completes or produces an error, + * the currently active chunks are emitted, and the event is propagated to all subscribed {@link Observer}s. + *

+ * Note that this operation can produce non-connected, connected non-overlapping, or overlapping + * chunks depending on the input parameters. + *

* * @param source - * The {@link Observable} which produces values. + * the {@link Observable} which produces values * @param count - * The number of elements a buffer should have before being emitted. + * the number of elements a buffer should have before being emitted * @param skip - * The interval with which chunks have to be created. Note that when "skip" == "count" - * that this is the same as calling {@link OperationBuffer#buffer(Observable, int)}. - * If "skip" < "count", this buffer operation will produce overlapping chunks and if "skip" - * > "count" non-overlapping chunks will be created and some values will not be pushed + * the interval with which chunks have to be created. Note that when {@code skip == count} that + * this is the same as calling {@link OperationBuffer#buffer(Observable, int)}. If + * {@code skip < count}, this buffer operation will produce overlapping chunks and if + * {@code skip > count} non-overlapping chunks will be created and some values will not be pushed * into a buffer at all! * @return - * the {@link Func1} object representing the specified buffer operation. + * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(final Observable source, final int count, final int skip) { return new OnSubscribeFunc>() { @@ -171,48 +176,50 @@ public Subscription onSubscribe(final Observer> observer) { } /** - *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * This method creates a {@link Func1} object which represents the buffer operation. This operation takes * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s.

- * - *

Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values.

+ * the event is propagated to all subscribed {@link Observer}s. + *

+ * Note that this operation only produces non-overlapping chunks. At all times there is + * exactly one buffer actively storing values. + *

* * @param source - * The {@link Observable} which produces values. + * the {@link Observable} which produces values * @param timespan - * The amount of time all chunks must be actively collect values before being emitted. + * the amount of time all chunks must be actively collect values before being emitted * @param unit - * The {@link TimeUnit} defining the unit of time for the timespan. + * the {@link TimeUnit} defining the unit of time for the timespan * @return - * the {@link Func1} object representing the specified buffer operation. + * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(Observable source, long timespan, TimeUnit unit) { return buffer(source, timespan, unit, Schedulers.threadPoolForComputation()); } /** - *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * This method creates a {@link Func1} object which represents the buffer operation. This operation takes * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s.

- * - *

Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values.

+ * the event is propagated to all subscribed {@link Observer}s. + *

+ * Note that this operation only produces non-overlapping chunks. At all times there is + * exactly one buffer actively storing values. + *

* * @param source - * The {@link Observable} which produces values. + * the {@link Observable} which produces values * @param timespan - * The amount of time all chunks must be actively collect values before being emitted. + * the amount of time all chunks must be actively collect values before being emitted * @param unit - * The {@link TimeUnit} defining the unit of time for the timespan. + * the {@link TimeUnit} defining the unit of time for the timespan * @param scheduler - * The {@link Scheduler} to use for timing chunks. + * the {@link Scheduler} to use for timing chunks * @return - * the {@link Func1} object representing the specified buffer operation. + * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(final Observable source, final long timespan, final TimeUnit unit, final Scheduler scheduler) { return new OnSubscribeFunc>() { @@ -228,54 +235,56 @@ public Subscription onSubscribe(final Observer> observer) { } /** - *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * This method creates a {@link Func1} object which represents the buffer operation. This operation takes * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. * Additionally the buffer is automatically emitted once it reaches a specified number of elements. * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s.

- * - *

Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values.

+ * the event is propagated to all subscribed {@link Observer}s. + *

+ * Note that this operation only produces non-overlapping chunks. At all times there is + * exactly one buffer actively storing values. + *

* * @param source - * The {@link Observable} which produces values. + * the {@link Observable} which produces values * @param timespan - * The amount of time all chunks must be actively collect values before being emitted. + * the amount of time all chunks must be actively collect values before being emitted * @param unit - * The {@link TimeUnit} defining the unit of time for the timespan. + * the {@link TimeUnit} defining the unit of time for the timespan * @param count - * The maximum size of the buffer. Once a buffer reaches this size, it is emitted. + * the maximum size of the buffer. Once a buffer reaches this size, it is emitted * @return - * the {@link Func1} object representing the specified buffer operation. + * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(Observable source, long timespan, TimeUnit unit, int count) { return buffer(source, timespan, unit, count, Schedulers.threadPoolForComputation()); } /** - *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * This method creates a {@link Func1} object which represents the buffer operation. This operation takes * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. * Additionally the buffer is automatically emitted once it reaches a specified number of elements. * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s.

- * - *

Note that this operation only produces non-overlapping chunks. At all times there is - * exactly one buffer actively storing values.

+ * the event is propagated to all subscribed {@link Observer}s. + *

+ * Note that this operation only produces non-overlapping chunks. At all times there is + * exactly one buffer actively storing values. + *

* * @param source - * The {@link Observable} which produces values. + * the {@link Observable} which produces values * @param timespan - * The amount of time all chunks must be actively collect values before being emitted. + * the amount of time all chunks must be actively collect values before being emitted * @param unit - * The {@link TimeUnit} defining the unit of time for the timespan. + * the {@link TimeUnit} defining the unit of time for the timespan * @param count - * The maximum size of the buffer. Once a buffer reaches this size, it is emitted. + * the maximum size of the buffer. Once a buffer reaches this size, it is emitted * @param scheduler - * The {@link Scheduler} to use for timing chunks. + * the {@link Scheduler} to use for timing chunks * @return - * the {@link Func1} object representing the specified buffer operation. + * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(final Observable source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) { return new OnSubscribeFunc>() { @@ -292,54 +301,56 @@ public Subscription onSubscribe(final Observer> observer) { } /** - *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * This method creates a {@link Func1} object which represents the buffer operation. This operation takes * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. * The creation of chunks is also periodical. How often this is done depends on the specified timeshift. * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s.

- * - *

Note that this operation can produce non-connected, or overlapping chunks depending - * on the input parameters.

+ * the event is propagated to all subscribed {@link Observer}s. + *

+ * Note that this operation can produce non-connected, or overlapping chunks depending + * on the input parameters. + *

* * @param source - * The {@link Observable} which produces values. + * the {@link Observable} which produces values * @param timespan - * The amount of time all chunks must be actively collect values before being emitted. + * the amount of time all chunks must be actively collect values before being emitted * @param timeshift - * The amount of time between creating chunks. + * the amount of time between creating chunks * @param unit - * The {@link TimeUnit} defining the unit of time for the timespan. + * the {@link TimeUnit} defining the unit of time for the timespan * @return - * the {@link Func1} object representing the specified buffer operation. + * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(Observable source, long timespan, long timeshift, TimeUnit unit) { return buffer(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation()); } /** - *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes + * This method creates a {@link Func1} object which represents the buffer operation. This operation takes * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. * The creation of chunks is also periodical. How often this is done depends on the specified timeshift. * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Observer}s.

- * - *

Note that this operation can produce non-connected, or overlapping chunks depending - * on the input parameters.

+ * the event is propagated to all subscribed {@link Observer}s. + *

+ * Note that this operation can produce non-connected, or overlapping chunks depending + * on the input parameters. + *

* * @param source - * The {@link Observable} which produces values. + * the {@link Observable} which produces values * @param timespan - * The amount of time all chunks must be actively collect values before being emitted. + * the amount of time all chunks must be actively collect values before being emitted * @param timeshift - * The amount of time between creating chunks. + * the amount of time between creating chunks * @param unit - * The {@link TimeUnit} defining the unit of time for the timespan. + * the {@link TimeUnit} defining the unit of time for the timespan * @param scheduler - * The {@link Scheduler} to use for timing chunks. + * the {@link Scheduler} to use for timing chunks * @return - * the {@link Func1} object representing the specified buffer operation. + * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(final Observable source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) { return new OnSubscribeFunc>() { @@ -359,7 +370,7 @@ public Subscription onSubscribe(final Observer> observer) { * This class represents a single buffer: A sequence of recorded values. * * @param - * The type of objects which this {@link Buffer} can hold. + * the type of objects which this {@link Buffer} can hold */ protected static class Buffer extends Chunk> { /** @@ -402,6 +413,10 @@ public boolean isUnsubscribed() { /** * Create a buffer operator with the given observable sequence as the buffer boundary. + * + * @param source + * @param boundary + * @return */ public static OnSubscribeFunc> bufferWithBoundaryObservable(Observable source, Observable boundary) { return new BufferWithObservableBoundary(source, boundary, 16); @@ -410,6 +425,11 @@ public static OnSubscribeFunc> bufferWithBoundaryObservable(Obser /** * Create a buffer operator with the given observable sequence as the buffer boundary and * with the given initial capacity for buffers. + * + * @param source + * @param boundary + * @param initialCapacity + * @return */ public static OnSubscribeFunc> bufferWithBoundaryObservable(Observable source, Observable boundary, int initialCapacity) { if (initialCapacity <= 0) { diff --git a/rxjava-core/src/main/java/rx/operators/OperationWindow.java b/rxjava-core/src/main/java/rx/operators/OperationWindow.java index c0d96f3a79..5b170c56a0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationWindow.java +++ b/rxjava-core/src/main/java/rx/operators/OperationWindow.java @@ -42,24 +42,26 @@ public Window call() { } /** - *

This method creates a {@link rx.functions.Func1} object which represents the window operation. This operation takes - * values from the specified {@link rx.Observable} source and stores them in a window until the {@link rx.Observable} constructed using the {@link rx.functions.Func0} argument, produces a - * value. The window is then - * emitted, and a new window is created to replace it. A new {@link rx.Observable} will be constructed using the - * provided {@link rx.functions.Func0} object, which will determine when this new window is emitted. When the source {@link rx.Observable} completes or produces an error, the current window - * is emitted, and the event is propagated - * to all subscribed {@link rx.Observer}s.

- * - *

Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values.

+ * This method creates a {@link rx.functions.Func1} object which represents the window operation. This + * operation takes values from the specified {@link rx.Observable} source and stores them in a window until + * the {@link rx.Observable} constructed using the {@link rx.functions.Func0} argument, produces a value. + * The window is then emitted, and a new window is created to replace it. A new {@link rx.Observable} will + * be constructed using the provided {@link rx.functions.Func0} object, which will determine when this new + * window is emitted. When the source {@link rx.Observable} completes or produces an error, the current + * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. + *

+ * Note that this operation only produces non-overlapping windows. At all times there is + * exactly one window actively storing values. + *

* * @param source - * The {@link rx.Observable} which produces values. + * the {@link rx.Observable} which produces values * @param windowClosingSelector - * A {@link rx.functions.Func0} object which produces {@link rx.Observable}s. These {@link rx.Observable}s determine when a window is emitted and replaced by simply + * a {@link rx.functions.Func0} object that produces {@link rx.Observable}s. These + * {@link rx.Observable}s determine when a window is emitted and replaced by simply * producing an object. * @return - * the {@link rx.functions.Func1} object representing the specified window operation. + * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(final Observable source, final Func0> windowClosingSelector) { return new OnSubscribeFunc>() { @@ -74,29 +76,33 @@ public Subscription onSubscribe(final Observer> observer) } /** - *

This method creates a {@link rx.functions.Func1} object which represents the window operation. This operation takes - * values from the specified {@link rx.Observable} source and stores them in the currently active window. Initially - * there are no windows active.

- * - *

Windows can be created by pushing a {@link rx.util.Opening} value to the "windowOpenings" {@link rx.Observable}. - * This creates a new window which will then start recording values which are produced by the "source" {@link rx.Observable}. Additionally the "windowClosingSelector" will be used to construct an - * {@link rx.Observable} which can produce values. When it does so it will close this (and only this) newly created - * window. When the source {@link rx.Observable} completes or produces an error, all windows are emitted, and the - * event is propagated to all subscribed {@link rx.Observer}s.

- * - *

Note that when using this operation multiple overlapping windows - * could be active at any one point.

+ * This method creates a {@link rx.functions.Func1} object which represents the window operation. This + * operation takes values from the specified {@link rx.Observable} source and stores them in the currently + * active window. Initially there are no windows active. + *

+ * Windows can be created by pushing a {@link rx.util.TOpening} value to the {@code windowOpenings} + * {@link rx.Observable}. This creates a new window which will then start recording values which are + * produced by the {@code source} {@link rx.Observable}. Additionally the {@code windowClosingSelector} + * will be used to construct an {@link rx.Observable} which can produce values. When it does so it will + * close this (and only this) newly created window. When the source {@link rx.Observable} completes or + * produces an error, all windows are emitted, and the event is propagated to all subscribed + * {@link rx.Observer}s. + *

+ * Note that when using this operation multiple overlapping windows could be active at any + * one point. + *

* * @param source - * The {@link rx.Observable} which produces values. + * the {@link rx.Observable} which produces values * @param windowOpenings - * An {@link rx.Observable} which when it produces a {@link rx.util.Opening} value will - * create a new window which instantly starts recording the "source" {@link rx.Observable}. + * an {@link rx.Observable} which when it produces a {@link rx.util.TOpening} value will create a + * new window which instantly starts recording the {@code source} {@link rx.Observable} * @param windowClosingSelector - * A {@link rx.functions.Func0} object which produces {@link rx.Observable}s. These {@link rx.Observable}s determine when a window is emitted and replaced by simply - * producing an object. + * a {@link rx.functions.Func0} object that produces {@link rx.Observable}s. These + * {@link rx.Observable}s determine when a window is emitted and replaced by simply producing an + * object. * @return - * the {@link rx.functions.Func1} object representing the specified window operation. + * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(final Observable source, final Observable windowOpenings, final Func1> windowClosingSelector) { return new OnSubscribeFunc>() { @@ -110,48 +116,51 @@ public Subscription onSubscribe(final Observer> observer) } /** - *

This method creates a {@link rx.functions.Func1} object which represents the window operation. This operation takes - * values from the specified {@link rx.Observable} source and stores them in a window until the window contains - * a specified number of elements. The window is then emitted, and a new window is created to replace it. - * When the source {@link rx.Observable} completes or produces an error, the current window is emitted, and - * the event is propagated to all subscribed {@link rx.Observer}s.

- * - *

Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values.

+ * This method creates a {@link rx.functions.Func1} object which represents the window operation. This + * operation takes values from the specified {@link rx.Observable} source and stores them in a window until + * the window contains a specified number of elements. The window is then emitted, and a new window is + * created to replace it. When the source {@link rx.Observable} completes or produces an error, the current + * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. + *

+ * Note that this operation only produces non-overlapping windows. At all times there is + * exactly one window actively storing values. + *

* * @param source - * The {@link rx.Observable} which produces values. + * the {@link rx.Observable} which produces values * @param count - * The number of elements a window should have before being emitted and replaced. + * the number of elements a window should have before being emitted and replaced * @return - * the {@link rx.functions.Func1} object representing the specified window operation. + * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(Observable source, int count) { return window(source, count, count); } /** - *

This method creates a {@link rx.functions.Func1} object which represents the window operation. This operation takes - * values from the specified {@link rx.Observable} source and stores them in all active windows until the window - * contains a specified number of elements. The window is then emitted. windows are created after a certain - * amount of values have been received. When the source {@link rx.Observable} completes or produces an error, the - * currently active windows are emitted, and the event is propagated to all subscribed {@link rx.Observer}s.

- * - *

Note that this operation can produce non-connected, connected non-overlapping, or overlapping - * windows depending on the input parameters.

+ * This method creates a {@link rx.functions.Func1} object which represents the window operation. This + * operation takes values from the specified {@link rx.Observable} source and stores them in all active + * windows until the window contains a specified number of elements. The window is then emitted. Windows are + * created after a certain amount of values have been received. When the source {@link rx.Observable} + * completes or produces an error, the currently active windows are emitted, and the event is propagated to + * all subscribed {@link rx.Observer}s. + *

+ * Note that this operation can produce non-connected, connected non-overlapping, or overlapping + * windows depending on the input parameters. + *

* * @param source - * The {@link rx.Observable} which produces values. + * the {@link rx.Observable} which produces values * @param count - * The number of elements a window should have before being emitted. + * the number of elements a window should have before being emitted * @param skip - * The interval with which windows have to be created. Note that when "skip" == "count" - * that this is the same as calling {@link rx.operators.OperationWindow#window(rx.Observable, int)}. - * If "skip" < "count", this window operation will produce overlapping windows and if "skip" - * > "count" non-overlapping windows will be created and some values will not be pushed - * into a window at all! + * the interval with which windows have to be created. Note that when {@code skip == count} that + * this is the same as calling {@link rx.operators.OperationWindow#window(rx.Observable, int)}. + * If {@code skip < count}, this window operation will produce overlapping windows and if + * {@code skip > count} non-overlapping windows will be created and some values will not be + * pushed into a window at all! * @return - * the {@link rx.functions.Func1} object representing the specified window operation. + * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(final Observable source, final int count, final int skip) { return new OnSubscribeFunc>() { @@ -165,48 +174,50 @@ public Subscription onSubscribe(final Observer> observer) } /** - *

This method creates a {@link rx.functions.Func1} object which represents the window operation. This operation takes - * values from the specified {@link rx.Observable} source and stores them in a window. Periodically the window - * is emitted and replaced with a new window. How often this is done depends on the specified timespan. - * When the source {@link rx.Observable} completes or produces an error, the current window is emitted, and - * the event is propagated to all subscribed {@link rx.Observer}s.

- * - *

Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values.

+ * This method creates a {@link rx.functions.Func1} object which represents the window operation. This + * operation takes values from the specified {@link rx.Observable} source and stores them in a window. + * Periodically the window is emitted and replaced with a new window. How often this is done depends on the + * specified timespan. When the source {@link rx.Observable} completes or produces an error, the current + * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. + *

+ * Note that this operation only produces non-overlapping windows. At all times there is + * exactly one window actively storing values. + *

* * @param source - * The {@link rx.Observable} which produces values. + * the {@link rx.Observable} which produces values * @param timespan - * The amount of time all windows must be actively collect values before being emitted. + * the amount of time all windows must be actively collect values before being emitted * @param unit - * The {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan. + * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan * @return - * the {@link rx.functions.Func1} object representing the specified window operation. + * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(Observable source, long timespan, TimeUnit unit) { return window(source, timespan, unit, Schedulers.threadPoolForComputation()); } /** - *

This method creates a {@link rx.functions.Func1} object which represents the window operation. This operation takes - * values from the specified {@link rx.Observable} source and stores them in a window. Periodically the window - * is emitted and replaced with a new window. How often this is done depends on the specified timespan. - * When the source {@link rx.Observable} completes or produces an error, the current window is emitted, and - * the event is propagated to all subscribed {@link rx.Observer}s.

- * - *

Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values.

+ * This method creates a {@link rx.functions.Func1} object which represents the window operation. This + * operation takes values from the specified {@link rx.Observable} source and stores them in a window. + * Periodically the window is emitted and replaced with a new window. How often this is done depends on the + * specified timespan. When the source {@link rx.Observable} completes or produces an error, the current + * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. + *

+ * Note that this operation only produces non-overlapping windows. At all times there is + * exactly one window actively storing values. + *

* * @param source - * The {@link rx.Observable} which produces values. + * the {@link rx.Observable} which produces values * @param timespan - * The amount of time all windows must be actively collect values before being emitted. + * the amount of time all windows must be actively collect values before being emitted * @param unit - * The {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan. + * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan * @param scheduler - * The {@link rx.Scheduler} to use for timing windows. + * the {@link rx.Scheduler} to use for timing windows * @return - * the {@link rx.functions.Func1} object representing the specified window operation. + * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(final Observable source, final long timespan, final TimeUnit unit, final Scheduler scheduler) { return new OnSubscribeFunc>() { @@ -220,54 +231,56 @@ public Subscription onSubscribe(final Observer> observer) } /** - *

This method creates a {@link rx.functions.Func1} object which represents the window operation. This operation takes - * values from the specified {@link rx.Observable} source and stores them in a window. Periodically the window - * is emitted and replaced with a new window. How often this is done depends on the specified timespan. - * Additionally the window is automatically emitted once it reaches a specified number of elements. - * When the source {@link rx.Observable} completes or produces an error, the current window is emitted, and - * the event is propagated to all subscribed {@link rx.Observer}s.

- * - *

Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values.

+ * This method creates a {@link rx.functions.Func1} object which represents the window operation. This + * operation takes values from the specified {@link rx.Observable} source and stores them in a window. + * Periodically the window is emitted and replaced with a new window. How often this is done depends on the + * specified timespan. Additionally the window is automatically emitted once it reaches a specified number + * of elements. When the source {@link rx.Observable} completes or produces an error, the current window is + * emitted, and the event is propagated to all subscribed {@link rx.Observer}s. + *

+ * Note that this operation only produces non-overlapping windows. At all times there is + * exactly one window actively storing values. + *

* * @param source - * The {@link rx.Observable} which produces values. + * the {@link rx.Observable} which produces values * @param timespan - * The amount of time all windows must be actively collect values before being emitted. + * the amount of time all windows must be actively collect values before being emitted * @param unit - * The {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan. + * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan * @param count - * The maximum size of the window. Once a window reaches this size, it is emitted. + * the maximum size of the window. Once a window reaches this size, it is emitted * @return - * the {@link rx.functions.Func1} object representing the specified window operation. + * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(Observable source, long timespan, TimeUnit unit, int count) { return window(source, timespan, unit, count, Schedulers.threadPoolForComputation()); } /** - *

This method creates a {@link rx.functions.Func1} object which represents the window operation. This operation takes - * values from the specified {@link rx.Observable} source and stores them in a window. Periodically the window - * is emitted and replaced with a new window. How often this is done depends on the specified timespan. - * Additionally the window is automatically emitted once it reaches a specified number of elements. - * When the source {@link rx.Observable} completes or produces an error, the current window is emitted, and - * the event is propagated to all subscribed {@link rx.Observer}s.

- * - *

Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values.

+ * This method creates a {@link rx.functions.Func1} object which represents the window operation. This + * operation takes values from the specified {@link rx.Observable} source and stores them in a window. + * Periodically the window is emitted and replaced with a new window. How often this is done depends on the + * specified timespan. Additionally the window is automatically emitted once it reaches a specified number + * of elements. When the source {@link rx.Observable} completes or produces an error, the current window is + * emitted, and the event is propagated to all subscribed {@link rx.Observer}s. + *

+ * Note that this operation only produces non-overlapping windows. At all times there is + * exactly one window actively storing values. + *

* * @param source - * The {@link rx.Observable} which produces values. + * the {@link rx.Observable} which produces values * @param timespan - * The amount of time all windows must be actively collect values before being emitted. + * the amount of time all windows must be actively collect values before being emitted * @param unit - * The {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan. + * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan * @param count - * The maximum size of the window. Once a window reaches this size, it is emitted. + * the maximum size of the window. Once a window reaches this size, it is emitted * @param scheduler - * The {@link rx.Scheduler} to use for timing windows. + * the {@link rx.Scheduler} to use for timing windows * @return - * the {@link rx.functions.Func1} object representing the specified window operation. + * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(final Observable source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) { return new OnSubscribeFunc>() { @@ -281,54 +294,56 @@ public Subscription onSubscribe(final Observer> observer) } /** - *

This method creates a {@link rx.functions.Func1} object which represents the window operation. This operation takes - * values from the specified {@link rx.Observable} source and stores them in a window. Periodically the window - * is emitted and replaced with a new window. How often this is done depends on the specified timespan. - * The creation of windows is also periodical. How often this is done depends on the specified timeshift. - * When the source {@link rx.Observable} completes or produces an error, the current window is emitted, and - * the event is propagated to all subscribed {@link rx.Observer}s.

- * - *

Note that this operation can produce non-connected, or overlapping windows depending - * on the input parameters.

+ * This method creates a {@link rx.functions.Func1} object which represents the window operation. This + * operation takes values from the specified {@link rx.Observable} source and stores them in a window. + * Periodically the window is emitted and replaced with a new window. How often this is done depends on the + * specified timespan. The creation of windows is also periodical. How often this is done depends on the + * specified timeshift. When the source {@link rx.Observable} completes or produces an error, the current + * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. + *

+ * Note that this operation can produce non-connected, or overlapping windows depending on + * the input parameters. + *

* * @param source - * The {@link rx.Observable} which produces values. + * the {@link rx.Observable} which produces values * @param timespan - * The amount of time all windows must be actively collect values before being emitted. + * the amount of time all windows must be actively collect values before being emitted * @param timeshift - * The amount of time between creating windows. + * the amount of time between creating windows * @param unit - * The {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan. + * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan * @return - * the {@link rx.functions.Func1} object representing the specified window operation. + * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(Observable source, long timespan, long timeshift, TimeUnit unit) { return window(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation()); } /** - *

This method creates a {@link rx.functions.Func1} object which represents the window operation. This operation takes - * values from the specified {@link rx.Observable} source and stores them in a window. Periodically the window - * is emitted and replaced with a new window. How often this is done depends on the specified timespan. - * The creation of windows is also periodical. How often this is done depends on the specified timeshift. - * When the source {@link rx.Observable} completes or produces an error, the current window is emitted, and - * the event is propagated to all subscribed {@link rx.Observer}s.

- * - *

Note that this operation can produce non-connected, or overlapping windows depending - * on the input parameters.

+ * This method creates a {@link rx.functions.Func1} object which represents the window operation. This + * operation takes values from the specified {@link rx.Observable} source and stores them in a window. + * Periodically the window is emitted and replaced with a new window. How often this is done depends on the + * specified timespan. The creation of windows is also periodical. How often this is done depends on the + * specified timeshift. When the source {@link rx.Observable} completes or produces an error, the current + * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. + *

+ * Note that this operation can produce non-connected, or overlapping windows depending on + * the input parameters. + *

* * @param source - * The {@link rx.Observable} which produces values. + * the {@link rx.Observable} which produces values * @param timespan - * The amount of time all windows must be actively collect values before being emitted. + * the amount of time all windows must be actively collect values before being emitted * @param timeshift - * The amount of time between creating windows. + * the amount of time between creating windows * @param unit - * The {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan. + * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan * @param scheduler - * The {@link rx.Scheduler} to use for timing windows. + * the {@link rx.Scheduler} to use for timing windows * @return - * the {@link rx.functions.Func1} object representing the specified window operation. + * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(final Observable source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) { return new OnSubscribeFunc>() { @@ -345,13 +360,13 @@ public Subscription onSubscribe(final Observer> observer) * This class represents a single window: A sequence of recorded values. * * @param - * The type of objects which this {@link Window} can hold. + * the type of objects which this {@link Window} can hold */ protected static class Window extends Chunk> { /** * @return - * The mutable underlying {@link Observable} which contains all the - * recorded values in this {@link Window} object. + * the mutable underlying {@link Observable} which contains all the recorded values in this + * {@link Window} object */ @Override public Observable getContents() { @@ -360,16 +375,20 @@ public Observable getContents() { } /** - * Emits windows of values of the source Observable where the window boundary is - * determined by the items of the boundary Observable. + * Emits windows of values of the source Observable where the window boundary is determined by the items of + * the boundary Observable. + * + * @param source + * @param boundary + * @return */ public static OnSubscribeFunc> window(Observable source, Observable boundary) { return new WindowViaObservable(source, boundary); } /** - * Create non-overlapping windows from the source values by using another observable's - * values as to when to replace a window. + * Create non-overlapping windows from the source values by using another observable's values as to when to + * replace a window. */ private static final class WindowViaObservable implements OnSubscribeFunc> { final Observable source; diff --git a/rxjava-core/src/main/java/rx/operators/OperatorScan.java b/rxjava-core/src/main/java/rx/operators/OperatorScan.java index 2cd8de2626..d1e9b769a9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorScan.java @@ -21,17 +21,17 @@ import rx.functions.Func2; /** - * Returns an Observable that applies a function to the first item emitted by a source Observable, - * then feeds the result of that function along with the second item emitted by an Observable into - * the same function, and so on until all items have been emitted by the source Observable, - * emitting the result of each of these iterations. + * Returns an Observable that applies a function to the first item emitted by a source Observable, then feeds + * the result of that function along with the second item emitted by an Observable into the same function, and + * so on until all items have been emitted by the source Observable, emitting the result of each of these + * iterations. *

* *

* This sort of function is sometimes called an accumulator. *

- * Note that when you pass a seed to scan() the resulting Observable will emit that - * seed as its first emitted item. + * Note that when you pass a seed to scan() the resulting Observable will emit that seed as its + * first emitted item. */ public final class OperatorScan implements Operator { @@ -41,19 +41,14 @@ public final class OperatorScan implements Operator { private static final Object NO_INITIAL_VALUE = new Object(); /** - * Applies an accumulator function over an observable sequence and returns each intermediate - * result with the specified source and accumulator. + * Applies an accumulator function over an observable sequence and returns each intermediate result with the + * specified source and accumulator. * - * @param sequence - * An observable sequence of elements to project. * @param initialValue - * The initial (seed) accumulator value. + * the initial (seed) accumulator value * @param accumulator - * An accumulator function to be invoked on each element from the sequence. - * - * @return An observable sequence whose elements are the result of accumulating the output from - * the list of Observables. - * @see Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, + * an accumulator function to be invoked on each element from the sequence + * @see Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, * TAccumulate)) */ public OperatorScan(R initialValue, Func2 accumulator) { @@ -62,17 +57,12 @@ public OperatorScan(R initialValue, Func2 accumulator) { } /** - * Applies an accumulator function over an observable sequence and returns each intermediate - * result with the specified source and accumulator. + * Applies an accumulator function over an observable sequence and returns each intermediate result with the + * specified source and accumulator. * - * @param sequence - * An observable sequence of elements to project. * @param accumulator - * An accumulator function to be invoked on each element from the sequence. - * - * @return An observable sequence whose elements are the result of accumulating the output from - * the list of Observables. - * @see Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource)) + * an accumulator function to be invoked on each element from the sequence + * @see Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource)) */ @SuppressWarnings("unchecked") public OperatorScan(final Func2 accumulator) { diff --git a/rxjava-core/src/main/java/rx/operators/SafeObserver.java b/rxjava-core/src/main/java/rx/operators/SafeObserver.java index f776619905..cf632b4dc5 100644 --- a/rxjava-core/src/main/java/rx/operators/SafeObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SafeObserver.java @@ -29,7 +29,8 @@ /** * Wrapper around Observer to ensure compliance with Rx contract. *

- * The following is taken from the Rx Design Guidelines document: http://go.microsoft.com/fwlink/?LinkID=205219 + * The following is taken from the Rx Design Guidelines + * document: *

  * Messages sent to instances of the IObserver interface follow the following grammar:
  * 
@@ -38,8 +39,8 @@
  * This grammar allows observable sequences to send any amount (0 or more) of OnNext messages to the subscribed
  * observer instance, optionally followed by a single success (OnCompleted) or failure (OnError) message.
  * 
- * The single message indicating that an observable sequence has finished ensures that consumers of the observable
- * sequence can deterministically establish that it is safe to perform cleanup operations.
+ * The single message indicating that an observable sequence has finished ensures that consumers of the
+ * observable sequence can deterministically establish that it is safe to perform cleanup operations.
  * 
  * A single failure further ensures that abort semantics can be maintained for operators that work on
  * multiple observable sequences (see paragraph 6.6).
@@ -57,7 +58,7 @@
  * It will not synchronize onNext execution. Use the {@link SynchronizedObserver} to do that.
  * 
  * @param 
- * @Deprecated Replaced by SafeSubscriber
+ * @deprecated replaced by SafeSubscriber
  */
 @Deprecated
 public class SafeObserver implements Observer {
diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java b/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java
index 5adc50e0d1..f3e7708669 100644
--- a/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java
+++ b/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java
@@ -18,29 +18,30 @@
 import rx.Scheduler;
 
 /**
- * Define alternate Scheduler implementations to be returned by the `Schedulers` factory methods.
+ * Define alternate Scheduler implementations to be returned by the {@code Schedulers} factory methods.
  * 

- * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: https://github.com/Netflix/RxJava/wiki/Plugins. + * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: + * https://github.com/Netflix/RxJava/wiki/Plugins. */ public abstract class RxJavaDefaultSchedulers { /** - * Scheduler to return from {@link Schedulers.computation()} or null if default should be used. + * Scheduler to return from {@link rx.schedulers.Schedulers#computation()} or null if default should be + * used. * * This instance should be or behave like a stateless singleton; */ public abstract Scheduler getComputationScheduler(); /** - * Scheduler to return from {@link Schedulers.io()} or null if default should be used. + * Scheduler to return from {@link rx.schedulers.Schedulers#io()} or null if default should be used. * * This instance should be or behave like a stateless singleton; */ public abstract Scheduler getIOScheduler(); /** - * Scheduler to return from {@link Schedulers.newThread()} or null if default should be used. + * Scheduler to return from {@link rx.schedulers.Schedulers#newThread()} or null if default should be used. * * This instance should be or behave like a stateless singleton; */ diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java index 64402fe0de..0b21d20ffe 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java @@ -23,34 +23,33 @@ import rx.functions.Func1; /** - * Abstract ExecutionHook with invocations at different lifecycle points of {@link Observable} - * execution with a default no-op implementation. + * Abstract ExecutionHook with invocations at different lifecycle points of {@link Observable} execution with a + * default no-op implementation. *

- * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: https://github.com/Netflix/RxJava/wiki/ - * Plugins . + * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: + * https://github.com/Netflix/RxJava/wiki/Plugins. *

- * Note on thread-safety and performance + * Note on thread-safety and performance: *

- * A single implementation of this class will be used globally so methods on this class will be - * invoked concurrently from multiple threads so all functionality must be thread-safe. + * A single implementation of this class will be used globally so methods on this class will be invoked + * concurrently from multiple threads so all functionality must be thread-safe. *

- * Methods are also invoked synchronously and will add to execution time of the observable so all - * behavior should be fast. If anything time-consuming is to be done it should be spawned - * asynchronously onto separate worker threads. + * Methods are also invoked synchronously and will add to execution time of the observable so all behavior + * should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate + * worker threads. * */ public abstract class RxJavaObservableExecutionHook { /** * Invoked during the construction by {@link Observable#create(OnSubscribe)} *

- * This can be used to decorate or replace the onSubscribe function or just perform - * extra logging, metrics and other such things and pass-thru the function. + * This can be used to decorate or replace the onSubscribe function or just perform extra + * logging, metrics and other such things and pass-thru the function. * - * @param onSubscribe + * @param f * original {@link OnSubscribe}<{@code T}> to be executed - * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or - * just returned as a pass-thru. + * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just + * returned as a pass-thru */ public OnSubscribe onCreate(OnSubscribe f) { return f; @@ -59,13 +58,13 @@ public OnSubscribe onCreate(OnSubscribe f) { /** * Invoked before {@link Observable#subscribe(rx.Subscriber)} is about to be executed. *

- * This can be used to decorate or replace the onSubscribe function or just perform - * extra logging, metrics and other such things and pass-thru the function. + * This can be used to decorate or replace the onSubscribe function or just perform extra + * logging, metrics and other such things and pass-thru the function. * * @param onSubscribe * original {@link OnSubscribe}<{@code T}> to be executed - * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or - * just returned as a pass-thru. + * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just + * returned as a pass-thru */ public OnSubscribe onSubscribeStart(Observable observableInsance, final OnSubscribe onSubscribe) { // pass-thru by default @@ -73,16 +72,16 @@ public OnSubscribe onSubscribeStart(Observable observableIns } /** - * Invoked after successful execution of {@link Observable#subscribe(rx.Subscriber)} with - * returned {@link Subscription}. + * Invoked after successful execution of {@link Observable#subscribe(rx.Subscriber)} with returned + * {@link Subscription}. *

- * This can be used to decorate or replace the {@link Subscription} instance or just perform - * extra logging, metrics and other such things and pass-thru the subscription. + * This can be used to decorate or replace the {@link Subscription} instance or just perform extra logging, + * metrics and other such things and pass-thru the subscription. * * @param subscription * original {@link Subscription} - * @return {@link Subscription} subscription that can be modified, decorated, replaced or just - * returned as a pass-thru. + * @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a + * pass-thru */ public Subscription onSubscribeReturn(Subscription subscription) { // pass-thru by default @@ -90,16 +89,14 @@ public Subscription onSubscribeReturn(Subscription subscription) { } /** - * Invoked after failed execution of {@link Observable#subscribe(Subscriber)} with thrown - * Throwable. + * Invoked after failed execution of {@link Observable#subscribe(Subscriber)} with thrown Throwable. *

- * This is NOT errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown - * when attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code }, - * {@link Subscription}>. + * This is not errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when + * attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code }, {@link Subscription}>. * * @param e * Throwable thrown by {@link Observable#subscribe(Subscriber)} - * @return Throwable that can be decorated, replaced or just returned as a pass-thru. + * @return Throwable that can be decorated, replaced or just returned as a pass-thru */ public Throwable onSubscribeError(Throwable e) { // pass-thru by default @@ -114,9 +111,9 @@ public Throwable onSubscribeError(Throwable e) { * logging, metrics and other such things and pass-thru the onSubscribe. * * @param lift - * original {@link Operator}{@code} ' * @return {@link Operator}{@code } - * function that can be modified, decorated, replaced or - * just returned as a pass-thru. + * original {@link Operator}{@code } + * @return {@link Operator}{@code } function that can be modified, decorated, replaced or just + * returned as a pass-thru */ public Operator onLift(final Operator lift) { return lift; diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java index c6f1344320..ff994f0759 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -29,16 +29,17 @@ import rx.subscriptions.Subscriptions; /** - * A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} implementation. + * A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} + * implementation. *

- * Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events. + * Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a + * system-wide Timer will be used to handle delayed events. */ public class ExecutorScheduler extends Scheduler { private final Executor executor; /** * @deprecated Use Schedulers.executor(); - * @return */ @Deprecated public ExecutorScheduler(Executor executor) { @@ -47,7 +48,6 @@ public ExecutorScheduler(Executor executor) { /** * @deprecated Use Schedulers.executor(); - * @return */ @Deprecated public ExecutorScheduler(ScheduledExecutorService executor) { diff --git a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java index 7bbfe61eaa..7aa8cc0e55 100644 --- a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java +++ b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java @@ -73,7 +73,7 @@ public static Scheduler immediate() { * {@link Scheduler} that queues work on the current thread to be executed after the current work completes. * * @return {@link TrampolineScheduler} instance - * @deprecated Use trampoline() instead + * @deprecated use {@link #trampoline()} instead */ @Deprecated public static Scheduler currentThread() { @@ -121,14 +121,15 @@ public static Scheduler executor(ScheduledExecutorService executor) { /** * {@link Scheduler} intended for computational work. *

- * The implementation is backed by a {@link ScheduledExecutorService} thread-pool sized to the number of CPU cores. + * The implementation is backed by a {@link ScheduledExecutorService} thread-pool sized to the number of CPU + * cores. *

* This can be used for event-loops, processing callbacks and other computational work. *

* Do not perform IO-bound work on this scheduler. Use {@link #io()} instead. * - * @return {@link ExecutorScheduler} for computation-bound work. - * @Deprecated Use {@link #computation()} + * @return {@link ExecutorScheduler} for computation-bound work + * @deprecated use {@link #computation()} */ @Deprecated public static Scheduler threadPoolForComputation() { @@ -142,7 +143,7 @@ public static Scheduler threadPoolForComputation() { *

* Do not perform IO-bound work on this scheduler. Use {@link #io()} instead. * - * @return {@link Scheduler} for computation-bound work. + * @return {@link Scheduler} for computation-bound work */ public static Scheduler computation() { return INSTANCE.computationScheduler; @@ -157,8 +158,8 @@ public static Scheduler computation() { *

* Do not perform computational work on this scheduler. Use {@link #computation()} instead. * - * @return {@link ExecutorScheduler} for IO-bound work. - * @deprecated Use {@link #io()} instead. + * @return {@link ExecutorScheduler} for IO-bound work + * @deprecated use {@link #io()} instead */ @Deprecated public static Scheduler threadPoolForIO() { @@ -174,7 +175,7 @@ public static Scheduler threadPoolForIO() { *

* Do not perform computational work on this scheduler. Use {@link #computation()} instead. * - * @return {@link ExecutorScheduler} for IO-bound work. + * @return {@link ExecutorScheduler} for IO-bound work */ public static Scheduler io() { return INSTANCE.ioScheduler; diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index 5e33a85637..036da20a48 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -67,23 +67,27 @@ public final class BehaviorSubject extends Subject { /** - * Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it. + * Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each + * {@link Observer} that subscribes to it. * * @param defaultValue - * The value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events. - * @return the constructed {@link BehaviorSubject}. - * @deprecated Use {@link create()} instead. + * the value which will be published to any {@link Observer} as long as the + * {@link BehaviorSubject} has not yet received any events + * @return the constructed {@link BehaviorSubject} + * @deprecated use {@link #create(T)} instead */ public static BehaviorSubject createWithDefaultValue(T defaultValue) { return create(defaultValue); } /** - * Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it. + * Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each + * {@link Observer} that subscribes to it. * * @param defaultValue - * The value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events. - * @return the constructed {@link BehaviorSubject}. + * the value which will be published to any {@link Observer} as long as the + * {@link BehaviorSubject} has not yet received any events + * @return the constructed {@link BehaviorSubject} */ public static BehaviorSubject create(T defaultValue) { final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager(); @@ -179,4 +183,4 @@ public void onNext(T v) { } } -} \ No newline at end of file +}