Skip to content

Commit

Permalink
Issue ReactiveX#12 Initial RateLimiter implementation and JavaDocs
Browse files Browse the repository at this point in the history
  • Loading branch information
storozhukBM committed Dec 2, 2016
1 parent 6edf382 commit 5a30b38
Show file tree
Hide file tree
Showing 11 changed files with 1,221 additions and 1 deletion.
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,16 @@ dependencies {
// compile 'org.reactivestreams:reactive-streams:1.0.0'
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile "org.slf4j:slf4j-api:1.7.13"

testCompile "io.dropwizard.metrics:metrics-core:3.1.2"
testCompile "junit:junit:4.11"
testCompile "org.assertj:assertj-core:3.0.0"
testCompile "ch.qos.logback:logback-classic:0.9.26"
testCompile "io.dropwizard.metrics:metrics-healthchecks:3.1.2"
testCompile "org.mockito:mockito-all:1.10.19"
testCompile "org.mockito:mockito-core:1.10.19"
testCompile "io.projectreactor:reactor-core:2.5.0.M2"
testCompile "com.jayway.awaitility:awaitility:1.7.0"

jmh "ch.qos.logback:logback-classic:0.9.26"
}

Expand Down
184 changes: 184 additions & 0 deletions src/main/java/javaslang/ratelimiter/RateLimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package javaslang.ratelimiter;

import javaslang.control.Try;

import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* A RateLimiter distributes permits at a configurable rate. {@link #getPermission} blocks if necessary
* until a permit is available, and then takes it. Once acquired, permits need not be released.
*/
public interface RateLimiter {

/**
* Acquires a permission from this rate limiter, blocking until one is
* available.
* <p>
* <p>If the current thread is {@linkplain Thread#interrupt interrupted}
* while waiting for a permit then it won't throw {@linkplain InterruptedException},
* but its interrupt status will be set.
*
* @return {@code true} if a permit was acquired and {@code false}
* if waiting time elapsed before a permit was acquired
*/
boolean getPermission(Duration timeoutDuration);

/**
* Get the name of this RateLimiter
*
* @return the name of this RateLimiter
*/
String getName();

/**
* Get the RateLimiterConfig of this RateLimiter.
*
* @return the RateLimiterConfig of this RateLimiter
*/
RateLimiterConfig getRateLimiterConfig();

/**
* Get the Metrics of this RateLimiter.
*
* @return the Metrics of this RateLimiter
*/
Metrics getMetrics();

interface Metrics {
/**
* Returns an estimate of the number of threads waiting for permission
* in this JVM process.
*
* @return estimate of the number of threads waiting for permission.
*/
int getNumberOfWaitingThreads();
}

/**
* Creates a supplier which is restricted by a RateLimiter.
*
* @param rateLimiter the RateLimiter
* @param supplier the original supplier
* @return a supplier which is restricted by a RateLimiter.
*/
static <T> Try.CheckedSupplier<T> decorateCheckedSupplier(RateLimiter rateLimiter, Try.CheckedSupplier<T> supplier) {
Try.CheckedSupplier<T> decoratedSupplier = () -> {
waitForPermission(rateLimiter);
T result = supplier.get();
return result;
};
return decoratedSupplier;
}

/**
* Creates a runnable which is restricted by a RateLimiter.
*
* @param rateLimiter the RateLimiter
* @param runnable the original runnable
* @return a runnable which is restricted by a RateLimiter.
*/
static Try.CheckedRunnable decorateCheckedRunnable(RateLimiter rateLimiter, Try.CheckedRunnable runnable) {

Try.CheckedRunnable decoratedRunnable = () -> {
waitForPermission(rateLimiter);
runnable.run();
};
return decoratedRunnable;
}

/**
* Creates a function which is restricted by a RateLimiter.
*
* @param rateLimiter the RateLimiter
* @param function the original function
* @return a function which is restricted by a RateLimiter.
*/
static <T, R> Try.CheckedFunction<T, R> decorateCheckedFunction(RateLimiter rateLimiter, Try.CheckedFunction<T, R> function) {
Try.CheckedFunction<T, R> decoratedFunction = (T t) -> {
waitForPermission(rateLimiter);
R result = function.apply(t);
return result;
};
return decoratedFunction;
}

/**
* Creates a supplier which is restricted by a RateLimiter.
*
* @param rateLimiter the RateLimiter
* @param supplier the original supplier
* @return a supplier which is restricted by a RateLimiter.
*/
static <T> Supplier<T> decorateSupplier(RateLimiter rateLimiter, Supplier<T> supplier) {
Supplier<T> decoratedSupplier = () -> {
waitForPermission(rateLimiter);
T result = supplier.get();
return result;
};
return decoratedSupplier;
}

/**
* Creates a consumer which is restricted by a RateLimiter.
*
* @param rateLimiter the RateLimiter
* @param consumer the original consumer
* @return a consumer which is restricted by a RateLimiter.
*/
static <T> Consumer<T> decorateConsumer(RateLimiter rateLimiter, Consumer<T> consumer) {
Consumer<T> decoratedConsumer = (T t) -> {
waitForPermission(rateLimiter);
consumer.accept(t);
};
return decoratedConsumer;
}

/**
* Creates a runnable which is restricted by a RateLimiter.
*
* @param rateLimiter the RateLimiter
* @param runnable the original runnable
* @return a runnable which is restricted by a RateLimiter.
*/
static Runnable decorateRunnable(RateLimiter rateLimiter, Runnable runnable) {
Runnable decoratedRunnable = () -> {
waitForPermission(rateLimiter);
runnable.run();
};
return decoratedRunnable;
}

/**
* Creates a function which is restricted by a RateLimiter.
*
* @param rateLimiter the RateLimiter
* @param function the original function
* @return a function which is restricted by a RateLimiter.
*/
static <T, R> Function<T, R> decorateFunction(RateLimiter rateLimiter, Function<T, R> function) {
Function<T, R> decoratedFunction = (T t) -> {
waitForPermission(rateLimiter);
R result = function.apply(t);
return result;
};
return decoratedFunction;
}

/**
* Will wait for permission within default timeout duration.
* Throws {@link RequestNotPermitted} if waiting time elapsed before a permit was acquired.
*
* @param rateLimiter the RateLimiter to get permission from
*/
static void waitForPermission(final RateLimiter rateLimiter) {
RateLimiterConfig rateLimiterConfig = rateLimiter.getRateLimiterConfig();
Duration timeoutDuration = rateLimiterConfig.getTimeoutDuration();
boolean permission = rateLimiter.getPermission(timeoutDuration);
if (!permission) {
throw new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName());
}
}
}
116 changes: 116 additions & 0 deletions src/main/java/javaslang/ratelimiter/RateLimiterConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package javaslang.ratelimiter;

