Skip to content

Commit

Permalink
[improve][broker] System topic writer/reader connection not counted (a…
Browse files Browse the repository at this point in the history
…pache#18603)

This PR is a supplement to apache#18369.
- `AbstractTopic.isSameAddressProducersExceeded()`
- `AbstractBaseDispatcher.isConsumersExceededOnSubscription()`

(cherry picked from commit b33bff9)
  • Loading branch information
yuruguo authored and nicoloboschi committed Jan 11, 2023
1 parent e39c370 commit b7a3e7b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,12 @@ protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cur
protected abstract boolean isConsumersExceededOnSubscription();

protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int consumerSize) {
if (topic.isSystemTopic()) {
return false;
}
Integer maxConsumersPerSubscription = topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
return maxConsumersPerSubscription != null && maxConsumersPerSubscription > 0
&& maxConsumersPerSubscription <= consumerSize;
}

private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,9 @@ protected void unregisterTopicPolicyListener() {
}

protected boolean isSameAddressProducersExceeded(Producer producer) {
if (isSystemTopic() || producer.isRemote()) {
return false;
}
final int maxSameAddressProducers = brokerService.pulsar().getConfiguration()
.getMaxSameAddressProducersPerTopic();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public void testSystemTopicNotCheckExceed() throws Exception {
admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);

conf.setMaxSameAddressConsumersPerTopic(1);
admin.namespaces().setMaxConsumersPerTopic(ns, 1);
admin.topicPolicies().setMaxConsumers(topic, 1);
NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
Expand All @@ -274,8 +275,9 @@ public void testSystemTopicNotCheckExceed() throws Exception {
SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();

conf.setMaxSameAddressProducersPerTopic(1);
admin.namespaces().setMaxProducersPerTopic(ns, 1);
admin.topicPolicies().setMaxProducers(topic, 1);

CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
Expand Down

0 comments on commit b7a3e7b

Please sign in to comment.