Skip to content

Commit

Permalink
[fix][client] Broker address resolution wrong if connect through a mu…
Browse files Browse the repository at this point in the history
…lti-dns names proxy (apache#19597)

(cherry picked from commit e286339)
(cherry picked from commit 14b070b)
(cherry picked from commit 27f0449)
  • Loading branch information
nicoloboschi committed Feb 28, 2023
1 parent 32dfda9 commit 525517f
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import com.google.common.collect.Lists;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AbstractAddressResolver;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import io.netty.util.concurrent.Promise;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
Expand Down Expand Up @@ -66,7 +72,7 @@ public void testSingleIpAddress() throws Exception {
List<InetSocketAddress> result = Lists.newArrayList();
result.add(new InetSocketAddress("127.0.0.1", brokerPort));
Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name",
brokerPort)))
brokerPort)))
.thenReturn(CompletableFuture.completedFuture(result));

client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
Expand Down Expand Up @@ -107,7 +113,7 @@ public void testNoConnectionPool() throws Exception {
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);

InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
IntStream.range(1, 5).forEach(i -> {
pool.getConnection(brokerAddress).thenAccept(cnx -> {
Assert.assertTrue(cnx.channel().isActive());
Expand All @@ -119,6 +125,7 @@ public void testNoConnectionPool() throws Exception {

pool.closeAllConnections();
pool.close();
eventLoop.shutdownGracefully();
}

@Test
Expand All @@ -129,7 +136,7 @@ public void testEnableConnectionPool() throws Exception {
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);

InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
IntStream.range(1, 10).forEach(i -> {
pool.getConnection(brokerAddress).thenAccept(cnx -> {
Assert.assertTrue(cnx.channel().isActive());
Expand All @@ -141,5 +148,82 @@ public void testEnableConnectionPool() throws Exception {

pool.closeAllConnections();
pool.close();
eventLoop.shutdownGracefully();
}


@Test
public void testSetProxyToTargetBrokerAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(5);


EventLoopGroup eventLoop =
EventLoopUtil.newEventLoopGroup(8, false,
new DefaultThreadFactory("test"));

final AbstractAddressResolver resolver = new AbstractAddressResolver(eventLoop.next()) {
@Override
protected boolean doIsResolved(SocketAddress socketAddress) {
return !((InetSocketAddress) socketAddress).isUnresolved();
}

@Override
protected void doResolve(SocketAddress socketAddress, Promise promise) throws Exception {
promise.setFailure(new IllegalStateException());
throw new IllegalStateException();
}

@Override
protected void doResolveAll(SocketAddress socketAddress, Promise promise) throws Exception {
final InetSocketAddress socketAddress1 = (InetSocketAddress) socketAddress;
final boolean isProxy = socketAddress1.getHostName().equals("proxy");
final boolean isBroker = socketAddress1.getHostName().equals("broker");
if (!isProxy && !isBroker) {
promise.setFailure(new IllegalStateException());
throw new IllegalStateException();
}
List<InetSocketAddress> result = new ArrayList<>();
if (isProxy) {
result.add(new InetSocketAddress("localhost", brokerPort));
result.add(InetSocketAddress.createUnresolved("proxy", brokerPort));
} else {
result.add(new InetSocketAddress("127.0.0.1", brokerPort));
result.add(InetSocketAddress.createUnresolved("broker", brokerPort));
}
promise.setSuccess(result);
}
};

ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop,
(Supplier<ClientCnx>) () -> new ClientCnx(conf, eventLoop), Optional.of(resolver));


ClientCnx cnx = pool.getConnection(
InetSocketAddress.createUnresolved("proxy", 9999),
InetSocketAddress.createUnresolved("proxy", 9999)).get();
Assert.assertEquals(cnx.remoteHostName, "proxy");
Assert.assertNull(cnx.proxyToTargetBrokerAddress);
cnx.close();

cnx = pool.getConnection(
InetSocketAddress.createUnresolved("broker", 9999),
InetSocketAddress.createUnresolved("proxy", 9999)).get();
Assert.assertEquals(cnx.remoteHostName, "proxy");
Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker:9999");
cnx.close();


cnx = pool.getConnection(
InetSocketAddress.createUnresolved("broker", 9999),
InetSocketAddress.createUnresolved("broker", 9999)).get();
Assert.assertEquals(cnx.remoteHostName, "broker");
Assert.assertNull(cnx.proxyToTargetBrokerAddress);
cnx.close();


pool.closeAllConnections();
pool.close();
eventLoop.shutdownGracefully();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,12 @@ private CompletableFuture<Channel> createConnection(InetSocketAddress logicalAdd
resolvedAddress = resolveName(unresolvedPhysicalAddress);
}
return resolvedAddress.thenCompose(
inetAddresses -> connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(),
isSniProxy ? unresolvedPhysicalAddress : null));
inetAddresses -> connectToResolvedAddresses(
logicalAddress,
unresolvedPhysicalAddress,
inetAddresses.iterator(),
isSniProxy ? unresolvedPhysicalAddress : null)
);
} catch (URISyntaxException e) {
log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e);
return FutureUtil
Expand All @@ -274,17 +278,19 @@ private CompletableFuture<Channel> createConnection(InetSocketAddress logicalAdd
* address is working.
*/
private CompletableFuture<Channel> connectToResolvedAddresses(InetSocketAddress logicalAddress,
InetSocketAddress unresolvedPhysicalAddress,
Iterator<InetSocketAddress> resolvedPhysicalAddress,
InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();

// Successfully connected to server
connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), sniHost)
connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), unresolvedPhysicalAddress, sniHost)
.thenAccept(future::complete)
.exceptionally(exception -> {
if (resolvedPhysicalAddress.hasNext()) {
// Try next IP address
connectToResolvedAddresses(logicalAddress, resolvedPhysicalAddress, sniHost)
connectToResolvedAddresses(logicalAddress, unresolvedPhysicalAddress,
resolvedPhysicalAddress, sniHost)
.thenAccept(future::complete)
.exceptionally(ex -> {
// This is already unwinding the recursive call
Expand Down Expand Up @@ -317,20 +323,24 @@ CompletableFuture<List<InetSocketAddress>> resolveName(InetSocketAddress unresol
* Attempt to establish a TCP connection to an already resolved single IP address.
*/
private CompletableFuture<Channel> connectToAddress(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress, InetSocketAddress sniHost) {
InetSocketAddress physicalAddress,
InetSocketAddress unresolvedPhysicalAddress,
InetSocketAddress sniHost) {
if (clientConfig.isUseTls()) {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
.initTls(channel, sniHost != null ? sniHost : physicalAddress))
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(ch ->
channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress))
channelInitializerHandler.initializeClientCnx(ch, logicalAddress,
unresolvedPhysicalAddress))
.thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
} else {
return toCompletableFuture(bootstrap.register())
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(ch ->
channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress))
channelInitializerHandler.initializeClientCnx(ch, logicalAddress,
unresolvedPhysicalAddress))
.thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,21 @@ CompletableFuture<Channel> initSocks5IfConfig(Channel ch) {

CompletableFuture<Channel> initializeClientCnx(Channel ch,
InetSocketAddress logicalAddress,
InetSocketAddress resolvedPhysicalAddress) {
InetSocketAddress unresolvedPhysicalAddress) {
return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> {
final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler");

if (cnx == null) {
throw new IllegalStateException("Missing ClientCnx. This should not happen.");
}

// Need to do our own equality because the physical address is resolved already
if (!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString())
&& logicalAddress.getPort() == resolvedPhysicalAddress.getPort())) {
if (!logicalAddress.equals(unresolvedPhysicalAddress)) {
// We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
// it can be specified when sending the CommandConnect.
cnx.setTargetBroker(logicalAddress);
}

cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString());
cnx.setRemoteHostName(unresolvedPhysicalAddress.getHostString());

return ch;
}));
Expand Down

0 comments on commit 525517f

Please sign in to comment.