Skip to content

Commit

Permalink
Refactor connection activation to ConnectionInitializer #697
Browse files Browse the repository at this point in the history
Connections are now initialized with a ConnectionInitializer that performs the Handshake and issues initialization commands after the handshake. The ConnectionState keeps track of the desired state and applies it on connect and reconnect. This change removes the need to track the state of each connection flag in each connection implementation and moves all flags into a single state object.

The initialization/handshake future is now also provided by RedisHandshakeHandler instead of living in various initializer types.

The handshake for SSL and plaintext connections became simpler and easier to find.

Centralizing the handshake is a pre-requisite for protocol version negotiation.
  • Loading branch information
mp911de committed Dec 28, 2019
1 parent 0c2c05c commit abbfdf6
Show file tree
Hide file tree
Showing 34 changed files with 740 additions and 881 deletions.
23 changes: 11 additions & 12 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.lettuce.core;

import static java.util.concurrent.CompletableFuture.completedFuture;

import java.io.Closeable;
import java.net.SocketAddress;
import java.time.Duration;
Expand All @@ -33,6 +31,7 @@
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.ConnectionWatchdog;
import io.lettuce.core.protocol.RedisHandshakeHandler;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -166,10 +165,6 @@ protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, Conn
connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}

private boolean hasPassword(RedisURI connectionSettings) {
return connectionSettings.getPassword() != null && connectionSettings.getPassword().length != 0;
}

protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint connectionPoint) {

LettuceAssert.notNull(connectionPoint, "ConnectionPoint must not be null");
Expand Down Expand Up @@ -309,18 +304,16 @@ private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, Comple

Bootstrap redisBootstrap = connectionBuilder.bootstrap();

RedisChannelInitializer initializer = connectionBuilder.build();
ChannelInitializer<Channel> initializer = connectionBuilder.build();
redisBootstrap.handler(initializer);

clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

channelReadyFuture.whenComplete((c, t) -> {

if (t instanceof CancellationException) {
connectFuture.cancel(true);
initFuture.cancel(true);
}
});

Expand All @@ -334,7 +327,14 @@ private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, Comple
return;
}

