Skip to content

Commit

Permalink
Allow usage of publishOn scheduler for reactive signal emission #905
Browse files Browse the repository at this point in the history
Emission of data and completion signals using the reactive API can now use Lettuce's EventExecutorGroup instead of the I/O thread. Scheduler usage allows multiplexing of threads and early I/O thread release.

Using a single Redis connection in a reactive context can cause effectively single-threaded behavior as processing of downstream operators happens typically on the I/O thread. Compute-intensive processing consumes I/O thread capacity and results in performance decrease.
  • Loading branch information
mp911de committed Dec 10, 2018
1 parent 7f218c0 commit e0e83b0
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 15 deletions.
31 changes: 27 additions & 4 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import io.lettuce.core.tracing.TraceContext;
import io.lettuce.core.tracing.TraceContextProvider;
import io.lettuce.core.tracing.Tracing;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.ImmediateEventExecutor;

/**
* A reactive and thread-safe API for a Redis connection.
Expand All @@ -52,11 +54,13 @@ public abstract class AbstractRedisReactiveCommands<K, V> implements RedisHashRe
RedisServerReactiveCommands<K, V>, RedisHLLReactiveCommands<K, V>, BaseRedisReactiveCommands<K, V>,
RedisTransactionalReactiveCommands<K, V>, RedisGeoReactiveCommands<K, V>, RedisClusterReactiveCommands<K, V> {

private final Object mutex = new Object();
private final StatefulConnection<K, V> connection;
private final RedisCodec<K, V> codec;
private final RedisCommandBuilder<K, V> commandBuilder;
private final ClientResources clientResources;
private final boolean tracingEnabled;
private EventExecutorGroup scheduler;

/**
* Initialize a new instance.
Expand All @@ -72,6 +76,24 @@ public AbstractRedisReactiveCommands(StatefulConnection<K, V> connection, RedisC
this.tracingEnabled = clientResources.tracing().isEnabled();
}

private EventExecutorGroup getScheduler() {

if (this.scheduler != null) {
return this.scheduler;
}

synchronized (mutex) {

EventExecutorGroup scheduler = ImmediateEventExecutor.INSTANCE;

if (connection.getOptions().isPublishOnScheduler()) {
scheduler = connection.getResources().eventExecutorGroup();
}

return this.scheduler = scheduler;
}
}

@Override
public Mono<Long> append(K key, V value) {
return createMono(() -> commandBuilder.append(key, value));
Expand Down Expand Up @@ -380,10 +402,10 @@ private <T> Flux<T> createFlux(Supplier<RedisCommand<K, V, T>> commandSupplier,
if (tracingEnabled) {

return withTraceContext().flatMapMany(
it -> Flux.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, dissolve)));
it -> Flux.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, dissolve, getScheduler())));
}

return Flux.from(new RedisPublisher<>(commandSupplier, connection, dissolve));
return Flux.from(new RedisPublisher<>(commandSupplier, connection, dissolve, getScheduler()));
}

private Mono<TraceContext> withTraceContext() {
Expand All @@ -402,10 +424,11 @@ public <T> Mono<T> createMono(Supplier<RedisCommand<K, V, T>> commandSupplier) {
if (tracingEnabled) {

return withTraceContext().flatMap(
it -> Mono.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, false)));
it -> Mono.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, false, getScheduler()
.next())));
}

return Mono.from(new RedisPublisher<>(commandSupplier, connection, false));
return Mono.from(new RedisPublisher<>(commandSupplier, connection, false, getScheduler().next()));
}

private <T> Supplier<RedisCommand<K, V, T>> decorate(Supplier<RedisCommand<K, V, T>> commandSupplier,
Expand Down
42 changes: 42 additions & 0 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.Serializable;

import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.resource.ClientResources;

/**
* Client Options to control the behavior of {@link RedisClient}.
Expand All @@ -30,6 +31,7 @@ public class ClientOptions implements Serializable {
public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = false;
public static final boolean DEFAULT_AUTO_RECONNECT = true;
public static final boolean DEFAULT_CANCEL_CMD_RECONNECT_FAIL = false;
public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false;
public static final boolean DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL = false;
public static final int DEFAULT_REQUEST_QUEUE_SIZE = Integer.MAX_VALUE;
public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT;
Expand All @@ -40,6 +42,7 @@ public class ClientOptions implements Serializable {
private final boolean pingBeforeActivateConnection;
private final boolean autoReconnect;
private final boolean cancelCommandsOnReconnectFailure;
private final boolean publishOnScheduler;
private final boolean suspendReconnectOnProtocolFailure;
private final int requestQueueSize;
private final DisconnectedBehavior disconnectedBehavior;
Expand All @@ -51,6 +54,7 @@ public class ClientOptions implements Serializable {
protected ClientOptions(Builder builder) {
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.publishOnScheduler = builder.publishOnScheduler;
this.autoReconnect = builder.autoReconnect;
this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
this.requestQueueSize = builder.requestQueueSize;
Expand All @@ -65,6 +69,7 @@ protected ClientOptions(ClientOptions original) {
this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection();
this.autoReconnect = original.isAutoReconnect();
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.publishOnScheduler = original.isPublishOnScheduler();
this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure();
this.requestQueueSize = original.getRequestQueueSize();
this.disconnectedBehavior = original.getDisconnectedBehavior();
Expand Down Expand Up @@ -110,6 +115,7 @@ public static class Builder {
private boolean pingBeforeActivateConnection = DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION;
private boolean autoReconnect = DEFAULT_AUTO_RECONNECT;
private boolean cancelCommandsOnReconnectFailure = DEFAULT_CANCEL_CMD_RECONNECT_FAIL;
private boolean publishOnScheduler = DEFAULT_PUBLISH_ON_SCHEDULER;
private boolean suspendReconnectOnProtocolFailure = DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL;
private int requestQueueSize = DEFAULT_REQUEST_QUEUE_SIZE;
private DisconnectedBehavior disconnectedBehavior = DEFAULT_DISCONNECTED_BEHAVIOR;
Expand Down Expand Up @@ -168,6 +174,26 @@ public Builder cancelCommandsOnReconnectFailure(boolean cancelCommandsOnReconnec
return this;
}

/**
* Use a dedicated {@link reactor.core.scheduler.Scheduler} to emit reactive data signals. Enabling this option can be
* useful for reactive sequences that require a significant amount of processing with a single/a few Redis connections.
* <p/>
* A single Redis connection operates on a single thread. Operations that require a significant amount of processing can
* lead to a single-threaded-like behavior for all consumers of the Redis connection. When enabled, data signals will be
* emitted using a different thread served by {@link ClientResources#eventExecutorGroup()}. Defaults to {@literal false}
* , see {@link #DEFAULT_PUBLISH_ON_SCHEDULER}.
*
* @param publishOnScheduler true/false
* @return {@code this}
* @since 5.1.4
* @see org.reactivestreams.Subscriber#onNext(Object)
* @see ClientResources#eventExecutorGroup()
*/
public Builder publishOnScheduler(boolean publishOnScheduler) {
this.publishOnScheduler = publishOnScheduler;
return this;
}

