From 4c6f0bedbe41f49cc053de8a4a4613aa1df52a1f Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Mon, 29 May 2023 08:57:23 +0200 Subject: [PATCH] Allows receiving the channel name when using Redis Pub/Sub When using Redis Pub/Sub, receiving the message channel was impossible. This was a problem when subscribing to multiple channels or using regexp. This commit adds methods for receiving a bi-consumer to get the channel name and payload. When not possible (for method returning a Multi), a dedicated structure is introduced (RedisPubSubMessage). --- .../datasource/pubsub/PubSubCommands.java | 78 ++++++++++ .../pubsub/ReactivePubSubCommands.java | 109 ++++++++++++++ .../datasource/pubsub/RedisPubSubMessage.java | 22 +++ .../BlockingPubSubCommandsImpl.java | 39 +++++ .../datasource/DefaultRedisPubSubMessage.java | 24 +++ .../ReactivePubSubCommandsImpl.java | 137 +++++++++++++++--- .../redis/datasource/PubSubCommandsTest.java | 131 ++++++++++++++++- 7 files changed, 516 insertions(+), 24 deletions(-) create mode 100644 extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/RedisPubSubMessage.java create mode 100644 extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/DefaultRedisPubSubMessage.java diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/PubSubCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/PubSubCommands.java index de44e3b9ca815..95b93ebd7c3cc 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/PubSubCommands.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/PubSubCommands.java @@ -1,6 +1,7 @@ package io.quarkus.redis.datasource.pubsub; import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Consumer; import io.quarkus.redis.datasource.RedisCommands; @@ -43,6 +44,19 @@ public interface PubSubCommands extends RedisCommands { */ RedisSubscriber subscribeToPattern(String pattern, Consumer onMessage); + /** + * Same as {@link #subscribeToPattern(String, Consumer)}, but instead of receiving only the message payload, it + * also receives the name of the channel. + * + * @param pattern the pattern + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the + * channels matching the pattern, and is invoked on the I/O thread. So, you must + * not block. Offload to a separate thread if needed. The first parameter is the name of the + * channel. The second parameter is the payload. + * @return the subscriber object that lets you unsubscribe + */ + RedisSubscriber subscribeToPattern(String pattern, BiConsumer onMessage); + /** * Subscribes to the given patterns like {@code chan*l}. * @@ -54,6 +68,19 @@ public interface PubSubCommands extends RedisCommands { */ RedisSubscriber subscribeToPatterns(List patterns, Consumer onMessage); + /** + * Same as {@link #subscribeToPatterns(List, Consumer)}, but instead of only receiving the payload, it also receives + * the channel name. + * + * @param patterns the patterns + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the + * channels matching the pattern, and is invoked on the I/O thread. So, you must + * not block. Offload to a separate thread if needed. The first parameter is the channel name. + * The second one if the payload. + * @return the subscriber object that lets you unsubscribe + */ + RedisSubscriber subscribeToPatterns(List patterns, BiConsumer onMessage); + /** * Subscribes to the given channels. * @@ -97,6 +124,23 @@ RedisSubscriber subscribe(String channel, Consumer onMessage, Runnable onEnd, RedisSubscriber subscribeToPattern(String pattern, Consumer onMessage, Runnable onEnd, Consumer onException); + /** + * Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receives the channel name. + * + * @param pattern the pattern + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the + * channels matching the pattern, and is invoked on the I/O thread. So, you must + * not block. Offload to a separate thread if needed. The first parameter is the name of the + * channel. The second parameter is the payload. + * @param onEnd the end handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @param onException the exception handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @return the subscriber object that lets you unsubscribe + */ + RedisSubscriber subscribeToPattern(String pattern, BiConsumer onMessage, Runnable onEnd, + Consumer onException); + /** * Subscribes to the given patterns like {@code chan*l}. * @@ -113,6 +157,23 @@ RedisSubscriber subscribeToPattern(String pattern, Consumer onMessage, Runnab RedisSubscriber subscribeToPatterns(List patterns, Consumer onMessage, Runnable onEnd, Consumer onException); + /** + * Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receive the channel name. + * + * @param patterns the patterns + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the + * channels matching the pattern, and is invoked on the I/O thread. So, you must + * not block. Offload to a separate thread if needed. The first parameter is the name of the + * channel. The second parameter is the payload. + * @param onEnd the end handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @param onException the exception handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @return the subscriber object that lets you unsubscribe + */ + RedisSubscriber subscribeToPatterns(List patterns, BiConsumer onMessage, Runnable onEnd, + Consumer onException); + /** * Subscribes to the given channels. * @@ -129,6 +190,23 @@ RedisSubscriber subscribeToPatterns(List patterns, Consumer onMessage RedisSubscriber subscribe(List channels, Consumer onMessage, Runnable onEnd, Consumer onException); + /** + * Same as {@link #subscribe(List, Consumer, Runnable, Consumer)} but also receives the channel name. + * + * @param channels the channels + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the given + * channels, and is invoked on the I/O thread. So, you must not block. Offload to + * a separate thread if needed. The first parameter is the name of the channel. The second + * parameter is the payload. + * @param onEnd the end handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @param onException the exception handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @return the subscriber object that lets you unsubscribe + */ + RedisSubscriber subscribe(List channels, BiConsumer onMessage, Runnable onEnd, + Consumer onException); + /** * A redis subscriber */ diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/ReactivePubSubCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/ReactivePubSubCommands.java index 0144d7c0c800c..349dc4836d377 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/ReactivePubSubCommands.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/ReactivePubSubCommands.java @@ -1,6 +1,7 @@ package io.quarkus.redis.datasource.pubsub; import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Consumer; import io.quarkus.redis.datasource.ReactiveRedisCommands; @@ -40,6 +41,19 @@ public interface ReactivePubSubCommands extends ReactiveRedisCommands { */ Uni subscribeToPattern(String pattern, Consumer onMessage); + /** + * Same as {@link #subscribeToPattern(String, Consumer)}, but instead of receiving only the message payload, it + * also receives the name of the channel. + * + * @param pattern the pattern + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the + * channels matching the pattern, and is invoked on the I/O thread. So, you must + * not block. Offload to a separate thread if needed. The first parameter is the name of the + * channel. The second parameter is the payload. + * @return the subscriber object that lets you unsubscribe + */ + Uni subscribeToPattern(String pattern, BiConsumer onMessage); + /** * Subscribes to the given patterns like {@code chan*l}. * @@ -51,6 +65,19 @@ public interface ReactivePubSubCommands extends ReactiveRedisCommands { */ Uni subscribeToPatterns(List patterns, Consumer onMessage); + /** + * Same as {@link #subscribeToPatterns(List, Consumer)}, but instead of only receiving the payload, it also receives + * the channel name. + * + * @param patterns the patterns + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the + * channels matching the pattern, and is invoked on the I/O thread. So, you must + * not block. Offload to a separate thread if needed. The first parameter is the channel name. + * The second one if the payload. + * @return the subscriber object that lets you unsubscribe + */ + Uni subscribeToPatterns(List patterns, BiConsumer onMessage); + /** * Subscribes to the given channels. * @@ -62,6 +89,19 @@ public interface ReactivePubSubCommands extends ReactiveRedisCommands { */ Uni subscribe(List channels, Consumer onMessage); + /** + * Same as {@link #subscribe(List, Consumer)}, but instead of just receiving the payload, it also receives the + * channel name. + * + * @param channels the channels + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the given + * channels, and is invoked on the I/O thread. So, you must not block. Offload to + * a separate thread if needed. The first parameter is the channel name. The second one if the + * payload. + * @return the subscriber object that lets you unsubscribe + */ + Uni subscribe(List channels, BiConsumer onMessage); + /** * Subscribes to a given channel. * @@ -94,6 +134,23 @@ Uni subscribe(String channel, Consumer onMessage, Ru Uni subscribeToPattern(String pattern, Consumer onMessage, Runnable onEnd, Consumer onException); + /** + * Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receives the channel name. + * + * @param pattern the pattern + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the + * channels matching the pattern, and is invoked on the I/O thread. So, you must + * not block. Offload to a separate thread if needed. The first parameter is the name of the + * channel. The second parameter is the payload. + * @param onEnd the end handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @param onException the exception handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @return the subscriber object that lets you unsubscribe + */ + Uni subscribeToPattern(String pattern, BiConsumer onMessage, Runnable onEnd, + Consumer onException); + /** * Subscribes to the given patterns like {@code chan*l}. * @@ -110,6 +167,23 @@ Uni subscribeToPattern(String pattern, Consumer onMe Uni subscribeToPatterns(List patterns, Consumer onMessage, Runnable onEnd, Consumer onException); + /** + * Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receive the channel name. + * + * @param patterns the patterns + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the + * channels matching the pattern, and is invoked on the I/O thread. So, you must + * not block. Offload to a separate thread if needed. The first parameter is the name of the + * channel. The second parameter is the payload. + * @param onEnd the end handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @param onException the exception handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @return the subscriber object that lets you unsubscribe + */ + Uni subscribeToPatterns(List patterns, BiConsumer onMessage, Runnable onEnd, + Consumer onException); + /** * Subscribes to the given channels. * @@ -126,6 +200,23 @@ Uni subscribeToPatterns(List patterns, Consumer Uni subscribe(List channels, Consumer onMessage, Runnable onEnd, Consumer onException); + /** + * Same as {@link #subscribe(List, Consumer, Runnable, Consumer)} but also receives the channel name. + * + * @param channels the channels + * @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the given + * channels, and is invoked on the I/O thread. So, you must not block. Offload to + * a separate thread if needed. The first parameter is the name of the channel. The second + * parameter is the payload. + * @param onEnd the end handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @param onException the exception handler. Be aware that this callback is invoked on the I/O thread. + * So, you must not block. Offload to a separate thread if needed. + * @return the subscriber object that lets you unsubscribe + */ + Uni subscribe(List channels, BiConsumer onMessage, Runnable onEnd, + Consumer onException); + /** * Subscribes to the given channels. * This method returns a {@code Multi} emitting an item of type {@code V} for each received message. @@ -137,6 +228,15 @@ Uni subscribe(List channels, Consumer onMess */ Multi subscribe(String... channels); + /** + * Same as {@link #subscribe(String...)}, but instead of receiving the message payload directly, it receives + * instances of {@link RedisPubSubMessage} wrapping the payload and the channel on which the message has been sent. + * + * @param channels the channels + * @return the stream of message + */ + Multi> subscribeAsMessages(String... channels); + /** * Subscribes to the given patterns. * This method returns a {@code Multi} emitting an item of type {@code V} for each received message. @@ -148,6 +248,15 @@ Uni subscribe(List channels, Consumer onMess */ Multi subscribeToPatterns(String... patterns); + /** + * Same as {@link #subscribeToPatterns(String...)}, but instead of receiving only the message payload, it receives + * instances of {@link RedisPubSubMessage} containing both the payload and the name of the channel. + * + * @param patterns the patterns + * @return the stream of message + */ + Multi> subscribeAsMessagesToPatterns(String... patterns); + /** * A redis subscriber */ diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/RedisPubSubMessage.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/RedisPubSubMessage.java new file mode 100644 index 0000000000000..b6401f6dce044 --- /dev/null +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/pubsub/RedisPubSubMessage.java @@ -0,0 +1,22 @@ +package io.quarkus.redis.datasource.pubsub; + +/** + * A structure encapsulating the Redis pub/sub payload and the channel on which the message was sent. + * This structure is used when using {@link ReactivePubSubCommands#subscribeAsMessages(String...)} and + * {@link ReactivePubSubCommands#subscribeAsMessagesToPatterns(String...)} + * + * @param the type of payload + */ +public interface RedisPubSubMessage { + + /** + * @return the payload. + */ + V getPayload(); + + /** + * @return the channel on which the message was sent. + */ + String getChannel(); + +} diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingPubSubCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingPubSubCommandsImpl.java index f8e4099032aeb..a59c5d5e92a4d 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingPubSubCommandsImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingPubSubCommandsImpl.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Consumer; import io.quarkus.redis.datasource.RedisDataSource; @@ -75,6 +76,44 @@ public RedisSubscriber subscribe(List channels, Consumer onMessage, R .await().atMost(timeout); } + @Override + public RedisSubscriber subscribeToPattern(String pattern, BiConsumer onMessage) { + return reactive.subscribeToPattern(pattern, onMessage) + .map(r -> new BlockingRedisSubscriber(r)) + .await().atMost(timeout); + } + + @Override + public RedisSubscriber subscribeToPatterns(List patterns, BiConsumer onMessage) { + return reactive.subscribeToPatterns(patterns, onMessage) + .map(r -> new BlockingRedisSubscriber(r)) + .await().atMost(timeout); + } + + @Override + public RedisSubscriber subscribeToPattern(String pattern, BiConsumer onMessage, Runnable onEnd, + Consumer onException) { + return reactive.subscribeToPattern(pattern, onMessage, onEnd, onException) + .map(r -> new BlockingRedisSubscriber(r)) + .await().atMost(timeout); + } + + @Override + public RedisSubscriber subscribeToPatterns(List patterns, BiConsumer onMessage, Runnable onEnd, + Consumer onException) { + return reactive.subscribeToPatterns(patterns, onMessage, onEnd, onException) + .map(r -> new BlockingRedisSubscriber(r)) + .await().atMost(timeout); + } + + @Override + public RedisSubscriber subscribe(List channels, BiConsumer onMessage, Runnable onEnd, + Consumer onException) { + return reactive.subscribe(channels, onMessage, onEnd, onException) + .map(r -> new BlockingRedisSubscriber(r)) + .await().atMost(timeout); + } + private class BlockingRedisSubscriber implements RedisSubscriber { private final ReactivePubSubCommands.ReactiveRedisSubscriber reactiveRedisSubscriber; diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/DefaultRedisPubSubMessage.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/DefaultRedisPubSubMessage.java new file mode 100644 index 0000000000000..5533279351b0b --- /dev/null +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/DefaultRedisPubSubMessage.java @@ -0,0 +1,24 @@ +package io.quarkus.redis.runtime.datasource; + +import io.quarkus.redis.datasource.pubsub.RedisPubSubMessage; + +public class DefaultRedisPubSubMessage implements RedisPubSubMessage { + + private final V payload; + private final String channel; + + public DefaultRedisPubSubMessage(V payload, String channel) { + this.payload = payload; + this.channel = channel; + } + + @Override + public V getPayload() { + return payload; + } + + @Override + public String getChannel() { + return channel; + } +} diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java index 95476dcb2251e..c3a935f13efde 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java @@ -9,10 +9,12 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.function.BiConsumer; import java.util.function.Consumer; import io.quarkus.redis.datasource.ReactiveRedisDataSource; import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands; +import io.quarkus.redis.datasource.pubsub.RedisPubSubMessage; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.smallrye.common.vertx.VertxContext; import io.smallrye.mutiny.Multi; @@ -65,16 +67,31 @@ public Uni subscribeToPattern(String pattern, Consumer< return subscribeToPattern(pattern, onMessage, null, null); } + @Override + public Uni subscribeToPattern(String pattern, BiConsumer onMessage) { + return subscribeToPattern(pattern, onMessage, null, null); + } + @Override public Uni subscribeToPatterns(List patterns, Consumer onMessage) { return subscribeToPatterns(patterns, onMessage, null, null); } + @Override + public Uni subscribeToPatterns(List patterns, BiConsumer onMessage) { + return subscribeToPatterns(patterns, onMessage, null, null); + } + @Override public Uni subscribe(List channels, Consumer onMessage) { return subscribe(channels, onMessage, null, null); } + @Override + public Uni subscribe(List channels, BiConsumer onMessage) { + return subscribe(channels, onMessage, null, null); + } + @Override public Uni subscribe(String channel, Consumer onMessage, Runnable onEnd, Consumer onException) { @@ -88,19 +105,46 @@ public Uni subscribeToPattern(String pattern, Consumer< } @Override - public Uni subscribeToPatterns(List patterns, Consumer onMessage, Runnable onEnd, + public Uni subscribeToPattern(String pattern, BiConsumer onMessage, Runnable onEnd, Consumer onException) { + return subscribeToPatterns(List.of(pattern), onMessage, onEnd, onException); + } + + private void validatePatterns(List patterns) { notNullOrEmpty(patterns, "patterns"); - nonNull(onMessage, "onMessage"); for (String pattern : patterns) { if (pattern == null) { - throw new IllegalArgumentException("Patterns must not be null"); + throw new IllegalArgumentException("Pattern must not be null"); } if (pattern.isBlank()) { - throw new IllegalArgumentException("Patterns cannot be blank"); + throw new IllegalArgumentException("Pattern cannot be blank"); } } + } + + @Override + public Uni subscribeToPatterns(List patterns, Consumer onMessage, Runnable onEnd, + Consumer onException) { + nonNull(onMessage, "onMessage"); + validatePatterns(patterns); + + return client.connect() + .chain(conn -> { + RedisAPI api = RedisAPI.api(conn); + ReactiveRedisPatternSubscriberImpl subscriber = new ReactiveRedisPatternSubscriberImpl(conn, api, patterns, + (channel, value) -> onMessage.accept(value), onEnd, onException); + return subscriber.subscribe() + .replaceWith(subscriber); + }); + } + + @Override + public Uni subscribeToPatterns(List patterns, BiConsumer onMessage, + Runnable onEnd, + Consumer onException) { + validatePatterns(patterns); + nonNull(onMessage, "onMessage"); return client.connect() .chain(conn -> { @@ -112,9 +156,38 @@ public Uni subscribeToPatterns(List patterns, C }); } + private void validateChannels(List channels) { + notNullOrEmpty(channels, "channels"); + + for (String pattern : channels) { + if (pattern == null) { + throw new IllegalArgumentException("Channel must not be null"); + } + if (pattern.isBlank()) { + throw new IllegalArgumentException("Channel cannot be blank"); + } + } + } + @Override public Uni subscribe(List channels, Consumer onMessage, Runnable onEnd, Consumer onException) { + nonNull(onMessage, "onMessage"); + validateChannels(channels); + + return client.connect() + .chain(conn -> { + RedisAPI api = RedisAPI.api(conn); + ReactiveAbstractRedisSubscriberImpl subscriber = new ReactiveAbstractRedisSubscriberImpl(conn, api, + channels, (channel, value) -> onMessage.accept(value), onEnd, onException); + return subscriber.subscribe() + .replaceWith(subscriber); + }); + } + + @Override + public Uni subscribe(List channels, BiConsumer onMessage, Runnable onEnd, + Consumer onException) { notNullOrEmpty(channels, "channels"); nonNull(onMessage, "onMessage"); @@ -137,6 +210,33 @@ public Uni subscribe(List channels, Consumer }); } + @Override + public Multi subscribeToPatterns(String... patterns) { + notNullOrEmpty(patterns, "patterns"); + doesNotContainNull(patterns, "patterns"); + + return Multi.createFrom().emitter(emitter -> { + subscribeToPatterns(List.of(patterns), emitter::emit, emitter::complete, emitter::fail) + .subscribe().with(x -> { + emitter.onTermination(() -> x.unsubscribe(patterns).subscribe().asCompletionStage()); + }, emitter::fail); + }); + } + + @Override + public Multi> subscribeAsMessagesToPatterns(String... patterns) { + notNullOrEmpty(patterns, "patterns"); + doesNotContainNull(patterns, "patterns"); + return Multi.createFrom().emitter(emitter -> { + subscribeToPatterns(List.of(patterns), + (channel, value) -> emitter.emit(new DefaultRedisPubSubMessage<>(value, channel)), emitter::complete, + emitter::fail) + .subscribe().with(x -> { + emitter.onTermination(() -> x.unsubscribe(patterns).subscribe().asCompletionStage()); + }, emitter::fail); + }); + } + @Override public Multi subscribe(String... channels) { notNullOrEmpty(channels, "channels"); @@ -153,15 +253,16 @@ public Multi subscribe(String... channels) { } @Override - public Multi subscribeToPatterns(String... patterns) { - notNullOrEmpty(patterns, "patterns"); - doesNotContainNull(patterns, "patterns"); + public Multi> subscribeAsMessages(String... channels) { + notNullOrEmpty(channels, "channels"); + doesNotContainNull(channels, "channels"); + List list = List.of(channels); return Multi.createFrom().emitter(emitter -> { - subscribeToPatterns(List.of(patterns), emitter::emit, emitter::complete, emitter::fail) - .subscribe().with(x -> { - emitter.onTermination(() -> x.unsubscribe(patterns).subscribe().asCompletionStage()); - }, emitter::fail); + subscribe(list, (channel, value) -> new DefaultRedisPubSubMessage<>(value, channel), emitter::complete, + emitter::fail) + .subscribe().with(subscriber -> emitter + .onTermination(() -> subscriber.unsubscribe(channels).subscribe().asCompletionStage())); }); } @@ -169,11 +270,11 @@ private abstract class AbstractRedisSubscriber implements ReactiveRedisSubscribe final RedisConnection connection; final RedisAPI api; final String id; - final Consumer onMessage; + final BiConsumer onMessage; final Runnable onEnd; final Consumer onException; - private AbstractRedisSubscriber(RedisConnection connection, RedisAPI api, Consumer onMessage, + private AbstractRedisSubscriber(RedisConnection connection, RedisAPI api, BiConsumer onMessage, Runnable onEnd, Consumer onException) { this.connection = connection; this.api = api; @@ -212,15 +313,15 @@ private void runOnDuplicatedContext(Runnable runnable) { () -> datasource.getVertx().runOnContext(() -> contextConsumer.accept(Vertx.currentContext()))); } - private void handleRedisEvent(UniEmitter emitter, Response r) { + protected void handleRedisEvent(UniEmitter emitter, Response r) { if (r != null && r.size() > 0) { String command = r.get(0).toString(); if ("subscribe".equalsIgnoreCase(command) || "psubscribe".equalsIgnoreCase(command)) { emitter.complete(null); // Subscribed } else if ("message".equalsIgnoreCase(command)) { - onMessage.accept(marshaller.decode(classOfMessage, r.get(2))); + onMessage.accept(r.get(1).toString(), marshaller.decode(classOfMessage, r.get(2))); } else if ("pmessage".equalsIgnoreCase(command)) { - onMessage.accept(marshaller.decode(classOfMessage, r.get(3))); + onMessage.accept(r.get(2).toString(), marshaller.decode(classOfMessage, r.get(3))); } } } @@ -239,7 +340,7 @@ private class ReactiveAbstractRedisSubscriberImpl extends AbstractRedisSubscribe private final List channels; public ReactiveAbstractRedisSubscriberImpl(RedisConnection connection, RedisAPI api, List channels, - Consumer onMessage, Runnable onEnd, + BiConsumer onMessage, Runnable onEnd, Consumer onException) { super(connection, api, onMessage, onEnd, onException); this.channels = new ArrayList<>(channels); @@ -277,7 +378,7 @@ private class ReactiveRedisPatternSubscriberImpl extends AbstractRedisSubscriber private final List patterns; public ReactiveRedisPatternSubscriberImpl(RedisConnection connection, RedisAPI api, List patterns, - Consumer onMessage, Runnable onEnd, + BiConsumer onMessage, Runnable onEnd, Consumer onException) { super(connection, api, onMessage, onEnd, onException); this.patterns = new ArrayList<>(patterns); diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java index 452d81abff8af..671d77483de73 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java @@ -7,6 +7,8 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; @@ -16,6 +18,7 @@ import io.quarkus.redis.datasource.pubsub.PubSubCommands; import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands; +import io.quarkus.redis.datasource.pubsub.RedisPubSubMessage; import io.quarkus.redis.runtime.datasource.BlockingRedisDataSourceImpl; import io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl; import io.smallrye.common.vertx.VertxContext; @@ -113,7 +116,38 @@ void testMultipleSubscribers() { List people1 = new CopyOnWriteArrayList<>(); List people2 = new CopyOnWriteArrayList<>(); PubSubCommands.RedisSubscriber subscriber1 = pubsub.subscribe(channel, people1::add); - PubSubCommands.RedisSubscriber subscriber2 = pubsub.subscribeToPattern(channel + "*", people2::add); + PubSubCommands.RedisSubscriber subscriber2 = pubsub.subscribeToPattern(channel + "*", p -> people2.add(p)); + + pubsub.publish(channel, new Person("luke", "skywalker")); + pubsub.publish(channel + "-another", new Person("luke", "skywalker")); + + Awaitility.await().until(() -> people1.size() == 1); + Awaitility.await().until(() -> people2.size() == 2); + + pubsub.publish(channel, new Person("leia", "skywalker")); + pubsub.publish(channel, new Person("leia", "skywalker")); + pubsub.publish(channel + "-yet", new Person("leia", "skywalker")); + + Awaitility.await().until(() -> people1.size() == 3); + Awaitility.await().until(() -> people2.size() == 5); + + subscriber2.unsubscribe(); + subscriber1.unsubscribe(); + + awaitNoMoreActiveChannels(); + + } + + @Test + void testMultipleSubscribersAndBiConsumer() { + + List people1 = new CopyOnWriteArrayList<>(); + List people2 = new CopyOnWriteArrayList<>(); + PubSubCommands.RedisSubscriber subscriber1 = pubsub.subscribe(channel, people1::add); + PubSubCommands.RedisSubscriber subscriber2 = pubsub.subscribeToPattern(channel + "*", (s, p) -> { + assertThat(s).isIn(channel, channel + "-another", channel + "-yet"); + people2.add(p); + }); pubsub.publish(channel, new Person("luke", "skywalker")); pubsub.publish(channel + "-another", new Person("luke", "skywalker")); @@ -156,7 +190,10 @@ void invalidSubscribe() { })).isInstanceOf(IllegalArgumentException.class); assertThatThrownBy(() -> pubsub.subscribeToPatterns(Collections.emptyList(), p -> { })).isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> pubsub.subscribeToPattern("aaa", null)).isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> pubsub.subscribeToPattern("aaa", (Consumer) null)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> pubsub.subscribeToPattern("aaa", (BiConsumer) null)) + .isInstanceOf(IllegalArgumentException.class); } @Test @@ -185,7 +222,34 @@ void subscribeToMultipleChannels() { @Test void subscribeToMultiplePatterns() { List people1 = new CopyOnWriteArrayList<>(); - PubSubCommands.RedisSubscriber subscriber = pubsub.subscribeToPatterns(List.of(channel + "*", "foo"), people1::add); + PubSubCommands.RedisSubscriber subscriber = pubsub.subscribeToPatterns(List.of(channel + "*", "foo"), + p -> people1.add(p)); + + pubsub.publish(channel, new Person("luke", "skywalker")); + pubsub.publish("foo", new Person("luke", "skywalker")); + + Awaitility.await().until(() -> people1.size() == 2); + + pubsub.publish(channel, new Person("leia", "skywalker")); + pubsub.publish(channel, new Person("leia", "skywalker")); + pubsub.publish("foo", new Person("leia", "skywalker")); + + Awaitility.await().until(() -> people1.size() == 5); + + subscriber.unsubscribe(); + + awaitNoMoreActiveChannels(); + + } + + @Test + void subscribeToMultiplePatternsWithBiConsumer() { + List people1 = new CopyOnWriteArrayList<>(); + PubSubCommands.RedisSubscriber subscriber = pubsub.subscribeToPatterns(List.of(channel + "*", "foo"), + (s, p) -> { + assertThat(s).isIn("foo", channel); + people1.add(p); + }); pubsub.publish(channel, new Person("luke", "skywalker")); pubsub.publish("foo", new Person("luke", "skywalker")); @@ -225,6 +289,32 @@ void subscribeToMultiplePatternsWithMulti() { } + @Test + void subscribeToMultiplePatternsWithMultiAsMessages() { + List> people = new CopyOnWriteArrayList<>(); + Multi> multi = reactive.subscribeAsMessagesToPatterns(channel + "*", "foo"); + + Cancellable cancellable = multi.subscribe().with(people::add); + + pubsub.publish(channel, new Person("luke", "skywalker")); + pubsub.publish("foo", new Person("luke", "skywalker")); + pubsub.publish(channel, new Person("leia", "skywalker")); + pubsub.publish(channel, new Person("leia", "skywalker")); + pubsub.publish("foo", new Person("leia", "skywalker")); + + Awaitility.await().until(() -> people.size() > 1); + + assertThat(people).allSatisfy(m -> { + assertThat(m.getChannel()).isNotBlank(); + assertThat(m.getPayload()).isNotNull(); + }); + + cancellable.cancel(); + + awaitNoMoreActiveChannels(); + + } + @Test void subscribeToSingleWithMulti() { List people1 = new CopyOnWriteArrayList<>(); @@ -249,6 +339,35 @@ void subscribeToSingleWithMulti() { } + @Test + void subscribeToSingleWithMultiAsMessages() { + List> people = new CopyOnWriteArrayList<>(); + Multi> multi = reactive.subscribeAsMessagesToPatterns(channel + "*"); + + Cancellable cancellable = multi.subscribe().with(people::add); + + pubsub.publish("foo", new Person("luke", "skywalker")); + pubsub.publish(channel, new Person("luke", "skywalker")); + + Awaitility.await().until(() -> people.size() == 1); + + pubsub.publish(channel, new Person("leia", "skywalker")); + pubsub.publish(channel, new Person("leia", "skywalker")); + pubsub.publish(channel + "foo", new Person("leia", "skywalker")); + + Awaitility.await().until(() -> people.size() == 4); + + assertThat(people).allSatisfy(m -> { + assertThat(m.getChannel()).isNotBlank(); + assertThat(m.getPayload()).isNotNull(); + }); + + cancellable.cancel(); + + awaitNoMoreActiveChannels(); + + } + @Test void unsubscribe() { @@ -308,7 +427,7 @@ void unsubscribeOne() { void unsubscribePattern() { List people = new CopyOnWriteArrayList<>(); - PubSubCommands.RedisSubscriber subscriber = pubsub.subscribeToPattern(channel + "*", people::add); + PubSubCommands.RedisSubscriber subscriber = pubsub.subscribeToPattern(channel + "*", p -> people.add(p)); pubsub.publish(channel + "1", new Person("luke", "skywalker")); Awaitility.await().until(() -> people.size() == 1); @@ -324,7 +443,7 @@ void unsubscribePattern() { void unsubscribeAllPatterns() { List people = new CopyOnWriteArrayList<>(); - PubSubCommands.RedisSubscriber subscriber = pubsub.subscribeToPattern(channel + "*", people::add); + PubSubCommands.RedisSubscriber subscriber = pubsub.subscribeToPattern(channel + "*", p -> people.add(p)); pubsub.publish(channel + "1", new Person("luke", "skywalker")); Awaitility.await().until(() -> people.size() == 1); @@ -341,7 +460,7 @@ void unsubscribeAllPatterns() { @Test void unsubscribeOnePattern() { List people = new CopyOnWriteArrayList<>(); - PubSubCommands.RedisSubscriber subscriber = pubsub.subscribeToPatterns(List.of("foo*", "bar*"), people::add); + PubSubCommands.RedisSubscriber subscriber = pubsub.subscribeToPatterns(List.of("foo*", "bar*"), p -> people.add(p)); pubsub.publish("foo1", new Person("luke", "skywalker")); pubsub.publish("bar2", new Person("luke", "skywalker")); Awaitility.await().until(() -> people.size() == 2);