From 78f40fb30180e7812d452337acb3e44070d65feb Mon Sep 17 00:00:00 2001 From: sokomishalov Date: Mon, 21 Sep 2020 02:49:26 +0300 Subject: [PATCH] Implement command listeners API #1382 Original pull request: #1424. --- .../java/io/lettuce/core/CommandListener.java | 52 +++++ .../core/CommandListenerMulticaster.java | 61 ++++++ .../lettuce/core/CommandListenerWriter.java | 202 ++++++++++++++++++ .../java/io/lettuce/core/RedisClient.java | 9 + .../core/cluster/RedisClusterClient.java | 12 ++ .../core/models/events/CommandBaseEvent.java | 51 +++++ .../models/events/CommandFailedEvent.java | 43 ++++ .../models/events/CommandStartedEvent.java | 44 ++++ .../models/events/CommandSucceededEvent.java | 61 ++++++ .../core/resource/ClientResources.java | 18 ++ .../core/resource/DefaultClientResources.java | 30 ++- .../io/lettuce/core/CommandListenerTest.java | 99 +++++++++ 12 files changed, 681 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/lettuce/core/CommandListener.java create mode 100644 src/main/java/io/lettuce/core/CommandListenerMulticaster.java create mode 100644 src/main/java/io/lettuce/core/CommandListenerWriter.java create mode 100644 src/main/java/io/lettuce/core/models/events/CommandBaseEvent.java create mode 100644 src/main/java/io/lettuce/core/models/events/CommandFailedEvent.java create mode 100644 src/main/java/io/lettuce/core/models/events/CommandStartedEvent.java create mode 100644 src/main/java/io/lettuce/core/models/events/CommandSucceededEvent.java create mode 100644 src/test/java/io/lettuce/core/CommandListenerTest.java diff --git a/src/main/java/io/lettuce/core/CommandListener.java b/src/main/java/io/lettuce/core/CommandListener.java new file mode 100644 index 0000000000..fbf54263fb --- /dev/null +++ b/src/main/java/io/lettuce/core/CommandListener.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core; + +import io.lettuce.core.models.events.CommandFailedEvent; +import io.lettuce.core.models.events.CommandStartedEvent; +import io.lettuce.core.models.events.CommandSucceededEvent; + +/** + * @author Mikhael Sokolov + */ +public interface CommandListener { + + /** + * Listener for command started events. + * + * @param event the event + */ + default void commandStarted(CommandStartedEvent event) { + } + + /** + * Listener for command completed events + * + * @param event the event + */ + default void commandSucceeded(CommandSucceededEvent event) { + } + + /** + * Listener for command failure events + * + * @param event the event + */ + default void commandFailed(CommandFailedEvent event) { + } + +} diff --git a/src/main/java/io/lettuce/core/CommandListenerMulticaster.java b/src/main/java/io/lettuce/core/CommandListenerMulticaster.java new file mode 100644 index 0000000000..3ddc2d955b --- /dev/null +++ b/src/main/java/io/lettuce/core/CommandListenerMulticaster.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core; + +import io.lettuce.core.models.events.CommandFailedEvent; +import io.lettuce.core.models.events.CommandStartedEvent; +import io.lettuce.core.models.events.CommandSucceededEvent; + +import java.util.List; + +/** + * Wraps multiple command listeners into one multicaster + * + * @author Mikhael Sokolov + */ +public class CommandListenerMulticaster implements CommandListener { + private final List listeners; + + public CommandListenerMulticaster(List listeners) { + this.listeners = listeners; + } + + @Override + public void commandStarted(CommandStartedEvent event) { + for (CommandListener listener : listeners) { + listener.commandStarted(event); + } + } + + @Override + public void commandSucceeded(CommandSucceededEvent event) { + for (CommandListener listener : listeners) { + listener.commandSucceeded(event); + } + } + + @Override + public void commandFailed(CommandFailedEvent event) { + for (CommandListener listener : listeners) { + listener.commandFailed(event); + } + } + + public List getListeners() { + return listeners; + } +} diff --git a/src/main/java/io/lettuce/core/CommandListenerWriter.java b/src/main/java/io/lettuce/core/CommandListenerWriter.java new file mode 100644 index 0000000000..9e281db343 --- /dev/null +++ b/src/main/java/io/lettuce/core/CommandListenerWriter.java @@ -0,0 +1,202 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core; + +import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.models.events.CommandFailedEvent; +import io.lettuce.core.models.events.CommandStartedEvent; +import io.lettuce.core.models.events.CommandSucceededEvent; +import io.lettuce.core.output.CommandOutput; +import io.lettuce.core.protocol.*; +import io.lettuce.core.resource.ClientResources; +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Writer for command listeners. + * + * @author Mikhael Sokolov + */ +public class CommandListenerWriter implements RedisChannelWriter { + + private final CommandListener listener; + private final RedisChannelWriter delegate; + + public CommandListenerWriter(RedisChannelWriter delegate, CommandListener listener) { + this.delegate = delegate; + this.listener = listener; + } + + /** + * Check whether {@link ClientResources} is configured to listen commands. + * + * @param clientResources must not be {@code null}. + * @return {@code true} if {@link ClientResources} are configured to listen commands. + */ + public static boolean isSupported(ClientResources clientResources) { + LettuceAssert.notNull(clientResources, "ClientResources must not be null"); + + return !clientResources.commandListeners().isEmpty(); + } + + + @Override + public RedisCommand write(RedisCommand command) { + long now = System.currentTimeMillis(); + CommandStartedEvent startedEvent = new CommandStartedEvent<>(command, now); + listener.commandStarted(startedEvent); + + return delegate.write(new RedisCommandListenerCommand<>(command, startedEvent.getContext(), now, listener)); + } + + @Override + public Collection> write(Collection> redisCommands) { + List> listenedCommands = new ArrayList<>(); + for (RedisCommand redisCommand : redisCommands) { + long now = System.currentTimeMillis(); + CommandStartedEvent startedEvent = new CommandStartedEvent<>(redisCommand, now); + listener.commandStarted(startedEvent); + RedisCommandListenerCommand command = new RedisCommandListenerCommand<>(redisCommand, startedEvent.getContext(), now, listener); + listenedCommands.add(command); + } + + return delegate.write(listenedCommands); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public CompletableFuture closeAsync() { + return delegate.closeAsync(); + } + + @Override + @SuppressWarnings("deprecation") + public void reset() { + delegate.reset(); + } + + @Override + public void setConnectionFacade(ConnectionFacade connection) { + delegate.setConnectionFacade(connection); + } + + @Override + public void setAutoFlushCommands(boolean autoFlush) { + delegate.setAutoFlushCommands(autoFlush); + } + + @Override + public void flushCommands() { + delegate.flushCommands(); + } + + @Override + public ClientResources getClientResources() { + return delegate.getClientResources(); + } + + + private static class RedisCommandListenerCommand implements RedisCommand, DecoratedCommand { + + private final RedisCommand command; + private final Map context; + private final long startedAt; + private final CommandListener listener; + + public RedisCommandListenerCommand(RedisCommand command, Map context, long startedAt, CommandListener listener) { + this.command = command; + this.context = context; + this.startedAt = startedAt; + this.listener = listener; + } + + @Override + public RedisCommand getDelegate() { + return command; + } + + @Override + public CommandOutput getOutput() { + return command.getOutput(); + } + + @Override + public void complete() { + if (getOutput().hasError()) { + CommandFailedEvent failedEvent = new CommandFailedEvent<>(command, context, new RedisCommandExecutionException(getOutput().getError())); + listener.commandFailed(failedEvent); + } else { + long now = System.currentTimeMillis(); + CommandSucceededEvent succeededEvent = new CommandSucceededEvent<>(command, context, startedAt, now); + listener.commandSucceeded(succeededEvent); + } + command.complete(); + } + + @Override + public void cancel() { + command.cancel(); + } + + @Override + public CommandArgs getArgs() { + return command.getArgs(); + } + + @Override + public boolean completeExceptionally(Throwable throwable) { + CommandFailedEvent failedEvent = new CommandFailedEvent<>(command, context, throwable); + listener.commandFailed(failedEvent); + + return command.completeExceptionally(throwable); + } + + @Override + public ProtocolKeyword getType() { + return command.getType(); + } + + @Override + public void encode(ByteBuf buf) { + command.encode(buf); + } + + @Override + public boolean isCancelled() { + return command.isCancelled(); + } + + @Override + public boolean isDone() { + return command.isDone(); + } + + @Override + public void setOutput(CommandOutput output) { + command.setOutput(output); + } + } +} diff --git a/src/main/java/io/lettuce/core/RedisClient.java b/src/main/java/io/lettuce/core/RedisClient.java index 13393b172c..2bed025dc7 100644 --- a/src/main/java/io/lettuce/core/RedisClient.java +++ b/src/main/java/io/lettuce/core/RedisClient.java @@ -273,6 +273,9 @@ private ConnectionFuture> connectStandalone if (CommandExpiryWriter.isSupported(getOptions())) { writer = new CommandExpiryWriter(writer, getOptions(), getResources()); } + if (CommandListenerWriter.isSupported(getResources())) { + writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners())); + } StatefulRedisConnectionImpl connection = newStatefulRedisConnection(writer, endpoint, codec, timeout); ConnectionFuture> future = connectStatefulAsync(connection, endpoint, redisURI, @@ -401,6 +404,9 @@ private ConnectionFuture> connectPubS if (CommandExpiryWriter.isSupported(getOptions())) { writer = new CommandExpiryWriter(writer, getOptions(), getResources()); } + if (CommandListenerWriter.isSupported(getResources())) { + writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners())); + } StatefulRedisPubSubConnectionImpl connection = newStatefulRedisPubSubConnection(endpoint, writer, codec, timeout); @@ -564,6 +570,9 @@ private ConnectionFuture> doConnect if (CommandExpiryWriter.isSupported(getOptions())) { writer = new CommandExpiryWriter(writer, getOptions(), getResources()); } + if (CommandListenerWriter.isSupported(getResources())) { + writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners())); + } StatefulRedisSentinelConnectionImpl connection = newStatefulRedisSentinelConnection(writer, codec, timeout); ConnectionState state = connection.getConnectionState(); diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index d8af7afb11..09aaab9fe5 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -531,6 +531,9 @@ ConnectionFuture> connectToNodeAsync(RedisC if (CommandExpiryWriter.isSupported(getClusterClientOptions())) { writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources()); } + if (CommandListenerWriter.isSupported(getResources())) { + writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners())); + } StatefulRedisConnectionImpl connection = new StatefulRedisConnectionImpl<>(writer, endpoint, codec, getDefaultTimeout()); @@ -573,6 +576,9 @@ ConnectionFuture> connectPubSubToNode if (CommandExpiryWriter.isSupported(getClusterClientOptions())) { writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources()); } + if (CommandListenerWriter.isSupported(getResources())) { + writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners())); + } StatefulRedisPubSubConnectionImpl connection = new StatefulRedisPubSubConnectionImpl<>(endpoint, writer, codec, getDefaultTimeout()); @@ -612,6 +618,9 @@ private CompletableFuture> connectCl if (CommandExpiryWriter.isSupported(getClusterClientOptions())) { writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources()); } + if (CommandListenerWriter.isSupported(getResources())) { + writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners())); + } ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(getClusterClientOptions(), writer, topologyRefreshScheduler); @@ -695,6 +704,9 @@ private CompletableFuture> con if (CommandExpiryWriter.isSupported(getClusterClientOptions())) { writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources()); } + if (CommandListenerWriter.isSupported(getResources())) { + writer = new CommandListenerWriter(writer, new CommandListenerMulticaster(getResources().commandListeners())); + } ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(getClusterClientOptions(), writer, topologyRefreshScheduler); diff --git a/src/main/java/io/lettuce/core/models/events/CommandBaseEvent.java b/src/main/java/io/lettuce/core/models/events/CommandBaseEvent.java new file mode 100644 index 0000000000..1a26e0429e --- /dev/null +++ b/src/main/java/io/lettuce/core/models/events/CommandBaseEvent.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.models.events; + +import io.lettuce.core.protocol.RedisCommand; + +import java.util.Map; + +/** + * Base class for redis commands events. + * + * @author Mikhael Sokolov + */ +public abstract class CommandBaseEvent { + + private final RedisCommand command; + private final Map context; + + protected CommandBaseEvent(RedisCommand command, Map context) { + this.command = command; + this.context = context; + } + + /** + * @return command + */ + public RedisCommand getCommand() { + return command; + } + + /** + * @return shared context + */ + public Map getContext() { + return context; + } +} diff --git a/src/main/java/io/lettuce/core/models/events/CommandFailedEvent.java b/src/main/java/io/lettuce/core/models/events/CommandFailedEvent.java new file mode 100644 index 0000000000..ce73a2136e --- /dev/null +++ b/src/main/java/io/lettuce/core/models/events/CommandFailedEvent.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.models.events; + +import io.lettuce.core.protocol.RedisCommand; + +import java.util.Map; + +/** + * Event for failed command. + * + * @author Mikhael Sokolov + */ +public class CommandFailedEvent extends CommandBaseEvent { + + private final Throwable cause; + + public CommandFailedEvent(RedisCommand command, Map context, Throwable cause) { + super(command, context); + this.cause = cause; + } + + /** + * @return cause of failure + */ + public Throwable getCause() { + return cause; + } +} diff --git a/src/main/java/io/lettuce/core/models/events/CommandStartedEvent.java b/src/main/java/io/lettuce/core/models/events/CommandStartedEvent.java new file mode 100644 index 0000000000..2b9bc9db93 --- /dev/null +++ b/src/main/java/io/lettuce/core/models/events/CommandStartedEvent.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.models.events; + +import io.lettuce.core.protocol.RedisCommand; + +import java.time.Instant; +import java.util.HashMap; + +/** + * Event for started command. + * + * @author Mikhael Sokolov + */ +public class CommandStartedEvent extends CommandBaseEvent { + + private final long startedAt; + + public CommandStartedEvent(RedisCommand command, long startedAt) { + super(command, new HashMap<>()); + this.startedAt = startedAt; + } + + /** + * @return datetime when the command was started + */ + public Instant getStartedAt() { + return Instant.ofEpochMilli(startedAt); + } +} diff --git a/src/main/java/io/lettuce/core/models/events/CommandSucceededEvent.java b/src/main/java/io/lettuce/core/models/events/CommandSucceededEvent.java new file mode 100644 index 0000000000..3f482a7721 --- /dev/null +++ b/src/main/java/io/lettuce/core/models/events/CommandSucceededEvent.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.models.events; + +import io.lettuce.core.protocol.RedisCommand; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; + +/** + * Event for succeeded command. + * + * @author Mikhael Sokolov + */ +public class CommandSucceededEvent extends CommandBaseEvent { + + private final long startedAt; + private final long succeededAt; + + public CommandSucceededEvent(RedisCommand command, Map context, long startedAt, long succeededAt) { + super(command, context); + this.startedAt = startedAt; + this.succeededAt = succeededAt; + } + + /** + * @return execution duration + */ + public Duration getExecuteDuration() { + return Duration.ofMillis(succeededAt - startedAt); + } + + /** + * @return datetime when the command was started + */ + public Instant getStartedAt() { + return Instant.ofEpochMilli(startedAt); + } + + /** + * @return datetime in millis when the command was succeeded + */ + public Instant getSucceededAt() { + return Instant.ofEpochMilli(succeededAt); + } +} diff --git a/src/main/java/io/lettuce/core/resource/ClientResources.java b/src/main/java/io/lettuce/core/resource/ClientResources.java index b5284ce701..6e3346886c 100644 --- a/src/main/java/io/lettuce/core/resource/ClientResources.java +++ b/src/main/java/io/lettuce/core/resource/ClientResources.java @@ -17,7 +17,9 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.List; +import io.lettuce.core.CommandListener; import io.lettuce.core.event.EventBus; import io.lettuce.core.event.EventPublisherOptions; import io.lettuce.core.metrics.CommandLatencyCollector; @@ -240,6 +242,15 @@ default Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCo */ Builder tracing(Tracing tracing); + /** + * Sets the {@link CommandListener} instances to listen redis command execution. + * + * @param listeners - redis command listeners, must not be {@code null}. + * @return this + * @since 6.1 + */ + Builder commandListeners(List listeners); + /** * @return a new instance of {@link DefaultClientResources}. */ @@ -384,4 +395,11 @@ default Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCo */ Tracing tracing(); + /** + * Return the {@link CommandListener} instances to listen redis command execution. + * + * @return list of redis command listeners. + * @since 6.1 + */ + List commandListeners(); } diff --git a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java index a50d975e4b..72b97d8e8a 100644 --- a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java +++ b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java @@ -15,10 +15,13 @@ */ package io.lettuce.core.resource; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import reactor.core.scheduler.Schedulers; +import io.lettuce.core.CommandListener; import io.lettuce.core.event.DefaultEventBus; import io.lettuce.core.event.DefaultEventPublisherOptions; import io.lettuce.core.event.EventBus; @@ -136,6 +139,8 @@ public class DefaultClientResources implements ClientResources { private final Tracing tracing; + private final List commandListeners; + private volatile boolean shutdownCalled = false; protected DefaultClientResources(Builder builder) { @@ -232,6 +237,7 @@ protected DefaultClientResources(Builder builder) { reconnectDelay = builder.reconnectDelay; nettyCustomizer = builder.nettyCustomizer; tracing = builder.tracing; + commandListeners = builder.commandListeners; if (!sharedTimer && timer instanceof HashedWheelTimer) { ((HashedWheelTimer) timer).start(); @@ -297,6 +303,8 @@ public static class Builder implements ClientResources.Builder { private Tracing tracing = Tracing.disabled(); + private List commandListeners = Collections.emptyList(); + private Builder() { } @@ -559,7 +567,22 @@ public Builder tracing(Tracing tracing) { } /** + * Sets the {@link CommandListener} instances to listen redis command execution. * + * @param listeners - redis command listeners, must not be {@code null}. + * @return this + * @since 6.1 + */ + @Override + public ClientResources.Builder commandListeners(List listeners) { + LettuceAssert.notNull(listeners, "Redis command listeners must not be null"); + + this.commandListeners = Collections.unmodifiableList(listeners); + + return this; + } + + /** * @return a new instance of {@link DefaultClientResources}. */ @Override @@ -593,7 +616,8 @@ public DefaultClientResources.Builder mutate() { .commandLatencyPublisherOptions(commandLatencyPublisherOptions()).dnsResolver(dnsResolver()) .eventBus(eventBus()).eventExecutorGroup(eventExecutorGroup()).reconnectDelay(reconnectDelay) .socketAddressResolver(socketAddressResolver()).nettyCustomizer(nettyCustomizer()).timer(timer()) - .tracing(tracing()); + .tracing(tracing()) + .commandListeners(commandListeners()); builder.sharedCommandLatencyCollector = sharedEventLoopGroupProvider; builder.sharedEventExecutor = sharedEventExecutor; @@ -732,4 +756,8 @@ public Tracing tracing() { return tracing; } + @Override + public List commandListeners() { + return commandListeners; + } } diff --git a/src/test/java/io/lettuce/core/CommandListenerTest.java b/src/test/java/io/lettuce/core/CommandListenerTest.java new file mode 100644 index 0000000000..ea9fafde3c --- /dev/null +++ b/src/test/java/io/lettuce/core/CommandListenerTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core; + +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.models.events.CommandFailedEvent; +import io.lettuce.core.models.events.CommandStartedEvent; +import io.lettuce.core.models.events.CommandSucceededEvent; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.test.LettuceExtension; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static io.lettuce.core.ScriptOutputType.BOOLEAN; +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Mikhael Sokolov + */ +@ExtendWith(LettuceExtension.class) +@SuppressWarnings({"rawtypes", "unchecked"}) +public class CommandListenerTest extends TestSupport { + + private RedisURI uri; + + @BeforeEach + void setUp() { + this.uri = RedisURI.create(host, port); + } + + @Test + void shouldWorkRedisCommandListener() { + List startedEvents = Collections.synchronizedList(new ArrayList<>()); + List succeededEvents = Collections.synchronizedList(new ArrayList<>()); + List failedEvents = Collections.synchronizedList(new ArrayList<>()); + + CommandListener listener = new CommandListener() { + @Override + public void commandStarted(CommandStartedEvent event) { + event.getContext().put(key, value); + startedEvents.add(event); + + assertThat(event.getStartedAt()).isNotNull(); + } + + @Override + public void commandSucceeded(CommandSucceededEvent event) { + succeededEvents.add(event); + + assertThat(event.getContext().get(key)).isEqualTo(value); + assertThat(event.getExecuteDuration()).isPositive(); + } + + @Override + public void commandFailed(CommandFailedEvent event) { + failedEvents.add(event); + + assertThat(event.getContext().get(key)).isEqualTo(value); + assertThat(event.getCause()).isInstanceOf(RedisCommandExecutionException.class); + } + }; + + ClientResources resources = ClientResources.builder().commandListeners(singletonList(listener)).build(); + RedisClient client = RedisClient.create(resources, uri); + + RedisCommands sync = client.connect().sync(); + + sync.set(key, value); + sync.get(key); + try { + sync.eval("bad script", BOOLEAN); + } catch (RedisCommandExecutionException ignored) { + } + + assertThat(startedEvents).hasSize(3); + assertThat(succeededEvents).hasSize(2); + assertThat(failedEvents).hasSize(1); + } +}