Skip to content

Commit

Permalink
Reduce visibility of fields in AbstractRedisClient #1241
Browse files Browse the repository at this point in the history
We now have better encapsulation in AbstractRedisClient. Subclasses use proper accessors and field access is greatly reduced.
  • Loading branch information
mp911de committed Mar 13, 2020
1 parent 4b602e7 commit 87dc833
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 228 deletions.
159 changes: 85 additions & 74 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.logging.InternalLogger;
Expand All @@ -65,24 +64,20 @@
*/
public abstract class AbstractRedisClient {

protected static final PooledByteBufAllocator BUF_ALLOCATOR = PooledByteBufAllocator.DEFAULT;
protected static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClient.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractRedisClient.class);

protected final Map<Class<? extends EventLoopGroup>, EventLoopGroup> eventLoopGroups = new ConcurrentHashMap<>(2);
protected final ConnectionEvents connectionEvents = new ConnectionEvents();
protected final Set<Closeable> closeableResources = ConcurrentHashMap.newKeySet();
protected final EventExecutorGroup genericWorkerPool;
protected final HashedWheelTimer timer;
protected final ChannelGroup channels;
protected final ClientResources clientResources;

protected volatile ClientOptions clientOptions = ClientOptions.builder().build();

protected Duration timeout = RedisURI.DEFAULT_TIMEOUT_DURATION;

private final ClientResources clientResources;
private final Map<Class<? extends EventLoopGroup>, EventLoopGroup> eventLoopGroups = new ConcurrentHashMap<>(2);
private final boolean sharedResources;
private final AtomicBoolean shutdown = new AtomicBoolean();

private volatile ClientOptions clientOptions = ClientOptions.create();
private volatile Duration defaultTimeout = RedisURI.DEFAULT_TIMEOUT_DURATION;

/**
* Create a new instance with client resources.
*
Expand All @@ -92,16 +87,27 @@ public abstract class AbstractRedisClient {
protected AbstractRedisClient(ClientResources clientResources) {

if (clientResources == null) {
sharedResources = false;
this.sharedResources = false;
this.clientResources = DefaultClientResources.create();
} else {
sharedResources = true;
this.sharedResources = true;
this.clientResources = clientResources;
}

genericWorkerPool = this.clientResources.eventExecutorGroup();
channels = new DefaultChannelGroup(genericWorkerPool.next());
timer = (HashedWheelTimer) this.clientResources.timer();
this.channels = new DefaultChannelGroup(this.clientResources.eventExecutorGroup().next());
}

protected int getChannelCount() {
return channels.size();
}

/**
* Returns the default {@link Duration timeout} for commands.
*
* @return the default {@link Duration timeout} for commands.
*/
public Duration getDefaultTimeout() {
return defaultTimeout;
}

/**
Expand All @@ -116,7 +122,7 @@ public void setDefaultTimeout(Duration timeout) {
LettuceAssert.notNull(timeout, "Timeout duration must not be null");
LettuceAssert.isTrue(!timeout.isNegative(), "Timeout duration must be greater or equal to zero");

this.timeout = timeout;
this.defaultTimeout = timeout;
}

/**
Expand All @@ -132,6 +138,65 @@ public void setDefaultTimeout(long timeout, TimeUnit unit) {
setDefaultTimeout(Duration.ofNanos(unit.toNanos(timeout)));
}

/**
* Returns the {@link ClientOptions} which are valid for that client. Connections inherit the current options at the moment
* the connection is created. Changes to options will not affect existing connections.
*
* @return the {@link ClientOptions} for this client
*/
public ClientOptions getOptions() {
return clientOptions;
}

/**
* Set the {@link ClientOptions} for the client.
*
* @param clientOptions client options for the client and connections that are created after setting the options
*/
protected void setOptions(ClientOptions clientOptions) {
LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
this.clientOptions = clientOptions;
}

/**
* Returns the {@link ClientResources} which are used with that client.
*
* @return the {@link ClientResources} for this client.
* @since 6.0
*
*/
public ClientResources getResources() {
return clientResources;
}

protected int getResourceCount() {
return closeableResources.size();
}

