Skip to content

Commit

Permalink
Replace use of deprecated DirectProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
saai-syvendra committed Oct 24, 2024
1 parent d1b41f1 commit c7267f9
Showing 1 changed file with 6 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

@RequiredArgsConstructor
Expand All @@ -34,17 +33,17 @@ public abstract class SharedTopicListener implements TopicListener {
protected final ListenerProperties listenerProperties;

@Override
@SuppressWarnings("deprecation")
public Flux<TopicMessage> listen(TopicMessageFilter filter) {
DirectProcessor<TopicMessage> overflowProcessor = DirectProcessor.create();
FluxSink<TopicMessage> overflowSink = overflowProcessor.sink();
Sinks.Many<TopicMessage> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<TopicMessage> overflowProcessor = sink.asFlux();

// moving publishOn from after onBackpressureBuffer to after Flux.merge reduces CPU usage by up to 40%
Flux<TopicMessage> topicMessageFlux = getSharedListener(filter)
.doOnSubscribe(s -> log.info("Subscribing: {}", filter))
.onBackpressureBuffer(
listenerProperties.getMaxBufferSize(), t -> overflowSink.error(Exceptions.failWithOverflow()))
.doFinally(s -> overflowSink.complete());
listenerProperties.getMaxBufferSize(), t -> sink.tryEmitError(Exceptions.failWithOverflow()))
.doFinally(s -> sink.tryEmitComplete());

return Flux.merge(listenerProperties.getPrefetch(), topicMessageFlux, overflowProcessor)
.publishOn(Schedulers.boundedElastic(), false, listenerProperties.getPrefetch());
}
Expand Down

0 comments on commit c7267f9

Please sign in to comment.