diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index e355f87581b2c..988f7d34e9916 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -397,17 +397,30 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E ctx.close(); } - /* - * If authentication and authorization is enabled(and not sasl) and - * if the authRole is one of proxyRoles we want to enforce + /** + * When transitioning from Connecting to Connected, this method validates the roles. + * If the authRole is one of proxyRoles, the following must be true: * - the originalPrincipal is given while connecting * - originalPrincipal is not blank - * - originalPrincipal is not a proxy principal + * - originalPrincipal is not a proxy principal. + * @return true when roles are valid and false when roles are invalid */ - private boolean invalidOriginalPrincipal(String originalPrincipal) { - return (service.isAuthenticationEnabled() && service.isAuthorizationEnabled() - && proxyRoles.contains(authRole) && (StringUtils.isBlank(originalPrincipal) - || proxyRoles.contains(originalPrincipal))); + private boolean isValidRoleAndOriginalPrincipal() { + String errorMsg = null; + if (proxyRoles.contains(authRole)) { + if (StringUtils.isBlank(originalPrincipal)) { + errorMsg = "originalPrincipal must be provided when connecting with a proxy role."; + } else if (proxyRoles.contains(originalPrincipal)) { + errorMsg = "originalPrincipal cannot be a proxy role."; + } + } + if (errorMsg != null) { + log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress, authRole, + originalPrincipal, errorMsg); + return false; + } else { + return true; + } } // //// @@ -489,14 +502,6 @@ protected void handleLookup(CommandLookupTopic lookup) { final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { - if (invalidOriginalPrincipal(originalPrincipal)) { - final String msg = "Valid Proxy Client role should be provided for lookup "; - log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole, - originalPrincipal, topicName); - writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId)); - lookupSemaphore.release(); - return; - } isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply( isAuthorized -> { if (isAuthorized) { @@ -570,14 +575,6 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { - if (invalidOriginalPrincipal(originalPrincipal)) { - final String msg = "Valid Proxy Client role should be provided for getPartitionMetadataRequest "; - log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole, - originalPrincipal, topicName); - commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId); - lookupSemaphore.release(); - return; - } isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply( isAuthorized -> { if (isAuthorized) { @@ -693,6 +690,15 @@ ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) { // complete the connect and sent newConnected command private void completeConnect(int clientProtoVersion, String clientVersion) { + if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) { + if (!isValidRoleAndOriginalPrincipal()) { + state = State.Failed; + service.getPulsarStats().recordConnectionCreateFail(); + final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles."); + NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg); + return; + } + } writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize, enableSubscriptionPatternEvaluation)); state = State.Connected; service.getPulsarStats().recordConnectionCreateSuccess(); @@ -1065,14 +1071,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { remoteAddress, authRole, originalPrincipal); } - if (invalidOriginalPrincipal(originalPrincipal)) { - final String msg = "Valid Proxy Client role should be provided while subscribing "; - log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole, - originalPrincipal, topicName); - commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg); - return; - } - final String subscriptionName = subscribe.getSubscription(); final SubType subType = subscribe.getSubType(); final String consumerName = subscribe.hasConsumerName() ? subscribe.getConsumerName() : ""; @@ -1318,14 +1316,6 @@ protected void handleProducer(final CommandProducer cmdProducer) { return; } - if (invalidOriginalPrincipal(originalPrincipal)) { - final String msg = "Valid Proxy Client role should be provided while creating producer "; - log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole, - originalPrincipal, topicName); - commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg); - return; - } - CompletableFuture isAuthorizedFuture = isTopicOperationAllowed( topicName, TopicOperation.PRODUCE, authenticationData, originalAuthData ); @@ -2169,14 +2159,6 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { - if (invalidOriginalPrincipal(originalPrincipal)) { - final String msg = "Valid Proxy Client role should be provided for getTopicsOfNamespaceRequest "; - log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg, - authRole, originalPrincipal, namespaceName); - commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg); - lookupSemaphore.release(); - return; - } isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> { if (isAuthorized) { getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) @@ -2735,14 +2717,6 @@ protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTop final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { - if (invalidOriginalPrincipal(originalPrincipal)) { - final String msg = "Valid Proxy Client role should be provided for watchTopicListRequest "; - log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg, - authRole, originalPrincipal, namespaceName); - commandSender.sendErrorResponse(watcherId, ServerError.AuthorizationError, msg); - lookupSemaphore.release(); - return; - } isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> { if (isAuthorized) { topicListService.handleWatchTopicList(namespaceName, watcherId, requestId, topicsPattern, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index bd4adef2cd152..2ee30ed1a8128 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -583,6 +583,42 @@ public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception { channel.finish(); } + @Test(timeOut = 30000) + public void testConnectCommandWithInvalidRoleCombinations() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(false); + svcConfig.setAuthorizationEnabled(true); + svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); + + // Invalid combinations where authData is proxy role + verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "pass.proxy"); + verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", ""); + verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", null); + } + + private void verifyAuthRoleAndOriginalPrincipalBehavior(String authMethodName, String authData, + String originalPrincipal) throws Exception { + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + ByteBuf clientCommand = Commands.newConnect(authMethodName, authData, 1,null, + null, originalPrincipal, null, null); + channel.writeInbound(clientCommand); + + Object response = getResponse(); + assertTrue(response instanceof CommandError); + assertEquals(((CommandError) response).getError(), ServerError.AuthorizationError); + assertEquals(serverCnx.getState(), State.Failed); + channel.finish(); + } + @Test(timeOut = 30000) public void testConnectCommandWithAuthenticationNegative() throws Exception { AuthenticationService authenticationService = mock(AuthenticationService.class);