Skip to content

Commit

Permalink
Merge pull request #33703 from cescoffier/allow-redis-pub-sub-to-rece…
Browse files Browse the repository at this point in the history
…ive-channel

Allows receiving the channel name when using Redis Pub/Sub
  • Loading branch information
cescoffier authored May 31, 2023
2 parents 9cc716c + 4c6f0be commit 81687f1
Show file tree
Hide file tree
Showing 7 changed files with 516 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.redis.datasource.pubsub;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import io.quarkus.redis.datasource.RedisCommands;
Expand Down Expand Up @@ -43,6 +44,19 @@ public interface PubSubCommands<V> extends RedisCommands {
*/
RedisSubscriber subscribeToPattern(String pattern, Consumer<V> onMessage);

/**
* Same as {@link #subscribeToPattern(String, Consumer)}, but instead of receiving only the message payload, it
* also receives the name of the channel.
*
* @param pattern the pattern
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @return the subscriber object that lets you unsubscribe
*/
RedisSubscriber subscribeToPattern(String pattern, BiConsumer<String, V> onMessage);

/**
* Subscribes to the given patterns like {@code chan*l}.
*
Expand All @@ -54,6 +68,19 @@ public interface PubSubCommands<V> extends RedisCommands {
*/
RedisSubscriber subscribeToPatterns(List<String> patterns, Consumer<V> onMessage);

/**
* Same as {@link #subscribeToPatterns(List, Consumer)}, but instead of only receiving the payload, it also receives
* the channel name.
*
* @param patterns the patterns
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the channel name.
* The second one if the payload.
* @return the subscriber object that lets you unsubscribe
*/
RedisSubscriber subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage);

/**
* Subscribes to the given channels.
*
Expand Down Expand Up @@ -97,6 +124,23 @@ RedisSubscriber subscribe(String channel, Consumer<V> onMessage, Runnable onEnd,
RedisSubscriber subscribeToPattern(String pattern, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receives the channel name.
*
* @param pattern the pattern
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
RedisSubscriber subscribeToPattern(String pattern, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Subscribes to the given patterns like {@code chan*l}.
*
Expand All @@ -113,6 +157,23 @@ RedisSubscriber subscribeToPattern(String pattern, Consumer<V> onMessage, Runnab
RedisSubscriber subscribeToPatterns(List<String> patterns, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receive the channel name.
*
* @param patterns the patterns
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
RedisSubscriber subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Subscribes to the given channels.
*
Expand All @@ -129,6 +190,23 @@ RedisSubscriber subscribeToPatterns(List<String> patterns, Consumer<V> onMessage
RedisSubscriber subscribe(List<String> channels, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribe(List, Consumer, Runnable, Consumer)} but also receives the channel name.
*
* @param channels the channels
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the given
* channels, and is invoked on the <strong>I/O thread</strong>. So, you must not block. Offload to
* a separate thread if needed. The first parameter is the name of the channel. The second
* parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
RedisSubscriber subscribe(List<String> channels, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* A redis subscriber
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.redis.datasource.pubsub;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import io.quarkus.redis.datasource.ReactiveRedisCommands;
Expand Down Expand Up @@ -40,6 +41,19 @@ public interface ReactivePubSubCommands<V> extends ReactiveRedisCommands {
*/
Uni<ReactiveRedisSubscriber> subscribeToPattern(String pattern, Consumer<V> onMessage);

/**
* Same as {@link #subscribeToPattern(String, Consumer)}, but instead of receiving only the message payload, it
* also receives the name of the channel.
*
* @param pattern the pattern
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribeToPattern(String pattern, BiConsumer<String, V> onMessage);

/**
* Subscribes to the given patterns like {@code chan*l}.
*
Expand All @@ -51,6 +65,19 @@ public interface ReactivePubSubCommands<V> extends ReactiveRedisCommands {
*/
Uni<ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, Consumer<V> onMessage);

/**
* Same as {@link #subscribeToPatterns(List, Consumer)}, but instead of only receiving the payload, it also receives
* the channel name.
*
* @param patterns the patterns
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the channel name.
* The second one if the payload.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage);

/**
* Subscribes to the given channels.
*
Expand All @@ -62,6 +89,19 @@ public interface ReactivePubSubCommands<V> extends ReactiveRedisCommands {
*/
Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMessage);

