Skip to content

Commit

Permalink
[PIP-39] Introduce system topic and topic policies service (apache#4955)
Browse files Browse the repository at this point in the history
Fix: apache#4899
### Motivation
PIP-39 : (https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events)

Introduce system topic and topic policies service which can be used support topic level policies overwrite feature.

### Modification
Added system topic interface and topic policies system topic.
Added topic policies interface and system topic based implementation.
  • Loading branch information
codelipenghui authored and huangdx0726 committed Aug 24, 2020
1 parent b97c0eb commit 622c3b5
Show file tree
Hide file tree
Showing 38 changed files with 1,792 additions and 66 deletions.
7 changes: 7 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,13 @@ retentionCheckIntervalInSeconds=120
# Use 0 or negative number to disable the check
maxNumPartitionsPerPartitionedTopic=0

# Enable or disable system topic
systemTopicEnabled=false

# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=false

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
7 changes: 7 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,13 @@ brokerClientTlsCiphers=
# used by the internal client to authenticate with Pulsar brokers
brokerClientTlsProtocols=

# Enable or disable system topic
systemTopicEnabled=false

# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=false

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,17 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private Set<String> messagingProtocols = Sets.newTreeSet();

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable system topic.")
private boolean systemTopicEnabled = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " +
"please enable the system topic first.")
private boolean topicLevelPoliciesEnabled = false;

/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_TLS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
Expand Down Expand Up @@ -153,6 +155,7 @@ public class PulsarService implements AutoCloseable {
private WebSocketService webSocketService = null;
private ConfigurationCacheService configurationCacheService = null;
private LocalZooKeeperCacheService localZkCacheService = null;
private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED;
private BookKeeperClientFactory bkClientFactory;
private ZooKeeperCache localZkCache;
private GlobalZooKeeperCache globalZkCache;
Expand Down Expand Up @@ -492,6 +495,13 @@ public Boolean get() {
// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
this.nsService.initialize();

// Start topic level policies service
if (config.isTopicLevelPoliciesEnabled() && config.isSystemTopicEnabled()) {
this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this);
}

this.topicPoliciesService.start();

// Start the leader election service
startLeaderElectionService();

Expand Down Expand Up @@ -1124,6 +1134,10 @@ public static String bookieMetadataServiceUri(ServiceConfiguration config) {
return metadataServiceUri;
}

public TopicPoliciesService getTopicPoliciesService() {
return topicPoliciesService;
}

private void startWorkerService(AuthenticationService authenticationService,
AuthorizationService authorizationService)
throws InterruptedException, IOException, KeeperException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -200,10 +201,13 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth
}

boolean isEmpty;
List<String> topics;
try {
isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join().isEmpty()
&& getPartitionedTopicList(TopicDomain.persistent).isEmpty()
&& getPartitionedTopicList(TopicDomain.non_persistent).isEmpty();
topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
topics.addAll(getPartitionedTopicList(TopicDomain.persistent));
topics.addAll(getPartitionedTopicList(TopicDomain.non_persistent));
isEmpty = topics.isEmpty();

} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
Expand All @@ -213,8 +217,17 @@ && getPartitionedTopicList(TopicDomain.persistent).isEmpty()
if (log.isDebugEnabled()) {
log.debug("Found topics on namespace {}", namespaceName);
}
asyncResponse.resume(new RestException(Status.CONFLICT, "Cannot delete non empty namespace"));
return;
boolean hasNonSystemTopic = false;
for (String topic : topics) {
if (!SystemTopicClient.isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
break;
}
}
if (hasNonSystemTopic) {
asyncResponse.resume(new RestException(Status.CONFLICT, "Cannot delete non empty namespace"));
return;
}
}

// set the policies to deleted so that somebody else cannot acquire this namespace
Expand All @@ -232,6 +245,14 @@ && getPartitionedTopicList(TopicDomain.persistent).isEmpty()
// remove from owned namespace map and ephemeral node from ZK
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
// remove system topics first.
if (!topics.isEmpty()) {
for (String topic : topics) {
pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> {
topicOptional.ifPresent(systemTopic -> futures.add(systemTopic.deleteForcefully()));
});
}
}
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle bundle : bundles.getBundles()) {
Expand Down Expand Up @@ -1992,13 +2013,13 @@ private void clearBacklog(NamespaceName nsName, String bundleRange, String subsc
subscription = PersistentReplicator.getRemoteCluster(subscription);
}
for (Topic topic : topicList) {
if (topic instanceof PersistentTopic) {
if (topic instanceof PersistentTopic && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
futures.add(((PersistentTopic) topic).clearBacklog(subscription));
}
}
} else {
for (Topic topic : topicList) {
if (topic instanceof PersistentTopic) {
if (topic instanceof PersistentTopic && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
futures.add(((PersistentTopic) topic).clearBacklog());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
}
} else if (nsData.get().isDisabled()) {
future.completeExceptionally(
new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,16 @@
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -926,8 +929,9 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
try {
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger,
BrokerService.this);
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: new PersistentTopic(topic, ledger, BrokerService.this);
CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
replicationFuture.thenCompose(v -> {
// Also check dedup status
Expand Down Expand Up @@ -1236,36 +1240,11 @@ public BacklogQuotaManager getBacklogQuotaManager() {
return this.backlogQuotaManager;
}

/**
*
* @param topic
* needing quota enforcement check
* @return determine if quota enforcement needs to be done for topic
*/
public boolean isBacklogExceeded(PersistentTopic topic) {
TopicName topicName = TopicName.get(topic.getName());
long backlogQuotaLimitInBytes = getBacklogQuotaManager().getBacklogQuotaLimit(topicName.getNamespace());
if (backlogQuotaLimitInBytes < 0) {
return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}] - backlog quota limit = [{}]", topic.getName(), backlogQuotaLimitInBytes);
}

// check if backlog exceeded quota
long storageSize = topic.getBacklogSize();
if (log.isDebugEnabled()) {
log.debug("[{}] Storage size = [{}], limit [{}]", topic.getName(), storageSize, backlogQuotaLimitInBytes);
}

return (storageSize >= backlogQuotaLimitInBytes);
}

public void monitorBacklogQuota() {
forEachTopic(topic -> {
if (topic instanceof PersistentTopic) {
PersistentTopic persistentTopic = (PersistentTopic) topic;
if (isBacklogExceeded(persistentTopic)) {
if (persistentTopic.isBacklogExceeded()) {
getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic);
} else {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -2101,7 +2080,6 @@ public Optional<Integer> getListenPortTls() {
return Optional.empty();
}
}

private void checkMessagePublishBuffer() {
AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
foreachProducer(producer -> currentMessagePublishBufferBytes.addAndGet(producer.getCnx().getMessagePublishBufferSize()));
Expand Down Expand Up @@ -2214,4 +2192,7 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin
log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName);
return null;
}
private boolean isSystemTopic(String topic) {
return SystemTopicClient.isSystemTopic(TopicName.get(topic));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ public ConsumerAssignException(String msg) {
}
}

public static class TopicPoliciesCacheNotInitException extends BrokerServiceException {
public TopicPoliciesCacheNotInitException() {
super("Topic policies cache have not init.");
}
}

public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
return getClientErrorCode(t, true);
}
Expand Down
Loading

0 comments on commit 622c3b5

Please sign in to comment.