Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Only send AuthChallenge when client supports it #19626

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -775,10 +775,21 @@ public void authChallengeSuccessCallback(AuthData authChallenge,
}
}
} else {
// auth not complete, continue auth with client side.
ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, authChallenge, clientProtocolVersion));
if (log.isDebugEnabled()) {
log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod);
if (supportsAuthChallenge(clientProtocolVersion)) {
// auth not complete, continue auth with client side.
ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, authChallenge, clientProtocolVersion));
if (log.isDebugEnabled()) {
log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod);
}
} else {
state = State.Failed;
service.getPulsarStats().recordConnectionCreateFail();
log.warn("[{}] client's protocol version does not support AuthChallenges, closing connection.",
remoteAddress);
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError,
"Server generated auth challenge, but client's protocol version ["
+ getRemoteEndpointProtocolVersion() + "] does not support them.");
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
}
}
} catch (Exception | AssertionError e) {
Expand Down Expand Up @@ -3218,9 +3229,13 @@ public boolean isBatchMessageCompatibleVersion() {
}

boolean supportsAuthenticationRefresh() {
return features != null && features.isSupportsAuthRefresh();
return features != null && features.isSupportsAuthRefresh()
&& supportsAuthChallenge(getRemoteEndpointProtocolVersion());
}

boolean supportsAuthChallenge(int protocolVersion) {
return protocolVersion >= ProtocolVersion.v14.getValue();
}

boolean supportBrokerMetadata() {
return features != null && features.isSupportsBrokerEntryMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ public void testConnectCommandWithPassingOriginalPrincipal() throws Exception {
channel.finish();
}

@Test
public void testAuthChallengePrincipalChangeFails() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockAlwaysExpiredAuthenticationProvider();
Expand Down Expand Up @@ -533,6 +534,7 @@ public void testAuthChallengePrincipalChangeFails() throws Exception {
channel.finish();
}

@Test
public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockAlwaysExpiredAuthenticationProvider();
Expand All @@ -551,7 +553,7 @@ public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception {
// Don't want the keep alive task affecting which messages are handled
serverCnx.cancelKeepAliveTask();

ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null,
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", ProtocolVersion.v14.getValue(), null,
null, "pass.client", "pass.client", authMethodName);
channel.writeInbound(clientCommand);

Expand Down Expand Up @@ -706,7 +708,8 @@ public void testAuthResponseWithFailingAuthData() throws Exception {

// Trigger another AuthChallenge to verify that code path continues to challenge
ByteBuf authResponse1 =
Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()), 1, "1");
Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()),
ProtocolVersion.v14.getValue(), "1");
channel.writeInbound(authResponse1);

Object challenge2 = getResponse();
Expand All @@ -725,6 +728,67 @@ public void testAuthResponseWithFailingAuthData() throws Exception {
channel.finish();
}

@Test
public void testAuthChallengeWithOldClientResultsInErrorToClient() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();

when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);

resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

// Trigger connect command to result in AuthChallenge
ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", ProtocolVersion.v13.getValue(),
"1", null, null, null, null);
channel.writeInbound(clientCommand);

Object error = getResponse();
assertTrue(error instanceof CommandError);
assertEquals(serverCnx.getState(), State.Failed);
assertFalse(serverCnx.isActive());
channel.finish();
}

@Test
public void testAuthRefreshWithOldClientResultsClosedConnection() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockAlwaysExpiredAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();

when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);
svcConfig.setAuthenticationRefreshCheckSeconds(30);

resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);
// Don't want the keep alive task affecting which messages are handled
serverCnx.cancelKeepAliveTask();

// Trigger connect command to result in AuthChallenge
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", ProtocolVersion.v13.getValue(),
"1", null, null, null, null);
channel.writeInbound(clientCommand);
Object connected = getResponse();
assertTrue(connected instanceof CommandConnected);

// Trigger the ServerCnx to check if authentication is expired (it is because of our special implementation)
// and then force channel to run the task
channel.advanceTimeBy(30, TimeUnit.SECONDS);
assertTrue(channel.hasPendingTasks(), "This test assumes there are pending tasks to run.");
channel.runPendingTasks();
assertTrue(channel.outboundMessages().isEmpty());
// Then expect channel to close
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testOriginalAuthDataTriggersAuthChallengeFailure() throws Exception {
// Test verifies the current behavior in the absence of a solution for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,12 +455,21 @@ protected void authChallengeSuccessCallback(AuthData authChallenge) {
return;
}

// auth not complete, continue auth with client side.
final ByteBuf msg = Commands.newAuthChallenge(authMethod, authChallenge, protocolVersionToAdvertise);
writeAndFlush(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
if (getRemoteEndpointProtocolVersion() >= ProtocolVersion.v14.getValue()) {
// auth not complete, continue auth with client side.
final ByteBuf msg = Commands.newAuthChallenge(authMethod, authChallenge, protocolVersionToAdvertise);
writeAndFlush(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
}
} else {
LOG.warn("[{}] client's protocol version does not support AuthChallenges, closing connection.",
remoteAddress);
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError,
"Server generated auth challenge, but client's protocol version ["
+ getRemoteEndpointProtocolVersion() + "] does not support them.");
writeAndFlushAndClose(msg);
}
} catch (Exception e) {
authenticationFailedCallback(e);
Expand Down
Loading