Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] PIP-321 Introduce allowed-cluster at the namespace level #22378

Merged
merged 18 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -703,9 +703,21 @@ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<S
"Invalid cluster id: " + clusterId);
}
return validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
.thenCompose(__ ->
validateClusterForTenantAsync(
namespaceName.getTenant(), clusterId));
.thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName)
.thenCompose(nsPolicies -> {
if (nsPolicies.allowed_clusters.isEmpty()) {
return validateClusterForTenantAsync(
namespaceName.getTenant(), clusterId);
}
if (!nsPolicies.allowed_clusters.contains(clusterId)) {
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
String msg = String.format("Cluster [%s] is not in the "
+ "list of allowed clusters list for namespace "
+ "[%s]", clusterId, namespaceName.toString());
log.info(msg);
throw new RestException(Status.FORBIDDEN, msg);
}
return CompletableFuture.completedFuture(null);
}));
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);
}))
Expand Down Expand Up @@ -2722,4 +2734,65 @@ protected CompletableFuture<Object> internalGetDispatcherPauseOnAckStatePersiste
return policiesOpt.map(p -> p.dispatcherPauseOnAckStatePersistentEnabled).orElse(false);
});
}

protected CompletableFuture<Void> internalSetNamespaceAllowedClusters(List<String> clusterIds) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
// Allowed clusters in the namespace policy should be included in the allowed clusters in the tenant
// policy.
.thenCompose(__ -> FutureUtil.waitForAll(clusterIds.stream().map(clusterId ->
validateClusterForTenantAsync(namespaceName.getTenant(), clusterId))
.collect(Collectors.toList())))
// Allowed clusters should include all the existed replication clusters and could not contain global
// cluster.
.thenCompose(__ -> {
checkNotNull(clusterIds, "ClusterIds should not be null");
if (clusterIds.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of allowed clusters");
}
return getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
if (!clusterIds.contains(replicationCluster)) {
throw new RestException(Status.BAD_REQUEST,
String.format("Allowed clusters do not contain the replication cluster %s. "
+ "Please remove the replication cluster if the cluster is not allowed "
+ "for this namespace", replicationCluster));
}
});
return Sets.newHashSet(clusterIds);
});
})
// Verify the allowed clusters are valid and they do not contain the peer clusters.
.thenCompose(allowedClusters -> clustersAsync()
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
.thenCompose(clusters -> {
List<CompletableFuture<Void>> futures =
allowedClusters.stream().map(clusterId -> {
if (!clusters.contains(clusterId)) {
throw new RestException(Status.FORBIDDEN,
"Invalid cluster id: " + clusterId);
}
return validatePeerClusterConflictAsync(clusterId, allowedClusters);
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> allowedClusters);
}))
// Update allowed clusters into policies.
.thenCompose(allowedClusterSet -> updatePoliciesAsync(namespaceName, policies -> {
policies.allowed_clusters = allowedClusterSet;
return policies;
}));
}

protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
.thenAccept(__ -> {
if (!namespaceName.isGlobal()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot get the allowed clusters for a non-global namespace");
}
}).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.allowed_clusters);
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -2838,5 +2838,52 @@ public void getDispatcherPauseOnAckStatePersistent(@Suspended final AsyncRespons
});
}


/**
 * REST endpoint that sets the allowed clusters of a namespace.
 *
 * <p>Delegates to {@code internalSetNamespaceAllowedClusters} and resumes the suspended response with its
 * result; any failure is logged and reported through {@code resumeAsyncResponseExceptionally}.
 */
@POST
@Path("/{tenant}/{namespace}/allowedClusters")
@ApiOperation(value = "Set the allowed clusters for a namespace.")
@ApiResponses(value = {
        @ApiResponse(code = 400, message = "The list of allowed clusters should include all replication clusters."),
        @ApiResponse(code = 403, message = "The requester does not have admin permissions."),
        @ApiResponse(code = 404, message = "The specified tenant, cluster, or namespace does not exist."),
        @ApiResponse(code = 409, message = "A peer-cluster cannot be part of an allowed-cluster."),
        @ApiResponse(code = 412, message = "The namespace is not global or the provided cluster IDs are invalid.")})
public void setNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse,
                                        @PathParam("tenant") String tenant,
                                        @PathParam("namespace") String namespace,
                                        @ApiParam(value = "List of allowed clusters", required = true)
                                        List<String> clusterIds) {
    // Resolve the namespace name before anything touches the policies.
    validateNamespaceName(tenant, namespace);
    internalSetNamespaceAllowedClusters(clusterIds)
            .thenAccept(result -> asyncResponse.resume(result))
            .exceptionally(ex -> {
                log.error(
                        "[{}] Failed to set namespace allowed clusters on namespace {}",
                        clientAppId(),
                        namespace,
                        ex);
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}

/**
 * REST endpoint that returns the allowed clusters of a namespace.
 *
 * <p>Delegates to {@code internalGetNamespaceAllowedClustersAsync} and resumes the suspended response with
 * the resulting set; any failure is logged and reported through {@code resumeAsyncResponseExceptionally}.
 */
@GET
@Path("/{tenant}/{namespace}/allowedClusters")
@ApiOperation(value = "Get the allowed clusters for a namespace.",
        response = String.class, responseContainer = "List")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
        @ApiResponse(code = 412, message = "Namespace is not global")})
