From 9e152d66e2da01ecd5c931f9c308c3c878ffacbe Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 1 Apr 2022 14:58:20 +0200 Subject: [PATCH] Upgrade to Reactor 2022.0.0-M1 [resolves #503] Signed-off-by: Mark Paluch --- pom.xml | 7 +++- .../postgresql/PostgresqlConnection.java | 17 +++++---- .../io/r2dbc/postgresql/client/Client.java | 2 +- .../postgresql/client/ReactorNettyClient.java | 10 +++--- .../PostgresqlConnectionFactoryUnitTests.java | 6 ++-- .../r2dbc/postgresql/client/TestClient.java | 35 ++++++++----------- 6 files changed, 41 insertions(+), 36 deletions(-) diff --git a/pom.xml b/pom.xml index 09512252..8d96bf15 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ UTF-8 UTF-8 1.0.0.BUILD-SNAPSHOT - 2020.0.17 + 2022.0.0-M1 2.1 5.3.16 1.16.3 @@ -648,6 +648,11 @@ true + + + spring-libs-milestone + https://repo.spring.io/libs-milestone + diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java index f6cf0834..30ea1186 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java @@ -38,10 +38,10 @@ import io.r2dbc.spi.TransactionDefinition; import io.r2dbc.spi.ValidationDepth; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -474,28 +474,31 @@ void dispose() { void register(Client client) { - this.subscription = client.addNotificationListener(new Subscriber() { + BaseSubscriber subscriber = new BaseSubscriber() { @Override - public void onSubscribe(Subscription subscription) { + protected void hookOnSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); } @Override - public void onNext(NotificationResponse notificationResponse) { + public void hookOnNext(NotificationResponse notificationResponse) { NotificationAdapter.this.sink.emitNext(new NotificationResponseWrapper(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST); } @Override - public void onError(Throwable throwable) { + public void hookOnError(Throwable throwable) { NotificationAdapter.this.sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST); } @Override - public void onComplete() { + public void hookOnComplete() { NotificationAdapter.this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); } - }); + }; + + this.subscription = subscriber; + client.addNotificationListener(subscriber); } Flux getEvents() { diff --git a/src/main/java/io/r2dbc/postgresql/client/Client.java b/src/main/java/io/r2dbc/postgresql/client/Client.java index c05c5bbd..aec4f57e 100644 --- a/src/main/java/io/r2dbc/postgresql/client/Client.java +++ b/src/main/java/io/r2dbc/postgresql/client/Client.java @@ -57,7 +57,7 @@ public interface Client { * @throws IllegalArgumentException if {@code consumer} is {@code null} * @since 0.8.1 */ - Disposable addNotificationListener(Subscriber consumer); + void addNotificationListener(Subscriber consumer); /** * Release any resources held by the {@link Client}. diff --git a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java index 7087d4e6..76c5866a 100644 --- a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java +++ b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java @@ -137,8 +137,8 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) { Assert.requireNonNull(connection, "Connection must not be null"); this.settings = Assert.requireNonNull(settings, "ConnectionSettings must not be null"); - connection.addHandler(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0)); - connection.addHandler(new EnsureSubscribersCompleteChannelHandler(this.requestSink)); + connection.addHandlerFirst(new EnsureSubscribersCompleteChannelHandler(this.requestSink)); + connection.addHandlerLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0)); this.connection = connection; this.byteBufAllocator = connection.outbound().alloc(); this.context = new ConnectionContext().withChannelId(connection.channel().toString()); @@ -422,8 +422,8 @@ public Disposable addNotificationListener(Consumer consume } @Override - public Disposable addNotificationListener(Subscriber consumer) { - return this.notificationProcessor.asFlux().subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe); + public void addNotificationListener(Subscriber consumer) { + this.notificationProcessor.asFlux().subscribe(consumer); } @Override @@ -816,7 +816,7 @@ public void onComplete() { @Override public Context currentContext() { Conversation receiver = this.conversations.peek(); - return receiver != null ? receiver.sink.currentContext() : Context.empty(); + return receiver != null ? Context.of(receiver.sink.contextView()) : Context.empty(); } private void tryDrainLoop() { diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryUnitTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryUnitTests.java index f278be6e..7fdae131 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryUnitTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryUnitTests.java @@ -66,8 +66,10 @@ void createAuthenticationMD5Password() { // @formatter:off Client client = TestClient.builder() .window() - .expectRequest(new StartupMessage("test-application-name", "test-database", "test-username", Collections.emptyMap())).thenRespond(new AuthenticationMD5Password(TEST.buffer(4).writeInt(100))) - .expectRequest(new PasswordMessage("md55e9836cdb369d50e3bc7d127e88b4804")).thenRespond(AuthenticationOk.INSTANCE) + .expectRequest(new StartupMessage("test-application-name", "test-database", "test-username", Collections.emptyMap())) + .thenRespond(new AuthenticationMD5Password(TEST.buffer(4).writeInt(100))) + .expectRequest(new PasswordMessage("md55e9836cdb369d50e3bc7d127e88b4804")) + .thenRespond(AuthenticationOk.INSTANCE) .done() .build(); // @formatter:on diff --git a/src/test/java/io/r2dbc/postgresql/client/TestClient.java b/src/test/java/io/r2dbc/postgresql/client/TestClient.java index 2fdc8216..2f2a41ed 100644 --- a/src/test/java/io/r2dbc/postgresql/client/TestClient.java +++ b/src/test/java/io/r2dbc/postgresql/client/TestClient.java @@ -26,8 +26,8 @@ import org.reactivestreams.Subscriber; import reactor.core.Disposable; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import reactor.util.annotation.Nullable; import java.util.ArrayList; @@ -42,7 +42,6 @@ /** * Test {@link Client} implementation that allows specification of expectations and assertions. */ -@SuppressWarnings("deprecation") public final class TestClient implements Client { public static final TestClient NO_OP = new TestClient(false, true, null, null, Flux.empty(), IDLE, new Version("9.4")); @@ -51,15 +50,13 @@ public final class TestClient implements Client { private final boolean connected; - private final reactor.core.publisher.EmitterProcessor notificationProcessor = reactor.core.publisher.EmitterProcessor.create(false); + private final Sinks.Many notificationProcessor = Sinks.many().multicast().onBackpressureBuffer(); private final Integer processId; - private final reactor.core.publisher.EmitterProcessor requestProcessor = reactor.core.publisher.EmitterProcessor.create(false); + private final Sinks.Many requestProcessor = Sinks.many().multicast().onBackpressureBuffer(); - private final FluxSink requests = this.requestProcessor.sink(); - - private final reactor.core.publisher.EmitterProcessor> responseProcessor = reactor.core.publisher.EmitterProcessor.create(false); + private final Sinks.Many> responseProcessor = Sinks.many().replay().all(); private final Integer secretKey; @@ -75,14 +72,12 @@ private TestClient(boolean expectClose, boolean connected, @Nullable Integer pro this.transactionStatus = Assert.requireNonNull(transactionStatus, "transactionStatus must not be null"); this.version = version; - FluxSink> responses = this.responseProcessor.sink(); - Assert.requireNonNull(windows, "windows must not be null") .map(window -> window.exchanges) .map(exchanges -> exchanges .concatMap(exchange -> - this.requestProcessor.zipWith(exchange.requests) + this.requestProcessor.asFlux().zipWith(exchange.requests) .handle((tuple, sink) -> { FrontendMessage actual = tuple.getT1(); FrontendMessage expected = tuple.getT2(); @@ -92,7 +87,7 @@ private TestClient(boolean expectClose, boolean connected, @Nullable Integer pro } }) .thenMany(exchange.responses))) - .subscribe(responses::next, responses::error, responses::complete); + .subscribe(this.responseProcessor::tryEmitNext, this.responseProcessor::tryEmitError, this.responseProcessor::tryEmitComplete); } public static Builder builder() { @@ -108,20 +103,20 @@ public Mono close() { public Flux exchange(Publisher requests) { Assert.requireNonNull(requests, "requests must not be null"); - return this.responseProcessor + return this.responseProcessor.asFlux() .doOnSubscribe(s -> Flux.from(requests) - .subscribe(this.requests::next, this.requests::error)) + .subscribe(this.requestProcessor::tryEmitNext, this.requestProcessor::tryEmitError)) .next() .flatMapMany(Function.identity()); } @Override public Flux exchange(Predicate takeUntil, Publisher requests) { - return this.responseProcessor + return this.responseProcessor.asFlux() .doOnSubscribe(s -> Flux.from(requests) - .subscribe(this.requests::next, this.requests::error)) + .subscribe(this.requestProcessor::tryEmitNext, this.requestProcessor::tryEmitError)) .next() .flatMapMany(Function.identity()) .takeWhile(takeUntil.negate()); @@ -169,21 +164,21 @@ public Mono cancelRequest() { @Override public void send(FrontendMessage message) { - this.requests.next(message); + this.requestProcessor.tryEmitNext(message); } @Override public Disposable addNotificationListener(Consumer consumer) { - return this.notificationProcessor.subscribe(consumer); + return this.notificationProcessor.asFlux().subscribe(consumer); } @Override - public Disposable addNotificationListener(Subscriber consumer) { - return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe); + public void addNotificationListener(Subscriber consumer) { + this.notificationProcessor.asFlux().subscribe(consumer); } public void notify(NotificationResponse notification) { - this.notificationProcessor.onNext(notification); + this.notificationProcessor.tryEmitNext(notification); } public static final class Builder {