From c260629050afbad50eae55e5e79f99d480db6e1e Mon Sep 17 00:00:00 2001 From: bstorozhuk Date: Wed, 23 Nov 2016 22:07:38 +0200 Subject: [PATCH] Issue #12 Initial RateLimiter implementation and JavaDocs --- build.gradle | 5 +- .../javaslang/ratelimiter/RateLimiter.java | 184 +++++++++++++++ .../ratelimiter/RateLimiterConfig.java | 116 +++++++++ .../ratelimiter/RateLimiterRegistry.java | 41 ++++ .../ratelimiter/RequestNotPermitted.java | 17 ++ .../internal/InMemoryRateLimiterRegistry.java | 71 ++++++ .../SemaphoreBasedRateLimiterImpl.java | 157 +++++++++++++ .../ratelimiter/RateLimiterConfigTest.java | 91 ++++++++ .../ratelimiter/RateLimiterTest.java | 200 ++++++++++++++++ .../InMemoryRateLimiterRegistryTest.java | 118 ++++++++++ .../SemaphoreBasedRateLimiterImplTest.java | 221 ++++++++++++++++++ 11 files changed, 1220 insertions(+), 1 deletion(-) create mode 100644 src/main/java/javaslang/ratelimiter/RateLimiter.java create mode 100644 src/main/java/javaslang/ratelimiter/RateLimiterConfig.java create mode 100644 src/main/java/javaslang/ratelimiter/RateLimiterRegistry.java create mode 100644 src/main/java/javaslang/ratelimiter/RequestNotPermitted.java create mode 100644 src/main/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistry.java create mode 100644 src/main/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImpl.java create mode 100644 src/test/java/javaslang/ratelimiter/RateLimiterConfigTest.java create mode 100644 src/test/java/javaslang/ratelimiter/RateLimiterTest.java create mode 100644 src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java create mode 100644 src/test/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImplTest.java diff --git a/build.gradle b/build.gradle index cf834540fe..8220612338 100644 --- a/build.gradle +++ b/build.gradle @@ -62,13 +62,16 @@ jmh { dependencies { compile "io.javaslang:javaslang:2.0.4" compile "org.slf4j:slf4j-api:1.7.13" + testCompile "io.dropwizard.metrics:metrics-core:3.1.2" testCompile "junit:junit:4.11" testCompile "org.assertj:assertj-core:3.0.0" testCompile "ch.qos.logback:logback-classic:0.9.26" testCompile "io.dropwizard.metrics:metrics-healthchecks:3.1.2" - testCompile "org.mockito:mockito-all:1.10.19" + testCompile "org.mockito:mockito-core:1.10.19" testCompile "io.projectreactor:reactor-core:2.5.0.M2" + testCompile "com.jayway.awaitility:awaitility:1.7.0" + jmh "ch.qos.logback:logback-classic:0.9.26" } diff --git a/src/main/java/javaslang/ratelimiter/RateLimiter.java b/src/main/java/javaslang/ratelimiter/RateLimiter.java new file mode 100644 index 0000000000..1a5ff9c2b2 --- /dev/null +++ b/src/main/java/javaslang/ratelimiter/RateLimiter.java @@ -0,0 +1,184 @@ +package javaslang.ratelimiter; + +import javaslang.control.Try; + +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * A RateLimiter distributes permits at a configurable rate. {@link #getPermission} blocks if necessary + * until a permit is available, and then takes it. Once acquired, permits need not be released. + */ +public interface RateLimiter { + + /** + * Acquires a permission from this rate limiter, blocking until one is + * available. + *

+ *

