diff --git a/gradle.properties b/gradle.properties index e8bad1250c..f4153fddee 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official systemProp.file.encoding=UTF-8 -systemProp.sun.jnu.encoding=UTF-8 \ No newline at end of file +systemProp.sun.jnu.encoding=UTF-8 diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ContextPropagator.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ContextPropagator.java new file mode 100644 index 0000000000..9ca35d2a7d --- /dev/null +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ContextPropagator.java @@ -0,0 +1,178 @@ +package io.github.resilience4j.bulkhead; + +import java.util.*; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.stream.Collectors.toMap; + +/** + * Abstraction to retrieve, copy and clean up values across thread boundary. This class is + * specifically use for propagating {@link ThreadLocal} across different thread boundaries. + * + * @param value type that is copied across thread boundary. + */ +public interface ContextPropagator { + + /** + * Retrieves value from the currently executing thread. This method should produce values (as + * Supplier) that needs to be propagated to new thread. + * + * @return a Supplier producing the value from current thread + */ + Supplier> retrieve(); + + /** + * Copies value from the parent thread into new executing thread. This method is passed with the + * values received from method {@link ContextPropagator#retrieve()} in the parent thread. + * + * @return a Consumer to set values in new thread. + */ + Consumer> copy(); + + /** + * CleanUp value before thread execution finish. This method is passed with the values received + * from method {@link ContextPropagator#retrieve()} in the parent thread. + * + * @return a Consumer to cleanUp values. + */ + Consumer> clear(); + + /** + * Method decorates supplier to copy variables across thread boundary. + * + * @param propagator the instance of {@link ContextPropagator} + * @param supplier the supplier to be decorated + * @param the type of variable that cross thread boundary + * @return decorated supplier of type T + */ + static Supplier decorateSupplier(ContextPropagator propagator, + Supplier supplier) { + final Optional value = (Optional) propagator.retrieve().get(); + return () -> { + try { + propagator.copy().accept(value); + return supplier.get(); + } finally { + propagator.clear().accept(value); + } + }; + } + + /** + * Method decorates supplier to copy variables across thread boundary. + * + * @param propagators the instance of {@link ContextPropagator} should be non null. + * @param supplier the supplier to be decorated + * @param the type of variable that cross thread boundary + * @return decorated supplier of type T + */ + static Supplier decorateSupplier(List propagators, + Supplier supplier) { + + Objects.requireNonNull(propagators, "ContextPropagator list should be non null"); + + //Create identity map of , if we have duplicate ContextPropagators then last one wins. + final Map values = propagators.stream() + .collect(toMap( + p -> p, //key as ContextPropagator instance itself + p -> p.retrieve().get(), //Supplier Optional value + (first, second) -> second, //Merge function, this simply choose later value in key collision + HashMap::new)); //type of map + + return () -> { + try { + values.forEach((p, v) -> p.copy().accept(v)); + return supplier.get(); + } finally { + values.forEach((p, v) -> p.clear().accept(v)); + } + }; + } + + /** + * Method decorates runnable to copy variables across thread boundary. + * + * @param propagators the instance of {@link ContextPropagator} + * @param runnable the runnable to be decorated + * @param the type of variable that cross thread boundary + * @return decorated supplier of type T + */ + static Runnable decorateRunnable(List propagators, + Runnable runnable) { + Objects.requireNonNull(propagators, "ContextPropagator list should be non null"); + + //Create identity map of , if we have duplicate ContextPropagators then last one wins. + final Map values = propagators.stream() + .collect(toMap( + p -> p, //key as ContextPropagator instance itself + p -> p.retrieve().get(), //Supplier Optional value + (first, second) -> second, //Merge function, this simply choose later value in key collision + HashMap::new)); //type of map + + return () -> { + try { + values.forEach((p, v) -> p.copy().accept(v)); + runnable.run(); + } finally { + values.forEach((p, v) -> p.clear().accept(v)); + } + }; + } + + /** + * Method decorates runnable to copy variables across thread boundary. + * + * @param propagator the instance of {@link ContextPropagator} + * @param runnable the runnable to be decorated + * @param the type of variable that cross thread boundary + * @return decorated supplier of type T + */ + static Runnable decorateRunnable(ContextPropagator propagator, + Runnable runnable) { + final Optional value = (Optional) propagator.retrieve().get(); + return () -> { + try { + propagator.copy().accept(value); + runnable.run(); + } finally { + propagator.clear().accept(value); + } + }; + } + + /** + * An empty context propagator. + * + * @param type. + * @return an empty {@link ContextPropagator} + */ + static ContextPropagator empty() { + return new EmptyContextPropagator<>(); + } + + /** + * A convenient implementation of empty {@link ContextPropagator} + * + * @param type of class. + */ + class EmptyContextPropagator implements ContextPropagator { + + @Override + public Supplier> retrieve() { + return () -> Optional.empty(); + } + + @Override + public Consumer> copy() { + return (t) -> { + }; + } + + @Override + public Consumer> clear() { + return (t) -> { + }; + } + } +} diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfig.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfig.java index 176fabef35..cd66aa25c1 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfig.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfig.java @@ -18,7 +18,16 @@ */ package io.github.resilience4j.bulkhead; +import io.github.resilience4j.core.lang.Nullable; + import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static io.github.resilience4j.core.ClassUtils.instantiateClassDefConstructor; +import static java.util.Arrays.stream; +import static java.util.stream.Collectors.toList; /** * A {@link ThreadPoolBulkheadConfig} configures a {@link Bulkhead} @@ -39,6 +48,7 @@ public class ThreadPoolBulkheadConfig { private int queueCapacity = DEFAULT_QUEUE_CAPACITY; private Duration keepAliveDuration = DEFAULT_KEEP_ALIVE_DURATION; private boolean writableStackTraceEnabled = DEFAULT_WRITABLE_STACK_TRACE_ENABLED; + private List contextPropagators = new ArrayList<>(); private ThreadPoolBulkheadConfig() { } @@ -82,19 +92,21 @@ public int getMaxThreadPoolSize() { return maxThreadPoolSize; } - public int getCoreThreadPoolSize() { - return coreThreadPoolSize; - } + public int getCoreThreadPoolSize() { return coreThreadPoolSize; } public boolean isWritableStackTraceEnabled() { return writableStackTraceEnabled; } - public static class Builder { + public List getContextPropagator() { + return contextPropagators; + } + public static class Builder { + private Class[] contextPropagatorClasses = new Class[0]; + private List contextPropagators = new ArrayList<>(); private ThreadPoolBulkheadConfig config; - public Builder(ThreadPoolBulkheadConfig bulkheadConfig) { this.config = bulkheadConfig; } @@ -133,6 +145,26 @@ public Builder coreThreadPoolSize(int coreThreadPoolSize) { return this; } + /** + * Configures the context propagator class. + * + * @return the BulkheadConfig.Builder + */ + public final Builder contextPropagator( + @Nullable Class... contextPropagatorClasses) { + this.contextPropagatorClasses = contextPropagatorClasses != null + ? contextPropagatorClasses + : new Class[0]; + return this; + } + + public final Builder contextPropagator(ContextPropagator... contextPropagators) { + this.contextPropagators = contextPropagators != null ? + Arrays.stream(contextPropagators).collect(toList()) : + new ArrayList<>(); + return this; + } + /** * Configures the capacity of the queue. * @@ -188,6 +220,16 @@ public ThreadPoolBulkheadConfig build() { throw new IllegalArgumentException( "maxThreadPoolSize must be a greater than or equals to coreThreadPoolSize"); } + if (contextPropagatorClasses.length > 0) { + config.contextPropagators.addAll((List)stream(contextPropagatorClasses) + .map(c -> instantiateClassDefConstructor(c)) + .collect(toList())); + } + //setting bean of type context propagator overrides the class type. + if (contextPropagators.size() > 0){ + config.contextPropagators.addAll((List)this.contextPropagators); + } + return config; } } diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java index fe5bbfda78..7e927ba616 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java @@ -20,6 +20,7 @@ import io.github.resilience4j.bulkhead.BulkheadFullException; +import io.github.resilience4j.bulkhead.ContextPropagator; import io.github.resilience4j.bulkhead.ThreadPoolBulkhead; import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig; import io.github.resilience4j.bulkhead.event.BulkheadEvent; @@ -32,6 +33,7 @@ import io.vavr.collection.HashMap; import io.vavr.collection.Map; +import java.util.Optional; import java.util.concurrent.*; import java.util.function.Supplier; @@ -137,14 +139,15 @@ public FixedThreadPoolBulkhead(String name, Supplier c public CompletableFuture submit(Callable callable) { final CompletableFuture promise = new CompletableFuture<>(); try { - CompletableFuture.supplyAsync(() -> { + + CompletableFuture.supplyAsync(ContextPropagator.decorateSupplier(config.getContextPropagator(),() -> { try { publishBulkheadEvent(() -> new BulkheadOnCallPermittedEvent(name)); return callable.call(); } catch (Exception e) { throw new CompletionException(e); } - }, executorService).whenComplete((result, throwable) -> { + }), executorService).whenComplete((result, throwable) -> { publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name)); if (throwable != null) { promise.completeExceptionally(throwable); @@ -165,14 +168,14 @@ public CompletableFuture submit(Callable callable) { @Override public void submit(Runnable runnable) { try { - CompletableFuture.runAsync(() -> { + CompletableFuture.runAsync(ContextPropagator.decorateRunnable(config.getContextPropagator(),() -> { try { publishBulkheadEvent(() -> new BulkheadOnCallPermittedEvent(name)); runnable.run(); } catch (Exception e) { throw new CompletionException(e); } - }, executorService).whenComplete((voidResult, throwable) -> publishBulkheadEvent( + }), executorService).whenComplete((voidResult, throwable) -> publishBulkheadEvent( () -> new BulkheadOnCallFinishedEvent(name))); } catch (RejectedExecutionException rejected) { publishBulkheadEvent(() -> new BulkheadOnCallRejectedEvent(name)); diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ContextPropagatorTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ContextPropagatorTest.java new file mode 100644 index 0000000000..8a0c3dc469 --- /dev/null +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ContextPropagatorTest.java @@ -0,0 +1,228 @@ +package io.github.resilience4j.bulkhead; + +import io.github.resilience4j.bulkhead.TestContextPropagators.TestThreadLocalContextPropagator; +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import static com.jayway.awaitility.Awaitility.matches; +import static com.jayway.awaitility.Awaitility.waitAtMost; +import static org.assertj.core.api.Assertions.assertThat; + +public class ContextPropagatorTest { + + @Test + public void contextPropagationFailureSingleTest() { + + ThreadLocal threadlocal = new ThreadLocal(); + threadlocal.set("SingleValueShould_NOT_CrossThreadBoundary"); + + Supplier supplier = () -> threadlocal.get(); + + //Thread boundary + final CompletableFuture future = CompletableFuture.supplyAsync(supplier); + + waitAtMost(5, TimeUnit.SECONDS) + .until(() -> null == future.get()); + } + + @Test + public void contextPropagationEmptyListShouldNotFail() { + + Supplier supplier = () -> "Hello World"; + + //Thread boundary + Supplier decoratedSupplier = ContextPropagator.decorateSupplier(Collections.emptyList(), supplier); + final CompletableFuture future = CompletableFuture.supplyAsync(decoratedSupplier); + + waitAtMost(5, TimeUnit.SECONDS) + .until(() -> "Hello World" == future.get()); + } + + @Test + public void contextPropagationFailureMultipleTest() throws Exception { + + ThreadLocal threadLocalOne = new ThreadLocal(); + threadLocalOne.set("FirstValueShould_NOT_CrossThreadBoundary"); + + ThreadLocal threadLocalTwo = new ThreadLocal(); + threadLocalTwo.set("SecondValueShould_NOT_CrossThreadBoundary"); + + TestThreadLocalContextPropagator propagatorOne = new TestThreadLocalContextPropagator(threadLocalOne); + TestThreadLocalContextPropagator propagatorTwo = new TestThreadLocalContextPropagator(threadLocalTwo); + + Supplier> supplier = () -> Arrays.asList( + threadLocalOne.get(), + threadLocalTwo.get() + ); + + //Thread boundary + final CompletableFuture> future = CompletableFuture.supplyAsync(supplier); + + waitAtMost(5, TimeUnit.SECONDS).until(matches( + () -> assertThat(future.get()).containsExactlyInAnyOrder(null, null) + )); + } + + @Test + public void contextPropagationSupplierMultipleTest() { + + ThreadLocal threadLocalOne = new ThreadLocal(); + threadLocalOne.set("FirstValueShouldCrossThreadBoundary"); + + ThreadLocal threadLocalTwo = new ThreadLocal(); + threadLocalTwo.set("SecondValueShouldCrossThreadBoundary"); + + TestThreadLocalContextPropagator propagatorOne = new TestThreadLocalContextPropagator(threadLocalOne); + TestThreadLocalContextPropagator propagatorTwo = new TestThreadLocalContextPropagator(threadLocalTwo); + + + Supplier> supplier = ContextPropagator.decorateSupplier(Arrays.asList(propagatorOne, propagatorTwo), () -> Arrays.asList( + threadLocalOne.get(), + threadLocalTwo.get() + )); + + //Thread boundary + final CompletableFuture> future = CompletableFuture.supplyAsync(supplier); + + waitAtMost(5, TimeUnit.SECONDS) + .until(matches(() -> Assertions.assertThat(future.get()).containsExactlyInAnyOrder( + "FirstValueShouldCrossThreadBoundary", + "SecondValueShouldCrossThreadBoundary") + )); + } + + @Test + public void contextPropagationSupplierSingleTest() { + + ThreadLocal threadlocal = new ThreadLocal(); + threadlocal.set("SingleValueShouldCrossThreadBoundary"); + + Supplier supplier = ContextPropagator + .decorateSupplier(new TestThreadLocalContextPropagator(threadlocal), + () -> threadlocal.get()); + + //Thread boundary + final CompletableFuture future = CompletableFuture.supplyAsync(supplier); + + waitAtMost(5, TimeUnit.SECONDS) + .until(() -> "SingleValueShouldCrossThreadBoundary" == future.get()); + } + + @Test + public void contextPropagationRunnableFailureSingleTest() { + + AtomicReference reference = new AtomicReference(); + + //Thread boundary + Runnable runnable = ContextPropagator + .decorateRunnable(Collections.emptyList(), + () -> reference.set("Hello World")); + + CompletableFuture.runAsync(runnable); + + waitAtMost(5, TimeUnit.SECONDS) + .until(() -> "Hello World" == reference.get()); + } + + @Test + public void contextPropagationRunnableEmptyListShouldNotFail() { + + ThreadLocal threadlocal = new ThreadLocal(); + threadlocal.set("SingleValueShould_NOT_CrossThreadBoundary"); + + AtomicReference reference = new AtomicReference(); + + Runnable runnable = () -> reference.set(threadlocal.get()); + + //Thread boundary + CompletableFuture.runAsync(runnable); + + waitAtMost(5, TimeUnit.SECONDS) + .until(() -> null == reference.get()); + } + + @Test + public void contextPropagationRunnableSingleTest() { + + ThreadLocal threadlocal = new ThreadLocal(); + threadlocal.set("SingleValueShouldCrossThreadBoundary"); + + AtomicReference reference = new AtomicReference(); + + Runnable runnable = ContextPropagator + .decorateRunnable(new TestThreadLocalContextPropagator(threadlocal), + () -> reference.set(threadlocal.get())); + + //Thread boundary + CompletableFuture.runAsync(runnable); + + waitAtMost(5, TimeUnit.SECONDS) + .until(() -> "SingleValueShouldCrossThreadBoundary" == reference.get()); + } + + @Test + public void contextPropagationRunnableMultipleTest() { + + ThreadLocal threadLocalOne = new ThreadLocal(); + threadLocalOne.set("FirstValueShouldCrossThreadBoundary"); + + ThreadLocal threadLocalTwo = new ThreadLocal(); + threadLocalTwo.set("SecondValueShouldCrossThreadBoundary"); + + TestThreadLocalContextPropagator propagatorOne = new TestThreadLocalContextPropagator(threadLocalOne); + TestThreadLocalContextPropagator propagatorTwo = new TestThreadLocalContextPropagator(threadLocalTwo); + + + AtomicReference reference = new AtomicReference(); + + Runnable runnable = ContextPropagator + .decorateRunnable(Arrays.asList(propagatorOne, propagatorTwo), + () -> reference.set(Arrays.asList( + threadLocalOne.get(), + threadLocalTwo.get() + ))); + + + //Thread boundary + CompletableFuture.runAsync(runnable); + + waitAtMost(5, TimeUnit.SECONDS) + .until(matches(() -> assertThat(reference.get()).containsExactlyInAnyOrder( + "FirstValueShouldCrossThreadBoundary", + "SecondValueShouldCrossThreadBoundary"))); + } + + @Test + public void contextPropagationRunnableMultipleFailureTest() { + + ThreadLocal threadLocalOne = new ThreadLocal(); + threadLocalOne.set("FirstValueShouldCross_NOT_ThreadBoundary"); + + ThreadLocal threadLocalTwo = new ThreadLocal(); + threadLocalTwo.set("SecondValueShould_NOT_CrossThreadBoundary"); + + + AtomicReference reference = new AtomicReference(); + + Runnable runnable = () -> reference.set(Arrays.asList( + threadLocalOne.get(), + threadLocalTwo.get() + )); + + + //Thread boundary + CompletableFuture.runAsync(runnable); + + waitAtMost(5, TimeUnit.SECONDS) + .until(matches(() -> assertThat(reference.get()).containsExactlyInAnyOrder( + null, null))); + } +} diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/TestContextPropagators.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/TestContextPropagators.java new file mode 100644 index 0000000000..1679f61d21 --- /dev/null +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/TestContextPropagators.java @@ -0,0 +1,90 @@ +package io.github.resilience4j.bulkhead; + +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public class TestContextPropagators { + + public static class TestThreadLocalContextPropagator implements ContextPropagator { + private ThreadLocal threadLocal; + + public TestThreadLocalContextPropagator(ThreadLocal threadLocal) { + this.threadLocal = threadLocal; + } + + @Override + public Supplier> retrieve() { + return () -> (Optional) Optional.ofNullable(threadLocal.get()); + } + + @Override + public Consumer> copy() { + return (t) -> t.ifPresent(e -> { + if (threadLocal.get() != null) { + threadLocal.set(null); + threadLocal.remove(); + } + threadLocal.set(e); + }); + } + + @Override + public Consumer> clear() { + return (t) -> { + if (threadLocal.get() != null) { + threadLocal.set(null); + threadLocal.remove(); + } + }; + } + } + + public static class TestThreadLocalContextPropagatorWithHolder implements ContextPropagator { + + @Override + public Supplier> retrieve() { + return () -> (Optional) TestThreadLocalContextHolder.get(); + } + + @Override + public Consumer> copy() { + return (t) -> t.ifPresent(e -> { + clear(); + TestThreadLocalContextHolder.put(e); + }); + } + + @Override + public Consumer> clear() { + return (t) -> TestThreadLocalContextHolder.clear(); + } + + public static class TestThreadLocalContextHolder { + + private static final ThreadLocal threadLocal = new ThreadLocal(); + + private TestThreadLocalContextHolder() { + } + + public static void put(Object context) { + if (threadLocal.get() != null) { + clear(); + } + threadLocal.set(context); + } + + public static void clear() { + if (threadLocal.get() != null) { + threadLocal.set(null); + threadLocal.remove(); + } + } + + public static Optional get() { + return Optional.ofNullable(threadLocal.get()); + } + } + } +} + diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfigTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfigTest.java index 6d5c7e6a4e..5e3c41a4aa 100644 --- a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfigTest.java +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/ThreadPoolBulkheadConfigTest.java @@ -21,6 +21,11 @@ import org.junit.Test; import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -45,6 +50,8 @@ public void testBuildCustom() { assertThat(config.getCoreThreadPoolSize()).isEqualTo(coreThreadPoolSize); assertThat(config.getKeepAliveDuration().toMillis()).isEqualTo(maxWait); assertThat(config.getQueueCapacity()).isEqualTo(queueCapacity); + assertThat(config.getContextPropagator()).isEmpty(); + } @Test @@ -67,6 +74,7 @@ public void testCreateFromBaseConfig() { assertThat(config.getCoreThreadPoolSize()).isEqualTo(coreThreadPoolSize); assertThat(config.getKeepAliveDuration().toMillis()).isEqualTo(maxWait); assertThat(config.getQueueCapacity()).isEqualTo(queueCapacity); + assertThat(config.getContextPropagator()).isEmpty(); } @Test(expected = IllegalArgumentException.class) @@ -107,4 +115,123 @@ public void testBuildWithIllegalMaxCoreThreads() { .build(); } + @Test + public void testContextPropagatorConfig() { + + ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig + .custom() + .contextPropagator(TestCtxPropagator.class) + .build(); + + assertThat(config).isNotNull(); + assertThat(config.getContextPropagator()).isNotNull(); + assertThat(config.getContextPropagator().size()).isEqualTo(1); + assertThat(config.getContextPropagator().get(0).getClass()).isEqualTo(TestCtxPropagator.class); + } + + @Test + public void testContextPropagatorConfigDefault() { + + int maxThreadPoolSize = 20; + int coreThreadPoolSize = 2; + long maxWait = 555; + int queueCapacity = 50; + + ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom() + .maxThreadPoolSize(maxThreadPoolSize) + .coreThreadPoolSize(coreThreadPoolSize) + .queueCapacity(queueCapacity) + .keepAliveDuration(Duration.ofMillis(maxWait)) + .build(); + + assertThat(config).isNotNull(); + assertThat(config.getContextPropagator()).isNotNull(); + assertThat(config.getContextPropagator()).isEmpty(); + } + + @Test + public void testContextPropagatorSetAsBean() { + + int maxThreadPoolSize = 20; + int coreThreadPoolSize = 2; + long maxWait = 555; + int queueCapacity = 50; + + ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom() + .maxThreadPoolSize(maxThreadPoolSize) + .coreThreadPoolSize(coreThreadPoolSize) + .queueCapacity(queueCapacity) + .keepAliveDuration(Duration.ofMillis(maxWait)) + .contextPropagator(new TestCtxPropagator()) + .build(); + + assertThat(config).isNotNull(); + assertThat(config.getContextPropagator()).isNotNull(); + assertThat(config.getContextPropagator()).hasSize(1); + assertThat(config.getContextPropagator().get(0).getClass()).isEqualTo(TestCtxPropagator.class); + } + + @Test + public void testContextPropagatorSetAsBeanOverrideSetAsClass() { + + int maxThreadPoolSize = 20; + int coreThreadPoolSize = 2; + long maxWait = 555; + int queueCapacity = 50; + + ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom() + .maxThreadPoolSize(maxThreadPoolSize) + .coreThreadPoolSize(coreThreadPoolSize) + .queueCapacity(queueCapacity) + .keepAliveDuration(Duration.ofMillis(maxWait)) + .contextPropagator(TestCtxPropagator2.class) + //this should override TestCtxPropagator2 context propagator + .contextPropagator(new TestCtxPropagator()) + .build(); + + assertThat(config).isNotNull(); + assertThat(config.getContextPropagator()).isNotNull(); + assertThat(config.getContextPropagator()).hasSize(2); + List> ctxPropagators = config.getContextPropagator() + .stream().map(ct -> ((ContextPropagator) ct).getClass()).collect(Collectors.toList()); + assertThat(ctxPropagators).containsExactlyInAnyOrder(TestCtxPropagator.class, TestCtxPropagator2.class); + + } + + public static class TestCtxPropagator implements ContextPropagator { + + @Override + public Supplier> retrieve() { + return null; + } + + @Override + public Consumer> copy() { + return null; + } + + @Override + public Consumer> clear() { + return null; + } + } + + public static class TestCtxPropagator2 implements ContextPropagator { + + @Override + public Supplier> retrieve() { + return null; + } + + @Override + public Consumer> copy() { + return null; + } + + @Override + public Consumer> clear() { + return null; + } + } + } diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkheadTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkheadTest.java index 1129241767..6e428834b8 100644 --- a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkheadTest.java +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkheadTest.java @@ -18,18 +18,26 @@ */ package io.github.resilience4j.bulkhead.internal; + +import io.github.resilience4j.bulkhead.TestContextPropagators; +import io.github.resilience4j.bulkhead.TestContextPropagators.TestThreadLocalContextPropagatorWithHolder.TestThreadLocalContextHolder; import io.github.resilience4j.bulkhead.ThreadPoolBulkhead; import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig; import org.junit.Before; import org.junit.Test; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import static com.jayway.awaitility.Awaitility.waitAtMost; import static org.assertj.core.api.Assertions.assertThat; public class FixedThreadPoolBulkheadTest { private ThreadPoolBulkhead bulkhead; + private FixedThreadPoolBulkhead fixedThreadPoolBulkhead; @Before public void setUp() { @@ -37,9 +45,36 @@ public void setUp() { .maxThreadPoolSize(2) .coreThreadPoolSize(1) .queueCapacity(10) + .contextPropagator(TestContextPropagators.TestThreadLocalContextPropagatorWithHolder.class) .keepAliveDuration(Duration.ofMillis(10)) .build(); bulkhead = ThreadPoolBulkhead.of("test", config); + fixedThreadPoolBulkhead = new FixedThreadPoolBulkhead("testPool", config); + } + + @Test + public void testSupplierThreadLocalContextPropagator() { + + TestThreadLocalContextHolder.put("ValueShouldCrossThreadBoundary"); + + CompletableFuture future = fixedThreadPoolBulkhead + .submit(() -> TestThreadLocalContextHolder.get().orElse(null)); + + waitAtMost(5, TimeUnit.SECONDS) + .until(() -> "ValueShouldCrossThreadBoundary" == future.get()); + } + + @Test + public void testRunnableThreadLocalContextPropagator() { + + TestThreadLocalContextHolder.put("ValueShouldCrossThreadBoundary"); + AtomicReference reference = new AtomicReference(); + + fixedThreadPoolBulkhead + .submit(() -> reference.set(TestThreadLocalContextHolder.get().orElse(null))); + + waitAtMost(5, TimeUnit.SECONDS) + .until(() -> "ValueShouldCrossThreadBoundary" == reference.get()); } @Test diff --git a/resilience4j-core/src/main/java/io/github/resilience4j/core/ClassUtils.java b/resilience4j-core/src/main/java/io/github/resilience4j/core/ClassUtils.java index 5a563d992a..6d9096a97b 100644 --- a/resilience4j-core/src/main/java/io/github/resilience4j/core/ClassUtils.java +++ b/resilience4j-core/src/main/java/io/github/resilience4j/core/ClassUtils.java @@ -3,6 +3,9 @@ import io.github.resilience4j.core.lang.Nullable; import java.lang.reflect.Constructor; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; import java.util.function.Predicate; public class ClassUtils { @@ -23,4 +26,23 @@ public static Predicate instantiatePredicateClass(Class T instantiateClassDefConstructor(Class clazz) { + //if constructor present then it should have a no arg constructor + //if not present then default constructor is already their + Objects.requireNonNull(clazz, "class to instantiate should not be null"); + if (clazz.getConstructors().length > 0 && !Arrays.stream(clazz.getConstructors()) + .filter(c -> c.getParameterCount() == 0) + .findFirst().isPresent()) { + throw new InstantiationException( + "Default constructor is required to create instance of public class: " + clazz + .getName()); + } + try { + return clazz.getConstructor().newInstance(); + } catch (Exception e) { + throw new InstantiationException( + "Unable to create instance of class: " + clazz.getName(), e); + } + } } diff --git a/resilience4j-core/src/test/java/io/github/resilience4j/core/ClassUtilsTest.java b/resilience4j-core/src/test/java/io/github/resilience4j/core/ClassUtilsTest.java index f9c841ac34..036afb3187 100644 --- a/resilience4j-core/src/test/java/io/github/resilience4j/core/ClassUtilsTest.java +++ b/resilience4j-core/src/test/java/io/github/resilience4j/core/ClassUtilsTest.java @@ -22,6 +22,22 @@ public void shouldFailToInstantiatePredicateClass() { .hasCauseInstanceOf(NoSuchMethodException.class); } + @Test + public void shouldInstantiateClassWithDefaultConstructor() { + assertThat(ClassUtils.instantiateClassDefConstructor(DefaultConstructor.class)).isNotNull(); + } + + @Test + public void shouldInstantiateClassWithDefaultConstructor2() { + assertThat(ClassUtils.instantiateClassDefConstructor(DefaultConstructor2.class)).isNotNull(); + } + + @Test + public void shouldFailToInstantiateNoDefaultConstructor() { + assertThatThrownBy( + () -> ClassUtils.instantiateClassDefConstructor(NoDefaultConstructor.class)) + .isInstanceOf(InstantiationException.class); + } public static class PublicPredicate implements Predicate { @@ -45,4 +61,13 @@ public boolean test(String o) { return o.equals(bla); } } + + public static class NoDefaultConstructor { + public NoDefaultConstructor(String a){} + } + public static class DefaultConstructor {} + public static class DefaultConstructor2 { + public DefaultConstructor2(String a){} + public DefaultConstructor2(){} + } } diff --git a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/BulkheadConfigCustomizer.java b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/BulkheadConfigCustomizer.java index 8134c30014..167b3ed9de 100644 --- a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/BulkheadConfigCustomizer.java +++ b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/BulkheadConfigCustomizer.java @@ -2,6 +2,9 @@ import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.common.CustomizerWithName; +import io.github.resilience4j.core.lang.NonNull; + +import java.util.function.Consumer; /** * Enable customization bulkhead configuration builders programmatically. @@ -15,4 +18,28 @@ public interface BulkheadConfigCustomizer extends CustomizerWithName { */ void customize(BulkheadConfig.Builder configBuilder); + /** + * A convenient method to create BulkheadConfigCustomizer using {@link Consumer} + * + * @param instanceName the name of the instance + * @param consumer delegate call to Consumer when {@link BulkheadConfigCustomizer#customize(BulkheadConfig.Builder)} + * is called + * @param generic type of Customizer + * @return Customizer instance + */ + static BulkheadConfigCustomizer of(@NonNull String instanceName, + @NonNull Consumer consumer) { + return new BulkheadConfigCustomizer() { + + @Override + public void customize(BulkheadConfig.Builder builder) { + consumer.accept(builder); + } + + @Override + public String name() { + return instanceName; + } + }; + } } diff --git a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/ThreadPoolBulkheadConfigCustomizer.java b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/ThreadPoolBulkheadConfigCustomizer.java index 18b0948f38..9763d33c50 100644 --- a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/ThreadPoolBulkheadConfigCustomizer.java +++ b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/ThreadPoolBulkheadConfigCustomizer.java @@ -2,6 +2,9 @@ import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig; import io.github.resilience4j.common.CustomizerWithName; +import io.github.resilience4j.core.lang.NonNull; + +import java.util.function.Consumer; /** * Enable customization thread pool bulkhead configuration builders programmatically. @@ -15,4 +18,28 @@ public interface ThreadPoolBulkheadConfigCustomizer extends CustomizerWithName { */ void customize(ThreadPoolBulkheadConfig.Builder configBuilder); + /** + * A convenient method to create ThreadpoolBulkheadConfigCustomizer using {@link Consumer} + * + * @param instanceName the name of the instance + * @param consumer delegate call to Consumer when {@link ThreadPoolBulkheadConfigCustomizer#customize(ThreadPoolBulkheadConfig.Builder)} + * is called + * @param generic type of Customizer + * @return Customizer instance + */ + static ThreadPoolBulkheadConfigCustomizer of(@NonNull String instanceName, + @NonNull Consumer consumer) { + return new ThreadPoolBulkheadConfigCustomizer() { + + @Override + public void customize(ThreadPoolBulkheadConfig.Builder builder) { + consumer.accept(builder); + } + + @Override + public String name() { + return instanceName; + } + }; + } } diff --git a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/ThreadPoolBulkheadConfigurationProperties.java b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/ThreadPoolBulkheadConfigurationProperties.java index 39d3b4fc50..9e6061ac41 100644 --- a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/ThreadPoolBulkheadConfigurationProperties.java +++ b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/bulkhead/configuration/ThreadPoolBulkheadConfigurationProperties.java @@ -16,6 +16,7 @@ package io.github.resilience4j.common.bulkhead.configuration; import io.github.resilience4j.bulkhead.Bulkhead; +import io.github.resilience4j.bulkhead.ContextPropagator; import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig; import io.github.resilience4j.common.CommonProperties; import io.github.resilience4j.common.CompositeCustomizer; @@ -111,10 +112,12 @@ public ThreadPoolBulkheadConfig buildThreadPoolBulkheadConfig( if (properties.getWritableStackTraceEnabled() != null) { builder.writableStackTraceEnabled(properties.getWritableStackTraceEnabled()); } + if(properties.getContextPropagators() != null){ + builder.contextPropagator(properties.getContextPropagators()); + } compositeThreadPoolBulkheadCustomizer.getCustomizer(instanceName).ifPresent( threadPoolBulkheadConfigCustomizer -> threadPoolBulkheadConfigCustomizer .customize(builder)); - return builder.build(); } @@ -138,6 +141,11 @@ public static class InstanceProperties { private int queueCapacity; private Duration keepAliveDuration; + @Nullable + private Class[] contextPropagators; + + + public int getMaxThreadPoolSize() { return maxThreadPoolSize; } @@ -220,5 +228,22 @@ public InstanceProperties setBaseConfig(String baseConfig) { this.baseConfig = baseConfig; return this; } + + /** + * Getter return array of {@link ContextPropagator} class + * @return array of {@link ContextPropagator} classes + */ + public Class[] getContextPropagators() { return contextPropagators; } + + /** + * Set the class type of {@link ContextPropagator} + * + * @param contextPropagators subclass of {@link ContextPropagator} + * @return return builder instance back for fluent set up + */ + public InstanceProperties setContextPropagator(Class... contextPropagators) { + this.contextPropagators = contextPropagators; + return this; + } } } diff --git a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/circuitbreaker/configuration/CircuitBreakerConfigCustomizer.java b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/circuitbreaker/configuration/CircuitBreakerConfigCustomizer.java index 5716cf813c..82f75deb7d 100644 --- a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/circuitbreaker/configuration/CircuitBreakerConfigCustomizer.java +++ b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/circuitbreaker/configuration/CircuitBreakerConfigCustomizer.java @@ -2,6 +2,9 @@ import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.github.resilience4j.common.CustomizerWithName; +import io.github.resilience4j.core.lang.NonNull; + +import java.util.function.Consumer; /** * Enable customization circuit breaker configuration builders programmatically. @@ -15,4 +18,29 @@ public interface CircuitBreakerConfigCustomizer extends CustomizerWithName { */ void customize(CircuitBreakerConfig.Builder configBuilder); + /** + * A convenient method to create CircuitBreakerConfigCustomizer using {@link Consumer} + * + * @param instanceName the name of the instance + * @param consumer delegate call to Consumer when {@link CircuitBreakerConfigCustomizer#customize(CircuitBreakerConfig.Builder)} + * is called + * @param generic type of Customizer + * @return Customizer instance + */ + static CircuitBreakerConfigCustomizer of(@NonNull String instanceName, + @NonNull Consumer consumer) { + return new CircuitBreakerConfigCustomizer() { + + @Override + public void customize(CircuitBreakerConfig.Builder builder) { + consumer.accept(builder); + } + + @Override + public String name() { + return instanceName; + } + }; + } + } diff --git a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/ratelimiter/configuration/RateLimiterConfigCustomizer.java b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/ratelimiter/configuration/RateLimiterConfigCustomizer.java index 6278a94eac..a11a32d292 100644 --- a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/ratelimiter/configuration/RateLimiterConfigCustomizer.java +++ b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/ratelimiter/configuration/RateLimiterConfigCustomizer.java @@ -1,8 +1,11 @@ package io.github.resilience4j.common.ratelimiter.configuration; import io.github.resilience4j.common.CustomizerWithName; +import io.github.resilience4j.core.lang.NonNull; import io.github.resilience4j.ratelimiter.RateLimiterConfig; +import java.util.function.Consumer; + /** * Enable customization rate limiter configuration builders programmatically. */ @@ -15,4 +18,28 @@ public interface RateLimiterConfigCustomizer extends CustomizerWithName { */ void customize(RateLimiterConfig.Builder configBuilder); + /** + * A convenient method to create RateLimiterConfigCustomizer using {@link Consumer} + * + * @param instanceName the name of the instance + * @param consumer delegate call to Consumer when {@link RateLimiterConfigCustomizer#customize(RateLimiterConfig.Builder)} + * is called + * @param generic type of Customizer + * @return Customizer instance + */ + static RateLimiterConfigCustomizer of(@NonNull String instanceName, + @NonNull Consumer consumer) { + return new RateLimiterConfigCustomizer() { + + @Override + public void customize(RateLimiterConfig.Builder builder) { + consumer.accept(builder); + } + + @Override + public String name() { + return instanceName; + } + }; + } } diff --git a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/retry/configuration/RetryConfigCustomizer.java b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/retry/configuration/RetryConfigCustomizer.java index 17e4d93c1e..c47d7c7d8d 100644 --- a/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/retry/configuration/RetryConfigCustomizer.java +++ b/resilience4j-framework-common/src/main/java/io/github/resilience4j/common/retry/configuration/RetryConfigCustomizer.java @@ -1,8 +1,11 @@ package io.github.resilience4j.common.retry.configuration; import io.github.resilience4j.common.CustomizerWithName; +import io.github.resilience4j.core.lang.NonNull; import io.github.resilience4j.retry.RetryConfig; +import java.util.function.Consumer; + /** * Enable customization retry configuration builders programmatically. */ @@ -14,4 +17,29 @@ public interface RetryConfigCustomizer extends CustomizerWithName { * @param configBuilder to be customized */ void customize(RetryConfig.Builder configBuilder); + + /** + * A convenient method to create RetryConfigCustomizer using {@link Consumer} + * + * @param instanceName the name of the instance + * @param consumer delegate call to Consumer when {@link RetryConfigCustomizer#customize(RetryConfig.Builder)} + * is called + * @param generic type of Customizer + * @return Customizer instance + */ + static RetryConfigCustomizer of(@NonNull String instanceName, + @NonNull Consumer consumer) { + return new RetryConfigCustomizer() { + + @Override + public void customize(RetryConfig.Builder builder) { + consumer.accept(builder); + } + + @Override + public String name() { + return instanceName; + } + }; + } } diff --git a/resilience4j-spring-boot-common/src/main/java/io/github/resilience4j/bulkhead/autoconfigure/AbstractBulkheadConfigurationOnMissingBean.java b/resilience4j-spring-boot-common/src/main/java/io/github/resilience4j/bulkhead/autoconfigure/AbstractBulkheadConfigurationOnMissingBean.java index 909f9b3f4d..f8282d48cd 100644 --- a/resilience4j-spring-boot-common/src/main/java/io/github/resilience4j/bulkhead/autoconfigure/AbstractBulkheadConfigurationOnMissingBean.java +++ b/resilience4j-spring-boot-common/src/main/java/io/github/resilience4j/bulkhead/autoconfigure/AbstractBulkheadConfigurationOnMissingBean.java @@ -129,7 +129,6 @@ public ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry( EventConsumerRegistry bulkheadEventConsumerRegistry, RegistryEventConsumer threadPoolBulkheadRegistryEventConsumer, @Qualifier("compositeThreadPoolBulkheadCustomizer") CompositeCustomizer compositeThreadPoolBulkheadCustomizer) { - return threadPoolBulkheadConfiguration.threadPoolBulkheadRegistry( threadPoolBulkheadConfigurationProperties, bulkheadEventConsumerRegistry, threadPoolBulkheadRegistryEventConsumer, compositeThreadPoolBulkheadCustomizer); diff --git a/resilience4j-spring-boot-common/src/test/java/io/github/resilience4j/SpringBootCommonTest.java b/resilience4j-spring-boot-common/src/test/java/io/github/resilience4j/SpringBootCommonTest.java index 7f60e5be5f..2d4734b643 100644 --- a/resilience4j-spring-boot-common/src/test/java/io/github/resilience4j/SpringBootCommonTest.java +++ b/resilience4j-spring-boot-common/src/test/java/io/github/resilience4j/SpringBootCommonTest.java @@ -23,6 +23,8 @@ import io.github.resilience4j.circuitbreaker.autoconfigure.AbstractCircuitBreakerConfigurationOnMissingBean; import io.github.resilience4j.circuitbreaker.configure.CircuitBreakerConfigurationProperties; import io.github.resilience4j.common.CompositeCustomizer; +import io.github.resilience4j.common.bulkhead.configuration.BulkheadConfigCustomizer; +import io.github.resilience4j.common.bulkhead.configuration.ThreadPoolBulkheadConfigCustomizer; import io.github.resilience4j.common.bulkhead.configuration.ThreadPoolBulkheadConfigurationProperties; import io.github.resilience4j.common.circuitbreaker.configuration.CircuitBreakerConfigCustomizer; import io.github.resilience4j.consumer.DefaultEventConsumerRegistry; @@ -55,12 +57,13 @@ public void testBulkHeadCommonConfig() { .bulkheadRegistry(new BulkheadConfigurationProperties(), new DefaultEventConsumerRegistry<>(), new CompositeRegistryEventConsumer<>(Collections.emptyList()), - new CompositeCustomizer<>(Collections.emptyList()))).isNotNull(); + new CompositeCustomizer<>(Collections.singletonList(BulkheadConfigCustomizer.of("backend", builder -> builder.maxConcurrentCalls(10)))))).isNotNull(); assertThat(bulkheadConfigurationOnMissingBean .threadPoolBulkheadRegistry(new ThreadPoolBulkheadConfigurationProperties(), new DefaultEventConsumerRegistry<>(), new CompositeRegistryEventConsumer<>(Collections.emptyList()), - new CompositeCustomizer<>(Collections.emptyList()))).isNotNull(); + new CompositeCustomizer<>(Collections.singletonList( + ThreadPoolBulkheadConfigCustomizer.of("backend", builder -> builder.coreThreadPoolSize(10)))))).isNotNull(); assertThat(bulkheadConfigurationOnMissingBean.reactorBulkHeadAspectExt()).isNotNull(); assertThat(bulkheadConfigurationOnMissingBean.rxJava2BulkHeadAspectExt()).isNotNull(); assertThat(bulkheadConfigurationOnMissingBean diff --git a/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/TestThreadLocalContextPropagator.java b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/TestThreadLocalContextPropagator.java new file mode 100644 index 0000000000..0b71d5d33d --- /dev/null +++ b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/TestThreadLocalContextPropagator.java @@ -0,0 +1,57 @@ +package io.github.resilience4j; + +import io.github.resilience4j.bulkhead.ContextPropagator; + +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static io.github.resilience4j.TestThreadLocalContextPropagator.TestThreadLocalContextHolder.*; + +public class TestThreadLocalContextPropagator implements ContextPropagator { + + @Override + public Supplier> retrieve() { + return () -> (Optional) get(); + } + + @Override + public Consumer> copy() { + return (t) -> t.ifPresent(e -> { + clear(); + put(e); + }); + } + + @Override + public Consumer> clear() { + return (t) -> TestThreadLocalContextHolder.clear(); + } + + public static class TestThreadLocalContextHolder { + + private static final ThreadLocal threadLocal = new ThreadLocal(); + + private TestThreadLocalContextHolder() { + } + + public static void put(Object context) { + if (threadLocal.get() != null) { + clear(); + } + threadLocal.set(context); + } + + public static void clear() { + if (threadLocal.get() != null) { + threadLocal.set(null); + threadLocal.remove(); + } + } + + public static Optional get() { + return Optional.ofNullable(threadLocal.get()); + } + } +} + diff --git a/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/bulkhead/BulkheadAutoConfigurationTest.java b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/bulkhead/BulkheadAutoConfigurationTest.java index 3211f715a5..b889e3b362 100644 --- a/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/bulkhead/BulkheadAutoConfigurationTest.java +++ b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/bulkhead/BulkheadAutoConfigurationTest.java @@ -17,13 +17,19 @@ import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.junit.WireMockRule; +import io.github.resilience4j.TestThreadLocalContextPropagator; +import io.github.resilience4j.TestThreadLocalContextPropagator.TestThreadLocalContextHolder; import io.github.resilience4j.bulkhead.autoconfigure.BulkheadProperties; import io.github.resilience4j.bulkhead.autoconfigure.ThreadPoolBulkheadProperties; import io.github.resilience4j.bulkhead.configure.BulkheadAspect; import io.github.resilience4j.bulkhead.event.BulkheadEvent; +import io.github.resilience4j.common.CompositeCustomizer; +import io.github.resilience4j.common.bulkhead.configuration.BulkheadConfigCustomizer; +import io.github.resilience4j.common.bulkhead.configuration.ThreadPoolBulkheadConfigCustomizer; import io.github.resilience4j.common.bulkhead.monitoring.endpoint.BulkheadEndpointResponse; import io.github.resilience4j.common.bulkhead.monitoring.endpoint.BulkheadEventDTO; import io.github.resilience4j.common.bulkhead.monitoring.endpoint.BulkheadEventsEndpointResponse; +import io.github.resilience4j.service.test.BeanContextPropagator; import io.github.resilience4j.service.test.DummyFeignClient; import io.github.resilience4j.service.test.TestApplication; import io.github.resilience4j.service.test.bulkhead.BulkheadDummyService; @@ -39,16 +45,17 @@ import org.springframework.http.ResponseEntity; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.concurrent.*; +import java.util.stream.Collectors; import static com.jayway.awaitility.Awaitility.await; import static java.util.concurrent.CompletableFuture.runAsync; import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.test.util.ReflectionTestUtils.getField; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, @@ -75,6 +82,55 @@ public class BulkheadAutoConfigurationTest { private TestRestTemplate restTemplate; @Autowired private DummyFeignClient dummyFeignClient; + @Autowired + private CompositeCustomizer compositeThreadPoolBulkheadCustomizer; + @Autowired + private CompositeCustomizer compositeBulkheadCustomizer; + + @Test + public void testThreadPoolBulkheadCustomizer() { + Map customizerMap = (Map) getField( + compositeThreadPoolBulkheadCustomizer, "customizerMap"); + assertThat(customizerMap).isNotNull().hasSize(1).containsKeys("backendC"); + + //ContextPropagator set by properties + ThreadPoolBulkhead bulkheadD = threadPoolBulkheadRegistry + .bulkhead(BulkheadDummyService.BACKEND_D); + + assertThat(bulkheadD).isNotNull(); + assertThat(bulkheadD.getBulkheadConfig()).isNotNull(); + assertThat(bulkheadD.getBulkheadConfig().getContextPropagator()).isNotNull(); + assertThat(bulkheadD.getBulkheadConfig().getContextPropagator().size()).isEqualTo(1); + assertThat(bulkheadD.getBulkheadConfig().getContextPropagator().get(0).getClass()) + .isEqualTo(TestThreadLocalContextPropagator.class); + + //ContextPropagator set by bean using Registry Customizer + ThreadPoolBulkhead bulkheadC = threadPoolBulkheadRegistry + .bulkhead(BulkheadDummyService.BACKEND_C); + + assertThat(bulkheadC).isNotNull(); + assertThat(bulkheadC.getBulkheadConfig()).isNotNull(); + assertThat(bulkheadC.getBulkheadConfig().getContextPropagator()).isNotNull(); + assertThat(bulkheadC.getBulkheadConfig().getContextPropagator().size()).isEqualTo(1); + assertThat(bulkheadC.getBulkheadConfig().getContextPropagator().get(0).getClass()) + .isEqualTo(BeanContextPropagator.class); + } + + @Test + public void testBulkheadCustomizer() { + Map customizerMap = (Map) getField( + compositeBulkheadCustomizer, "customizerMap"); + assertThat(customizerMap).isNotNull().hasSize(2).containsKeys("backendCustomizer","backendD"); + + Bulkhead backendCustomizer = bulkheadRegistry.bulkhead("backendCustomizer"); + + assertThat(backendCustomizer).isNotNull(); + assertThat(backendCustomizer.getBulkheadConfig()).isNotNull(); + assertThat(backendCustomizer.getBulkheadConfig().getMaxWaitDuration()).isEqualTo(Duration.ofMillis(100)); + + //updated by Customizer + assertThat(backendCustomizer.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(20); + } /** * This test verifies that the combination of @FeignClient and @Bulkhead annotation works as @@ -156,8 +212,8 @@ public void testBulkheadAutoConfigurationThreadPool() { ResponseEntity bulkheadList = restTemplate .getForEntity("/actuator/bulkheads", BulkheadEndpointResponse.class); - assertThat(bulkheadList.getBody().getBulkheads()).hasSize(7) - .containsExactly("backendA", "backendB", "backendB", "backendC", "backendD", "backendD", + assertThat(bulkheadList.getBody().getBulkheads()).hasSize(8) + .containsExactlyInAnyOrder("backendA", "backendB", "backendB", "backendC", "backendCustomizer", "backendD","backendD", "dummyFeignClient"); for (int i = 0; i < 5; i++) { @@ -184,7 +240,53 @@ public void testBulkheadAutoConfigurationThreadPool() { es.shutdown(); // test thread pool customizer final ThreadPoolBulkhead backendD = threadPoolBulkheadRegistry.bulkhead("backendD"); - assertThat(backendD.getBulkheadConfig().getMaxThreadPoolSize()).isEqualTo(2); + assertThat(backendD.getBulkheadConfig().getMaxThreadPoolSize()).isEqualTo(1); + } + + /** + * The test verifies that a Bulkhead instance is created and configured properly and is able to + * transfer context from ThreadLocal + */ + @Test + public void testBulkheadAutoConfigurationThreadPoolContextPropagation() + throws InterruptedException, TimeoutException, ExecutionException { + assertThat(threadPoolBulkheadRegistry).isNotNull(); + assertThat(threadPoolBulkheadProperties).isNotNull(); + + TestThreadLocalContextHolder.put("SurviveThreadBoundary"); + + ThreadPoolBulkhead bulkhead = threadPoolBulkheadRegistry + .bulkhead(BulkheadDummyService.BACKEND_D); + + assertThat(bulkhead).isNotNull(); + assertThat(bulkhead.getBulkheadConfig()).isNotNull(); + assertThat(bulkhead.getBulkheadConfig().getContextPropagator()).isNotNull(); + assertThat(bulkhead.getBulkheadConfig().getContextPropagator().size()).isEqualTo(1); + assertThat(bulkhead.getBulkheadConfig().getContextPropagator().get(0).getClass()) + .isEqualTo(TestThreadLocalContextPropagator.class); + + CompletableFuture future = dummyService + .doSomethingAsyncWithThreadLocal(); + + Object value = future.get(5, TimeUnit.SECONDS); + + assertThat(value).isEqualTo("SurviveThreadBoundary"); + // Test Actuator endpoints + + ResponseEntity bulkheadEventList = getBulkheadEvents( + "/actuator/bulkheadevents"); + List bulkheadEventsByBackend = bulkheadEventList.getBody() + .getBulkheadEvents(); + + bulkheadEventsByBackend = bulkheadEventsByBackend.stream().filter(b -> "backendD".equals(b.getBulkheadName())).collect( + Collectors.toList()); + + assertThat(bulkheadEventsByBackend).isNotNull(); + assertThat(bulkheadEventsByBackend.size()).isEqualTo(2); + assertThat(bulkheadEventsByBackend.stream() + .filter(it -> it.getType() == BulkheadEvent.Type.CALL_PERMITTED).count() == 1); + assertThat(bulkheadEventsByBackend.stream() + .filter(it -> it.getType() == BulkheadEvent.Type.CALL_FINISHED).count() == 1); } @@ -220,9 +322,9 @@ public void testBulkheadAutoConfiguration() { ResponseEntity bulkheadList = restTemplate .getForEntity("/actuator/bulkheads", BulkheadEndpointResponse.class); - assertThat(bulkheadList.getBody().getBulkheads()).hasSize(7) - .containsExactly("backendA", "backendB", "backendB", "backendC", "backendD", "backendD", - "dummyFeignClient"); + assertThat(bulkheadList.getBody().getBulkheads()).hasSize(8) + .containsExactlyInAnyOrder("backendA", "backendB", "backendB", "backendC", "backendCustomizer", + "dummyFeignClient","backendD","backendD"); for (int i = 0; i < 5; i++) { es.submit(dummyService::doSomething); @@ -354,8 +456,8 @@ private void commonAssertions() { ResponseEntity bulkheadList = restTemplate .getForEntity("/actuator/bulkheads", BulkheadEndpointResponse.class); - assertThat(bulkheadList.getBody().getBulkheads()).hasSize(7) - .containsExactly("backendA", "backendB", "backendB", "backendC", "backendD", "backendD", + assertThat(bulkheadList.getBody().getBulkheads()).hasSize(8) + .containsExactlyInAnyOrder("backendA", "backendB", "backendB", "backendC","backendCustomizer", "backendD","backendD", "dummyFeignClient"); ResponseEntity bulkheadEventList = getBulkheadEvents( diff --git a/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/BeanContextPropagator.java b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/BeanContextPropagator.java new file mode 100644 index 0000000000..cd6aee8e84 --- /dev/null +++ b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/BeanContextPropagator.java @@ -0,0 +1,27 @@ +package io.github.resilience4j.service.test; + +import io.github.resilience4j.bulkhead.ContextPropagator; + +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public class BeanContextPropagator implements ContextPropagator { + + @Override + public Supplier retrieve() { + return () -> Optional.empty(); + } + + @Override + public Consumer copy() { + return (t) -> { + }; + } + + @Override + public Consumer clear() { + return (t) -> { + }; + } +} diff --git a/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/TestApplication.java b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/TestApplication.java index 88d25164f1..9742be8286 100644 --- a/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/TestApplication.java +++ b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/TestApplication.java @@ -1,20 +1,21 @@ package io.github.resilience4j.service.test; import io.github.resilience4j.bulkhead.BulkheadConfig; +import io.github.resilience4j.bulkhead.ContextPropagator; import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig; -import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.github.resilience4j.common.bulkhead.configuration.BulkheadConfigCustomizer; import io.github.resilience4j.common.bulkhead.configuration.ThreadPoolBulkheadConfigCustomizer; import io.github.resilience4j.common.circuitbreaker.configuration.CircuitBreakerConfigCustomizer; import io.github.resilience4j.common.ratelimiter.configuration.RateLimiterConfigCustomizer; import io.github.resilience4j.common.retry.configuration.RetryConfigCustomizer; -import io.github.resilience4j.ratelimiter.RateLimiterConfig; -import io.github.resilience4j.retry.RetryConfig; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.Bean; +import java.util.List; + + /** * @author bstorozhuk */ @@ -27,51 +28,41 @@ public static void main(String[] args) { } @Bean - public CircuitBreakerConfigCustomizer testCustomizer() { - return new CircuitBreakerConfigCustomizer() { - @Override - public void customize(CircuitBreakerConfig.Builder builder) { - builder.slidingWindowSize(100); - } - - @Override - public String name() { - return "backendC"; - } - }; + public ThreadPoolBulkheadConfigCustomizer contextPropagatorBeanCustomizer( + List contextPropagators) { + return ThreadPoolBulkheadConfigCustomizer.of("backendC", (builder) -> + builder.contextPropagator( + contextPropagators.toArray(new ContextPropagator[contextPropagators.size()]))); + } + @Bean + public BulkheadConfigCustomizer testBulkheadCustomizer() { + return BulkheadConfigCustomizer.of( + "backendCustomizer", + builder -> builder.maxConcurrentCalls(20)); } @Bean - public RateLimiterConfigCustomizer testRateLimiterCustomizer() { - return new RateLimiterConfigCustomizer() { - @Override - public void customize(RateLimiterConfig.Builder builder) { - builder.limitForPeriod(200); - } + public ContextPropagator beanContextPropagator() { + return new BeanContextPropagator(); + } - @Override - public String name() { - return "backendCustomizer"; - } - }; + @Bean + public CircuitBreakerConfigCustomizer testCustomizer() { + return CircuitBreakerConfigCustomizer + .of("backendC", builder -> builder.slidingWindowSize(100)); + } + @Bean + public RateLimiterConfigCustomizer testRateLimiterCustomizer() { + return RateLimiterConfigCustomizer + .of("backendCustomizer", builder -> builder.limitForPeriod(200)); } @Bean public RetryConfigCustomizer testRetryCustomizer() { - return new RetryConfigCustomizer() { - @Override - public void customize(RetryConfig.Builder builder) { - builder.maxAttempts(4); - } - - @Override - public String name() { - return "retryBackendD"; - } - }; - + return RetryConfigCustomizer.of("retryBackendD", + builder -> builder.maxAttempts(4)); } @Bean @@ -88,23 +79,4 @@ public String name() { } }; } - - @Bean - public ThreadPoolBulkheadConfigCustomizer testThreadPoolBulkheadConfigCustomizer() { - return new ThreadPoolBulkheadConfigCustomizer() { - - @Override - public String name() { - return "backendD"; - } - - @Override - public void customize( - ThreadPoolBulkheadConfig.Builder configBuilder) { - configBuilder.maxThreadPoolSize(2); - } - }; - } - - } diff --git a/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/bulkhead/BulkheadDummyService.java b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/bulkhead/BulkheadDummyService.java index d49632066e..8b13276392 100644 --- a/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/bulkhead/BulkheadDummyService.java +++ b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/bulkhead/BulkheadDummyService.java @@ -6,8 +6,11 @@ public interface BulkheadDummyService { String BACKEND = "backendA"; String BACKEND_C = "backendC"; + String BACKEND_D = "backendD"; void doSomething(); CompletableFuture doSomethingAsync() throws InterruptedException; + + CompletableFuture doSomethingAsyncWithThreadLocal() throws InterruptedException; } diff --git a/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/bulkhead/BulkheadDummyServiceImpl.java b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/bulkhead/BulkheadDummyServiceImpl.java index fed5daa952..3bcc0ee35a 100644 --- a/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/bulkhead/BulkheadDummyServiceImpl.java +++ b/resilience4j-spring-boot2/src/test/java/io/github/resilience4j/service/test/bulkhead/BulkheadDummyServiceImpl.java @@ -1,5 +1,6 @@ package io.github.resilience4j.service.test.bulkhead; +import io.github.resilience4j.TestThreadLocalContextPropagator; import io.github.resilience4j.bulkhead.annotation.Bulkhead; import io.github.resilience4j.retry.annotation.Retry; import org.springframework.stereotype.Component; @@ -27,4 +28,11 @@ public CompletableFuture doSomethingAsync() throws InterruptedException Thread.sleep(500); return CompletableFuture.completedFuture("Test"); } + + @Override + @Bulkhead(name = BulkheadDummyService.BACKEND_D, type = Bulkhead.Type.THREADPOOL) + public CompletableFuture doSomethingAsyncWithThreadLocal() throws InterruptedException { + return CompletableFuture.completedFuture( + TestThreadLocalContextPropagator.TestThreadLocalContextHolder.get().orElse(null)); + } } diff --git a/resilience4j-spring-boot2/src/test/resources/application.yaml b/resilience4j-spring-boot2/src/test/resources/application.yaml index 193413ce1e..11b31764a6 100644 --- a/resilience4j-spring-boot2/src/test/resources/application.yaml +++ b/resilience4j-spring-boot2/src/test/resources/application.yaml @@ -134,6 +134,9 @@ resilience4j.bulkhead: dummyFeignClient: maxWaitDuration: 100 maxConcurrentCalls: 3 + backendCustomizer: + maxWaitDuration: 100 + maxConcurrentCalls: 10 resilience4j.thread-pool-bulkhead": tags: @@ -157,6 +160,8 @@ resilience4j.thread-pool-bulkhead": maxThreadPoolSize: 1 coreThreadPoolSize: 1 queueCapacity: 1 + contextPropagator: + - io.github.resilience4j.TestThreadLocalContextPropagator management.security.enabled: false diff --git a/resilience4j-spring/src/main/java/io/github/resilience4j/bulkhead/configure/BulkheadConfiguration.java b/resilience4j-spring/src/main/java/io/github/resilience4j/bulkhead/configure/BulkheadConfiguration.java index 6f74c971ee..0852d129dd 100644 --- a/resilience4j-spring/src/main/java/io/github/resilience4j/bulkhead/configure/BulkheadConfiguration.java +++ b/resilience4j-spring/src/main/java/io/github/resilience4j/bulkhead/configure/BulkheadConfiguration.java @@ -102,7 +102,6 @@ private BulkheadRegistry createBulkheadRegistry( .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> bulkheadConfigurationProperties.createBulkheadConfig(entry.getValue(), compositeBulkheadCustomizer, entry.getKey()))); - return BulkheadRegistry.of(configs, bulkheadRegistryEventConsumer, io.vavr.collection.HashMap.ofAll(bulkheadConfigurationProperties.getTags())); } diff --git a/resilience4j-spring/src/main/java/io/github/resilience4j/bulkhead/configure/threadpool/ThreadPoolBulkheadConfiguration.java b/resilience4j-spring/src/main/java/io/github/resilience4j/bulkhead/configure/threadpool/ThreadPoolBulkheadConfiguration.java index c0aa53cffc..7e0e5e24d2 100644 --- a/resilience4j-spring/src/main/java/io/github/resilience4j/bulkhead/configure/threadpool/ThreadPoolBulkheadConfiguration.java +++ b/resilience4j-spring/src/main/java/io/github/resilience4j/bulkhead/configure/threadpool/ThreadPoolBulkheadConfiguration.java @@ -22,6 +22,7 @@ import io.github.resilience4j.common.CompositeCustomizer; import io.github.resilience4j.common.bulkhead.configuration.ThreadPoolBulkheadConfigCustomizer; import io.github.resilience4j.common.bulkhead.configuration.ThreadPoolBulkheadConfigurationProperties; +import io.github.resilience4j.common.bulkhead.configuration.ThreadPoolBulkheadConfigurationProperties.InstanceProperties; import io.github.resilience4j.consumer.EventConsumerRegistry; import io.github.resilience4j.core.registry.CompositeRegistryEventConsumer; import io.github.resilience4j.core.registry.RegistryEventConsumer; @@ -37,13 +38,14 @@ import java.util.Optional; import java.util.stream.Collectors; +import static java.util.Optional.ofNullable; + /** * {@link Configuration Configuration} for {@link io.github.resilience4j.bulkhead.ThreadPoolBulkhead} */ @Configuration public class ThreadPoolBulkheadConfiguration { - @Bean @Qualifier("compositeThreadPoolBulkheadCustomizer") public CompositeCustomizer compositeThreadPoolBulkheadCustomizer( @@ -51,7 +53,6 @@ public CompositeCustomizer compositeThreadPo return new CompositeCustomizer<>(customizers); } - /** * @param bulkheadConfigurationProperties bulk head spring configuration properties * @param bulkheadEventConsumerRegistry the bulk head event consumer registry @@ -101,7 +102,6 @@ private ThreadPoolBulkheadRegistry createBulkheadRegistry( entry -> threadPoolBulkheadConfigurationProperties .createThreadPoolBulkheadConfig(entry.getValue(), compositeThreadPoolBulkheadCustomizer, entry.getKey()))); - return ThreadPoolBulkheadRegistry.of(configs, threadPoolBulkheadRegistryEventConsumer, io.vavr.collection.HashMap.ofAll(threadPoolBulkheadConfigurationProperties.getTags())); } @@ -124,10 +124,9 @@ private void registerEventConsumer(ThreadPoolBulkheadRegistry bulkheadRegistry, private void registerEventConsumer(EventConsumerRegistry eventConsumerRegistry, ThreadPoolBulkhead bulkHead, ThreadPoolBulkheadConfigurationProperties bulkheadConfigurationProperties) { - int eventConsumerBufferSize = Optional - .ofNullable(bulkheadConfigurationProperties.getBackendProperties(bulkHead.getName())) + int eventConsumerBufferSize = ofNullable(bulkheadConfigurationProperties.getBackendProperties(bulkHead.getName())) .map( - ThreadPoolBulkheadConfigurationProperties.InstanceProperties::getEventConsumerBufferSize) + InstanceProperties::getEventConsumerBufferSize) .orElse(100); bulkHead.getEventPublisher().onEvent(eventConsumerRegistry.createEventConsumer( String.join("-", ThreadPoolBulkhead.class.getSimpleName(), bulkHead.getName()), diff --git a/resilience4j-spring/src/test/java/io/github/resilience4j/TestThreadLocalContextPropagator.java b/resilience4j-spring/src/test/java/io/github/resilience4j/TestThreadLocalContextPropagator.java new file mode 100644 index 0000000000..0b71d5d33d --- /dev/null +++ b/resilience4j-spring/src/test/java/io/github/resilience4j/TestThreadLocalContextPropagator.java @@ -0,0 +1,57 @@ +package io.github.resilience4j; + +import io.github.resilience4j.bulkhead.ContextPropagator; + +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static io.github.resilience4j.TestThreadLocalContextPropagator.TestThreadLocalContextHolder.*; + +public class TestThreadLocalContextPropagator implements ContextPropagator { + + @Override + public Supplier> retrieve() { + return () -> (Optional) get(); + } + + @Override + public Consumer> copy() { + return (t) -> t.ifPresent(e -> { + clear(); + put(e); + }); + } + + @Override + public Consumer> clear() { + return (t) -> TestThreadLocalContextHolder.clear(); + } + + public static class TestThreadLocalContextHolder { + + private static final ThreadLocal threadLocal = new ThreadLocal(); + + private TestThreadLocalContextHolder() { + } + + public static void put(Object context) { + if (threadLocal.get() != null) { + clear(); + } + threadLocal.set(context); + } + + public static void clear() { + if (threadLocal.get() != null) { + threadLocal.set(null); + threadLocal.remove(); + } + } + + public static Optional get() { + return Optional.ofNullable(threadLocal.get()); + } + } +} + diff --git a/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkHeadConfigurationSpringTest.java b/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkHeadConfigurationSpringTest.java index afb5579045..7a79dab546 100644 --- a/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkHeadConfigurationSpringTest.java +++ b/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkHeadConfigurationSpringTest.java @@ -15,12 +15,15 @@ */ package io.github.resilience4j.bulkhead.configure; +import io.github.resilience4j.TestThreadLocalContextPropagator; import io.github.resilience4j.bulkhead.BulkheadRegistry; import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry; import io.github.resilience4j.bulkhead.event.BulkheadEvent; +import io.github.resilience4j.common.bulkhead.configuration.ThreadPoolBulkheadConfigurationProperties; import io.github.resilience4j.consumer.DefaultEventConsumerRegistry; import io.github.resilience4j.consumer.EventConsumerRegistry; import io.github.resilience4j.fallback.FallbackDecorators; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -32,8 +35,8 @@ import java.util.List; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import static org.assertj.core.api.Assertions.*; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = { @@ -49,8 +52,13 @@ public void testAllCircuitBreakerConfigurationBeansOverridden() { assertNotNull(configWithOverrides.bulkheadRegistry); assertNotNull(configWithOverrides.bulkheadAspect); assertNotNull(configWithOverrides.bulkheadConfigurationProperties); + assertNotNull(configWithOverrides.threadPoolBulkheadConfigurationProperties); assertNotNull(configWithOverrides.bulkheadEventEventConsumerRegistry); assertNotNull(configWithOverrides.threadPoolBulkheadRegistry); + assertTrue(configWithOverrides.threadPoolBulkheadConfigurationProperties.getConfigs().containsKey("sharedBackend")); + assertThat(configWithOverrides.threadPoolBulkheadConfigurationProperties.getConfigs().get("sharedBackend").getContextPropagators()).isNotNull(); + assertThat(configWithOverrides.threadPoolBulkheadConfigurationProperties.getConfigs().get("sharedBackend").getContextPropagators().length).isEqualTo(1); + assertEquals(configWithOverrides.threadPoolBulkheadConfigurationProperties.getConfigs().get("sharedBackend").getContextPropagators()[0], TestThreadLocalContextPropagator.class); assertTrue(configWithOverrides.bulkheadConfigurationProperties.getConfigs().size() == 1); } @@ -68,6 +76,8 @@ public static class ConfigWithOverrides { private BulkheadConfigurationProperties bulkheadConfigurationProperties; + private ThreadPoolBulkheadConfigurationProperties threadPoolBulkheadConfigurationProperties; + @Bean public ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry() { threadPoolBulkheadRegistry = ThreadPoolBulkheadRegistry.ofDefaults(); @@ -103,6 +113,12 @@ public BulkheadConfigurationProperties bulkheadConfigurationProperties() { return bulkheadConfigurationProperties; } + @Bean + public ThreadPoolBulkheadConfigurationProperties threadpoolBulkheadConfigurationProperties() { + threadPoolBulkheadConfigurationProperties = new ThreadPoolBulkheadConfigurationPropertiesTest(); + return threadPoolBulkheadConfigurationProperties; + } + private class BulkheadConfigurationPropertiesTest extends BulkheadConfigurationProperties { BulkheadConfigurationPropertiesTest() { @@ -113,7 +129,15 @@ private class BulkheadConfigurationPropertiesTest extends BulkheadConfigurationP } } - } + private class ThreadPoolBulkheadConfigurationPropertiesTest extends ThreadPoolBulkheadConfigurationProperties { + ThreadPoolBulkheadConfigurationPropertiesTest() { + InstanceProperties instanceProperties = new InstanceProperties(); + instanceProperties.setContextPropagator(TestThreadLocalContextPropagator.class); + getConfigs().put("sharedBackend", instanceProperties); + } + + } + } } \ No newline at end of file diff --git a/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkHeadConfigurationTest.java b/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkHeadConfigurationTest.java index 5258b4160a..bf8d75b0ff 100644 --- a/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkHeadConfigurationTest.java +++ b/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkHeadConfigurationTest.java @@ -1,9 +1,7 @@ package io.github.resilience4j.bulkhead.configure; -import io.github.resilience4j.bulkhead.Bulkhead; -import io.github.resilience4j.bulkhead.BulkheadRegistry; -import io.github.resilience4j.bulkhead.ThreadPoolBulkhead; -import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry; +import io.github.resilience4j.TestThreadLocalContextPropagator; +import io.github.resilience4j.bulkhead.*; import io.github.resilience4j.bulkhead.configure.threadpool.ThreadPoolBulkheadConfiguration; import io.github.resilience4j.bulkhead.event.BulkheadEvent; import io.github.resilience4j.common.CompositeCustomizer; @@ -15,6 +13,8 @@ import java.time.Duration; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static org.assertj.core.api.Assertions.assertThat; @@ -30,6 +30,7 @@ public void tesFixedThreadPoolBulkHeadRegistry() { //Given ThreadPoolBulkheadConfigurationProperties.InstanceProperties backendProperties1 = new ThreadPoolBulkheadConfigurationProperties.InstanceProperties(); backendProperties1.setCoreThreadPoolSize(1); + backendProperties1.setContextPropagator(TestThreadLocalContextPropagator.class); ThreadPoolBulkheadConfigurationProperties.InstanceProperties backendProperties2 = new ThreadPoolBulkheadConfigurationProperties.InstanceProperties(); backendProperties2.setCoreThreadPoolSize(2); @@ -52,10 +53,16 @@ public void tesFixedThreadPoolBulkHeadRegistry() { ThreadPoolBulkhead bulkhead1 = bulkheadRegistry.bulkhead("backend1"); assertThat(bulkhead1).isNotNull(); assertThat(bulkhead1.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(1); + assertThat(bulkhead1.getBulkheadConfig().getContextPropagator()).isNotNull(); + assertThat(bulkhead1.getBulkheadConfig().getContextPropagator().size()).isEqualTo(1); + assertThat(bulkhead1.getBulkheadConfig().getContextPropagator().get(0).getClass()) + .isEqualTo(TestThreadLocalContextPropagator.class); ThreadPoolBulkhead bulkhead2 = bulkheadRegistry.bulkhead("backend2"); assertThat(bulkhead2).isNotNull(); assertThat(bulkhead2.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(2); + assertThat(bulkhead2.getBulkheadConfig().getContextPropagator()).isNotNull(); + assertThat(bulkhead2.getBulkheadConfig().getContextPropagator()).isEmpty(); assertThat(eventConsumerRegistry.getAllEventConsumer()).hasSize(2); } @@ -72,6 +79,7 @@ public void testCreateThreadPoolBulkHeadRegistryWithSharedConfigs() { ThreadPoolBulkheadConfigurationProperties.InstanceProperties sharedProperties = new ThreadPoolBulkheadConfigurationProperties.InstanceProperties(); sharedProperties.setCoreThreadPoolSize(2); sharedProperties.setQueueCapacity(2); + sharedProperties.setContextPropagator(TestThreadLocalContextPropagator.class); ThreadPoolBulkheadConfigurationProperties.InstanceProperties backendWithDefaultConfig = new ThreadPoolBulkheadConfigurationProperties.InstanceProperties(); backendWithDefaultConfig.setBaseConfig("default"); @@ -80,6 +88,7 @@ public void testCreateThreadPoolBulkHeadRegistryWithSharedConfigs() { ThreadPoolBulkheadConfigurationProperties.InstanceProperties backendWithSharedConfig = new ThreadPoolBulkheadConfigurationProperties.InstanceProperties(); backendWithSharedConfig.setBaseConfig("sharedConfig"); backendWithSharedConfig.setCoreThreadPoolSize(4); + backendWithSharedConfig.setContextPropagator(TestThreadLocalContextPropagator.class); ThreadPoolBulkheadConfigurationProperties bulkheadConfigurationProperties = new ThreadPoolBulkheadConfigurationProperties(); bulkheadConfigurationProperties.getConfigs().put("default", defaultProperties); @@ -106,15 +115,31 @@ public void testCreateThreadPoolBulkHeadRegistryWithSharedConfigs() { assertThat(bulkhead1).isNotNull(); assertThat(bulkhead1.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(3); assertThat(bulkhead1.getBulkheadConfig().getQueueCapacity()).isEqualTo(1); + assertThat(bulkhead1.getBulkheadConfig().getContextPropagator()).isNotNull(); + assertThat(bulkhead1.getBulkheadConfig().getContextPropagator()).isEmpty(); + // Should get shared config and overwrite core number ThreadPoolBulkhead bulkhead2 = bulkheadRegistry.bulkhead("backendWithSharedConfig"); assertThat(bulkhead2).isNotNull(); assertThat(bulkhead2.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(4); assertThat(bulkhead2.getBulkheadConfig().getQueueCapacity()).isEqualTo(2); + assertThat(bulkhead2.getBulkheadConfig().getContextPropagator()).isNotNull(); + assertThat(bulkhead2.getBulkheadConfig().getContextPropagator().size()).isEqualTo(2); + List> ctxPropagators = bulkhead2 + .getBulkheadConfig().getContextPropagator().stream().map(ctx -> ctx.getClass()) + .collect( + Collectors.toList()); + assertThat(ctxPropagators).containsExactlyInAnyOrder(TestThreadLocalContextPropagator.class, + TestThreadLocalContextPropagator.class); + + // Unknown backend should get default config of Registry ThreadPoolBulkhead bulkhead3 = bulkheadRegistry.bulkhead("unknownBackend"); assertThat(bulkhead3).isNotNull(); assertThat(bulkhead3.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(1); + assertThat(bulkhead3.getBulkheadConfig().getContextPropagator()).isNotNull(); + assertThat(bulkhead3.getBulkheadConfig().getContextPropagator()).isEmpty(); + assertThat(eventConsumerRegistry.getAllEventConsumer()).hasSize(3); } catch (Exception e) { System.out.println( diff --git a/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkheadBuilderCustomizerTest.java b/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkheadBuilderCustomizerTest.java new file mode 100644 index 0000000000..df91091005 --- /dev/null +++ b/resilience4j-spring/src/test/java/io/github/resilience4j/bulkhead/configure/BulkheadBuilderCustomizerTest.java @@ -0,0 +1,244 @@ +package io.github.resilience4j.bulkhead.configure; + +import io.github.resilience4j.TestThreadLocalContextPropagator; +import io.github.resilience4j.bulkhead.*; +import io.github.resilience4j.bulkhead.configure.threadpool.ThreadPoolBulkheadConfiguration; +import io.github.resilience4j.bulkhead.event.BulkheadEvent; +import io.github.resilience4j.common.CompositeCustomizer; +import io.github.resilience4j.common.bulkhead.configuration.BulkheadConfigCustomizer; +import io.github.resilience4j.common.bulkhead.configuration.ThreadPoolBulkheadConfigCustomizer; +import io.github.resilience4j.common.bulkhead.configuration.ThreadPoolBulkheadConfigurationProperties; +import io.github.resilience4j.consumer.DefaultEventConsumerRegistry; +import io.github.resilience4j.consumer.EventConsumerRegistry; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.test.util.ReflectionTestUtils.getField; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {BulkheadBuilderCustomizerTest.Config.class}) +public class BulkheadBuilderCustomizerTest { + + @Autowired + private ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry; + + @Autowired + private BulkheadRegistry bulkheadRegistry; + + @Autowired + @Qualifier("compositeBulkheadCustomizer") + private CompositeCustomizer compositeBulkheadCustomizer; + + @Autowired + @Qualifier("compositeThreadPoolBulkheadCustomizer") + private CompositeCustomizer compositeThreadPoolBulkheadCustomizer; + + + @Test + public void testThreadPoolBulkheadCustomizer() { + + assertThat(threadPoolBulkheadRegistry).isNotNull(); + assertThat(compositeThreadPoolBulkheadCustomizer).isNotNull(); + + //All Customizer bean should be added to CompositeBuilderCustomizer as its primary bean. + Map map = (Map) getField( + compositeThreadPoolBulkheadCustomizer, + "customizerMap"); + assertThat(map).isNotNull(); + assertThat(map).hasSize(2).containsKeys("backendB", "backendD"); + + //This test context propagator set to config by properties. R4J will invoke default + // constructor of ContextPropagator class using reflection. + //downside is that no dependencies can be added to ContextPropagators class + ThreadPoolBulkhead bulkheadA = threadPoolBulkheadRegistry.bulkhead("bulkheadA", "backendA"); + assertThat(bulkheadA).isNotNull(); + assertThat(bulkheadA.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(2); + assertThat(bulkheadA.getBulkheadConfig().getMaxThreadPoolSize()).isEqualTo(4); + assertThat(bulkheadA.getBulkheadConfig().getContextPropagator()).hasSize(1); + assertThat(bulkheadA.getBulkheadConfig().getContextPropagator().get(0)) + .isInstanceOf(TestThreadLocalContextPropagator.class); + + //This test context propagator bean set to config using Customizer interface via SpringContext + ThreadPoolBulkhead bulkheadB = threadPoolBulkheadRegistry.bulkhead("bulkheadB", "backendB"); + assertThat(bulkheadB).isNotNull(); + assertThat(bulkheadB.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(2); + assertThat(bulkheadB.getBulkheadConfig().getMaxThreadPoolSize()).isEqualTo(4); + assertThat(bulkheadB.getBulkheadConfig().getContextPropagator()).hasSize(1); + assertThat(bulkheadB.getBulkheadConfig().getContextPropagator().get(0)) + .isInstanceOf(BeanContextPropagator.class); + + //This test has no context propagator + ThreadPoolBulkhead bulkheadC = threadPoolBulkheadRegistry.bulkhead("bulkheadC", "backendC"); + assertThat(bulkheadC).isNotNull(); + assertThat(bulkheadC.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(2); + assertThat(bulkheadC.getBulkheadConfig().getMaxThreadPoolSize()).isEqualTo(4); + assertThat(bulkheadC.getBulkheadConfig().getContextPropagator()).hasSize(0); + + //This test context propagator bean set to config using Customizer interface via SpringContext + ThreadPoolBulkhead bulkheadD = threadPoolBulkheadRegistry.bulkhead("bulkheadD", "backendD"); + assertThat(bulkheadD).isNotNull(); + assertThat(bulkheadD.getBulkheadConfig().getCoreThreadPoolSize()).isEqualTo(2); + assertThat(bulkheadD.getBulkheadConfig().getMaxThreadPoolSize()).isEqualTo(4); + assertThat(bulkheadD.getBulkheadConfig().getContextPropagator()).hasSize(1); + assertThat(bulkheadD.getBulkheadConfig().getContextPropagator().get(0)) + .isInstanceOf(BeanContextPropagator.class); + + } + + @Test + public void testBulkheadCustomizer() { + + assertThat(bulkheadRegistry).isNotNull(); + assertThat(compositeBulkheadCustomizer).isNotNull(); + + //All Customizer bean should be added to CompositeBuilderCustomizer as its primary bean. + Map map = (Map) getField( + compositeBulkheadCustomizer, + "customizerMap"); + assertThat(map).isNotNull(); + assertThat(map).hasSize(1).containsKeys("backendOne"); + + //This config is changed programmatically + Bulkhead bulkheadOne = bulkheadRegistry.bulkhead("bulkheadOne", "backendOne"); + assertThat(bulkheadOne).isNotNull(); + assertThat(bulkheadOne.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(20); + assertThat(bulkheadOne.getBulkheadConfig().getMaxWaitDuration()) + .isEqualTo(Duration.ofSeconds(1)); + + Bulkhead bulkheadTwo = bulkheadRegistry.bulkhead("bulkheadTwo", "backendTwo"); + assertThat(bulkheadTwo).isNotNull(); + assertThat(bulkheadTwo.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(10); + assertThat(bulkheadTwo.getBulkheadConfig().getMaxWaitDuration()) + .isEqualTo(Duration.ofSeconds(1)); + + } + + + @Configuration + @Import({ThreadPoolBulkheadConfiguration.class, BulkheadConfiguration.class}) + static class Config { + + + @Bean + public EventConsumerRegistry eventConsumerRegistry() { + return new DefaultEventConsumerRegistry<>(); + } + + @Bean + public ThreadPoolBulkheadConfigCustomizer customizerB( + List contextPropagators) { + return ThreadPoolBulkheadConfigCustomizer.of("backendB", builder -> + builder.contextPropagator( + contextPropagators.toArray(new ContextPropagator[contextPropagators.size()]))); + } + + @Bean + public ThreadPoolBulkheadConfigCustomizer customizerD( + List contextPropagators) { + return ThreadPoolBulkheadConfigCustomizer.of("backendD", builder -> + builder.contextPropagator( + contextPropagators.toArray(new ContextPropagator[contextPropagators.size()]))); + } + + @Bean + public BulkheadConfigCustomizer customizerOne( + List contextPropagators) { + return BulkheadConfigCustomizer.of("backendOne", builder -> + builder.maxConcurrentCalls(20)); + } + + @Bean + public BeanContextPropagator beanContextPropagator() { + return new BeanContextPropagator(); + } + + @Bean + public ThreadPoolBulkheadConfigurationProperties threadPoolBulkheadConfigurationProperties() { + return new ThreadPoolBulkheadConfigurationPropertiesTest(); + } + + @Bean + public BulkheadConfigurationProperties bulkheadConfigurationProperties() { + return new BulkheadConfigurationPropertiesTest(); + } + + private class ThreadPoolBulkheadConfigurationPropertiesTest extends + ThreadPoolBulkheadConfigurationProperties { + + ThreadPoolBulkheadConfigurationPropertiesTest() { + InstanceProperties properties1 = new InstanceProperties(); + properties1.setCoreThreadPoolSize(2); + properties1.setMaxThreadPoolSize(4); + properties1.setContextPropagator(TestThreadLocalContextPropagator.class); + getConfigs().put("backendA", properties1); + + InstanceProperties properties2 = new InstanceProperties(); + properties2.setCoreThreadPoolSize(2); + properties2.setMaxThreadPoolSize(4); + getConfigs().put("backendB", properties2); + + InstanceProperties properties3 = new InstanceProperties(); + properties3.setCoreThreadPoolSize(2); + properties3.setMaxThreadPoolSize(4); + getConfigs().put("backendC", properties3); + + InstanceProperties properties4 = new InstanceProperties(); + properties4.setCoreThreadPoolSize(2); + properties4.setMaxThreadPoolSize(4); + getConfigs().put("backendD", properties3); + } + + } + + private class BulkheadConfigurationPropertiesTest extends + BulkheadConfigurationProperties { + + BulkheadConfigurationPropertiesTest() { + InstanceProperties properties1 = new InstanceProperties(); + properties1.setMaxConcurrentCalls(10); + properties1.setMaxWaitDuration(Duration.ofSeconds(1)); + getConfigs().put("backendOne", properties1); + + InstanceProperties properties2 = new InstanceProperties(); + properties2.setMaxConcurrentCalls(10); + properties2.setMaxWaitDuration(Duration.ofSeconds(1)); + getConfigs().put("backendTwo", properties2); + } + } + } + + public static class BeanContextPropagator implements ContextPropagator { + + @Override + public Supplier> retrieve() { + return () -> Optional.empty(); + } + + @Override + public Consumer> copy() { + return (t) -> { + }; + } + + @Override + public Consumer> clear() { + return (t) -> { + }; + } + } +}