diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 789e1f48ff9a0..cde667b0a442a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -244,7 +244,7 @@ private CompletableFuture createConnection(InetSocketAddress logicalA final CompletableFuture cnxFuture = new CompletableFuture<>(); // Trigger async connect to broker - createConnection(physicalAddress).thenAccept(channel -> { + createConnection(logicalAddress, physicalAddress).thenAccept(channel -> { log.info("[{}] Connected to server", channel); channel.closeFuture().addListener(v -> { @@ -266,16 +266,6 @@ private CompletableFuture createConnection(InetSocketAddress logicalA return; } - if (!logicalAddress.equals(physicalAddress)) { - // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that - // it can be specified when sending the CommandConnect. - // That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after - // this method. - cnx.setTargetBroker(logicalAddress); - } - - cnx.setRemoteHostName(physicalAddress.getHostString()); - cnx.connectionFuture().thenRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] Connection handshake completed", cnx.channel()); @@ -303,7 +293,8 @@ private CompletableFuture createConnection(InetSocketAddress logicalA /** * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server. */ - private CompletableFuture createConnection(InetSocketAddress unresolvedAddress) { + private CompletableFuture createConnection(InetSocketAddress logicalAddress, + InetSocketAddress unresolvedPhysicalAddress) { CompletableFuture> resolvedAddress; try { if (isSniProxy) { @@ -311,11 +302,11 @@ private CompletableFuture createConnection(InetSocketAddress unresolved resolvedAddress = resolveName(InetSocketAddress.createUnresolved(proxyURI.getHost(), proxyURI.getPort())); } else { - resolvedAddress = resolveName(unresolvedAddress); + resolvedAddress = resolveName(unresolvedPhysicalAddress); } return resolvedAddress.thenCompose( - inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), - isSniProxy ? unresolvedAddress : null)); + inetAddresses -> connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(), + isSniProxy ? unresolvedPhysicalAddress : null)); } catch (URISyntaxException e) { log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e); return FutureUtil @@ -327,17 +318,19 @@ private CompletableFuture createConnection(InetSocketAddress unresolved * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no * address is working. */ - private CompletableFuture connectToResolvedAddresses(Iterator unresolvedAddresses, + private CompletableFuture connectToResolvedAddresses(InetSocketAddress logicalAddress, + Iterator resolvedPhysicalAddress, InetSocketAddress sniHost) { CompletableFuture future = new CompletableFuture<>(); // Successfully connected to server - connectToAddress(unresolvedAddresses.next(), sniHost) + connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), sniHost) .thenAccept(future::complete) .exceptionally(exception -> { - if (unresolvedAddresses.hasNext()) { + if (resolvedPhysicalAddress.hasNext()) { // Try next IP address - connectToResolvedAddresses(unresolvedAddresses, sniHost).thenAccept(future::complete) + connectToResolvedAddresses(logicalAddress, resolvedPhysicalAddress, sniHost) + .thenAccept(future::complete) .exceptionally(ex -> { // This is already unwinding the recursive call future.completeExceptionally(ex); @@ -368,17 +361,22 @@ CompletableFuture> resolveName(InetSocketAddress unresol /** * Attempt to establish a TCP connection to an already resolved single IP address. */ - private CompletableFuture connectToAddress(InetSocketAddress remoteAddress, InetSocketAddress sniHost) { + private CompletableFuture connectToAddress(InetSocketAddress logicalAddress, + InetSocketAddress physicalAddress, InetSocketAddress sniHost) { if (clientConfig.isUseTls()) { return toCompletableFuture(bootstrap.register()) .thenCompose(channel -> channelInitializerHandler - .initTls(channel, sniHost != null ? sniHost : remoteAddress)) + .initTls(channel, sniHost != null ? sniHost : physicalAddress)) .thenCompose(channelInitializerHandler::initSocks5IfConfig) - .thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress))); + .thenCompose(ch -> + channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress)) + .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress))); } else { return toCompletableFuture(bootstrap.register()) .thenCompose(channelInitializerHandler::initSocks5IfConfig) - .thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress))); + .thenCompose(ch -> + channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress)) + .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress))); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index b046d1030eda5..f8f61f05661c0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -43,6 +43,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder; +import org.apache.pulsar.common.util.netty.NettyFutureUtil; @Slf4j public class PulsarChannelInitializer extends ChannelInitializer { @@ -209,5 +210,29 @@ CompletableFuture initSocks5IfConfig(Channel ch) { return initSocks5Future; } + + CompletableFuture initializeClientCnx(Channel ch, + InetSocketAddress logicalAddress, + InetSocketAddress resolvedPhysicalAddress) { + return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> { + final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler"); + + if (cnx == null) { + throw new IllegalStateException("Missing ClientCnx. This should not happen."); + } + + // Need to do our own equality because the physical address is resolved already + if (!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString()) + && logicalAddress.getPort() == resolvedPhysicalAddress.getPort())) { + // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that + // it can be specified when sending the CommandConnect. + cnx.setTargetBroker(logicalAddress); + } + + cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString()); + + return ch; + })); + } }