Skip to content

Commit

Permalink
Adopt to Reactor Netty changes.
Browse files Browse the repository at this point in the history
Also, ignore deprecations as we cannot switch to the Sinks API easily.

[#466]

Signed-off-by: Mark Paluch <[email protected]>
  • Loading branch information
mp911de committed Jan 11, 2022
1 parent 88819eb commit 46d8bbe
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 109 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@
<compilerArgs>
<arg>-Werror</arg>
<arg>-Xlint:all</arg>
<arg>-Xlint:-deprecation</arg>
<arg>-Xlint:-options</arg>
<arg>-Xlint:-processing</arg>
<arg>-Xlint:-serial</arg>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
* Utility to execute the {@code Parse/Bind/Describe/Execute/Sync} portion of the <a href="https://www.postgresql.org/docs/current/static/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY">Extended query</a>
* message flow.
*/
@SuppressWarnings("deprecation")
class ExtendedFlowDelegate {

static final Predicate<BackendMessage> RESULT_FRAME_FILTER = not(or(BindComplete.class::isInstance, NoData.class::isInstance));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
/**
* {@link Statement} using the <a href="https://www.postgresql.org/docs/current/static/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY">Extended Query Flow</a>.
*/
@SuppressWarnings("deprecation")
final class ExtendedQueryPostgresqlStatement implements PostgresqlStatement {

private final Bindings bindings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.r2dbc.postgresql;

import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.r2dbc.postgresql.client.DefaultHostnameVerifier;
Expand All @@ -28,10 +31,10 @@
import io.r2dbc.postgresql.extension.Extension;
import io.r2dbc.postgresql.util.Assert;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.SslProvider;
import reactor.util.annotation.Nullable;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -50,7 +53,6 @@
import java.util.function.ToIntFunction;

import static io.r2dbc.postgresql.message.frontend.Execute.NO_LIMIT;
import static reactor.netty.tcp.SslProvider.DefaultConfigurationType.TCP;

/**
* Connection configuration information for connecting to a PostgreSQL database.
Expand Down Expand Up @@ -896,7 +898,7 @@ private SSLConfig createSslConfig() {
return new SSLConfig(this.sslMode, createSslProvider(), hostnameVerifier);
}

private Supplier<SslProvider> createSslProvider() {
private Supplier<SslContext> createSslProvider() {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
if (this.sslMode.verifyCertificate()) {
if (this.sslRootCert != null) {
Expand All @@ -906,6 +908,13 @@ private Supplier<SslProvider> createSslProvider() {
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
}

sslContextBuilder.sslProvider(
OpenSsl.isAvailable() ?
io.netty.handler.ssl.SslProvider.OPENSSL :
io.netty.handler.ssl.SslProvider.JDK)
.ciphers(null, IdentityCipherSuiteFilter.INSTANCE)
.applicationProtocolConfig(null);

URL sslKey = this.sslKey;
URL sslCert = this.sslCert;

Expand Down Expand Up @@ -941,10 +950,13 @@ private Supplier<SslProvider> createSslProvider() {
});
}

return () -> SslProvider.builder()
.sslContext(this.sslContextBuilderCustomizer.apply(sslContextBuilder))
.defaultConfiguration(TCP)
.build();
return () -> {
try {
return this.sslContextBuilderCustomizer.apply(sslContextBuilder).build();
} catch (SSLException e) {
throw new IllegalStateException("Failed to create SslContext", e);
}
};
}

interface StreamConsumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ abstract class AbstractPostgresSSLHandlerAdapter extends ChannelInboundHandlerAd
AbstractPostgresSSLHandlerAdapter(ByteBufAllocator alloc, SSLConfig sslConfig) {
this.sslConfig = sslConfig;
this.sslEngine = sslConfig.getSslProvider().get()
.getSslContext()
.newEngine(alloc);
this.handshakeFuture = new CompletableFuture<>();
this.sslHandler = new SslHandler(this.sslEngine);
Expand Down
98 changes: 1 addition & 97 deletions src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
Expand Down Expand Up @@ -379,7 +377,7 @@ public static Mono<ReactorNettyClient> connect(ConnectionProvider connectionProv
TcpClient tcpClient = TcpClient.create(connectionProvider).remoteAddress(() -> socketAddress);

if (!(socketAddress instanceof InetSocketAddress)) {
tcpClient = tcpClient.runOn(new SocketLoopResources(connectionSettings.hasLoopResources() ? connectionSettings.getRequiredLoopResources() : TcpResources.get()), true);
tcpClient = tcpClient.runOn(connectionSettings.hasLoopResources() ? connectionSettings.getRequiredLoopResources() : TcpResources.get(), true);
} else {

if (connectionSettings.hasLoopResources()) {
Expand Down Expand Up @@ -574,100 +572,6 @@ public ResponseQueueException(String message) {

}

@SuppressWarnings({"deprecation"})
static class SocketLoopResources implements LoopResources {

@Nullable
private static final Class<? extends Channel> EPOLL_SOCKET = findClass("io.netty.channel.epoll.EpollDomainSocketChannel");

@Nullable
private static final Class<? extends Channel> KQUEUE_SOCKET = findClass("io.netty.channel.kqueue.KQueueDomainSocketChannel");

private static final boolean kqueue;

static {
boolean kqueueCheck = false;
try {
Class.forName("io.netty.channel.kqueue.KQueue");
kqueueCheck = io.netty.channel.kqueue.KQueue.isAvailable();
} catch (ClassNotFoundException cnfe) {
}
kqueue = kqueueCheck;
}

private static final boolean epoll;

static {
boolean epollCheck = false;
try {
Class.forName("io.netty.channel.epoll.Epoll");
epollCheck = Epoll.isAvailable();
} catch (ClassNotFoundException cnfe) {
}
epoll = epollCheck;
}

private final LoopResources delegate;

public SocketLoopResources(LoopResources delegate) {
this.delegate = delegate;
}

@SuppressWarnings("unchecked")
private static Class<? extends Channel> findClass(String className) {
try {
return (Class<? extends Channel>) SocketLoopResources.class.getClassLoader().loadClass(className);
} catch (ClassNotFoundException e) {
return null;
}
}

@Override
public Class<? extends Channel> onChannel(EventLoopGroup group) {

if (epoll && EPOLL_SOCKET != null) {
return EPOLL_SOCKET;
}

if (kqueue && KQUEUE_SOCKET != null) {
return KQUEUE_SOCKET;
}

return this.delegate.onChannel(group);
}

@Override
public EventLoopGroup onClient(boolean useNative) {
return this.delegate.onClient(useNative);
}

@Override
public EventLoopGroup onServer(boolean useNative) {
return this.delegate.onServer(useNative);
}

@Override
public EventLoopGroup onServerSelect(boolean useNative) {
return this.delegate.onServerSelect(useNative);
}

@Override
public boolean daemon() {
return this.delegate.daemon();
}

@Override
public void dispose() {
this.delegate.dispose();
}

@Override
public Mono<Void> disposeLater() {
return this.delegate.disposeLater();
}

}

/**
* Value object representing a single conversation. The driver permits a single conversation at a time to ensure that request messages get routed to the proper response receiver and do not leak
* into other conversations. A conversation must be finished in the sense that the {@link Publisher} of {@link FrontendMessage} has completed before the next conversation is started.
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/r2dbc/postgresql/client/SSLConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.r2dbc.postgresql.client;

import io.netty.handler.ssl.SslContext;
import io.r2dbc.postgresql.util.Assert;
import reactor.netty.tcp.SslProvider;
import reactor.util.annotation.Nullable;

import javax.net.ssl.HostnameVerifier;
Expand All @@ -31,9 +31,9 @@ public final class SSLConfig {
private final SSLMode sslMode;

@Nullable
private final Supplier<SslProvider> sslProvider;
private final Supplier<SslContext> sslProvider;

public SSLConfig(SSLMode sslMode, @Nullable Supplier<SslProvider> sslProvider, @Nullable HostnameVerifier hostnameVerifier) {
public SSLConfig(SSLMode sslMode, @Nullable Supplier<SslContext> sslProvider, @Nullable HostnameVerifier hostnameVerifier) {
if (sslMode != SSLMode.DISABLE) {
Assert.requireNonNull(sslProvider, "Ssl provider is required for ssl mode " + sslMode);
}
Expand All @@ -57,7 +57,7 @@ public SSLMode getSslMode() {
return this.sslMode;
}

public Supplier<SslProvider> getSslProvider() {
public Supplier<SslContext> getSslProvider() {
if (this.sslProvider == null) {
throw new IllegalStateException("SSL Mode disabled. SslProvider not available");
}
Expand Down

0 comments on commit 46d8bbe

Please sign in to comment.