public void getNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse,
                                        @PathParam("tenant") String tenant,
                                        @PathParam("namespace") String namespace) {
    // Resolve the namespace name before the policies are read.
    validateNamespaceName(tenant, namespace);
    internalGetNamespaceAllowedClustersAsync()
            .thenAccept(allowedClusters -> asyncResponse.resume(allowedClusters))
            .exceptionally(ex -> {
                log.error(
                        "[{}] Failed to get namespace allowed clusters on namespace {}",
                        clientAppId(),
                        namespace,
                        ex);
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}

// Class-wide SLF4J logger used by the REST handlers in this class.
private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1857,52 +1857,81 @@ public CompletableFuture<Void> checkReplication() {
if (log.isDebugEnabled()) {
log.debug("[{}] Checking replication status", name);
}

List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
if (CollectionUtils.isEmpty(configuredClusters)) {
log.warn("[{}] No replication clusters configured", name);
return CompletableFuture.completedFuture(null);
}

int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
// because pulsar doesn't serve global topic without local repl-cluster configured.
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
log.info("Deleting topic [{}] because local cluster is not part of "
+ " global namespace repl list {}", topic, configuredClusters);
return deleteForcefully();
}

removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
return checkAllowedCluster(localCluster).thenCompose(success -> {
if (!success) {
// local cluster is not part of global namespace replication list.
// The topic is not allowed to serve anymore.
return CompletableFuture.completedFuture(null);
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// The replication clusters at namespace level will contain the local cluster when a namespace is created.
// If there is only one cluster in the replication clusters, it means replication is not enabled.
// If cluster1 and cluster2 use the same configuration store and the namespace is created in cluster1
// without enabling geo-replication, then the replication clusters always contain only cluster1.
//
// When a topic under the namespace is loaded in cluster2, `cluster1` may be identified as a
// remote cluster and geo-replication may be started. This check avoids that case.
if (!(configuredClusters.size() == 1 && replicators.isEmpty())) {
// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});
}
});

futures.add(checkShadowReplication());
futures.add(checkShadowReplication());

return FutureUtil.waitForAll(futures);
return FutureUtil.waitForAll(futures);
});
}

private CompletableFuture<Boolean> checkAllowedCluster(String localCluster) {
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved
List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional -> {
Set<String> allowedClusters = Set.of();
if (policiesOptional.isPresent()) {
allowedClusters = policiesOptional.get().allowed_clusters;
}
// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
// because pulsar doesn't serve global topic without local repl-cluster configured.
if (TopicName.get(topic).isGlobal() && !replicationClusters.contains(localCluster)
&& !allowedClusters.contains(localCluster)) {
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
log.warn("Deleting topic [{}] because local cluster is not part of "
+ "global namespace repl list {} and allowed list {}", topic,
replicationClusters, allowedClusters);
return deleteForcefully().thenApply(__ -> false);
} else {
return CompletableFuture.completedFuture(true);
}
});
}

private CompletableFuture<Void> checkShadowReplication() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,14 +901,16 @@ public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationC
log.warn(msg);
validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND,
"Namespace is deleted"));
} else if (policies.replication_clusters.isEmpty()) {
} else if (policies.replication_clusters.isEmpty() && policies.allowed_clusters.isEmpty()) {
String msg = String.format(
"Namespace does not have any clusters configured : local_cluster=%s ns=%s",
localCluster, namespace.toString());
log.warn(msg);
validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
} else if (!policies.replication_clusters.contains(localCluster)) {
getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters)
} else if (!policies.replication_clusters.contains(localCluster) && !policies.allowed_clusters
.contains(localCluster)) {
getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters,
policies.allowed_clusters)
.thenAccept(ownerPeerCluster -> {
if (ownerPeerCluster != null) {
// found a peer that own this namespace
Expand Down Expand Up @@ -948,9 +950,9 @@ public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationC
}

private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsync(PulsarService pulsar,
Set<String> replicationClusters) {
Set<String> replicationClusters, Set<String> allowedClusters) {
String currentCluster = pulsar.getConfiguration().getClusterName();
if (replicationClusters == null || replicationClusters.isEmpty() || isBlank(currentCluster)) {
if (replicationClusters.isEmpty() && allowedClusters.isEmpty() || isBlank(currentCluster)) {
return CompletableFuture.completedFuture(null);
}

Expand All @@ -960,7 +962,8 @@ private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsy
return CompletableFuture.completedFuture(null);
}
for (String peerCluster : cluster.get().getPeerClusterNames()) {
if (replicationClusters.contains(peerCluster)) {
if (replicationClusters.contains(peerCluster)
|| allowedClusters.contains(peerCluster)) {
return pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster)
.thenApply(ret -> {
if (!ret.isPresent()) {
Expand Down
Loading
Loading