diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java index efc332968afb2..31bc670a2de1e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java @@ -30,22 +30,9 @@ public class AuthenticationDataCommand implements AuthenticationDataSource { protected final String authData; protected final SocketAddress remoteAddress; protected final SSLSession sslSession; - protected String subscription; public AuthenticationDataCommand(String authData) { - this(authData, null, null, null); - } - - public AuthenticationDataCommand(String authData, String subscription) { - this(authData, null, null, subscription); - } - - public AuthenticationDataCommand(String authData, SocketAddress remoteAddress, SSLSession sslSession, - String subscription) { - this.authData = authData; - this.remoteAddress = remoteAddress; - this.sslSession = sslSession; - this.subscription = subscription; + this(authData, null, null); } public AuthenticationDataCommand(String authData, SocketAddress remoteAddress, SSLSession sslSession) { @@ -100,22 +87,4 @@ public Certificate[] getTlsCertificates() { return null; } } - - /* - * Subscription - */ - @Override - public boolean hasSubscription() { - return this.subscription != null; - } - - @Override - public void setSubscription(String subscription) { - this.subscription = subscription; - } - - @Override - public String getSubscription() { - return subscription; - } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java new file mode 100644 index 0000000000000..f6723609908ac --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authentication; + +import java.net.SocketAddress; +import java.security.cert.Certificate; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AuthenticationDataSubscription implements AuthenticationDataSource { + private final AuthenticationDataSource authData; + private final String subscription; + + public AuthenticationDataSubscription(AuthenticationDataSource authData, String subscription) { + this.authData = authData; + this.subscription = subscription; + } + + @Override + public boolean hasDataFromCommand() { + return authData.hasDataFromCommand(); + } + + @Override + public String getCommandData() { + return authData.getCommandData(); + } + + @Override + public boolean hasDataFromPeer() { + return authData.hasDataFromPeer(); + } + + @Override + public SocketAddress getPeerAddress() { + return authData.getPeerAddress(); + } + + @Override + public boolean hasDataFromTls() { + return authData.hasDataFromTls(); + } + + @Override + public Certificate[] getTlsCertificates() { + return authData.getTlsCertificates(); + } + + @Override + public boolean hasSubscription() { + return this.subscription != null; + } + + @Override + public String getSubscription() { + return subscription; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 58035b84c3885..d4b53aeee5d78 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -66,8 +66,8 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TransactionMetadataStoreService; -import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.broker.intercept.BrokerInterceptor; @@ -115,7 +115,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; -import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest; import org.apache.pulsar.common.api.proto.CommandUnsubscribe; import org.apache.pulsar.common.api.proto.FeatureFlags; import org.apache.pulsar.common.api.proto.KeySharedMeta; @@ -375,19 +374,20 @@ private boolean invalidOriginalPrincipal(String originalPrincipal) { // // Incoming commands handling // //// - private CompletableFuture isTopicOperationAllowed(TopicName topicName, TopicOperation operation) { + private CompletableFuture isTopicOperationAllowed(TopicName topicName, TopicOperation operation, + AuthenticationDataSource authData) { if (!service.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } CompletableFuture isProxyAuthorizedFuture; if (originalPrincipal != null) { isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync( - topicName, operation, originalPrincipal, getAuthenticationData()); + topicName, operation, originalPrincipal, authData); } else { isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); } CompletableFuture isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync( - topicName, operation, authRole, authenticationData); + topicName, operation, authRole, authData); return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> { if (!isProxyAuthorized) { log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}", @@ -404,15 +404,9 @@ private CompletableFuture isTopicOperationAllowed(TopicName topicName, private CompletableFuture isTopicOperationAllowed(TopicName topicName, String subscriptionName, TopicOperation operation) { if (service.isAuthorizationEnabled()) { - if (authenticationData == null) { - authenticationData = new AuthenticationDataCommand("", subscriptionName); - } else { - authenticationData.setSubscription(subscriptionName); - } - if (originalAuthData != null) { - originalAuthData.setSubscription(subscriptionName); - } - return isTopicOperationAllowed(topicName, operation); + AuthenticationDataSource authData = + new AuthenticationDataSubscription(getAuthenticationData(), subscriptionName); + return isTopicOperationAllowed(topicName, operation, authData); } else { return CompletableFuture.completedFuture(true); } @@ -447,7 +441,8 @@ protected void handleLookup(CommandLookupTopic lookup) { lookupSemaphore.release(); return; } - isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> { + isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply( + isAuthorized -> { if (isAuthorized) { lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative, getPrincipal(), getAuthenticationData(), @@ -510,7 +505,8 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa lookupSemaphore.release(); return; } - isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> { + isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply( + isAuthorized -> { if (isAuthorized) { unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName) .handle((metadata, ex) -> { @@ -1163,7 +1159,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { } CompletableFuture isAuthorizedFuture = isTopicOperationAllowed( - topicName, TopicOperation.PRODUCE + topicName, TopicOperation.PRODUCE, getAuthenticationData() ); isAuthorizedFuture.thenApply(isAuthorized -> { if (isAuthorized) {