/**
* Set the per-connection request queue size. The command invocation will lead to a {@link RedisException} if the queue
* size is exceeded. Setting the {@code requestQueueSize} to a lower value will lead earlier to exceptions during
Expand Down Expand Up @@ -295,6 +321,22 @@ public boolean isCancelCommandsOnReconnectFailure() {
return cancelCommandsOnReconnectFailure;
}

/**
* Use a dedicated {@link reactor.core.scheduler.Scheduler} to emit reactive data signals. Enabling this option can be
* useful for reactive sequences that require a significant amount of processing with a single/a few Redis connections.
* <p/>
* A single Redis connection operates on a single thread. Operations that require a significant amount of processing can
* lead to a single-threaded-like behavior for all consumers of the Redis connection. When enabled, data signals will be
* emitted using a different thread served by {@link ClientResources#eventExecutorGroup()}. Defaults to {@literal false} ,
* see {@link #DEFAULT_PUBLISH_ON_SCHEDULER}.
*
* @return {@literal true} to use a dedicated {@link reactor.core.scheduler.Scheduler}
* @since 5.1.4
*/
public boolean isPublishOnScheduler() {
return publishOnScheduler;
}

/**
* If this flag is {@literal true} the reconnect will be suspended on protocol errors. Protocol errors are errors while SSL
* negotiation or when PING before connect fails.
Expand Down
Loading

0 comments on commit e0e83b0

Please sign in to comment.