diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index f3a308add9ef1d..6246c26e57bccc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1152,9 +1152,9 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa } } - protected void waitForCleanups(boolean excludeSystemTopics, int timeoutInMillis) { + private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) { long started = System.currentTimeMillis(); - while (System.currentTimeMillis() - started < timeoutInMillis) { + while (System.currentTimeMillis() - started < maxWaitTimeInMillis) { boolean cleaned = true; for (var etr : tableview.entrySet()) { var serviceUnit = etr.getKey(); @@ -1164,7 +1164,7 @@ protected void waitForCleanups(boolean excludeSystemTopics, int timeoutInMillis) continue; } - if (data.state() == Owned && data.dstBroker().equals(lookupServiceAddress)) { + if (data.state() == Owned && broker.equals(data.dstBroker())) { cleaned = false; break; } @@ -1228,7 +1228,12 @@ private synchronized void doCleanup(String broker) { if (orphanServiceUnitCleanupCnt > 0) { - waitForCleanups(true, OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS); + // System bundles can contain this channel's system topic and other important system topics. + // Cleaning such system bundles(closing the system topics) together with the non-system bundles + // can cause the cluster to be temporarily unstable. + // Hence, we clean the non-system bundles first and gracefully wait for them. + // After that, we clean the system bundles, if any. + waitForCleanups(broker, true, OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS); this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt; this.totalInactiveBrokerCleanupCnt++; }