Skip to content

Commit

Permalink
[fix][broker]Consumer can't consume messages because there has two sa…
Browse files Browse the repository at this point in the history
…mes topics in one broker
  • Loading branch information
poorbarcode committed Sep 14, 2022
1 parent e771fc8 commit 235d13c
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1997,7 +1997,7 @@ public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) {
TopicName topicName = TopicName.get(topic);
if (serviceUnit.includes(topicName) && getTopicReference(topic).isPresent()) {
log.info("[{}][{}] Clean unloaded topic from cache.", serviceUnit.toString(), topic);
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit);
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit, null);
}
}
}
Expand All @@ -2006,15 +2006,43 @@ public AuthorizationService getAuthorizationService() {
return authorizationService;
}

public CompletableFuture<Void> removeTopicFromCache(String topic) {
public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
if (topic == null){
return removeTopicFutureFromCache(topicNameString, null);
}
final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
// If not exists in cache, do nothing.
if (createTopicFuture == null){
return CompletableFuture.completedFuture(null);
}
// If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic
// in the argument. Do nothing.
if (!createTopicFuture.isDone()){
return CompletableFuture.completedFuture(null);
}
return createTopicFuture.thenCompose(topicOptional -> {
Topic topicInCache = topicOptional.orElse(null);
// If @param topic is not equals with cached, do nothing.
if (topicInCache == null || topicInCache != topic){
return CompletableFuture.completedFuture(null);
} else {
// Do remove.
return removeTopicFutureFromCache(topicNameString, createTopicFuture);
}
});
}

private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
CompletableFuture<Optional<Topic>> createTopicFuture) {
TopicName topicName = TopicName.get(topic);
return pulsar.getNamespaceService().getBundleAsync(topicName)
.thenAccept(namespaceBundle -> {
removeTopicFromCache(topic, namespaceBundle);
removeTopicFromCache(topic, namespaceBundle, createTopicFuture);
});
}

public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) {
public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle,
CompletableFuture<Optional<Topic>> createTopicFuture) {
String bundleName = namespaceBundle.toString();
String namespaceName = TopicName.get(topic).getNamespaceObject().toString();

Expand All @@ -2041,7 +2069,12 @@ public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle)
}
}
}
topics.remove(topic);

if (createTopicFuture == null) {
topics.remove(topic);
} else {
topics.remove(topic, createTopicFuture);
}

Compactor compactor = pulsar.getNullableCompactor();
if (compactor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
// topic GC iterates over topics map and removing from the map with the same thread creates
// deadlock. so, execute it in different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(topic, NonPersistentTopic.this);
unregisterTopicPolicyListener();
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
Expand Down Expand Up @@ -516,7 +516,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
// unload topic iterates over topics map and removing from the map with the same thread creates deadlock.
// so, execute it in different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(topic, NonPersistentTopic.this);
unregisterTopicPolicyListener();
closeFuture.complete(null);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(topic, PersistentTopic.this);

dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

Expand Down Expand Up @@ -1300,7 +1300,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
@Override
public void closeComplete(Object ctx) {
// Everything is now closed, remove the topic from map
brokerService.removeTopicFromCache(topic)
brokerService.removeTopicFromCache(topic, PersistentTopic.this)
.thenRun(() -> {
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);

Expand All @@ -1322,7 +1322,7 @@ public void closeComplete(Object ctx) {
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(topic, PersistentTopic.this);
closeFuture.complete(null);
}
}, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
// (3) remove topic and managed-ledger from broker which means topic is not closed gracefully
consumer.close();
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic1);
pulsar.getBrokerService().removeTopicFromCache(topic1, null);
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testSkipCorruptDataLedger() throws Exception {

// clean managed-ledger and recreate topic to clean any data from the cache
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic1);
pulsar.getBrokerService().removeTopicFromCache(topic1, null);
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
Expand Down

0 comments on commit 235d13c

Please sign in to comment.