From 5d78c35f4f3bacc63a003c4487ae831a00085545 Mon Sep 17 00:00:00 2001 From: Nikolai Perevozchikov Date: Wed, 27 Feb 2019 17:58:48 +0300 Subject: [PATCH] Fix out of order signals when published on another executor #986 We now use a single EventExecutor when emitting signals instead of submitting runnables to the EventExecutorGroup. This change retains execution order and prevents race conditions of completion arrival before an actual data signal. Original pull request: #987. --- .../core/AbstractRedisReactiveCommands.java | 5 +++-- .../ReactiveConnectionIntegrationTests.java | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index 07828db0f6..e9838dac4b 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -402,10 +402,11 @@ private Flux createFlux(Supplier> commandSupplier, if (tracingEnabled) { return withTraceContext().flatMapMany( - it -> Flux.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, dissolve, getScheduler()))); + it -> Flux.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, dissolve, getScheduler() + .next()))); } - return Flux.from(new RedisPublisher<>(commandSupplier, connection, dissolve, getScheduler())); + return Flux.from(new RedisPublisher<>(commandSupplier, connection, dissolve, getScheduler().next())); } private Mono withTraceContext() { diff --git a/src/test/java/io/lettuce/core/ReactiveConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/ReactiveConnectionIntegrationTests.java index 2c3a487024..eb75549050 100644 --- a/src/test/java/io/lettuce/core/ReactiveConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/ReactiveConnectionIntegrationTests.java @@ -16,6 +16,7 @@ package io.lettuce.core; import static io.lettuce.core.ClientOptions.DisconnectedBehavior.REJECT_COMMANDS; +import static io.lettuce.core.ScriptOutputType.INTEGER; import static org.assertj.core.api.Assertions.assertThat; import java.time.Duration; @@ -235,6 +236,24 @@ void subscribeWithDisconnectedClient(RedisClient client) { connection.close(); } + @Test + @Inject + void publishOnSchedulerTest(RedisClient client) { + + client.setOptions(ClientOptions.builder().publishOnScheduler(true).build()); + + RedisReactiveCommands reactive = client.connect().reactive(); + + int counter = 0; + for (int i = 0; i < 1000; i++) { + if (reactive.eval("return 1", INTEGER).next().block() == null) counter++; + } + + assertThat(counter).isZero(); + + reactive.getStatefulConnection().close(); + } + private static Subscriber createSubscriberWithExceptionOnComplete() { return new Subscriber() {