diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 36b3b4c6038fba..f03aa59619fd87 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -81,7 +81,7 @@ */ public class ProxyConnection extends PulsarHandler { private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class); - // ConnectionPool is used by the proxy to issue lookup requests + // ConnectionPool is used by the proxy to issue lookup requests. It is null when doing direct broker proxying. private ConnectionPool connectionPool; private final AtomicLong requestIdGenerator = new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); @@ -313,24 +313,7 @@ protected static boolean isTlsChannel(Channel channel) { } private synchronized void completeConnect() throws PulsarClientException { - Supplier clientCnxSupplier; - if (service.getConfiguration().isAuthenticationEnabled()) { - clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, - clientAuthData, clientAuthMethod, protocolVersionToAdvertise, - service.getConfiguration().isForwardAuthorizationCredentials(), this); - } else { - clientCnxSupplier = () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise); - } - - if (this.connectionPool == null) { - this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(), - clientCnxSupplier, - Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next()))); - } else { - LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", - remoteAddress, state, clientAuthRole); - } - + checkArgument(state == State.Connecting); LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl); if (hasProxyToBrokerUrl) { @@ -371,8 +354,26 @@ private synchronized void completeConnect() throws PulsarClientException { }); } else { // Client is doing a lookup, we can consider the handshake complete - // and we'll take care of just topics and - // partitions metadata lookups + // and we'll take care of just topics and partitions metadata lookups + Supplier clientCnxSupplier; + if (service.getConfiguration().isAuthenticationEnabled()) { + clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, + clientAuthData, clientAuthMethod, protocolVersionToAdvertise, + service.getConfiguration().isForwardAuthorizationCredentials(), this); + } else { + clientCnxSupplier = + () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise); + } + + if (this.connectionPool == null) { + this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(), + clientCnxSupplier, + Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next()))); + } else { + LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", + remoteAddress, state, clientAuthRole); + } + state = State.ProxyLookupRequests; lookupProxyHandler = service.newLookupProxyHandler(this); final ByteBuf msg = Commands.newConnected(protocolVersionToAdvertise, false); @@ -452,7 +453,7 @@ protected void authChallengeSuccessCallback(AuthData authChallenge) { } // First connection - if (this.connectionPool == null || state == State.Connecting) { + if (state == State.Connecting) { // authentication has completed, will send newConnected command. completeConnect(); }