Skip to content

Commit

Permalink
[fix][broker]Consumer can't consume messages because there has two sa…
Browse files Browse the repository at this point in the history
…mes topics in one broker (#17526)
  • Loading branch information
poorbarcode authored Sep 21, 2022
1 parent d1a9a82 commit 260f5c6
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1997,7 +1997,7 @@ public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) {
TopicName topicName = TopicName.get(topic);
if (serviceUnit.includes(topicName) && getTopicReference(topic).isPresent()) {
log.info("[{}][{}] Clean unloaded topic from cache.", serviceUnit.toString(), topic);
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit);
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit, null);
}
}
}
Expand All @@ -2006,15 +2006,56 @@ public AuthorizationService getAuthorizationService() {
return authorizationService;
}

public CompletableFuture<Void> removeTopicFromCache(String topic) {
public CompletableFuture<Void> removeTopicFromCache(String topicName) {
return removeTopicFutureFromCache(topicName, null);
}

public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
Optional<CompletableFuture<Optional<Topic>>> createTopicFuture = findTopicFutureInCache(topic);
if (createTopicFuture.isEmpty()){
return CompletableFuture.completedFuture(null);
}
return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get());
}

private Optional<CompletableFuture<Optional<Topic>>> findTopicFutureInCache(Topic topic){
if (topic == null){
return Optional.empty();
}
final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topic.getName());
// If not exists in cache, do nothing.
if (createTopicFuture == null){
return Optional.empty();
}
// If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic.
if (!createTopicFuture.isDone()){
return Optional.empty();
}
// If the future in cache has exception complete,
// the topic instance in the cache is not the same with the topic.
if (createTopicFuture.isCompletedExceptionally()){
return Optional.empty();
}
Optional<Topic> optionalTopic = createTopicFuture.join();
Topic topicInCache = optionalTopic.orElse(null);
if (topicInCache == null || topicInCache != topic){
return Optional.empty();
} else {
return Optional.of(createTopicFuture);
}
}

private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
CompletableFuture<Optional<Topic>> createTopicFuture) {
TopicName topicName = TopicName.get(topic);
return pulsar.getNamespaceService().getBundleAsync(topicName)
.thenAccept(namespaceBundle -> {
removeTopicFromCache(topic, namespaceBundle);
removeTopicFromCache(topic, namespaceBundle, createTopicFuture);
});
}

public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) {
private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle,
CompletableFuture<Optional<Topic>> createTopicFuture) {
String bundleName = namespaceBundle.toString();
String namespaceName = TopicName.get(topic).getNamespaceObject().toString();

Expand All @@ -2041,7 +2082,12 @@ public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle)
}
}
}
topics.remove(topic);

if (createTopicFuture == null) {
topics.remove(topic);
} else {
topics.remove(topic, createTopicFuture);
}

Compactor compactor = pulsar.getNullableCompactor();
if (compactor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
// topic GC iterates over topics map and removing from the map with the same thread creates
// deadlock. so, execute it in different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(NonPersistentTopic.this);
unregisterTopicPolicyListener();
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
Expand Down Expand Up @@ -516,7 +516,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
// unload topic iterates over topics map and removing from the map with the same thread creates deadlock.
// so, execute it in different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(NonPersistentTopic.this);
unregisterTopicPolicyListener();
closeFuture.complete(null);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(PersistentTopic.this);

dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

Expand Down Expand Up @@ -1305,7 +1305,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
@Override
public void closeComplete(Object ctx) {
// Everything is now closed, remove the topic from map
brokerService.removeTopicFromCache(topic)
brokerService.removeTopicFromCache(PersistentTopic.this)
.thenRun(() -> {
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);

Expand All @@ -1327,7 +1327,7 @@ public void closeComplete(Object ctx) {
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(PersistentTopic.this);
unregisterTopicPolicyListener();
closeFuture.complete(null);
}
Expand Down

0 comments on commit 260f5c6

Please sign in to comment.