Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allows receiving the channel name when using Redis Pub/Sub #33703

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.redis.datasource.pubsub;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import io.quarkus.redis.datasource.RedisCommands;
Expand Down Expand Up @@ -43,6 +44,19 @@ public interface PubSubCommands<V> extends RedisCommands {
*/
RedisSubscriber subscribeToPattern(String pattern, Consumer<V> onMessage);

/**
* Same as {@link #subscribeToPattern(String, Consumer)}, but instead of receiving only the message payload, it
* also receives the name of the channel.
*
* @param pattern the pattern
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @return the subscriber object that lets you unsubscribe
*/
RedisSubscriber subscribeToPattern(String pattern, BiConsumer<String, V> onMessage);

/**
* Subscribes to the given patterns like {@code chan*l}.
*
Expand All @@ -54,6 +68,19 @@ public interface PubSubCommands<V> extends RedisCommands {
*/
RedisSubscriber subscribeToPatterns(List<String> patterns, Consumer<V> onMessage);

/**
* Same as {@link #subscribeToPatterns(List, Consumer)}, but instead of only receiving the payload, it also receives
* the channel name.
*
* @param patterns the patterns
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the channel name.
* The second one if the payload.
* @return the subscriber object that lets you unsubscribe
*/
RedisSubscriber subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage);

/**
* Subscribes to the given channels.
*
Expand Down Expand Up @@ -97,6 +124,23 @@ RedisSubscriber subscribe(String channel, Consumer<V> onMessage, Runnable onEnd,
RedisSubscriber subscribeToPattern(String pattern, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receives the channel name.
*
* @param pattern the pattern
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
RedisSubscriber subscribeToPattern(String pattern, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Subscribes to the given patterns like {@code chan*l}.
*
Expand All @@ -113,6 +157,23 @@ RedisSubscriber subscribeToPattern(String pattern, Consumer<V> onMessage, Runnab
RedisSubscriber subscribeToPatterns(List<String> patterns, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receive the channel name.
*
* @param patterns the patterns
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
RedisSubscriber subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Subscribes to the given channels.
*
Expand All @@ -129,6 +190,23 @@ RedisSubscriber subscribeToPatterns(List<String> patterns, Consumer<V> onMessage
RedisSubscriber subscribe(List<String> channels, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribe(List, Consumer, Runnable, Consumer)} but also receives the channel name.
*
* @param channels the channels
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the given
* channels, and is invoked on the <strong>I/O thread</strong>. So, you must not block. Offload to
* a separate thread if needed. The first parameter is the name of the channel. The second
* parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
RedisSubscriber subscribe(List<String> channels, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* A redis subscriber
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.redis.datasource.pubsub;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import io.quarkus.redis.datasource.ReactiveRedisCommands;
Expand Down Expand Up @@ -40,6 +41,19 @@ public interface ReactivePubSubCommands<V> extends ReactiveRedisCommands {
*/
Uni<ReactiveRedisSubscriber> subscribeToPattern(String pattern, Consumer<V> onMessage);

/**
* Same as {@link #subscribeToPattern(String, Consumer)}, but instead of receiving only the message payload, it
* also receives the name of the channel.
*
* @param pattern the pattern
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribeToPattern(String pattern, BiConsumer<String, V> onMessage);

/**
* Subscribes to the given patterns like {@code chan*l}.
*
Expand All @@ -51,6 +65,19 @@ public interface ReactivePubSubCommands<V> extends ReactiveRedisCommands {
*/
Uni<ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, Consumer<V> onMessage);

/**
* Same as {@link #subscribeToPatterns(List, Consumer)}, but instead of only receiving the payload, it also receives
* the channel name.
*
* @param patterns the patterns
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the channel name.
* The second one if the payload.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage);

/**
* Subscribes to the given channels.
*
Expand All @@ -62,6 +89,19 @@ public interface ReactivePubSubCommands<V> extends ReactiveRedisCommands {
*/
Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMessage);

/**
* Same as {@link #subscribe(List, Consumer)}, but instead of just receiving the payload, it also receives the
* channel name.
*
* @param channels the channels
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the given
* channels, and is invoked on the <strong>I/O thread</strong>. So, you must not block. Offload to
* a separate thread if needed. The first parameter is the channel name. The second one if the
* payload.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, BiConsumer<String, V> onMessage);

/**
* Subscribes to a given channel.
*
Expand Down Expand Up @@ -94,6 +134,23 @@ Uni<ReactiveRedisSubscriber> subscribe(String channel, Consumer<V> onMessage, Ru
Uni<ReactiveRedisSubscriber> subscribeToPattern(String pattern, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receives the channel name.
*
* @param pattern the pattern
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribeToPattern(String pattern, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Subscribes to the given patterns like {@code chan*l}.
*
Expand All @@ -110,6 +167,23 @@ Uni<ReactiveRedisSubscriber> subscribeToPattern(String pattern, Consumer<V> onMe
Uni<ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribeToPatterns(List, Consumer, Runnable, Consumer)}, but also receive the channel name.
*
* @param patterns the patterns
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the
* channels matching the pattern, and is invoked on the <strong>I/O thread</strong>. So, you must
* not block. Offload to a separate thread if needed. The first parameter is the name of the
* channel. The second parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Subscribes to the given channels.
*
Expand All @@ -126,6 +200,23 @@ Uni<ReactiveRedisSubscriber> subscribeToPatterns(List<String> patterns, Consumer
Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Same as {@link #subscribe(List, Consumer, Runnable, Consumer)} but also receives the channel name.
*
* @param channels the channels
* @param onMessage the message consumer. Be aware that this callback is invoked for each message sent to the given
* channels, and is invoked on the <strong>I/O thread</strong>. So, you must not block. Offload to
* a separate thread if needed. The first parameter is the name of the channel. The second
* parameter is the payload.
* @param onEnd the end handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @param onException the exception handler. Be aware that this callback is invoked on the <strong>I/O thread</strong>.
* So, you must not block. Offload to a separate thread if needed.
* @return the subscriber object that lets you unsubscribe
*/
Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException);