import static java.util.Objects.requireNonNull;

import java.time.Duration;

public class RateLimiterConfig {
private static final String TIMEOUT_DURATION_MUST_NOT_BE_NULL = "TimeoutDuration must not be null";
private static final String LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL = "LimitRefreshPeriod must not be null";

private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(500L); // TODO: use jmh to find real one

private final Duration timeoutDuration;
private final Duration limitRefreshPeriod;
private final int limitForPeriod;

private RateLimiterConfig(final Duration timeoutDuration, final Duration limitRefreshPeriod, final int limitForPeriod) {
this.timeoutDuration = checkTimeoutDuration(timeoutDuration);
this.limitRefreshPeriod = checkLimitRefreshPeriod(limitRefreshPeriod);
this.limitForPeriod = checkLimitForPeriod(limitForPeriod);
}

public static Builder builder() {
return new Builder();
}

public Duration getTimeoutDuration() {
return timeoutDuration;
}

public Duration getLimitRefreshPeriod() {
return limitRefreshPeriod;
}

public int getLimitForPeriod() {
return limitForPeriod;
}

public static class Builder {

private Duration timeoutDuration;
private Duration limitRefreshPeriod;
private int limitForPeriod;

/**
* Builds a RateLimiterConfig
*
* @return the RateLimiterConfig
*/
public RateLimiterConfig build() {
return new RateLimiterConfig(
timeoutDuration,
limitRefreshPeriod,
limitForPeriod
);
}

/**
* Configures the default wait for permission duration.
*
* @param timeoutDuration the default wait for permission duration
* @return the RateLimiterConfig.Builder
*/
public Builder timeoutDuration(final Duration timeoutDuration) {
this.timeoutDuration = checkTimeoutDuration(timeoutDuration);
return this;
}

/**
* Configures the period of limit refresh.
* After each period rate limiter sets its permissions
* count to {@link RateLimiterConfig#limitForPeriod} value.
*
* @param limitRefreshPeriod the period of limit refresh
* @return the RateLimiterConfig.Builder
*/
public Builder limitRefreshPeriod(final Duration limitRefreshPeriod) {
this.limitRefreshPeriod = checkLimitRefreshPeriod(limitRefreshPeriod);
return this;
}

/**
* Configures the permissions limit for refresh period.
* Count of permissions available during one rate limiter period
* specified by {@link RateLimiterConfig#limitRefreshPeriod} value.
*
* @param limitForPeriod the permissions limit for refresh period
* @return the RateLimiterConfig.Builder
*/
public Builder limitForPeriod(final int limitForPeriod) {
this.limitForPeriod = checkLimitForPeriod(limitForPeriod);
return this;
}

}

private static Duration checkTimeoutDuration(final Duration timeoutDuration) {
return requireNonNull(timeoutDuration, TIMEOUT_DURATION_MUST_NOT_BE_NULL);
}

private static Duration checkLimitRefreshPeriod(Duration limitRefreshPeriod) {
requireNonNull(limitRefreshPeriod, LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL);
boolean refreshPeriodIsTooShort = limitRefreshPeriod.compareTo(ACCEPTABLE_REFRESH_PERIOD) < 0;
if (refreshPeriodIsTooShort) {
throw new IllegalArgumentException("LimitRefreshPeriod is too short");
}
return limitRefreshPeriod;
}

private static int checkLimitForPeriod(final int limitForPeriod) {
if (limitForPeriod < 1) {
throw new IllegalArgumentException("LimitForPeriod should be greater than 0");
}
return limitForPeriod;
}
}
41 changes: 41 additions & 0 deletions src/main/java/javaslang/ratelimiter/RateLimiterRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package javaslang.ratelimiter;

