Skip to content

Commit

Permalink
Merge pull request ReactiveX#177 from resilience4j/174-dynamic-rateli…
Browse files Browse the repository at this point in the history
…miter-configuration

ReactiveX#174 Dynamic ratelimiter configuration
  • Loading branch information
storozhukBM authored Nov 27, 2017
2 parents cdf155d + c2a3d18 commit c473702
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 47 deletions.
4 changes: 0 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
language: java
sudo: false
addons:
apt:
packages:
- oracle-java8-installer
jdk:
- oraclejdk8
before_install:
Expand Down
7 changes: 5 additions & 2 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ decoratedSupplier = Retry
// Execute the decorated supplier and recover from any exception
String result = Try.ofSupplier(decoratedSupplier)
.recover(throwable -> "Hello from Recovery").get();
// When you don't want to decorate your lambda expression,
// When you don't want to decorate your lambda expression,
// but just execute it and protect the call by a CircuitBreaker.
String result = circuitBreaker.executeSupplier(backendService::doSomething);
Expand Down Expand Up @@ -166,6 +166,9 @@ AtomicRateLimiter atomicLimiter;
long nanosToWaitForPermission = atomicLimiter.getNanosToWait();
----

You can also dynamically change some rate limiter configurations. Find out more in our *http://resilience4j.github.io/resilience4j/[User Guide]*


[[bulkhead]]
=== Bulkhead
The following example shows how to decorate a lambda expression with a Bulkhead. A Bulkhead can be used to limit the amount of parallel executions. This bulkhead abstraction should work well across a variety of threading and io models. It is based on a semaphore, and unlike Hystrix, does not provide "shadow" thread pool option.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,27 @@ Try.run(restrictedCall)
.onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)"));
----

===== Dynamic rate limiter reconfiguration

You can use `changeTimeoutDuration` and `changeLimitForPeriod` methods to change rate limiter params in runtime.

[source,java]
----
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
.decorateCheckedRunnable(rateLimiter, backendService::doSomething);
Try.run(restrictedCall)
.andThenTry(restrictedCall)
.onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)"));
// durring second refresh cycle limiter will get 100 permissions
rateLimiter.changeLimitForPeriod(100);
----

New timeout duration won't affect threads that are currently waiting for permission.
New limit won't affect current period permissions and will apply only from next one.

===== RateLimiter and RxJava

The following example shows how to decorate an Observable by using the custom RxJava operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,22 @@ static void waitForPermission(final RateLimiter rateLimiter) throws IllegalState
}
}

/**
* Dynamic rate limiter configuration change.
* This method allows to change timeout duration of current limiter.
* NOTE! New timeout duration won't affect threads that are currently waiting for permission.
* @param timeoutDuration new timeout duration
*/
void changeTimeoutDuration(Duration timeoutDuration);

/**
* Dynamic rate limiter configuration change.
* This method allows to change count of permissions available during refresh period.
* NOTE! New limit won't affect current period permissions and will apply only from next one.
* @param limitForPeriod new permissions limit
*/
void changeLimitForPeriod(int limitForPeriod);

/**
* Acquires a permission from this rate limiter, blocking until one is
* available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,18 @@ public class RateLimiterConfig {
private static final String LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL = "LimitRefreshPeriod must not be null";
private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(1L);

private Duration timeoutDuration = Duration.ofSeconds(5);
private Duration limitRefreshPeriod = Duration.ofNanos(500);
private int limitForPeriod = 50;

private RateLimiterConfig() {
private final Duration timeoutDuration;
private final long timeoutDurationInNanos;
private final Duration limitRefreshPeriod;
private final long limitRefreshPeriodInNanos;
private final int limitForPeriod;

private RateLimiterConfig(Duration timeoutDuration, Duration limitRefreshPeriod, int limitForPeriod) {
this.timeoutDuration = timeoutDuration;
this.timeoutDurationInNanos = timeoutDuration.toNanos();
this.limitRefreshPeriod = limitRefreshPeriod;
this.limitRefreshPeriodInNanos = limitRefreshPeriod.toNanos();
this.limitForPeriod = limitForPeriod;
}

/**
Expand All @@ -43,6 +50,15 @@ public static Builder custom() {
return new Builder();
}

/**
* Returns a builder to create a custom RateLimiterConfig using specified config as prototype
*
* @return a {@link RateLimiterConfig.Builder}
*/
public static Builder from(RateLimiterConfig prototype) {
return new Builder(prototype);
}

