From 7be371404060c32780afe366ab9d1a86e89be3b4 Mon Sep 17 00:00:00 2001 From: Jim Brunner Date: Thu, 13 Jul 2023 21:33:24 +0000 Subject: [PATCH] Support for module-based read-only commands #2401 --- .../java/io/lettuce/core/ClientOptions.java | 49 +++++++++++++++++++ .../ClusterDistributionChannelWriter.java | 11 +++-- .../masterreplica/AutodiscoveryConnector.java | 4 +- .../MasterReplicaChannelWriter.java | 15 ++++-- .../core/masterreplica/SentinelConnector.java | 3 +- .../StaticMasterReplicaConnector.java | 4 +- ...terDistributionChannelWriterUnitTests.java | 27 +++++++--- .../MasterReplicaChannelWriterUnitTests.java | 37 +++++++++----- 8 files changed, 119 insertions(+), 31 deletions(-) diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index cce6c9ca93..4c60726240 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -18,11 +18,15 @@ import java.io.Serializable; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import io.lettuce.core.api.StatefulConnection; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.protocol.DecodeBufferPolicies; import io.lettuce.core.protocol.DecodeBufferPolicy; +import io.lettuce.core.protocol.ProtocolKeyword; import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.resource.ClientResources; @@ -31,6 +35,7 @@ * * @author Mark Paluch * @author Gavin Cook + * @author Jim Brunner */ @SuppressWarnings("serial") public class ClientOptions implements Serializable { @@ -43,6 +48,8 @@ public class ClientOptions implements Serializable { public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT; + public static final Set DEFAULT_EXTRA_READONLY_COMMANDS = Collections.emptySet(); + public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false; public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = true; @@ -69,6 +76,8 @@ public class ClientOptions implements Serializable { private final DisconnectedBehavior disconnectedBehavior; + private final Set extraReadOnlyCommands; + private final boolean publishOnScheduler; private final boolean pingBeforeActivateConnection; @@ -87,11 +96,13 @@ public class ClientOptions implements Serializable { private final TimeoutOptions timeoutOptions; + protected ClientOptions(Builder builder) { this.autoReconnect = builder.autoReconnect; this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure; this.decodeBufferPolicy = builder.decodeBufferPolicy; this.disconnectedBehavior = builder.disconnectedBehavior; + this.extraReadOnlyCommands = builder.extraReadOnlyCommands; this.publishOnScheduler = builder.publishOnScheduler; this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection; this.protocolVersion = builder.protocolVersion; @@ -108,6 +119,7 @@ protected ClientOptions(ClientOptions original) { this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure(); this.decodeBufferPolicy = original.getDecodeBufferPolicy(); this.disconnectedBehavior = original.getDisconnectedBehavior(); + this.extraReadOnlyCommands = original.getExtraReadOnlyCommands(); this.publishOnScheduler = original.isPublishOnScheduler(); this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection(); this.protocolVersion = original.getConfiguredProtocolVersion(); @@ -160,6 +172,8 @@ public static class Builder { private DisconnectedBehavior disconnectedBehavior = DEFAULT_DISCONNECTED_BEHAVIOR; + private Set extraReadOnlyCommands = DEFAULT_EXTRA_READONLY_COMMANDS; + private boolean pingBeforeActivateConnection = DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION; private ProtocolVersion protocolVersion; @@ -257,6 +271,20 @@ public Builder disconnectedBehavior(DisconnectedBehavior disconnectedBehavior) { return this; } + /** + * Identifies extra commands (module commands) as read-only. Defaults to {@code emptySet}. See + * {@link #DEFAULT_EXTRA_READONLY_COMMANDS}. + * + * @param extraReadOnlyCommands must not be {@code null}. + * @return {@code this} + */ + public Builder extraReadOnlyCommands(Set extraReadOnlyCommands) { + + LettuceAssert.notNull(extraReadOnlyCommands, "extraReadOnlyCommands must not be null"); + this.extraReadOnlyCommands = Collections.unmodifiableSet(new HashSet<>(extraReadOnlyCommands)); + return this; + } + /** * Perform a lightweight {@literal PING} connection handshake when establishing a Redis connection. If {@code true} * (default is {@code true}, {@link #DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION}), every connection and reconnect will @@ -418,6 +446,7 @@ public ClientOptions.Builder mutate() { builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure()) .decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior()) + .extraReadOnlyCommands(getExtraReadOnlyCommands()) .publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection()) .protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize()) .scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions()) @@ -486,6 +515,26 @@ public DisconnectedBehavior getDisconnectedBehavior() { return disconnectedBehavior; } + /** + * Extra commands (module commands) which are identified as read-only. Defaults to {@code emptySet}. See + * {@link #DEFAULT_EXTRA_READONLY_COMMANDS}. + * + * @return the set of extra read-only commands + */ + public Set getExtraReadOnlyCommands() { + return extraReadOnlyCommands; + } + + /** + * Check if a command is identified as an extra read-only command. + * + * @param command + * @return {@code true} if the command is an extra read-only command. + */ + public boolean isExtraReadOnlyCommand(ProtocolKeyword command) { + return extraReadOnlyCommands.contains(command); + } + /** * Request queue size for a connection. This value applies per connection. The command invocation will throw a * {@link RedisException} if the queue size is exceeded and a new command is requested. Defaults to diff --git a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java index 22142db25c..8371ed117c 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java @@ -61,12 +61,15 @@ * Channel writer for cluster operation. This writer looks up the right partition by hash/slot for the operation. * * @author Mark Paluch + * @author Jim Brunner * @since 3.0 */ class ClusterDistributionChannelWriter implements RedisChannelWriter { private final RedisChannelWriter defaultWriter; + private final ClientOptions clientOptions; + private final ClusterEventListener clusterEventListener; private final int executionLimit; @@ -89,6 +92,7 @@ class ClusterDistributionChannelWriter implements RedisChannelWriter { } this.defaultWriter = defaultWriter; + this.clientOptions = clientOptions; this.clusterEventListener = clusterEventListener; } @@ -336,7 +340,7 @@ private static RedisChannelWriter getWriterToUse(RedisChannelWriter writer) { * @param commands {@link Collection} of {@link RedisCommand commands}. * @return the connectionIntent. */ - static ConnectionIntent getIntent(Collection> commands) { + ConnectionIntent getIntent(Collection> commands) { boolean w = false; boolean r = false; @@ -365,8 +369,9 @@ static ConnectionIntent getIntent(Collection> co return singleConnectionIntent; } - private static ConnectionIntent getIntent(ProtocolKeyword type) { - return ReadOnlyCommands.isReadOnlyCommand(type) ? ConnectionIntent.READ : ConnectionIntent.WRITE; + private ConnectionIntent getIntent(ProtocolKeyword type) { + return (ReadOnlyCommands.isReadOnlyCommand(type) || clientOptions.isExtraReadOnlyCommand(type)) + ? ConnectionIntent.READ : ConnectionIntent.WRITE; } static HostAndPort getMoveTarget(Partitions partitions, String errorMessage) { diff --git a/src/main/java/io/lettuce/core/masterreplica/AutodiscoveryConnector.java b/src/main/java/io/lettuce/core/masterreplica/AutodiscoveryConnector.java index a2340159c5..927924018e 100644 --- a/src/main/java/io/lettuce/core/masterreplica/AutodiscoveryConnector.java +++ b/src/main/java/io/lettuce/core/masterreplica/AutodiscoveryConnector.java @@ -39,6 +39,7 @@ * a single {@link RedisURI}. * * @author Mark Paluch + * @author Jim Brunner * @since 5.1 */ class AutodiscoveryConnector implements MasterReplicaConnector { @@ -125,8 +126,7 @@ private Mono> initializeConnection(Re connectionProvider.setKnownNodes(nodes); - MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider, - redisClient.getResources()); + MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider, redisClient.getResources(), redisClient.getOptions()); StatefulRedisMasterReplicaConnectionImpl connection = new StatefulRedisMasterReplicaConnectionImpl<>( channelWriter, codec, redisURI.getTimeout()); diff --git a/src/main/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriter.java b/src/main/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriter.java index ec34f96150..d902b59de0 100644 --- a/src/main/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriter.java +++ b/src/main/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriter.java @@ -18,8 +18,10 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; +import io.lettuce.core.ClientOptions; import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisChannelWriter; +import io.lettuce.core.RedisClient; import io.lettuce.core.RedisException; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.internal.LettuceAssert; @@ -33,6 +35,7 @@ * Channel writer/dispatcher that dispatches commands based on the ConnectionIntent to different connections. * * @author Mark Paluch + * @author Jim Brunner */ class MasterReplicaChannelWriter implements RedisChannelWriter { @@ -40,14 +43,17 @@ class MasterReplicaChannelWriter implements RedisChannelWriter { private final ClientResources clientResources; + private final ClientOptions clientOptions; + private boolean closed = false; private boolean inTransaction; MasterReplicaChannelWriter(MasterReplicaConnectionProvider masterReplicaConnectionProvider, - ClientResources clientResources) { + ClientResources clientResources, ClientOptions clientOptions) { this.masterReplicaConnectionProvider = masterReplicaConnectionProvider; this.clientResources = clientResources; + this.clientOptions = clientOptions; } @Override @@ -162,7 +168,7 @@ private static void writeCommands(Collection> commands) { + ConnectionIntent getIntent(Collection> commands) { boolean w = false; boolean r = false; @@ -187,8 +193,9 @@ static ConnectionIntent getIntent(Collection> co return singleIntent; } - private static ConnectionIntent getIntent(ProtocolKeyword type) { - return ReadOnlyCommands.isReadOnlyCommand(type) ? ConnectionIntent.READ : ConnectionIntent.WRITE; + private ConnectionIntent getIntent(ProtocolKeyword type) { + return (ReadOnlyCommands.isReadOnlyCommand(type) || clientOptions.isExtraReadOnlyCommand(type)) + ? ConnectionIntent.READ : ConnectionIntent.WRITE; } @Override diff --git a/src/main/java/io/lettuce/core/masterreplica/SentinelConnector.java b/src/main/java/io/lettuce/core/masterreplica/SentinelConnector.java index 6d22f42673..178cbb915f 100644 --- a/src/main/java/io/lettuce/core/masterreplica/SentinelConnector.java +++ b/src/main/java/io/lettuce/core/masterreplica/SentinelConnector.java @@ -35,6 +35,7 @@ * {@link MasterReplicaConnector} to connect a Sentinel-managed Master/Replica setup using a Sentinel {@link RedisURI}. * * @author Mark Paluch + * @author Jim Brunner * @since 5.1 */ class SentinelConnector implements MasterReplicaConnector { @@ -83,7 +84,7 @@ private Mono> initializeConnection(Re connectionProvider.setKnownNodes(nodes); MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider, - redisClient.getResources()) { + redisClient.getResources(), redisClient.getOptions()) { @Override public CompletableFuture closeAsync() { diff --git a/src/main/java/io/lettuce/core/masterreplica/StaticMasterReplicaConnector.java b/src/main/java/io/lettuce/core/masterreplica/StaticMasterReplicaConnector.java index b1ba3cc1d7..986341c4c4 100644 --- a/src/main/java/io/lettuce/core/masterreplica/StaticMasterReplicaConnector.java +++ b/src/main/java/io/lettuce/core/masterreplica/StaticMasterReplicaConnector.java @@ -35,6 +35,7 @@ * {@link RedisURI}. This connector determines roles and remains using only the provided endpoints. * * @author Mark Paluch + * @author Jim Brunner * @since 5.1 */ class StaticMasterReplicaConnector implements MasterReplicaConnector { @@ -81,8 +82,7 @@ private Mono> initializeConnection(Re connectionProvider.setKnownNodes(nodes); - MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider, - redisClient.getResources()); + MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider, redisClient.getResources(), redisClient.getOptions()); StatefulRedisMasterReplicaConnectionImpl connection = new StatefulRedisMasterReplicaConnectionImpl<>( channelWriter, codec, seedNode.getTimeout()); diff --git a/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java index eeb670924e..6fc4ceaab2 100644 --- a/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java @@ -38,6 +38,7 @@ import io.lettuce.core.ClientOptions; import io.lettuce.core.CommandListenerWriter; +import io.lettuce.core.RedisChannelWriter; import io.lettuce.core.StatefulRedisConnectionImpl; import io.lettuce.core.TimeoutOptions; import io.lettuce.core.api.StatefulRedisConnection; @@ -60,6 +61,7 @@ * * @author Mark Paluch * @author koisyu + * @author Jim Brunner */ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -74,6 +76,9 @@ class ClusterDistributionChannelWriterUnitTests { @Mock private ClientResources clientResources; + @Mock + private ClientOptions clientOptions; + @Mock private ClusterEventListener clusterEventListener; @@ -159,40 +164,48 @@ void shouldParseIPv6MovedTargetCorrectly() { @Test void shouldReturnIntentForWriteCommand() { + ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener); + RedisCommand set = new Command<>(CommandType.SET, null); RedisCommand mset = new Command<>(CommandType.MSET, null); - assertThat(ClusterDistributionChannelWriter.getIntent(Arrays.asList(set, mset))).isEqualTo(ConnectionIntent.WRITE); + assertThat(writer.getIntent(Arrays.asList(set, mset))).isEqualTo(ConnectionIntent.WRITE); - assertThat(ClusterDistributionChannelWriter.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE); + assertThat(writer.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE); } @Test void shouldReturnDefaultIntentForNoCommands() { - assertThat(ClusterDistributionChannelWriter.getIntent(Collections.emptyList())).isEqualTo(ConnectionIntent.WRITE); + ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener); + + assertThat(writer.getIntent(Collections.emptyList())).isEqualTo(ConnectionIntent.WRITE); } @Test void shouldReturnIntentForReadCommand() { + ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener); + RedisCommand get = new Command<>(CommandType.GET, null); RedisCommand mget = new Command<>(CommandType.MGET, null); - assertThat(ClusterDistributionChannelWriter.getIntent(Arrays.asList(get, mget))).isEqualTo(ConnectionIntent.READ); + assertThat(writer.getIntent(Arrays.asList(get, mget))).isEqualTo(ConnectionIntent.READ); - assertThat(ClusterDistributionChannelWriter.getIntent(Collections.singletonList(get))).isEqualTo(ConnectionIntent.READ); + assertThat(writer.getIntent(Collections.singletonList(get))).isEqualTo(ConnectionIntent.READ); } @Test void shouldReturnIntentForMixedCommands() { + ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener); + RedisCommand set = new Command<>(CommandType.SET, null); RedisCommand mget = new Command<>(CommandType.MGET, null); - assertThat(ClusterDistributionChannelWriter.getIntent(Arrays.asList(set, mget))).isEqualTo(ConnectionIntent.WRITE); + assertThat(writer.getIntent(Arrays.asList(set, mget))).isEqualTo(ConnectionIntent.WRITE); - assertThat(ClusterDistributionChannelWriter.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE); + assertThat(writer.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE); } @Test diff --git a/src/test/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriterUnitTests.java b/src/test/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriterUnitTests.java index b33d038079..ea34ac684a 100644 --- a/src/test/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriterUnitTests.java +++ b/src/test/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriterUnitTests.java @@ -31,6 +31,7 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import io.lettuce.core.ClientOptions; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.output.StatusOutput; @@ -42,6 +43,7 @@ /** * @author Mark Paluch + * @author Jim Brunner */ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -53,59 +55,70 @@ class MasterReplicaChannelWriterUnitTests { @Mock private ClientResources clientResources; + @Mock + private ClientOptions clientOptions; + @Mock private StatefulRedisConnection connection; @Test void shouldReturnIntentForWriteCommand() { + MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources, clientOptions); + RedisCommand set = new Command<>(CommandType.SET, null); RedisCommand mset = new Command<>(CommandType.MSET, null); - assertThat(MasterReplicaChannelWriter.getIntent(Arrays.asList(set, mset))) + assertThat(writer.getIntent(Arrays.asList(set, mset))) .isEqualTo(ConnectionIntent.WRITE); - assertThat(MasterReplicaChannelWriter.getIntent(Collections.singletonList(set))) + assertThat(writer.getIntent(Collections.singletonList(set))) .isEqualTo(ConnectionIntent.WRITE); } @Test void shouldReturnDefaultIntentForNoCommands() { - assertThat(MasterReplicaChannelWriter.getIntent(Collections.emptyList())) + MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources, clientOptions); + + assertThat(writer.getIntent(Collections.emptyList())) .isEqualTo(ConnectionIntent.WRITE); } @Test void shouldReturnIntentForReadCommand() { + MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources, clientOptions); + RedisCommand get = new Command<>(CommandType.GET, null); RedisCommand mget = new Command<>(CommandType.MGET, null); - assertThat(MasterReplicaChannelWriter.getIntent(Arrays.asList(get, mget))) + assertThat(writer.getIntent(Arrays.asList(get, mget))) .isEqualTo(ConnectionIntent.READ); - assertThat(MasterReplicaChannelWriter.getIntent(Collections.singletonList(get))) + assertThat(writer.getIntent(Collections.singletonList(get))) .isEqualTo(ConnectionIntent.READ); } @Test void shouldReturnIntentForMixedCommands() { + MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources, clientOptions); + RedisCommand set = new Command<>(CommandType.SET, null); RedisCommand mget = new Command<>(CommandType.MGET, null); - assertThat(MasterReplicaChannelWriter.getIntent(Arrays.asList(set, mget))) + assertThat(writer.getIntent(Arrays.asList(set, mget))) .isEqualTo(ConnectionIntent.WRITE); - assertThat(MasterReplicaChannelWriter.getIntent(Collections.singletonList(set))) + assertThat(writer.getIntent(Collections.singletonList(set))) .isEqualTo(ConnectionIntent.WRITE); } @Test void shouldBindTransactionsToMaster() { - MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources); + MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources, clientOptions); when(connectionProvider.getConnectionAsync(any(ConnectionIntent.class))) .thenReturn(CompletableFuture.completedFuture(connection)); @@ -120,7 +133,7 @@ void shouldBindTransactionsToMaster() { @Test void shouldBindTransactionsToMasterInBatch() { - MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources); + MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources, clientOptions); when(connectionProvider.getConnectionAsync(any(ConnectionIntent.class))) .thenReturn(CompletableFuture.completedFuture(connection)); @@ -136,7 +149,7 @@ void shouldBindTransactionsToMasterInBatch() { @Test void shouldDeriveIntentFromCommandTypeAfterTransaction() { - MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources); + MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources, clientOptions); when(connectionProvider.getConnectionAsync(any(ConnectionIntent.class))) .thenReturn(CompletableFuture.completedFuture(connection)); @@ -152,7 +165,7 @@ void shouldDeriveIntentFromCommandTypeAfterTransaction() { @Test void shouldDeriveIntentFromCommandTypeAfterDiscardedTransaction() { - MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources); + MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources, clientOptions); when(connectionProvider.getConnectionAsync(any(ConnectionIntent.class))) .thenReturn(CompletableFuture.completedFuture(connection)); @@ -168,7 +181,7 @@ void shouldDeriveIntentFromCommandTypeAfterDiscardedTransaction() { @Test void shouldDeriveIntentFromCommandBatchTypeAfterDiscardedTransaction() { - MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources); + MasterReplicaChannelWriter writer = new MasterReplicaChannelWriter(connectionProvider, clientResources, clientOptions); when(connectionProvider.getConnectionAsync(any(ConnectionIntent.class))) .thenReturn(CompletableFuture.completedFuture(connection));