/**
* Same as {@link #subscribe(List, Consumer)}, but instead of just receiving the payload, it also receives the
* channel name.
*
* @param channels the channels
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the given
* channels, and is invoked on the <strong>I/O thread</strong>. So, you must not block. Offload to
* a separate thread if needed. The first parameter is the channel name. The second one if the
* payload.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, BiConsumer<String, V> onMessage);

/**
* Subscribes to a given channel.
*
Expand Down Expand Up @@ -94,6 +134,23 @@ Uni<ReactiveRedisSubscriber> subscribe(String channel, Consumer<V> onMessage, Ru
Uni<ReactiveRedisSubscriber> subscribeToPattern(String pattern, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receives the channel name.
*
* @param pattern the pattern
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribeToPattern(String pattern, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Subscribes to the given patterns like {@code chan*l}.
*
Expand All @@ -110,6 +167,23 @@ Uni<ReactiveRedisSubscriber> subscribeToPattern(String pattern, Consumer<V> onMe
Uni<ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receive the channel name.
*
* @param patterns the patterns
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Subscribes to the given channels.
*
Expand All @@ -126,6 +200,23 @@ Uni<ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, Consumer
Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribe(List, Consumer, Runnable, Consumer)} but also receives the channel name.
*
* @param channels the channels
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the given
* channels, and is invoked on the <strong>I/O thread</strong>. So, you must not block. Offload to
* a separate thread if needed. The first parameter is the name of the channel. The second
* parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Subscribes to the given channels.
* This method returns a {@code Multi} emitting an item of type {@code V} for each received message.
Expand All @@ -137,6 +228,15 @@ Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMess
*/
Multi<V> subscribe(String... channels);

/**
* Same as {@link #subscribe(String...)}, but instead of receiving the message payload directly, it receives
* instances of {@link RedisPubSubMessage} wrapping the payload and the channel on which the message has been sent.
*
* @param channels the channels
* @return the stream of message
*/
Multi<RedisPubSubMessage<V>> subscribeAsMessages(String... channels);

/**
* Subscribes to the given patterns.
* This method returns a {@code Multi} emitting an item of type {@code V} for each received message.
Expand All @@ -148,6 +248,15 @@ Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMess
*/
Multi<V> subscribeToPatterns(String... patterns);

/**
* Same as {@link #subscribeToPatterns(String...)}, but instead of receiving only the message payload, it receives
* instances of {@link RedisPubSubMessage} containing both the payload and the name of the channel.
*
* @param patterns the patterns
* @return the stream of message
*/
Multi<RedisPubSubMessage<V>> subscribeAsMessagesToPatterns(String... patterns);

/**
* A redis subscriber
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.redis.datasource.pubsub;

/**
* A structure encapsulating the Redis pub/sub payload and the channel on which the message was sent.
* This structure is used when using {@link ReactivePubSubCommands#subscribeAsMessages(String...)} and
* {@link ReactivePubSubCommands#subscribeAsMessagesToPatterns(String...)}
*
* @param <V> the type of payload
*/
public interface RedisPubSubMessage<V> {

/**
* @return the payload.
*/
V getPayload();

/**
* @return the channel on which the message was sent.
*/
String getChannel();

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import io.quarkus.redis.datasource.RedisDataSource;
Expand Down Expand Up @@ -75,6 +76,44 @@ public RedisSubscriber subscribe(List<String> channels, Consumer<V> onMessage, R
.await().atMost(timeout);
}

@Override
public RedisSubscriber subscribeToPattern(String pattern, BiConsumer<String, V> onMessage) {
return reactive.subscribeToPattern(pattern, onMessage)
.map(r -> new BlockingRedisSubscriber(r))
.await().atMost(timeout);
}

@Override
public RedisSubscriber subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage) {
return reactive.subscribeToPatterns(patterns, onMessage)
.map(r -> new BlockingRedisSubscriber(r))
.await().atMost(timeout);
}

@Override
public RedisSubscriber subscribeToPattern(String pattern, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException) {
return reactive.subscribeToPattern(pattern, onMessage, onEnd, onException)
.map(r -> new BlockingRedisSubscriber(r))
.await().atMost(timeout);
}

@Override
public RedisSubscriber subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException) {
return reactive.subscribeToPatterns(patterns, onMessage, onEnd, onException)
.map(r -> new BlockingRedisSubscriber(r))
.await().atMost(timeout);
}

@Override
public RedisSubscriber subscribe(List<String> channels, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException) {
return reactive.subscribe(channels, onMessage, onEnd, onException)
.map(r -> new BlockingRedisSubscriber(r))
.await().atMost(timeout);
}

private class BlockingRedisSubscriber implements RedisSubscriber {
private final ReactivePubSubCommands.ReactiveRedisSubscriber reactiveRedisSubscriber;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.quarkus.redis.runtime.datasource;

import io.quarkus.redis.datasource.pubsub.RedisPubSubMessage;

public class DefaultRedisPubSubMessage<V> implements RedisPubSubMessage<V> {

private final V payload;
private final String channel;

public DefaultRedisPubSubMessage(V payload, String channel) {
this.payload = payload;
this.channel = channel;
}

@Override
public V getPayload() {
return payload;
}

@Override
public String getChannel() {
return channel;
}
}
Loading

0 comments on commit 81687f1

Please sign in to comment.