import javaslang.ratelimiter.internal.InMemoryRateLimiterRegistry;

import java.util.function.Supplier;

/**
* Manages all RateLimiter instances.
*/
public interface RateLimiterRegistry {

/**
* Returns a managed {@link RateLimiter} or creates a new one with the default RateLimiter configuration.
*
* @param name the name of the RateLimiter
* @return The {@link RateLimiter}
*/
RateLimiter rateLimiter(String name);

/**
* Returns a managed {@link RateLimiter} or creates a new one with a custom RateLimiter configuration.
*
* @param name the name of the RateLimiter
* @param rateLimiterConfig a custom RateLimiter configuration
* @return The {@link RateLimiter}
*/
RateLimiter rateLimiter(String name, RateLimiterConfig rateLimiterConfig);

/**
* Returns a managed {@link RateLimiterConfig} or creates a new one with a custom RateLimiterConfig configuration.
*
* @param name the name of the RateLimiterConfig
* @param rateLimiterConfigSupplier a supplier of a custom RateLimiterConfig configuration
* @return The {@link RateLimiterConfig}
*/
RateLimiter rateLimiter(String name, Supplier<RateLimiterConfig> rateLimiterConfigSupplier);

static RateLimiterRegistry of(RateLimiterConfig defaultRateLimiterConfig) {
return new InMemoryRateLimiterRegistry(defaultRateLimiterConfig);
}
}
17 changes: 17 additions & 0 deletions src/main/java/javaslang/ratelimiter/RequestNotPermitted.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package javaslang.ratelimiter;

/**
* Exception that indicates that current thread was not able to acquire permission
* from {@link RateLimiter}.
*/
public class RequestNotPermitted extends RuntimeException {

/**
* The constructor with a message.
*
* @param message The message.
*/
public RequestNotPermitted(final String message) {
super(message);
}
}
Loading

0 comments on commit 5a30b38

Please sign in to comment.