Skip to content

Commit

Permalink
Avoid AuthenticationDataSource mutation for subscription name (#16065)
Browse files Browse the repository at this point in the history
The `authenticationData` field in `ServerCnx` is being mutated to add the `subscription` field that will be passed on to the authorization plugin. The problem is that `authenticationData` is scoped to the whole connection and it should be getting mutated for each consumer that is created on the connection.

The current code leads to a race condition where the subscription name used in the authz plugin is already modified while we're looking at it. Instead, we should create a new object and enforce the final modifier.

(cherry picked from commit e6b12c6)
  • Loading branch information
merlimat authored and codelipenghui committed Jun 15, 2022
1 parent 3211f91 commit 4225887
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,9 @@ public class AuthenticationDataCommand implements AuthenticationDataSource {
protected final String authData;
protected final SocketAddress remoteAddress;
protected final SSLSession sslSession;
protected String subscription;

public AuthenticationDataCommand(String authData) {
this(authData, null, null, null);
}

public AuthenticationDataCommand(String authData, String subscription) {
this(authData, null, null, subscription);
}

public AuthenticationDataCommand(String authData, SocketAddress remoteAddress, SSLSession sslSession,
String subscription) {
this.authData = authData;
this.remoteAddress = remoteAddress;
this.sslSession = sslSession;
this.subscription = subscription;
this(authData, null, null);
}

public AuthenticationDataCommand(String authData, SocketAddress remoteAddress, SSLSession sslSession) {
Expand Down Expand Up @@ -100,22 +87,4 @@ public Certificate[] getTlsCertificates() {
return null;
}
}

/*
* Subscription
*/
@Override
public boolean hasSubscription() {
return this.subscription != null;
}

@Override
public void setSubscription(String subscription) {
this.subscription = subscription;
}

@Override
public String getSubscription() {
return subscription;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authentication;

import java.net.SocketAddress;
import java.security.cert.Certificate;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AuthenticationDataSubscription implements AuthenticationDataSource {
private final AuthenticationDataSource authData;
private final String subscription;

public AuthenticationDataSubscription(AuthenticationDataSource authData, String subscription) {
this.authData = authData;
this.subscription = subscription;
}

@Override
public boolean hasDataFromCommand() {
return authData.hasDataFromCommand();
}

@Override
public String getCommandData() {
return authData.getCommandData();
}

@Override
public boolean hasDataFromPeer() {
return authData.hasDataFromPeer();
}

@Override
public SocketAddress getPeerAddress() {
return authData.getPeerAddress();
}

@Override
public boolean hasDataFromTls() {
return authData.hasDataFromTls();
}

@Override
public Certificate[] getTlsCertificates() {
return authData.getTlsCertificates();
}

@Override
public boolean hasSubscription() {
return this.subscription != null;
}

@Override
public String getSubscription() {
return subscription;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
Expand Down Expand Up @@ -115,7 +115,6 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
Expand Down Expand Up @@ -375,19 +374,20 @@ private boolean invalidOriginalPrincipal(String originalPrincipal) {
// // Incoming commands handling
// ////

private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation) {
private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation,
AuthenticationDataSource authData) {
if (!service.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, originalPrincipal, getAuthenticationData());
topicName, operation, originalPrincipal, authData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, authRole, authenticationData);
topicName, operation, authRole, authData);
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}",
Expand All @@ -404,15 +404,9 @@ private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName,
private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName,
TopicOperation operation) {
if (service.isAuthorizationEnabled()) {
if (authenticationData == null) {
authenticationData = new AuthenticationDataCommand("", subscriptionName);
} else {
authenticationData.setSubscription(subscriptionName);
}
if (originalAuthData != null) {
originalAuthData.setSubscription(subscriptionName);
}
return isTopicOperationAllowed(topicName, operation);
AuthenticationDataSource authData =
new AuthenticationDataSubscription(getAuthenticationData(), subscriptionName);
return isTopicOperationAllowed(topicName, operation, authData);
} else {
return CompletableFuture.completedFuture(true);
}
Expand Down Expand Up @@ -447,7 +441,8 @@ protected void handleLookup(CommandLookupTopic lookup) {
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply(
isAuthorized -> {
if (isAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
getPrincipal(), getAuthenticationData(),
Expand Down Expand Up @@ -510,7 +505,8 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply(
isAuthorized -> {
if (isAuthorized) {
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
.handle((metadata, ex) -> {
Expand Down Expand Up @@ -1163,7 +1159,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}

CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName, TopicOperation.PRODUCE
topicName, TopicOperation.PRODUCE, getAuthenticationData()
);
isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
Expand Down

0 comments on commit 4225887

Please sign in to comment.