Skip to content

Commit

Permalink
[improve][broker] System topic writer/reader connection not counted. (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored and lifepuzzlefun committed Jan 10, 2023
1 parent d72bdb2 commit 383bf16
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,21 @@ private PublishRate publishRateInBroker(ServiceConfiguration config) {
return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes());
}

protected boolean isProducersExceeded() {
protected boolean isProducersExceeded(Producer producer) {
if (isSystemTopic() || producer.isRemote()) {
return false;
}
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
if (maxProducers > 0 && maxProducers <= producers.size()) {
if (maxProducers != null && maxProducers > 0 && maxProducers <= getUserCreatedProducersSize()) {
return true;
}
return false;
}

private long getUserCreatedProducersSize() {
return producers.values().stream().filter(p -> !p.isRemote()).count();
}

protected void registerTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&& brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
Expand Down Expand Up @@ -487,14 +494,21 @@ public int getNumberOfSameAddressProducers(final String clientAddress) {
}

protected boolean isConsumersExceededOnTopic() {
int maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) {
if (isSystemTopic()) {
return false;
}
Integer maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get();
if (maxConsumersPerTopic != null && maxConsumersPerTopic > 0
&& maxConsumersPerTopic <= getNumberOfConsumers()) {
return true;
}
return false;
}

protected boolean isSameAddressConsumersExceededOnTopic(Consumer consumer) {
if (isSystemTopic()) {
return false;
}
final int maxSameAddressConsumers = brokerService.pulsar().getConfiguration()
.getMaxSameAddressConsumersPerTopic();

Expand Down Expand Up @@ -951,7 +965,7 @@ protected void checkTopicFenced() throws BrokerServiceException {
}

protected void internalAddProducer(Producer producer) throws BrokerServiceException {
if (isProducersExceeded()) {
if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3321,6 +3321,9 @@ public MessageDeduplication getMessageDeduplication() {
}

private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) {
if (isSystemTopic()) {
return false;
}
//Existing subscriptions are not affected
if (StringUtils.isNotEmpty(subscriptionName) && getSubscription(subscriptionName) != null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -1505,4 +1506,36 @@ public void testWhenUpdateReplicationCluster() throws Exception {
assertTrue(topic.getReplicators().isEmpty());
});
}

@Test
public void testReplicatorProducerNotExceed() throws Exception {
log.info("--- testReplicatorProducerNotExceed ---");
String namespace1 = "pulsar/ns11";
admin1.namespaces().createNamespace(namespace1);
admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2"));
final TopicName dest1 = TopicName.get(
BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
String namespace2 = "pulsar/ns22";
admin2.namespaces().createNamespace(namespace2);
admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2"));
final TopicName dest2 = TopicName.get(
BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed2"));
admin1.topics().createPartitionedTopic(dest1.toString(), 1);
admin1.topicPolicies().setMaxProducers(dest1.toString(), 1);
admin2.topics().createPartitionedTopic(dest2.toString(), 1);
admin2.topicPolicies().setMaxProducers(dest2.toString(), 1);
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest1);
log.info("--- Starting producer1 --- " + url1);

producer1.produce(1);

@Cleanup
MessageProducer producer2 = new MessageProducer(url2, dest2);
log.info("--- Starting producer2 --- " + url2);

producer2.produce(1);

Assert.assertThrows(PulsarClientException.ProducerBusyException.class, () -> new MessageProducer(url2, dest2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -202,7 +203,7 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
}

@Test
private void testSetBacklogCausedCreatingProducerFailure() throws Exception {
public void testSetBacklogCausedCreatingProducerFailure() throws Exception {
final String ns = "prop/ns-test";
final String topic = ns + "/topic-1";

Expand Down Expand Up @@ -260,4 +261,37 @@ private void testSetBacklogCausedCreatingProducerFailure() throws Exception {
Assert.fail("failed to create producer");
}
}

@Test
public void testSystemTopicNotCheckExceed() throws Exception {
final String ns = "prop/ns-test";
final String topic = ns + "/topic-1";

admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);

admin.namespaces().setMaxConsumersPerTopic(ns, 1);
admin.topicPolicies().setMaxConsumers(topic, 1);
NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory
.createTopicPoliciesSystemTopicClient(NamespaceName.get(ns));
SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();

admin.topicPolicies().setMaxProducers(topic, 1);

CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);

FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join();
Assert.assertTrue(reader1.hasMoreEvents());
Assert.assertNotNull(reader1.readNext());
Assert.assertTrue(reader2.hasMoreEvents());
Assert.assertNotNull(reader2.readNext());
reader1.close();
reader2.close();
writer1.get().close();
writer2.get().close();
}
}

0 comments on commit 383bf16

Please sign in to comment.