diff --git a/src/main/java/io/lettuce/core/resource/ClientResources.java b/src/main/java/io/lettuce/core/resource/ClientResources.java index 90e8f7e972..ba9a95ee45 100644 --- a/src/main/java/io/lettuce/core/resource/ClientResources.java +++ b/src/main/java/io/lettuce/core/resource/ClientResources.java @@ -56,12 +56,23 @@ public interface ClientResources { /** * Create a new {@link ClientResources} using default settings. * - * @return a new instance of a default client resources. + * @return a new instance of default client resources. */ static ClientResources create() { return DefaultClientResources.create(); } + /** + * Create a new {@link ClientResources} using default settings. + * + * @param threadFactoryProvider provides a {@link java.util.concurrent.ThreadFactory} to create threads. + * @return a new instance of default client resources. + * @since 6.0.4 + */ + static ClientResources create(ThreadFactoryProvider threadFactoryProvider) { + return DefaultClientResources.builder().threadFactoryProvider(threadFactoryProvider).build(); + } + /** * Create a new {@link ClientResources} using default settings. * @@ -219,6 +230,23 @@ default Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCo */ Builder socketAddressResolver(SocketAddressResolver socketAddressResolver); + /** + * Provide a default {@link ThreadFactoryProvider} to obtain {@link java.util.concurrent.ThreadFactory} for a + * {@code poolName} to create threads. + *

+ * Applies only to threading resources created by {@link ClientResources} when not configuring {@link #timer()}, + * {@link #eventExecutorGroup()}, or {@link #eventLoopGroupProvider()}. + * + * @param threadFactoryProvider a provider to obtain a {@link java.util.concurrent.ThreadFactory} for a + * {@code poolName}, must not be {@code null}. + * @return {@code this} {@link Builder}. + * @since 6.0.4 + * @see #eventExecutorGroup(EventExecutorGroup) + * @see #eventLoopGroupProvider(EventLoopGroupProvider) + * @see #timer(Timer) + */ + Builder threadFactoryProvider(ThreadFactoryProvider threadFactoryProvider); + /** * Sets a shared {@link Timer} that can be used across different instances of {@link io.lettuce.core.RedisClient} and * {@link io.lettuce.core.cluster.RedisClusterClient} The provided {@link Timer} instance will not be shut down when diff --git a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java index e83bb79e00..a3c0766ab3 100644 --- a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java +++ b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java @@ -27,12 +27,22 @@ import io.lettuce.core.event.metrics.MetricEventPublisher; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.internal.LettuceLists; -import io.lettuce.core.metrics.*; +import io.lettuce.core.metrics.CommandLatencyCollector; +import io.lettuce.core.metrics.CommandLatencyCollectorOptions; +import io.lettuce.core.metrics.CommandLatencyRecorder; +import io.lettuce.core.metrics.DefaultCommandLatencyCollector; +import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions; +import io.lettuce.core.metrics.MetricCollector; import io.lettuce.core.resource.Delay.StatefulDelay; import io.lettuce.core.tracing.Tracing; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; -import io.netty.util.concurrent.*; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.PromiseCombiner; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -57,6 +67,8 @@ *

  • a {@code dnsResolver} which is a provided instance of {@link DnsResolver}.
  • *
  • a {@code nettyCustomizer} that is a provided instance of {@link NettyCustomizer}.
  • *
  • a {@code socketAddressResolver} which is a provided instance of {@link SocketAddressResolver}.
  • + *
  • a {@code threadFactoryProvider} to provide a {@link java.util.concurrent.ThreadFactory} for default timer, event loop and + * event executor instances.
  • *
  • a {@code timer} that is a provided instance of {@link io.netty.util.HashedWheelTimer}.
  • *
  • a {@code tracing} that is a provided instance of {@link Tracing}.
  • * @@ -130,6 +142,8 @@ public class DefaultClientResources implements ClientResources { private final SocketAddressResolver socketAddressResolver; + private final ThreadFactoryProvider threadFactoryProvider; + private final Timer timer; private final boolean sharedTimer; @@ -140,6 +154,8 @@ public class DefaultClientResources implements ClientResources { protected DefaultClientResources(Builder builder) { + threadFactoryProvider = builder.threadFactoryProvider; + if (builder.eventLoopGroupProvider == null) { int ioThreadPoolSize = builder.ioThreadPoolSize; @@ -150,7 +166,7 @@ protected DefaultClientResources(Builder builder) { } this.sharedEventLoopGroupProvider = false; - this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize); + this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize, threadFactoryProvider); } else { this.sharedEventLoopGroupProvider = builder.sharedEventLoopGroupProvider; @@ -167,7 +183,7 @@ protected DefaultClientResources(Builder builder) { } eventExecutorGroup = DefaultEventLoopGroupProvider.createEventLoopGroup(DefaultEventExecutorGroup.class, - computationThreadPoolSize); + computationThreadPoolSize, threadFactoryProvider); sharedEventExecutor = false; } else { sharedEventExecutor = builder.sharedEventExecutor; @@ -175,7 +191,7 @@ protected DefaultClientResources(Builder builder) { } if (builder.timer == null) { - timer = new HashedWheelTimer(new DefaultThreadFactory("lettuce-timer")); + timer = new HashedWheelTimer(threadFactoryProvider.getThreadFactory("lettuce-timer")); sharedTimer = false; } else { timer = builder.timer; @@ -293,6 +309,8 @@ public static class Builder implements ClientResources.Builder { private boolean sharedTimer; + private ThreadFactoryProvider threadFactoryProvider = DefaultThreadFactoryProvider.INSTANCE; + private Timer timer; private Tracing tracing = Tracing.disabled(); @@ -522,6 +540,30 @@ public ClientResources.Builder socketAddressResolver(SocketAddressResolver socke return this; } + /** + * Provide a default {@link ThreadFactoryProvider} to obtain {@link java.util.concurrent.ThreadFactory} for a + * {@code poolName}. + *

    + * Applies only to threading resources created by {@link DefaultClientResources} when not configuring {@link #timer()}, + * {@link #eventExecutorGroup()}, or {@link #eventLoopGroupProvider()}. + * + * @param threadFactoryProvider a provider to obtain a {@link java.util.concurrent.ThreadFactory} for a + * {@code poolName}, must not be {@code null}. + * @return {@code this} {@link ClientResources.Builder}. + * @since 6.0.4 + * @see #eventExecutorGroup(EventExecutorGroup) + * @see #eventLoopGroupProvider(EventLoopGroupProvider) + * @see #timer(Timer) + */ + @Override + public ClientResources.Builder threadFactoryProvider(ThreadFactoryProvider threadFactoryProvider) { + + LettuceAssert.notNull(threadFactoryProvider, "ThreadFactoryProvider must not be null"); + + this.threadFactoryProvider = threadFactoryProvider; + return this; + } + /** * Sets a shared {@link Timer} that can be used across different instances of {@link io.lettuce.core.RedisClient} and * {@link io.lettuce.core.cluster.RedisClusterClient} The provided {@link Timer} instance will not be shut down when @@ -592,8 +634,8 @@ public DefaultClientResources.Builder mutate() { builder.commandLatencyRecorder(commandLatencyRecorder()) .commandLatencyPublisherOptions(commandLatencyPublisherOptions()).dnsResolver(dnsResolver()) .eventBus(eventBus()).eventExecutorGroup(eventExecutorGroup()).reconnectDelay(reconnectDelay) - .socketAddressResolver(socketAddressResolver()).nettyCustomizer(nettyCustomizer()).timer(timer()) - .tracing(tracing()); + .socketAddressResolver(socketAddressResolver()).nettyCustomizer(nettyCustomizer()) + .threadFactoryProvider(threadFactoryProvider).timer(timer()).tracing(tracing()); builder.sharedCommandLatencyCollector = sharedEventLoopGroupProvider; builder.sharedEventExecutor = sharedEventExecutor; diff --git a/src/main/java/io/lettuce/core/resource/DefaultEventLoopGroupProvider.java b/src/main/java/io/lettuce/core/resource/DefaultEventLoopGroupProvider.java index 7795f329ce..39a5b41631 100644 --- a/src/main/java/io/lettuce/core/resource/DefaultEventLoopGroupProvider.java +++ b/src/main/java/io/lettuce/core/resource/DefaultEventLoopGroupProvider.java @@ -51,7 +51,7 @@ public class DefaultEventLoopGroupProvider implements EventLoopGroupProvider { private final int numberOfThreads; - private final ThreadFactoryProvider threadFactoryProvider; + private final io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider; private volatile boolean shutdownCalled = false; @@ -80,6 +80,23 @@ public DefaultEventLoopGroupProvider(int numberOfThreads, ThreadFactoryProvider this.threadFactoryProvider = threadFactoryProvider; } + /** + * Creates a new instance of {@link DefaultEventLoopGroupProvider}. + * + * @param numberOfThreads number of threads (pool size) + * @param threadFactoryProvider provides access to {@link io.lettuce.core.resource.ThreadFactoryProvider}. + * @since 6.0.4 + */ + public DefaultEventLoopGroupProvider(int numberOfThreads, + io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider) { + + LettuceAssert.isTrue(numberOfThreads > 0, "Number of threads must be greater than zero"); + LettuceAssert.notNull(threadFactoryProvider, "ThreadFactoryProvider must not be null"); + + this.numberOfThreads = numberOfThreads; + this.threadFactoryProvider = threadFactoryProvider; + } + @Override public T allocate(Class type) { @@ -162,7 +179,7 @@ private T getOrCreate(Class type) { * @since 6.0 */ protected EventExecutorGroup doCreateEventLoopGroup(Class type, int numberOfThreads, - ThreadFactoryProvider threadFactoryProvider) { + io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider) { return createEventLoopGroup(type, numberOfThreads, threadFactoryProvider); } @@ -201,8 +218,8 @@ public static EventExecutorGroup createEventLoopG * @throws IllegalArgumentException if the {@code type} is not supported. * @since 5.3 */ - private static EventExecutorGroup createEventLoopGroup(Class type, int numberOfThreads, - ThreadFactoryProvider factoryProvider) { + static EventExecutorGroup createEventLoopGroup(Class type, int numberOfThreads, + io.lettuce.core.resource.ThreadFactoryProvider factoryProvider) { logger.debug("Creating executor {}", type.getName()); @@ -304,7 +321,7 @@ public Future shutdown(long quietPeriod, long timeout, TimeUnit timeUni * * @since 6.0 */ - public interface ThreadFactoryProvider { + public interface ThreadFactoryProvider extends io.lettuce.core.resource.ThreadFactoryProvider { /** * Return a {@link ThreadFactory} for the given {@code poolName}. @@ -312,18 +329,8 @@ public interface ThreadFactoryProvider { * @param poolName a descriptive pool name. Typically used as prefix for thread names. * @return the {@link ThreadFactory}. */ - ThreadFactory getThreadFactory(String poolName); - - } - - enum DefaultThreadFactoryProvider implements ThreadFactoryProvider { - - INSTANCE; - @Override - public ThreadFactory getThreadFactory(String poolName) { - return new DefaultThreadFactory(poolName, true); - } + ThreadFactory getThreadFactory(String poolName); } diff --git a/src/main/java/io/lettuce/core/resource/DefaultThreadFactoryProvider.java b/src/main/java/io/lettuce/core/resource/DefaultThreadFactoryProvider.java new file mode 100644 index 0000000000..fdb39011f7 --- /dev/null +++ b/src/main/java/io/lettuce/core/resource/DefaultThreadFactoryProvider.java @@ -0,0 +1,36 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.resource; + +import java.util.concurrent.ThreadFactory; + +import io.netty.util.concurrent.DefaultThreadFactory; + +/** + * Default {@link ThreadFactoryProvider} implementation. + * + * @author Mark Paluch + */ +enum DefaultThreadFactoryProvider implements ThreadFactoryProvider { + + INSTANCE; + + @Override + public ThreadFactory getThreadFactory(String poolName) { + return new DefaultThreadFactory(poolName, true); + } + +} diff --git a/src/main/java/io/lettuce/core/resource/ThreadFactoryProvider.java b/src/main/java/io/lettuce/core/resource/ThreadFactoryProvider.java new file mode 100644 index 0000000000..df4f38e170 --- /dev/null +++ b/src/main/java/io/lettuce/core/resource/ThreadFactoryProvider.java @@ -0,0 +1,37 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.resource; + +import java.util.concurrent.ThreadFactory; + +/** + * Interface to provide a custom {@link java.util.concurrent.ThreadFactory}. Implementations are asked through + * {@link #getThreadFactory(String)} to provide a thread factory for a given pool name. + * + * @since 6.0.4 + */ +@FunctionalInterface +public interface ThreadFactoryProvider { + + /** + * Return a {@link ThreadFactory} for the given {@code poolName}. + * + * @param poolName a descriptive pool name. Typically used as prefix for thread names. + * @return the {@link ThreadFactory}. + */ + ThreadFactory getThreadFactory(String poolName); + +} diff --git a/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java b/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java index 8d38890f17..f482f20931 100644 --- a/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java +++ b/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java @@ -15,15 +15,14 @@ */ package io.lettuce.core.resource; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import io.lettuce.test.Wait; import org.junit.jupiter.api.Test; import reactor.test.StepVerifier; @@ -36,6 +35,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; @@ -258,4 +258,56 @@ void considersDecoupledSharedStateFromMutation() { copyTimer.stop(); timer.stop(); } + + @Test + void shouldApplyThreadFactory() { + + ClientResources clientResources = ClientResources.builder().threadFactoryProvider(name -> runnable -> { + return new MyThread(runnable, name); + }).ioThreadPoolSize(2).computationThreadPoolSize(2).build(); + + HashedWheelTimer hwt = (HashedWheelTimer) clientResources.timer(); + assertThat(hwt).extracting("workerThread").isInstanceOf(MyThread.class); + + AtomicReference eventExecutorThread = new AtomicReference<>(); + EventExecutor eventExecutor = clientResources.eventExecutorGroup().next(); + eventExecutor.submit(() -> eventExecutorThread.set(Thread.currentThread())).awaitUninterruptibly(); + + AtomicReference eventLoopThread = new AtomicReference<>(); + NioEventLoopGroup eventLoopGroup = clientResources.eventLoopGroupProvider().allocate(NioEventLoopGroup.class); + eventLoopGroup.next().submit(() -> eventLoopThread.set(Thread.currentThread())).awaitUninterruptibly(); + + clientResources.eventLoopGroupProvider().release(eventLoopGroup, 0, 0, TimeUnit.SECONDS); + + clientResources.shutdown(0, 0, TimeUnit.SECONDS); + + assertThat(MyThread.started).hasValue(5); + + Wait.untilEquals(5, () -> MyThread.finished).waitOrTimeout(); + } + + static class MyThread extends Thread { + + public static AtomicInteger started = new AtomicInteger(); + + public static AtomicInteger finished = new AtomicInteger(); + + public MyThread(Runnable target, String name) { + super(target, name); + } + + @Override + public synchronized void start() { + started.incrementAndGet(); + super.start(); + } + + @Override + public void run() { + super.run(); + finished.incrementAndGet(); + } + + } + }