/**
* Creates a default RateLimiter configuration.
*
Expand All @@ -64,6 +80,14 @@ public int getLimitForPeriod() {
return limitForPeriod;
}

public long getTimeoutDurationInNanos() {
return timeoutDurationInNanos;
}

public long getLimitRefreshPeriodInNanos() {
return limitRefreshPeriodInNanos;
}

@Override public String toString() {
return "RateLimiterConfig{" +
"timeoutDuration=" + timeoutDuration +
Expand All @@ -73,16 +97,26 @@ public int getLimitForPeriod() {
}

public static class Builder {
private Duration timeoutDuration = Duration.ofSeconds(5);
private Duration limitRefreshPeriod = Duration.ofNanos(500);
private int limitForPeriod = 50;

private RateLimiterConfig config = new RateLimiterConfig();
public Builder() {
}

public Builder(RateLimiterConfig prototype) {
this.timeoutDuration = prototype.timeoutDuration;
this.limitRefreshPeriod = prototype.limitRefreshPeriod;
this.limitForPeriod = prototype.limitForPeriod;
}

/**
* Builds a RateLimiterConfig
*
* @return the RateLimiterConfig
*/
public RateLimiterConfig build() {
return config;
return new RateLimiterConfig(timeoutDuration, limitRefreshPeriod, limitForPeriod);
}

/**
Expand All @@ -93,7 +127,7 @@ public RateLimiterConfig build() {
* @return the RateLimiterConfig.Builder
*/
public Builder timeoutDuration(final Duration timeoutDuration) {
config.timeoutDuration = checkTimeoutDuration(timeoutDuration);
this.timeoutDuration = checkTimeoutDuration(timeoutDuration);
return this;
}

