diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java index 04b0fb6f..28ca991b 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java @@ -40,6 +40,7 @@ import io.r2dbc.spi.R2dbcException; import io.r2dbc.spi.TransactionDefinition; import io.r2dbc.spi.ValidationDepth; +import io.r2dbc.spi.Wrapped; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -48,6 +49,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Scheduler; import reactor.util.Logger; import reactor.util.Loggers; import reactor.util.annotation.Nullable; @@ -62,7 +64,7 @@ /** * An implementation of {@link Connection} for connecting to a PostgreSQL database. */ -final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlConnection { +final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlConnection, Wrapped { private final Logger logger = Loggers.getLogger(this.getClass()); @@ -384,6 +386,21 @@ public String toString() { '}'; } + @Override + public E unwrap(Class targetClass) { + + if (targetClass == Scheduler.class) { + return targetClass.cast(this.client.getScheduler()); + } + + return Wrapped.super.unwrap(targetClass); + } + + @Override + public Object unwrap() { + return null; + } + @Override public Mono validate(ValidationDepth depth) { diff --git a/src/main/java/io/r2dbc/postgresql/client/Client.java b/src/main/java/io/r2dbc/postgresql/client/Client.java index e563e2d0..ed98307c 100644 --- a/src/main/java/io/r2dbc/postgresql/client/Client.java +++ b/src/main/java/io/r2dbc/postgresql/client/Client.java @@ -28,6 +28,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import java.util.Optional; import java.util.TimeZone; @@ -121,6 +122,12 @@ default Flux exchange(Publisher requests) { */ Optional getProcessId(); + /** + * @return returns the EventLoop as scheduler. + * @since 1.0.7 + */ + Scheduler getScheduler(); + /** * Returns the connected process secret key if it has been communicated. * diff --git a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java index 8ff9cf30..ba20dc9d 100644 --- a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java +++ b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; @@ -55,6 +56,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; import reactor.netty.channel.AbortedException; @@ -108,6 +110,8 @@ public final class ReactorNettyClient implements Client { private final Connection connection; + private final Scheduler scheduler; + private ConnectionContext context; private final Sinks.Many> requestSink = Sinks.many().unicast().onBackpressureBuffer(); @@ -167,6 +171,9 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) { this.context = connectionContext; + EventLoop eventLoop = connection.channel().eventLoop(); + this.scheduler = Schedulers.fromExecutorService(eventLoop, eventLoop.toString()); + AtomicReference receiveError = new AtomicReference<>(); connection.inbound().receive() @@ -470,6 +477,11 @@ public Optional getProcessId() { return Optional.ofNullable(this.processId); } + @Override + public Scheduler getScheduler() { + return this.scheduler; + } + @Override public Optional getSecretKey() { return Optional.ofNullable(this.secretKey); diff --git a/src/test/java/io/r2dbc/postgresql/client/TestClient.java b/src/test/java/io/r2dbc/postgresql/client/TestClient.java index b783bf18..464a9825 100644 --- a/src/test/java/io/r2dbc/postgresql/client/TestClient.java +++ b/src/test/java/io/r2dbc/postgresql/client/TestClient.java @@ -28,6 +28,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.util.annotation.Nullable; import java.util.ArrayList; @@ -138,6 +140,11 @@ public Optional getProcessId() { return Optional.ofNullable(this.processId); } + @Override + public Scheduler getScheduler() { + return Schedulers.immediate(); + } + @Override public Optional getSecretKey() { return Optional.ofNullable(this.secretKey);