Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][proxy] Fix refresh client auth #17831

Merged
merged 2 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
public class ClientCnx extends PulsarHandler {

protected final Authentication authentication;
private State state;
protected State state;

@Getter
private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
Expand Down Expand Up @@ -155,7 +155,7 @@ public class ClientCnx extends PulsarHandler {

private final int maxNumberOfRejectedRequestPerConnection;
private final int rejectedRequestResetTimeSec = 60;
private final int protocolVersion;
protected final int protocolVersion;
private final long operationTimeoutMs;

protected String proxyToTargetBrokerAddress = null;
Expand All @@ -176,7 +176,10 @@ public class ClientCnx extends PulsarHandler {
@Getter
private final ClientCnxIdleState idleState;

enum State {
@Getter
private long lastDisconnectedTimestamp;

protected enum State {
None, SentConnectFrame, Ready, Failed, Connecting
}

Expand Down Expand Up @@ -281,6 +284,7 @@ protected ByteBuf newConnectCommand() throws Exception {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
lastDisconnectedTimestamp = System.currentTimeMillis();
log.info("{} Disconnected", ctx.channel());
if (!connectionFuture.isDone()) {
connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed"));
Expand Down Expand Up @@ -1243,6 +1247,13 @@ public void close() {
}
}

protected void closeWithException(Throwable e) {
if (ctx != null) {
connectionFuture.completeExceptionally(e);
ctx.close();
}
}

private void checkRequestTimeout() {
while (!requestTimeoutQueue.isEmpty()) {
RequestTime request = requestTimeoutQueue.peek();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,19 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
Expand Down Expand Up @@ -453,5 +456,9 @@ public void doMarkAndReleaseUselessConnections(){
// Do release idle connections.
releaseIdleConnectionTaskList.forEach(Runnable::run);
}
}

public Set<CompletableFuture<ClientCnx>> getConnections() {
return Collections.unmodifiableSet(
pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet()));
}
}
5 changes: 5 additions & 0 deletions pulsar-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@
<artifactId>ipaddress</artifactId>
<version>${seancfoley.ipaddress.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,35 @@
*/
package org.apache.pulsar.proxy.server;

import static com.google.common.base.Preconditions.checkArgument;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public class ProxyClientCnx extends ClientCnx {

String clientAuthRole;
AuthData clientAuthData;
String clientAuthMethod;
int protocolVersion;
private final boolean forwardClientAuthData;
private final String clientAuthMethod;
private final String clientAuthRole;
private final AuthData clientAuthData;
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
private final ProxyConnection proxyConnection;

public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
super(conf, eventLoopGroup);
AuthData clientAuthData, String clientAuthMethod, int protocolVersion,
boolean forwardClientAuthData, ProxyConnection proxyConnection) {
super(conf, eventLoopGroup, protocolVersion);
this.clientAuthRole = clientAuthRole;
this.clientAuthData = clientAuthData;
this.clientAuthMethod = clientAuthMethod;
this.protocolVersion = protocolVersion;
this.forwardClientAuthData = forwardClientAuthData;
this.proxyConnection = proxyConnection;
}

@Override
Expand All @@ -54,10 +59,54 @@ protected ByteBuf newConnectCommand() throws Exception {

authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
clientAuthMethod);
return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
clientAuthMethod);
}

private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
@Override
protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
checkArgument(authChallenge.hasChallenge());
checkArgument(authChallenge.getChallenge().hasAuthData());

boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
if (!forwardClientAuthData || !isRefresh) {
super.handleAuthChallenge(authChallenge);
return;
}

try {
if (log.isDebugEnabled()) {
log.debug("Proxy {} request to refresh the original client authentication data for "
+ "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
}

proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we can make some improvements to avoid sending auth challenge command to the client if the proxy just has done the auth challenge. Because all the brokers that interacted with client connection will send the auth challenge command

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like it could be improved, but I remember there's only one connection here.

protocolVersion))
.addListener(writeFuture -> {
if (writeFuture.isSuccess()) {
if (log.isDebugEnabled()) {
log.debug("Proxy {} sent the auth challenge to original client to refresh credentials "
+ "with method {} for the proxy client {}",
proxyConnection.ctx().channel(), clientAuthMethod, ctx.channel());
}
} else {
log.error("Failed to send the auth challenge to original client by the proxy {} "
+ "for the proxy client {}",
proxyConnection.ctx().channel(),
ctx.channel(),
writeFuture.cause());
closeWithException(writeFuture.cause());
}
});

if (state == State.SentConnectFrame) {
state = State.Connecting;
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (Exception e) {
log.error("Failed to send the auth challenge to origin client by the proxy {} for the proxy client {}",
proxyConnection.ctx().channel(), ctx.channel(), e);
closeWithException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
Expand Down Expand Up @@ -316,12 +317,11 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
clientCnxSupplier =
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
clientAuthMethod, protocolVersionToAdvertise);
clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
service.getConfiguration().isForwardAuthorizationCredentials(), this);
} else {
clientCnxSupplier =
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
clientCnxSupplier = () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
}

if (this.connectionPool == null) {
Expand Down Expand Up @@ -423,16 +423,22 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
}

// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData) throws Exception {
private void doAuthentication(AuthData clientData)
throws Exception {
AuthData brokerData = authState.authenticate(clientData);
// authentication has completed, will send newConnected command.
if (authState.isComplete()) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
remoteAddress, authMethod, clientAuthRole);
}

// First connection
if (this.connectionPool == null || state == State.Connecting) {
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
// authentication has completed, will send newConnected command.
completeConnect(clientData);
}
completeConnect(clientData);
return;
}

