Skip to content

Commit

Permalink
Merge pull request ReactiveX#184 from resilience4j/dynamic-bulkhead-c…
Browse files Browse the repository at this point in the history
…onfig

Dynamic bulkhead config
  • Loading branch information
storozhukBM authored Dec 11, 2017
2 parents 73da955 + c048f87 commit 5e67b37
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 37 deletions.
2 changes: 2 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ The Bulkhead provides an interface to monitor the current number of available co
int availableConcurrentCalls = bulkhead.getMetrics().getAvailableConcurrentCalls()
----

You can also dynamically change it's configuration.

[[cache]]
=== Cache

Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ configure(project.coreProjects) {

cobertura {
coverageIgnoreTrivial = true
coverageExcludes = ['.*io.github.resilience4j.bulkhead.internal.*']
}

jmh {
Expand Down Expand Up @@ -134,4 +135,4 @@ artifactory {
clientConfig.proxy.host = System.properties['https.proxyHost']
clientConfig.proxy.port = System.properties['https.proxyPort'].toInteger()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@
*/
public interface Bulkhead {

/**
* Dynamic bulkhead configuration change.
* NOTE! New `maxWaitTime` duration won't affect threads that are currently waiting for permission.
* @param newConfig new BulkheadConfig
*/
void changeConfig(BulkheadConfig newConfig);

/**
* Attempts to acquire a permit, which allows an call to be executed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
Expand All @@ -39,22 +40,23 @@ public class SemaphoreBulkhead implements Bulkhead {

private final String name;
private final Semaphore semaphore;
private final BulkheadConfig bulkheadConfig;
private final Object configChangesLock = new Object();
private final AtomicReference<BulkheadConfig> config;
private final BulkheadMetrics metrics;
private final BulkheadEventProcessor eventProcessor;

/**
* Creates a bulkhead using a configuration supplied
*
* @param name the name of this bulkhead
* @param name the name of this bulkhead
* @param bulkheadConfig custom bulkhead configuration
*/
public SemaphoreBulkhead(String name, BulkheadConfig bulkheadConfig) {
this.name = name;
this.bulkheadConfig = bulkheadConfig != null ? bulkheadConfig
: BulkheadConfig.ofDefaults();
this.config = new AtomicReference<>(bulkheadConfig != null ? bulkheadConfig
: BulkheadConfig.ofDefaults());
// init semaphore
this.semaphore = new Semaphore(this.bulkheadConfig.getMaxConcurrentCalls(), true);
this.semaphore = new Semaphore(this.config.get().getMaxConcurrentCalls(), true);

this.metrics = new BulkheadMetrics();
this.eventProcessor = new BulkheadEventProcessor();
Expand All @@ -79,42 +81,74 @@ public SemaphoreBulkhead(String name, Supplier<BulkheadConfig> configSupplier) {
this(name, configSupplier.get());
}

/**
* {@inheritDoc}
*/
@Override
public void changeConfig(final BulkheadConfig newConfig) {
synchronized (configChangesLock) {
int delta = newConfig.getMaxConcurrentCalls() - config.get().getMaxConcurrentCalls();
if (delta < 0) {
semaphore.acquireUninterruptibly(-delta);
} else if (delta > 0) {
semaphore.release(delta);
}
config.set(newConfig);
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean isCallPermitted() {

boolean callPermitted = tryEnterBulkhead();

publishBulkheadEvent(
() -> callPermitted ? new BulkheadOnCallPermittedEvent(name)
: new BulkheadOnCallRejectedEvent(name)
() -> callPermitted ? new BulkheadOnCallPermittedEvent(name)
: new BulkheadOnCallRejectedEvent(name)
);

return callPermitted;
}

/**
* {@inheritDoc}
*/
@Override
public void onComplete() {
semaphore.release();
publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name));
}

/**
* {@inheritDoc}
*/
@Override
public String getName() {
return this.name;
}

/**
* {@inheritDoc}
*/
@Override
public BulkheadConfig getBulkheadConfig() {
return bulkheadConfig;
return config.get();
}

/**
* {@inheritDoc}
*/
@Override
public Metrics getMetrics() {
return metrics;
}


/**
* {@inheritDoc}
*/
@Override
public EventPublisher getEventPublisher() {
return eventProcessor;
Expand Down Expand Up @@ -154,7 +188,7 @@ public String toString() {
boolean tryEnterBulkhead() {

boolean callPermitted = false;
long timeout = bulkheadConfig.getMaxWaitTime();
long timeout = config.get().getMaxWaitTime();

if (timeout == 0) {
callPermitted = semaphore.tryAcquire();
Expand All @@ -165,7 +199,6 @@ boolean tryEnterBulkhead() {
callPermitted = false;
}
}
//
return callPermitted;
}

Expand Down
Loading

0 comments on commit 5e67b37

Please sign in to comment.