Skip to content

Commit

Permalink
Fix out of order signals when published on another executor #986
Browse files Browse the repository at this point in the history
We now use a single EventExecutor when emitting signals instead of submitting runnables to the EventExecutorGroup.
This change retains execution order and prevents race conditions of completion arrival before an actual data signal.

Original pull request: #987.
  • Loading branch information
trueinsider authored and mp911de committed Feb 27, 2019
1 parent a76ce42 commit 5d78c35
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,11 @@ private <T> Flux<T> createFlux(Supplier<RedisCommand<K, V, T>> commandSupplier,
if (tracingEnabled) {

return withTraceContext().flatMapMany(
it -> Flux.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, dissolve, getScheduler())));
it -> Flux.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, dissolve, getScheduler()
.next())));
}

return Flux.from(new RedisPublisher<>(commandSupplier, connection, dissolve, getScheduler()));
return Flux.from(new RedisPublisher<>(commandSupplier, connection, dissolve, getScheduler().next()));
}

private Mono<TraceContext> withTraceContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.lettuce.core;

import static io.lettuce.core.ClientOptions.DisconnectedBehavior.REJECT_COMMANDS;
import static io.lettuce.core.ScriptOutputType.INTEGER;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
Expand Down Expand Up @@ -235,6 +236,24 @@ void subscribeWithDisconnectedClient(RedisClient client) {
connection.close();
}

@Test
@Inject
void publishOnSchedulerTest(RedisClient client) {

client.setOptions(ClientOptions.builder().publishOnScheduler(true).build());

RedisReactiveCommands<String, String> reactive = client.connect().reactive();

int counter = 0;
for (int i = 0; i < 1000; i++) {
if (reactive.eval("return 1", INTEGER).next().block() == null) counter++;
}

assertThat(counter).isZero();

reactive.getStatefulConnection().close();
}

private static Subscriber<String> createSubscriberWithExceptionOnComplete() {
return new Subscriber<String>() {

Expand Down

0 comments on commit 5d78c35

Please sign in to comment.