Skip to content

Commit

Permalink
Use DirectProcessor in EventBus #1086
Browse files Browse the repository at this point in the history
Lettuce now uses DirectProcessor as non-blocking event bus. DirectProcessor no longer blocks calling .next(…). Previously, EmitterProcessor used a blocking queue which blocked the caller if downstream consumers did not consume events in time.
  • Loading branch information
mp911de committed Jul 29, 2019
1 parent 911aa6f commit 8b63744
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/main/java/io/lettuce/core/event/DefaultEventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/
package io.lettuce.core.event;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;

/**
Expand All @@ -27,11 +28,13 @@
*/
public class DefaultEventBus implements EventBus {

private final EmitterProcessor<Event> bus;
private final DirectProcessor<Event> bus;
private final FluxSink<Event> sink;
private final Scheduler scheduler;

public DefaultEventBus(Scheduler scheduler) {
this.bus = EmitterProcessor.create();
this.bus = DirectProcessor.create();
this.sink = bus.sink();
this.scheduler = scheduler;
}

Expand All @@ -42,8 +45,6 @@ public Flux<Event> get() {

@Override
public void publish(Event event) {
if (bus.hasDownstreams()) {
bus.onNext(event);
}
sink.next(event);
}
}

0 comments on commit 8b63744

Please sign in to comment.