Skip to content

Commit

Permalink
[cleanup][broker] Validate originalPrincipal earlier in ServerCnx (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall authored Feb 6, 2023
1 parent 7075075 commit fd3ce8b
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,17 +397,30 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.close();
}

/*
* If authentication and authorization is enabled(and not sasl) and
* if the authRole is one of proxyRoles we want to enforce
/**
* When transitioning from Connecting to Connected, this method validates the roles.
* If the authRole is one of proxyRoles, the following must be true:
* - the originalPrincipal is given while connecting
* - originalPrincipal is not blank
* - originalPrincipal is not a proxy principal
* - originalPrincipal is not a proxy principal.
* @return true when roles are valid and false when roles are invalid
*/
private boolean invalidOriginalPrincipal(String originalPrincipal) {
return (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()
&& proxyRoles.contains(authRole) && (StringUtils.isBlank(originalPrincipal)
|| proxyRoles.contains(originalPrincipal)));
private boolean isValidRoleAndOriginalPrincipal() {
String errorMsg = null;
if (proxyRoles.contains(authRole)) {
if (StringUtils.isBlank(originalPrincipal)) {
errorMsg = "originalPrincipal must be provided when connecting with a proxy role.";
} else if (proxyRoles.contains(originalPrincipal)) {
errorMsg = "originalPrincipal cannot be a proxy role.";
}
}
if (errorMsg != null) {
log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress, authRole,
originalPrincipal, errorMsg);
return false;
} else {
return true;
}
}

// ////
Expand Down Expand Up @@ -489,14 +502,6 @@ protected void handleLookup(CommandLookupTopic lookup) {

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for lookup ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
originalPrincipal, topicName);
writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
Expand Down Expand Up @@ -570,14 +575,6 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for getPartitionMetadataRequest ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
originalPrincipal, topicName);
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId);
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
Expand Down Expand Up @@ -693,6 +690,15 @@ ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {

// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion) {
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
if (!isValidRoleAndOriginalPrincipal()) {
state = State.Failed;
service.getPulsarStats().recordConnectionCreateFail();
final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles.");
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
return;
}
}
writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize, enableSubscriptionPatternEvaluation));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
Expand Down Expand Up @@ -1065,14 +1071,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
remoteAddress, authRole, originalPrincipal);
}

if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided while subscribing ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
originalPrincipal, topicName);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
return;
}

final String subscriptionName = subscribe.getSubscription();
final SubType subType = subscribe.getSubType();
final String consumerName = subscribe.hasConsumerName() ? subscribe.getConsumerName() : "";
Expand Down Expand Up @@ -1318,14 +1316,6 @@ protected void handleProducer(final CommandProducer cmdProducer) {
return;
}

if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided while creating producer ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
originalPrincipal, topicName);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
return;
}

CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName, TopicOperation.PRODUCE, authenticationData, originalAuthData
);
Expand Down Expand Up @@ -2169,14 +2159,6 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for getTopicsOfNamespaceRequest ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg,
authRole, originalPrincipal, namespaceName);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
lookupSemaphore.release();
return;
}
isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
if (isAuthorized) {
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
Expand Down Expand Up @@ -2735,14 +2717,6 @@ protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTop

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for watchTopicListRequest ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg,
authRole, originalPrincipal, namespaceName);
commandSender.sendErrorResponse(watcherId, ServerError.AuthorizationError, msg);
lookupSemaphore.release();
return;
}
isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
if (isAuthorized) {
topicListService.handleWatchTopicList(namespaceName, watcherId, requestId, topicsPattern,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,42 @@ public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception {
channel.finish();
}

@Test(timeOut = 30000)
public void testConnectCommandWithInvalidRoleCombinations() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();

when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);
svcConfig.setAuthenticateOriginalAuthData(false);
svcConfig.setAuthorizationEnabled(true);
svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));

// Invalid combinations where authData is proxy role
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "pass.proxy");
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "");
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", null);
}

private void verifyAuthRoleAndOriginalPrincipalBehavior(String authMethodName, String authData,
String originalPrincipal) throws Exception {
resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

ByteBuf clientCommand = Commands.newConnect(authMethodName, authData, 1,null,
null, originalPrincipal, null, null);
channel.writeInbound(clientCommand);

Object response = getResponse();
assertTrue(response instanceof CommandError);
assertEquals(((CommandError) response).getError(), ServerError.AuthorizationError);
assertEquals(serverCnx.getState(), State.Failed);
channel.finish();
}

@Test(timeOut = 30000)
public void testConnectCommandWithAuthenticationNegative() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
Expand Down

0 comments on commit fd3ce8b

Please sign in to comment.