diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java index e8816894513d2..fb564bd5083c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java @@ -20,12 +20,17 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AbstractAddressResolver; import io.netty.util.concurrent.DefaultThreadFactory; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import java.util.stream.IntStream; +import io.netty.util.concurrent.Promise; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -66,7 +71,7 @@ public void testSingleIpAddress() throws Exception { List result = new ArrayList<>(); result.add(new InetSocketAddress("127.0.0.1", brokerPort)); Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name", - brokerPort))) + brokerPort))) .thenReturn(CompletableFuture.completedFuture(result)); client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create(); @@ -107,7 +112,7 @@ public void testNoConnectionPool() throws Exception { ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); InetSocketAddress brokerAddress = - InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); + InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); IntStream.range(1, 5).forEach(i -> { pool.getConnection(brokerAddress).thenAccept(cnx -> { Assert.assertTrue(cnx.channel().isActive()); @@ -119,6 +124,7 @@ public void testNoConnectionPool() throws Exception { pool.closeAllConnections(); pool.close(); + eventLoop.shutdownGracefully(); } @Test @@ -129,7 +135,7 @@ public void testEnableConnectionPool() throws Exception { ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); InetSocketAddress brokerAddress = - InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); + InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); IntStream.range(1, 10).forEach(i -> { pool.getConnection(brokerAddress).thenAccept(cnx -> { Assert.assertTrue(cnx.channel().isActive()); @@ -141,5 +147,82 @@ public void testEnableConnectionPool() throws Exception { pool.closeAllConnections(); pool.close(); + eventLoop.shutdownGracefully(); + } + + + @Test + public void testSetProxyToTargetBrokerAddress() throws Exception { + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setConnectionsPerBroker(5); + + + EventLoopGroup eventLoop = + EventLoopUtil.newEventLoopGroup(8, false, + new DefaultThreadFactory("test")); + + final AbstractAddressResolver resolver = new AbstractAddressResolver(eventLoop.next()) { + @Override + protected boolean doIsResolved(SocketAddress socketAddress) { + return !((InetSocketAddress) socketAddress).isUnresolved(); + } + + @Override + protected void doResolve(SocketAddress socketAddress, Promise promise) throws Exception { + promise.setFailure(new IllegalStateException()); + throw new IllegalStateException(); + } + + @Override + protected void doResolveAll(SocketAddress socketAddress, Promise promise) throws Exception { + final InetSocketAddress socketAddress1 = (InetSocketAddress) socketAddress; + final boolean isProxy = socketAddress1.getHostName().equals("proxy"); + final boolean isBroker = socketAddress1.getHostName().equals("broker"); + if (!isProxy && !isBroker) { + promise.setFailure(new IllegalStateException()); + throw new IllegalStateException(); + } + List result = new ArrayList<>(); + if (isProxy) { + result.add(new InetSocketAddress("localhost", brokerPort)); + result.add(InetSocketAddress.createUnresolved("proxy", brokerPort)); + } else { + result.add(new InetSocketAddress("127.0.0.1", brokerPort)); + result.add(InetSocketAddress.createUnresolved("broker", brokerPort)); + } + promise.setSuccess(result); + } + }; + + ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop, + (Supplier) () -> new ClientCnx(conf, eventLoop), Optional.of(resolver)); + + + ClientCnx cnx = pool.getConnection( + InetSocketAddress.createUnresolved("proxy", 9999), + InetSocketAddress.createUnresolved("proxy", 9999)).get(); + Assert.assertEquals(cnx.remoteHostName, "proxy"); + Assert.assertNull(cnx.proxyToTargetBrokerAddress); + cnx.close(); + + cnx = pool.getConnection( + InetSocketAddress.createUnresolved("broker", 9999), + InetSocketAddress.createUnresolved("proxy", 9999)).get(); + Assert.assertEquals(cnx.remoteHostName, "proxy"); + Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker:9999"); + cnx.close(); + + + cnx = pool.getConnection( + InetSocketAddress.createUnresolved("broker", 9999), + InetSocketAddress.createUnresolved("broker", 9999)).get(); + Assert.assertEquals(cnx.remoteHostName, "broker"); + Assert.assertNull(cnx.proxyToTargetBrokerAddress); + cnx.close(); + + + pool.closeAllConnections(); + pool.close(); + eventLoop.shutdownGracefully(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 2e105b5328467..3a9a2b9b7ab94 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -305,8 +305,12 @@ private CompletableFuture createConnection(InetSocketAddress logicalAdd resolvedAddress = resolveName(unresolvedPhysicalAddress); } return resolvedAddress.thenCompose( - inetAddresses -> connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(), - isSniProxy ? unresolvedPhysicalAddress : null)); + inetAddresses -> connectToResolvedAddresses( + logicalAddress, + unresolvedPhysicalAddress, + inetAddresses.iterator(), + isSniProxy ? unresolvedPhysicalAddress : null) + ); } catch (URISyntaxException e) { log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e); return FutureUtil @@ -319,17 +323,19 @@ private CompletableFuture createConnection(InetSocketAddress logicalAdd * address is working. */ private CompletableFuture connectToResolvedAddresses(InetSocketAddress logicalAddress, + InetSocketAddress unresolvedPhysicalAddress, Iterator resolvedPhysicalAddress, InetSocketAddress sniHost) { CompletableFuture future = new CompletableFuture<>(); // Successfully connected to server - connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), sniHost) + connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), unresolvedPhysicalAddress, sniHost) .thenAccept(future::complete) .exceptionally(exception -> { if (resolvedPhysicalAddress.hasNext()) { // Try next IP address - connectToResolvedAddresses(logicalAddress, resolvedPhysicalAddress, sniHost) + connectToResolvedAddresses(logicalAddress, unresolvedPhysicalAddress, + resolvedPhysicalAddress, sniHost) .thenAccept(future::complete) .exceptionally(ex -> { // This is already unwinding the recursive call @@ -362,20 +368,24 @@ CompletableFuture> resolveName(InetSocketAddress unresol * Attempt to establish a TCP connection to an already resolved single IP address. */ private CompletableFuture connectToAddress(InetSocketAddress logicalAddress, - InetSocketAddress physicalAddress, InetSocketAddress sniHost) { + InetSocketAddress physicalAddress, + InetSocketAddress unresolvedPhysicalAddress, + InetSocketAddress sniHost) { if (clientConfig.isUseTls()) { return toCompletableFuture(bootstrap.register()) .thenCompose(channel -> channelInitializerHandler .initTls(channel, sniHost != null ? sniHost : physicalAddress)) .thenCompose(channelInitializerHandler::initSocks5IfConfig) .thenCompose(ch -> - channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress)) + channelInitializerHandler.initializeClientCnx(ch, logicalAddress, + unresolvedPhysicalAddress)) .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress))); } else { return toCompletableFuture(bootstrap.register()) .thenCompose(channelInitializerHandler::initSocks5IfConfig) .thenCompose(ch -> - channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress)) + channelInitializerHandler.initializeClientCnx(ch, logicalAddress, + unresolvedPhysicalAddress)) .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress))); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index e01b53b8ef136..ed34f7d41c130 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -213,7 +213,7 @@ CompletableFuture initSocks5IfConfig(Channel ch) { CompletableFuture initializeClientCnx(Channel ch, InetSocketAddress logicalAddress, - InetSocketAddress resolvedPhysicalAddress) { + InetSocketAddress unresolvedPhysicalAddress) { return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> { final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler"); @@ -221,15 +221,13 @@ CompletableFuture initializeClientCnx(Channel ch, throw new IllegalStateException("Missing ClientCnx. This should not happen."); } - // Need to do our own equality because the physical address is resolved already - if (!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString()) - && logicalAddress.getPort() == resolvedPhysicalAddress.getPort())) { + if (!logicalAddress.equals(unresolvedPhysicalAddress)) { // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that // it can be specified when sending the CommandConnect. cnx.setTargetBroker(logicalAddress); } - cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString()); + cnx.setRemoteHostName(unresolvedPhysicalAddress.getHostString()); return ch; }));