Skip to content

Commit

Permalink
Merge pull request #33608 from cescoffier/redis-propagate-signal-pub-sub
Browse files Browse the repository at this point in the history
Propagate completion and error events in Redis pub/sub
  • Loading branch information
machi1990 authored May 30, 2023
2 parents 039a3fb + f406a26 commit 89db120
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,11 @@ public Multi<V> subscribe(String... channels) {
doesNotContainNull(channels, "channels");

return Multi.createFrom().emitter(emitter -> {
subscribe(List.of(channels), emitter::emit)
subscribe(List.of(channels), emitter::emit, emitter::complete, emitter::fail)
.subscribe().with(x -> {
emitter.onTermination(() -> x.unsubscribe(channels).subscribe().asCompletionStage());
emitter.onTermination(() -> {
x.unsubscribe(channels).subscribe().asCompletionStage();
});
}, emitter::fail);
});
}
Expand All @@ -156,7 +158,7 @@ public Multi<V> subscribeToPatterns(String... patterns) {
doesNotContainNull(patterns, "patterns");

return Multi.createFrom().emitter(emitter -> {
subscribeToPatterns(List.of(patterns), emitter::emit)
subscribeToPatterns(List.of(patterns), emitter::emit, emitter::complete, emitter::fail)
.subscribe().with(x -> {
emitter.onTermination(() -> x.unsubscribe(patterns).subscribe().asCompletionStage());
}, emitter::fail);
Expand Down Expand Up @@ -186,10 +188,12 @@ private AbstractRedisSubscriber(RedisConnection connection, RedisAPI api, Consum
public Uni<String> subscribe() {
Uni<Void> handled = Uni.createFrom().emitter(emitter -> {
connection.handler(r -> runOnDuplicatedContext(() -> handleRedisEvent(emitter, r)));
if (onEnd != null)
if (onEnd != null) {
connection.endHandler(() -> runOnDuplicatedContext(onEnd));
if (onException != null)
}
if (onException != null) {
connection.exceptionHandler(t -> runOnDuplicatedContext(() -> onException.accept(t)));
}
});

Uni<Void> subscribed = subscribeToRedis();
Expand Down

0 comments on commit 89db120

Please sign in to comment.