Skip to content

Commit

Permalink
Upgrade to Reactor 2022.0.0-M1
Browse files Browse the repository at this point in the history
[resolves pgjdbc#503]

Signed-off-by: Mark Paluch <[email protected]>
  • Loading branch information
mp911de committed Apr 4, 2022
1 parent b812b9d commit 9e152d6
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 36 deletions.
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<r2dbc-spi.version>1.0.0.BUILD-SNAPSHOT</r2dbc-spi.version>
<reactor.version>2020.0.17</reactor.version>
<reactor.version>2022.0.0-M1</reactor.version>
<scram-client.version>2.1</scram-client.version>
<spring-framework.version>5.3.16</spring-framework.version>
<testcontainers.version>1.16.3</testcontainers.version>
Expand Down Expand Up @@ -648,6 +648,11 @@
<enabled>true</enabled>
</snapshots>
</repository>

<repository>
<id>spring-libs-milestone</id>
<url>https://repo.spring.io/libs-milestone</url>
</repository>
</repositories>

</project>
17 changes: 10 additions & 7 deletions src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
Expand Down Expand Up @@ -474,28 +474,31 @@ void dispose() {

void register(Client client) {

this.subscription = client.addNotificationListener(new Subscriber<NotificationResponse>() {
BaseSubscriber<NotificationResponse> subscriber = new BaseSubscriber<NotificationResponse>() {

@Override
public void onSubscribe(Subscription subscription) {
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(NotificationResponse notificationResponse) {
public void hookOnNext(NotificationResponse notificationResponse) {
NotificationAdapter.this.sink.emitNext(new NotificationResponseWrapper(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST);
}

@Override
public void onError(Throwable throwable) {
public void hookOnError(Throwable throwable) {
NotificationAdapter.this.sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST);
}

@Override
public void onComplete() {
public void hookOnComplete() {
NotificationAdapter.this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
}
});
};

this.subscription = subscriber;
client.addNotificationListener(subscriber);
}

Flux<Notification> getEvents() {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/r2dbc/postgresql/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public interface Client {
* @throws IllegalArgumentException if {@code consumer} is {@code null}
* @since 0.8.1
*/
Disposable addNotificationListener(Subscriber<NotificationResponse> consumer);
void addNotificationListener(Subscriber<NotificationResponse> consumer);

/**
* Release any resources held by the {@link Client}.
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) {
Assert.requireNonNull(connection, "Connection must not be null");
this.settings = Assert.requireNonNull(settings, "ConnectionSettings must not be null");

connection.addHandler(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0));
connection.addHandler(new EnsureSubscribersCompleteChannelHandler(this.requestSink));
connection.addHandlerFirst(new EnsureSubscribersCompleteChannelHandler(this.requestSink));
connection.addHandlerLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0));
this.connection = connection;
this.byteBufAllocator = connection.outbound().alloc();
this.context = new ConnectionContext().withChannelId(connection.channel().toString());
Expand Down Expand Up @@ -422,8 +422,8 @@ public Disposable addNotificationListener(Consumer<NotificationResponse> consume
}

@Override
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
return this.notificationProcessor.asFlux().subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
public void addNotificationListener(Subscriber<NotificationResponse> consumer) {
this.notificationProcessor.asFlux().subscribe(consumer);
}

@Override
Expand Down Expand Up @@ -816,7 +816,7 @@ public void onComplete() {
@Override
public Context currentContext() {
Conversation receiver = this.conversations.peek();
return receiver != null ? receiver.sink.currentContext() : Context.empty();
return receiver != null ? Context.of(receiver.sink.contextView()) : Context.empty();
}

private void tryDrainLoop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ void createAuthenticationMD5Password() {
// @formatter:off
Client client = TestClient.builder()
.window()
.expectRequest(new StartupMessage("test-application-name", "test-database", "test-username", Collections.emptyMap())).thenRespond(new AuthenticationMD5Password(TEST.buffer(4).writeInt(100)))
.expectRequest(new PasswordMessage("md55e9836cdb369d50e3bc7d127e88b4804")).thenRespond(AuthenticationOk.INSTANCE)
.expectRequest(new StartupMessage("test-application-name", "test-database", "test-username", Collections.emptyMap()))
.thenRespond(new AuthenticationMD5Password(TEST.buffer(4).writeInt(100)))
.expectRequest(new PasswordMessage("md55e9836cdb369d50e3bc7d127e88b4804"))
.thenRespond(AuthenticationOk.INSTANCE)
.done()
.build();
// @formatter:on
Expand Down
35 changes: 15 additions & 20 deletions src/test/java/io/r2dbc/postgresql/client/TestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

import java.util.ArrayList;
Expand All @@ -42,7 +42,6 @@
/**
* Test {@link Client} implementation that allows specification of expectations and assertions.
*/
@SuppressWarnings("deprecation")
public final class TestClient implements Client {

public static final TestClient NO_OP = new TestClient(false, true, null, null, Flux.empty(), IDLE, new Version("9.4"));
Expand All @@ -51,15 +50,13 @@ public final class TestClient implements Client {

private final boolean connected;

private final reactor.core.publisher.EmitterProcessor<NotificationResponse> notificationProcessor = reactor.core.publisher.EmitterProcessor.create(false);
private final Sinks.Many<NotificationResponse> notificationProcessor = Sinks.many().multicast().onBackpressureBuffer();

private final Integer processId;

private final reactor.core.publisher.EmitterProcessor<FrontendMessage> requestProcessor = reactor.core.publisher.EmitterProcessor.create(false);
private final Sinks.Many<FrontendMessage> requestProcessor = Sinks.many().multicast().onBackpressureBuffer();

private final FluxSink<FrontendMessage> requests = this.requestProcessor.sink();

private final reactor.core.publisher.EmitterProcessor<Flux<BackendMessage>> responseProcessor = reactor.core.publisher.EmitterProcessor.create(false);
private final Sinks.Many<Flux<BackendMessage>> responseProcessor = Sinks.many().replay().all();

private final Integer secretKey;

Expand All @@ -75,14 +72,12 @@ private TestClient(boolean expectClose, boolean connected, @Nullable Integer pro
this.transactionStatus = Assert.requireNonNull(transactionStatus, "transactionStatus must not be null");
this.version = version;

FluxSink<Flux<BackendMessage>> responses = this.responseProcessor.sink();

Assert.requireNonNull(windows, "windows must not be null")
.map(window -> window.exchanges)
.map(exchanges -> exchanges
.concatMap(exchange ->

this.requestProcessor.zipWith(exchange.requests)
this.requestProcessor.asFlux().zipWith(exchange.requests)
.handle((tuple, sink) -> {
FrontendMessage actual = tuple.getT1();
FrontendMessage expected = tuple.getT2();
Expand All @@ -92,7 +87,7 @@ private TestClient(boolean expectClose, boolean connected, @Nullable Integer pro
}
})
.thenMany(exchange.responses)))
.subscribe(responses::next, responses::error, responses::complete);
.subscribe(this.responseProcessor::tryEmitNext, this.responseProcessor::tryEmitError, this.responseProcessor::tryEmitComplete);
}

public static Builder builder() {
Expand All @@ -108,20 +103,20 @@ public Mono<Void> close() {
public Flux<BackendMessage> exchange(Publisher<FrontendMessage> requests) {
Assert.requireNonNull(requests, "requests must not be null");

return this.responseProcessor
return this.responseProcessor.asFlux()
.doOnSubscribe(s ->
Flux.from(requests)
.subscribe(this.requests::next, this.requests::error))
.subscribe(this.requestProcessor::tryEmitNext, this.requestProcessor::tryEmitError))
.next()
.flatMapMany(Function.identity());
}

@Override
public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests) {
return this.responseProcessor
return this.responseProcessor.asFlux()
.doOnSubscribe(s ->
Flux.from(requests)
.subscribe(this.requests::next, this.requests::error))
.subscribe(this.requestProcessor::tryEmitNext, this.requestProcessor::tryEmitError))
.next()
.flatMapMany(Function.identity())
.takeWhile(takeUntil.negate());
Expand Down Expand Up @@ -169,21 +164,21 @@ public Mono<Void> cancelRequest() {

@Override
public void send(FrontendMessage message) {
this.requests.next(message);
this.requestProcessor.tryEmitNext(message);
}

@Override
public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
return this.notificationProcessor.subscribe(consumer);
return this.notificationProcessor.asFlux().subscribe(consumer);
}

@Override
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
public void addNotificationListener(Subscriber<NotificationResponse> consumer) {
this.notificationProcessor.asFlux().subscribe(consumer);
}

public void notify(NotificationResponse notification) {
this.notificationProcessor.onNext(notification);
this.notificationProcessor.tryEmitNext(notification);
}

public static final class Builder {
Expand Down

0 comments on commit 9e152d6

Please sign in to comment.