Skip to content

Commit

Permalink
Allow configuring a default ThreadFactoryProvider to create ClientRes…
Browse files Browse the repository at this point in the history
…ources #1711
  • Loading branch information
mp911de committed Apr 7, 2021
1 parent b8433ba commit 9d0b51c
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 23 deletions.
30 changes: 29 additions & 1 deletion src/main/java/io/lettuce/core/resource/ClientResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,23 @@ public interface ClientResources {
/**
* Create a new {@link ClientResources} using default settings.
*
* @return a new instance of a default client resources.
* @return a new instance of default client resources.
*/
static ClientResources create() {
return DefaultClientResources.create();
}

/**
* Create a new {@link ClientResources} using default settings.
*
* @param threadFactoryProvider provides a {@link java.util.concurrent.ThreadFactory} to create threads.
* @return a new instance of default client resources.
* @since 6.1.1
*/
static ClientResources create(ThreadFactoryProvider threadFactoryProvider) {
return DefaultClientResources.builder().threadFactoryProvider(threadFactoryProvider).build();
}

/**
* Create a new {@link ClientResources} using default settings.
*
Expand Down Expand Up @@ -237,6 +248,23 @@ default Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCo
*/
Builder socketAddressResolver(SocketAddressResolver socketAddressResolver);

/**
* Provide a default {@link ThreadFactoryProvider} to obtain {@link java.util.concurrent.ThreadFactory} for a
* {@code poolName} to create threads.
* <p>
* Applies only to threading resources created by {@link ClientResources} when not configuring {@link #timer()},
* {@link #eventExecutorGroup()}, or {@link #eventLoopGroupProvider()}.
*
* @param threadFactoryProvider a provider to obtain a {@link java.util.concurrent.ThreadFactory} for a
* {@code poolName}, must not be {@code null}.
* @return {@code this} {@link Builder}.
* @since 6.1.1
* @see #eventExecutorGroup(EventExecutorGroup)
* @see #eventLoopGroupProvider(EventLoopGroupProvider)
* @see #timer(Timer)
*/
Builder threadFactoryProvider(ThreadFactoryProvider threadFactoryProvider);

/**
* Sets a shared {@link Timer} that can be used across different instances of {@link io.lettuce.core.RedisClient} and
* {@link io.lettuce.core.cluster.RedisClusterClient} The provided {@link Timer} instance will not be shut down when
Expand Down
41 changes: 36 additions & 5 deletions src/main/java/io/lettuce/core/resource/DefaultClientResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
Expand Down Expand Up @@ -68,6 +67,8 @@
* {@code computationThreadPoolSize}.</li>
* <li>a {@code nettyCustomizer} that is a provided instance of {@link NettyCustomizer}.</li>
* <li>a {@code socketAddressResolver} which is a provided instance of {@link SocketAddressResolver}.</li>
* <li>a {@code threadFactoryProvider} to provide a {@link java.util.concurrent.ThreadFactory} for default timer, event loop and
* event executor instances.</li>
* <li>a {@code timer} that is a provided instance of {@link io.netty.util.HashedWheelTimer}.</li>
* <li>a {@code tracing} that is a provided instance of {@link Tracing}.</li>
* </ul>
Expand Down Expand Up @@ -150,6 +151,8 @@ public class DefaultClientResources implements ClientResources {

private final SocketAddressResolver socketAddressResolver;

private final ThreadFactoryProvider threadFactoryProvider;

private final Timer timer;

private final boolean sharedTimer;
Expand All @@ -161,6 +164,7 @@ public class DefaultClientResources implements ClientResources {
protected DefaultClientResources(Builder builder) {

addressResolverGroup = builder.addressResolverGroup;
threadFactoryProvider = builder.threadFactoryProvider;

if (builder.eventLoopGroupProvider == null) {
int ioThreadPoolSize = builder.ioThreadPoolSize;
Expand All @@ -172,7 +176,7 @@ protected DefaultClientResources(Builder builder) {
}

this.sharedEventLoopGroupProvider = false;
this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize);
this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize, threadFactoryProvider);

} else {
this.sharedEventLoopGroupProvider = builder.sharedEventLoopGroupProvider;
Expand All @@ -189,15 +193,15 @@ protected DefaultClientResources(Builder builder) {
}

eventExecutorGroup = DefaultEventLoopGroupProvider.createEventLoopGroup(DefaultEventExecutorGroup.class,
computationThreadPoolSize);
computationThreadPoolSize, threadFactoryProvider);
sharedEventExecutor = false;
} else {
sharedEventExecutor = builder.sharedEventExecutor;
eventExecutorGroup = builder.eventExecutorGroup;
}

if (builder.timer == null) {
timer = new HashedWheelTimer(new DefaultThreadFactory("lettuce-timer"));
timer = new HashedWheelTimer(threadFactoryProvider.getThreadFactory("lettuce-timer"));
sharedTimer = false;
} else {
timer = builder.timer;
Expand Down Expand Up @@ -315,6 +319,8 @@ public static class Builder implements ClientResources.Builder {

private boolean sharedTimer;

private ThreadFactoryProvider threadFactoryProvider = DefaultThreadFactoryProvider.INSTANCE;

private Timer timer;

private Tracing tracing = Tracing.disabled();
Expand Down Expand Up @@ -565,6 +571,30 @@ public ClientResources.Builder socketAddressResolver(SocketAddressResolver socke
return this;
}

/**
* Provide a default {@link ThreadFactoryProvider} to obtain {@link java.util.concurrent.ThreadFactory} for a
* {@code poolName}.
* <p>
* Applies only to threading resources created by {@link DefaultClientResources} when not configuring {@link #timer()},
* {@link #eventExecutorGroup()}, or {@link #eventLoopGroupProvider()}.
*
* @param threadFactoryProvider a provider to obtain a {@link java.util.concurrent.ThreadFactory} for a
* {@code poolName}, must not be {@code null}.
* @return {@code this} {@link ClientResources.Builder}.
* @since 6.1.1
* @see #eventExecutorGroup(EventExecutorGroup)
* @see #eventLoopGroupProvider(EventLoopGroupProvider)
* @see #timer(Timer)
*/
@Override
public ClientResources.Builder threadFactoryProvider(ThreadFactoryProvider threadFactoryProvider) {

LettuceAssert.notNull(threadFactoryProvider, "ThreadFactoryProvider must not be null");

this.threadFactoryProvider = threadFactoryProvider;
return this;
}

/**
* Sets a shared {@link Timer} that can be used across different instances of {@link io.lettuce.core.RedisClient} and
* {@link io.lettuce.core.cluster.RedisClusterClient} The provided {@link Timer} instance will not be shut down when
Expand Down Expand Up @@ -634,7 +664,8 @@ public DefaultClientResources.Builder mutate() {
builder.commandLatencyRecorder(commandLatencyRecorder())
.commandLatencyPublisherOptions(commandLatencyPublisherOptions()).dnsResolver(dnsResolver())
.eventBus(eventBus()).eventExecutorGroup(eventExecutorGroup()).reconnectDelay(reconnectDelay)
.socketAddressResolver(socketAddressResolver()).nettyCustomizer(nettyCustomizer()).timer(timer())
.socketAddressResolver(socketAddressResolver()).nettyCustomizer(nettyCustomizer())
.threadFactoryProvider(threadFactoryProvider).timer(timer())
.tracing(tracing()).addressResolverGroup(addressResolverGroup());

builder.sharedCommandLatencyCollector = sharedEventLoopGroupProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
Expand Down Expand Up @@ -59,7 +58,7 @@ public class DefaultEventLoopGroupProvider implements EventLoopGroupProvider {

private final int numberOfThreads;

private final ThreadFactoryProvider threadFactoryProvider;
private final io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider;

private volatile boolean shutdownCalled = false;

Expand Down Expand Up @@ -88,6 +87,23 @@ public DefaultEventLoopGroupProvider(int numberOfThreads, ThreadFactoryProvider
this.threadFactoryProvider = threadFactoryProvider;
}

/**
* Creates a new instance of {@link DefaultEventLoopGroupProvider}.
*
* @param numberOfThreads number of threads (pool size)
* @param threadFactoryProvider provides access to {@link io.lettuce.core.resource.ThreadFactoryProvider}.
* @since 6.1.1
*/
public DefaultEventLoopGroupProvider(int numberOfThreads,
io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider) {

LettuceAssert.isTrue(numberOfThreads > 0, "Number of threads must be greater than zero");
LettuceAssert.notNull(threadFactoryProvider, "ThreadFactoryProvider must not be null");

this.numberOfThreads = numberOfThreads;
this.threadFactoryProvider = threadFactoryProvider;
}

@Override
public <T extends EventLoopGroup> T allocate(Class<T> type) {

Expand Down Expand Up @@ -170,7 +186,7 @@ private <T extends EventLoopGroup> T getOrCreate(Class<T> type) {
* @since 6.0
*/
protected <T extends EventLoopGroup> EventExecutorGroup doCreateEventLoopGroup(Class<T> type, int numberOfThreads,
ThreadFactoryProvider threadFactoryProvider) {
io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider) {
return createEventLoopGroup(type, numberOfThreads, threadFactoryProvider);
}

Expand Down Expand Up @@ -209,8 +225,8 @@ public static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopG
* @throws IllegalArgumentException if the {@code type} is not supported.
* @since 5.3
*/
private static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Class<T> type, int numberOfThreads,
ThreadFactoryProvider factoryProvider) {
static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Class<T> type, int numberOfThreads,
io.lettuce.core.resource.ThreadFactoryProvider factoryProvider) {

logger.debug("Creating executor {}", type.getName());

Expand Down Expand Up @@ -322,26 +338,16 @@ public Future<Boolean> shutdown(long quietPeriod, long timeout, TimeUnit timeUni
*
* @since 6.0
*/
public interface ThreadFactoryProvider {
public interface ThreadFactoryProvider extends io.lettuce.core.resource.ThreadFactoryProvider {

/**
* Return a {@link ThreadFactory} for the given {@code poolName}.
*
* @param poolName a descriptive pool name. Typically used as prefix for thread names.
* @return the {@link ThreadFactory}.
*/
ThreadFactory getThreadFactory(String poolName);

}

enum DefaultThreadFactoryProvider implements ThreadFactoryProvider {

INSTANCE;

@Override
public ThreadFactory getThreadFactory(String poolName) {
return new DefaultThreadFactory(poolName, true);
}
ThreadFactory getThreadFactory(String poolName);

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.resource;

import java.util.concurrent.ThreadFactory;

import io.netty.util.concurrent.DefaultThreadFactory;

/**
* Default {@link ThreadFactoryProvider} implementation.
*
* @author Mark Paluch
*/
enum DefaultThreadFactoryProvider implements ThreadFactoryProvider {

INSTANCE;

@Override
public ThreadFactory getThreadFactory(String poolName) {
return new DefaultThreadFactory(poolName, true);
}

}
37 changes: 37 additions & 0 deletions src/main/java/io/lettuce/core/resource/ThreadFactoryProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.resource;

import java.util.concurrent.ThreadFactory;

/**
* Interface to provide a custom {@link java.util.concurrent.ThreadFactory}. Implementations are asked through
* {@link #getThreadFactory(String)} to provide a thread factory for a given pool name.
*
* @since 6.1.1
*/
@FunctionalInterface
public interface ThreadFactoryProvider {

/**
* Return a {@link ThreadFactory} for the given {@code poolName}.
*
* @param poolName a descriptive pool name. Typically used as prefix for thread names.
* @return the {@link ThreadFactory}.
*/
ThreadFactory getThreadFactory(String poolName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import static org.mockito.Mockito.*;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import io.lettuce.test.Wait;
import org.junit.jupiter.api.Test;

import reactor.test.StepVerifier;
Expand All @@ -33,6 +36,7 @@
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;

Expand Down Expand Up @@ -262,4 +266,55 @@ void considersDecoupledSharedStateFromMutation() {
copyTimer.stop();
timer.stop();
}

@Test
void shouldApplyThreadFactory() {

ClientResources clientResources = ClientResources.builder().threadFactoryProvider(name -> runnable -> {
return new MyThread(runnable, name);
}).ioThreadPoolSize(2).computationThreadPoolSize(2).build();

HashedWheelTimer hwt = (HashedWheelTimer) clientResources.timer();
assertThat(hwt).extracting("workerThread").isInstanceOf(MyThread.class);

AtomicReference<Thread> eventExecutorThread = new AtomicReference<>();
EventExecutor eventExecutor = clientResources.eventExecutorGroup().next();
eventExecutor.submit(() -> eventExecutorThread.set(Thread.currentThread())).awaitUninterruptibly();

AtomicReference<Thread> eventLoopThread = new AtomicReference<>();
NioEventLoopGroup eventLoopGroup = clientResources.eventLoopGroupProvider().allocate(NioEventLoopGroup.class);
eventLoopGroup.next().submit(() -> eventLoopThread.set(Thread.currentThread())).awaitUninterruptibly();

clientResources.eventLoopGroupProvider().release(eventLoopGroup, 0, 0, TimeUnit.SECONDS);

clientResources.shutdown(0, 0, TimeUnit.SECONDS);

assertThat(MyThread.started).hasValue(5);
Wait.untilEquals(5, () -> MyThread.finished).waitOrTimeout();
}

static class MyThread extends Thread {

public static AtomicInteger started = new AtomicInteger();

public static AtomicInteger finished = new AtomicInteger();

public MyThread(Runnable target, String name) {
super(target, name);
}

@Override
public synchronized void start() {
started.incrementAndGet();
super.start();
}

@Override
public void run() {
super.run();
finished.incrementAndGet();
}

}

}

0 comments on commit 9d0b51c

Please sign in to comment.