Skip to content

Commit

Permalink
Issue ReactiveX#657: Added Future decorator in RateLimiter (ReactiveX…
Browse files Browse the repository at this point in the history
  • Loading branch information
clgroft authored and RobWin committed Nov 7, 2019
1 parent 9f62d71 commit 0822830
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -153,6 +154,41 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(RateLimiter rate
};
}

/**
* Returns a Supplier which is decorated by a RateLimiter.
* @param rateLimiter the rate limiter
* @param supplier the original supplier
* @param <T> the type of the returned Future's result
* @param <F> the return type of the original Supplier (extends Future&lt;T&gt;)
* @return a supplier which is decorated by a rate limiter.
*/
static <T, F extends Future<T>> Supplier<F> decorateFuture(
RateLimiter rateLimiter,
Supplier<? extends F> supplier
) {
return decorateFuture(rateLimiter, 1, supplier);
}

/**
* Returns a Supplier which is decorated by a RateLimiter.
* @param rateLimiter the rate limiter
* @param permits the number of permits that this call requires
* @param supplier the original supplier
* @param <T> the type of the returned Future's result
* @param <F> the return type of the original Supplier (extends Future&lt;T&gt;)
* @return a supplier which is decorated by a rate limiter.
*/
static <T, F extends Future<T>> Supplier<F> decorateFuture(
RateLimiter rateLimiter,
int permits,
Supplier<? extends F> supplier
) {
return () -> {
waitForPermission(rateLimiter, permits);
return supplier.get();
};
}

/**
* Creates a supplier which is restricted by a RateLimiter.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import org.junit.Test;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
Expand Down Expand Up @@ -226,6 +223,34 @@ public void decorateCompletionStage() {
then(supplier).should().get();
}

@Test(expected = RequestNotPermitted.class)
public void decorateFutureFailure()
throws InterruptedException, ExecutionException, TimeoutException {

Supplier<String> supplier = mock(Supplier.class);
given(supplier.get()).willReturn("Resource");
Supplier<Future<String>> decoratedFuture =
RateLimiter.decorateFuture(limit, () -> supplyAsync(supplier));
given(limit.acquirePermission(1)).willReturn(false);

decoratedFuture.get().get(2, TimeUnit.SECONDS);
}

@Test
public void decorateFutureSuccess()
throws ExecutionException, InterruptedException, TimeoutException {
Supplier<String> supplier = mock(Supplier.class);
given(supplier.get()).willReturn("Resource");
Supplier<Future<String>> decoratedFuture =
RateLimiter.decorateFuture(limit, () -> supplyAsync(supplier));
given(limit.acquirePermission(1)).willReturn(true);

String result = decoratedFuture.get().get(2, TimeUnit.SECONDS);

then(supplier).should().get();
assertThat(result).isEqualTo("Resource");
}

@Test
public void waitForPermissionWithOne() {
given(limit.acquirePermission(1)).willReturn(true);
Expand Down

0 comments on commit 0822830

Please sign in to comment.