Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

a few warnings, javadoc, and one missing scheduler parameter #373

Merged
merged 2 commits into from
Sep 13, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 20 additions & 44 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -544,7 +543,7 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable) {
* <p>Implementation note: the entire iterable sequence will be immediately emitted each time an {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned,
* it in not possible to unsubscribe from the sequence before it completes.
*
* @param array
* @param items
* the source sequence
* @param <T>
* the type of items in the {@link Iterable} sequence and the type of items to be
Expand Down Expand Up @@ -1434,7 +1433,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends Observable<
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1462,7 +1461,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1492,7 +1491,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1524,7 +1523,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1558,7 +1557,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1594,7 +1593,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1632,7 +1631,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1672,7 +1671,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1832,7 +1831,7 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* @see {@link #throttleWithTimeout};
* @see #throttleWithTimeout(long, TimeUnit)
*/
public Observable<T> debounce(long timeout, TimeUnit unit) {
return create(OperationDebounce.debounce(this, timeout, unit));
Expand Down Expand Up @@ -1860,10 +1859,10 @@ public Observable<T> debounce(long timeout, TimeUnit unit) {
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
* @see {@link #throttleWithTimeout};
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
*/
public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationDebounce.debounce(this, timeout, unit));
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
}

/**
Expand All @@ -1887,7 +1886,7 @@ public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler)
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* @see {@link #debounce}
* @see #debounce(long, TimeUnit)
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
return create(OperationDebounce.debounce(this, timeout, unit));
Expand All @@ -1907,7 +1906,7 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
* @see {@link #debounce}
* @see #debounce(long, TimeUnit, Scheduler)
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
Expand All @@ -1920,12 +1919,10 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
*
* @param skipDuration
* @param windowDuration
* Time to wait before sending another value after emitting last value.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
Expand Down Expand Up @@ -1963,7 +1960,7 @@ public Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler s
* @param unit
* The unit of time for the specified interval.
* @return Observable which performs the throttle operation.
* @see {@link #sample(long, TimeUnit)}
* @see #sample(long, TimeUnit)
*/
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
return sample(intervalDuration, unit);
Expand All @@ -1981,7 +1978,7 @@ public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
* @param unit
* The unit of time for the specified interval.
* @return Observable which performs the throttle operation.
* @see {@link #sample(long, TimeUnit, Scheduler)}
* @see #sample(long, TimeUnit, Scheduler)
*/
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) {
return sample(intervalDuration, unit, scheduler);
Expand Down Expand Up @@ -2727,7 +2724,7 @@ public Observable<Observable<T>> window(int count) {
* The maximum size of each window before it should be emitted.
* @param skip
* How many produced values need to be skipped before starting a new window. Note that when "skip" and
* "count" are equals that this is the same operation as {@link Observable#window(Observable, int)}.
* "count" are equals that this is the same operation as {@link #window(int)}.
* @return
* An {@link Observable} which produces windows every "skipped" values containing at most
* "count" produced values.
Expand Down Expand Up @@ -3334,9 +3331,6 @@ public Observable<T> retry(int retryCount) {
* <p>
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
*
* @param retryCount
* Number of retry attempts before failing.
* @return Observable with retry logic.
*/
public Observable<T> retry() {
Expand Down Expand Up @@ -3657,7 +3651,7 @@ public Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer,
* @return an Observable that emits only the very first item from the source, or none if the
* source Observable completes without emitting a single item.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
* @see {@link #first()}
* @see #first()
*/
public Observable<T> takeFirst() {
return first();
Expand All @@ -3672,7 +3666,7 @@ public Observable<T> takeFirst() {
* @return an Observable that emits only the very first item satisfying the given condition from the source,
* or none if the source Observable completes without emitting a single matching item.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
* @see {@link #first(Func1)}
* @see #first(Func1)
*/
public Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
return first(predicate);
Expand Down Expand Up @@ -3812,8 +3806,6 @@ public Observable<T> startWith(Iterable<T> values) {
*
* @param t1
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1) {
Expand All @@ -3829,8 +3821,6 @@ public Observable<T> startWith(T t1) {
* item to include
* @param t2
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2) {
Expand All @@ -3848,8 +3838,6 @@ public Observable<T> startWith(T t1, T t2) {
* item to include
* @param t3
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3) {
Expand All @@ -3869,8 +3857,6 @@ public Observable<T> startWith(T t1, T t2, T t3) {
* item to include
* @param t4
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4) {
Expand All @@ -3892,8 +3878,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4) {
* item to include
* @param t5
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5) {
Expand All @@ -3917,8 +3901,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5) {
* item to include
* @param t6
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6) {
Expand All @@ -3944,8 +3926,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6) {
* item to include
* @param t7
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
Expand Down Expand Up @@ -3973,8 +3953,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
* item to include
* @param t8
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
Expand Down Expand Up @@ -4004,8 +3982,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
* item to include
* @param t9
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) {
Expand Down
5 changes: 3 additions & 2 deletions rxjava-core/src/main/java/rx/operators/ChunkedOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ protected interface ChunkCreator {
*
* @param <T>
* The type of objects which this {@link Chunk} can hold.
* <C> The type of object being tracked by the {@link Chunk}
* @param <C>
* The type of object being tracked by the {@link Chunk}
*/
protected abstract static class Chunk<T, C> {
protected final List<T> contents = new ArrayList<T>();
Expand All @@ -78,7 +79,7 @@ public void pushValue(T value) {

/**
* @return
* The mutable underlying {@link C} which contains all the
* The mutable underlying {@code C} which contains all the
* recorded values in this {@link Chunk} object.
*/
abstract public C getContents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ public void testEmpty() {
public void testError() {
Observable<String> sourceStrings = Observable.from("one", "two", "three", "four", "five", "six");
Observable<String> errorSource = Observable.error(new RuntimeException("forced failure"));
@SuppressWarnings("unchecked")
Observable<String> source = Observable.concat(sourceStrings, errorSource);

Observable<GroupedObservable<Integer, String>> grouped = Observable.create(groupBy(source, length));
Expand Down
5 changes: 0 additions & 5 deletions rxjava-core/src/main/java/rx/operators/OperationMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ public void before() {
public void testMap() {
Map<String, String> m1 = getMap("One");
Map<String, String> m2 = getMap("Two");
@SuppressWarnings("unchecked")
Observable<Map<String, String>> observable = Observable.from(m1, m2);

Observable<String> m = Observable.create(map(observable, new Func1<Map<String, String>, String>() {
Expand Down Expand Up @@ -176,7 +175,6 @@ public void testMapMany() {
/* now simulate the behavior to take those IDs and perform nested async calls based on them */
Observable<String> m = Observable.create(mapMany(ids, new Func1<Integer, Observable<String>>() {

@SuppressWarnings("unchecked")
@Override
public Observable<String> call(Integer id) {
/* simulate making a nested async call which creates another Observable */
Expand Down Expand Up @@ -215,15 +213,12 @@ public String call(Map<String, String> map) {
public void testMapMany2() {
Map<String, String> m1 = getMap("One");
Map<String, String> m2 = getMap("Two");
@SuppressWarnings("unchecked")
Observable<Map<String, String>> observable1 = Observable.from(m1, m2);

Map<String, String> m3 = getMap("Three");
Map<String, String> m4 = getMap("Four");
@SuppressWarnings("unchecked")
Observable<Map<String, String>> observable2 = Observable.from(m3, m4);

@SuppressWarnings("unchecked")
Observable<Observable<Map<String, String>>> observable = Observable.from(observable1, observable2);

Observable<String> m = Observable.create(mapMany(observable, new Func1<Observable<Map<String, String>>, Observable<String>>() {
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/operators/OperationRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,6 @@ public Subscription onSubscribe(Observer<? super String> o) {
}
return Subscriptions.empty();
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ public static <T, E> Observable<T> takeUntil(final Observable<? extends T> sourc
Observable<Notification<T>> s = Observable.create(new SourceObservable<T>(source));
Observable<Notification<T>> o = Observable.create(new OtherObservable<T, E>(other));

@SuppressWarnings("unchecked")
/**
* In JDK 7 we could use 'varargs' instead of 'unchecked'.
* See http://stackoverflow.com/questions/1445233/is-it-possible-to-solve-the-a-generic-array-of-t-is-created-for-a-varargs-param
* and http://hg.openjdk.java.net/jdk7/tl/langtools/rev/46cf751559ae
*/
Observable<Notification<T>> result = Observable.merge(s, o);

return result.takeWhile(new Func1<Notification<T>, Boolean>() {
Expand Down
Loading