/**
* Add a listener for the RedisConnectionState. The listener is notified every time a connect/disconnect/IO exception
* happens. The listeners are not bound to a specific connection, so every time a connection event happens on any
* connection, the listener will be notified. The corresponding netty channel handler (async connection) is passed on the
* event.
*
* @param listener must not be {@literal null}
*/
public void addListener(RedisConnectionStateListener listener) {
LettuceAssert.notNull(listener, "RedisConnectionStateListener must not be null");
connectionEvents.addListener(listener);
}

/**
* Removes a listener.
*
* @param listener must not be {@literal null}
*/
public void removeListener(RedisConnectionStateListener listener) {

LettuceAssert.notNull(listener, "RedisConnectionStateListener must not be null");
connectionEvents.removeListener(listener);
}

/**
* Populate connection builder with necessary resources.
*
Expand All @@ -143,9 +208,7 @@ protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, Conn
RedisURI redisURI) {

Bootstrap redisBootstrap = new Bootstrap();
redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);
redisBootstrap.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);

ClientOptions clientOptions = getOptions();
SocketOptions socketOptions = clientOptions.getSocketOptions();
Expand All @@ -161,7 +224,7 @@ protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, Conn
connectionBuilder.apply(redisURI);

connectionBuilder.bootstrap(redisBootstrap);
connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents);
connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}

Expand Down Expand Up @@ -512,58 +575,6 @@ private CompletableFuture<Void> closeClientResources(long quietPeriod, long time
return Futures.allOf(groupCloseFutures);
}

protected int getResourceCount() {
return closeableResources.size();
}

protected int getChannelCount() {
return channels.size();
}

/**
* Add a listener for the RedisConnectionState. The listener is notified every time a connect/disconnect/IO exception
* happens. The listeners are not bound to a specific connection, so every time a connection event happens on any
* connection, the listener will be notified. The corresponding netty channel handler (async connection) is passed on the
* event.
*
* @param listener must not be {@literal null}
*/
public void addListener(RedisConnectionStateListener listener) {
LettuceAssert.notNull(listener, "RedisConnectionStateListener must not be null");
connectionEvents.addListener(listener);
}

/**
* Removes a listener.
*
* @param listener must not be {@literal null}
*/
public void removeListener(RedisConnectionStateListener listener) {

LettuceAssert.notNull(listener, "RedisConnectionStateListener must not be null");
connectionEvents.removeListener(listener);
}

/**
* Returns the {@link ClientOptions} which are valid for that client. Connections inherit the current options at the moment
* the connection is created. Changes to options will not affect existing connections.
*
* @return the {@link ClientOptions} for this client
*/
public ClientOptions getOptions() {
return clientOptions;
}

/**
* Set the {@link ClientOptions} for the client.
*
* @param clientOptions client options for the client and connections that are created after setting the options
*/
protected void setOptions(ClientOptions clientOptions) {
LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
this.clientOptions = clientOptions;
}

protected RedisHandshake createHandshake(ConnectionState state) {
return new RedisHandshake(clientOptions.getConfiguredProtocolVersion(), clientOptions.isPingBeforeActivateConnection(),
state);
Expand Down
10 changes: 2 additions & 8 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class ConnectionBuilder {
private Endpoint endpoint;
private Supplier<CommandHandler> commandHandlerSupplier;
private ChannelGroup channelGroup;
private Timer timer;
private Bootstrap bootstrap;
private ClientOptions clientOptions;
private Duration timeout;
Expand Down Expand Up @@ -105,10 +104,10 @@ protected ConnectionWatchdog createConnectionWatchdog() {
}

LettuceAssert.assertState(bootstrap != null, "Bootstrap must be set for autoReconnect=true");
LettuceAssert.assertState(timer != null, "Timer must be set for autoReconnect=true");
LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set for autoReconnect=true");

ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap, timer,
ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(),
clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener, connection,
clientResources.eventBus());

Expand Down Expand Up @@ -173,11 +172,6 @@ public ConnectionBuilder commandHandler(Supplier<CommandHandler> supplier) {
return this;
}

public ConnectionBuilder timer(Timer timer) {
this.timer = timer;
return this;
}

public ConnectionBuilder bootstrap(Bootstrap bootstrap) {
this.bootstrap = bootstrap;
return this;
Expand Down
Loading

0 comments on commit 87dc833

Please sign in to comment.