-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Fix future of clear delayed message can't complete #20075
[fix][broker] Fix future of clear delayed message can't complete #20075
Conversation
clear delayed message uncomplete
@coderzc we are now in code freeze for 3.0. So unless this is a blocker for the release, this change should target 3.0.1/3.1. Please change the milestone again to 3.0 if you believe that's a blocker. |
This bug will lead to a topic can't be deleted until restart broker, so I think it is a blocker. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a test?
try { | ||
if (delayedDeliveryTracker.isEmpty()) { | ||
delayedDeliveryTracker = Optional | ||
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we never touch the method trackDelayedDelivery()
, then the variable delayedDeliveryTracker
is always is empty
, right?
If yes, why need we initialize it when we should clear delayed messages? Can we just return a completed future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@poorbarcode Because the dispatcher was closed before unsubscribe
was called, this lead to the tracker reference being lost, so we need to reinitialize the tracker to clean up residual persistent data.
Please see: line-1320、line-1356
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Lines 1320 to 1357 in 4f503fd
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); | |
if (closeIfClientsConnected) { | |
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); | |
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); | |
producers.values().forEach(producer -> futures.add(producer.disconnect())); | |
} | |
FutureUtil.waitForAll(futures).thenRunAsync(() -> { | |
closeClientFuture.complete(null); | |
}, getOrderedExecutor()).exceptionally(ex -> { | |
log.error("[{}] Error closing clients", topic, ex); | |
unfenceTopicToResume(); | |
closeClientFuture.completeExceptionally(ex); | |
return null; | |
}); | |
closeClientFuture.thenAccept(__ -> { | |
CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>(); | |
brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); | |
deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema()) | |
.thenCompose(ignore -> { | |
if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic) | |
&& brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) { | |
return deleteTopicPolicies(); | |
} else { | |
return CompletableFuture.completedFuture(null); | |
} | |
}) | |
.thenCompose(ignore -> transactionBufferCleanupAndClose()) | |
.whenComplete((v, ex) -> { | |
if (ex != null) { | |
log.error("[{}] Error deleting topic", topic, ex); | |
unfenceTopicToResume(); | |
deleteFuture.completeExceptionally(ex); | |
} else { | |
List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>(); | |
subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the dispatcher was closed before unsubscribe was called, this lead to the tracker reference being lost, so we need to reinitialize the tracker to clean up residual persistent data.
I see. Another question: the method new tracker
will call recover
internally, right? If yes, I feel it is an expensive task. Can we only set the Tracker to an unavailable state when closing the dispatcher to avoid recover the second time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, I think we should use another way to clean up data to avoid unnecessary recover, such as add a clean method in the factory.
@poorbarcode I use |
19ff631
to
ad34c39
Compare
Codecov Report
@@ Coverage Diff @@
## master #20075 +/- ##
=============================================
+ Coverage 35.58% 72.93% +37.34%
- Complexity 12423 31901 +19478
=============================================
Files 1691 1868 +177
Lines 128771 138309 +9538
Branches 14044 15211 +1167
=============================================
+ Hits 45822 100874 +55052
+ Misses 76931 29422 -47509
- Partials 6018 8013 +1995
Flags with carried forward coverage won't be shown. Click here to find out more.
|
PIP: #16763
Motivation
If
newTracker()
throws an exception, then the future will can't complete forever, this will lead to the topic can't be deleted until restart broker.Modifications
Use
cleanResidualSnapshots
instead of creating a tracker to clear residual snapshots if the tracker does not exist.Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: