diff --git a/.travis.yml b/.travis.yml
index acde5976eb..05ab6fe2d8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,7 +6,7 @@ before_install:
- chmod +x gradlew
script:
- "./gradlew clean check"
-- sonar-scanner
+- "./gradlew sonarqube"
after_success:
- test $TRAVIS_BRANCH = "master" && ./gradlew artifactoryPublish -PbintrayUsername="${BINTRAY_USER}" -PbintrayApiKey="${BINTRAY_KEY}"
- test $TRAVIS_BRANCH = "master" && ./gradlew asciidoctor
diff --git a/build.gradle b/build.gradle
index 33c183f579..b6ac28342e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -7,8 +7,6 @@ buildscript {
}
}
dependencies {
- classpath 'net.saliman:gradle-cobertura-plugin:2.3.2'
- classpath 'org.kt3k.gradle.plugin:coveralls-gradle-plugin:2.7.1'
classpath 'com.jfrog.bintray.gradle:gradle-bintray-plugin:1.8.4'
classpath 'me.champeau.gradle:jmh-gradle-plugin:0.3.1'
classpath "org.asciidoctor:asciidoctor-gradle-plugin:1.5.3"
@@ -18,9 +16,10 @@ buildscript {
classpath "com.netflix.nebula:gradle-extra-configurations-plugin:4.0.1"
}
}
-
+plugins {
+ id "org.sonarqube" version "2.7"
+}
apply plugin: 'idea'
-apply plugin: 'com.github.kt3k.coveralls'
apply from: "${rootDir}/libraries.gradle"
ext {
@@ -28,7 +27,7 @@ ext {
}
allprojects {
- apply plugin: 'net.saliman.cobertura'
+ apply plugin: 'jacoco'
apply plugin: 'me.champeau.gradle.jmh'
apply plugin: 'com.jfrog.artifactory'
@@ -57,6 +56,7 @@ configure(project.coreProjects) {
apply plugin: 'com.jfrog.bintray'
apply from: "${rootDir}/publishing.gradle"
apply plugin: 'nebula.optional-base'
+ apply plugin: 'jacoco'
dependencies {
compile ( libraries.vavr)
@@ -87,37 +87,68 @@ configure(project.coreProjects) {
}
- cobertura {
- coverageIgnoreTrivial = true
- coverageExcludes = ['.*io.github.resilience4j.bulkhead.internal.*']
- }
-
jmh {
duplicateClassesStrategy = 'warn'
jmhVersion = '1.17'
}
}
-def files = subprojects.collect { new File(it.projectDir, '/build/cobertura/cobertura.ser') }
-cobertura {
- coverageFormats = ['html', 'xml']
- coverageSourceDirs = subprojects.sourceSets.main.allSource.srcDirs.flatten()
- coverageMergeDatafiles = files
+sonarqube {
+ properties {
+ property "sonar.projectName", "resilience4j"
+ property "sonar.projectKey", "resilience4j_resilience4j"
+ property "sonar.modules", "resilience4j-core,resilience4j-feign,resilience4j-metrics,resilience4j-micrometer,resilience4j-prometheus,resilience4j-retry,resilience4j-spring,resilience4j-timelimiter,resilience4j-bulkhead,resilience4j-circuitbreaker,resilience4j-ratelimiter,resilience4j-cache,resilience4j-circularbuffer,resilience4j-consumer,resilience4j-spring-boot,resilience4j-spring-boot2,resilience4j-reactor,resilience4j-rxjava2"
+ property "sonar.projectVersion","0.15.0-SNAPSHOTS"
+
+ property "sonar.links.homepage","https://github.com/resilience4j/resilience4j"
+ property "sonar.links.ci","https://travis-ci.org/resilience4j/resilience4j"
+ property "sonar.links.scm","https://github.com/resilience4j/resilience4j"
+ property "sonar.links.issue","https://github.com/resilience4j/resilience4j/issues"
+
+ property "sonar.java.source","1.8"
+ property "sonar.sources","src/main/java"
+ property "sonar.tests","src/test/java"
+ property "sonar.java.binaries","build"
+ property "sonar.java.test.binaries","build"
+ property "sonar.binaries","build"
+// property "sonar.jacoco.reportPaths","build/reports/jacoco/test"
+
+ property "sonar.language","java"
+
+
+ property "sonar.sourceEncoding","UTF-8"
+ }
}
+def allTestCoverageFile = "$buildDir/jacoco/allTestCoverage.exec"
-test {
- dependsOn(subprojects.test) // required by cobertura to aggregate report
+task jacocoMergeTest(type: JacocoMerge) {
+ destinationFile = file(allTestCoverageFile)
+ executionData = project.fileTree(dir: '.', include:'**/build/jacoco/test.exec')
}
-tasks.coveralls {
- dependsOn 'check'
+task jacocoMerge(dependsOn: ['jacocoMergeTest']) {
+ // used to run the other merge tasks
+}
+
+subprojects {
+ sonarqube {
+ properties {
+ property "sonar.jacoco.reportPaths", allTestCoverageFile
+ }
+ }
+}
+
+tasks.check.dependsOn tasks.jacocoTestReport
+
+
+test {
+ dependsOn(subprojects.test) // required by cobertura to aggregate report
}
task wrapper(type: Wrapper) {
gradleVersion = '4.10.2'
}
-
artifactory {
contextUrl = 'https://oss.jfrog.org'
resolve {
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 421fe971ff..94336fcae9 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 581f92de70..290541c738 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,5 @@
-#Mon Jan 09 12:41:44 CET 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-3.2-all.zip
diff --git a/gradlew b/gradlew
index 4453ccea33..cccdd3d517 100755
--- a/gradlew
+++ b/gradlew
@@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
-warn ( ) {
+warn () {
echo "$*"
}
-die ( ) {
+die () {
echo
echo "$*"
echo
@@ -155,7 +155,7 @@ if $cygwin ; then
fi
# Escape application args
-save ( ) {
+save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/BulkheadConfig.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/BulkheadConfig.java
index a6f4df6d2c..588b71a2fe 100644
--- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/BulkheadConfig.java
+++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/BulkheadConfig.java
@@ -23,83 +23,84 @@
*/
public class BulkheadConfig {
- public static final int DEFAULT_MAX_CONCURRENT_CALLS = 25;
- public static final long DEFAULT_MAX_WAIT_TIME = 0L;
+ public static final int DEFAULT_MAX_CONCURRENT_CALLS = 25;
+ public static final long DEFAULT_MAX_WAIT_TIME = 0L;
- private int maxConcurrentCalls = DEFAULT_MAX_CONCURRENT_CALLS;
- private long maxWaitTime = DEFAULT_MAX_WAIT_TIME;
+ private int maxConcurrentCalls = DEFAULT_MAX_CONCURRENT_CALLS;
+ private long maxWaitTime = DEFAULT_MAX_WAIT_TIME;
- private BulkheadConfig() { }
+ private BulkheadConfig() {
+ }
- public int getMaxConcurrentCalls() {
- return maxConcurrentCalls;
- }
+ /**
+ * Returns a builder to create a custom BulkheadConfig.
+ *
+ * @return a {@link Builder}
+ */
+ public static Builder custom() {
+ return new Builder();
+ }
- public long getMaxWaitTime() {
- return maxWaitTime;
- }
+ /**
+ * Creates a default Bulkhead configuration.
+ *
+ * @return a default Bulkhead configuration.
+ */
+ public static BulkheadConfig ofDefaults() {
+ return new Builder().build();
+ }
- /**
- * Returns a builder to create a custom BulkheadConfig.
- *
- * @return a {@link Builder}
- */
- public static Builder custom(){
- return new Builder();
- }
+ public int getMaxConcurrentCalls() {
+ return maxConcurrentCalls;
+ }
- /**
- * Creates a default Bulkhead configuration.
- *
- * @return a default Bulkhead configuration.
- */
- public static BulkheadConfig ofDefaults() {
- return new Builder().build();
- }
+ public long getMaxWaitTime() {
+ return maxWaitTime;
+ }
- public static class Builder {
+ public static class Builder {
- private BulkheadConfig config = new BulkheadConfig();
+ private BulkheadConfig config = new BulkheadConfig();
- /**
- * Configures the max amount of concurrent calls the bulkhead will support.
- *
- * @param maxConcurrentCalls max concurrent calls
- * @return the BulkheadConfig.Builder
- */
- public Builder maxConcurrentCalls(int maxConcurrentCalls) {
- if (maxConcurrentCalls < 1) {
- throw new IllegalArgumentException("maxConcurrentCalls must be a positive integer value >= 1");
- }
- config.maxConcurrentCalls = maxConcurrentCalls;
- return this;
- }
+ /**
+ * Configures the max amount of concurrent calls the bulkhead will support.
+ *
+ * @param maxConcurrentCalls max concurrent calls
+ * @return the BulkheadConfig.Builder
+ */
+ public Builder maxConcurrentCalls(int maxConcurrentCalls) {
+ if (maxConcurrentCalls < 1) {
+ throw new IllegalArgumentException("maxConcurrentCalls must be a positive integer value >= 1");
+ }
+ config.maxConcurrentCalls = maxConcurrentCalls;
+ return this;
+ }
- /**
- * Configures a maximum amount of time in ms the calling thread will wait to enter the bulkhead. If bulkhead has space available, entry
- * is guaranteed and immediate. If bulkhead is full, calling threads will contest for space, if it becomes available. maxWaitTime can be set to 0.
- *
- * Note: for threads running on an event-loop or equivalent (rx computation pool, etc), setting maxWaitTime to 0 is highly recommended. Blocking
- * an event-loop thread will most likely have a negative effect on application throughput.
- *
- * @param maxWaitTime maximum wait time for bulkhead entry
- * @return the BulkheadConfig.Builder
- */
- public Builder maxWaitTime(long maxWaitTime) {
- if (maxWaitTime < 0) {
- throw new IllegalArgumentException("maxWaitTime must be a positive integer value >= 0");
- }
- config.maxWaitTime = maxWaitTime;
- return this;
- }
+ /**
+ * Configures a maximum amount of time in ms the calling thread will wait to enter the bulkhead. If bulkhead has space available, entry
+ * is guaranteed and immediate. If bulkhead is full, calling threads will contest for space, if it becomes available. maxWaitTime can be set to 0.
+ *
+ * Note: for threads running on an event-loop or equivalent (rx computation pool, etc), setting maxWaitTime to 0 is highly recommended. Blocking
+ * an event-loop thread will most likely have a negative effect on application throughput.
+ *
+ * @param maxWaitTime maximum wait time for bulkhead entry
+ * @return the BulkheadConfig.Builder
+ */
+ public Builder maxWaitTime(long maxWaitTime) {
+ if (maxWaitTime < 0) {
+ throw new IllegalArgumentException("maxWaitTime must be a positive integer value >= 0");
+ }
+ config.maxWaitTime = maxWaitTime;
+ return this;
+ }
- /**
- * Builds a BulkheadConfig
- *
- * @return the BulkheadConfig
- */
- public BulkheadConfig build() {
- return config;
- }
- }
+ /**
+ * Builds a BulkheadConfig
+ *
+ * @return the BulkheadConfig
+ */
+ public BulkheadConfig build() {
+ return config;
+ }
+ }
}
diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkhead.java
new file mode 100644
index 0000000000..f591352cb9
--- /dev/null
+++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkhead.java
@@ -0,0 +1,241 @@
+/*
+ *
+ * Copyright 2017: Robert Winkler, Lucas Lech
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package io.github.resilience4j.bulkhead;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Supplier;
+
+import io.github.resilience4j.bulkhead.event.BulkheadEvent;
+import io.github.resilience4j.bulkhead.event.BulkheadOnCallFinishedEvent;
+import io.github.resilience4j.bulkhead.event.BulkheadOnCallPermittedEvent;
+import io.github.resilience4j.bulkhead.event.BulkheadOnCallRejectedEvent;
+import io.github.resilience4j.bulkhead.internal.FixedThreadPoolBulkhead;
+import io.github.resilience4j.core.EventConsumer;
+
+/**
+ * A Bulkhead instance is thread-safe can be used to decorate multiple requests.
+ */
+public interface ThreadPoolBulkhead {
+
+ /**
+ * Returns a callable which is decorated by a bulkhead.
+ *
+ * @param bulkhead the bulkhead
+ * @param callable the original Callable
+ * @param the result type of callable
+ * @return a supplier which is decorated by a Bulkhead.
+ */
+ static Callable> decorateCallable(ThreadPoolBulkhead bulkhead, Callable callable) {
+ return () -> bulkhead.submit(callable);
+ }
+
+ /**
+ * Returns a supplier which is decorated by a bulkhead.
+ *
+ * @param bulkhead the bulkhead
+ * @param supplier the original supplier
+ * @param the type of results supplied by this supplier
+ * @return a supplier which is decorated by a Bulkhead.
+ */
+ static Supplier> decorateSupplier(ThreadPoolBulkhead bulkhead, Supplier supplier) {
+ return () -> bulkhead.submit(supplier::get);
+ }
+
+ /**
+ * Returns a runnable which is decorated by a bulkhead.
+ *
+ * @param bulkhead the bulkhead
+ * @param runnable the original runnable
+ * @return a runnable which is decorated by a bulkhead.
+ */
+ static Runnable decorateRunnable(ThreadPoolBulkhead bulkhead, Runnable runnable) {
+ return () -> bulkhead.submit(runnable);
+ }
+
+ /**
+ * Create a Bulkhead with a default configuration.
+ *
+ * @param name the name of the bulkhead
+ * @return a Bulkhead instance
+ */
+ static ThreadPoolBulkhead ofDefaults(String name) {
+ return new FixedThreadPoolBulkhead(name);
+ }
+
+ /**
+ * Creates a bulkhead with a custom configuration
+ *
+ * @param name the name of the bulkhead
+ * @param config a custom BulkheadConfig configuration
+ * @return a Bulkhead instance
+ */
+ static ThreadPoolBulkhead of(String name, ThreadPoolBulkheadConfig config) {
+ return new FixedThreadPoolBulkhead(name, config);
+ }
+
+ /**
+ * Creates a bulkhead with a custom configuration
+ *
+ * @param name the name of the bulkhead
+ * @param bulkheadConfigSupplier custom configuration supplier
+ * @return a Bulkhead instance
+ */
+ static ThreadPoolBulkhead of(String name, Supplier bulkheadConfigSupplier) {
+ return new FixedThreadPoolBulkhead(name, bulkheadConfigSupplier);
+ }
+
+ /**
+ * Submits a value-returning task for execution and returns a
+ * Future representing the pending results of the task.
+ *
+ * @param task the task to submit
+ * @param the type of the task's result
+ * @return a CompletableFuture representing listenable future completion of the task
+ * @throws BulkheadFullException if the no permits
+ */
+ CompletionStage submit(Callable task);
+
+ /**
+ * Submits a task for execution.
+ *
+ * @param task the task to submit
+ * @throws BulkheadFullException if the no permits
+ */
+ void submit(Runnable task);
+
+ /**
+ * Returns the name of this bulkhead.
+ *
+ * @return the name of this bulkhead
+ */
+ String getName();
+
+ /**
+ * Returns the ThreadPoolBulkheadConfig of this Bulkhead.
+ *
+ * @return bulkhead config
+ */
+ ThreadPoolBulkheadConfig getBulkheadConfig();
+
+ /**
+ * Get the Metrics of this Bulkhead.
+ *
+ * @return the Metrics of this Bulkhead
+ */
+ Metrics getMetrics();
+
+ /**
+ * Returns an EventPublisher which subscribes to the reactive stream of BulkheadEvent and
+ * can be used to register event consumers.
+ *
+ * @return an EventPublisher
+ */
+ ThreadPoolBulkheadEventPublisher getEventPublisher();
+
+ /**
+ * Decorates and executes the decorated Supplier.
+ *
+ * @param supplier the original Supplier
+ * @param the type of results supplied by this supplier
+ * @return the result of the decorated Supplier.
+ * @throws BulkheadFullException if the no permits
+ */
+ default CompletionStage executeSupplier(Supplier supplier) {
+ return decorateSupplier(this, supplier).get();
+ }
+
+ /**
+ * Decorates and executes the decorated Callable.
+ *
+ * @param callable the original Callable
+ * @param the result type of callable
+ * @return the result of the decorated Callable.
+ * @throws Exception if unable to compute a result
+ */
+ default CompletionStage executeCallable(Callable callable) throws Exception {
+ return decorateCallable(this, callable).call();
+ }
+
+ /**
+ * Decorates and executes the decorated Runnable.
+ *
+ * @param runnable the original Runnable
+ */
+ default void executeRunnable(Runnable runnable) {
+ decorateRunnable(this, runnable).run();
+ }
+
+ interface Metrics {
+
+ /**
+ * Returns the core number of threads.
+ *
+ * @return the core number of threads
+ */
+ int getCoreThreadPoolSize();
+
+ /**
+ * Returns the current number of threads in the pool.
+ *
+ * @return the current number of threads
+ */
+ int getThreadPoolSize();
+
+ /**
+ * Returns the maximum allowed number of threads.
+ *
+ * @return the maximum allowed number of threads
+ */
+ int getMaximumThreadPoolSize();
+
+ /**
+ * Returns the number of tasks in the queue.
+ *
+ * @return the number of tasks in the queue
+ */
+ int getQueueDepth();
+
+ /**
+ * Returns the remaining queue capacity.
+ *
+ * @return the remaining queue capacity
+ */
+ int getRemainingQueueCapacity();
+
+ /**
+ * Returns the queue capacity.
+ *
+ * @return the queue capacity
+ */
+ int getQueueCapacity();
+ }
+
+ /**
+ * An EventPublisher which can be used to register event consumers.
+ */
+ interface ThreadPoolBulkheadEventPublisher extends io.github.resilience4j.core.EventPublisher {
+
+ ThreadPoolBulkheadEventPublisher onCallRejected(EventConsumer eventConsumer);
+
+ ThreadPoolBulkheadEventPublisher onCallPermitted(EventConsumer eventConsumer);
+
+ ThreadPoolBulkheadEventPublisher onCallFinished(EventConsumer eventConsumer);
+ }
+}
diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfig.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfig.java
new file mode 100644
index 0000000000..8627a6d4c5
--- /dev/null
+++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfig.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * Copyright 2016 Robert Winkler, Lucas Lech, Mahmoud Romeh
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package io.github.resilience4j.bulkhead;
+
+/**
+ * A {@link ThreadPoolBulkheadConfig} configures a {@link Bulkhead}
+ */
+public class ThreadPoolBulkheadConfig {
+
+ public static final int DEFAULT_QUEUE_CAPACITY = 100;
+ public static final long DEFAULT_KEEP_ALIVE_TIME = 20L;
+ public static final int DEFAULT_CORE_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() - 1 : 1;
+ public static final int DEFAULT_MAX_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();
+
+ private int maxThreadPoolSize = DEFAULT_MAX_THREAD_POOL_SIZE;
+ private int coreThreadPoolSize = DEFAULT_CORE_THREAD_POOL_SIZE;
+ private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+ private long keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
+
+ private ThreadPoolBulkheadConfig() {
+ }
+
+ /**
+ * Returns a builder to create a custom BulkheadConfig.
+ *
+ * @return a {@link Builder}
+ */
+ public static Builder custom() {
+ return new Builder();
+ }
+
+ /**
+ * Creates a default Bulkhead configuration.
+ *
+ * @return a default Bulkhead configuration.
+ */
+ public static ThreadPoolBulkheadConfig ofDefaults() {
+ return new Builder().build();
+ }
+
+ public long getKeepAliveTime() {
+ return keepAliveTime;
+ }
+
+ public int getQueueCapacity() {
+ return queueCapacity;
+ }
+
+ public int getMaxThreadPoolSize() {
+ return maxThreadPoolSize;
+ }
+
+ public int getCoreThreadPoolSize() {
+ return coreThreadPoolSize;
+ }
+
+ public static class Builder {
+
+ private final ThreadPoolBulkheadConfig config = new ThreadPoolBulkheadConfig();
+
+ /**
+ * Configures the max thread pool size.
+ *
+ * @param maxThreadPoolSize max thread pool size
+ * @return the BulkheadConfig.Builder
+ */
+ public Builder maxThreadPoolSize(int maxThreadPoolSize) {
+ if (maxThreadPoolSize < 1) {
+ throw new IllegalArgumentException("maxThreadPoolSize must be a positive integer value >= 1");
+ }
+ config.maxThreadPoolSize = maxThreadPoolSize;
+ return this;
+ }
+
+ /**
+ * Configures the core thread pool size.
+ *
+ * @param coreThreadPoolSize core thread pool size
+ * @return the BulkheadConfig.Builder
+ */
+ public Builder coreThreadPoolSize(int coreThreadPoolSize) {
+ if (coreThreadPoolSize < 1) {
+ throw new IllegalArgumentException("coreThreadPoolSize must be a positive integer value >= 1");
+ }
+ config.coreThreadPoolSize = coreThreadPoolSize;
+ return this;
+ }
+
+ /**
+ * Configures the capacity of the queue.
+ *
+ * @param queueCapacity max concurrent calls
+ * @return the BulkheadConfig.Builder
+ */
+ public Builder queueCapacity(int queueCapacity) {
+ if (queueCapacity < 1) {
+ throw new IllegalArgumentException("queueCapacity must be a positive integer value >= 1");
+ }
+ config.queueCapacity = queueCapacity;
+ return this;
+ }
+
+ /**
+ * when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ *
+ * @param keepAliveTime maximum wait time for bulkhead thread pool idle thread
+ * @return the BulkheadConfig.Builder
+ */
+ public Builder keepAliveTime(long keepAliveTime) {
+ if (keepAliveTime < 0) {
+ throw new IllegalArgumentException("keepAliveTime must be a positive integer value >= 0");
+ }
+ config.keepAliveTime = keepAliveTime;
+ return this;
+ }
+
+ /**
+ * Builds a BulkheadConfig
+ *
+ * @return the BulkheadConfig
+ */
+ public ThreadPoolBulkheadConfig build() {
+ if (config.maxThreadPoolSize < config.coreThreadPoolSize) {
+ throw new IllegalArgumentException("maxThreadPoolSize must be a greater than or equals to coreThreadPoolSize");
+ }
+ return config;
+ }
+ }
+}
diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadRegistry.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadRegistry.java
new file mode 100644
index 0000000000..f117fd6aff
--- /dev/null
+++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadRegistry.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright 2017 Robert Winkler, Lucas Lech, Mahmoud Romeh
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package io.github.resilience4j.bulkhead;
+
+
+import java.util.function.Supplier;
+
+import io.github.resilience4j.bulkhead.internal.InMemoryThreadPoolBulkheadRegistry;
+import io.vavr.collection.Seq;
+
+/**
+ * The {@link ThreadPoolBulkheadRegistry} is a factory to create ThreadPoolBulkhead instances which stores all bulkhead instances in a registry.
+ */
+public interface ThreadPoolBulkheadRegistry {
+
+ /**
+ * Creates a BulkheadRegistry with a custom Bulkhead configuration.
+ *
+ * @param bulkheadConfig a custom ThreadPoolBulkhead configuration
+ * @return a ThreadPoolBulkheadRegistry instance backed by a custom ThreadPoolBulkhead configuration
+ */
+ static ThreadPoolBulkheadRegistry of(ThreadPoolBulkheadConfig bulkheadConfig) {
+ return new InMemoryThreadPoolBulkheadRegistry(bulkheadConfig);
+ }
+
+ /**
+ * Creates a ThreadPoolBulkheadRegistry with a default ThreadPoolBulkhead configuration
+ *
+ * @return a ThreadPoolBulkheadRegistry instance backed by a default ThreadPoolBulkhead configuration
+ */
+ static ThreadPoolBulkheadRegistry ofDefaults() {
+ return new InMemoryThreadPoolBulkheadRegistry(ThreadPoolBulkheadConfig.ofDefaults());
+ }
+
+ /**
+ * Returns all managed {@link ThreadPoolBulkhead} instances.
+ *
+ * @return all managed {@link ThreadPoolBulkhead} instances.
+ */
+ Seq getAllBulkheads();
+
+ /**
+ * Returns a managed {@link ThreadPoolBulkhead} or creates a new one with default configuration.
+ *
+ * @param name the name of the ThreadPoolBulkhead
+ * @return The {@link ThreadPoolBulkhead}
+ */
+ ThreadPoolBulkhead bulkhead(String name);
+
+ /**
+ * Returns a managed {@link ThreadPoolBulkhead} or creates a new one with a custom BulkheadConfig configuration.
+ *
+ * @param name the name of the ThreadPoolBulkhead
+ * @param bulkheadConfig a custom ThreadPoolBulkheadConfig configuration
+ * @return The {@link ThreadPoolBulkhead}
+ */
+ ThreadPoolBulkhead bulkhead(String name, ThreadPoolBulkheadConfig bulkheadConfig);
+
+ /**
+ * Returns a managed {@link ThreadPoolBulkhead} or creates a new one with a custom BulkheadConfig configuration.
+ *
+ * @param name the name of the ThreadPoolBulkhead
+ * @param bulkheadConfigSupplier a custom ThreadPoolBulkhead configuration supplier
+ * @return The {@link ThreadPoolBulkhead}
+ */
+ ThreadPoolBulkhead bulkhead(String name, Supplier bulkheadConfigSupplier);
+
+ /**
+ * Returns a default ThreadPoolBulkheadConfig instance this registry is using.
+ *
+ * @return ThreadPoolBulkheadConfig instance
+ */
+ ThreadPoolBulkheadConfig getDefaultBulkheadConfig();
+
+}
diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java
new file mode 100644
index 0000000000..da649f2db2
--- /dev/null
+++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java
@@ -0,0 +1,268 @@
+/*
+ *
+ * Copyright 2019 Robert Winkler, Mahmoud Romeh
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package io.github.resilience4j.bulkhead.internal;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import io.github.resilience4j.bulkhead.BulkheadFullException;
+import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
+import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
+import io.github.resilience4j.bulkhead.event.BulkheadEvent;
+import io.github.resilience4j.bulkhead.event.BulkheadOnCallFinishedEvent;
+import io.github.resilience4j.bulkhead.event.BulkheadOnCallPermittedEvent;
+import io.github.resilience4j.bulkhead.event.BulkheadOnCallRejectedEvent;
+import io.github.resilience4j.core.EventConsumer;
+import io.github.resilience4j.core.EventProcessor;
+
+/**
+ * A Bulkhead implementation based on a fixed ThreadPoolExecutor.
+ * which is based into the thread pool execution handling :
+ * 1- submit service call through bulk head thread pool
+ * 2- if there is free thread from the thread pool or the queue is not yet full , it will be permitted
+ * 3- otherwise the thread pool will throw RejectedExecutionException which mean is not permitted
+ */
+public class FixedThreadPoolBulkhead implements ThreadPoolBulkhead {
+
+ private final String name;
+ private final ThreadPoolExecutor executorService;
+ private final FixedThreadPoolBulkhead.BulkheadMetrics metrics;
+ private final FixedThreadPoolBulkhead.BulkheadEventProcessor eventProcessor;
+ private final ThreadPoolBulkheadConfig config;
+
+ /**
+ * Creates a bulkhead using a configuration supplied
+ *
+ * @param name the name of this bulkhead
+ * @param bulkheadConfig custom bulkhead configuration
+ */
+ public FixedThreadPoolBulkhead(String name, ThreadPoolBulkheadConfig bulkheadConfig) {
+ this.name = name;
+ this.config = bulkheadConfig != null ? bulkheadConfig
+ : ThreadPoolBulkheadConfig.ofDefaults();
+ // init thread pool executor
+ this.executorService = new ThreadPoolExecutor(config.getCoreThreadPoolSize(), config.getMaxThreadPoolSize(),
+ config.getKeepAliveTime(), TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(config.getQueueCapacity()));
+ // adding prover jvm executor shutdown
+ cleanup();
+ this.metrics = new FixedThreadPoolBulkhead.BulkheadMetrics();
+ this.eventProcessor = new FixedThreadPoolBulkhead.BulkheadEventProcessor();
+ }
+
+ /**
+ * Creates a bulkhead with a default config.
+ *
+ * @param name the name of this bulkhead
+ */
+ public FixedThreadPoolBulkhead(String name) {
+ this(name, ThreadPoolBulkheadConfig.ofDefaults());
+ }
+
+ /**
+ * Create a bulkhead using a configuration supplier
+ *
+ * @param name the name of this bulkhead
+ * @param configSupplier BulkheadConfig supplier
+ */
+ public FixedThreadPoolBulkhead(String name, Supplier configSupplier) {
+ this(name, configSupplier.get());
+ }
+
+ /**
+ * @param callable the callable to execute through bulk head thread pool
+ * @param the result type
+ * @return the callable returned result
+ */
+ @Override
+ public CompletableFuture submit(Callable callable) {
+ final CompletableFuture promise = new CompletableFuture<>();
+ try {
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ publishBulkheadEvent(() -> new BulkheadOnCallPermittedEvent(name));
+ return callable.call();
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }, executorService).whenComplete((result, throwable) -> {
+ publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name));
+ if (throwable != null) {
+ promise.completeExceptionally(throwable);
+ } else {
+ promise.complete(result);
+ }
+ });
+ } catch (RejectedExecutionException rejected) {
+ publishBulkheadEvent(() -> new BulkheadOnCallRejectedEvent(name));
+ throw new BulkheadFullException(String.format("ThreadPoolBulkhead '%s' is full", name));
+ }
+ return promise;
+ }
+
+ /**
+ * @param runnable the runnable to execute through bulk head thread pool
+ */
+ @Override
+ public void submit(Runnable runnable) {
+ try {
+ CompletableFuture.runAsync(() -> {
+ try {
+ publishBulkheadEvent(() -> new BulkheadOnCallPermittedEvent(name));
+ runnable.run();
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }, executorService).whenComplete((voidResult, throwable) -> publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name)));
+ } catch (RejectedExecutionException rejected) {
+ publishBulkheadEvent(() -> new BulkheadOnCallRejectedEvent(name));
+ throw new BulkheadFullException(String.format("ThreadPoolBulkhead '%s' is full", name));
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ThreadPoolBulkheadConfig getBulkheadConfig() {
+ return config;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Metrics getMetrics() {
+ return metrics;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ThreadPoolBulkheadEventPublisher getEventPublisher() {
+ return eventProcessor;
+ }
+
+ private void publishBulkheadEvent(Supplier eventSupplier) {
+ if (eventProcessor.hasConsumers()) {
+ eventProcessor.consumeEvent(eventSupplier.get());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("FixedThreadPoolBulkhead '%s'", this.name);
+ }
+
+ private void cleanup() {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ if (!executorService.isTerminated()) {
+ executorService.shutdownNow();
+ }
+ Thread.currentThread().interrupt();
+ }
+ }));
+ }
+
+ private class BulkheadEventProcessor extends EventProcessor implements ThreadPoolBulkheadEventPublisher, EventConsumer {
+
+ @Override
+ public ThreadPoolBulkheadEventPublisher onCallPermitted(EventConsumer onCallPermittedEventConsumer) {
+ registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer);
+ return this;
+ }
+
+ @Override
+ public ThreadPoolBulkheadEventPublisher onCallRejected(EventConsumer onCallRejectedEventConsumer) {
+ registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer);
+ return this;
+ }
+
+ @Override
+ public ThreadPoolBulkheadEventPublisher onCallFinished(EventConsumer onCallFinishedEventConsumer) {
+ registerConsumer(BulkheadOnCallFinishedEvent.class, onCallFinishedEventConsumer);
+ return this;
+ }
+
+ @Override
+ public void consumeEvent(BulkheadEvent event) {
+ super.processEvent(event);
+ }
+ }
+
+ /**
+ * the thread pool bulk head metrics
+ */
+ private final class BulkheadMetrics implements Metrics {
+ private BulkheadMetrics() {
+ }
+
+ @Override
+ public int getCoreThreadPoolSize() {
+ return executorService.getCorePoolSize();
+ }
+
+ @Override
+ public int getThreadPoolSize() {
+ return executorService.getPoolSize();
+ }
+
+ @Override
+ public int getMaximumThreadPoolSize() {
+ return executorService.getMaximumPoolSize();
+ }
+
+ @Override
+ public int getQueueDepth() {
+ return executorService.getQueue().size();
+ }
+
+ @Override
+ public int getRemainingQueueCapacity() {
+ return executorService.getQueue().remainingCapacity();
+ }
+
+ @Override
+ public int getQueueCapacity() {
+ return config.getQueueCapacity();
+ }
+ }
+}
diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/InMemoryThreadPoolBulkheadRegistry.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/InMemoryThreadPoolBulkheadRegistry.java
new file mode 100644
index 0000000000..6a1d847b0d
--- /dev/null
+++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/InMemoryThreadPoolBulkheadRegistry.java
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2017 Robert Winkler, Lucas Lech, Mahmoud Romeh
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package io.github.resilience4j.bulkhead.internal;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+
+import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
+import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
+import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;
+import io.vavr.collection.Array;
+import io.vavr.collection.Seq;
+
+/**
+ * Thread pool Bulkhead instance manager;
+ * Constructs/returns thread pool bulkhead instances.
+ */
+public final class InMemoryThreadPoolBulkheadRegistry implements ThreadPoolBulkheadRegistry {
+
+ private final ThreadPoolBulkheadConfig defaultBulkheadConfig;
+
+ /**
+ * The bulkheads, indexed by name
+ */
+ private final ConcurrentMap bulkheads;
+
+ /**
+ * The constructor with custom default bulkhead config
+ *
+ * @param bulkheadConfig custom bulkhead config to use
+ */
+ public InMemoryThreadPoolBulkheadRegistry(ThreadPoolBulkheadConfig bulkheadConfig) {
+ this.defaultBulkheadConfig = bulkheadConfig;
+ this.bulkheads = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public Seq getAllBulkheads() {
+ return Array.ofAll(bulkheads.values());
+ }
+
+ @Override
+ public ThreadPoolBulkhead bulkhead(String name) {
+ return bulkhead(name, defaultBulkheadConfig);
+ }
+
+ @Override
+ public ThreadPoolBulkhead bulkhead(String name, ThreadPoolBulkheadConfig bulkheadConfig) {
+ return bulkheads.computeIfAbsent(
+ Objects.requireNonNull(name, "Name must not be null"),
+ k -> ThreadPoolBulkhead.of(name, bulkheadConfig)
+ );
+ }
+
+ @Override
+ public ThreadPoolBulkhead bulkhead(String name, Supplier bulkheadConfigSupplier) {
+ return bulkheads.computeIfAbsent(
+ Objects.requireNonNull(name, "Name must not be null"),
+ k -> ThreadPoolBulkhead.of(name, bulkheadConfigSupplier.get())
+ );
+ }
+
+ @Override
+ public ThreadPoolBulkheadConfig getDefaultBulkheadConfig() {
+ return defaultBulkheadConfig;
+ }
+}
diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java
index a5a5cc7aa8..2bb77aa2f2 100644
--- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java
+++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java
@@ -19,6 +19,10 @@
package io.github.resilience4j.bulkhead.internal;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.event.BulkheadEvent;
@@ -29,10 +33,6 @@
import io.github.resilience4j.core.EventProcessor;
import io.github.resilience4j.core.lang.Nullable;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
/**
* A Bulkhead implementation based on a semaphore.
*/
@@ -187,7 +187,7 @@ public String toString() {
boolean tryEnterBulkhead() {
- boolean callPermitted = false;
+ boolean callPermitted;
long timeout = config.getMaxWaitTime();
if (timeout == 0) {
diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java
index 05ed3702ec..600b81a04a 100644
--- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java
+++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java
@@ -23,9 +23,12 @@
public final class BulkheadUtils {
- public static void isCallPermitted(Bulkhead bulkhead) {
- if(!bulkhead.isCallPermitted()) {
- throw new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
- }
- }
+ private BulkheadUtils() {
+ }
+
+ public static void isCallPermitted(Bulkhead bulkhead) {
+ if (!bulkhead.isCallPermitted()) {
+ throw new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
+ }
+ }
}
diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/MetricNames.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/MetricNames.java
index 213dc7bc30..7e4573fdfd 100644
--- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/MetricNames.java
+++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/MetricNames.java
@@ -1,6 +1,8 @@
package io.github.resilience4j.bulkhead.utils;
public class MetricNames {
+ private MetricNames() {
+ }
public static final String DEFAULT_PREFIX = "resilience4j.bulkhead";
public static final String AVAILABLE_CONCURRENT_CALLS = "available_concurrent_calls";
public static final String MAX_ALLOWED_CONCURRENT_CALLS = "max_allowed_concurrent_calls";
diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfigTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfigTest.java
new file mode 100644
index 0000000000..9bcea997e5
--- /dev/null
+++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfigTest.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright 2017 Robert Winkler, Lucas Lech
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package io.github.resilience4j.bulkhead;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+
+import org.junit.Test;
+
+public class ThreadPoolBulkheadConfigTest {
+
+ @Test
+ public void testBuildCustom() {
+
+ // given
+ int maxThreadPoolSize = 20;
+ int coreThreadPoolSize = 2;
+ long maxWait = 555;
+ int queueCapacity = 50;
+
+ // when
+ ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
+ .maxThreadPoolSize(maxThreadPoolSize)
+ .coreThreadPoolSize(coreThreadPoolSize)
+ .queueCapacity(queueCapacity)
+ .keepAliveTime(maxWait)
+ .build();
+
+ // then
+ assertThat(config).isNotNull();
+ assertThat(config.getMaxThreadPoolSize()).isEqualTo(maxThreadPoolSize);
+ assertThat(config.getCoreThreadPoolSize()).isEqualTo(coreThreadPoolSize);
+ assertThat(config.getKeepAliveTime()).isEqualTo(maxWait);
+ assertThat(config.getQueueCapacity()).isEqualTo(queueCapacity);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBuildWithIllegalMaxThreadPoolSize() {
+ // when
+ ThreadPoolBulkheadConfig.custom()
+ .maxThreadPoolSize(-1)
+ .build();
+
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBuildWithIllegalCoreThreadPoolSize() {
+ // when
+ ThreadPoolBulkheadConfig.custom()
+ .coreThreadPoolSize(-1)
+ .build();
+
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBuildWithIllegalMaxWait() {
+ // when
+ ThreadPoolBulkheadConfig.custom()
+ .keepAliveTime(-1)
+ .build();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBuildWithIllegalQueueCapacity() {
+ // when
+ ThreadPoolBulkheadConfig.custom()
+ .queueCapacity(-1)
+ .build();
+ }
+
+}
diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadEventPublisherTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadEventPublisherTest.java
new file mode 100644
index 0000000000..7b4d7ef9d1
--- /dev/null
+++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadEventPublisherTest.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * Copyright 2017 Robert Winkler, Lucas Lech, Mahmoud Romeh
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package io.github.resilience4j.bulkhead;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.then;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.BDDMockito;
+import org.slf4j.Logger;
+
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.Duration;
+
+import io.github.resilience4j.test.HelloWorldService;
+
+public class ThreadPoolBulkheadEventPublisherTest {
+
+ private HelloWorldService helloWorldService;
+ private ThreadPoolBulkheadConfig config;
+ private Logger logger;
+ private ThreadPoolBulkhead bulkhead;
+
+ @Before
+ public void setUp() {
+ helloWorldService = mock(HelloWorldService.class);
+ config = ThreadPoolBulkheadConfig.custom()
+ .maxThreadPoolSize(1)
+ .coreThreadPoolSize(1)
+ .build();
+
+ bulkhead = ThreadPoolBulkhead.of("test", config);
+
+ logger = mock(Logger.class);
+ Awaitility.reset();
+ }
+
+ @Test
+ public void shouldReturnTheSameConsumer() {
+ ThreadPoolBulkhead.ThreadPoolBulkheadEventPublisher eventPublisher = bulkhead.getEventPublisher();
+ ThreadPoolBulkhead.ThreadPoolBulkheadEventPublisher eventPublisher2 = bulkhead.getEventPublisher();
+
+ assertThat(eventPublisher).isEqualTo(eventPublisher2);
+ }
+
+ @Test
+ public void shouldConsumeOnCallRejectedEvent() throws ExecutionException, InterruptedException {
+ // Given
+ ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("test", ThreadPoolBulkheadConfig.custom()
+ .maxThreadPoolSize(1)
+ .coreThreadPoolSize(1)
+ .queueCapacity(1)
+ .build());
+
+ BDDMockito.given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
+
+ // When
+ bulkhead.getEventPublisher()
+ .onCallRejected(event ->
+ logger.info(event.getEventType().toString()));
+ final Exception exception = new Exception();
+ // When
+ new Thread(() -> {
+ try {
+ bulkhead.executeRunnable(() -> {
+ final AtomicInteger counter = new AtomicInteger(0);
+ Awaitility.waitAtMost(Duration.TWO_HUNDRED_MILLISECONDS).until(() -> counter.incrementAndGet() >= 2);
+ });
+ } catch (Exception e) {
+ exception.initCause(e);
+ }
+
+ }).start();
+ new Thread(() -> {
+ try {
+ bulkhead.executeCallable(helloWorldService::returnHelloWorld);
+ } catch (Exception e) {
+ exception.initCause(e);
+ }
+ }).start();
+ new Thread(() -> {
+ try {
+ bulkhead.executeCallable(helloWorldService::returnHelloWorld);
+ } catch (Exception e) {
+ exception.initCause(e);
+ }
+ }).start();
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ Awaitility.waitAtMost(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> counter.incrementAndGet() >= 2);
+ // Then
+ assertThat(exception).hasCauseInstanceOf(BulkheadFullException.class);
+ then(logger).should(times(1)).info("CALL_REJECTED");
+ }
+
+ @Test
+ public void shouldConsumeOnCallPermittedEvent() throws ExecutionException, InterruptedException {
+ // Given
+ ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("test", config);
+ BDDMockito.given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
+
+ // When
+ bulkhead.getEventPublisher()
+ .onCallPermitted(event ->
+ logger.info(event.getEventType().toString()));
+
+ String result = bulkhead.executeSupplier(helloWorldService::returnHelloWorld).toCompletableFuture().get();
+
+ // Then
+ assertThat(result).isEqualTo("Hello world");
+ then(logger).should(times(1)).info("CALL_PERMITTED");
+ }
+
+ @Test
+ public void shouldConsumeOnCallFinishedEventWhenExecutionIsFinished() throws Exception {
+ // Given
+ ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("test", config);
+ BDDMockito.given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
+ // When
+ bulkhead.getEventPublisher()
+ .onCallFinished(event ->
+ logger.info(event.getEventType().toString()));
+ bulkhead.executeSupplier(helloWorldService::returnHelloWorld).toCompletableFuture().get();
+ // Then
+ then(logger).should(times(1)).info("CALL_FINISHED");
+ }
+}
diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadRegistryTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadRegistryTest.java
new file mode 100644
index 0000000000..4656472d93
--- /dev/null
+++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadRegistryTest.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright 2017 Robert Winkler, Mahmoud Romeh
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package io.github.resilience4j.bulkhead;
+
+import static org.assertj.core.api.BDDAssertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class ThreadPoolBulkheadRegistryTest {
+
+ private ThreadPoolBulkheadConfig config;
+ private ThreadPoolBulkheadRegistry registry;
+
+ @Before
+ public void setUp() {
+
+ // registry with default config
+ registry = ThreadPoolBulkheadRegistry.ofDefaults();
+
+ // registry with custom config
+ config = ThreadPoolBulkheadConfig.custom()
+ .maxThreadPoolSize(100)
+ .build();
+ }
+
+ @Test
+ public void shouldReturnCustomConfig() {
+
+ // give
+ ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);
+
+ // when
+ ThreadPoolBulkheadConfig bulkheadConfig = registry.getDefaultBulkheadConfig();
+
+ // then
+ assertThat(bulkheadConfig).isSameAs(config);
+ }
+
+ @Test
+ public void shouldReturnTheCorrectName() {
+
+ ThreadPoolBulkhead bulkhead = registry.bulkhead("test");
+
+ assertThat(bulkhead).isNotNull();
+ assertThat(bulkhead.getName()).isEqualTo("test");
+ assertThat(bulkhead.getBulkheadConfig().getMaxThreadPoolSize()).isEqualTo(Runtime.getRuntime().availableProcessors());
+ }
+
+ @Test
+ public void shouldBeTheSameInstance() {
+
+ ThreadPoolBulkhead bulkhead1 = registry.bulkhead("test", config);
+ ThreadPoolBulkhead bulkhead2 = registry.bulkhead("test", config);
+
+ assertThat(bulkhead1).isSameAs(bulkhead2);
+ assertThat(registry.getAllBulkheads()).hasSize(1);
+ }
+
+ @Test
+ public void shouldBeNotTheSameInstance() {
+
+ ThreadPoolBulkhead bulkhead1 = registry.bulkhead("test1");
+ ThreadPoolBulkhead bulkhead2 = registry.bulkhead("test2");
+
+ assertThat(bulkhead1).isNotSameAs(bulkhead2);
+ assertThat(registry.getAllBulkheads()).hasSize(2);
+ }
+
+}
diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadTest.java
new file mode 100644
index 0000000000..5f32e56b63
--- /dev/null
+++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadTest.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Copyright 2017 Robert Winkler, Lucas Lech, Mahmoud Romeh
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package io.github.resilience4j.bulkhead;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.times;
+
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.BDDMockito;
+import org.mockito.Mockito;
+
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.Duration;
+
+import io.github.resilience4j.test.HelloWorldService;
+
+public class ThreadPoolBulkheadTest {
+
+ private HelloWorldService helloWorldService;
+ private ThreadPoolBulkheadConfig config;
+
+ @Before
+ public void setUp() {
+ Awaitility.reset();
+ helloWorldService = Mockito.mock(HelloWorldService.class);
+ config = ThreadPoolBulkheadConfig.custom()
+ .maxThreadPoolSize(1)
+ .coreThreadPoolSize(1)
+ .queueCapacity(1)
+ .build();
+ }
+
+ @Test
+ public void shouldExecuteSupplierAndFailWithBulkHeadFull() throws InterruptedException {
+
+ // Given
+ ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("testSupplier", config);
+
+ BDDMockito.given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
+ final Exception exception = new Exception();
+ // When
+ new Thread(() -> {
+ try {
+ final AtomicInteger counter = new AtomicInteger(0);
+ bulkhead.executeRunnable(() -> {
+ Awaitility.waitAtMost(Duration.TWO_HUNDRED_MILLISECONDS).until(() -> counter.incrementAndGet() > 1);
+ });
+ } catch (Exception e) {
+ exception.initCause(e);
+ }
+ }).start();
+ new Thread(() -> {
+ try {
+ bulkhead.executeSupplier(helloWorldService::returnHelloWorld);
+ } catch (Exception e) {
+ exception.initCause(e);
+ }
+ }).start();
+ new Thread(() -> {
+ try {
+ bulkhead.executeSupplier(helloWorldService::returnHelloWorld);
+ } catch (Exception e) {
+ exception.initCause(e);
+ }
+ }).start();
+ final AtomicInteger counter = new AtomicInteger(0);
+ Awaitility.waitAtMost(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> counter.incrementAndGet() >= 2);
+ // Then
+ assertThat(exception.getCause().getMessage()).contains("ThreadPoolBulkhead 'testSupplier' is full");
+ }
+
+
+ @Test
+ public void shouldExecuteCallableAndFailWithBulkHeadFull() throws InterruptedException {
+
+ // Given
+ ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("test", config);
+
+ BDDMockito.given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
+ final Exception exception = new Exception();
+ // When
+ new Thread(() -> {
+ try {
+ bulkhead.executeRunnable(() -> {
+ final AtomicInteger counter = new AtomicInteger(0);
+ Awaitility.waitAtMost(Duration.TWO_HUNDRED_MILLISECONDS).until(() -> counter.incrementAndGet() >= 2);
+ });
+ } catch (Exception e) {
+ exception.initCause(e);
+ }
+
+ }).start();
+ new Thread(() -> {
+ try {
+ bulkhead.executeCallable(helloWorldService::returnHelloWorld);
+ } catch (Exception e) {
+ exception.initCause(e);
+ }
+ }).start();
+ new Thread(() -> {
+ try {
+ bulkhead.executeCallable(helloWorldService::returnHelloWorld);
+ } catch (Exception e) {
+ exception.initCause(e);
+ }
+ }).start();
+ final AtomicInteger counter = new AtomicInteger(0);
+ Awaitility.waitAtMost(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> counter.incrementAndGet() >= 2);
+ // Then
+
+ assertThat(exception).hasCauseInstanceOf(BulkheadFullException.class);
+ }
+
+
+ @Test
+ public void shouldExecuteSupplierAndReturnWithSuccess() throws ExecutionException, InterruptedException {
+
+ // Given
+ ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("test", config);
+
+ BDDMockito.given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
+
+ // When
+ CompletionStage result = bulkhead.executeSupplier(helloWorldService::returnHelloWorld);
+
+
+ // Then
+ assertThat(result.toCompletableFuture().get()).isEqualTo("Hello world");
+ BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld();
+ }
+
+
+}
diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkheadTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkheadTest.java
new file mode 100644
index 0000000000..3a0bd1fec9
--- /dev/null
+++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkheadTest.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright 2019 Robert Winkler
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package io.github.resilience4j.bulkhead.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
+import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
+
+public class FixedThreadPoolBulkheadTest {
+
+ private ThreadPoolBulkhead bulkhead;
+
+ @Before
+ public void setUp() {
+
+ ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
+ .maxThreadPoolSize(2)
+ .coreThreadPoolSize(1)
+ .queueCapacity(10)
+ .keepAliveTime(10)
+ .build();
+
+ bulkhead = ThreadPoolBulkhead.of("test", config);
+ }
+
+ @Test
+ public void testToString() {
+
+ // when
+ String result = bulkhead.toString();
+
+ // then
+ assertThat(result).isEqualTo("FixedThreadPoolBulkhead 'test'");
+ }
+
+ @Test
+ public void testCustomSettings() {
+
+ // then
+ assertThat(bulkhead.getBulkheadConfig().getMaxThreadPoolSize()).isEqualTo(2);
+ assertThat(bulkhead.getBulkheadConfig().getQueueCapacity()).isEqualTo(10);
+ assertThat(bulkhead.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(1);
+ assertThat(bulkhead.getBulkheadConfig().getKeepAliveTime()).isEqualTo(10);
+ }
+
+ @Test
+ public void testCreateWithDefaults() {
+ // when
+ ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.ofDefaults("test");
+
+ // then
+ assertThat(bulkhead).isNotNull();
+ assertThat(bulkhead.getBulkheadConfig()).isNotNull();
+ assertThat(bulkhead.getBulkheadConfig().getMaxThreadPoolSize()).isEqualTo(ThreadPoolBulkheadConfig.DEFAULT_MAX_THREAD_POOL_SIZE);
+ assertThat(bulkhead.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(ThreadPoolBulkheadConfig.DEFAULT_CORE_THREAD_POOL_SIZE);
+ assertThat(bulkhead.getBulkheadConfig().getQueueCapacity()).isEqualTo(ThreadPoolBulkheadConfig.DEFAULT_QUEUE_CAPACITY);
+ }
+
+
+}
diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java
index 4ccafe734d..e5f22739aa 100644
--- a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java
+++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java
@@ -18,451 +18,462 @@
*/
package io.github.resilience4j.bulkhead.internal;
+import static com.jayway.awaitility.Awaitility.await;
+import static io.github.resilience4j.bulkhead.BulkheadConfig.DEFAULT_MAX_CONCURRENT_CALLS;
+import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.CALL_FINISHED;
+import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.CALL_PERMITTED;
+import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.CALL_REJECTED;
+import static java.lang.Thread.State.BLOCKED;
+import static java.lang.Thread.State.RUNNABLE;
+import static java.lang.Thread.State.TERMINATED;
+import static java.lang.Thread.State.TIMED_WAITING;
+import static java.lang.Thread.State.WAITING;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
import io.github.resilience4j.adapter.RxJava2Adapter;
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.event.BulkheadEvent;
import io.reactivex.subscribers.TestSubscriber;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
-
-import static com.jayway.awaitility.Awaitility.await;
-import static io.github.resilience4j.bulkhead.event.BulkheadEvent.Type.*;
-import static java.lang.Thread.State.*;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.assertj.core.api.Assertions.assertThat;
public class SemaphoreBulkheadTest {
- private Bulkhead bulkhead;
- private TestSubscriber testSubscriber;
+ private Bulkhead bulkhead;
+ private TestSubscriber testSubscriber;
- @Before
- public void setUp() {
+ @Before
+ public void setUp() {
- BulkheadConfig config = BulkheadConfig.custom()
- .maxConcurrentCalls(2)
- .maxWaitTime(0)
- .build();
+ BulkheadConfig config = BulkheadConfig.custom()
+ .maxConcurrentCalls(2)
+ .maxWaitTime(0)
+ .build();
- bulkhead = Bulkhead.of("test", config);
- testSubscriber = RxJava2Adapter.toFlowable(bulkhead.getEventPublisher())
- .map(BulkheadEvent::getEventType)
- .test();
- }
+ bulkhead = Bulkhead.of("test", config);
+ testSubscriber = RxJava2Adapter.toFlowable(bulkhead.getEventPublisher())
+ .map(BulkheadEvent::getEventType)
+ .test();
+ }
- @Test
- public void shouldReturnTheCorrectName() {
- assertThat(bulkhead.getName()).isEqualTo("test");
- }
+ @Test
+ public void shouldReturnTheCorrectName() {
+ assertThat(bulkhead.getName()).isEqualTo("test");
+ }
- @Test
- public void testBulkhead() throws InterruptedException {
+ @Test
+ public void testBulkhead() throws InterruptedException {
- bulkhead.isCallPermitted();
- bulkhead.isCallPermitted();
+ bulkhead.isCallPermitted();
+ bulkhead.isCallPermitted();
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
- bulkhead.isCallPermitted();
- bulkhead.onComplete();
+ bulkhead.isCallPermitted();
+ bulkhead.onComplete();
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
- bulkhead.onComplete();
+ bulkhead.onComplete();
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(2);
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(2);
- bulkhead.isCallPermitted();
+ bulkhead.isCallPermitted();
- testSubscriber.assertValueCount(6)
- .assertValues(CALL_PERMITTED, CALL_PERMITTED, CALL_REJECTED, CALL_FINISHED, CALL_FINISHED, CALL_PERMITTED);
- }
+ testSubscriber.assertValueCount(6)
+ .assertValues(CALL_PERMITTED, CALL_PERMITTED, CALL_REJECTED, CALL_FINISHED, CALL_FINISHED, CALL_PERMITTED);
+ }
- @Test
- public void testToString() {
+ @Test
+ public void testToString() {
- // when
- String result = bulkhead.toString();
+ // when
+ String result = bulkhead.toString();
- // then
- assertThat(result).isEqualTo("Bulkhead 'test'");
- }
+ // then
+ assertThat(result).isEqualTo("Bulkhead 'test'");
+ }
- @Test
- public void testCreateWithNullConfig() {
+ @Test
+ public void testCreateWithNullConfig() {
- // given
- Supplier configSupplier = () -> null;
+ // given
+ Supplier configSupplier = () -> null;
- // when
- Bulkhead bulkhead = Bulkhead.of("test", configSupplier);
+ // when
+ Bulkhead bulkhead = Bulkhead.of("test", configSupplier);
- // then
- assertThat(bulkhead).isNotNull();
- assertThat(bulkhead.getBulkheadConfig()).isNotNull();
- }
+ // then
+ assertThat(bulkhead).isNotNull();
+ assertThat(bulkhead.getBulkheadConfig()).isNotNull();
+ }
- @Test
- public void testCreateWithDefaults() {
+ @Test
+ public void testCreateWithDefaults() {
- // when
- Bulkhead bulkhead = Bulkhead.ofDefaults("test");
+ // when
+ Bulkhead bulkhead = Bulkhead.ofDefaults("test");
- // then
- assertThat(bulkhead).isNotNull();
- assertThat(bulkhead.getBulkheadConfig()).isNotNull();
- }
+ // then
+ assertThat(bulkhead).isNotNull();
+ assertThat(bulkhead.getBulkheadConfig()).isNotNull();
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(DEFAULT_MAX_CONCURRENT_CALLS);
+ }
- @Test
- public void testTryEnterWithTimeout() {
+ @Test
+ public void testTryEnterWithTimeout() {
- // given
- BulkheadConfig config = BulkheadConfig.custom()
- .maxConcurrentCalls(1)
- .maxWaitTime(100)
- .build();
+ // given
+ BulkheadConfig config = BulkheadConfig.custom()
+ .maxConcurrentCalls(1)
+ .maxWaitTime(100)
+ .build();
- SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", config);
+ SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", config);
- // when
- boolean entered = bulkhead.tryEnterBulkhead();
+ // when
+ boolean entered = bulkhead.tryEnterBulkhead();
- // then
- assertThat(entered).isTrue();
- }
+ // then
+ assertThat(entered).isTrue();
+ }
- @Test
- public void testEntryTimeout() {
+ @Test
+ public void testEntryTimeout() {
- // given
- BulkheadConfig config = BulkheadConfig.custom()
- .maxConcurrentCalls(1)
- .maxWaitTime(10)
- .build();
+ // given
+ BulkheadConfig config = BulkheadConfig.custom()
+ .maxConcurrentCalls(1)
+ .maxWaitTime(10)
+ .build();
- SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", config);
- bulkhead.isCallPermitted(); // consume the permit
+ SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", config);
+ bulkhead.isCallPermitted(); // consume the permit
- // when
- boolean entered = bulkhead.tryEnterBulkhead();
+ // when
+ boolean entered = bulkhead.tryEnterBulkhead();
- // then
- assertThat(entered).isFalse();
- }
+ // then
+ assertThat(entered).isFalse();
+ }
- @Test // best effort, no asserts
- public void testEntryInterrupted() {
+ @Test // best effort, no asserts
+ @Ignore // what we are testing here :> ?
+ public void testEntryInterrupted() {
- // given
- BulkheadConfig config = BulkheadConfig.custom()
- .maxConcurrentCalls(1)
- .maxWaitTime(10000)
- .build();
+ // given
+ BulkheadConfig config = BulkheadConfig.custom()
+ .maxConcurrentCalls(1)
+ .maxWaitTime(10000)
+ .build();
- final SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", config);
- bulkhead.isCallPermitted(); // consume the permit
- AtomicBoolean entered = new AtomicBoolean(true);
+ final SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", config);
+ bulkhead.isCallPermitted(); // consume the permit
+ AtomicBoolean entered = new AtomicBoolean(true);
- Thread t = new Thread(
- () -> {
- entered.set(bulkhead.tryEnterBulkhead());
- }
- );
-
- // when
- t.start();
- sleep(500);
- t.interrupt();
- sleep(500);
-
- // then
- //assertThat(entered.get()).isFalse();
- }
-
- @Test
- public void changePermissionsInIdleState() {
- BulkheadConfig originalConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(3)
- .maxWaitTime(5000)
- .build();
- SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
-
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3);
- assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000);
-
- BulkheadConfig newConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(5)
- .maxWaitTime(5000)
- .build();
-
- bulkhead.changeConfig(newConfig);
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(5);
- assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000);
-
-
- newConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(2)
- .maxWaitTime(5000)
- .build();
-
- bulkhead.changeConfig(newConfig);
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2);
- assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000);
-
- bulkhead.changeConfig(newConfig);
- }
-
- @Test
- public void changeWaitTimeInIdleState() {
- BulkheadConfig originalConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(3)
- .maxWaitTime(5000)
- .build();
- SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
-
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3);
- assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000);
-
- BulkheadConfig newConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(3)
- .maxWaitTime(3000)
- .build();
-
- bulkhead.changeConfig(newConfig);
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3);
- assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(3000);
-
-
- newConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(3)
- .maxWaitTime(7000)
- .build();
-
- bulkhead.changeConfig(newConfig);
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3);
- assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(7000);
-
- bulkhead.changeConfig(newConfig);
- }
-
- @SuppressWarnings("Duplicates")
- @Test
- public void changePermissionsCountWhileOneThreadIsRunningWithThisPermission() {
- BulkheadConfig originalConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(1)
- .maxWaitTime(0)
- .build();
- SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
-
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
-
- AtomicBoolean bulkheadThreadTrigger = new AtomicBoolean(true);
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1);
- Thread bulkheadThread = new Thread(() -> {
- bulkhead.isCallPermitted();
- while (bulkheadThreadTrigger.get()) {
- Thread.yield();
- }
- bulkhead.onComplete();
- });
- bulkheadThread.setDaemon(true);
- bulkheadThread.start();
-
- await().atMost(1, SECONDS)
- .until(() -> bulkheadThread.getState().equals(RUNNABLE));
-
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
- assertThat(bulkhead.tryEnterBulkhead()).isFalse();
-
- BulkheadConfig newConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(2)
- .maxWaitTime(0)
- .build();
-
- bulkhead.changeConfig(newConfig);
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2);
- assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(0);
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
- assertThat(bulkhead.tryEnterBulkhead()).isTrue();
-
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
- assertThat(bulkhead.tryEnterBulkhead()).isFalse();
-
- Thread changerThread = new Thread(() -> {
- bulkhead.changeConfig(BulkheadConfig.custom()
- .maxConcurrentCalls(1)
- .maxWaitTime(0)
- .build());
- });
- changerThread.setDaemon(true);
- changerThread.start();
-
- await().atMost(1, SECONDS)
- .until(() -> changerThread.getState().equals(WAITING));
-
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2);
-
- bulkheadThreadTrigger.set(false);
- await().atMost(1, SECONDS)
- .until(() -> bulkheadThread.getState().equals(TERMINATED));
- await().atMost(1, SECONDS)
- .until(() -> changerThread.getState().equals(TERMINATED));
-
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1);
-
- bulkhead.onComplete();
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1);
- }
-
- @Test
- public void changePermissionsCountWhileOneThreadIsWaitingForPermission() {
- BulkheadConfig originalConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(1)
- .maxWaitTime(500000)
- .build();
- SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
- bulkhead.isCallPermitted();
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
-
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1);
- Thread bulkheadThread = new Thread(() -> {
- bulkhead.isCallPermitted();
- bulkhead.onComplete();
- });
- bulkheadThread.setDaemon(true);
- bulkheadThread.start();
-
- await().atMost(1, SECONDS)
- .until(() -> bulkheadThread.getState().equals(TIMED_WAITING));
-
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
-
- BulkheadConfig newConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(2)
- .maxWaitTime(500000)
- .build();
-
- bulkhead.changeConfig(newConfig);
- await().atMost(1, SECONDS)
- .until(() -> bulkheadThread.getState().equals(TERMINATED));
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2);
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
- }
-
- @Test
- public void changeWaitingTimeWhileOneThreadIsWaitingForPermission() {
- BulkheadConfig originalConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(1)
- .maxWaitTime(500000)
- .build();
- SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
- bulkhead.isCallPermitted();
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
-
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1);
- Thread bulkheadThread = new Thread(() -> {
- bulkhead.isCallPermitted();
- bulkhead.onComplete();
- });
- bulkheadThread.setDaemon(true);
- bulkheadThread.start();
-
- await().atMost(1, SECONDS)
- .until(() -> bulkheadThread.getState().equals(TIMED_WAITING));
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
-
- BulkheadConfig newConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(1)
- .maxWaitTime(0)
- .build();
-
- bulkhead.changeConfig(newConfig);
- assertThat(bulkhead.tryEnterBulkhead()).isFalse(); // main thread is not blocked
-
- // previously blocked thread is still waiting
- await().atMost(1, SECONDS)
- .until(() -> bulkheadThread.getState().equals(TIMED_WAITING));
- }
-
- @SuppressWarnings("Duplicates")
- @Test
- public void changePermissionsConcurrently() {
- BulkheadConfig originalConfig = BulkheadConfig.custom()
- .maxConcurrentCalls(3)
- .maxWaitTime(0)
- .build();
- SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
-
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(3);
-
- AtomicBoolean bulkheadThreadTrigger = new AtomicBoolean(true);
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3);
- Thread bulkheadThread = new Thread(() -> {
- bulkhead.isCallPermitted();
- while (bulkheadThreadTrigger.get()) {
- Thread.yield();
- }
- bulkhead.onComplete();
- });
- bulkheadThread.setDaemon(true);
- bulkheadThread.start();
-
- await().atMost(1, SECONDS)
- .until(() -> bulkheadThread.getState().equals(RUNNABLE));
-
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(2);
- assertThat(bulkhead.tryEnterBulkhead()).isTrue();
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
-
- Thread firstChangerThread = new Thread(() -> {
- bulkhead.changeConfig(BulkheadConfig.custom()
- .maxConcurrentCalls(1)
- .maxWaitTime(0)
- .build());
- });
- firstChangerThread.setDaemon(true);
- firstChangerThread.start();
-
- await().atMost(1, SECONDS)
- .until(() -> firstChangerThread.getState().equals(WAITING));
-
- Thread secondChangerThread = new Thread(() -> {
- bulkhead.changeConfig(BulkheadConfig.custom()
- .maxConcurrentCalls(4)
- .maxWaitTime(0)
- .build());
- });
- secondChangerThread.setDaemon(true);
- secondChangerThread.start();
-
- await().atMost(1, SECONDS)
- .until(() -> secondChangerThread.getState().equals(BLOCKED));
-
- bulkheadThreadTrigger.set(false);
- await().atMost(1, SECONDS)
- .until(() -> bulkheadThread.getState().equals(TERMINATED));
- await().atMost(1, SECONDS)
- .until(() -> firstChangerThread.getState().equals(TERMINATED));
- await().atMost(1, SECONDS)
- .until(() -> secondChangerThread.getState().equals(TERMINATED));
-
- assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(4);
- assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(3); // main thread is still holding
- }
-
- void sleep(long time) {
- try {
- Thread.sleep(time);
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
+ Thread t = new Thread(
+ () -> {
+ entered.set(bulkhead.tryEnterBulkhead());
+ }
+ );
+
+ // when
+ t.start();
+ sleep(500);
+ t.interrupt();
+ sleep(500);
+
+ // then
+ //assertThat(entered.get()).isFalse();
+ }
+
+ @Test
+ public void changePermissionsInIdleState() {
+ BulkheadConfig originalConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(3)
+ .maxWaitTime(5000)
+ .build();
+ SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
+
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3);
+ assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000);
+
+ BulkheadConfig newConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(5)
+ .maxWaitTime(5000)
+ .build();
+
+ bulkhead.changeConfig(newConfig);
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(5);
+ assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000);
+
+
+ newConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(2)
+ .maxWaitTime(5000)
+ .build();
+
+ bulkhead.changeConfig(newConfig);
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2);
+ assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000);
+
+ bulkhead.changeConfig(newConfig);
+ }
+
+ @Test
+ public void changeWaitTimeInIdleState() {
+ BulkheadConfig originalConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(3)
+ .maxWaitTime(5000)
+ .build();
+ SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
+
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3);
+ assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(5000);
+
+ BulkheadConfig newConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(3)
+ .maxWaitTime(3000)
+ .build();
+
+ bulkhead.changeConfig(newConfig);
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3);
+ assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(3000);
+
+
+ newConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(3)
+ .maxWaitTime(7000)
+ .build();
+
+ bulkhead.changeConfig(newConfig);
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3);
+ assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(7000);
+
+ bulkhead.changeConfig(newConfig);
+ }
+
+ @SuppressWarnings("Duplicates")
+ @Test
+ public void changePermissionsCountWhileOneThreadIsRunningWithThisPermission() {
+ BulkheadConfig originalConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(1)
+ .maxWaitTime(0)
+ .build();
+ SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
+
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
+
+ AtomicBoolean bulkheadThreadTrigger = new AtomicBoolean(true);
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1);
+ Thread bulkheadThread = new Thread(() -> {
+ bulkhead.isCallPermitted();
+ while (bulkheadThreadTrigger.get()) {
+ Thread.yield();
+ }
+ bulkhead.onComplete();
+ });
+ bulkheadThread.setDaemon(true);
+ bulkheadThread.start();
+
+ await().atMost(1, SECONDS)
+ .until(() -> bulkheadThread.getState().equals(RUNNABLE));
+
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
+ assertThat(bulkhead.tryEnterBulkhead()).isFalse();
+
+ BulkheadConfig newConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(2)
+ .maxWaitTime(0)
+ .build();
+
+ bulkhead.changeConfig(newConfig);
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2);
+ assertThat(bulkhead.getBulkheadConfig().getMaxWaitTime()).isEqualTo(0);
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
+ assertThat(bulkhead.tryEnterBulkhead()).isTrue();
+
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
+ assertThat(bulkhead.tryEnterBulkhead()).isFalse();
+
+ Thread changerThread = new Thread(() -> {
+ bulkhead.changeConfig(BulkheadConfig.custom()
+ .maxConcurrentCalls(1)
+ .maxWaitTime(0)
+ .build());
+ });
+ changerThread.setDaemon(true);
+ changerThread.start();
+
+ await().atMost(1, SECONDS)
+ .until(() -> changerThread.getState().equals(WAITING));
+
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2);
+
+ bulkheadThreadTrigger.set(false);
+ await().atMost(1, SECONDS)
+ .until(() -> bulkheadThread.getState().equals(TERMINATED));
+ await().atMost(1, SECONDS)
+ .until(() -> changerThread.getState().equals(TERMINATED));
+
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1);
+
+ bulkhead.onComplete();
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1);
+ }
+
+ @Test
+ public void changePermissionsCountWhileOneThreadIsWaitingForPermission() {
+ BulkheadConfig originalConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(1)
+ .maxWaitTime(500000)
+ .build();
+ SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
+ bulkhead.isCallPermitted();
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
+
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1);
+ Thread bulkheadThread = new Thread(() -> {
+ bulkhead.isCallPermitted();
+ bulkhead.onComplete();
+ });
+ bulkheadThread.setDaemon(true);
+ bulkheadThread.start();
+
+ await().atMost(1, SECONDS)
+ .until(() -> bulkheadThread.getState().equals(TIMED_WAITING));
+
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
+
+ BulkheadConfig newConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(2)
+ .maxWaitTime(500000)
+ .build();
+
+ bulkhead.changeConfig(newConfig);
+ await().atMost(1, SECONDS)
+ .until(() -> bulkheadThread.getState().equals(TERMINATED));
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(2);
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
+ }
+
+ @Test
+ public void changeWaitingTimeWhileOneThreadIsWaitingForPermission() {
+ BulkheadConfig originalConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(1)
+ .maxWaitTime(500000)
+ .build();
+ SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
+ bulkhead.isCallPermitted();
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
+
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1);
+ Thread bulkheadThread = new Thread(() -> {
+ bulkhead.isCallPermitted();
+ bulkhead.onComplete();
+ });
+ bulkheadThread.setDaemon(true);
+ bulkheadThread.start();
+
+ await().atMost(1, SECONDS)
+ .until(() -> bulkheadThread.getState().equals(TIMED_WAITING));
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
+
+ BulkheadConfig newConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(1)
+ .maxWaitTime(0)
+ .build();
+
+ bulkhead.changeConfig(newConfig);
+ assertThat(bulkhead.tryEnterBulkhead()).isFalse(); // main thread is not blocked
+
+ // previously blocked thread is still waiting
+ await().atMost(1, SECONDS)
+ .until(() -> bulkheadThread.getState().equals(TIMED_WAITING));
+ }
+
+ @SuppressWarnings("Duplicates")
+ @Test
+ public void changePermissionsConcurrently() {
+ BulkheadConfig originalConfig = BulkheadConfig.custom()
+ .maxConcurrentCalls(3)
+ .maxWaitTime(0)
+ .build();
+ SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig);
+
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(3);
+
+ AtomicBoolean bulkheadThreadTrigger = new AtomicBoolean(true);
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3);
+ Thread bulkheadThread = new Thread(() -> {
+ bulkhead.isCallPermitted();
+ while (bulkheadThreadTrigger.get()) {
+ Thread.yield();
+ }
+ bulkhead.onComplete();
+ });
+ bulkheadThread.setDaemon(true);
+ bulkheadThread.start();
+
+ await().atMost(1, SECONDS)
+ .until(() -> bulkheadThread.getState().equals(RUNNABLE));
+
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(2);
+ assertThat(bulkhead.tryEnterBulkhead()).isTrue();
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
+
+ Thread firstChangerThread = new Thread(() -> {
+ bulkhead.changeConfig(BulkheadConfig.custom()
+ .maxConcurrentCalls(1)
+ .maxWaitTime(0)
+ .build());
+ });
+ firstChangerThread.setDaemon(true);
+ firstChangerThread.start();
+
+ await().atMost(1, SECONDS)
+ .until(() -> firstChangerThread.getState().equals(WAITING));
+
+ Thread secondChangerThread = new Thread(() -> {
+ bulkhead.changeConfig(BulkheadConfig.custom()
+ .maxConcurrentCalls(4)
+ .maxWaitTime(0)
+ .build());
+ });
+ secondChangerThread.setDaemon(true);
+ secondChangerThread.start();
+
+ await().atMost(1, SECONDS)
+ .until(() -> secondChangerThread.getState().equals(BLOCKED));
+
+ bulkheadThreadTrigger.set(false);
+ await().atMost(1, SECONDS)
+ .until(() -> bulkheadThread.getState().equals(TERMINATED));
+ await().atMost(1, SECONDS)
+ .until(() -> firstChangerThread.getState().equals(TERMINATED));
+ await().atMost(1, SECONDS)
+ .until(() -> secondChangerThread.getState().equals(TERMINATED));
+
+ assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(4);
+ assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(3); // main thread is still holding
+ }
+
+ void sleep(long time) {
+ try {
+ Thread.sleep(time);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
}
diff --git a/sonar-project.properties b/sonar-project.properties
deleted file mode 100644
index dec747f8a6..0000000000
--- a/sonar-project.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-sonar.projectKey=resilience4j_resilience4j
-sonar.projectName=resilience4j
-sonar.projectVersion=0.15.0-SNAPSHOTS
-
-sonar.modules=resilience4j-core,resilience4j-feign,resilience4j-metrics,resilience4j-micrometer,resilience4j-prometheus,resilience4j-retry,resilience4j-spring,resilience4j-timelimiter,resilience4j-bulkhead,resilience4j-circuitbreaker,resilience4j-ratelimiter,resilience4j-cache,resilience4j-circularbuffer,resilience4j-consumer,resilience4j-spring-boot,resilience4j-spring-boot2,resilience4j-reactor,resilience4j-rxjava2
-
-# =====================================================
-# Meta-data for the project
-# =====================================================
-
-sonar.links.homepage=https://github.com/resilience4j/resilience4j
-sonar.links.ci=https://travis-ci.org/resilience4j/resilience4j
-sonar.links.scm=https://github.com/resilience4j/resilience4j
-sonar.links.issue=https://github.com/resilience4j/resilience4j/issues
-
-# =====================================================
-# Java config
-# =====================================================
-sonar.java.source=1.8
-sonar.sources=src/main/java
-sonar.java.binaries=build
-sonar.binaries=build
-
-# Language
-sonar.language=java
-
-# Encoding of the source files
-sonar.sourceEncoding=UTF-8