If the current thread is {@linkplain Thread#interrupt interrupted} + * while waiting for a permit then it won't throw {@linkplain InterruptedException}, + * but its interrupt status will be set. + * + * @return {@code true} if a permit was acquired and {@code false} + * if waiting time elapsed before a permit was acquired + */ + boolean getPermission(Duration timeoutDuration); + + /** + * Get the name of this RateLimiter + * + * @return the name of this RateLimiter + */ + String getName(); + + /** + * Get the RateLimiterConfig of this RateLimiter. + * + * @return the RateLimiterConfig of this RateLimiter + */ + RateLimiterConfig getRateLimiterConfig(); + + /** + * Get the Metrics of this RateLimiter. + * + * @return the Metrics of this RateLimiter + */ + Metrics getMetrics(); + + interface Metrics { + /** + * Returns an estimate of the number of threads waiting for permission + * in this JVM process. + * + * @return estimate of the number of threads waiting for permission. + */ + int getNumberOfWaitingThreads(); + } + + /** + * Creates a supplier which is restricted by a RateLimiter. + * + * @param rateLimiter the RateLimiter + * @param supplier the original supplier + * @return a supplier which is restricted by a RateLimiter. + */ + static Try.CheckedSupplier decorateCheckedSupplier(RateLimiter rateLimiter, Try.CheckedSupplier supplier) { + Try.CheckedSupplier decoratedSupplier = () -> { + waitForPermission(rateLimiter); + T result = supplier.get(); + return result; + }; + return decoratedSupplier; + } + + /** + * Creates a runnable which is restricted by a RateLimiter. + * + * @param rateLimiter the RateLimiter + * @param runnable the original runnable + * @return a runnable which is restricted by a RateLimiter. + */ + static Try.CheckedRunnable decorateCheckedRunnable(RateLimiter rateLimiter, Try.CheckedRunnable runnable) { + + Try.CheckedRunnable decoratedRunnable = () -> { + waitForPermission(rateLimiter); + runnable.run(); + }; + return decoratedRunnable; + } + + /** + * Creates a function which is restricted by a RateLimiter. + * + * @param rateLimiter the RateLimiter + * @param function the original function + * @return a function which is restricted by a RateLimiter. + */ + static Try.CheckedFunction decorateCheckedFunction(RateLimiter rateLimiter, Try.CheckedFunction function) { + Try.CheckedFunction decoratedFunction = (T t) -> { + waitForPermission(rateLimiter); + R result = function.apply(t); + return result; + }; + return decoratedFunction; + } + + /** + * Creates a supplier which is restricted by a RateLimiter. + * + * @param rateLimiter the RateLimiter + * @param supplier the original supplier + * @return a supplier which is restricted by a RateLimiter. + */ + static Supplier decorateSupplier(RateLimiter rateLimiter, Supplier supplier) { + Supplier decoratedSupplier = () -> { + waitForPermission(rateLimiter); + T result = supplier.get(); + return result; + }; + return decoratedSupplier; + } + + /** + * Creates a consumer which is restricted by a RateLimiter. + * + * @param rateLimiter the RateLimiter + * @param consumer the original consumer + * @return a consumer which is restricted by a RateLimiter. + */ + static Consumer decorateConsumer(RateLimiter rateLimiter, Consumer consumer) { + Consumer decoratedConsumer = (T t) -> { + waitForPermission(rateLimiter); + consumer.accept(t); + }; + return decoratedConsumer; + } + + /** + * Creates a runnable which is restricted by a RateLimiter. + * + * @param rateLimiter the RateLimiter + * @param runnable the original runnable + * @return a runnable which is restricted by a RateLimiter. + */ + static Runnable decorateRunnable(RateLimiter rateLimiter, Runnable runnable) { + Runnable decoratedRunnable = () -> { + waitForPermission(rateLimiter); + runnable.run(); + }; + return decoratedRunnable; + } + + /** + * Creates a function which is restricted by a RateLimiter. + * + * @param rateLimiter the RateLimiter + * @param function the original function + * @return a function which is restricted by a RateLimiter. + */ + static Function decorateFunction(RateLimiter rateLimiter, Function function) { + Function decoratedFunction = (T t) -> { + waitForPermission(rateLimiter); + R result = function.apply(t); + return result; + }; + return decoratedFunction; + } + + /** + * Will wait for permission within default timeout duration. + * Throws {@link RequestNotPermitted} if waiting time elapsed before a permit was acquired. + * + * @param rateLimiter the RateLimiter to get permission from + */ + static void waitForPermission(final RateLimiter rateLimiter) { + RateLimiterConfig rateLimiterConfig = rateLimiter.getRateLimiterConfig(); + Duration timeoutDuration = rateLimiterConfig.getTimeoutDuration(); + boolean permission = rateLimiter.getPermission(timeoutDuration); + if (!permission) { + throw new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName()); + } + } +} diff --git a/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java b/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java new file mode 100644 index 0000000000..17e2ccd3f3 --- /dev/null +++ b/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java @@ -0,0 +1,116 @@ +package javaslang.ratelimiter; + +import static java.util.Objects.requireNonNull; + +import java.time.Duration; + +public class RateLimiterConfig { + private static final String TIMEOUT_DURATION_MUST_NOT_BE_NULL = "TimeoutDuration must not be null"; + private static final String LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL = "LimitRefreshPeriod must not be null"; + + private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(500L); // TODO: use jmh to find real one + + private final Duration timeoutDuration; + private final Duration limitRefreshPeriod; + private final int limitForPeriod; + + private RateLimiterConfig(final Duration timeoutDuration, final Duration limitRefreshPeriod, final int limitForPeriod) { + this.timeoutDuration = checkTimeoutDuration(timeoutDuration); + this.limitRefreshPeriod = checkLimitRefreshPeriod(limitRefreshPeriod); + this.limitForPeriod = checkLimitForPeriod(limitForPeriod); + } + + public static Builder builder() { + return new Builder(); + } + + public Duration getTimeoutDuration() { + return timeoutDuration; + } + + public Duration getLimitRefreshPeriod() { + return limitRefreshPeriod; + } + + public int getLimitForPeriod() { + return limitForPeriod; + } + + public static class Builder { + + private Duration timeoutDuration; + private Duration limitRefreshPeriod; + private int limitForPeriod; + + /** + * Builds a RateLimiterConfig + * + * @return the RateLimiterConfig + */ + public RateLimiterConfig build() { + return new RateLimiterConfig( + timeoutDuration, + limitRefreshPeriod, + limitForPeriod + ); + } + + /** + * Configures the default wait for permission duration. + * + * @param timeoutDuration the default wait for permission duration + * @return the RateLimiterConfig.Builder + */ + public Builder timeoutDuration(final Duration timeoutDuration) { + this.timeoutDuration = checkTimeoutDuration(timeoutDuration); + return this; + } + + /** + * Configures the period of limit refresh. + * After each period rate limiter sets its permissions + * count to {@link RateLimiterConfig#limitForPeriod} value. + * + * @param limitRefreshPeriod the period of limit refresh + * @return the RateLimiterConfig.Builder + */ + public Builder limitRefreshPeriod(final Duration limitRefreshPeriod) { + this.limitRefreshPeriod = checkLimitRefreshPeriod(limitRefreshPeriod); + return this; + } + + /** + * Configures the permissions limit for refresh period. + * Count of permissions available during one rate limiter period + * specified by {@link RateLimiterConfig#limitRefreshPeriod} value. + * + * @param limitForPeriod the permissions limit for refresh period + * @return the RateLimiterConfig.Builder + */ + public Builder limitForPeriod(final int limitForPeriod) { + this.limitForPeriod = checkLimitForPeriod(limitForPeriod); + return this; + } + + } + + private static Duration checkTimeoutDuration(final Duration timeoutDuration) { + return requireNonNull(timeoutDuration, TIMEOUT_DURATION_MUST_NOT_BE_NULL); + } + + private static Duration checkLimitRefreshPeriod(Duration limitRefreshPeriod) { + requireNonNull(limitRefreshPeriod, LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL); + boolean refreshPeriodIsTooShort = limitRefreshPeriod.compareTo(ACCEPTABLE_REFRESH_PERIOD) < 0; + if (refreshPeriodIsTooShort) { + throw new IllegalArgumentException("LimitRefreshPeriod is too short"); + } + return limitRefreshPeriod; + } + + private static int checkLimitForPeriod(final int limitForPeriod) { + if (limitForPeriod < 1) { + throw new IllegalArgumentException("LimitForPeriod should be greater than 0"); + } + return limitForPeriod; + } +} diff --git a/src/main/java/javaslang/ratelimiter/RateLimiterRegistry.java b/src/main/java/javaslang/ratelimiter/RateLimiterRegistry.java new file mode 100644 index 0000000000..111db8374e --- /dev/null +++ b/src/main/java/javaslang/ratelimiter/RateLimiterRegistry.java @@ -0,0 +1,41 @@ +package javaslang.ratelimiter; + +import javaslang.ratelimiter.internal.InMemoryRateLimiterRegistry; + +import java.util.function.Supplier; + +/** + * Manages all RateLimiter instances. + */ +public interface RateLimiterRegistry { + + /** + * Returns a managed {@link RateLimiter} or creates a new one with the default RateLimiter configuration. + * + * @param name the name of the RateLimiter + * @return The {@link RateLimiter} + */ + RateLimiter rateLimiter(String name); + + /** + * Returns a managed {@link RateLimiter} or creates a new one with a custom RateLimiter configuration. + * + * @param name the name of the RateLimiter + * @param rateLimiterConfig a custom RateLimiter configuration + * @return The {@link RateLimiter} + */ + RateLimiter rateLimiter(String name, RateLimiterConfig rateLimiterConfig); + + /** + * Returns a managed {@link RateLimiterConfig} or creates a new one with a custom RateLimiterConfig configuration. + * + * @param name the name of the RateLimiterConfig + * @param rateLimiterConfigSupplier a supplier of a custom RateLimiterConfig configuration + * @return The {@link RateLimiterConfig} + */ + RateLimiter rateLimiter(String name, Supplier rateLimiterConfigSupplier); + + static RateLimiterRegistry of(RateLimiterConfig defaultRateLimiterConfig) { + return new InMemoryRateLimiterRegistry(defaultRateLimiterConfig); + } +} diff --git a/src/main/java/javaslang/ratelimiter/RequestNotPermitted.java b/src/main/java/javaslang/ratelimiter/RequestNotPermitted.java new file mode 100644 index 0000000000..3c862d80c2 --- /dev/null +++ b/src/main/java/javaslang/ratelimiter/RequestNotPermitted.java @@ -0,0 +1,17 @@ +package javaslang.ratelimiter; + +/** + * Exception that indicates that current thread was not able to acquire permission + * from {@link RateLimiter}. + */ +public class RequestNotPermitted extends RuntimeException { + + /** + * The constructor with a message. + * + * @param message The message. + */ + public RequestNotPermitted(final String message) { + super(message); + } +} \ No newline at end of file diff --git a/src/main/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistry.java b/src/main/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistry.java new file mode 100644 index 0000000000..2a74e591af --- /dev/null +++ b/src/main/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistry.java @@ -0,0 +1,71 @@ +package javaslang.ratelimiter.internal; + +import static java.util.Objects.requireNonNull; + +import javaslang.ratelimiter.RateLimiter; +import javaslang.ratelimiter.RateLimiterConfig; +import javaslang.ratelimiter.RateLimiterRegistry; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * Backend RateLimiter manager. + * Constructs backend RateLimiters according to configuration values. + */ +public class InMemoryRateLimiterRegistry implements RateLimiterRegistry { + + private static final String NAME_MUST_NOT_BE_NULL = "Name must not be null"; + private static final String CONFIG_MUST_NOT_BE_NULL = "Config must not be null"; + private static final String SUPPLIER_MUST_NOT_BE_NULL = "Supplier must not be null"; + + private final RateLimiterConfig defaultRateLimiterConfig; + /** + * The RateLimiters, indexed by name of the backend. + */ + private final Map rateLimiters; + + public InMemoryRateLimiterRegistry(final RateLimiterConfig defaultRateLimiterConfig) { + this.defaultRateLimiterConfig = requireNonNull(defaultRateLimiterConfig, CONFIG_MUST_NOT_BE_NULL); + rateLimiters = new ConcurrentHashMap<>(); + } + + /** + * {@inheritDoc} + */ + @Override + public RateLimiter rateLimiter(final String name) { + return rateLimiter(name, defaultRateLimiterConfig); + } + + /** + * {@inheritDoc} + */ + @Override + public RateLimiter rateLimiter(final String name, final RateLimiterConfig rateLimiterConfig) { + requireNonNull(name, NAME_MUST_NOT_BE_NULL); + requireNonNull(rateLimiterConfig, CONFIG_MUST_NOT_BE_NULL); + return rateLimiters.computeIfAbsent( + name, + limitName -> new SemaphoreBasedRateLimiterImpl(name, rateLimiterConfig) + ); + } + + /** + * {@inheritDoc} + */ + @Override + public RateLimiter rateLimiter(final String name, final Supplier rateLimiterConfigSupplier) { + requireNonNull(name, NAME_MUST_NOT_BE_NULL); + requireNonNull(rateLimiterConfigSupplier, SUPPLIER_MUST_NOT_BE_NULL); + return rateLimiters.computeIfAbsent( + name, + limitName -> { + RateLimiterConfig rateLimiterConfig = rateLimiterConfigSupplier.get(); + requireNonNull(rateLimiterConfig, CONFIG_MUST_NOT_BE_NULL); + return new SemaphoreBasedRateLimiterImpl(limitName, rateLimiterConfig); + } + ); + } +} diff --git a/src/main/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImpl.java b/src/main/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImpl.java new file mode 100644 index 0000000000..832c9af529 --- /dev/null +++ b/src/main/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImpl.java @@ -0,0 +1,157 @@ +package javaslang.ratelimiter.internal; + +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; + +import javaslang.control.Option; +import javaslang.ratelimiter.RateLimiter; +import javaslang.ratelimiter.RateLimiterConfig; + +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * A RateLimiter implementation that consists of {@link Semaphore} + * and scheduler that will refresh permissions + * after each {@link RateLimiterConfig#limitRefreshPeriod}. + */ +public class SemaphoreBasedRateLimiterImpl implements RateLimiter { + + private static final String NAME_MUST_NOT_BE_NULL = "Name must not be null"; + private static final String CONFIG_MUST_NOT_BE_NULL = "RateLimiterConfig must not be null"; + + private final String name; + private final RateLimiterConfig rateLimiterConfig; + private final ScheduledExecutorService scheduler; + private final Semaphore semaphore; + private final SemaphoreBasedRateLimiterMetrics metrics; + + /** + * Creates a RateLimiter. + * + * @param name the name of the RateLimiter + * @param rateLimiterConfig The RateLimiter configuration. + */ + public SemaphoreBasedRateLimiterImpl(final String name, final RateLimiterConfig rateLimiterConfig) { + this(name, rateLimiterConfig, null); + } + + /** + * Creates a RateLimiter. + * + * @param name the name of the RateLimiter + * @param rateLimiterConfig The RateLimiter configuration. + * @param scheduler executor that will refresh permissions + */ + public SemaphoreBasedRateLimiterImpl(String name, RateLimiterConfig rateLimiterConfig, + ScheduledExecutorService scheduler) { + this.name = requireNonNull(name, NAME_MUST_NOT_BE_NULL); + this.rateLimiterConfig = requireNonNull(rateLimiterConfig, CONFIG_MUST_NOT_BE_NULL); + + this.scheduler = Option.of(scheduler).getOrElse(this::configureScheduler); + this.semaphore = new Semaphore(this.rateLimiterConfig.getLimitForPeriod(), true); + this.metrics = this.new SemaphoreBasedRateLimiterMetrics(); + + scheduleLimitRefresh(); + } + + private ScheduledExecutorService configureScheduler() { + ThreadFactory threadFactory = target -> { + Thread thread = new Thread(target, "SchedulerForSemaphoreBasedRateLimiterImpl-" + name); + thread.setDaemon(true); + return thread; + }; + return newSingleThreadScheduledExecutor(threadFactory); + } + + private void scheduleLimitRefresh() { + scheduler.scheduleAtFixedRate( + this::refreshLimit, + this.rateLimiterConfig.getLimitRefreshPeriod().toNanos(), + this.rateLimiterConfig.getLimitRefreshPeriod().toNanos(), + TimeUnit.NANOSECONDS + ); + } + + void refreshLimit() { + semaphore.release(this.rateLimiterConfig.getLimitForPeriod()); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean getPermission(final Duration timeoutDuration) { + try { + boolean success = semaphore.tryAcquire(timeoutDuration.toNanos(), TimeUnit.NANOSECONDS); + return success; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + + /** + * {@inheritDoc} + */ + @Override + public String getName() { + return this.name; + } + + /** + * {@inheritDoc} + */ + @Override + public Metrics getMetrics() { + return this.metrics; + } + + /** + * {@inheritDoc} + */ + @Override + public RateLimiterConfig getRateLimiterConfig() { + return this.rateLimiterConfig; + } + + /** + * Get the enhanced Metrics with some implementation specific details. + * + * @return the detailed metrics + */ + public SemaphoreBasedRateLimiterMetrics getDetailedMetrics() { + return this.metrics; + } + + /** + * Enhanced {@link Metrics} with some implementation specific details + */ + public final class SemaphoreBasedRateLimiterMetrics implements Metrics { + private SemaphoreBasedRateLimiterMetrics() { + } + + /** + * Returns the current number of permits available in this request limit + * until the next refresh. + *

+ *

This method is typically used for debugging and testing purposes. + * + * @return the number of permits available in this rate limiter until the next refresh. + */ + public int getAvailablePermits() { + return semaphore.availablePermits(); + } + + /** + * {@inheritDoc} + */ + @Override + public int getNumberOfWaitingThreads() { + return semaphore.getQueueLength(); + } + } +} diff --git a/src/test/java/javaslang/ratelimiter/RateLimiterConfigTest.java b/src/test/java/javaslang/ratelimiter/RateLimiterConfigTest.java new file mode 100644 index 0000000000..d0b0532855 --- /dev/null +++ b/src/test/java/javaslang/ratelimiter/RateLimiterConfigTest.java @@ -0,0 +1,91 @@ +package javaslang.ratelimiter; + +import static org.assertj.core.api.BDDAssertions.then; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.time.Duration; + + +public class RateLimiterConfigTest { + + private static final int LIMIT = 50; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + private static final Duration REFRESH_PERIOD = Duration.ofNanos(500); + private static final String TIMEOUT_DURATION_MUST_NOT_BE_NULL = "TimeoutDuration must not be null"; + private static final String REFRESH_PERIOD_MUST_NOT_BE_NULL = "RefreshPeriod must not be null"; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + + @Test + public void builderPositive() throws Exception { + RateLimiterConfig config = RateLimiterConfig.builder() + .timeoutDuration(TIMEOUT) + .limitRefreshPeriod(REFRESH_PERIOD) + .limitForPeriod(LIMIT) + .build(); + + then(config.getLimitForPeriod()).isEqualTo(LIMIT); + then(config.getLimitRefreshPeriod()).isEqualTo(REFRESH_PERIOD); + then(config.getTimeoutDuration()).isEqualTo(TIMEOUT); + } + + @Test + public void builderTimeoutIsNull() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(TIMEOUT_DURATION_MUST_NOT_BE_NULL); + RateLimiterConfig.builder() + .timeoutDuration(null); + } + + @Test + public void builderTimeoutEmpty() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(TIMEOUT_DURATION_MUST_NOT_BE_NULL); + RateLimiterConfig.builder() + .limitRefreshPeriod(REFRESH_PERIOD) + .limitForPeriod(LIMIT) + .build(); + } + + @Test + public void builderRefreshPeriodIsNull() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(REFRESH_PERIOD_MUST_NOT_BE_NULL); + RateLimiterConfig.builder() + .limitRefreshPeriod(null); + } + + @Test + public void builderRefreshPeriodEmpty() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(REFRESH_PERIOD_MUST_NOT_BE_NULL); + RateLimiterConfig.builder() + .timeoutDuration(TIMEOUT) + .limitForPeriod(LIMIT) + .build(); + } + + @Test + public void builderRefreshPeriodTooShort() throws Exception { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("RefreshPeriod is too short"); + RateLimiterConfig.builder() + .timeoutDuration(TIMEOUT) + .limitRefreshPeriod(Duration.ofNanos(499L)) + .limitForPeriod(LIMIT) + .build(); + } + + @Test + public void builderLimitIsLessThanOne() throws Exception { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("LimitForPeriod should be greater than 0"); + RateLimiterConfig.builder() + .limitForPeriod(0); + } +} diff --git a/src/test/java/javaslang/ratelimiter/RateLimiterTest.java b/src/test/java/javaslang/ratelimiter/RateLimiterTest.java new file mode 100644 index 0000000000..b38f52da36 --- /dev/null +++ b/src/test/java/javaslang/ratelimiter/RateLimiterTest.java @@ -0,0 +1,200 @@ +package javaslang.ratelimiter; + +import static org.assertj.core.api.BDDAssertions.then; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import javaslang.control.Try; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + + +@SuppressWarnings("unchecked") +public class RateLimiterTest { + + private static final int LIMIT = 50; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + private static final Duration REFRESH_PERIOD = Duration.ofNanos(500); + + private RateLimiterConfig config; + private RateLimiter limit; + + @Before + public void init() { + config = RateLimiterConfig.builder() + .timeoutDuration(TIMEOUT) + .limitRefreshPeriod(REFRESH_PERIOD) + .limitForPeriod(LIMIT) + .build(); + limit = mock(RateLimiter.class); + when(limit.getRateLimiterConfig()) + .thenReturn(config); + } + + @Test + public void decorateCheckedSupplier() throws Throwable { + Try.CheckedSupplier supplier = mock(Try.CheckedSupplier.class); + Try.CheckedSupplier decorated = RateLimiter.decorateCheckedSupplier(limit, supplier); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(false); + + Try decoratedSupplierResult = Try.of(decorated); + then(decoratedSupplierResult.isFailure()).isTrue(); + then(decoratedSupplierResult.getCause()).isInstanceOf(RequestNotPermitted.class); + verify(supplier, never()).get(); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(true); + Try secondSupplierResult = Try.of(decorated); + then(secondSupplierResult.isSuccess()).isTrue(); + verify(supplier, times(1)).get(); + } + + @Test + public void decorateCheckedRunnable() throws Throwable { + Try.CheckedRunnable runnable = mock(Try.CheckedRunnable.class); + Try.CheckedRunnable decorated = RateLimiter.decorateCheckedRunnable(limit, runnable); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(false); + + Try decoratedRunnableResult = Try.run(decorated); + then(decoratedRunnableResult.isFailure()).isTrue(); + then(decoratedRunnableResult.getCause()).isInstanceOf(RequestNotPermitted.class); + verify(runnable, never()).run(); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(true); + Try secondRunnableResult = Try.run(decorated); + then(secondRunnableResult.isSuccess()).isTrue(); + verify(runnable, times(1)).run(); + } + + @Test + public void decorateCheckedFunction() throws Throwable { + Try.CheckedFunction function = mock(Try.CheckedFunction.class); + Try.CheckedFunction decorated = RateLimiter.decorateCheckedFunction(limit, function); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(false); + + Try decoratedFunctionResult = Try.success(1).mapTry(decorated); + then(decoratedFunctionResult.isFailure()).isTrue(); + then(decoratedFunctionResult.getCause()).isInstanceOf(RequestNotPermitted.class); + verify(function, never()).apply(any()); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(true); + Try secondFunctionResult = Try.success(1).mapTry(decorated); + then(secondFunctionResult.isSuccess()).isTrue(); + verify(function, times(1)).apply(1); + } + + @Test + public void decorateSupplier() throws Exception { + Supplier supplier = mock(Supplier.class); + Supplier decorated = RateLimiter.decorateSupplier(limit, supplier); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(false); + + Try decoratedSupplierResult = Try.success(decorated).map(Supplier::get); + then(decoratedSupplierResult.isFailure()).isTrue(); + then(decoratedSupplierResult.getCause()).isInstanceOf(RequestNotPermitted.class); + verify(supplier, never()).get(); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(true); + Try secondSupplierResult = Try.success(decorated).map(Supplier::get); + then(secondSupplierResult.isSuccess()).isTrue(); + verify(supplier, times(1)).get(); + } + + @Test + public void decorateConsumer() throws Exception { + Consumer consumer = mock(Consumer.class); + Consumer decorated = RateLimiter.decorateConsumer(limit, consumer); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(false); + + Try decoratedConsumerResult = Try.success(1).andThen(decorated); + then(decoratedConsumerResult.isFailure()).isTrue(); + then(decoratedConsumerResult.getCause()).isInstanceOf(RequestNotPermitted.class); + verify(consumer, never()).accept(any()); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(true); + Try secondConsumerResult = Try.success(1).andThen(decorated); + then(secondConsumerResult.isSuccess()).isTrue(); + verify(consumer, times(1)).accept(1); + } + + @Test + public void decorateRunnable() throws Exception { + Runnable runnable = mock(Runnable.class); + Runnable decorated = RateLimiter.decorateRunnable(limit, runnable); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(false); + + Try decoratedRunnableResult = Try.success(decorated).andThen(Runnable::run); + then(decoratedRunnableResult.isFailure()).isTrue(); + then(decoratedRunnableResult.getCause()).isInstanceOf(RequestNotPermitted.class); + verify(runnable, never()).run(); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(true); + Try secondRunnableResult = Try.success(decorated).andThen(Runnable::run); + then(secondRunnableResult.isSuccess()).isTrue(); + verify(runnable, times(1)).run(); + } + + @Test + public void decorateFunction() throws Exception { + Function function = mock(Function.class); + Function decorated = RateLimiter.decorateFunction(limit, function); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(false); + + Try decoratedFunctionResult = Try.success(1).map(decorated); + then(decoratedFunctionResult.isFailure()).isTrue(); + then(decoratedFunctionResult.getCause()).isInstanceOf(RequestNotPermitted.class); + verify(function, never()).apply(any()); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(true); + Try secondFunctionResult = Try.success(1).map(decorated); + then(secondFunctionResult.isSuccess()).isTrue(); + verify(function, times(1)).apply(1); + } + + @Test + public void waitForPermissionWithOne() throws Exception { + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(true); + RateLimiter.waitForPermission(limit); + verify(limit, times(1)) + .getPermission(config.getTimeoutDuration()); + } + + @Test(expected = RequestNotPermitted.class) + public void waitForPermissionWithoutOne() throws Exception { + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(false); + RateLimiter.waitForPermission(limit); + verify(limit, times(1)) + .getPermission(config.getTimeoutDuration()); + } +} \ No newline at end of file diff --git a/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java b/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java new file mode 100644 index 0000000000..ba8ca819a9 --- /dev/null +++ b/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java @@ -0,0 +1,118 @@ +package javaslang.ratelimiter.internal; + +import static org.assertj.core.api.BDDAssertions.then; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import javaslang.ratelimiter.RateLimiter; +import javaslang.ratelimiter.RateLimiterConfig; +import javaslang.ratelimiter.RateLimiterRegistry; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.time.Duration; +import java.util.function.Supplier; + + +public class InMemoryRateLimiterRegistryTest { + + private static final int LIMIT = 50; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + private static final Duration REFRESH_PERIOD = Duration.ofNanos(500); + private static final String CONFIG_MUST_NOT_BE_NULL = "Config must not be null"; + private static final String NAME_MUST_NOT_BE_NULL = "Name must not be null"; + @Rule + public ExpectedException exception = ExpectedException.none(); + private RateLimiterConfig config; + + @Before + public void init() { + config = RateLimiterConfig.builder() + .timeoutDuration(TIMEOUT) + .limitRefreshPeriod(REFRESH_PERIOD) + .limitForPeriod(LIMIT) + .build(); + } + + @Test + public void rateLimiterPositive() throws Exception { + RateLimiterRegistry registry = RateLimiterRegistry.of(config); + RateLimiter firstRateLimiter = registry.rateLimiter("test"); + RateLimiter anotherLimit = registry.rateLimiter("test1"); + RateLimiter sameAsFirst = registry.rateLimiter("test"); + + then(firstRateLimiter).isEqualTo(sameAsFirst); + then(firstRateLimiter).isNotEqualTo(anotherLimit); + } + + @Test + public void rateLimiterPositiveWithSupplier() throws Exception { + RateLimiterRegistry registry = new InMemoryRateLimiterRegistry(config); + Supplier rateLimiterConfigSupplier = mock(Supplier.class); + when(rateLimiterConfigSupplier.get()) + .thenReturn(config); + + RateLimiter firstRateLimiter = registry.rateLimiter("test", rateLimiterConfigSupplier); + verify(rateLimiterConfigSupplier, times(1)).get(); + RateLimiter sameAsFirst = registry.rateLimiter("test", rateLimiterConfigSupplier); + verify(rateLimiterConfigSupplier, times(1)).get(); + RateLimiter anotherLimit = registry.rateLimiter("test1", rateLimiterConfigSupplier); + verify(rateLimiterConfigSupplier, times(2)).get(); + + then(firstRateLimiter).isEqualTo(sameAsFirst); + then(firstRateLimiter).isNotEqualTo(anotherLimit); + } + + @Test + public void rateLimiterConfigIsNull() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(CONFIG_MUST_NOT_BE_NULL); + new InMemoryRateLimiterRegistry(null); + } + + @Test + public void rateLimiterNewWithNullName() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(NAME_MUST_NOT_BE_NULL); + RateLimiterRegistry registry = new InMemoryRateLimiterRegistry(config); + registry.rateLimiter(null); + } + + @Test + public void rateLimiterNewWithNullNonDefaultConfig() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(CONFIG_MUST_NOT_BE_NULL); + RateLimiterRegistry registry = new InMemoryRateLimiterRegistry(config); + RateLimiterConfig rateLimiterConfig = null; + registry.rateLimiter("name", rateLimiterConfig); + } + + @Test + public void rateLimiterNewWithNullNameAndNonDefaultConfig() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(NAME_MUST_NOT_BE_NULL); + RateLimiterRegistry registry = new InMemoryRateLimiterRegistry(config); + registry.rateLimiter(null, config); + } + + @Test + public void rateLimiterNewWithNullNameAndConfigSupplier() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(NAME_MUST_NOT_BE_NULL); + RateLimiterRegistry registry = new InMemoryRateLimiterRegistry(config); + registry.rateLimiter(null, () -> config); + } + + @Test + public void rateLimiterNewWithNullConfigSupplier() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage("Supplier must not be null"); + RateLimiterRegistry registry = new InMemoryRateLimiterRegistry(config); + Supplier rateLimiterConfigSupplier = null; + registry.rateLimiter("name", rateLimiterConfigSupplier); + } +} \ No newline at end of file diff --git a/src/test/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImplTest.java b/src/test/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImplTest.java new file mode 100644 index 0000000000..24da434548 --- /dev/null +++ b/src/test/java/javaslang/ratelimiter/internal/SemaphoreBasedRateLimiterImplTest.java @@ -0,0 +1,221 @@ +package javaslang.ratelimiter.internal; + +import static com.jayway.awaitility.Awaitility.await; +import static com.jayway.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS; +import static java.lang.Thread.State.RUNNABLE; +import static java.lang.Thread.State.TERMINATED; +import static java.lang.Thread.State.TIMED_WAITING; +import static java.time.Duration.ZERO; +import static javaslang.control.Try.run; +import static org.assertj.core.api.BDDAssertions.then; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.jayway.awaitility.core.ConditionFactory; +import javaslang.ratelimiter.RateLimiter; +import javaslang.ratelimiter.RateLimiterConfig; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; + +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + + +public class SemaphoreBasedRateLimiterImplTest { + + private static final int LIMIT = 2; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + private static final Duration REFRESH_PERIOD = Duration.ofMillis(100); + private static final String CONFIG_MUST_NOT_BE_NULL = "RateLimiterConfig must not be null"; + private static final String NAME_MUST_NOT_BE_NULL = "Name must not be null"; + private static final Object O = new Object(); + @Rule + public ExpectedException exception = ExpectedException.none(); + private RateLimiterConfig config; + + private static ConditionFactory awaitImpatiently() { + return await() + .pollDelay(1, TimeUnit.MICROSECONDS) + .pollInterval(2, TimeUnit.MILLISECONDS); + } + + @Before + public void init() { + config = RateLimiterConfig.builder() + .timeoutDuration(TIMEOUT) + .limitRefreshPeriod(REFRESH_PERIOD) + .limitForPeriod(LIMIT) + .build(); + } + + @Test + public void rateLimiterCreationWithProvidedScheduler() throws Exception { + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + RateLimiterConfig configSpy = spy(config); + SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", configSpy, scheduledExecutorService); + + ArgumentCaptor refreshLimitRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduledExecutorService) + .scheduleAtFixedRate( + refreshLimitRunnableCaptor.capture(), + eq(config.getLimitRefreshPeriod().toNanos()), + eq(config.getLimitRefreshPeriod().toNanos()), + eq(TimeUnit.NANOSECONDS) + ); + + Runnable refreshLimitRunnable = refreshLimitRunnableCaptor.getValue(); + + then(limit.getPermission(ZERO)).isTrue(); + then(limit.getPermission(ZERO)).isTrue(); + then(limit.getPermission(ZERO)).isFalse(); + + Thread.sleep(REFRESH_PERIOD.toMillis() * 2); + verify(configSpy, times(1)).getLimitForPeriod(); + + refreshLimitRunnable.run(); + + verify(configSpy, times(2)).getLimitForPeriod(); + + then(limit.getPermission(ZERO)).isTrue(); + then(limit.getPermission(ZERO)).isTrue(); + then(limit.getPermission(ZERO)).isFalse(); + } + + @Test + public void rateLimiterCreationWithDefaultScheduler() throws Exception { + SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", config); + awaitImpatiently().atMost(FIVE_HUNDRED_MILLISECONDS) + .until(() -> limit.getPermission(ZERO), equalTo(false)); + awaitImpatiently().atMost(110, TimeUnit.MILLISECONDS) + .until(() -> limit.getPermission(ZERO), equalTo(true)); + } + + @Test + public void getPermissionAndMetrics() throws Exception { + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + RateLimiterConfig configSpy = spy(config); + SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", configSpy, scheduledExecutorService); + SemaphoreBasedRateLimiterImpl.SemaphoreBasedRateLimiterMetrics detailedMetrics = limit.getDetailedMetrics(); + + SynchronousQueue synchronousQueue = new SynchronousQueue(); + Thread thread = new Thread(() -> { + run(() -> { + for (int i = 0; i < LIMIT; i++) { + System.out.println("SLAVE -> WAITING FOR MASTER"); + synchronousQueue.put(O); + System.out.println("SLAVE -> HAVE COMMAND FROM MASTER"); + limit.getPermission(TIMEOUT); + System.out.println("SLAVE -> ACQUIRED PERMISSION"); + } + System.out.println("SLAVE -> LAST PERMISSION ACQUIRE"); + limit.getPermission(TIMEOUT); + System.out.println("SLAVE -> I'M DONE"); + }); + }); + thread.setDaemon(true); + thread.start(); + + for (int i = 0; i < LIMIT; i++) { + System.out.println("MASTER -> TAKE PERMISSION"); + synchronousQueue.take(); + } + + System.out.println("MASTER -> CHECK IF SLAVE IS WAITING FOR PERMISSION"); + awaitImpatiently() + .atMost(100, TimeUnit.MILLISECONDS).until(detailedMetrics::getAvailablePermits, equalTo(0)); + System.out.println("MASTER -> SLAVE CONSUMED ALL PERMISSIONS"); + awaitImpatiently() + .atMost(2, TimeUnit.SECONDS).until(thread::getState, equalTo(TIMED_WAITING)); + then(detailedMetrics.getAvailablePermits()).isEqualTo(0); + System.out.println("MASTER -> SLAVE WAS WAITING"); + + limit.refreshLimit(); + awaitImpatiently() + .atMost(100, TimeUnit.MILLISECONDS).until(detailedMetrics::getAvailablePermits, equalTo(1)); + awaitImpatiently() + .atMost(2, TimeUnit.SECONDS).until(thread::getState, equalTo(TERMINATED)); + then(detailedMetrics.getAvailablePermits()).isEqualTo(1); + } + + @Test + public void getPermissionInterruption() throws Exception { + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + RateLimiterConfig configSpy = spy(config); + SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", configSpy, scheduledExecutorService); + limit.getPermission(ZERO); + limit.getPermission(ZERO); + + Thread thread = new Thread(() -> { + limit.getPermission(TIMEOUT); + while (true) { + Function.identity().apply(1); + } + }); + thread.setDaemon(true); + thread.start(); + + awaitImpatiently() + .atMost(2, TimeUnit.SECONDS).until(thread::getState, equalTo(TIMED_WAITING)); + + thread.interrupt(); + + awaitImpatiently() + .atMost(2, TimeUnit.SECONDS).until(thread::getState, equalTo(RUNNABLE)); + then(thread.isInterrupted()).isTrue(); + } + + @Test + public void getName() throws Exception { + ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class); + SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", config, scheduler); + then(limit.getName()).isEqualTo("test"); + } + + @Test + public void getMetrics() throws Exception { + ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class); + SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", config, scheduler); + RateLimiter.Metrics metrics = limit.getMetrics(); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); + } + + @Test + public void getRateLimiterConfig() throws Exception { + ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class); + SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", config, scheduler); + then(limit.getRateLimiterConfig()).isEqualTo(config); + } + + @Test + public void getDetailedMetrics() throws Exception { + ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class); + SemaphoreBasedRateLimiterImpl limit = new SemaphoreBasedRateLimiterImpl("test", config, scheduler); + SemaphoreBasedRateLimiterImpl.SemaphoreBasedRateLimiterMetrics metrics = limit.getDetailedMetrics(); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); + then(metrics.getAvailablePermits()).isEqualTo(2); + } + + @Test + public void constructionWithNullName() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(NAME_MUST_NOT_BE_NULL); + new SemaphoreBasedRateLimiterImpl(null, config, null); + } + + @Test + public void constructionWithNullConfig() throws Exception { + exception.expect(NullPointerException.class); + exception.expectMessage(CONFIG_MUST_NOT_BE_NULL); + new SemaphoreBasedRateLimiterImpl("test", null, null); + } +} \ No newline at end of file