Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] partitioned __change_events topic is policy topic #20392

Merged
merged 2 commits into from
May 25, 2023

Conversation

michaeljmarshall
Copy link
Member

Fixes #20376

Motivation

The __change_events system topic is created in each namespace. It is used for topic policies. The SystemTopicNames#isSystemTopic classifies partitioned and non-partitioned __change_events topics as system topics. The SystemTopicNames#isTopicPoliciesSystemTopic only classifies non-partitioned __change_events topics as topic policies system topics. I think this is a bug, and we need to make sure that parititioned __change_events topics are topic policies system topics.

Note: I am not sure that it is recommended to use a partitioned topic for the topic policies, but for backwards compatibility, I think we have to classify a partitioned __change_events topic as a topic policies topic. Since #10850, we test partitioned __change_events topics.

The AdminApi2Test class fails frequently with an error associated with managed ledgers not being deleted yet. That error may be because of an incomplete classification of the change events system topic name. I say this because the test fails at tenant deletion, and tenant deletion appears to rely on correct classification of the topic policies topic:

if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
partitionedTopicPolicy.add(topic);
} else {
allPartitionedSystemTopics.add(topic);
}

Modifications

  • Update SystemTopicNames#isTopicPoliciesSystemTopic so that it considers a partitioned __change_events topic to be a topic policies system topic.
  • Update PersistentTopic so that it does not do its own classification and instead relies on the SystemTopicNames class.
  • Fix typo in SystemTopicNames#isSystemTopic.
  • Add test

Verifying this change

A new test is added.

Documentation

  • doc-not-needed

Matching PR in forked repository

PR in forked repository: Skipping for this minor change

@michaeljmarshall michaeljmarshall added type/bug The PR fixed a bug or issue reported a bug area/broker doc-not-needed Your PR changes do not impact docs ready-to-test labels May 24, 2023
@michaeljmarshall michaeljmarshall added this to the 3.1.0 milestone May 24, 2023
@michaeljmarshall michaeljmarshall self-assigned this May 24, 2023
Copy link
Contributor

@315157973 315157973 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@michaeljmarshall
Copy link
Member Author

I still think this is the right change, but it is odd that the net effect is to change when we delete the topic policy topic. See:

for (String topic : allPartitionedTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedPartitionTopics.add(topic);
} else {
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
partitionedTopicPolicy.add(topic);
} else {
allPartitionedSystemTopics.add(topic);
}
}
}
if (!force) {
if (hasNonSystemTopic) {
throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace");
}
}
final CompletableFuture<Void> markDeleteFuture;
if (policies != null && policies.deleted) {
markDeleteFuture = CompletableFuture.completedFuture(null);
} else {
markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> {
old.deleted = true;
return old;
});
}
return markDeleteFuture.thenCompose(__ ->
internalDeleteTopicsAsync(allUserCreatedTopics))
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics))
.thenCompose(ignore ->
internalDeleteTopicsAsync(allSystemTopics))
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
.thenCompose(ignore ->
internalDeleteTopicsAsync(topicPolicy))
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(partitionedTopicPolicy));

@codecov-commenter
Copy link

Codecov Report

Merging #20392 (f852521) into master (3f2978d) will increase coverage by 36.11%.
The diff coverage is 100.00%.

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #20392       +/-   ##
=============================================
+ Coverage     36.82%   72.94%   +36.11%     
- Complexity    12064    31878    +19814     
=============================================
  Files          1687     1864      +177     
  Lines        128856   138416     +9560     
  Branches      14018    15188     +1170     