Expand All @@ -107,7 +141,7 @@ public Builder timeoutDuration(final Duration timeoutDuration) {
* @return the RateLimiterConfig.Builder
*/
public Builder limitRefreshPeriod(final Duration limitRefreshPeriod) {
config.limitRefreshPeriod = checkLimitRefreshPeriod(limitRefreshPeriod);
this.limitRefreshPeriod = checkLimitRefreshPeriod(limitRefreshPeriod);
return this;
}

Expand All @@ -121,7 +155,7 @@ public Builder limitRefreshPeriod(final Duration limitRefreshPeriod) {
* @return the RateLimiterConfig.Builder
*/
public Builder limitForPeriod(final int limitForPeriod) {
config.limitForPeriod = checkLimitForPeriod(limitForPeriod);
this.limitForPeriod = checkLimitForPeriod(limitForPeriod);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,45 @@ public class AtomicRateLimiter implements RateLimiter {
private static final long nanoTimeStart = nanoTime();

private final String name;
private final RateLimiterConfig rateLimiterConfig;
private final long cyclePeriodInNanos;
private final int permissionsPerCycle;
private final AtomicInteger waitingThreads;
private final AtomicReference<State> state;
private final RateLimiterEventProcessor eventProcessor;


public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {
this.name = name;
this.rateLimiterConfig = rateLimiterConfig;

cyclePeriodInNanos = rateLimiterConfig.getLimitRefreshPeriod().toNanos();
permissionsPerCycle = rateLimiterConfig.getLimitForPeriod();

waitingThreads = new AtomicInteger(0);
state = new AtomicReference<>(new State(0, permissionsPerCycle, 0));
state = new AtomicReference<>(new State(
rateLimiterConfig,0, rateLimiterConfig.getLimitForPeriod(), 0
));
eventProcessor = new RateLimiterEventProcessor();
}

/**
* {@inheritDoc}
*/
@Override
public void changeTimeoutDuration(final Duration timeoutDuration) {
RateLimiterConfig newConfig = RateLimiterConfig.from(state.get().config)
.timeoutDuration(timeoutDuration)
.build();
state.updateAndGet(currentState -> new State(
newConfig, currentState.activeCycle, currentState.activePermissions, currentState.nanosToWait
));
}

this.eventProcessor = new RateLimiterEventProcessor();
/**
* {@inheritDoc}
*/
@Override
public void changeLimitForPeriod(final int limitForPeriod) {
RateLimiterConfig newConfig = RateLimiterConfig.from(state.get().config)
.limitForPeriod(limitForPeriod)
.build();
state.updateAndGet(currentState -> new State(
newConfig, currentState.activeCycle, currentState.activePermissions, currentState.nanosToWait
));
}

/**
Expand Down Expand Up @@ -143,6 +163,9 @@ private boolean compareAndSet(final State current, final State next) {
* @return next {@link State}
*/
private State calculateNextState(final long timeoutInNanos, final State activeState) {
long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriodInNanos();
int permissionsPerCycle = activeState.config.getLimitForPeriod();

long currentNanos = currentNanoTime();
long currentCycle = currentNanos / cyclePeriodInNanos;

Expand All @@ -154,21 +177,26 @@ private State calculateNextState(final long timeoutInNanos, final State activeSt
nextCycle = currentCycle;
nextPermissions = (int) min(nextPermissions + accumulatedPermissions, permissionsPerCycle);
}
long nextNanosToWait = nanosToWaitForPermission(nextPermissions, currentNanos, currentCycle);
State nextState = reservePermissions(timeoutInNanos, nextCycle, nextPermissions, nextNanosToWait);
long nextNanosToWait = nanosToWaitForPermission(
cyclePeriodInNanos, permissionsPerCycle, nextPermissions, currentNanos, currentCycle
);
State nextState = reservePermissions(activeState.config, timeoutInNanos, nextCycle, nextPermissions, nextNanosToWait);
return nextState;
}

/**
* Calculates time to wait for next permission as
* [time to the next cycle] + [duration of full cycles until reserved permissions expire]
*
* @param availablePermissions currently available permissions, can be negative if some permissions have been reserved
*
* @param cyclePeriodInNanos current configuration values
* @param permissionsPerCycle current configuration values
*@param availablePermissions currently available permissions, can be negative if some permissions have been reserved
* @param currentNanos current time in nanoseconds
* @param currentCycle current {@link AtomicRateLimiter} cycle
* @return nanoseconds to wait for the next permission
* @param currentCycle current {@link AtomicRateLimiter} cycle @return nanoseconds to wait for the next permission
*/
private long nanosToWaitForPermission(final int availablePermissions, final long currentNanos, final long currentCycle) {
private long nanosToWaitForPermission(final long cyclePeriodInNanos, final int permissionsPerCycle,
final int availablePermissions, final long currentNanos, final long currentCycle) {
if (availablePermissions > 0) {
return 0L;
}
Expand All @@ -182,19 +210,22 @@ private long nanosToWaitForPermission(final int availablePermissions, final long
* Determines whether caller can acquire permission before timeout or not and then creates corresponding {@link State}.
* Reserves permissions only if caller can successfully wait for permission.
*
*
* @param config
* @param timeoutInNanos max time that caller can wait for permission in nanoseconds
* @param cycle cycle for new {@link State}
* @param permissions permissions for new {@link State}
* @param nanosToWait nanoseconds to wait for the next permission
* @return new {@link State} with possibly reserved permissions and time to wait
*/
private State reservePermissions(final long timeoutInNanos, final long cycle, final int permissions, final long nanosToWait) {
private State reservePermissions(final RateLimiterConfig config, final long timeoutInNanos,
final long cycle, final int permissions, final long nanosToWait) {
boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
int permissionsWithReservation = permissions;
if (canAcquireInTime) {
permissionsWithReservation--;
}
return new State(cycle, permissionsWithReservation, nanosToWait);
return new State(config, cycle, permissionsWithReservation, nanosToWait);
}

/**
Expand Down Expand Up @@ -256,7 +287,7 @@ public String getName() {
*/
@Override
public RateLimiterConfig getRateLimiterConfig() {
return rateLimiterConfig;
return state.get().config;
}

/**
Expand All @@ -275,7 +306,7 @@ public EventPublisher getEventPublisher() {
@Override public String toString() {
return "AtomicRateLimiter{" +
"name='" + name + '\'' +
", rateLimiterConfig=" + rateLimiterConfig +
", rateLimiterConfig=" + state.get().config +
'}';
}

Expand Down Expand Up @@ -314,13 +345,15 @@ private void publishRateLimiterEvent(boolean permissionAcquired) {
* </ul>
*/
private static class State {
private final RateLimiterConfig config;

private final long activeCycle;

private final int activePermissions;
private final long nanosToWait;

private State(final long activeCycle, final int activePermissions, final long nanosToWait) {
private State(RateLimiterConfig config,
final long activeCycle, final int activePermissions, final long nanosToWait) {
this.config = config;
this.activeCycle = activeCycle;
this.activePermissions = activePermissions;
this.nanosToWait = nanosToWait;
Expand Down
Loading

0 comments on commit c473702

Please sign in to comment.