diff --git a/conf/broker.conf b/conf/broker.conf index 436b4205fb944..e897f23c39e6a 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -371,6 +371,13 @@ retentionCheckIntervalInSeconds=120 # Use 0 or negative number to disable the check maxNumPartitionsPerPartitionedTopic=0 +# Enable or disable system topic +systemTopicEnabled=false + +# Enable or disable topic level policies, topic level policies depends on the system topic +# Please enable the system topic first. +topicLevelPoliciesEnabled=false + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/conf/standalone.conf b/conf/standalone.conf index 319b51070689d..d60bc01ebd5a9 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -332,6 +332,13 @@ brokerClientTlsCiphers= # used by the internal client to authenticate with Pulsar brokers brokerClientTlsProtocols= +# Enable or disable system topic +systemTopicEnabled=false + +# Enable or disable topic level policies, topic level policies depends on the system topic +# Please enable the system topic first. +topicLevelPoliciesEnabled=false + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e9b924696a9b6..a960dbc538b1d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -700,6 +700,17 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private Set messagingProtocols = Sets.newTreeSet(); + @FieldContext( + category = CATEGORY_SERVER, + doc = "Enable or disable system topic.") + private boolean systemTopicEnabled = false; + + @FieldContext( + category = CATEGORY_SERVER, + doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " + + "please enable the system topic first.") + private boolean topicLevelPoliciesEnabled = false; + /***** --- TLS --- ****/ @FieldContext( category = CATEGORY_TLS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 5e515aacf1760..35b6a189d712c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -86,7 +86,9 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.protocol.ProtocolHandlers; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.stats.MetricsGenerator; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; @@ -153,6 +155,7 @@ public class PulsarService implements AutoCloseable { private WebSocketService webSocketService = null; private ConfigurationCacheService configurationCacheService = null; private LocalZooKeeperCacheService localZkCacheService = null; + private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED; private BookKeeperClientFactory bkClientFactory; private ZooKeeperCache localZkCache; private GlobalZooKeeperCache globalZkCache; @@ -492,6 +495,13 @@ public Boolean get() { // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info. this.nsService.initialize(); + // Start topic level policies service + if (config.isTopicLevelPoliciesEnabled() && config.isSystemTopicEnabled()) { + this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this); + } + + this.topicPoliciesService.start(); + // Start the leader election service startLeaderElectionService(); @@ -1124,6 +1134,10 @@ public static String bookieMetadataServiceUri(ServiceConfiguration config) { return metadataServiceUri; } + public TopicPoliciesService getTopicPoliciesService() { + return topicPoliciesService; + } + private void startWorkerService(AuthenticationService authenticationService, AuthorizationService authorizationService) throws InterruptedException, IOException, KeeperException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 1d14bf55a9cab..42fd278c4a207 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -62,6 +62,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -200,10 +201,13 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth } boolean isEmpty; + List topics; try { - isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join().isEmpty() - && getPartitionedTopicList(TopicDomain.persistent).isEmpty() - && getPartitionedTopicList(TopicDomain.non_persistent).isEmpty(); + topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); + topics.addAll(getPartitionedTopicList(TopicDomain.persistent)); + topics.addAll(getPartitionedTopicList(TopicDomain.non_persistent)); + isEmpty = topics.isEmpty(); + } catch (Exception e) { asyncResponse.resume(new RestException(e)); return; @@ -213,8 +217,17 @@ && getPartitionedTopicList(TopicDomain.persistent).isEmpty() if (log.isDebugEnabled()) { log.debug("Found topics on namespace {}", namespaceName); } - asyncResponse.resume(new RestException(Status.CONFLICT, "Cannot delete non empty namespace")); - return; + boolean hasNonSystemTopic = false; + for (String topic : topics) { + if (!SystemTopicClient.isSystemTopic(TopicName.get(topic))) { + hasNonSystemTopic = true; + break; + } + } + if (hasNonSystemTopic) { + asyncResponse.resume(new RestException(Status.CONFLICT, "Cannot delete non empty namespace")); + return; + } } // set the policies to deleted so that somebody else cannot acquire this namespace @@ -232,6 +245,14 @@ && getPartitionedTopicList(TopicDomain.persistent).isEmpty() // remove from owned namespace map and ephemeral node from ZK final List> futures = Lists.newArrayList(); try { + // remove system topics first. + if (!topics.isEmpty()) { + for (String topic : topics) { + pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> { + topicOptional.ifPresent(systemTopic -> futures.add(systemTopic.deleteForcefully())); + }); + } + } NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName); for (NamespaceBundle bundle : bundles.getBundles()) { @@ -1992,13 +2013,13 @@ private void clearBacklog(NamespaceName nsName, String bundleRange, String subsc subscription = PersistentReplicator.getRemoteCluster(subscription); } for (Topic topic : topicList) { - if (topic instanceof PersistentTopic) { + if (topic instanceof PersistentTopic && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) { futures.add(((PersistentTopic) topic).clearBacklog(subscription)); } } } else { for (Topic topic : topicList) { - if (topic instanceof PersistentTopic) { + if (topic instanceof PersistentTopic && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) { futures.add(((PersistentTopic) topic).clearBacklog()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 24d933d0f2b3f..74d69c4efc44e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -362,7 +362,7 @@ private CompletableFuture> findBrokerServiceUrl(Namespace } } else if (nsData.get().isDisabled()) { future.completeExceptionally( - new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle))); + new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle))); } else { if (LOG.isDebugEnabled()) { LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d5a9f80fc79f0..3b4f645214c3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -101,13 +101,16 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.persistent.SystemTopic; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; +import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; + import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -926,8 +929,9 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, @Override public void openLedgerComplete(ManagedLedger ledger, Object ctx) { try { - PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, - BrokerService.this); + PersistentTopic persistentTopic = isSystemTopic(topic) + ? new SystemTopic(topic, ledger, BrokerService.this) + : new PersistentTopic(topic, ledger, BrokerService.this); CompletableFuture replicationFuture = persistentTopic.checkReplication(); replicationFuture.thenCompose(v -> { // Also check dedup status @@ -1236,36 +1240,11 @@ public BacklogQuotaManager getBacklogQuotaManager() { return this.backlogQuotaManager; } - /** - * - * @param topic - * needing quota enforcement check - * @return determine if quota enforcement needs to be done for topic - */ - public boolean isBacklogExceeded(PersistentTopic topic) { - TopicName topicName = TopicName.get(topic.getName()); - long backlogQuotaLimitInBytes = getBacklogQuotaManager().getBacklogQuotaLimit(topicName.getNamespace()); - if (backlogQuotaLimitInBytes < 0) { - return false; - } - if (log.isDebugEnabled()) { - log.debug("[{}] - backlog quota limit = [{}]", topic.getName(), backlogQuotaLimitInBytes); - } - - // check if backlog exceeded quota - long storageSize = topic.getBacklogSize(); - if (log.isDebugEnabled()) { - log.debug("[{}] Storage size = [{}], limit [{}]", topic.getName(), storageSize, backlogQuotaLimitInBytes); - } - - return (storageSize >= backlogQuotaLimitInBytes); - } - public void monitorBacklogQuota() { forEachTopic(topic -> { if (topic instanceof PersistentTopic) { PersistentTopic persistentTopic = (PersistentTopic) topic; - if (isBacklogExceeded(persistentTopic)) { + if (persistentTopic.isBacklogExceeded()) { getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic); } else { if (log.isDebugEnabled()) { @@ -2101,7 +2080,6 @@ public Optional getListenPortTls() { return Optional.empty(); } } - private void checkMessagePublishBuffer() { AtomicLong currentMessagePublishBufferBytes = new AtomicLong(); foreachProducer(producer -> currentMessagePublishBufferBytes.addAndGet(producer.getCnx().getMessagePublishBufferSize())); @@ -2214,4 +2192,7 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName); return null; } + private boolean isSystemTopic(String topic) { + return SystemTopicClient.isSystemTopic(TopicName.get(topic)); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index b4bfed518ad3c..6d0e50fe68d7a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -171,6 +171,12 @@ public ConsumerAssignException(String msg) { } } + public static class TopicPoliciesCacheNotInitException extends BrokerServiceException { + public TopicPoliciesCacheNotInitException() { + super("Topic policies cache have not init."); + } + } + public static PulsarApi.ServerError getClientErrorCode(Throwable t) { return getClientErrorCode(t, true); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java new file mode 100644 index 0000000000000..7c561c9725023 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.events.ActionType; +import org.apache.pulsar.common.events.EventType; +import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; +import org.apache.pulsar.broker.systopic.SystemTopicClient; +import org.apache.pulsar.common.events.PulsarEvent; +import org.apache.pulsar.common.events.TopicPoliciesEvent; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Cached topic policies service will cache the system topic reader and the topic policies + * + * While reader cache for the namespace was removed, the topic policies will remove automatically. + */ +public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService { + + private final PulsarService pulsarService; + private NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory; + + private final Map policiesCache = new ConcurrentHashMap<>(); + + private final Map ownedBundlesCountPerNamespace = new ConcurrentHashMap<>(); + + private final Map> readerCaches = new ConcurrentHashMap<>(); + + private final Map policyCacheInitMap = new ConcurrentHashMap<>(); + + public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { + this.pulsarService = pulsarService; + } + + @Override + public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) { + CompletableFuture result = new CompletableFuture<>(); + + createSystemTopicFactoryIfNeeded(); + SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory.createSystemTopic(topicName.getNamespaceObject(), + EventType.TOPIC_POLICY); + + CompletableFuture writerFuture = systemTopicClient.newWriterAsync(); + writerFuture.whenComplete((writer, ex) -> { + if (ex != null) { + result.completeExceptionally(ex); + } else { + writer.writeAsync( + PulsarEvent.builder() + .actionType(ActionType.UPDATE) + .eventType(EventType.TOPIC_POLICY) + .topicPoliciesEvent( + TopicPoliciesEvent.builder() + .domain(topicName.getDomain().toString()) + .tenant(topicName.getTenant()) + .namespace(topicName.getNamespaceObject().getLocalName()) + .topic(topicName.getLocalName()) + .policies(policies) + .build()) + .build()).whenComplete(((messageId, e) -> { + if (e != null) { + result.completeExceptionally(e); + } else { + if (messageId != null) { + result.complete(null); + } else { + result.completeExceptionally(new RuntimeException("Got message id is null.")); + } + } + writer.closeAsync().whenComplete((v, cause) -> { + if (cause != null) { + log.error("[{}] Close writer error.", topicName, cause); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Close writer success.", topicName); + } + } + }); + }) + ); + } + }); + return result; + } + + @Override + public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException { + if (policyCacheInitMap.containsKey(topicName.getNamespaceObject()) + && !policyCacheInitMap.get(topicName.getNamespaceObject())) { + throw new TopicPoliciesCacheNotInitException(); + } + return policiesCache.get(topicName); + } + + @Override + public CompletableFuture getTopicPoliciesBypassCacheAsync(TopicName topicName) { + CompletableFuture result = new CompletableFuture<>(); + createSystemTopicFactoryIfNeeded(); + if (namespaceEventsSystemTopicFactory == null) { + result.complete(null); + return result; + } + SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory.createSystemTopic(topicName.getNamespaceObject() + , EventType.TOPIC_POLICY); + systemTopicClient.newReaderAsync().thenAccept(r -> + fetchTopicPoliciesAsyncAndCloseReader(r, topicName, null, result)); + return result; + } + + @Override + public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { + CompletableFuture result = new CompletableFuture<>(); + NamespaceName namespace = namespaceBundle.getNamespaceObject(); + createSystemTopicFactoryIfNeeded(); + synchronized (this) { + if (readerCaches.get(namespace) != null) { + ownedBundlesCountPerNamespace.get(namespace).incrementAndGet(); + result.complete(null); + } else { + SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory.createSystemTopic(namespace + , EventType.TOPIC_POLICY); + ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); + policyCacheInitMap.put(namespace, false); + CompletableFuture readerCompletableFuture = systemTopicClient.newReaderAsync(); + readerCaches.put(namespace, readerCompletableFuture); + readerCompletableFuture.whenComplete((reader, ex) -> { + if (ex != null) { + result.completeExceptionally(ex); + } else { + initPolicesCache(reader, result); + readMorePolicies(reader); + } + }); + } + } + return result; + } + + @Override + public CompletableFuture removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { + NamespaceName namespace = namespaceBundle.getNamespaceObject(); + AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace); + if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) { + CompletableFuture readerCompletableFuture = readerCaches.remove(namespace); + if (readerCompletableFuture != null) { + readerCompletableFuture.thenAccept(SystemTopicClient.Reader::closeAsync); + ownedBundlesCountPerNamespace.remove(namespace); + policyCacheInitMap.remove(namespace); + policiesCache.entrySet().removeIf(entry -> entry.getKey().getNamespaceObject().equals(namespace)); + } + } + return CompletableFuture.completedFuture(null); + } + + @Override + public void start() { + + pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() { + + @Override + public void onLoad(NamespaceBundle bundle) { + addOwnedNamespaceBundleAsync(bundle); + } + + @Override + public void unLoad(NamespaceBundle bundle) { + removeOwnedNamespaceBundleAsync(bundle); + } + + @Override + public boolean test(NamespaceBundle namespaceBundle) { + return true; + } + + }); + } + + private void initPolicesCache(SystemTopicClient.Reader reader, CompletableFuture future) { + reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> { + if (ex != null) { + future.completeExceptionally(ex); + readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); + } + if (hasMore) { + reader.readNextAsync().whenComplete((msg, e) -> { + if (e != null) { + future.completeExceptionally(e); + readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); + } + refreshTopicPoliciesCache(msg); + initPolicesCache(reader, future); + }); + } else { + future.complete(null); + policyCacheInitMap.computeIfPresent(reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true); + } + }); + } + + private void readMorePolicies(SystemTopicClient.Reader reader) { + reader.readNextAsync().whenComplete((msg, ex) -> { + if (ex == null) { + refreshTopicPoliciesCache(msg); + readMorePolicies(reader); + } else { + if (ex instanceof PulsarClientException.AlreadyClosedException) { + log.error("Read more topic policies exception, close the read now!", ex); + NamespaceName namespace = reader.getSystemTopic().getTopicName().getNamespaceObject(); + ownedBundlesCountPerNamespace.remove(namespace); + readerCaches.remove(namespace); + } else { + readMorePolicies(reader); + } + } + }); + } + + private void refreshTopicPoliciesCache(Message msg) { + if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) { + TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent(); + policiesCache.put( + TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic()), + event.getPolicies() + ); + } + } + + private void createSystemTopicFactoryIfNeeded() { + if (namespaceEventsSystemTopicFactory == null) { + synchronized (this) { + if (namespaceEventsSystemTopicFactory == null) { + try { + namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarService.getClient()); + } catch (PulsarServerException e) { + log.error("Create namespace event system topic factory error.", e); + } + } + } + } + } + + private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader reader, TopicName topicName, TopicPolicies policies, + CompletableFuture future) { + reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> { + if (ex != null) { + future.completeExceptionally(ex); + } + if (hasMore) { + reader.readNextAsync().whenComplete((msg, e) -> { + if (e != null) { + future.completeExceptionally(e); + } + if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) { + TopicPoliciesEvent topicPoliciesEvent = msg.getValue().getTopicPoliciesEvent(); + if (topicName.equals(TopicName.get( + topicPoliciesEvent.getDomain(), + topicPoliciesEvent.getTenant(), + topicPoliciesEvent.getNamespace(), + topicPoliciesEvent.getTopic())) + ) { + fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, topicPoliciesEvent.getPolicies(), future); + } else { + fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, policies, future); + } + } + }); + } else { + future.complete(policies); + reader.closeAsync().whenComplete((v, e) -> { + if (e != null) { + log.error("[{}] Close reader error.", topicName, e); + } + }); + } + }); + } + + @VisibleForTesting + long getPoliciesCacheSize() { + return policiesCache.size(); + } + + @VisibleForTesting + long getReaderCacheCount() { + return readerCaches.size(); + } + + @VisibleForTesting + boolean checkReaderIsCached(NamespaceName namespaceName) { + return readerCaches.get(namespaceName) != null; + } + + @VisibleForTesting + Boolean getPoliciesCacheInit(NamespaceName namespaceName) { + return policyCacheInitMap.get(namespaceName); + } + + private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index aa147d4798bb8..b2b1882ec2c24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -210,4 +210,8 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats default Optional getDispatchRateLimiter() { return Optional.empty(); } + + default boolean isSystemTopic() { + return false; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java new file mode 100644 index 0000000000000..1d9c382741b47 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.common.util.FutureUtil; + +import java.util.concurrent.CompletableFuture; + +/** + * Topic policies service + */ +public interface TopicPoliciesService { + + TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled(); + + /** + * Update policies for a topic async + * @param topicName topic name + * @param policies policies for the topic name + */ + CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies); + + /** + * Get policies for a topic async + * @param topicName topic name + * @return future of the topic policies + */ + TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException; + + /** + * Get policies for a topic without cache async + * @param topicName topic name + * @return future of the topic policies + */ + CompletableFuture getTopicPoliciesBypassCacheAsync(TopicName topicName); + + /** + * Add owned namespace bundle async. + * + * @param namespaceBundle namespace bundle + */ + CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle); + + /** + * Remove owned namespace bundle async. + * + * @param namespaceBundle namespace bundle + */ + CompletableFuture removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle); + + /** + * Start the topic policy service. + */ + void start(); + + class TopicPoliciesServiceDisabled implements TopicPoliciesService { + + @Override + public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) { + return FutureUtil.failedFuture(new UnsupportedOperationException("Topic policies service is disabled.")); + } + + @Override + public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException { + return null; + } + + @Override + public CompletableFuture getTopicPoliciesBypassCacheAsync(TopicName topicName) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { + //No-op + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { + //No-op + return CompletableFuture.completedFuture(null); + } + + @Override + public void start() { + //No-op + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cbf6718ea5542..e3bd7b00595d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -230,7 +230,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor.getName(), localCluster); if (!isReplicatorStarted) { throw new NamingException( - PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster); + PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster); } } else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) { // This is not a regular subscription, we are going to ignore it for now and let the message dedup logic @@ -238,7 +238,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS } else { final String subscriptionName = Codec.decode(cursor.getName()); subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, - PersistentSubscription.isCursorFromReplicatedSubscription(cursor))); + PersistentSubscription.isCursorFromReplicatedSubscription(cursor))); // subscription-cursor gets activated by default: deactivate as there is no active subscription right // now subscriptions.get(subscriptionName).deactivateCursor(); @@ -266,7 +266,6 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS checkReplicatedSubscriptionControllerState(); } - // for testing purposes @VisibleForTesting PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger, MessageDeduplication messageDeduplication) { @@ -1152,7 +1151,7 @@ public void checkCompaction() { .orElseThrow(() -> new KeeperException.NoNodeException()); - if (policies.compaction_threshold != 0 + if (isSystemTopic() || policies.compaction_threshold != 0 && currentCompaction.isDone()) { long backlogEstimate = 0; @@ -1854,7 +1853,7 @@ public boolean isBacklogQuotaExceeded(String producerName) { if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) - && brokerService.isBacklogExceeded(this)) { + && isBacklogExceeded()) { log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName); return true; } else { @@ -1864,6 +1863,28 @@ public boolean isBacklogQuotaExceeded(String producerName) { return false; } + /** + * @return determine if quota enforcement needs to be done for topic + */ + public boolean isBacklogExceeded() { + TopicName topicName = TopicName.get(getName()); + long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimit(topicName.getNamespace()); + if (backlogQuotaLimitInBytes < 0) { + return false; + } + if (log.isDebugEnabled()) { + log.debug("[{}] - backlog quota limit = [{}]", getName(), backlogQuotaLimitInBytes); + } + + // check if backlog exceeded quota + long storageSize = getBacklogSize(); + if (log.isDebugEnabled()) { + log.debug("[{}] Storage size = [{}], limit [{}]", getName(), storageSize, backlogQuotaLimitInBytes); + } + + return (storageSize >= backlogQuotaLimitInBytes); + } + @Override public boolean isReplicated() { return !replicators.isEmpty(); @@ -2138,4 +2159,9 @@ Optional getReplicatedSubscriptionController( public CompactedTopic getCompactedTopic() { return compactedTopic; } + + @Override + public boolean isSystemTopic() { + return false; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java new file mode 100644 index 0000000000000..6720209f25e91 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.service.persistent; + +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; + +public class SystemTopic extends PersistentTopic { + + public SystemTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws BrokerServiceException.NamingException { + super(topic, ledger, brokerService); + } + + @Override + public boolean isBacklogExceeded() { + return false; + } + + @Override + public boolean isSystemTopic() { + return true; + } + + @Override + public void checkMessageExpiry() { + // do nothing for system topic + } + + @Override + public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) { + // do nothing for system topic + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java new file mode 100644 index 0000000000000..911a99726a01a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.systopic; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.events.EventType; +import org.apache.pulsar.common.events.EventsTopicNames; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NamespaceEventsSystemTopicFactory { + + private final PulsarClient client; + + public NamespaceEventsSystemTopicFactory(PulsarClient client) { + this.client = client; + } + + public SystemTopicClient createSystemTopic(NamespaceName namespaceName, EventType eventType) { + TopicName topicName = getSystemTopicName(namespaceName, eventType); + if (topicName != null) { + log.info("Create system topic {} for {}", topicName.toString(), eventType); + return new TopicPoliciesSystemTopicClient(client, topicName); + } else { + return null; + } + } + + public static TopicName getSystemTopicName(NamespaceName namespaceName, EventType eventType) { + switch (eventType) { + case TOPIC_POLICY: + return TopicName.get("persistent", namespaceName, + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + default: + return null; + } + } + + private static final Logger log = LoggerFactory.getLogger(NamespaceEventsSystemTopicFactory.class); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java new file mode 100644 index 0000000000000..c5a33522d1997 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.systopic; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.events.EventsTopicNames; +import org.apache.pulsar.common.events.PulsarEvent; +import org.apache.pulsar.common.naming.TopicName; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Pulsar system topic + */ +public interface SystemTopicClient { + + /** + * Get topic name of the system topic. + * @return topic name + */ + TopicName getTopicName(); + + /** + * Create a reader for the system topic. + * @return a new reader for the system topic + */ + Reader newReader() throws PulsarClientException; + + /** + * Create a reader for the system topic asynchronously. + */ + CompletableFuture newReaderAsync(); + + /** + * Create a writer for the system topic. + * @return writer for the system topic + */ + Writer newWriter() throws PulsarClientException; + + /** + * Create a writer for the system topic asynchronously. + */ + CompletableFuture newWriterAsync(); + + /** + * Close the system topic + */ + void close() throws Exception; + + /** + * Close the system topic asynchronously. + * @return + */ + CompletableFuture closeAsync(); + + /** + * Get all writers of the system topic + * @return writer list + */ + List getWriters(); + + /** + * Get all readers of the system topic + * @return reader list + */ + List getReaders(); + + /** + * Writer for system topic + */ + interface Writer { + /** + * Write event to the system topic + * @param event pulsar event + * @return message id + * @throws PulsarClientException exception while write event cause + */ + MessageId write(PulsarEvent event) throws PulsarClientException; + + /** + * Async write event to the system topic + * @param event pulsar event + * @return message id future + */ + CompletableFuture writeAsync(PulsarEvent event); + + /** + * Close the system topic writer. + */ + void close() throws IOException; + + /** + * Close the writer of the system topic asynchronously. + */ + CompletableFuture closeAsync(); + + /** + * Get the system topic of the writer + * @return system topic + */ + SystemTopicClient getSystemTopicClient(); + + } + + /** + * Reader for system topic + */ + interface Reader { + + /** + * Read event from system topic + * @return pulsar event + */ + Message readNext() throws PulsarClientException; + + /** + * Async read event from system topic + * @return pulsar event future + */ + CompletableFuture> readNextAsync(); + + /** + * Check has more events available for the reader. + * @return true if has remaining events, otherwise false + */ + boolean hasMoreEvents() throws PulsarClientException; + + /** + * Check has more events available for the reader asynchronously. + * @return true if has remaining events, otherwise false + */ + CompletableFuture hasMoreEventsAsync(); + + /** + * Close the system topic reader. + */ + void close() throws IOException; + + /** + * Close the reader of the system topic asynchronously. + */ + CompletableFuture closeAsync(); + + /** + * Get the system topic of the reader + * @return system topic + */ + SystemTopicClient getSystemTopic(); + } + + static boolean isSystemTopic(TopicName topicName) { + return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME.equals(topicName.getLocalName()); + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java new file mode 100644 index 0000000000000..e358190da1ae8 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.systopic; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public abstract class SystemTopicClientBase implements SystemTopicClient { + + protected final TopicName topicName; + protected final PulsarClient client; + + protected final List writers; + protected final List readers; + + public SystemTopicClientBase(PulsarClient client, TopicName topicName) { + this.client = client; + this.topicName = topicName; + this.writers = Collections.synchronizedList(new ArrayList<>()); + this.readers = Collections.synchronizedList(new ArrayList<>()); + } + + @Override + public Reader newReader() throws PulsarClientException { + try { + return newReaderAsync().get(); + } catch (Exception e) { + throw new PulsarClientException(e); + } + } + + @Override + public CompletableFuture newReaderAsync() { + return newReaderAsyncInternal().thenCompose(reader -> { + readers.add(reader); + return CompletableFuture.completedFuture(reader); + }); + } + + @Override + public Writer newWriter() throws PulsarClientException { + try { + return newWriterAsync().get(); + } catch (Exception e) { + throw new PulsarClientException(e); + } + } + + @Override + public CompletableFuture newWriterAsync() { + return newWriterAsyncInternal().thenCompose(writer -> { + writers.add(writer); + return CompletableFuture.completedFuture(writer); + }); + } + + protected abstract CompletableFuture newWriterAsyncInternal(); + + protected abstract CompletableFuture newReaderAsyncInternal(); + + @Override + public CompletableFuture closeAsync() { + List> futures = new ArrayList<>(); + writers.forEach(writer -> futures.add(writer.closeAsync())); + readers.forEach(reader -> futures.add(reader.closeAsync())); + writers.clear(); + readers.clear(); + return FutureUtil.waitForAll(futures); + } + + @Override + public void close() throws Exception { + closeAsync().get(); + } + + @Override + public TopicName getTopicName() { + return topicName; + } + + @Override + public List getReaders() { + return readers; + } + + @Override + public List getWriters() { + return writers; + } + + private static final Logger log = LoggerFactory.getLogger(SystemTopicClientBase.class); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java new file mode 100644 index 0000000000000..812bd072722d4 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.systopic; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.events.PulsarEvent; +import org.apache.pulsar.common.naming.TopicName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +/** + * System topic for topic policy + */ +public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase { + + public TopicPoliciesSystemTopicClient(PulsarClient client, TopicName topicName) { + super(client, topicName); + } + + @Override + protected CompletableFuture newWriterAsyncInternal() { + return client.newProducer(Schema.AVRO(PulsarEvent.class)) + .topic(topicName.toString()) + .createAsync().thenCompose(producer -> { + if (log.isDebugEnabled()) { + log.debug("[{}] A new writer is created", topicName); + } + return CompletableFuture.completedFuture(new TopicPolicyWriter(producer, TopicPoliciesSystemTopicClient.this)); + }); + } + + @Override + protected CompletableFuture newReaderAsyncInternal() { + return client.newReader(Schema.AVRO(PulsarEvent.class)) + .topic(topicName.toString()) + .startMessageId(MessageId.earliest) + .readCompacted(true).createAsync() + .thenCompose(reader -> { + if (log.isDebugEnabled()) { + log.debug("[{}] A new reader is created", topicName); + } + return CompletableFuture.completedFuture(new TopicPolicyReader(reader, TopicPoliciesSystemTopicClient.this)); + }); + } + + private static class TopicPolicyWriter implements Writer { + + private final Producer producer; + private final SystemTopicClient systemTopicClient; + + private TopicPolicyWriter(Producer producer, SystemTopicClient systemTopicClient) { + this.producer = producer; + this.systemTopicClient = systemTopicClient; + } + + @Override + public MessageId write(PulsarEvent event) throws PulsarClientException { + return producer.newMessage().key(getEventKey(event)).value(event).send(); + } + + @Override + public CompletableFuture writeAsync(PulsarEvent event) { + return producer.newMessage().key(getEventKey(event)).value(event).sendAsync(); + } + + private String getEventKey(PulsarEvent event) { + return TopicName.get(event.getTopicPoliciesEvent().getDomain(), + event.getTopicPoliciesEvent().getTenant(), + event.getTopicPoliciesEvent().getNamespace(), + event.getTopicPoliciesEvent().getTopic()).toString(); + } + + @Override + public void close() throws IOException { + this.producer.close(); + systemTopicClient.getWriters().remove(TopicPolicyWriter.this); + } + + @Override + public CompletableFuture closeAsync() { + return producer.closeAsync(); + } + + @Override + public SystemTopicClient getSystemTopicClient() { + return systemTopicClient; + } + } + + private static class TopicPolicyReader implements Reader { + + private final org.apache.pulsar.client.api.Reader reader; + private final TopicPoliciesSystemTopicClient systemTopic; + + private TopicPolicyReader(org.apache.pulsar.client.api.Reader reader, + TopicPoliciesSystemTopicClient systemTopic) { + this.reader = reader; + this.systemTopic = systemTopic; + } + + @Override + public Message readNext() throws PulsarClientException { + return reader.readNext(); + } + + @Override + public CompletableFuture> readNextAsync() { + return reader.readNextAsync(); + } + + @Override + public boolean hasMoreEvents() throws PulsarClientException { + return reader.hasMessageAvailable(); + } + + @Override + public CompletableFuture hasMoreEventsAsync() { + return reader.hasMessageAvailableAsync(); + } + + @Override + public void close() throws IOException { + this.reader.close(); + systemTopic.getReaders().remove(TopicPolicyReader.this); + } + + @Override + public CompletableFuture closeAsync() { + return reader.closeAsync().thenCompose(v -> { + systemTopic.getReaders().remove(TopicPolicyReader.this); + return CompletableFuture.completedFuture(null); + }); + } + + @Override + public SystemTopicClient getSystemTopic() { + return systemTopic; + } + } + + private static final Logger log = LoggerFactory.getLogger(TopicPoliciesSystemTopicClient.class); +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index eb4dad7e78e03..a2cb3031f1246 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2404,7 +2404,7 @@ public void testCompactionStatus() throws Exception { assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); assertEquals(admin.topics().compactionStatus(topicName).status, - LongRunningProcessStatus.Status.NOT_RUN); + LongRunningProcessStatus.Status.NOT_RUN); // mock actual compaction, we don't need to really run it CompletableFuture promise = new CompletableFuture(); @@ -2413,12 +2413,12 @@ public void testCompactionStatus() throws Exception { admin.topics().triggerCompaction(topicName); assertEquals(admin.topics().compactionStatus(topicName).status, - LongRunningProcessStatus.Status.RUNNING); + LongRunningProcessStatus.Status.RUNNING); promise.complete(1L); assertEquals(admin.topics().compactionStatus(topicName).status, - LongRunningProcessStatus.Status.SUCCESS); + LongRunningProcessStatus.Status.SUCCESS); CompletableFuture errorPromise = new CompletableFuture(); doReturn(errorPromise).when(compactor).compact(topicName); @@ -2426,7 +2426,7 @@ public void testCompactionStatus() throws Exception { errorPromise.completeExceptionally(new Exception("Failed at something")); assertEquals(admin.topics().compactionStatus(topicName).status, - LongRunningProcessStatus.Status.ERROR); + LongRunningProcessStatus.Status.ERROR); assertTrue(admin.topics().compactionStatus(topicName).lastError.contains("Failed at something")); } @@ -2530,4 +2530,4 @@ public void testGetTtlDurationDefaultInSeconds() throws Exception { int seconds = admin.namespaces().getPolicies("prop-xyz/ns1").message_ttl_in_seconds; assertEquals(seconds, 3600); } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 66193a0b6bacd..b018d425d3907 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1289,4 +1289,4 @@ private void mockWebUrl(URL localWebServiceUrl, NamespaceName namespace) throws doReturn(true).when(nsSvc) .isServiceUnitOwned(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(namespace))); } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 2ddff57d54b1c..7b42a487c908a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -2007,7 +2007,7 @@ public void testCompactionStatus() throws Exception { assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); assertEquals(admin.topics().compactionStatus(topicName).status, - LongRunningProcessStatus.Status.NOT_RUN); + LongRunningProcessStatus.Status.NOT_RUN); // mock actual compaction, we don't need to really run it CompletableFuture promise = new CompletableFuture(); @@ -2016,12 +2016,12 @@ public void testCompactionStatus() throws Exception { admin.topics().triggerCompaction(topicName); assertEquals(admin.topics().compactionStatus(topicName).status, - LongRunningProcessStatus.Status.RUNNING); + LongRunningProcessStatus.Status.RUNNING); promise.complete(1L); assertEquals(admin.topics().compactionStatus(topicName).status, - LongRunningProcessStatus.Status.SUCCESS); + LongRunningProcessStatus.Status.SUCCESS); CompletableFuture errorPromise = new CompletableFuture(); doReturn(errorPromise).when(compactor).compact(topicName); @@ -2029,8 +2029,8 @@ public void testCompactionStatus() throws Exception { errorPromise.completeExceptionally(new Exception("Failed at something")); assertEquals(admin.topics().compactionStatus(topicName).status, - LongRunningProcessStatus.Status.ERROR); + LongRunningProcessStatus.Status.ERROR); assertTrue(admin.topics().compactionStatus(topicName) - .lastError.contains("Failed at something")); + .lastError.contains("Failed at something")); } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 33fbea4e9d172..7bc4b40bb7f41 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -149,8 +149,8 @@ protected final void init() throws Exception { sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor(); bkExecutor = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk") - .setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex)) - .build()); + .setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex)) + .build()); mockZooKeeper = createMockZooKeeper(); mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor); @@ -359,4 +359,4 @@ public static void setFieldValue(Class clazz, Object classObj, String fieldNa } private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class); -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index 8cb4e0a2ea97f..3dc9e133d18c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -534,4 +534,4 @@ private NamespaceBundle makeBundle(final String property, final String cluster, BoundType.CLOSED)); } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java index 551e45cb1c23e..f2fe3f5ec8c13 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java @@ -126,4 +126,4 @@ public void testGetAllPartitions() throws PulsarAdminException, ExecutionExcepti admin.namespaces().deleteNamespace(namespace); } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index a2f27822d0008..282f8acf9347e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -919,7 +919,7 @@ public void testCreateNamespacePolicy() throws Exception { /** * It verifies that unloading bundle gracefully closes managed-ledger before removing ownership to avoid bad-zk * version. - * + * * @throws Exception */ @Test @@ -961,4 +961,4 @@ public void testStuckTopicUnloading() throws Exception { } assertNull(ledgers.get(topicMlName)); } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 8a678feb8c3c2..9c8667b2ae428 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1589,4 +1589,4 @@ public void testWithEventTime() throws Exception { assertEquals(msg.getValue(), "test"); assertEquals(msg.getEventTime(), 5); } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java new file mode 100644 index 0000000000000..132d8b17557be --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import com.google.common.collect.Sets; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; +import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.ExecutionException; + +public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest { + + private static final String NAMESPACE1 = "system-topic/namespace-1"; + private static final String NAMESPACE2 = "system-topic/namespace-2"; + private static final String NAMESPACE3 = "system-topic/namespace-3"; + + private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1"); + private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2"); + private static final TopicName TOPIC3 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-1"); + private static final TopicName TOPIC4 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-2"); + private static final TopicName TOPIC5 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-1"); + private static final TopicName TOPIC6 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-2"); + + private NamespaceEventsSystemTopicFactory systemTopicFactory; + private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService; + + @BeforeMethod + @Override + protected void setup() throws Exception { + conf.setSystemTopicEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); + super.internalSetup(); + prepareData(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testGetPolicy() throws ExecutionException, InterruptedException, TopicPoliciesCacheNotInitException { + + // Init topic policies + TopicPolicies initPolicy = TopicPolicies.builder() + .maxConsumerPerTopic(10) + .build(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy); + + Assert.assertNull(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject())); + + Thread.sleep(1000); + + Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject())); + + // Assert broker is cache all topic policies + Assert.assertEquals(10, systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue()); + + // Update policy for TOPIC1 + TopicPolicies policies1 = TopicPolicies.builder() + .maxProducerPerTopic(1) + .build(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1).get(); + + // Update policy for TOPIC2 + TopicPolicies policies2 = TopicPolicies.builder() + .maxProducerPerTopic(2) + .build(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2).get(); + + // Update policy for TOPIC3 + TopicPolicies policies3 = TopicPolicies.builder() + .maxProducerPerTopic(3) + .build(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC3, policies3).get(); + + // Update policy for TOPIC4 + TopicPolicies policies4 = TopicPolicies.builder() + .maxProducerPerTopic(4) + .build(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC4, policies4).get(); + + // Update policy for TOPIC5 + TopicPolicies policies5 = TopicPolicies.builder() + .maxProducerPerTopic(5) + .build(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC5, policies5).get(); + + // Update policy for TOPIC6 + TopicPolicies policies6 = TopicPolicies.builder() + .maxProducerPerTopic(6) + .build(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, policies6).get(); + + Thread.sleep(1000); + + TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); + TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); + TopicPolicies policiesGet3 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3); + TopicPolicies policiesGet4 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4); + TopicPolicies policiesGet5 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5); + TopicPolicies policiesGet6 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6); + + Assert.assertEquals(policiesGet1, policies1); + Assert.assertEquals(policiesGet2, policies2); + Assert.assertEquals(policiesGet3, policies3); + Assert.assertEquals(policiesGet4, policies4); + Assert.assertEquals(policiesGet5, policies5); + Assert.assertEquals(policiesGet6, policies6); + + // Remove reader cache will remove policies cache + Assert.assertEquals(systemTopicBasedTopicPoliciesService.getPoliciesCacheSize(), 6); + + // Check reader cache is correct. + Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE1))); + Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2))); + Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3))); + + policies1.setMaxProducerPerTopic(101); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); + policies2.setMaxProducerPerTopic(102); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); + policies2.setMaxProducerPerTopic(103); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); + policies1.setMaxProducerPerTopic(104); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); + policies2.setMaxProducerPerTopic(105); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); + policies1.setMaxProducerPerTopic(106); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); + + Thread.sleep(1000); + + // reader for NAMESPACE1 will back fill the reader cache + policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); + policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); + Assert.assertEquals(policies1, policiesGet1); + Assert.assertEquals(policies2, policiesGet2); + + // Check reader cache is correct. + Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2))); + Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE1))); + Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3))); + + // Check get without cache + policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get(); + Assert.assertEquals(policies1, policiesGet1); + } + + private void prepareData() throws PulsarAdminException { + admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl())); + admin.tenants().createTenant("system-topic", + new TenantInfo(Sets.newHashSet(), Sets.newHashSet("test"))); + admin.namespaces().createNamespace(NAMESPACE1); + admin.namespaces().createNamespace(NAMESPACE2); + admin.namespaces().createNamespace(NAMESPACE3); + systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); + systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPoliciesServiceDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPoliciesServiceDisableTest.java new file mode 100644 index 0000000000000..d6d4d3a9cbcc0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPoliciesServiceDisableTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; + +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class TopicPoliciesServiceDisableTest extends MockedPulsarServiceBaseTest { + + private TopicPoliciesService systemTopicBasedTopicPoliciesService; + + @BeforeMethod + @Override + protected void setup() throws Exception { + conf.setTopicLevelPoliciesEnabled(false); + super.internalSetup(); + prepareData(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testTopicLevelPoliciesDisabled() { + try { + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TopicName.get("test"), new TopicPolicies()).get(); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof UnsupportedOperationException); + } + } + + private void prepareData() { + systemTopicBasedTopicPoliciesService = pulsar.getTopicPoliciesService(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 81ecfc90e8d3a..948bdbd31bcde 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -521,4 +521,4 @@ public String toString() { } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java new file mode 100644 index 0000000000000..15aa63f6158d1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.systopic; + +import com.google.common.collect.Sets; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.events.ActionType; +import org.apache.pulsar.common.events.EventType; +import org.apache.pulsar.common.events.PulsarEvent; +import org.apache.pulsar.common.events.TopicPoliciesEvent; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBaseTest { + + private static final Logger log = LoggerFactory.getLogger(NamespaceEventsSystemTopicServiceTest.class); + + private static final String NAMESPACE1 = "system-topic/namespace-1"; + private static final String NAMESPACE2 = "system-topic/namespace-2"; + private static final String NAMESPACE3 = "system-topic/namespace-3"; + + private NamespaceEventsSystemTopicFactory systemTopicFactory; + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + prepareData(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testSendAndReceiveNamespaceEvents() throws Exception { + SystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory.createSystemTopic(NamespaceName.get(NAMESPACE1), EventType.TOPIC_POLICY); + TopicPolicies policies = TopicPolicies.builder() + .maxProducerPerTopic(10) + .build(); + PulsarEvent event = PulsarEvent.builder() + .eventType(EventType.TOPIC_POLICY) + .actionType(ActionType.INSERT) + .topicPoliciesEvent(TopicPoliciesEvent.builder() + .domain("persistent") + .tenant("system-topic") + .namespace(NamespaceName.get(NAMESPACE1).getLocalName()) + .topic("my-topic") + .policies(policies) + .build()) + .build(); + systemTopicClientForNamespace1.newWriter().write(event); + SystemTopicClient.Reader reader = systemTopicClientForNamespace1.newReader(); + Message received = reader.readNext(); + log.info("Receive pulsar event from system topic : {}", received.getValue()); + + // test event send and receive + Assert.assertEquals(received.getValue(), event); + Assert.assertEquals(systemTopicClientForNamespace1.getWriters().size(), 1); + Assert.assertEquals(systemTopicClientForNamespace1.getReaders().size(), 1); + + // test new reader read + SystemTopicClient.Reader reader1 = systemTopicClientForNamespace1.newReader(); + Message received1 = reader1.readNext(); + log.info("Receive pulsar event from system topic : {}", received1.getValue()); + Assert.assertEquals(received1.getValue(), event); + + // test writers and readers + Assert.assertEquals(systemTopicClientForNamespace1.getReaders().size(), 2); + SystemTopicClient.Writer writer = systemTopicClientForNamespace1.newWriter(); + Assert.assertEquals(systemTopicClientForNamespace1.getWriters().size(), 2); + writer.close(); + reader.close(); + Assert.assertEquals(systemTopicClientForNamespace1.getWriters().size(), 1); + Assert.assertEquals(systemTopicClientForNamespace1.getReaders().size(), 1); + systemTopicClientForNamespace1.close(); + Assert.assertEquals(systemTopicClientForNamespace1.getWriters().size(), 0); + Assert.assertEquals(systemTopicClientForNamespace1.getReaders().size(), 0); + } + + private void prepareData() throws PulsarAdminException { + admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl())); + admin.tenants().createTenant("system-topic", + new TenantInfo(Sets.newHashSet(), Sets.newHashSet("test"))); + admin.namespaces().createNamespace(NAMESPACE1); + admin.namespaces().createNamespace(NAMESPACE2); + admin.namespaces().createNamespace(NAMESPACE3); + systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 009596f074628..26b12b86d6f8b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -612,4 +612,4 @@ private void stopWebSocketClient(WebSocketClient... clients) { } private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class); -} +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/ActionType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/ActionType.java new file mode 100644 index 0000000000000..0b626dcbd7232 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/ActionType.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.events; + +/** + * Pulsar event action type. + */ +public enum ActionType { + + INSERT, + DELETE, + UPDATE, + NONE +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java new file mode 100644 index 0000000000000..630d8e8cc44ee --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.events; + +/** + * Pulsar system event type. + */ +public enum EventType { + + /** + * Topic policy events. + */ + TOPIC_POLICY +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java new file mode 100644 index 0000000000000..72b66bb9d5f0c --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.events; + +/** + * System topic name for the event type. + */ +public class EventsTopicNames { + + + /** + * Local topic name for the namespace events. + */ + public static final String NAMESPACE_EVENTS_LOCAL_NAME = "__change_events"; + +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/PulsarEvent.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/PulsarEvent.java new file mode 100644 index 0000000000000..00e98ca679b3f --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/PulsarEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Pulsar base event. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarEvent { + + private EventType eventType; + private ActionType actionType; + private TopicPoliciesEvent topicPoliciesEvent; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/TopicPoliciesEvent.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/TopicPoliciesEvent.java new file mode 100644 index 0000000000000..995bd77b9f139 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/TopicPoliciesEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.pulsar.common.policies.data.TopicPolicies; + +/** + * Topic policies event. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TopicPoliciesEvent { + + private String domain; + private String tenant; + private String namespace; + private String topic; + private TopicPolicies policies; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/package-info.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/package-info.java new file mode 100644 index 0000000000000..240ba51c07bb1 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.events; \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index d54e0c73506d8..2946157410988 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -197,8 +197,10 @@ public static void setStorageQuota(Policies polices, BacklogQuota quota) { @Override public String toString() { return MoreObjects.toStringHelper(this).add("auth_policies", auth_policies) - .add("replication_clusters", replication_clusters).add("bundles", bundles) - .add("backlog_quota_map", backlog_quota_map).add("persistence", persistence) + .add("replication_clusters", replication_clusters) + .add("bundles", bundles) + .add("backlog_quota_map", backlog_quota_map) + .add("persistence", persistence) .add("deduplicationEnabled", deduplicationEnabled) .add("autoTopicCreationOverride", autoTopicCreationOverride) .add("autoSubscriptionCreationOverride", autoSubscriptionCreationOverride) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java new file mode 100644 index 0000000000000..56fbf8368eed8 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import com.google.common.collect.Maps; + +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + + +/** + * Topic policies. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TopicPolicies { + + private Map backLogQuotaMap = Maps.newHashMap(); + private PersistencePolicies persistence = null; + private RetentionPolicies retentionPolicies = null; + private Boolean deduplicationEnabled = null; + private Integer messageTTLInSeconds = null; + private Integer maxProducerPerTopic = null; + private Integer maxConsumerPerTopic = null; + private Integer maxConsumersPerSubscription = null; + + public boolean isBacklogQuotaSet() { + return !backLogQuotaMap.isEmpty(); + } + + public boolean isPersistentPolicySet() { + return persistence != null; + } + + public boolean isRetentionSet() { + return retentionPolicies != null; + } + + public boolean isDeduplicationSet() { + return deduplicationEnabled != null; + } + + public boolean isMessageTTLSet() { + return messageTTLInSeconds != null; + } + + public boolean isMaxProducerPerTopicSet() { + return maxProducerPerTopic != null; + } + + public boolean isMaxConsumerPerTopicSet() { + return maxConsumerPerTopic != null; + } + + public boolean isMaxConsumersPerSubscriptionSet() { + return maxConsumersPerSubscription != null; + } +}