Skip to content

Commit

Permalink
Added fixed threadpool bulkhead to spring (ReactiveX#474)
Browse files Browse the repository at this point in the history
  • Loading branch information
Romeh authored and RobWin committed May 30, 2019
1 parent c846f9e commit ec72266
Show file tree
Hide file tree
Showing 32 changed files with 1,260 additions and 486 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,32 @@
@Target(value = {ElementType.METHOD, ElementType.TYPE})
@Documented
public @interface Bulkhead {
/**
* Name of the bulkhead.
*
* @return the name of the bulkhead
*/
String name();
/**
* Name of the bulkhead.
*
* @return the name of the bulkhead
*/
String name();

/**
* fallbackMethod method name.
*
* @return fallbackMethod method name.
*/
String fallbackMethod() default "";
/**
* fallbackMethod method name.
*
* @return fallbackMethod method name.
*/
String fallbackMethod() default "";

/**
* @return the bulkhead implementation type (SEMAPHORE or THREADPOOL)
*/
Type type() default Type.SEMAPHORE;

/**
* bulkhead implementation types
* <p>
* SEMAPHORE will invoke semaphore based bulkhead implementation
* THREADPOOL will invoke Thread pool based bulkhead implementation
*/
enum Type {
SEMAPHORE, THREADPOOL
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,23 @@ private ThreadPoolBulkheadConfig() {
}

/**
* Returns a builder to create a custom BulkheadConfig.
* Returns a builder to create a custom ThreadPoolBulkheadConfig.
*
* @return a {@link Builder}
*/
public static Builder custom() {
return new Builder();
}

/**
* Returns a builder to create a custom ThreadPoolBulkheadConfig.
*
* @return a {@link Builder}
*/
public static Builder from(ThreadPoolBulkheadConfig threadPoolBulkheadConfig) {
return new Builder(threadPoolBulkheadConfig);
}

/**
* Creates a default Bulkhead configuration.
*
Expand Down Expand Up @@ -72,7 +81,16 @@ public int getCoreThreadPoolSize() {

public static class Builder {

private final ThreadPoolBulkheadConfig config = new ThreadPoolBulkheadConfig();
private ThreadPoolBulkheadConfig config;


public Builder(ThreadPoolBulkheadConfig bulkheadConfig) {
this.config = bulkheadConfig;
}

public Builder() {
config = new ThreadPoolBulkheadConfig();
}

/**
* Configures the max thread pool size.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,30 @@ public void testBuildCustom() {
assertThat(config.getQueueCapacity()).isEqualTo(queueCapacity);
}

@Test
public void testCreateFromBaseConfig() {
// given
int maxThreadPoolSize = 20;
int coreThreadPoolSize = 2;
long maxWait = 555;
int queueCapacity = 50;

// when
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.from(ThreadPoolBulkheadConfig.custom().build())
.maxThreadPoolSize(maxThreadPoolSize)
.coreThreadPoolSize(coreThreadPoolSize)
.queueCapacity(queueCapacity)
.keepAliveTime(maxWait)
.build();

// then
assertThat(config).isNotNull();
assertThat(config.getMaxThreadPoolSize()).isEqualTo(maxThreadPoolSize);
assertThat(config.getCoreThreadPoolSize()).isEqualTo(coreThreadPoolSize);
assertThat(config.getKeepAliveTime()).isEqualTo(maxWait);
assertThat(config.getQueueCapacity()).isEqualTo(queueCapacity);
}

@Test(expected = IllegalArgumentException.class)
public void testBuildWithIllegalMaxThreadPoolSize() {
// when
Expand Down Expand Up @@ -83,4 +107,14 @@ public void testBuildWithIllegalQueueCapacity() {
.build();
}

@Test(expected = IllegalArgumentException.class)
public void testBuildWithIllegalMaxCoreThreads() {
// when
ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(1)
.coreThreadPoolSize(2)
.build();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,31 @@
*/
package io.github.resilience4j.bulkhead.autoconfigure;

import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.configure.*;
import io.github.resilience4j.bulkhead.event.BulkheadEvent;
import io.github.resilience4j.consumer.EventConsumerRegistry;
import io.github.resilience4j.fallback.FallbackDecorators;
import io.github.resilience4j.fallback.autoconfigure.FallbackConfigurationOnMissingBean;
import io.github.resilience4j.utils.ReactorOnClasspathCondition;
import io.github.resilience4j.utils.RxJava2OnClasspathCondition;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.util.List;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;
import io.github.resilience4j.bulkhead.configure.BulkheadAspect;
import io.github.resilience4j.bulkhead.configure.BulkheadAspectExt;
import io.github.resilience4j.bulkhead.configure.BulkheadConfiguration;
import io.github.resilience4j.bulkhead.configure.BulkheadConfigurationProperties;
import io.github.resilience4j.bulkhead.configure.ReactorBulkheadAspectExt;
import io.github.resilience4j.bulkhead.configure.RxJava2BulkheadAspectExt;
import io.github.resilience4j.bulkhead.configure.threadpool.ThreadPoolBulkheadConfiguration;
import io.github.resilience4j.bulkhead.configure.threadpool.ThreadPoolBulkheadConfigurationProperties;
import io.github.resilience4j.bulkhead.event.BulkheadEvent;
import io.github.resilience4j.consumer.EventConsumerRegistry;
import io.github.resilience4j.fallback.FallbackDecorators;
import io.github.resilience4j.fallback.autoconfigure.FallbackConfigurationOnMissingBean;
import io.github.resilience4j.utils.ReactorOnClasspathCondition;
import io.github.resilience4j.utils.RxJava2OnClasspathCondition;

/**
* {@link Configuration
Expand All @@ -41,8 +50,10 @@
public abstract class AbstractBulkheadConfigurationOnMissingBean {

protected final BulkheadConfiguration bulkheadConfiguration;
protected final ThreadPoolBulkheadConfiguration threadPoolBulkheadConfiguration;

public AbstractBulkheadConfigurationOnMissingBean() {
this.threadPoolBulkheadConfiguration = new ThreadPoolBulkheadConfiguration();
this.bulkheadConfiguration = new BulkheadConfiguration();
}

Expand All @@ -55,10 +66,10 @@ public BulkheadRegistry bulkheadRegistry(BulkheadConfigurationProperties bulkhea

@Bean
@ConditionalOnMissingBean
public BulkheadAspect bulkheadAspect(BulkheadConfigurationProperties bulkheadConfigurationProperties,
BulkheadRegistry bulkheadRegistry, @Autowired(required = false) List<BulkheadAspectExt> bulkHeadAspectExtList,
FallbackDecorators fallbackDecorators) {
return bulkheadConfiguration.bulkheadAspect(bulkheadConfigurationProperties, bulkheadRegistry, bulkHeadAspectExtList, fallbackDecorators);
public BulkheadAspect bulkheadAspect(BulkheadConfigurationProperties bulkheadConfigurationProperties, ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry,
BulkheadRegistry bulkheadRegistry, @Autowired(required = false) List<BulkheadAspectExt> bulkHeadAspectExtList,
FallbackDecorators fallbackDecorators) {
return bulkheadConfiguration.bulkheadAspect(bulkheadConfigurationProperties, threadPoolBulkheadRegistry, bulkheadRegistry, bulkHeadAspectExtList, fallbackDecorators);
}

@Bean
Expand All @@ -75,4 +86,13 @@ public ReactorBulkheadAspectExt reactorBulkHeadAspectExt() {
return bulkheadConfiguration.reactorBulkHeadAspectExt();
}


@Bean
@ConditionalOnMissingBean
public ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry(ThreadPoolBulkheadConfigurationProperties threadPoolBulkheadConfigurationProperties,
EventConsumerRegistry<BulkheadEvent> bulkheadEventConsumerRegistry) {

return threadPoolBulkheadConfiguration.threadPoolBulkheadRegistry(threadPoolBulkheadConfigurationProperties, bulkheadEventConsumerRegistry);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2019 Mahmoud Romeh
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.bulkhead.autoconfigure;

import org.springframework.boot.context.properties.ConfigurationProperties;

import io.github.resilience4j.bulkhead.configure.threadpool.ThreadPoolBulkheadConfigurationProperties;

@ConfigurationProperties(prefix = "resilience4j.thread-pool-bulkhead")
public class ThreadPoolBulkheadProperties extends ThreadPoolBulkheadConfigurationProperties {
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@
import org.junit.Test;

import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;
import io.github.resilience4j.bulkhead.autoconfigure.AbstractBulkheadConfigurationOnMissingBean;
import io.github.resilience4j.bulkhead.configure.BulkheadConfigurationProperties;
import io.github.resilience4j.bulkhead.configure.threadpool.ThreadPoolBulkheadConfigurationProperties;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.circuitbreaker.autoconfigure.AbstractCircuitBreakerConfigurationOnMissingBean;
import io.github.resilience4j.circuitbreaker.configure.CircuitBreakerConfigurationProperties;
import io.github.resilience4j.consumer.DefaultEventConsumerRegistry;
import io.github.resilience4j.fallback.CompletionStageFallbackDecorator;
import io.github.resilience4j.fallback.FallbackDecorators;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import io.github.resilience4j.ratelimiter.autoconfigure.AbstractRateLimiterConfigurationOnMissingBean;
import io.github.resilience4j.ratelimiter.configure.RateLimiterConfigurationProperties;
import io.github.resilience4j.fallback.CompletionStageFallbackDecorator;
import io.github.resilience4j.fallback.FallbackDecorators;
import io.github.resilience4j.retry.RetryRegistry;
import io.github.resilience4j.retry.autoconfigure.AbstractRetryConfigurationOnMissingBean;
import io.github.resilience4j.retry.configure.RetryConfigurationProperties;
Expand All @@ -48,9 +50,10 @@ public class SpringBootCommonTest {
public void testBulkHeadCommonConfig() {
BulkheadConfigurationOnMissingBean bulkheadConfigurationOnMissingBean = new BulkheadConfigurationOnMissingBean();
assertThat(bulkheadConfigurationOnMissingBean.bulkheadRegistry(new BulkheadConfigurationProperties(), new DefaultEventConsumerRegistry<>())).isNotNull();
assertThat(bulkheadConfigurationOnMissingBean.threadPoolBulkheadRegistry(new ThreadPoolBulkheadConfigurationProperties(), new DefaultEventConsumerRegistry<>())).isNotNull();
assertThat(bulkheadConfigurationOnMissingBean.reactorBulkHeadAspectExt()).isNotNull();
assertThat(bulkheadConfigurationOnMissingBean.rxJava2BulkHeadAspectExt()).isNotNull();
assertThat(bulkheadConfigurationOnMissingBean.bulkheadAspect(new BulkheadConfigurationProperties(), BulkheadRegistry.ofDefaults(), Collections.emptyList(), new FallbackDecorators(Arrays.asList(new CompletionStageFallbackDecorator()))));
assertThat(bulkheadConfigurationOnMissingBean.bulkheadAspect(new BulkheadConfigurationProperties(), ThreadPoolBulkheadRegistry.ofDefaults(), BulkheadRegistry.ofDefaults(), Collections.emptyList(), new FallbackDecorators(Arrays.asList(new CompletionStageFallbackDecorator()))));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;
import io.github.resilience4j.bulkhead.event.BulkheadEvent;
import io.github.resilience4j.bulkhead.monitoring.endpoint.BulkheadEndpoint;
import io.github.resilience4j.bulkhead.monitoring.endpoint.BulkheadEventsEndpoint;
Expand All @@ -36,14 +37,14 @@
*/
@Configuration
@ConditionalOnClass(Bulkhead.class)
@EnableConfigurationProperties(BulkheadProperties.class)
@EnableConfigurationProperties({BulkheadProperties.class, ThreadPoolBulkheadProperties.class})
@Import(BulkheadConfigurationOnMissingBean.class)
@AutoConfigureBefore(EndpointAutoConfiguration.class)
public class BulkheadAutoConfiguration {

@Bean
public BulkheadEndpoint bulkheadEndpoint(BulkheadRegistry bulkheadRegistry) {
return new BulkheadEndpoint(bulkheadRegistry);
public BulkheadEndpoint bulkheadEndpoint(BulkheadRegistry bulkheadRegistry, ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry) {
return new BulkheadEndpoint(bulkheadRegistry, threadPoolBulkheadRegistry);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
*/
package io.github.resilience4j.bulkhead.monitoring.endpoint;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import java.util.List;

import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
import org.springframework.boot.actuate.endpoint.Endpoint;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.http.ResponseEntity;

import java.util.List;
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;


/**
Expand All @@ -32,16 +35,22 @@
public class BulkheadEndpoint extends AbstractEndpoint {

private final BulkheadRegistry bulkheadRegistry;
private final ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry;

public BulkheadEndpoint(BulkheadRegistry bulkheadRegistry) {
public BulkheadEndpoint(BulkheadRegistry bulkheadRegistry, ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry) {
super("bulkhead");
this.bulkheadRegistry = bulkheadRegistry;
this.threadPoolBulkheadRegistry = threadPoolBulkheadRegistry;
}

@Override
public ResponseEntity<BulkheadEndpointResponse> invoke() {
List<String> bulkheads = bulkheadRegistry.getAllBulkheads()
.map(Bulkhead::getName).sorted().toJavaList();
.map(Bulkhead::getName)
.appendAll(threadPoolBulkheadRegistry.getAllBulkheads()
.map(ThreadPoolBulkhead::getName))
.sorted()
.toJavaList();
return ResponseEntity.ok(new BulkheadEndpointResponse(bulkheads));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;
import io.github.resilience4j.bulkhead.configure.BulkheadAspect;
import io.github.resilience4j.bulkhead.configure.BulkheadAspectExt;
import io.github.resilience4j.bulkhead.configure.BulkheadConfiguration;
Expand Down Expand Up @@ -104,10 +105,10 @@ public BulkheadRegistry bulkheadRegistry() {
}

@Bean
public BulkheadAspect bulkheadAspect(BulkheadRegistry bulkheadRegistry,
@Autowired(required = false) List<BulkheadAspectExt> bulkheadAspectExts,
FallbackDecorators fallbackDecorators) {
bulkheadAspect = new BulkheadAspect(new BulkheadProperties(), bulkheadRegistry, bulkheadAspectExts, fallbackDecorators);
public BulkheadAspect bulkheadAspect(BulkheadRegistry bulkheadRegistry, ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry,
@Autowired(required = false) List<BulkheadAspectExt> bulkheadAspectExts,
FallbackDecorators fallbackDecorators) {
bulkheadAspect = new BulkheadAspect(new BulkheadProperties(), threadPoolBulkheadRegistry, bulkheadRegistry, bulkheadAspectExts, fallbackDecorators);
return bulkheadAspect;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;
import io.github.resilience4j.bulkhead.event.BulkheadEvent;
import io.github.resilience4j.bulkhead.monitoring.endpoint.BulkheadEndpoint;
import io.github.resilience4j.bulkhead.monitoring.endpoint.BulkheadEventsEndpoint;
Expand All @@ -38,14 +39,14 @@
*/
@Configuration
@ConditionalOnClass(Bulkhead.class)
@EnableConfigurationProperties(BulkheadProperties.class)
@Import({BulkheadConfigurationOnMissingBean.class, FallbackConfigurationOnMissingBean.class})
@EnableConfigurationProperties({BulkheadProperties.class, ThreadPoolBulkheadProperties.class})
@Import({BulkheadConfigurationOnMissingBean.class, FallbackConfigurationOnMissingBean.class})
@AutoConfigureBefore(EndpointAutoConfiguration.class)
public class BulkheadAutoConfiguration {
@Bean
@ConditionalOnEnabledEndpoint
public BulkheadEndpoint bulkheadEndpoint(BulkheadRegistry bulkheadRegistry) {
return new BulkheadEndpoint(bulkheadRegistry);
public BulkheadEndpoint bulkheadEndpoint(BulkheadRegistry bulkheadRegistry, ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry) {
return new BulkheadEndpoint(bulkheadRegistry, threadPoolBulkheadRegistry);
}

@Bean
Expand Down
Loading

0 comments on commit ec72266

Please sign in to comment.