Skip to content

Commit

Permalink
[fix][broker] Restore solution for certain topic unloading race condi…
Browse files Browse the repository at this point in the history
…tions (#20527)

(cherry picked from commit 03f9167)
  • Loading branch information
lhotari committed Jun 8, 2023
1 parent 981aece commit 7c5f9b0
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1909,10 +1909,6 @@ public AuthorizationService getAuthorizationService() {
return authorizationService;
}

public CompletableFuture<Void> removeTopicFromCache(String topicName) {
return removeTopicFutureFromCache(topicName, null);
}

public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
Optional<CompletableFuture<Optional<Topic>>> createTopicFuture = findTopicFutureInCache(topic);
if (!createTopicFuture.isPresent()){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}

private void disposeTopic(CompletableFuture<?> closeFuture) {
brokerService.removeTopicFromCache(topic)
brokerService.removeTopicFromCache(PersistentTopic.this)
.thenRun(() -> {
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
// (3) remove topic and managed-ledger from broker which means topic is not closed gracefully
consumer.close();
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic1);
pulsar.getBrokerService().removeTopicFromCache(topic);
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
Expand Down Expand Up @@ -242,7 +242,7 @@ public void testSkipCorruptDataLedger() throws Exception {

// clean managed-ledger and recreate topic to clean any data from the cache
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic1);
pulsar.getBrokerService().removeTopicFromCache(topic);
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
Expand Down

0 comments on commit 7c5f9b0

Please sign in to comment.