Skip to content

Commit

Permalink
[improve][broker]Optimize the partition size of the system theme to r…
Browse files Browse the repository at this point in the history
…educe resource consumption
  • Loading branch information
zjxxzjwang committed Dec 26, 2024
1 parent 14129e3 commit 594ec35
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 3 deletions.
5 changes: 4 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ allowAutoSubscriptionCreation=true
# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1

# Default number of partitions for the system theme
systemTopicDefaultNumPartitions=1

# Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter.
# If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true.
brokerDeleteInactiveTopicsEnabled=true
Expand Down Expand Up @@ -243,7 +246,7 @@ messageExpiryCheckIntervalInMinutes=5
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# For non-partitioned topics, consistent hashing is used by default.
activeConsumerFailoverConsistentHashing=false

Expand Down
5 changes: 4 additions & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ maxMessageSizeCheckIntervalInSeconds=60
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# For non-partitioned topics, consistent hashing is used by default.
activeConsumerFailoverConsistentHashing=false

Expand Down Expand Up @@ -1187,6 +1187,9 @@ allowAutoSubscriptionCreation=true
# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1

# Default number of partitions for the system theme
systemTopicDefaultNumPartitions=1

### --- Transaction config variables --- ###
# Enable transaction coordinator in broker
transactionCoordinatorEnabled=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2167,6 +2167,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ " if allowAutoTopicCreationType is partitioned."
)
private int defaultNumPartitions = 1;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Default number of partitions for the system theme."
)
private int systemTopicDefaultNumPartitions = 1;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "The class of the managed ledger storage"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3291,12 +3291,14 @@ private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopi
Optional<Policies> policies) {
final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies);
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
final int systemTopicDefaultNumPartitions = pulsar.getConfiguration().getSystemTopicDefaultNumPartitions();
checkArgument(defaultNumPartitions > 0,
"Default number of partitions should be more than 0");
checkArgument(maxPartitions <= 0 || defaultNumPartitions <= maxPartitions,
"Number of partitions should be less than or equal to " + maxPartitions);

PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(
isSystemTopic(topicName) ? systemTopicDefaultNumPartitions : defaultNumPartitions);

return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions)
.thenCompose(__ -> {
Expand Down

0 comments on commit 594ec35

Please sign in to comment.