Skip to content

Commit

Permalink
Issue ReactiveX#733: Added bulkhead/ThreadPoolBulhead spring customiz…
Browse files Browse the repository at this point in the history
  • Loading branch information
Romeh authored and RobWin committed Jan 13, 2020
1 parent e32438c commit 5d9c2c1
Show file tree
Hide file tree
Showing 18 changed files with 316 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright Mahmoud Romeh
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.common;

import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright Mahmoud Romeh
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.common;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.github.resilience4j.common.bulkhead.configuration;

import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.common.CustomizerWithName;

/**
* Enable customization bulkhead configuration builders programmatically.
*/
public interface BulkheadConfigCustomizer extends CustomizerWithName {

/**
* Customize BulkheadConfig configuration builder.
*
* @param configBuilder to be customized
*/
void customize(BulkheadConfig.Builder configBuilder);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.common.CommonProperties;
import io.github.resilience4j.common.CompositeCustomizer;
import io.github.resilience4j.common.utils.ConfigUtils;
import io.github.resilience4j.core.ConfigurationNotFoundException;
import io.github.resilience4j.core.StringUtils;
Expand All @@ -32,32 +33,44 @@ public class BulkheadConfigurationProperties extends CommonProperties {
private Map<String, InstanceProperties> instances = new HashMap<>();
private Map<String, InstanceProperties> configs = new HashMap<>();

public BulkheadConfig createBulkheadConfig(InstanceProperties instanceProperties) {
public BulkheadConfig createBulkheadConfig(InstanceProperties instanceProperties,
CompositeCustomizer<BulkheadConfigCustomizer> compositeBulkheadCustomizer,
String instanceName) {
if (StringUtils.isNotEmpty(instanceProperties.getBaseConfig())) {
InstanceProperties baseProperties = configs.get(instanceProperties.getBaseConfig());
if (baseProperties == null) {
throw new ConfigurationNotFoundException(instanceProperties.getBaseConfig());
}
return buildConfigFromBaseConfig(baseProperties, instanceProperties);
return buildConfigFromBaseConfig(baseProperties, instanceProperties,
compositeBulkheadCustomizer, instanceName);
}
return buildBulkheadConfig(BulkheadConfig.custom(), instanceProperties);
return buildBulkheadConfig(BulkheadConfig.custom(), instanceProperties,
compositeBulkheadCustomizer, instanceName);
}

private BulkheadConfig buildConfigFromBaseConfig(InstanceProperties baseProperties,
InstanceProperties instanceProperties) {
InstanceProperties instanceProperties,
CompositeCustomizer<BulkheadConfigCustomizer> compositeBulkheadCustomizer,
String instanceName) {
ConfigUtils.mergePropertiesIfAny(baseProperties, instanceProperties);
BulkheadConfig baseConfig = buildBulkheadConfig(BulkheadConfig.custom(), baseProperties);
return buildBulkheadConfig(BulkheadConfig.from(baseConfig), instanceProperties);
BulkheadConfig baseConfig = buildBulkheadConfig(BulkheadConfig.custom(), baseProperties,
compositeBulkheadCustomizer, instanceName);
return buildBulkheadConfig(BulkheadConfig.from(baseConfig), instanceProperties,
compositeBulkheadCustomizer, instanceName);
}

private BulkheadConfig buildBulkheadConfig(BulkheadConfig.Builder builder,
InstanceProperties instanceProperties) {
InstanceProperties instanceProperties,
CompositeCustomizer<BulkheadConfigCustomizer> compositeBulkheadCustomizer,
String instanceName) {
if (instanceProperties.getMaxConcurrentCalls() != null) {
builder.maxConcurrentCalls(instanceProperties.getMaxConcurrentCalls());
}
if (instanceProperties.getMaxWaitDuration() != null) {
builder.maxWaitDuration(instanceProperties.getMaxWaitDuration());
}
compositeBulkheadCustomizer.getCustomizer(instanceName)
.ifPresent(bulkheadConfigCustomizer -> bulkheadConfigCustomizer.customize(builder));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.github.resilience4j.common.bulkhead.configuration;

import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
import io.github.resilience4j.common.CustomizerWithName;

/**
* Enable customization thread pool bulkhead configuration builders programmatically.
*/
public interface ThreadPoolBulkheadConfigCustomizer extends CustomizerWithName {

/**
* Customize ThreadPoolBulkheadConfig configuration builder.
*
* @param configBuilder to be customized
*/
void customize(ThreadPoolBulkheadConfig.Builder configBuilder);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
import io.github.resilience4j.common.CommonProperties;
import io.github.resilience4j.common.CompositeCustomizer;
import io.github.resilience4j.core.ConfigurationNotFoundException;
import io.github.resilience4j.core.StringUtils;
import io.github.resilience4j.core.lang.Nullable;
Expand Down Expand Up @@ -53,33 +54,44 @@ public InstanceProperties getBackendProperties(String backend) {
}

// Thread pool bulkhead section
public ThreadPoolBulkheadConfig createThreadPoolBulkheadConfig(String backend) {
return createThreadPoolBulkheadConfig(getBackendProperties(backend));
public ThreadPoolBulkheadConfig createThreadPoolBulkheadConfig(String backend,
CompositeCustomizer<ThreadPoolBulkheadConfigCustomizer> compositeThreadPoolBulkheadCustomizer) {
return createThreadPoolBulkheadConfig(getBackendProperties(backend),
compositeThreadPoolBulkheadCustomizer, backend);
}

public ThreadPoolBulkheadConfig createThreadPoolBulkheadConfig(
InstanceProperties instanceProperties) {
InstanceProperties instanceProperties,
CompositeCustomizer<ThreadPoolBulkheadConfigCustomizer> compositeThreadPoolBulkheadCustomizer,
String instanceName) {
if (instanceProperties != null && StringUtils
.isNotEmpty(instanceProperties.getBaseConfig())) {
InstanceProperties baseProperties = configs.get(instanceProperties.getBaseConfig());
if (baseProperties == null) {
throw new ConfigurationNotFoundException(instanceProperties.getBaseConfig());
}
return buildThreadPoolConfigFromBaseConfig(baseProperties, instanceProperties);
return buildThreadPoolConfigFromBaseConfig(baseProperties, instanceProperties,
compositeThreadPoolBulkheadCustomizer, instanceName);
}
return buildThreadPoolBulkheadConfig(ThreadPoolBulkheadConfig.custom(), instanceProperties);
return buildThreadPoolBulkheadConfig(ThreadPoolBulkheadConfig.custom(), instanceProperties,
compositeThreadPoolBulkheadCustomizer, instanceName);
}

private ThreadPoolBulkheadConfig buildThreadPoolConfigFromBaseConfig(
InstanceProperties baseProperties, InstanceProperties instanceProperties) {
InstanceProperties baseProperties, InstanceProperties instanceProperties,
CompositeCustomizer<ThreadPoolBulkheadConfigCustomizer> compositeThreadPoolBulkheadCustomizer,
String instanceName) {
ThreadPoolBulkheadConfig baseConfig = buildThreadPoolBulkheadConfig(
ThreadPoolBulkheadConfig.custom(), baseProperties);
ThreadPoolBulkheadConfig.custom(), baseProperties,
compositeThreadPoolBulkheadCustomizer, instanceName);
return buildThreadPoolBulkheadConfig(ThreadPoolBulkheadConfig.from(baseConfig),
instanceProperties);
instanceProperties, compositeThreadPoolBulkheadCustomizer, instanceName);
}

public ThreadPoolBulkheadConfig buildThreadPoolBulkheadConfig(
ThreadPoolBulkheadConfig.Builder builder, InstanceProperties properties) {
ThreadPoolBulkheadConfig.Builder builder, InstanceProperties properties,
CompositeCustomizer<ThreadPoolBulkheadConfigCustomizer> compositeThreadPoolBulkheadCustomizer,
String instanceName) {
if (properties == null) {
return ThreadPoolBulkheadConfig.custom().build();
}
Expand All @@ -99,6 +111,9 @@ public ThreadPoolBulkheadConfig buildThreadPoolBulkheadConfig(
if (properties.getWritableStackTraceEnabled() != null) {
builder.writableStackTraceEnabled(properties.getWritableStackTraceEnabled());
}
compositeThreadPoolBulkheadCustomizer.getCustomizer(instanceName).ifPresent(
threadPoolBulkheadConfigCustomizer -> threadPoolBulkheadConfigCustomizer
.customize(builder));

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
import io.github.resilience4j.common.CompositeCustomizer;
import io.github.resilience4j.core.ConfigurationNotFoundException;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import org.junit.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -46,21 +46,21 @@ public void tesFixedThreadPoolBulkHeadProperties() {
ThreadPoolBulkheadConfigurationProperties bulkheadConfigurationProperties = new ThreadPoolBulkheadConfigurationProperties();
bulkheadConfigurationProperties.getBackends().put("backend1", backendProperties1);
bulkheadConfigurationProperties.getBackends().put("backend2", backendProperties2);
Map<String,String> tags=new HashMap<>();
tags.put("testKey1","testKet2");
Map<String, String> tags = new HashMap<>();
tags.put("testKey1", "testKet2");
bulkheadConfigurationProperties.setTags(tags);

//Then
assertThat(bulkheadConfigurationProperties.getTags()).isNotEmpty();
assertThat(bulkheadConfigurationProperties.getBackends().size()).isEqualTo(2);
assertThat(bulkheadConfigurationProperties.getInstances().size()).isEqualTo(2);
ThreadPoolBulkheadConfig bulkhead1 = bulkheadConfigurationProperties
.createThreadPoolBulkheadConfig("backend1");
.createThreadPoolBulkheadConfig("backend1", compositeThreadPoolBulkheadCustomizer());
assertThat(bulkhead1).isNotNull();
assertThat(bulkhead1.getCoreThreadPoolSize()).isEqualTo(1);

ThreadPoolBulkheadConfig bulkhead2 = bulkheadConfigurationProperties
.createThreadPoolBulkheadConfig("backend2");
.createThreadPoolBulkheadConfig("backend2", compositeThreadPoolBulkheadCustomizer());
assertThat(bulkhead2).isNotNull();
assertThat(bulkhead2.getCoreThreadPoolSize()).isEqualTo(2);

Expand Down Expand Up @@ -102,19 +102,22 @@ public void testCreateThreadPoolBulkHeadPropertiesWithSharedConfigs() {
assertThat(bulkheadConfigurationProperties.getBackends().size()).isEqualTo(2);
// Should get default config and core number
ThreadPoolBulkheadConfig bulkhead1 = bulkheadConfigurationProperties
.createThreadPoolBulkheadConfig("backendWithDefaultConfig");
.createThreadPoolBulkheadConfig("backendWithDefaultConfig",
compositeThreadPoolBulkheadCustomizer());
assertThat(bulkhead1).isNotNull();
assertThat(bulkhead1.getCoreThreadPoolSize()).isEqualTo(3);
assertThat(bulkhead1.getQueueCapacity()).isEqualTo(1);
// Should get shared config and overwrite core number
ThreadPoolBulkheadConfig bulkhead2 = bulkheadConfigurationProperties
.createThreadPoolBulkheadConfig("backendWithSharedConfig");
.createThreadPoolBulkheadConfig("backendWithSharedConfig",
compositeThreadPoolBulkheadCustomizer());
assertThat(bulkhead2).isNotNull();
assertThat(bulkhead2.getCoreThreadPoolSize()).isEqualTo(4);
assertThat(bulkhead2.getQueueCapacity()).isEqualTo(2);
// Unknown backend should get default config of Registry
ThreadPoolBulkheadConfig bulkhead3 = bulkheadConfigurationProperties
.createThreadPoolBulkheadConfig("unknownBackend");
.createThreadPoolBulkheadConfig("unknownBackend",
compositeThreadPoolBulkheadCustomizer());
assertThat(bulkhead3).isNotNull();
assertThat(bulkhead3.getCoreThreadPoolSize())
.isEqualTo(ThreadPoolBulkheadConfig.DEFAULT_CORE_THREAD_POOL_SIZE);
Expand All @@ -140,19 +143,19 @@ public void testBulkHeadProperties() {
BulkheadConfigurationProperties bulkheadConfigurationProperties = new BulkheadConfigurationProperties();
bulkheadConfigurationProperties.getInstances().put("backend1", instanceProperties1);
bulkheadConfigurationProperties.getInstances().put("backend2", instanceProperties2);
Map<String,String> globalTags=new HashMap<>();
globalTags.put("testKey1","testKet2");
Map<String, String> globalTags = new HashMap<>();
globalTags.put("testKey1", "testKet2");
bulkheadConfigurationProperties.setTags(globalTags);
//Then
assertThat(bulkheadConfigurationProperties.getInstances().size()).isEqualTo(2);
assertThat(bulkheadConfigurationProperties.getTags()).isNotEmpty();
BulkheadConfig bulkhead1 = bulkheadConfigurationProperties
.createBulkheadConfig(instanceProperties1);
.createBulkheadConfig(instanceProperties1, compositeBulkheadCustomizer(), "backend1");
assertThat(bulkhead1).isNotNull();
assertThat(bulkhead1.getMaxConcurrentCalls()).isEqualTo(3);

BulkheadConfig bulkhead2 = bulkheadConfigurationProperties
.createBulkheadConfig(instanceProperties2);
.createBulkheadConfig(instanceProperties2, compositeBulkheadCustomizer(), "backend2");
assertThat(bulkhead2).isNotNull();
assertThat(bulkhead2.getMaxConcurrentCalls()).isEqualTo(2);

Expand Down Expand Up @@ -196,21 +199,24 @@ public void testCreateBulkHeadPropertiesWithSharedConfigs() {

// Should get default config and overwrite max calls and wait time
BulkheadConfig bulkhead1 = bulkheadConfigurationProperties
.createBulkheadConfig(backendWithDefaultConfig);
.createBulkheadConfig(backendWithDefaultConfig, compositeBulkheadCustomizer(),
"backendWithDefaultConfig");
assertThat(bulkhead1).isNotNull();
assertThat(bulkhead1.getMaxConcurrentCalls()).isEqualTo(3);
assertThat(bulkhead1.getMaxWaitDuration().toMillis()).isEqualTo(200L);

// Should get shared config and overwrite wait time
BulkheadConfig bulkhead2 = bulkheadConfigurationProperties
.createBulkheadConfig(backendWithSharedConfig);
.createBulkheadConfig(backendWithSharedConfig, compositeBulkheadCustomizer(),
"backendWithSharedConfig");
assertThat(bulkhead2).isNotNull();
assertThat(bulkhead2.getMaxConcurrentCalls()).isEqualTo(2);
assertThat(bulkhead2.getMaxWaitDuration().toMillis()).isEqualTo(300L);

// Unknown backend should get default config of Registry
BulkheadConfig bulkhead3 = bulkheadConfigurationProperties
.createBulkheadConfig(new BulkheadConfigurationProperties.InstanceProperties());
.createBulkheadConfig(new BulkheadConfigurationProperties.InstanceProperties(),
compositeBulkheadCustomizer(), "unknown");
assertThat(bulkhead3).isNotNull();
assertThat(bulkhead3.getMaxWaitDuration().toMillis()).isEqualTo(0L);

Expand All @@ -226,7 +232,8 @@ public void testCreateBulkHeadPropertiesWithUnknownConfig() {

//When
assertThatThrownBy(
() -> bulkheadConfigurationProperties.createBulkheadConfig(instanceProperties))
() -> bulkheadConfigurationProperties.createBulkheadConfig(instanceProperties,
compositeBulkheadCustomizer(), "unknownConfig"))
.isInstanceOf(ConfigurationNotFoundException.class)
.hasMessage("Configuration with name 'unknownConfig' does not exist");
}
Expand Down Expand Up @@ -255,4 +262,12 @@ public void testThreadPoolBulkheadIllegalArgumentOnEventConsumerBufferSize() {
defaultProperties.setEventConsumerBufferSize(-1);
}

private CompositeCustomizer<BulkheadConfigCustomizer> compositeBulkheadCustomizer() {
return new CompositeCustomizer<>(Collections.emptyList());
}

private CompositeCustomizer<ThreadPoolBulkheadConfigCustomizer> compositeThreadPoolBulkheadCustomizer() {
return new CompositeCustomizer<>(Collections.emptyList());
}

}
Loading

0 comments on commit 5d9c2c1

Please sign in to comment.