Skip to content

Commit

Permalink
Allow configuring a default ThreadFactoryProvider to create ClientRes…
Browse files Browse the repository at this point in the history
…ources #1711
  • Loading branch information
mp911de committed Apr 7, 2021
1 parent f5f85b2 commit d76b6af
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 30 deletions.
30 changes: 29 additions & 1 deletion src/main/java/io/lettuce/core/resource/ClientResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,23 @@ public interface ClientResources {
/**
* Create a new {@link ClientResources} using default settings.
*
* @return a new instance of a default client resources.
* @return a new instance of default client resources.
*/
static ClientResources create() {
return DefaultClientResources.create();
}

/**
* Create a new {@link ClientResources} using default settings.
*
* @param threadFactoryProvider provides a {@link java.util.concurrent.ThreadFactory} to create threads.
* @return a new instance of default client resources.
* @since 6.0.4
*/
static ClientResources create(ThreadFactoryProvider threadFactoryProvider) {
return DefaultClientResources.builder().threadFactoryProvider(threadFactoryProvider).build();
}

/**
* Create a new {@link ClientResources} using default settings.
*
Expand Down Expand Up @@ -219,6 +230,23 @@ default Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCo
*/
Builder socketAddressResolver(SocketAddressResolver socketAddressResolver);

/**
* Provide a default {@link ThreadFactoryProvider} to obtain {@link java.util.concurrent.ThreadFactory} for a
* {@code poolName} to create threads.
* <p>
* Applies only to threading resources created by {@link ClientResources} when not configuring {@link #timer()},
* {@link #eventExecutorGroup()}, or {@link #eventLoopGroupProvider()}.
*
* @param threadFactoryProvider a provider to obtain a {@link java.util.concurrent.ThreadFactory} for a
* {@code poolName}, must not be {@code null}.
* @return {@code this} {@link Builder}.
* @since 6.0.4
* @see #eventExecutorGroup(EventExecutorGroup)
* @see #eventLoopGroupProvider(EventLoopGroupProvider)
* @see #timer(Timer)
*/
Builder threadFactoryProvider(ThreadFactoryProvider threadFactoryProvider);

/**
* Sets a shared {@link Timer} that can be used across different instances of {@link io.lettuce.core.RedisClient} and
* {@link io.lettuce.core.cluster.RedisClusterClient} The provided {@link Timer} instance will not be shut down when
Expand Down
56 changes: 49 additions & 7 deletions src/main/java/io/lettuce/core/resource/DefaultClientResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,22 @@
import io.lettuce.core.event.metrics.MetricEventPublisher;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceLists;
import io.lettuce.core.metrics.*;
import io.lettuce.core.metrics.CommandLatencyCollector;
import io.lettuce.core.metrics.CommandLatencyCollectorOptions;
import io.lettuce.core.metrics.CommandLatencyRecorder;
import io.lettuce.core.metrics.DefaultCommandLatencyCollector;
import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions;
import io.lettuce.core.metrics.MetricCollector;
import io.lettuce.core.resource.Delay.StatefulDelay;
import io.lettuce.core.tracing.Tracing;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.*;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand All @@ -57,6 +67,8 @@
* <li>a {@code dnsResolver} which is a provided instance of {@link DnsResolver}.</li>
* <li>a {@code nettyCustomizer} that is a provided instance of {@link NettyCustomizer}.</li>
* <li>a {@code socketAddressResolver} which is a provided instance of {@link SocketAddressResolver}.</li>
* <li>a {@code threadFactoryProvider} to provide a {@link java.util.concurrent.ThreadFactory} for default timer, event loop and
* event executor instances.</li>
* <li>a {@code timer} that is a provided instance of {@link io.netty.util.HashedWheelTimer}.</li>
* <li>a {@code tracing} that is a provided instance of {@link Tracing}.</li>
* </ul>
Expand Down Expand Up @@ -130,6 +142,8 @@ public class DefaultClientResources implements ClientResources {

private final SocketAddressResolver socketAddressResolver;

private final ThreadFactoryProvider threadFactoryProvider;

private final Timer timer;

private final boolean sharedTimer;
Expand All @@ -140,6 +154,8 @@ public class DefaultClientResources implements ClientResources {

protected DefaultClientResources(Builder builder) {

threadFactoryProvider = builder.threadFactoryProvider;

if (builder.eventLoopGroupProvider == null) {
int ioThreadPoolSize = builder.ioThreadPoolSize;

Expand All @@ -150,7 +166,7 @@ protected DefaultClientResources(Builder builder) {
}

this.sharedEventLoopGroupProvider = false;
this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize);
this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize, threadFactoryProvider);

} else {
this.sharedEventLoopGroupProvider = builder.sharedEventLoopGroupProvider;
Expand All @@ -167,15 +183,15 @@ protected DefaultClientResources(Builder builder) {
}

eventExecutorGroup = DefaultEventLoopGroupProvider.createEventLoopGroup(DefaultEventExecutorGroup.class,
computationThreadPoolSize);
computationThreadPoolSize, threadFactoryProvider);
sharedEventExecutor = false;
} else {
sharedEventExecutor = builder.sharedEventExecutor;
eventExecutorGroup = builder.eventExecutorGroup;
}

if (builder.timer == null) {
timer = new HashedWheelTimer(new DefaultThreadFactory("lettuce-timer"));
timer = new HashedWheelTimer(threadFactoryProvider.getThreadFactory("lettuce-timer"));
sharedTimer = false;
} else {
timer = builder.timer;
Expand Down Expand Up @@ -293,6 +309,8 @@ public static class Builder implements ClientResources.Builder {

private boolean sharedTimer;

private ThreadFactoryProvider threadFactoryProvider = DefaultThreadFactoryProvider.INSTANCE;

private Timer timer;

private Tracing tracing = Tracing.disabled();
Expand Down Expand Up @@ -522,6 +540,30 @@ public ClientResources.Builder socketAddressResolver(SocketAddressResolver socke
return this;
}

/**
* Provide a default {@link ThreadFactoryProvider} to obtain {@link java.util.concurrent.ThreadFactory} for a
* {@code poolName}.
* <p>
* Applies only to threading resources created by {@link DefaultClientResources} when not configuring {@link #timer()},
* {@link #eventExecutorGroup()}, or {@link #eventLoopGroupProvider()}.
*
* @param threadFactoryProvider a provider to obtain a {@link java.util.concurrent.ThreadFactory} for a
* {@code poolName}, must not be {@code null}.
* @return {@code this} {@link ClientResources.Builder}.
* @since 6.0.4
* @see #eventExecutorGroup(EventExecutorGroup)
* @see #eventLoopGroupProvider(EventLoopGroupProvider)
* @see #timer(Timer)
*/
@Override
public ClientResources.Builder threadFactoryProvider(ThreadFactoryProvider threadFactoryProvider) {

LettuceAssert.notNull(threadFactoryProvider, "ThreadFactoryProvider must not be null");

this.threadFactoryProvider = threadFactoryProvider;
return this;
}

/**
* Sets a shared {@link Timer} that can be used across different instances of {@link io.lettuce.core.RedisClient} and
* {@link io.lettuce.core.cluster.RedisClusterClient} The provided {@link Timer} instance will not be shut down when
Expand Down Expand Up @@ -592,8 +634,8 @@ public DefaultClientResources.Builder mutate() {
builder.commandLatencyRecorder(commandLatencyRecorder())
.commandLatencyPublisherOptions(commandLatencyPublisherOptions()).dnsResolver(dnsResolver())
.eventBus(eventBus()).eventExecutorGroup(eventExecutorGroup()).reconnectDelay(reconnectDelay)
.socketAddressResolver(socketAddressResolver()).nettyCustomizer(nettyCustomizer()).timer(timer())
.tracing(tracing());
.socketAddressResolver(socketAddressResolver()).nettyCustomizer(nettyCustomizer())
.threadFactoryProvider(threadFactoryProvider).timer(timer()).tracing(tracing());

builder.sharedCommandLatencyCollector = sharedEventLoopGroupProvider;
builder.sharedEventExecutor = sharedEventExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class DefaultEventLoopGroupProvider implements EventLoopGroupProvider {

private final int numberOfThreads;

private final ThreadFactoryProvider threadFactoryProvider;
private final io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider;

private volatile boolean shutdownCalled = false;

Expand Down Expand Up @@ -80,6 +80,23 @@ public DefaultEventLoopGroupProvider(int numberOfThreads, ThreadFactoryProvider
this.threadFactoryProvider = threadFactoryProvider;
}

/**
* Creates a new instance of {@link DefaultEventLoopGroupProvider}.
*
* @param numberOfThreads number of threads (pool size)
* @param threadFactoryProvider provides access to {@link io.lettuce.core.resource.ThreadFactoryProvider}.
* @since 6.0.4
*/
public DefaultEventLoopGroupProvider(int numberOfThreads,
io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider) {

LettuceAssert.isTrue(numberOfThreads > 0, "Number of threads must be greater than zero");
LettuceAssert.notNull(threadFactoryProvider, "ThreadFactoryProvider must not be null");

this.numberOfThreads = numberOfThreads;
this.threadFactoryProvider = threadFactoryProvider;
}

@Override
public <T extends EventLoopGroup> T allocate(Class<T> type) {

Expand Down Expand Up @@ -162,7 +179,7 @@ private <T extends EventLoopGroup> T getOrCreate(Class<T> type) {
* @since 6.0
*/
protected <T extends EventLoopGroup> EventExecutorGroup doCreateEventLoopGroup(Class<T> type, int numberOfThreads,
ThreadFactoryProvider threadFactoryProvider) {
io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider) {
return createEventLoopGroup(type, numberOfThreads, threadFactoryProvider);
}

Expand Down Expand Up @@ -201,8 +218,8 @@ public static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopG
* @throws IllegalArgumentException if the {@code type} is not supported.
* @since 5.3
*/
private static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Class<T> type, int numberOfThreads,
ThreadFactoryProvider factoryProvider) {
static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Class<T> type, int numberOfThreads,
io.lettuce.core.resource.ThreadFactoryProvider factoryProvider) {

logger.debug("Creating executor {}", type.getName());

Expand Down Expand Up @@ -304,26 +321,16 @@ public Future<Boolean> shutdown(long quietPeriod, long timeout, TimeUnit timeUni
*
* @since 6.0
*/
public interface ThreadFactoryProvider {
public interface ThreadFactoryProvider extends io.lettuce.core.resource.ThreadFactoryProvider {

/**
* Return a {@link ThreadFactory} for the given {@code poolName}.
*
* @param poolName a descriptive pool name. Typically used as prefix for thread names.
* @return the {@link ThreadFactory}.
*/
ThreadFactory getThreadFactory(String poolName);

}

enum DefaultThreadFactoryProvider implements ThreadFactoryProvider {

INSTANCE;

@Override
public ThreadFactory getThreadFactory(String poolName) {
return new DefaultThreadFactory(poolName, true);
}
ThreadFactory getThreadFactory(String poolName);

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.resource;

import java.util.concurrent.ThreadFactory;

import io.netty.util.concurrent.DefaultThreadFactory;

/**
* Default {@link ThreadFactoryProvider} implementation.
*
* @author Mark Paluch
*/
enum DefaultThreadFactoryProvider implements ThreadFactoryProvider {

INSTANCE;

@Override
public ThreadFactory getThreadFactory(String poolName) {
return new DefaultThreadFactory(poolName, true);
}

}
37 changes: 37 additions & 0 deletions src/main/java/io/lettuce/core/resource/ThreadFactoryProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.resource;

import java.util.concurrent.ThreadFactory;

/**
* Interface to provide a custom {@link java.util.concurrent.ThreadFactory}. Implementations are asked through
* {@link #getThreadFactory(String)} to provide a thread factory for a given pool name.
*
* @since 6.0.4
*/
@FunctionalInterface
public interface ThreadFactoryProvider {

/**
* Return a {@link ThreadFactory} for the given {@code poolName}.
*
* @param poolName a descriptive pool name. Typically used as prefix for thread names.
* @return the {@link ThreadFactory}.
*/
ThreadFactory getThreadFactory(String poolName);

}
Loading

0 comments on commit d76b6af

Please sign in to comment.