=============================================
+ Hits          47456   100968    +53512     
+ Misses        75154    29440    -45714     
- Partials       6246     8008     +1762     
Flag Coverage Δ
inttests 24.19% <100.00%> (-0.01%) ⬇️
systests 25.14% <85.71%> (+0.15%) ⬆️
unittests 72.21% <100.00%> (+40.22%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...mon/policies/data/stats/SubscriptionStatsImpl.java 69.56% <ø> (ø)
...sar/common/policies/data/stats/TopicStatsImpl.java 90.44% <ø> (+36.30%) ⬆️
...org/apache/pulsar/broker/ServiceConfiguration.java 99.37% <100.00%> (+1.35%) ⬆️
...sar/broker/service/persistent/PersistentTopic.java 79.55% <100.00%> (+24.84%) ⬆️
.../apache/pulsar/common/naming/SystemTopicNames.java 77.77% <100.00%> (+11.11%) ⬆️
...he/pulsar/metadata/impl/AbstractMetadataStore.java 85.38% <100.00%> (+32.67%) ⬆️

... and 1428 files with indirect coverage changes

@michaeljmarshall
Copy link
Member Author

It's possible that an important part of the fix is here:

if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
&& brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
return deleteTopicPolicies();
} else {
return CompletableFuture.completedFuture(null);
}

With this change, we won't send a notification that the __change_events-partition-x topic has been deleted.

@michaeljmarshall
Copy link
Member Author

I still think this is the right change, but it is odd that the net effect is to change when we delete the topic policy topic. See:

for (String topic : allPartitionedTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedPartitionTopics.add(topic);
} else {
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
partitionedTopicPolicy.add(topic);
} else {
allPartitionedSystemTopics.add(topic);
}
}
}
if (!force) {
if (hasNonSystemTopic) {
throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace");
}
}
final CompletableFuture<Void> markDeleteFuture;
if (policies != null && policies.deleted) {
markDeleteFuture = CompletableFuture.completedFuture(null);
} else {
markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> {
old.deleted = true;
return old;
});
}
return markDeleteFuture.thenCompose(__ ->
internalDeleteTopicsAsync(allUserCreatedTopics))
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics))
.thenCompose(ignore ->
internalDeleteTopicsAsync(allSystemTopics))
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
.thenCompose(ignore ->
internalDeleteTopicsAsync(topicPolicy))
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(partitionedTopicPolicy));

It looks like the order is likely the key to this bug fix. When we mis-classify the topic policies topic (as we do before this fix), we likely do the following:

  1. Delete the topic policies topic partitions.
  2. Delete some other topic, which triggers a DELETE notification to be sent by the broker's local TopicPolicyWriter.
  3. That deletion event triggers a producer to connect to the change events topic.
  4. Because we always allow system topics to be auto created, the topic is created:
    //System topic can always be created automatically
    if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) {
    return CompletableFuture.completedFuture(true);
    }

Then, when we go to delete the tenant (in the flaky test), we get a failure because there is a managed ledger, and that will happen to be the one for the TopicPolicyWriter.

@michaeljmarshall
Copy link
Member Author

Note that #17609 attempted to prevent the conditions I describe above. However, I think the key mistake may be in the logic made here:

private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName topicName,
final Optional<Policies> policies) {
if (policies.isPresent() && policies.get().deleted) {
log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}",
topicName.getNamespaceObject());
return CompletableFuture.completedFuture(false);
}
//System topic can always be created automatically
if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) {
return CompletableFuture.completedFuture(true);
}

That policies object is fetched earlier in the code as:

Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());

It would seem that getting the policies only when cached presents a problem because the notification that the namespace is being deleted might not actually be present on the broker that receives the lookup request. Perhaps we should update that logic to get the latest version of the namespace policies object.

@michaeljmarshall
Copy link
Member Author

Now, I am wondering if #18220 was necessary because of this bug.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Good work @michaeljmarshall

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch

@nicoloboschi nicoloboschi merged commit 9918bce into apache:master May 25, 2023
@michaeljmarshall michaeljmarshall deleted the fix-event-topic-names branch May 25, 2023 13:51
michaeljmarshall added a commit that referenced this pull request May 25, 2023
michaeljmarshall added a commit that referenced this pull request May 25, 2023
lhotari pushed a commit that referenced this pull request May 26, 2023
(cherry picked from commit 9918bce)

backported to branch-2.10 by renaming SystemTopicsNames to EventsTopicNames
lhotari pushed a commit to datastax/pulsar that referenced this pull request May 26, 2023
…he#20392)

(cherry picked from commit 9918bce)

backported to branch-2.10 by renaming SystemTopicsNames to EventsTopicNames

(cherry picked from commit 323d112)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Flaky-test: AdminApi2Test.resetClusters
7 participants