Expand All @@ -441,7 +447,7 @@ private void doAuthentication(AuthData clientData) throws Exception {
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
remoteAddress, authMethod);
}
state = State.Connecting;
}
Expand Down Expand Up @@ -523,18 +529,63 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),

@Override
protected void handleAuthResponse(CommandAuthResponse authResponse) {
checkArgument(state == State.Connecting);
checkArgument(authResponse.hasResponse());
checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());

if (LOG.isDebugEnabled()) {
LOG.debug("Received AuthResponse from {}, auth method: {}",
remoteAddress, authResponse.getResponse().getAuthMethodName());
remoteAddress, authResponse.getResponse().getAuthMethodName());
}

try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData);
if (service.getConfiguration().isForwardAuthorizationCredentials()
Copy link
Member

@michaeljmarshall michaeljmarshall Jan 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An interesting consequence of this change is that the ProxyConnection goes back into the Connecting state if the authentication takes multiple steps.

&& connectionPool != null && state == State.ProxyLookupRequests) {
connectionPool.getConnections().forEach(toBrokerCnxFuture -> {
String clientVersion;
if (authResponse.hasClientVersion()) {
clientVersion = authResponse.getClientVersion();
} else {
clientVersion = PulsarVersion.getVersion();
}
int protocolVersion;
if (authResponse.hasProtocolVersion()) {
protocolVersion = authResponse.getProtocolVersion();
} else {
protocolVersion = Commands.getCurrentProtocolVersion();
}

ByteBuf cmd =
Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
toBrokerCnxFuture.thenAccept(toBrokerCnx -> toBrokerCnx.ctx().writeAndFlush(cmd)
.addListener(writeFuture -> {
if (writeFuture.isSuccess()) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} authentication is refreshed successfully by {}, "
+ "auth method: {} ",
toBrokerCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
}
} else {
LOG.error("Failed to forward the auth response "
+ "from the proxy to the broker through the proxy client, "
+ "proxy: {}, proxy client: {}",
ctx.channel(),
toBrokerCnx.ctx().channel(),
writeFuture.cause());
toBrokerCnx.ctx().channel().pipeline()
.fireExceptionCaught(writeFuture.cause());
}
}))
.whenComplete((__, ex) -> {
if (ex != null) {
LOG.error("Failed to forward the auth response from the proxy to "
+ "the broker through the proxy client, proxy: {}",
ctx().channel(), ex);
}
});
});
}
} catch (Exception e) {
String msg = "Unable to handleAuthResponse";
LOG.warn("[{}] {} ", remoteAddress, msg, e);
Expand Down
Loading