Skip to content

Commit

Permalink
Use 'debounce' as proper name for ThrottleWithTimeout which unfortuna…
Browse files Browse the repository at this point in the history
…tely is the poorly named Rx Throttle operator.

http://drupalmotion.com/article/debounce-and-throttle-visual-explanation

Debounce: Think of it as "grouping multiple events in one". Imagine that you go home, enter in the elevator, doors are closing... and suddenly your neighbor appears in the hall and tries to jump on the elevator. Be polite! and open the doors for him: you are debouncing the elevator departure. Consider that the same situation can happen again with a third person, and so on... probably delaying the departure several minutes.

Throttle: Think of it as a valve, it regulates the flow of the executions. We can determine the maximum number of times a function can be called in certain time. So in the elevator analogy.. you are polite enough to let people in for 10 secs, but once that delay passes, you must go!

http://unscriptable.com/2009/03/20/debouncing-javascript-methods/
http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/
  • Loading branch information
benjchristensen committed Sep 10, 2013
1 parent 78cecdd commit 5fabd58
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 20 deletions.
73 changes: 67 additions & 6 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationThrottleWithTimeout;
import rx.operators.OperationDebounce;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1811,23 +1811,83 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
}

/**
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* Information on debounce vs throttle:
* <p>
* <ul>
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
* </ul>
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* @see {@link #throttleWithTimeout};
*/
public Observable<T> debounce(long timeout, TimeUnit unit) {
return create(OperationDebounce.debounce(this, timeout, unit));
}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* Information on debounce vs throttle:
* <p>
* <ul>
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
* </ul>
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
* @see {@link #throttleWithTimeout};
*/
public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationDebounce.debounce(this, timeout, unit));
}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* Information on debounce vs throttle:
* <p>
* <ul>
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
* </ul>
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* @see {@link #debounce}
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit));
return create(OperationDebounce.debounce(this, timeout, unit));
}

/**
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
*
Expand All @@ -1838,11 +1898,12 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
* @see {@link #debounce}
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit, scheduler));
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
}

/**
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published
* as soon as the timeout expires.
*/
public final class OperationThrottleWithTimeout {
public final class OperationDebounce {

/**
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
Expand All @@ -56,8 +56,8 @@ public final class OperationThrottleWithTimeout {
* The unit of time for the specified timeout.
* @return A {@link Func1} which performs the throttle operation.
*/
public static <T> OnSubscribeFunc<T> throttleWithTimeout(Observable<T> items, long timeout, TimeUnit unit) {
return throttleWithTimeout(items, timeout, unit, Schedulers.threadPoolForComputation());
public static <T> OnSubscribeFunc<T> debounce(Observable<T> items, long timeout, TimeUnit unit) {
return debounce(items, timeout, unit, Schedulers.threadPoolForComputation());
}

/**
Expand All @@ -75,23 +75,23 @@ public static <T> OnSubscribeFunc<T> throttleWithTimeout(Observable<T> items, lo
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return A {@link Func1} which performs the throttle operation.
*/
public static <T> OnSubscribeFunc<T> throttleWithTimeout(final Observable<T> items, final long timeout, final TimeUnit unit, final Scheduler scheduler) {
public static <T> OnSubscribeFunc<T> debounce(final Observable<T> items, final long timeout, final TimeUnit unit, final Scheduler scheduler) {
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return new Throttle<T>(items, timeout, unit, scheduler).onSubscribe(observer);
return new Debounce<T>(items, timeout, unit, scheduler).onSubscribe(observer);
}
};
}

private static class Throttle<T> implements OnSubscribeFunc<T> {
private static class Debounce<T> implements OnSubscribeFunc<T> {

private final Observable<T> items;
private final long timeout;
private final TimeUnit unit;
private final Scheduler scheduler;

public Throttle(Observable<T> items, long timeout, TimeUnit unit, Scheduler scheduler) {
public Debounce(Observable<T> items, long timeout, TimeUnit unit, Scheduler scheduler) {
this.items = items;
this.timeout = timeout;
this.unit = unit;
Expand All @@ -100,11 +100,11 @@ public Throttle(Observable<T> items, long timeout, TimeUnit unit, Scheduler sche

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return items.subscribe(new ThrottledObserver<T>(observer, timeout, unit, scheduler));
return items.subscribe(new DebounceObserver<T>(observer, timeout, unit, scheduler));
}
}

private static class ThrottledObserver<T> implements Observer<T> {
private static class DebounceObserver<T> implements Observer<T> {

private final Observer<? super T> observer;
private final long timeout;
Expand All @@ -113,7 +113,7 @@ private static class ThrottledObserver<T> implements Observer<T> {

private final AtomicReference<Subscription> lastScheduledNotification = new AtomicReference<Subscription>();

public ThrottledObserver(Observer<? super T> observer, long timeout, TimeUnit unit, Scheduler scheduler) {
public DebounceObserver(Observer<? super T> observer, long timeout, TimeUnit unit, Scheduler scheduler) {
// we need to synchronize the observer since the on* events can be coming from different
// threads and are thus non-deterministic and could be interleaved
this.observer = new SynchronizedObserver<T>(observer);
Expand Down Expand Up @@ -174,7 +174,7 @@ public void before() {
}

@Test
public void testThrottlingWithCompleted() {
public void testDebounceWithCompleted() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
Expand All @@ -187,7 +187,7 @@ public Subscription onSubscribe(Observer<? super String> observer) {
}
});

Observable<String> sampled = Observable.create(OperationThrottleWithTimeout.throttleWithTimeout(source, 400, TimeUnit.MILLISECONDS, scheduler));
Observable<String> sampled = Observable.create(OperationDebounce.debounce(source, 400, TimeUnit.MILLISECONDS, scheduler));
sampled.subscribe(observer);

scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
Expand All @@ -201,7 +201,38 @@ public Subscription onSubscribe(Observer<? super String> observer) {
}

@Test
public void testThrottlingWithError() {
public void testDebounceNeverEmits() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
// all should be skipped since they are happening faster than the 200ms timeout
publishNext(observer, 100, "a"); // Should be skipped
publishNext(observer, 200, "b"); // Should be skipped
publishNext(observer, 300, "c"); // Should be skipped
publishNext(observer, 400, "d"); // Should be skipped
publishNext(observer, 500, "e"); // Should be skipped
publishNext(observer, 600, "f"); // Should be skipped
publishNext(observer, 700, "g"); // Should be skipped
publishNext(observer, 800, "h"); // Should be skipped
publishCompleted(observer, 900); // Should be published as soon as the timeout expires.

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationDebounce.debounce(source, 200, TimeUnit.MILLISECONDS, scheduler));
sampled.subscribe(observer);

scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(0)).onNext(anyString());
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testDebounceWithError() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
Expand All @@ -214,7 +245,7 @@ public Subscription onSubscribe(Observer<? super String> observer) {
}
});

Observable<String> sampled = Observable.create(OperationThrottleWithTimeout.throttleWithTimeout(source, 400, TimeUnit.MILLISECONDS, scheduler));
Observable<String> sampled = Observable.create(OperationDebounce.debounce(source, 400, TimeUnit.MILLISECONDS, scheduler));
sampled.subscribe(observer);

scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
Expand Down

0 comments on commit 5fabd58

Please sign in to comment.