From 54d9cefb17605f50cd273abfc1a6b1390004556e Mon Sep 17 00:00:00 2001 From: bstorozhuk Date: Thu, 7 Dec 2017 16:01:21 +0200 Subject: [PATCH 1/8] Dynamic bulkhead configuration --- .../resilience4j/bulkhead/Bulkhead.java | 7 ++ .../bulkhead/internal/SemaphoreBulkhead.java | 85 +++++++++++-------- .../internal/SemaphoreBulkheadTest.java | 79 +++++++++++++---- 3 files changed, 118 insertions(+), 53 deletions(-) diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/Bulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/Bulkhead.java index c126243e36..67838827a0 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/Bulkhead.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/Bulkhead.java @@ -53,6 +53,13 @@ */ public interface Bulkhead { + /** + * Dynamic bulkhead configuration change. + * NOTE! New `maxWaitTime` duration won't affect threads that are currently waiting for permission. + * @param newConfig new BulkheadConfig + */ + void changeConfig(BulkheadConfig newConfig); + /** * Attempts to acquire a permit, which allows an call to be executed. * diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java index b3c3453185..28f959f319 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java @@ -29,31 +29,33 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; /** * A Bulkhead implementation based on a semaphore. */ -public class SemaphoreBulkhead implements Bulkhead{ +public class SemaphoreBulkhead implements Bulkhead { private final String name; private final Semaphore semaphore; - private final BulkheadConfig bulkheadConfig; + private final Object configChangesLock = new Object(); + private final AtomicReference config; private final BulkheadMetrics metrics; private final BulkheadEventProcessor eventProcessor; /** * Creates a bulkhead using a configuration supplied * - * @param name the name of this bulkhead + * @param name the name of this bulkhead * @param bulkheadConfig custom bulkhead configuration */ public SemaphoreBulkhead(String name, BulkheadConfig bulkheadConfig) { this.name = name; - this.bulkheadConfig = bulkheadConfig != null ? bulkheadConfig - : BulkheadConfig.ofDefaults(); + this.config = new AtomicReference<>(bulkheadConfig != null ? bulkheadConfig + : BulkheadConfig.ofDefaults()); // init semaphore - this.semaphore = new Semaphore(this.bulkheadConfig.getMaxConcurrentCalls(), true); + this.semaphore = new Semaphore(this.config.get().getMaxConcurrentCalls(), true); this.metrics = new BulkheadMetrics(); this.eventProcessor = new BulkheadEventProcessor(); @@ -71,13 +73,25 @@ public SemaphoreBulkhead(String name) { /** * Create a bulkhead using a configuration supplier * - * @param name the name of this bulkhead + * @param name the name of this bulkhead * @param configSupplier BulkheadConfig supplier */ public SemaphoreBulkhead(String name, Supplier configSupplier) { this(name, configSupplier.get()); } + @Override + public void changeConfig(final BulkheadConfig newConfig) { + synchronized (configChangesLock) { + int delta = config.get().getMaxConcurrentCalls() - newConfig.getMaxConcurrentCalls(); + if (delta < 0) { + semaphore.acquireUninterruptibly(-delta); + } else if (delta > 0) { + semaphore.release(delta); + } + config.set(newConfig); + } + } @Override public boolean isCallPermitted() { @@ -85,8 +99,8 @@ public boolean isCallPermitted() { boolean callPermitted = tryEnterBulkhead(); publishBulkheadEvent( - () -> callPermitted ? new BulkheadOnCallPermittedEvent(name) - : new BulkheadOnCallRejectedEvent(name) + () -> callPermitted ? new BulkheadOnCallPermittedEvent(name) + : new BulkheadOnCallRejectedEvent(name) ); return callPermitted; @@ -104,7 +118,7 @@ public String getName() { @Override public BulkheadConfig getBulkheadConfig() { - return bulkheadConfig; + return config.get(); } @Override @@ -118,26 +132,6 @@ public EventPublisher getEventPublisher() { return eventProcessor; } - private class BulkheadEventProcessor extends EventProcessor implements EventPublisher, EventConsumer { - - @Override - public EventPublisher onCallPermitted(EventConsumer onCallPermittedEventConsumer) { - registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer); - return this; - } - - @Override - public EventPublisher onCallRejected(EventConsumer onCallRejectedEventConsumer) { - registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer); - return this; - } - - @Override - public void consumeEvent(BulkheadEvent event) { - super.processEvent(event); - } - } - @Override public String toString() { return String.format("Bulkhead '%s'", this.name); @@ -146,29 +140,46 @@ public String toString() { boolean tryEnterBulkhead() { boolean callPermitted = false; - long timeout = bulkheadConfig.getMaxWaitTime(); + long timeout = config.get().getMaxWaitTime(); if (timeout == 0) { callPermitted = semaphore.tryAcquire(); - } - else { + } else { try { callPermitted = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS); - } - catch (InterruptedException ex) { + } catch (InterruptedException ex) { callPermitted = false; } } - // return callPermitted; } private void publishBulkheadEvent(Supplier eventSupplier) { - if(eventProcessor.hasConsumers()) { + if (eventProcessor.hasConsumers()) { eventProcessor.consumeEvent(eventSupplier.get()); } } + private class BulkheadEventProcessor extends EventProcessor implements EventPublisher, EventConsumer { + + @Override + public EventPublisher onCallPermitted(EventConsumer onCallPermittedEventConsumer) { + registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer); + return this; + } + + @Override + public EventPublisher onCallRejected(EventConsumer onCallRejectedEventConsumer) { + registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer); + return this; + } + + @Override + public void consumeEvent(BulkheadEvent event) { + super.processEvent(event); + } + } + private final class BulkheadMetrics implements Metrics { private BulkheadMetrics() { } diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java index efa7cbedb5..b8af733072 100644 --- a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java @@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; -import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.*; - +import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.CALL_PERMITTED; +import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.CALL_REJECTED; import static org.assertj.core.api.Assertions.assertThat; public class SemaphoreBulkheadTest { @@ -39,17 +39,17 @@ public class SemaphoreBulkheadTest { private TestSubscriber testSubscriber; @Before - public void setUp(){ + public void setUp() { BulkheadConfig config = BulkheadConfig.custom() - .maxConcurrentCalls(2) - .maxWaitTime(0) - .build(); + .maxConcurrentCalls(2) + .maxWaitTime(0) + .build(); bulkhead = Bulkhead.of("test", config); testSubscriber = RxJava2Adapter.toFlowable(bulkhead.getEventPublisher()) - .map(BulkheadEvent::getEventType) - .test(); + .map(BulkheadEvent::getEventType) + .test(); } @Test @@ -77,7 +77,7 @@ public void testBulkhead() throws InterruptedException { bulkhead.isCallPermitted(); testSubscriber.assertValueCount(4) - .assertValues(CALL_PERMITTED, CALL_PERMITTED, CALL_REJECTED, CALL_PERMITTED); + .assertValues(CALL_PERMITTED, CALL_PERMITTED, CALL_REJECTED, CALL_PERMITTED); } @Test @@ -120,9 +120,9 @@ public void testTryEnterWithTimeout() { // given BulkheadConfig config = BulkheadConfig.custom() - .maxConcurrentCalls(1) - .maxWaitTime(100) - .build(); + .maxConcurrentCalls(1) + .maxWaitTime(100) + .build(); SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", config); @@ -166,10 +166,10 @@ public void testEntryInterrupted() { AtomicBoolean entered = new AtomicBoolean(true); Thread t = new Thread( - () -> { - entered.set(bulkhead.tryEnterBulkhead()); - } - ); + () -> { + entered.set(bulkhead.tryEnterBulkhead()); + } + ); // when t.start(); @@ -181,6 +181,53 @@ public void testEntryInterrupted() { //assertThat(entered.get()).isFalse(); } + @Test + public void changePermissionsInIdleState() { + BulkheadConfig originalConfig = BulkheadConfig.custom() + .maxConcurrentCalls(3) + .maxWaitTime(5000) + .build(); + SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig); + + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3); + assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000); + + BulkheadConfig newConfig = BulkheadConfig.custom() + .maxConcurrentCalls(5) + .maxWaitTime(5000) + .build(); + + bulkhead.changeConfig(newConfig); + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(5); + assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000); + + + newConfig = BulkheadConfig.custom() + .maxConcurrentCalls(2) + .maxWaitTime(5000) + .build(); + + bulkhead.changeConfig(newConfig); + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2); + assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000); + + bulkhead.changeConfig(newConfig); + } + + /* + TODO: + change waiting time in idle state // simplest case + + change permissions count +|- during other threads are running with permission + change permissions count +|- during other threads are waiting for permission + change waiting time during other threads are waiting for permission + + change permissions to zero while other threads using permissions + + concurrent permissions change // test blocking behaviour + + */ + void sleep(long time) { try { Thread.sleep(time); From efb61f19b4a11845cf4414b2eb5b9e6f8b9f7e80 Mon Sep 17 00:00:00 2001 From: bstorozhuk Date: Mon, 11 Dec 2017 13:32:47 +0200 Subject: [PATCH 2/8] Additional tests for dynamic bulkhead configuration --- .../bulkhead/internal/SemaphoreBulkhead.java | 2 +- .../internal/SemaphoreBulkheadTest.java | 249 +++++++++++++++++- 2 files changed, 241 insertions(+), 10 deletions(-) diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java index 28f959f319..5af99d26ef 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java @@ -83,7 +83,7 @@ public SemaphoreBulkhead(String name, Supplier configSupplier) { @Override public void changeConfig(final BulkheadConfig newConfig) { synchronized (configChangesLock) { - int delta = config.get().getMaxConcurrentCalls() - newConfig.getMaxConcurrentCalls(); + int delta = newConfig.getMaxConcurrentCalls() - config.get().getMaxConcurrentCalls(); if (delta < 0) { semaphore.acquireUninterruptibly(-delta); } else if (delta > 0) { diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java index b8af733072..186e20c332 100644 --- a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java @@ -29,8 +29,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import static com.jayway.awaitility.Awaitility.await; import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.CALL_PERMITTED; import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.CALL_REJECTED; +import static java.lang.Thread.State.*; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; public class SemaphoreBulkheadTest { @@ -214,19 +217,247 @@ public void changePermissionsInIdleState() { bulkhead.changeConfig(newConfig); } - /* - TODO: - change waiting time in idle state // simplest case + @Test + public void changeWaitTimeInIdleState() { + BulkheadConfig originalConfig = BulkheadConfig.custom() + .maxConcurrentCalls(3) + .maxWaitTime(5000) + .build(); + SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig); + + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3); + assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000); + + BulkheadConfig newConfig = BulkheadConfig.custom() + .maxConcurrentCalls(3) + .maxWaitTime(3000) + .build(); + + bulkhead.changeConfig(newConfig); + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3); + assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(3000); + + + newConfig = BulkheadConfig.custom() + .maxConcurrentCalls(3) + .maxWaitTime(7000) + .build(); + + bulkhead.changeConfig(newConfig); + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3); + assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(7000); + + bulkhead.changeConfig(newConfig); + } + + @SuppressWarnings("Duplicates") + @Test + public void changePermissionsCountWhileOneThreadIsRunningWithThisPermission() { + BulkheadConfig originalConfig = BulkheadConfig.custom() + .maxConcurrentCalls(1) + .maxWaitTime(0) + .build(); + SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig); + + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); - change permissions count +|- during other threads are running with permission - change permissions count +|- during other threads are waiting for permission - change waiting time during other threads are waiting for permission + AtomicBoolean bulkheadThreadTrigger = new AtomicBoolean(true); + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1); + Thread bulkheadThread = new Thread(() -> { + bulkhead.isCallPermitted(); + while (bulkheadThreadTrigger.get()) { + Thread.yield(); + } + bulkhead.onComplete(); + }); + bulkheadThread.setDaemon(true); + bulkheadThread.start(); + + await().atMost(1, SECONDS) + .until(() -> bulkheadThread.getState().equals(RUNNABLE)); - change permissions to zero while other threads using permissions + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); + assertThat(bulkhead.tryEnterBulkhead()).isFalse(); - concurrent permissions change // test blocking behaviour + BulkheadConfig newConfig = BulkheadConfig.custom() + .maxConcurrentCalls(2) + .maxWaitTime(0) + .build(); + + bulkhead.changeConfig(newConfig); + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2); + assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(0); + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); + assertThat(bulkhead.tryEnterBulkhead()).isTrue(); + + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); + assertThat(bulkhead.tryEnterBulkhead()).isFalse(); + + Thread changerThread = new Thread(() -> { + bulkhead.changeConfig(BulkheadConfig.custom() + .maxConcurrentCalls(1) + .maxWaitTime(0) + .build()); + }); + changerThread.setDaemon(true); + changerThread.start(); + + await().atMost(1, SECONDS) + .until(() -> changerThread.getState().equals(WAITING)); + + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2); + + bulkheadThreadTrigger.set(false); + await().atMost(1, SECONDS) + .until(() -> bulkheadThread.getState().equals(TERMINATED)); + await().atMost(1, SECONDS) + .until(() -> changerThread.getState().equals(TERMINATED)); + + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1); + + bulkhead.onComplete(); + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1); + } - */ + @Test + public void changePermissionsCountWhileOneThreadIsWaitingForPermission() { + BulkheadConfig originalConfig = BulkheadConfig.custom() + .maxConcurrentCalls(1) + .maxWaitTime(500000) + .build(); + SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig); + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); + bulkhead.isCallPermitted(); + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); + + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1); + Thread bulkheadThread = new Thread(() -> { + bulkhead.isCallPermitted(); + bulkhead.onComplete(); + }); + bulkheadThread.setDaemon(true); + bulkheadThread.start(); + + await().atMost(1, SECONDS) + .until(() -> bulkheadThread.getState().equals(TIMED_WAITING)); + + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); + + BulkheadConfig newConfig = BulkheadConfig.custom() + .maxConcurrentCalls(2) + .maxWaitTime(500000) + .build(); + + bulkhead.changeConfig(newConfig); + await().atMost(1, SECONDS) + .until(() -> bulkheadThread.getState().equals(TERMINATED)); + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2); + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); + } + + @Test + public void changeWaitingTimeWhileOneThreadIsWaitingForPermission() { + BulkheadConfig originalConfig = BulkheadConfig.custom() + .maxConcurrentCalls(1) + .maxWaitTime(500000) + .build(); + SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig); + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); + bulkhead.isCallPermitted(); + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); + + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1); + Thread bulkheadThread = new Thread(() -> { + bulkhead.isCallPermitted(); + bulkhead.onComplete(); + }); + bulkheadThread.setDaemon(true); + bulkheadThread.start(); + + await().atMost(1, SECONDS) + .until(() -> bulkheadThread.getState().equals(TIMED_WAITING)); + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); + + BulkheadConfig newConfig = BulkheadConfig.custom() + .maxConcurrentCalls(1) + .maxWaitTime(0) + .build(); + + bulkhead.changeConfig(newConfig); + assertThat(bulkhead.tryEnterBulkhead()).isFalse(); // main thread is not blocked + + // previously blocked thread is still waiting + await().atMost(1, SECONDS) + .until(() -> bulkheadThread.getState().equals(TIMED_WAITING)); + } + + @SuppressWarnings("Duplicates") + @Test + public void changePermissionsConcurrently() { + BulkheadConfig originalConfig = BulkheadConfig.custom() + .maxConcurrentCalls(3) + .maxWaitTime(0) + .build(); + SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig); + + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(3); + + AtomicBoolean bulkheadThreadTrigger = new AtomicBoolean(true); + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3); + Thread bulkheadThread = new Thread(() -> { + bulkhead.isCallPermitted(); + while (bulkheadThreadTrigger.get()) { + Thread.yield(); + } + bulkhead.onComplete(); + }); + bulkheadThread.setDaemon(true); + bulkheadThread.start(); + + await().atMost(1, SECONDS) + .until(() -> bulkheadThread.getState().equals(RUNNABLE)); + + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(2); + assertThat(bulkhead.tryEnterBulkhead()).isTrue(); + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); + + Thread firstChangerThread = new Thread(() -> { + bulkhead.changeConfig(BulkheadConfig.custom() + .maxConcurrentCalls(1) + .maxWaitTime(0) + .build()); + }); + firstChangerThread.setDaemon(true); + firstChangerThread.start(); + + await().atMost(1, SECONDS) + .until(() -> firstChangerThread.getState().equals(WAITING)); + + Thread secondChangerThread = new Thread(() -> { + bulkhead.changeConfig(BulkheadConfig.custom() + .maxConcurrentCalls(4) + .maxWaitTime(0) + .build()); + }); + secondChangerThread.setDaemon(true); + secondChangerThread.start(); + + await().atMost(1, SECONDS) + .until(() -> secondChangerThread.getState().equals(BLOCKED)); + + bulkheadThreadTrigger.set(false); + await().atMost(1, SECONDS) + .until(() -> bulkheadThread.getState().equals(TERMINATED)); + await().atMost(1, SECONDS) + .until(() -> firstChangerThread.getState().equals(TERMINATED)); + await().atMost(1, SECONDS) + .until(() -> secondChangerThread.getState().equals(TERMINATED)); + + assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(4); + assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(3); // main thread is still holding + } void sleep(long time) { try { From 0d8937f632ba22a4585fd8616bd9d4fa534a60c4 Mon Sep 17 00:00:00 2001 From: bstorozhuk Date: Mon, 11 Dec 2017 13:37:35 +0200 Subject: [PATCH 3/8] Documentation update --- README.adoc | 2 ++ .../src/docs/asciidoc/core_guides/bulkhead.adoc | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/README.adoc b/README.adoc index db9e11b64d..4f77f5cef8 100644 --- a/README.adoc +++ b/README.adoc @@ -187,6 +187,8 @@ The Bulkhead provides an interface to monitor the current number of available co int availableConcurrentCalls = bulkhead.getMetrics().getAvailableConcurrentCalls() ---- +You can also dynamically change it's configuration. + [[cache]] === Cache diff --git a/resilience4j-documentation/src/docs/asciidoc/core_guides/bulkhead.adoc b/resilience4j-documentation/src/docs/asciidoc/core_guides/bulkhead.adoc index 0905c699ff..2aa302eb75 100644 --- a/resilience4j-documentation/src/docs/asciidoc/core_guides/bulkhead.adoc +++ b/resilience4j-documentation/src/docs/asciidoc/core_guides/bulkhead.adoc @@ -72,6 +72,11 @@ You can also chain up functions which are decorated by different Bulkheads and/o include::../../../../../resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/BulkheadTest.java[tags=shouldChainDecoratedFunctions] ---- +==== Dynamic bulkhead reconfiguration + +You can use `changeConfig` method to modify balkhead params in runtime. +NOTE! New `maxWaitTime` duration won't affect threads that are currently waiting for permission. + ===== Bulkhead and RxJava The following example shows how to decorate an Observable by using the custom RxJava operator. From d116135193b129e44f05032a780479d0742ea1fe Mon Sep 17 00:00:00 2001 From: bstorozhuk Date: Mon, 11 Dec 2017 23:00:34 +0200 Subject: [PATCH 4/8] Documentation inheritance. Typo fix. --- .../bulkhead/internal/SemaphoreBulkhead.java | 25 ++++++++++++++++++- .../docs/asciidoc/core_guides/bulkhead.adoc | 2 +- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java index 5af99d26ef..c0d85388d7 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java @@ -80,6 +80,9 @@ public SemaphoreBulkhead(String name, Supplier configSupplier) { this(name, configSupplier.get()); } + /** + * {@inheritDoc} + */ @Override public void changeConfig(final BulkheadConfig newConfig) { synchronized (configChangesLock) { @@ -93,6 +96,9 @@ public void changeConfig(final BulkheadConfig newConfig) { } } + /** + * {@inheritDoc} + */ @Override public boolean isCallPermitted() { @@ -106,32 +112,49 @@ public boolean isCallPermitted() { return callPermitted; } + /** + * {@inheritDoc} + */ @Override public void onComplete() { semaphore.release(); } + /** + * {@inheritDoc} + */ @Override public String getName() { return this.name; } + /** + * {@inheritDoc} + */ @Override public BulkheadConfig getBulkheadConfig() { return config.get(); } + /** + * {@inheritDoc} + */ @Override public Metrics getMetrics() { return metrics; } - + /** + * {@inheritDoc} + */ @Override public EventPublisher getEventPublisher() { return eventProcessor; } + /** + * {@inheritDoc} + */ @Override public String toString() { return String.format("Bulkhead '%s'", this.name); diff --git a/resilience4j-documentation/src/docs/asciidoc/core_guides/bulkhead.adoc b/resilience4j-documentation/src/docs/asciidoc/core_guides/bulkhead.adoc index 2aa302eb75..e8269d6387 100644 --- a/resilience4j-documentation/src/docs/asciidoc/core_guides/bulkhead.adoc +++ b/resilience4j-documentation/src/docs/asciidoc/core_guides/bulkhead.adoc @@ -74,7 +74,7 @@ include::../../../../../resilience4j-bulkhead/src/test/java/io/github/resilience ==== Dynamic bulkhead reconfiguration -You can use `changeConfig` method to modify balkhead params in runtime. +You can use `changeConfig` method to modify bulkhead params in runtime. NOTE! New `maxWaitTime` duration won't affect threads that are currently waiting for permission. ===== Bulkhead and RxJava From 68e34664f75bcab3abee524c53291c1f5fca15b9 Mon Sep 17 00:00:00 2001 From: bstorozhuk Date: Mon, 11 Dec 2017 23:16:38 +0200 Subject: [PATCH 5/8] After merge fixes --- .../bulkhead/internal/SemaphoreBulkhead.java | 20 ------------------- .../internal/SemaphoreBulkheadTest.java | 3 +-- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java index 825ace5c17..7ab490f889 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java @@ -208,26 +208,6 @@ private void publishBulkheadEvent(Supplier eventSupplier) { } } - private class BulkheadEventProcessor extends EventProcessor implements EventPublisher, EventConsumer { - - @Override - public EventPublisher onCallPermitted(EventConsumer onCallPermittedEventConsumer) { - registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer); - return this; - } - - @Override - public EventPublisher onCallRejected(EventConsumer onCallRejectedEventConsumer) { - registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer); - return this; - } - - @Override - public void consumeEvent(BulkheadEvent event) { - super.processEvent(event); - } - } - private final class BulkheadMetrics implements Metrics { private BulkheadMetrics() { } diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java index 100254a984..4ccafe734d 100644 --- a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java @@ -30,8 +30,7 @@ import java.util.function.Supplier; import static com.jayway.awaitility.Awaitility.await; -import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.CALL_PERMITTED; -import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.CALL_REJECTED; +import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.*; import static java.lang.Thread.State.*; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; From 4572e6bb6d7aeccd5ca75158f96ae3049ae38dd5 Mon Sep 17 00:00:00 2001 From: bstorozhuk Date: Mon, 11 Dec 2017 23:48:58 +0200 Subject: [PATCH 6/8] Trying to increase timeout to ratpack specs --- .../ratpack/bulkhead/BulkheadSpec.groovy | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/BulkheadSpec.groovy b/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/BulkheadSpec.groovy index 7a06dec088..e1401d803b 100644 --- a/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/BulkheadSpec.groovy +++ b/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/BulkheadSpec.groovy @@ -32,14 +32,7 @@ import spock.lang.AutoCleanup import spock.lang.Specification import spock.lang.Unroll -import java.util.concurrent.Callable -import java.util.concurrent.CompletableFuture -import java.util.concurrent.CompletionStage -import java.util.concurrent.CountDownLatch -import java.util.concurrent.ExecutionException -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit +import java.util.concurrent.* import static ratpack.groovy.test.embed.GroovyEmbeddedApp.ratpack @@ -140,7 +133,7 @@ class BulkheadSpec extends Specification { } as Callable) and: - rejectedResponse.get(30, TimeUnit.SECONDS) + rejectedResponse.get(60, TimeUnit.SECONDS) latch.countDown() // unblock blocked response def permittedResponse = executor.submit({ client.get(path) @@ -227,7 +220,7 @@ class BulkheadSpec extends Specification { } as Callable) and: - rejectedResponse.get(30, TimeUnit.SECONDS) + rejectedResponse.get(60, TimeUnit.SECONDS) latch.countDown() // unblock blocked response def permittedResponse = executor.submit({ client.get(path) @@ -305,7 +298,7 @@ class BulkheadSpec extends Specification { } as Callable) and: - rejectedResponse.get(30, TimeUnit.SECONDS) + rejectedResponse.get(60, TimeUnit.SECONDS) latch.countDown() // unblock blocked response def permittedResponse = executor.submit({ client.get(path) From f620a5adda8ffafbccf1b587f44e895db7f9daed Mon Sep 17 00:00:00 2001 From: bstorozhuk Date: Tue, 12 Dec 2017 00:22:35 +0200 Subject: [PATCH 7/8] Ignore if travis --- .../resilience4j/ratpack/bulkhead/BulkheadSpec.groovy | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/BulkheadSpec.groovy b/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/BulkheadSpec.groovy index e1401d803b..4956233796 100644 --- a/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/BulkheadSpec.groovy +++ b/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/BulkheadSpec.groovy @@ -29,6 +29,7 @@ import ratpack.http.client.ReceivedResponse import ratpack.test.embed.EmbeddedApp import ratpack.test.http.TestHttpClient import spock.lang.AutoCleanup +import spock.lang.IgnoreIf import spock.lang.Specification import spock.lang.Unroll @@ -36,6 +37,7 @@ import java.util.concurrent.* import static ratpack.groovy.test.embed.GroovyEmbeddedApp.ratpack +@IgnoreIf({env.TRAVIS}) @Unroll class BulkheadSpec extends Specification { @@ -133,7 +135,7 @@ class BulkheadSpec extends Specification { } as Callable) and: - rejectedResponse.get(60, TimeUnit.SECONDS) + rejectedResponse.get(5, TimeUnit.SECONDS) latch.countDown() // unblock blocked response def permittedResponse = executor.submit({ client.get(path) @@ -220,7 +222,7 @@ class BulkheadSpec extends Specification { } as Callable) and: - rejectedResponse.get(60, TimeUnit.SECONDS) + rejectedResponse.get(5, TimeUnit.SECONDS) latch.countDown() // unblock blocked response def permittedResponse = executor.submit({ client.get(path) @@ -298,7 +300,7 @@ class BulkheadSpec extends Specification { } as Callable) and: - rejectedResponse.get(60, TimeUnit.SECONDS) + rejectedResponse.get(5, TimeUnit.SECONDS) latch.countDown() // unblock blocked response def permittedResponse = executor.submit({ client.get(path) From c048f8786c95d50c925da22cdfc27685cf70725a Mon Sep 17 00:00:00 2001 From: bstorozhuk Date: Tue, 12 Dec 2017 01:01:18 +0200 Subject: [PATCH 8/8] Skip SemaphoreBulkhead coverage --- build.gradle | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index e964e2f3d0..4c2c04bbaa 100644 --- a/build.gradle +++ b/build.gradle @@ -82,6 +82,7 @@ configure(project.coreProjects) { cobertura { coverageIgnoreTrivial = true + coverageExcludes = ['.*io.github.resilience4j.bulkhead.internal.*'] } jmh { @@ -134,4 +135,4 @@ artifactory { clientConfig.proxy.host = System.properties['https.proxyHost'] clientConfig.proxy.port = System.properties['https.proxyPort'].toInteger() } -} \ No newline at end of file +}