Skip to content

Commit

Permalink
Merge pull request ReactiveX#373 from jmhofer/throttle-debounce-fix
Browse files Browse the repository at this point in the history
a few warnings, javadoc, and one missing scheduler parameter
  • Loading branch information
benjchristensen committed Sep 13, 2013
2 parents 3d3ea2c + fee07c9 commit 5690e60
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 66 deletions.
64 changes: 20 additions & 44 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -544,7 +543,7 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable) {
* <p>Implementation note: the entire iterable sequence will be immediately emitted each time an {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned,
* it in not possible to unsubscribe from the sequence before it completes.
*
* @param array
* @param items
* the source sequence
* @param <T>
* the type of items in the {@link Iterable} sequence and the type of items to be
Expand Down Expand Up @@ -1434,7 +1433,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends Observable<
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1462,7 +1461,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1492,7 +1491,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1524,7 +1523,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1558,7 +1557,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1594,7 +1593,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1632,7 +1631,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1672,7 +1671,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
}

/**
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
* refrain from propagating that error notification until all of the merged Observables have
* finished emitting items.
Expand Down Expand Up @@ -1832,7 +1831,7 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* @see {@link #throttleWithTimeout};
* @see #throttleWithTimeout(long, TimeUnit)
*/
public Observable<T> debounce(long timeout, TimeUnit unit) {
return create(OperationDebounce.debounce(this, timeout, unit));
Expand Down Expand Up @@ -1860,10 +1859,10 @@ public Observable<T> debounce(long timeout, TimeUnit unit) {
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
* @see {@link #throttleWithTimeout};
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
*/
public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationDebounce.debounce(this, timeout, unit));
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
}

/**
Expand All @@ -1887,7 +1886,7 @@ public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler)
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* @see {@link #debounce}
* @see #debounce(long, TimeUnit)
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
return create(OperationDebounce.debounce(this, timeout, unit));
Expand All @@ -1907,7 +1906,7 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
* @see {@link #debounce}
* @see #debounce(long, TimeUnit, Scheduler)
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
Expand All @@ -1920,12 +1919,10 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
*
* @param skipDuration
* @param windowDuration
* Time to wait before sending another value after emitting last value.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
Expand Down Expand Up @@ -1963,7 +1960,7 @@ public Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler s
* @param unit
* The unit of time for the specified interval.
* @return Observable which performs the throttle operation.
* @see {@link #sample(long, TimeUnit)}
* @see #sample(long, TimeUnit)
*/
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
return sample(intervalDuration, unit);
Expand All @@ -1981,7 +1978,7 @@ public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
* @param unit
* The unit of time for the specified interval.
* @return Observable which performs the throttle operation.
* @see {@link #sample(long, TimeUnit, Scheduler)}
* @see #sample(long, TimeUnit, Scheduler)
*/
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) {
return sample(intervalDuration, unit, scheduler);
Expand Down Expand Up @@ -2727,7 +2724,7 @@ public Observable<Observable<T>> window(int count) {
* The maximum size of each window before it should be emitted.
* @param skip
* How many produced values need to be skipped before starting a new window. Note that when "skip" and
* "count" are equals that this is the same operation as {@link Observable#window(Observable, int)}.
* "count" are equals that this is the same operation as {@link #window(int)}.
* @return
* An {@link Observable} which produces windows every "skipped" values containing at most
* "count" produced values.
Expand Down Expand Up @@ -3334,9 +3331,6 @@ public Observable<T> retry(int retryCount) {
* <p>
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
*
* @param retryCount
* Number of retry attempts before failing.
* @return Observable with retry logic.
*/
public Observable<T> retry() {
Expand Down Expand Up @@ -3657,7 +3651,7 @@ public Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer,
* @return an Observable that emits only the very first item from the source, or none if the
* source Observable completes without emitting a single item.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
* @see {@link #first()}
* @see #first()
*/
public Observable<T> takeFirst() {
return first();
Expand All @@ -3672,7 +3666,7 @@ public Observable<T> takeFirst() {
* @return an Observable that emits only the very first item satisfying the given condition from the source,
* or none if the source Observable completes without emitting a single matching item.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
* @see {@link #first(Func1)}
* @see #first(Func1)
*/
public Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
return first(predicate);
Expand Down Expand Up @@ -3812,8 +3806,6 @@ public Observable<T> startWith(Iterable<T> values) {
*
* @param t1
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1) {
Expand All @@ -3829,8 +3821,6 @@ public Observable<T> startWith(T t1) {
* item to include
* @param t2
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2) {
Expand All @@ -3848,8 +3838,6 @@ public Observable<T> startWith(T t1, T t2) {
* item to include
* @param t3
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3) {
Expand All @@ -3869,8 +3857,6 @@ public Observable<T> startWith(T t1, T t2, T t3) {
* item to include
* @param t4
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4) {
Expand All @@ -3892,8 +3878,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4) {
* item to include
* @param t5
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5) {
Expand All @@ -3917,8 +3901,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5) {
* item to include
* @param t6
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6) {
Expand All @@ -3944,8 +3926,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6) {
* item to include
* @param t7
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
Expand Down Expand Up @@ -3973,8 +3953,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
* item to include
* @param t8
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
Expand Down Expand Up @@ -4004,8 +3982,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
* item to include
* @param t9
* item to include
* @param values
* Iterable of the items you want the modified Observable to emit first
* @return an Observable that exhibits the modified behavior
*/
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) {
Expand Down
5 changes: 3 additions & 2 deletions rxjava-core/src/main/java/rx/operators/ChunkedOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ protected interface ChunkCreator {
*
* @param <T>
* The type of objects which this {@link Chunk} can hold.
* <C> The type of object being tracked by the {@link Chunk}
* @param <C>
* The type of object being tracked by the {@link Chunk}
*/
protected abstract static class Chunk<T, C> {
protected final List<T> contents = new ArrayList<T>();
Expand All @@ -78,7 +79,7 @@ public void pushValue(T value) {

/**
* @return
* The mutable underlying {@link C} which contains all the
* The mutable underlying {@code C} which contains all the
* recorded values in this {@link Chunk} object.
*/
abstract public C getContents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ public void testEmpty() {
public void testError() {
Observable<String> sourceStrings = Observable.from("one", "two", "three", "four", "five", "six");
Observable<String> errorSource = Observable.error(new RuntimeException("forced failure"));
@SuppressWarnings("unchecked")
Observable<String> source = Observable.concat(sourceStrings, errorSource);

Observable<GroupedObservable<Integer, String>> grouped = Observable.create(groupBy(source, length));
Expand Down
5 changes: 0 additions & 5 deletions rxjava-core/src/main/java/rx/operators/OperationMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ public void before() {
public void testMap() {
Map<String, String> m1 = getMap("One");
Map<String, String> m2 = getMap("Two");
@SuppressWarnings("unchecked")
Observable<Map<String, String>> observable = Observable.from(m1, m2);

Observable<String> m = Observable.create(map(observable, new Func1<Map<String, String>, String>() {
Expand Down Expand Up @@ -176,7 +175,6 @@ public void testMapMany() {
/* now simulate the behavior to take those IDs and perform nested async calls based on them */
Observable<String> m = Observable.create(mapMany(ids, new Func1<Integer, Observable<String>>() {

@SuppressWarnings("unchecked")
@Override
public Observable<String> call(Integer id) {
/* simulate making a nested async call which creates another Observable */
Expand Down Expand Up @@ -215,15 +213,12 @@ public String call(Map<String, String> map) {
public void testMapMany2() {
Map<String, String> m1 = getMap("One");
Map<String, String> m2 = getMap("Two");
@SuppressWarnings("unchecked")
Observable<Map<String, String>> observable1 = Observable.from(m1, m2);

Map<String, String> m3 = getMap("Three");
Map<String, String> m4 = getMap("Four");
@SuppressWarnings("unchecked")
Observable<Map<String, String>> observable2 = Observable.from(m3, m4);

@SuppressWarnings("unchecked")
Observable<Observable<Map<String, String>>> observable = Observable.from(observable1, observable2);

Observable<String> m = Observable.create(mapMany(observable, new Func1<Observable<Map<String, String>>, Observable<String>>() {
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/operators/OperationRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,6 @@ public Subscription onSubscribe(Observer<? super String> o) {
}
return Subscriptions.empty();
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ public static <T, E> Observable<T> takeUntil(final Observable<? extends T> sourc
Observable<Notification<T>> s = Observable.create(new SourceObservable<T>(source));
Observable<Notification<T>> o = Observable.create(new OtherObservable<T, E>(other));

@SuppressWarnings("unchecked")
/**
* In JDK 7 we could use 'varargs' instead of 'unchecked'.
* See http://stackoverflow.com/questions/1445233/is-it-possible-to-solve-the-a-generic-array-of-t-is-created-for-a-varargs-param
* and http://hg.openjdk.java.net/jdk7/tl/langtools/rev/46cf751559ae
*/
Observable<Notification<T>> result = Observable.merge(s, o);

return result.takeWhile(new Func1<Notification<T>, Boolean>() {
Expand Down
Loading

0 comments on commit 5690e60

Please sign in to comment.