diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 864fe7f5d65e17..75b15c15df212d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -438,14 +438,21 @@ private PublishRate publishRateInBroker(ServiceConfiguration config) { return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes()); } - protected boolean isProducersExceeded() { + protected boolean isProducersExceeded(Producer producer) { + if (isSystemTopic() || producer.isRemote()) { + return false; + } Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get(); - if (maxProducers > 0 && maxProducers <= producers.size()) { + if (maxProducers != null && maxProducers > 0 && maxProducers <= getUserCreatedProducersSize()) { return true; } return false; } + private long getUserCreatedProducersSize() { + return producers.values().stream().filter(p -> !p.isRemote()).count(); + } + protected void registerTopicPolicyListener() { if (brokerService.pulsar().getConfig().isSystemTopicEnabled() && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) { @@ -487,14 +494,21 @@ public int getNumberOfSameAddressProducers(final String clientAddress) { } protected boolean isConsumersExceededOnTopic() { - int maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get(); - if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) { + if (isSystemTopic()) { + return false; + } + Integer maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get(); + if (maxConsumersPerTopic != null && maxConsumersPerTopic > 0 + && maxConsumersPerTopic <= getNumberOfConsumers()) { return true; } return false; } protected boolean isSameAddressConsumersExceededOnTopic(Consumer consumer) { + if (isSystemTopic()) { + return false; + } final int maxSameAddressConsumers = brokerService.pulsar().getConfiguration() .getMaxSameAddressConsumersPerTopic(); @@ -951,7 +965,7 @@ protected void checkTopicFenced() throws BrokerServiceException { } protected void internalAddProducer(Producer producer) throws BrokerServiceException { - if (isProducersExceeded()) { + if (isProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e4bcb92b58dd6b..ea20d413484cf4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3321,6 +3321,9 @@ public MessageDeduplication getMessageDeduplication() { } private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) { + if (isSystemTopic()) { + return false; + } //Existing subscriptions are not affected if (StringUtils.isNotEmpty(subscriptionName) && getSubscription(subscriptionName) != null) { return false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index b8f8abc9a62654..7f31ce39c96202 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -75,6 +75,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; @@ -1505,4 +1506,36 @@ public void testWhenUpdateReplicationCluster() throws Exception { assertTrue(topic.getReplicators().isEmpty()); }); } + + @Test + public void testReplicatorProducerNotExceed() throws Exception { + log.info("--- testReplicatorProducerNotExceed ---"); + String namespace1 = "pulsar/ns11"; + admin1.namespaces().createNamespace(namespace1); + admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2")); + final TopicName dest1 = TopicName.get( + BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1")); + String namespace2 = "pulsar/ns22"; + admin2.namespaces().createNamespace(namespace2); + admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2")); + final TopicName dest2 = TopicName.get( + BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed2")); + admin1.topics().createPartitionedTopic(dest1.toString(), 1); + admin1.topicPolicies().setMaxProducers(dest1.toString(), 1); + admin2.topics().createPartitionedTopic(dest2.toString(), 1); + admin2.topicPolicies().setMaxProducers(dest2.toString(), 1); + @Cleanup + MessageProducer producer1 = new MessageProducer(url1, dest1); + log.info("--- Starting producer1 --- " + url1); + + producer1.produce(1); + + @Cleanup + MessageProducer producer2 = new MessageProducer(url2, dest2); + log.info("--- Starting producer2 --- " + url2); + + producer2.produce(1); + + Assert.assertThrows(PulsarClientException.ProducerBusyException.class, () -> new MessageProducer(url2, dest2)); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 9ab32d5ffa750a..e79197bb1b686b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; @@ -202,7 +203,7 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception { } @Test - private void testSetBacklogCausedCreatingProducerFailure() throws Exception { + public void testSetBacklogCausedCreatingProducerFailure() throws Exception { final String ns = "prop/ns-test"; final String topic = ns + "/topic-1"; @@ -260,4 +261,37 @@ private void testSetBacklogCausedCreatingProducerFailure() throws Exception { Assert.fail("failed to create producer"); } } + + @Test + public void testSystemTopicNotCheckExceed() throws Exception { + final String ns = "prop/ns-test"; + final String topic = ns + "/topic-1"; + + admin.namespaces().createNamespace(ns, 2); + admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1); + + admin.namespaces().setMaxConsumersPerTopic(ns, 1); + admin.topicPolicies().setMaxConsumers(topic, 1); + NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); + TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory + .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns)); + SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader(); + SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader(); + + admin.topicPolicies().setMaxProducers(topic, 1); + + CompletableFuture> writer1 = systemTopicClientForNamespace.newWriterAsync(); + CompletableFuture> writer2 = systemTopicClientForNamespace.newWriterAsync(); + CompletableFuture f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L); + + FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join(); + Assert.assertTrue(reader1.hasMoreEvents()); + Assert.assertNotNull(reader1.readNext()); + Assert.assertTrue(reader2.hasMoreEvents()); + Assert.assertNotNull(reader2.readNext()); + reader1.close(); + reader2.close(); + writer1.get().close(); + writer2.get().close(); + } }