/**
* Subscribes to the given channels.
* This method returns a {@code Multi} emitting an item of type {@code V} for each received message.
Expand All @@ -137,6 +228,15 @@ Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMess
*/
Multi<V> subscribe(String... channels);

/**
* Same as {@link #subscribe(String...)}, but instead of receiving the message payload directly, it receives
* instances of {@link RedisPubSubMessage} wrapping the payload and the channel on which the message has been sent.
*
* @param channels the channels
* @return the stream of message
*/
Multi<RedisPubSubMessage<V>> subscribeAsMessages(String... channels);

/**
* Subscribes to the given patterns.
* This method returns a {@code Multi} emitting an item of type {@code V} for each received message.
Expand All @@ -148,6 +248,15 @@ Uni<ReactiveRedisSubscriber> subscribe(List<String> channels, Consumer<V> onMess
*/
Multi<V> subscribeToPatterns(String... patterns);

/**
* Same as {@link #subscribeToPatterns(String...)}, but instead of receiving only the message payload, it receives
* instances of {@link RedisPubSubMessage} containing both the payload and the name of the channel.
*
* @param patterns the patterns
* @return the stream of message
*/
Multi<RedisPubSubMessage<V>> subscribeAsMessagesToPatterns(String... patterns);

/**
* A redis subscriber
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.redis.datasource.pubsub;

/**
* A structure encapsulating the Redis pub/sub payload and the channel on which the message was sent.
* This structure is used when using {@link ReactivePubSubCommands#subscribeAsMessages(String...)} and
* {@link ReactivePubSubCommands#subscribeAsMessagesToPatterns(String...)}
*
* @param <V> the type of payload
*/
public interface RedisPubSubMessage<V> {

/**
* @return the payload.
*/
V getPayload();

/**
* @return the channel on which the message was sent.
*/
String getChannel();

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import io.quarkus.redis.datasource.RedisDataSource;
Expand Down Expand Up @@ -75,6 +76,44 @@ public RedisSubscriber subscribe(List<String> channels, Consumer<V> onMessage, R
.await().atMost(timeout);
}

@Override
public RedisSubscriber subscribeToPattern(String pattern, BiConsumer<String, V> onMessage) {
return reactive.subscribeToPattern(pattern, onMessage)
.map(r -> new BlockingRedisSubscriber(r))
.await().atMost(timeout);
}

@Override
public RedisSubscriber subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage) {
return reactive.subscribeToPatterns(patterns, onMessage)
.map(r -> new BlockingRedisSubscriber(r))
.await().atMost(timeout);
}

@Override
public RedisSubscriber subscribeToPattern(String pattern, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException) {
return reactive.subscribeToPattern(pattern, onMessage, onEnd, onException)
.map(r -> new BlockingRedisSubscriber(r))
.await().atMost(timeout);
}

@Override
public RedisSubscriber subscribeToPatterns(List<String> patterns, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException) {
return reactive.subscribeToPatterns(patterns, onMessage, onEnd, onException)
.map(r -> new BlockingRedisSubscriber(r))
.await().atMost(timeout);
}

@Override
public RedisSubscriber subscribe(List<String> channels, BiConsumer<String, V> onMessage, Runnable onEnd,
Consumer<Throwable> onException) {
return reactive.subscribe(channels, onMessage, onEnd, onException)
.map(r -> new BlockingRedisSubscriber(r))
.await().atMost(timeout);
}

private class BlockingRedisSubscriber implements RedisSubscriber {
private final ReactivePubSubCommands.ReactiveRedisSubscriber reactiveRedisSubscriber;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.quarkus.redis.runtime.datasource;

import io.quarkus.redis.datasource.pubsub.RedisPubSubMessage;

public class DefaultRedisPubSubMessage<V> implements RedisPubSubMessage<V> {

private final V payload;
private final String channel;

public DefaultRedisPubSubMessage(V payload, String channel) {
this.payload = payload;
this.channel = channel;
}

@Override
public V getPayload() {
return payload;
}

@Override
public String getChannel() {
return channel;
}
}
Loading