Skip to content

Commit

Permalink
Operators: throttleWithTimeout, throttleFirst, throttleLast
Browse files Browse the repository at this point in the history
- javadocs explaining differences
- link between throttleLast and sample (aliase)
- refactored throttleFirst to be a more efficient implementations
- concurrency changes to throttleWithTimeout
  • Loading branch information
benjchristensen committed Sep 10, 2013
1 parent d0e50fe commit 78cecdd
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 541 deletions.
77 changes: 40 additions & 37 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
*/
package rx;

import static rx.util.functions.Functions.not;
import static rx.util.functions.Functions.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -64,10 +63,8 @@
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottle;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationThrottleWithTimeout;
import rx.operators.OperationThrottleLast;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1814,9 +1811,9 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
}

/**
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call.
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If the timeout is set higher than the rate of traffic then this will drop all data.
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
Expand All @@ -1830,9 +1827,9 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
}

/**
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call.
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If the timeout is set higher than the rate of traffic then this will drop all data.
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
Expand All @@ -1847,64 +1844,70 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler
}

/**
* Throttles to first value in each window.
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
* <p>
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
*
* @param windowDuration
* Duration of windows within with the first value will be chosen.
* @param skipDuration
* Time to wait before sending another value after emitting last value.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
}

/**
* Throttles to first value in each window.
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
* <p>
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
*
* @param windowDuration
* Duration of windows within with the first value will be chosen.
* @param skipDuration
* Time to wait before sending another value after emitting last value.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler) {
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit, scheduler));
public Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) {
return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler));
}



/**
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
* <p>
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
*
* @param intervalDuration
* Duration of windows within with the last value will be chosen.
* @param unit
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* The unit of time for the specified interval.
* @return Observable which performs the throttle operation.
* @see {@link #sample(long, TimeUnit)}
*/
public Observable<T> throttleLast(long timeout, TimeUnit unit) {
return create(OperationThrottleLast.throttleLast(this, timeout, unit));
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
return sample(intervalDuration, unit);
}

/**
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
* <p>
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param intervalDuration
* Duration of windows within with the last value will be chosen.
* @param unit
* The {@link TimeUnit} for the timeout.
* @param scheduler
* The {@link Scheduler} to use when timing incoming values.
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* The unit of time for the specified interval.
* @return Observable which performs the throttle operation.
* @see {@link #sample(long, TimeUnit, Scheduler)}
*/
public Observable<T> throttleLast(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationThrottleLast.throttleLast(this, timeout, unit, scheduler));
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) {
return sample(intervalDuration, unit, scheduler);
}


/**
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
Expand Down
Loading

0 comments on commit 78cecdd

Please sign in to comment.