Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Set TargetBroker and RemoteHostName before active #20

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();

// Trigger async connect to broker
createConnection(physicalAddress).thenAccept(channel -> {
createConnection(logicalAddress, physicalAddress).thenAccept(channel -> {
log.info("[{}] Connected to server", channel);

channel.closeFuture().addListener(v -> {
Expand All @@ -266,16 +266,6 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
return;
}

if (!logicalAddress.equals(physicalAddress)) {
// We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
// it can be specified when sending the CommandConnect.
// That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after
// this method.
cnx.setTargetBroker(logicalAddress);
}

cnx.setRemoteHostName(physicalAddress.getHostString());

cnx.connectionFuture().thenRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Connection handshake completed", cnx.channel());
Expand Down Expand Up @@ -303,19 +293,20 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
/**
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
private CompletableFuture<Channel> createConnection(InetSocketAddress logicalAddress,
InetSocketAddress unresolvedPhysicalAddress) {
CompletableFuture<List<InetSocketAddress>> resolvedAddress;
try {
if (isSniProxy) {
URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
resolvedAddress =
resolveName(InetSocketAddress.createUnresolved(proxyURI.getHost(), proxyURI.getPort()));
} else {
resolvedAddress = resolveName(unresolvedAddress);
resolvedAddress = resolveName(unresolvedPhysicalAddress);
}
return resolvedAddress.thenCompose(
inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(),
isSniProxy ? unresolvedAddress : null));
inetAddresses -> connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(),
isSniProxy ? unresolvedPhysicalAddress : null));
} catch (URISyntaxException e) {
log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e);
return FutureUtil
Expand All @@ -327,17 +318,19 @@ private CompletableFuture<Channel> createConnection(InetSocketAddress unresolved
* Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
* address is working.
*/
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetSocketAddress> unresolvedAddresses,
private CompletableFuture<Channel> connectToResolvedAddresses(InetSocketAddress logicalAddress,
Iterator<InetSocketAddress> resolvedPhysicalAddress,
InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();

// Successfully connected to server
connectToAddress(unresolvedAddresses.next(), sniHost)
connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), sniHost)
.thenAccept(future::complete)
.exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
if (resolvedPhysicalAddress.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, sniHost).thenAccept(future::complete)
connectToResolvedAddresses(logicalAddress, resolvedPhysicalAddress, sniHost)
.thenAccept(future::complete)
.exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
Expand Down Expand Up @@ -368,17 +361,22 @@ CompletableFuture<List<InetSocketAddress>> resolveName(InetSocketAddress unresol
/**
* Attempt to establish a TCP connection to an already resolved single IP address.
*/
private CompletableFuture<Channel> connectToAddress(InetSocketAddress remoteAddress, InetSocketAddress sniHost) {
private CompletableFuture<Channel> connectToAddress(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress, InetSocketAddress sniHost) {
if (clientConfig.isUseTls()) {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
.initTls(channel, sniHost != null ? sniHost : remoteAddress))
.initTls(channel, sniHost != null ? sniHost : physicalAddress))
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
.thenCompose(ch ->
channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress))
.thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
} else {
return toCompletableFuture(bootstrap.register())
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
.thenCompose(ch ->
channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress))
.thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;

@Slf4j
public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
Expand Down Expand Up @@ -209,5 +210,29 @@ CompletableFuture<Channel> initSocks5IfConfig(Channel ch) {

return initSocks5Future;
}

CompletableFuture<Channel> initializeClientCnx(Channel ch,
InetSocketAddress logicalAddress,
InetSocketAddress resolvedPhysicalAddress) {
return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> {
final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler");

if (cnx == null) {
throw new IllegalStateException("Missing ClientCnx. This should not happen.");
}

// Need to do our own equality because the physical address is resolved already
if (!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString())
&& logicalAddress.getPort() == resolvedPhysicalAddress.getPort())) {
// We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
// it can be specified when sending the CommandConnect.
cnx.setTargetBroker(logicalAddress);
}

cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString());

return ch;
}));
}
}