Skip to content

Commit

Permalink
[fix][client] Set fields earlier for correct ClientCnx initialization (
Browse files Browse the repository at this point in the history
…#19327)

(cherry picked from commit 3d8b52a)
  • Loading branch information
michaeljmarshall committed Jan 31, 2023
1 parent eec0499 commit 6f6130e
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();

// Trigger async connect to broker
createConnection(physicalAddress).thenAccept(channel -> {
createConnection(logicalAddress, physicalAddress).thenAccept(channel -> {
log.info("[{}] Connected to server", channel);

channel.closeFuture().addListener(v -> {
Expand All @@ -204,16 +204,6 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
return;
}

if (!logicalAddress.equals(physicalAddress)) {
// We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
// it can be specified when sending the CommandConnect.
// That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after
// this method.
cnx.setTargetBroker(logicalAddress);
}

cnx.setRemoteHostName(physicalAddress.getHostString());

cnx.connectionFuture().thenRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Connection handshake completed", cnx.channel());
Expand Down Expand Up @@ -241,19 +231,20 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
/**
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
private CompletableFuture<Channel> createConnection(InetSocketAddress logicalAddress,
InetSocketAddress unresolvedPhysicalAddress) {
CompletableFuture<List<InetSocketAddress>> resolvedAddress;
try {
if (isSniProxy) {
URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
resolvedAddress =
resolveName(InetSocketAddress.createUnresolved(proxyURI.getHost(), proxyURI.getPort()));
} else {
resolvedAddress = resolveName(unresolvedAddress);
resolvedAddress = resolveName(unresolvedPhysicalAddress);
}
return resolvedAddress.thenCompose(
inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(),
isSniProxy ? unresolvedAddress : null));
inetAddresses -> connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(),
isSniProxy ? unresolvedPhysicalAddress : null));
} catch (URISyntaxException e) {
log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e);
return FutureUtil
Expand All @@ -265,17 +256,19 @@ private CompletableFuture<Channel> createConnection(InetSocketAddress unresolved
* Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
* address is working.
*/
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetSocketAddress> unresolvedAddresses,
private CompletableFuture<Channel> connectToResolvedAddresses(InetSocketAddress logicalAddress,
Iterator<InetSocketAddress> resolvedPhysicalAddress,
InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();

// Successfully connected to server
connectToAddress(unresolvedAddresses.next(), sniHost)
connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), sniHost)
.thenAccept(future::complete)
.exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
if (resolvedPhysicalAddress.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, sniHost).thenAccept(future::complete)
connectToResolvedAddresses(logicalAddress, resolvedPhysicalAddress, sniHost)
.thenAccept(future::complete)
.exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
Expand Down Expand Up @@ -306,17 +299,22 @@ CompletableFuture<List<InetSocketAddress>> resolveName(InetSocketAddress unresol
/**
* Attempt to establish a TCP connection to an already resolved single IP address.
*/
private CompletableFuture<Channel> connectToAddress(InetSocketAddress remoteAddress, InetSocketAddress sniHost) {
private CompletableFuture<Channel> connectToAddress(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress, InetSocketAddress sniHost) {
if (clientConfig.isUseTls()) {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
.initTls(channel, sniHost != null ? sniHost : remoteAddress))
.initTls(channel, sniHost != null ? sniHost : physicalAddress))
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
.thenCompose(ch ->
channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress))
.thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
} else {
return toCompletableFuture(bootstrap.register())
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
.thenCompose(ch ->
channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress))
.thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;

@Slf4j
public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
Expand Down Expand Up @@ -202,5 +203,29 @@ CompletableFuture<Channel> initSocks5IfConfig(Channel ch) {

return initSocks5Future;
}

CompletableFuture<Channel> initializeClientCnx(Channel ch,
InetSocketAddress logicalAddress,
InetSocketAddress resolvedPhysicalAddress) {
return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> {
final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler");

if (cnx == null) {
throw new IllegalStateException("Missing ClientCnx. This should not happen.");
}

// Need to do our own equality because the physical address is resolved already
if (!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString())
&& logicalAddress.getPort() == resolvedPhysicalAddress.getPort())) {
// We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
// it can be specified when sending the CommandConnect.
cnx.setTargetBroker(logicalAddress);
}

cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString());

return ch;
}));
}
}

0 comments on commit 6f6130e

Please sign in to comment.