initFuture.whenComplete((success, throwable) -> {
RedisHandshakeHandler handshakeHandler = connectFuture.channel().pipeline().get(RedisHandshakeHandler.class);

if (handshakeHandler == null) {
channelReadyFuture.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered"));
return;
}

handshakeHandler.channelInitialized().whenComplete((success, throwable) -> {

if (throwable == null) {

Expand Down Expand Up @@ -432,7 +432,6 @@ public CompletableFuture<Void> shutdownAsync() {
* @param timeUnit the unit of {@code quietPeriod} and {@code timeout}
* @since 4.4
*/
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) {

if (shutdown.compareAndSet(false, true)) {
Expand All @@ -441,7 +440,7 @@ public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, Tim
return closeResources().thenCompose((value) -> closeClientResources(quietPeriod, timeout, timeUnit));
}

return completedFuture(null);
return CompletableFuture.completedFuture(null);
}

private CompletableFuture<Void> closeResources() {
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/io/lettuce/core/ChannelGroupListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
*/
package io.lettuce.core;

import static io.lettuce.core.ConnectionEventTrigger.local;
import static io.lettuce.core.ConnectionEventTrigger.remote;

import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ConnectedEvent;
import io.lettuce.core.event.connection.DisconnectedEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
Expand All @@ -25,23 +31,28 @@
* ChannelGroup.
*
* @author Will Glozer
* @author Mark Paluch
*/
class ChannelGroupListener extends ChannelInboundHandlerAdapter {

private ChannelGroup channels;
private final ChannelGroup channels;
private final EventBus eventBus;

public ChannelGroupListener(ChannelGroup channels) {
public ChannelGroupListener(ChannelGroup channels, EventBus eventBus) {
this.channels = channels;
this.eventBus = eventBus;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
eventBus.publish(new ConnectedEvent(local(ctx), remote(ctx)));
channels.add(ctx.channel());
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
eventBus.publish(new DisconnectedEvent(local(ctx), remote(ctx)));
channels.remove(ctx.channel());
super.channelInactive(ctx);
}
Expand Down
134 changes: 32 additions & 102 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@
package io.lettuce.core;

import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.*;
import io.lettuce.core.resource.ClientResources;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timer;

Expand All @@ -41,13 +39,6 @@
*/
public class ConnectionBuilder {

private static final RedisCommandBuilder<String, String> INITIALIZING_CMD_BUILDER = new RedisCommandBuilder<>(
StringCodec.UTF8);

private static final Supplier<AsyncCommand<?, ?, ?>> PING = () -> new AsyncCommand<>(INITIALIZING_CMD_BUILDER.ping());

private static final Supplier<AsyncCommand<?, ?, ?>> NO_PING = () -> null;

private Mono<SocketAddress> socketAddressSupplier;
private ConnectionEvents connectionEvents;
private RedisChannelHandler<?, ?> connection;
Expand All @@ -59,57 +50,21 @@ public class ConnectionBuilder {
private ClientOptions clientOptions;
private Duration timeout;
private ClientResources clientResources;
private String username;
private char[] password;
private String clientName;
private ConnectionInitializer connectionInitializer;
private ReconnectionListener reconnectionListener = ReconnectionListener.NO_OP;
private ConnectionWatchdog connectionWatchdog;
private Supplier<AsyncCommand<?, ?, ?>> handshakeCommandSupplier = ConnectionBuilder.PING;

public static ConnectionBuilder connectionBuilder() {
return new ConnectionBuilder();
}

/**
* @param handshakeCommandSupplier
* @return {@literal true} whether {@code PING}/{@code HELLO} handshake is enabled.
*/
static boolean isHandshakeEnabled(Supplier<AsyncCommand<?, ?, ?>> handshakeCommandSupplier) {
return handshakeCommandSupplier != NO_PING;
}

/**
* Apply settings from {@link RedisURI}
*
* @param redisURI
*/
public void apply(RedisURI redisURI) {

timeout(redisURI.getTimeout());
clientName(redisURI.getClientName());

if (clientOptions.getProtocolVersion() == ProtocolVersion.RESP2) {

pingBeforeConnect(clientOptions.isPingBeforeActivateConnection());

if (clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {
auth(redisURI.getPassword());
handshakeAuthResp2();
}
} else if (clientOptions.getProtocolVersion() == ProtocolVersion.RESP3) {

if (hasPassword(redisURI)) {
if (LettuceStrings.isNotEmpty(redisURI.getUsername())) {
auth(redisURI.getUsername(), redisURI.getPassword());
} else {
auth("default", redisURI.getPassword());
}
handshakeAuthResp3();

} else {
handshakeResp3();
}
}
}

protected List<ChannelHandler> buildHandlers() {
Expand All @@ -119,13 +74,15 @@ protected List<ChannelHandler> buildHandlers() {
LettuceAssert.assertState(connection != null, "Connection must be set");
LettuceAssert.assertState(clientResources != null, "ClientResources must be set");
LettuceAssert.assertState(endpoint != null, "Endpoint must be set");
LettuceAssert.assertState(connectionInitializer != null, "ConnectionInitializer must be set");

List<ChannelHandler> handlers = new ArrayList<>();

connection.setOptions(clientOptions);

handlers.add(new ChannelGroupListener(channelGroup));
handlers.add(new ChannelGroupListener(channelGroup, clientResources.eventBus()));
handlers.add(new CommandEncoder());
handlers.add(getHandshakeHandler());
handlers.add(commandHandlerSupplier.get());

handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus()));
Expand All @@ -137,22 +94,8 @@ protected List<ChannelHandler> buildHandlers() {
return handlers;
}

void pingBeforeConnect(boolean state) {
handshakeCommandSupplier = state ? PING : NO_PING;
}

void handshakeAuthResp2() {
handshakeCommandSupplier = () -> new AsyncCommand<>(INITIALIZING_CMD_BUILDER.auth(new String(password)));
}

void handshakeAuthResp3() {
handshakeCommandSupplier = () -> new AsyncCommand<>(INITIALIZING_CMD_BUILDER.hello(3, this.username.getBytes(),
encode(this.password), this.clientName != null ? this.clientName.getBytes() : null));
}

void handshakeResp3() {
handshakeCommandSupplier = () -> new AsyncCommand<>(
INITIALIZING_CMD_BUILDER.hello(3, null, null, this.clientName != null ? this.clientName.getBytes() : null));
protected ChannelHandler getHandshakeHandler() {
return new RedisHandshakeHandler(connectionInitializer, clientResources, timeout);
}

protected ConnectionWatchdog createConnectionWatchdog() {
Expand All @@ -175,8 +118,8 @@ protected ConnectionWatchdog createConnectionWatchdog() {
return watchdog;
}

public RedisChannelInitializer build() {
return new PlainChannelInitializer(handshakeCommandSupplier, this::buildHandlers, clientResources, timeout);
public ChannelInitializer<Channel> build() {
return new PlainChannelInitializer(this::buildHandlers, clientResources);
}

public ConnectionBuilder socketAddressSupplier(Mono<SocketAddress> socketAddressSupplier) {
Expand Down Expand Up @@ -250,29 +193,8 @@ public ConnectionBuilder clientResources(ClientResources clientResources) {
return this;
}

public ConnectionBuilder clientName(String clientName) {
this.clientName = clientName;
return this;
}

boolean hasPassword(RedisURI connectionSettings) {
return connectionSettings.getPassword() != null && connectionSettings.getPassword().length != 0;
}

public ConnectionBuilder auth(String username, char[] password) {
this.username = username;
this.password = password;
return this;
}

public ConnectionBuilder auth(char[] password) {
this.password = password;
return this;
}

@Deprecated
public ConnectionBuilder password(char[] password) {
this.password = password;
public ConnectionBuilder connectionInitializer(ConnectionInitializer connectionInitializer) {
this.connectionInitializer = connectionInitializer;
return this;
}

Expand All @@ -292,24 +214,32 @@ public ClientResources clientResources() {
return clientResources;
}

public char[] password() {
return password;
}

public Endpoint endpoint() {
return endpoint;
}

Supplier<AsyncCommand<?, ?, ?>> getHandshakeCommandSupplier() {
return handshakeCommandSupplier;
}
static class PlainChannelInitializer extends ChannelInitializer<Channel> {

private final Supplier<List<ChannelHandler>> handlers;
private final ClientResources clientResources;

PlainChannelInitializer(Supplier<List<ChannelHandler>> handlers, ClientResources clientResources) {
this.handlers = handlers;
this.clientResources = clientResources;
}

static byte[] encode(char[] chars) {
@Override
protected void initChannel(Channel channel) {
doInitialize(channel);
}

ByteBuffer encoded = Charset.defaultCharset().encode(CharBuffer.wrap(chars));
byte[] bytes = new byte[encoded.remaining()];
encoded.get(bytes);
private void doInitialize(Channel channel) {

return bytes;
for (ChannelHandler handler : handlers.get()) {
channel.pipeline().addLast(handler);
}

clientResources.nettyCustomizer().afterChannelInitialized(channel);
}
}
}
2 changes: 2 additions & 0 deletions src/main/java/io/lettuce/core/ConnectionEventTrigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.net.SocketAddress;

import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ConnectionActivatedEvent;
import io.lettuce.core.event.connection.ConnectionDeactivatedEvent;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -43,6 +44,7 @@ class ConnectionEventTrigger extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
connectionEvents.fireEventRedisConnected(connection, ctx.channel().remoteAddress());
eventBus.publish(new ConnectionActivatedEvent(local(ctx), remote(ctx)));
super.channelActive(ctx);
}

Expand Down
Loading

0 comments on commit abbfdf6

Please sign in to comment.