From 4ee4b9975ecf9fb637a6592f6267864b2e0221f5 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Fri, 12 May 2023 16:54:00 -0700 Subject: [PATCH 1/4] [improve][broker] Gracefully shut down load balancer extension --- .../apache/pulsar/broker/PulsarService.java | 22 ++++-- .../extensions/ExtensibleLoadManagerImpl.java | 16 +++++ .../ExtensibleLoadManagerWrapper.java | 2 +- .../channel/ServiceUnitStateChannel.java | 5 ++ .../channel/ServiceUnitStateChannelImpl.java | 69 +++++++++++++++---- .../ExtensibleLoadManagerImplTest.java | 45 ++++++++++++ .../channel/ServiceUnitStateChannelTest.java | 10 +-- 7 files changed, 143 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 5fc9920d0f215..2f74319df9166 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -382,6 +382,17 @@ public void closeMetadataServiceSession() throws Exception { localMetadataStore.close(); } + private void closeLeaderElectionService() throws Exception { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService().close(); + } else { + if (this.leaderElectionService != null) { + this.leaderElectionService.close(); + this.leaderElectionService = null; + } + } + } + @Override public void close() throws PulsarServerException { try { @@ -502,10 +513,7 @@ public CompletableFuture closeAsync() { this.bkClientFactory = null; } - if (this.leaderElectionService != null) { - this.leaderElectionService.close(); - this.leaderElectionService = null; - } + closeLeaderElectionService(); if (adminClient != null) { adminClient.close(); @@ -1316,7 +1324,11 @@ public boolean isRunning() { * @return a reference of the current LeaderElectionService instance. */ public LeaderElectionService getLeaderElectionService() { - return this.leaderElectionService; + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + return ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService(); + } else { + return this.leaderElectionService; + } } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 4ebf537f7a8a8..3c7fdf060c386 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -380,12 +380,22 @@ public CompletableFuture> assign(Optional> selectAsync(ServiceUnitId bundle) { + return selectAsync(bundle, Optional.empty()); + } + + public CompletableFuture> selectAsync(ServiceUnitId bundle, + Optional> excludeBrokerSet) { BrokerRegistry brokerRegistry = getBrokerRegistry(); return brokerRegistry.getAvailableBrokerLookupDataAsync() .thenCompose(availableBrokers -> { LoadManagerContext context = this.getContext(); Map availableBrokerCandidates = new HashMap<>(availableBrokers); + if (excludeBrokerSet.isPresent()) { + for (String exclude : excludeBrokerSet.get()) { + availableBrokerCandidates.remove(exclude); + } + } // Filter out brokers that do not meet the rules. List filterPipeline = getBrokerFilterPipeline(); @@ -685,4 +695,10 @@ private void monitor() { log.error("Failed to get the channel ownership.", e); } } + + public void disableBroker() throws Exception { + serviceUnitStateChannel.cleanOwnerships(); + leaderElectionService.close(); + brokerRegistry.unregister(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index 1eabbe620e213..18e949537dedb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -74,7 +74,7 @@ public CompletableFuture checkOwnershipAsync(Optional to @Override public void disableBroker() throws Exception { - this.loadManager.getBrokerRegistry().unregister(); + this.loadManager.disableBroker(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java index 4782a31fe0c56..6e75fe91a914f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java @@ -206,4 +206,9 @@ public interface ServiceUnitStateChannel extends Closeable { * Cancels the ownership monitor. */ void cancelOwnershipMonitor(); + + /** + * Cleans the service unit ownerships from the current broker's channel. + */ + void cleanOwnerships(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index b4c4e7fd5d42a..13c653973ad3e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -110,6 +110,10 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD; private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 * 1000; // 30sec + + private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000; + private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100; + private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; public static final long VERSION_ID_INIT = 1; // initial versionId private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60; public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins @@ -694,6 +698,8 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.dstBroker())) { log(null, serviceUnit, data, null); lastOwnEventHandledAt = System.currentTimeMillis(); + } else if (data.force() && isTargetBroker(data.sourceBroker())) { + closeServiceUnit(serviceUnit); } } @@ -1108,21 +1114,23 @@ private void scheduleCleanup(String broker, long delayInSecs) { private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData, - String selectedBroker) { + String selectedBroker, + String inactiveBroker) { if (orphanData.state() == Splitting) { return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker, Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, getNextVersionId(orphanData)); } else { - return new ServiceUnitStateData(Owned, selectedBroker, true, getNextVersionId(orphanData)); + return new ServiceUnitStateData(Owned, selectedBroker, inactiveBroker, + true, getNextVersionId(orphanData)); } } - private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData) { - - Optional selectedBroker = selectBroker(serviceUnit); + private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) { + Optional selectedBroker = selectBroker(serviceUnit, inactiveBroker); if (selectedBroker.isPresent()) { - var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker.get()); + var override = getOverrideInactiveBrokerStateData( + orphanData, selectedBroker.get(), inactiveBroker); log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}", serviceUnit, orphanData, override); publishOverrideEventAsync(serviceUnit, orphanData, override) @@ -1140,8 +1148,37 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa } } + public void cleanOwnerships() { + doCleanup(lookupServiceAddress); + long started = System.currentTimeMillis(); + while (System.currentTimeMillis() - started < OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS) { + boolean cleaned = true; + for (var data : tableview.values()) { + if (data.state() == Owned && data.dstBroker().equals(lookupServiceAddress)) { + cleaned = false; + break; + } + } + if (cleaned) { + try { + MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS); + } catch (InterruptedException e) { + log.warn("Interrupted while gracefully waiting for the cleanup convergence."); + } + break; + } else { + try { + MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS); + } catch (InterruptedException e) { + log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}", + lookupServiceAddress); + } + } + } + } + - private void doCleanup(String broker) { + private synchronized void doCleanup(String broker) { long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; @@ -1153,13 +1190,13 @@ private void doCleanup(String broker) { var state = state(stateData); if (StringUtils.equals(broker, stateData.dstBroker())) { if (isActiveState(state)) { - overrideOwnership(serviceUnit, stateData); + overrideOwnership(serviceUnit, stateData, broker); orphanServiceUnitCleanupCnt++; } } else if (StringUtils.equals(broker, stateData.sourceBroker())) { if (isInFlightState(state)) { - overrideOwnership(serviceUnit, stateData); + overrideOwnership(serviceUnit, stateData, broker); orphanServiceUnitCleanupCnt++; } } @@ -1194,9 +1231,9 @@ private void doCleanup(String broker) { } - private Optional selectBroker(String serviceUnit) { + private Optional selectBroker(String serviceUnit, String inactiveBroker) { try { - return loadManager.selectAsync(getNamespaceBundle(serviceUnit)) + return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Optional.of(Set.of(inactiveBroker))) .get(inFlightStateWaitingTimeInMillis, MILLISECONDS); } catch (Throwable e) { log.error("Failed to select a broker for serviceUnit:{}", serviceUnit); @@ -1204,8 +1241,10 @@ private Optional selectBroker(String serviceUnit) { return Optional.empty(); } - private Optional getRollForwardStateData(String serviceUnit, long nextVersionId) { - Optional selectedBroker = selectBroker(serviceUnit); + private Optional getRollForwardStateData(String serviceUnit, + String inactiveBroker, + long nextVersionId) { + Optional selectedBroker = selectBroker(serviceUnit, inactiveBroker); if (selectedBroker.isEmpty()) { return Optional.empty(); } @@ -1220,7 +1259,7 @@ private Optional getOverrideInFlightStateData( var state = orphanData.state(); switch (state) { case Assigning: { - return getRollForwardStateData(serviceUnit, nextVersionId); + return getRollForwardStateData(serviceUnit, orphanData.dstBroker(), nextVersionId); } case Splitting: { return Optional.of(new ServiceUnitStateData(Splitting, @@ -1233,7 +1272,7 @@ private Optional getOverrideInFlightStateData( // rollback to the src return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true, nextVersionId)); } else { - return getRollForwardStateData(serviceUnit, nextVersionId); + return getRollForwardStateData(serviceUnit, orphanData.sourceBroker(), nextVersionId); } } default: { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index b71eeb4745b87..b131aff68d909 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -880,6 +880,51 @@ SplitDecision.Reason.Unknown, new AtomicLong(6)) assertEquals(actual, expected); } + @Test + public void testDisableBroker() throws Exception { + // Test rollback to modular load manager. + ServiceConfiguration defaultConf = getDefaultConf(); + defaultConf.setAllowAutoTopicCreation(true); + defaultConf.setForceDeleteNamespaceAllowed(true); + defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + defaultConf.setLoadBalancerSheddingEnabled(false); + try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) { + var pulsar3 = additionalPulsarTestContext.getPulsarService(); + ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(pulsar3.getLoadManager().get(), "loadManager", true)); + String topic = "persistent://public/default/test"; + + String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic); + TopicName topicName = TopicName.get("test"); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) { + admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), + pulsar3.getLookupServiceAddress()); + lookupResult1 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + } + String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic); + String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + + assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl()); + assertEquals(lookupResult1, lookupResult2); + assertEquals(lookupResult1, lookupResult3); + + + assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertTrue(ternaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + + ternaryLoadManager.disableBroker(); + + assertFalse(ternaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + if (primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()) { + assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + } else { + assertTrue(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + } + } + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 77c80187a63e1..75d13a8be57c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -515,7 +515,7 @@ public void transferTestWhenDestBrokerFails() // recovered, check the monitor update state : Assigned -> Owned doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1))) - .when(loadManager).selectAsync(any()); + .when(loadManager).selectAsync(any(), any()); FieldUtils.writeDeclaredField(channel2, "producer", producer, true); FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1 , true); @@ -735,7 +735,7 @@ public void handleBrokerDeletionEventTest() var owner1 = channel1.getOwnerAsync(bundle1); var owner2 = channel2.getOwnerAsync(bundle2); doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))) - .when(loadManager).selectAsync(any()); + .when(loadManager).selectAsync(any(), any()); assertTrue(owner1.get().isEmpty()); assertTrue(owner2.get().isEmpty()); @@ -1101,7 +1101,7 @@ public void assignTestWhenDestBrokerProducerFails() FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 3 * 1000, true); doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))) - .when(loadManager).selectAsync(any()); + .when(loadManager).selectAsync(any(), any()); channel1.publishAssignEventAsync(bundle, lookupServiceAddress2); // channel1 is broken. the assign won't be complete. waitUntilState(channel1, bundle); @@ -1440,7 +1440,7 @@ public void testOverrideInactiveBrokerStateData() // test stable metadata state doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))) - .when(loadManager).selectAsync(any()); + .when(loadManager).selectAsync(any(), any()); leaderChannel.handleMetadataSessionEvent(SessionReestablished); followerChannel.handleMetadataSessionEvent(SessionReestablished); FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", @@ -1505,7 +1505,7 @@ public void testOverrideOrphanStateData() // test stable metadata state doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))) - .when(loadManager).selectAsync(any()); + .when(loadManager).selectAsync(any(), any()); FieldUtils.writeDeclaredField(leaderChannel, "inFlightStateWaitingTimeInMillis", -1, true); FieldUtils.writeDeclaredField(followerChannel, "inFlightStateWaitingTimeInMillis", From 2e8eb09db783948e94a276e36caf874094edb967 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Sat, 13 May 2023 08:15:18 -0700 Subject: [PATCH 2/4] replaced optional --- .../extensions/ExtensibleLoadManagerImpl.java | 9 +++++---- .../extensions/channel/ServiceUnitStateChannelImpl.java | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 3c7fdf060c386..cbaed8ee8f94f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -380,19 +381,19 @@ public CompletableFuture> assign(Optional> selectAsync(ServiceUnitId bundle) { - return selectAsync(bundle, Optional.empty()); + return selectAsync(bundle, Collections.emptySet()); } public CompletableFuture> selectAsync(ServiceUnitId bundle, - Optional> excludeBrokerSet) { + Set excludeBrokerSet) { BrokerRegistry brokerRegistry = getBrokerRegistry(); return brokerRegistry.getAvailableBrokerLookupDataAsync() .thenCompose(availableBrokers -> { LoadManagerContext context = this.getContext(); Map availableBrokerCandidates = new HashMap<>(availableBrokers); - if (excludeBrokerSet.isPresent()) { - for (String exclude : excludeBrokerSet.get()) { + if (!excludeBrokerSet.isEmpty()) { + for (String exclude : excludeBrokerSet) { availableBrokerCandidates.remove(exclude); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 13c653973ad3e..c3603d635bc12 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1233,7 +1233,7 @@ private synchronized void doCleanup(String broker) { private Optional selectBroker(String serviceUnit, String inactiveBroker) { try { - return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Optional.of(Set.of(inactiveBroker))) + return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Set.of(inactiveBroker)) .get(inFlightStateWaitingTimeInMillis, MILLISECONDS); } catch (Throwable e) { log.error("Failed to select a broker for serviceUnit:{}", serviceUnit); From 7b241bbec1ff05cc8e61a512bd881e107a63d79a Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Tue, 16 May 2023 20:42:15 -0700 Subject: [PATCH 3/4] delay system bundle cleanup --- .../channel/ServiceUnitStateChannelImpl.java | 52 +++++++++++++++---- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index c3603d635bc12..f3a308add9ef1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -91,7 +91,6 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; @@ -261,6 +260,11 @@ public void cancelOwnershipMonitor() { } } + @Override + public void cleanOwnerships() { + doCleanup(lookupServiceAddress); + } + public synchronized void start() throws PulsarServerException { if (!validateChannelState(LeaderElectionServiceStarted, false)) { throw new IllegalStateException("Invalid channel state:" + channelState.name()); @@ -288,7 +292,7 @@ public synchronized void start() throws PulsarServerException { } } PulsarClusterMetadataSetup.createNamespaceIfAbsent - (pulsar.getPulsarResources(), NamespaceName.SYSTEM_NAMESPACE, config.getClusterName()); + (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName()); producer = pulsar.getClient().newProducer(schema) .enableBatching(true) @@ -1148,12 +1152,18 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa } } - public void cleanOwnerships() { - doCleanup(lookupServiceAddress); + protected void waitForCleanups(boolean excludeSystemTopics, int timeoutInMillis) { long started = System.currentTimeMillis(); - while (System.currentTimeMillis() - started < OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS) { + while (System.currentTimeMillis() - started < timeoutInMillis) { boolean cleaned = true; - for (var data : tableview.values()) { + for (var etr : tableview.entrySet()) { + var serviceUnit = etr.getKey(); + var data = etr.getValue(); + + if (excludeSystemTopics && serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) { + continue; + } + if (data.state() == Owned && data.dstBroker().equals(lookupServiceAddress)) { cleaned = false; break; @@ -1177,26 +1187,34 @@ public void cleanOwnerships() { } } - private synchronized void doCleanup(String broker) { long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); + Map orphanSystemServiceUnits = new HashMap<>(); for (var etr : tableview.entrySet()) { var stateData = etr.getValue(); var serviceUnit = etr.getKey(); var state = state(stateData); if (StringUtils.equals(broker, stateData.dstBroker())) { if (isActiveState(state)) { - overrideOwnership(serviceUnit, stateData, broker); + if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) { + orphanSystemServiceUnits.put(serviceUnit, stateData); + } else { + overrideOwnership(serviceUnit, stateData, broker); + } orphanServiceUnitCleanupCnt++; } } else if (StringUtils.equals(broker, stateData.sourceBroker())) { if (isInFlightState(state)) { - overrideOwnership(serviceUnit, stateData, broker); + if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) { + orphanSystemServiceUnits.put(serviceUnit, stateData); + } else { + overrideOwnership(serviceUnit, stateData, broker); + } orphanServiceUnitCleanupCnt++; } } @@ -1205,14 +1223,28 @@ private synchronized void doCleanup(String broker) { try { producer.flush(); } catch (PulsarClientException e) { - log.error("Failed to flush the in-flight messages.", e); + log.error("Failed to flush the in-flight non-system bundle override messages.", e); } + if (orphanServiceUnitCleanupCnt > 0) { + waitForCleanups(true, OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS); this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt; this.totalInactiveBrokerCleanupCnt++; } + // clean system bundles in the end + for (var orphanSystemServiceUnit : orphanSystemServiceUnits.entrySet()) { + log.info("Overriding orphan system service unit:{}", orphanSystemServiceUnit.getKey()); + overrideOwnership(orphanSystemServiceUnit.getKey(), orphanSystemServiceUnit.getValue(), broker); + } + + try { + producer.flush(); + } catch (PulsarClientException e) { + log.error("Failed to flush the in-flight system bundle override messages.", e); + } + double cleanupTime = TimeUnit.NANOSECONDS .toMillis((System.nanoTime() - startTime)); From 0426d30830d16b05bdc0e8b4933d4085e3026526 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Thu, 18 May 2023 11:07:44 -0700 Subject: [PATCH 4/4] added comment --- .../channel/ServiceUnitStateChannelImpl.java | 13 +++-- .../channel/ServiceUnitStateChannelTest.java | 55 +++++++++++++------ 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index f3a308add9ef1..6246c26e57bcc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1152,9 +1152,9 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa } } - protected void waitForCleanups(boolean excludeSystemTopics, int timeoutInMillis) { + private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) { long started = System.currentTimeMillis(); - while (System.currentTimeMillis() - started < timeoutInMillis) { + while (System.currentTimeMillis() - started < maxWaitTimeInMillis) { boolean cleaned = true; for (var etr : tableview.entrySet()) { var serviceUnit = etr.getKey(); @@ -1164,7 +1164,7 @@ protected void waitForCleanups(boolean excludeSystemTopics, int timeoutInMillis) continue; } - if (data.state() == Owned && data.dstBroker().equals(lookupServiceAddress)) { + if (data.state() == Owned && broker.equals(data.dstBroker())) { cleaned = false; break; } @@ -1228,7 +1228,12 @@ private synchronized void doCleanup(String broker) { if (orphanServiceUnitCleanupCnt > 0) { - waitForCleanups(true, OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS); + // System bundles can contain this channel's system topic and other important system topics. + // Cleaning such system bundles(closing the system topics) together with the non-system bundles + // can cause the cluster to be temporarily unstable. + // Hence, we clean the non-system bundles first and gracefully wait for them. + // After that, we clean the system bundles, if any. + waitForCleanups(broker, true, OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS); this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt; this.totalInactiveBrokerCleanupCnt++; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 75d13a8be57c0..cb26c460f0a03 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -713,8 +713,8 @@ public void handleBrokerDeletionEventTest() var cleanupJobs1 = getCleanupJobs(channel1); var cleanupJobs2 = getCleanupJobs(channel2); - var leaderCleanupJobs = spy(cleanupJobs1); - var followerCleanupJobs = spy(cleanupJobs2); + var leaderCleanupJobsTmp = spy(cleanupJobs1); + var followerCleanupJobsTmp = spy(cleanupJobs2); var leaderChannel = channel1; var followerChannel = channel2; String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); @@ -723,10 +723,12 @@ public void handleBrokerDeletionEventTest() if (leader.equals(lookupServiceAddress2)) { leaderChannel = channel2; followerChannel = channel1; - var tmp = followerCleanupJobs; - followerCleanupJobs = leaderCleanupJobs; - leaderCleanupJobs = tmp; + var tmp = followerCleanupJobsTmp; + followerCleanupJobsTmp = leaderCleanupJobsTmp; + leaderCleanupJobsTmp = tmp; } + final var leaderCleanupJobs = leaderCleanupJobsTmp; + final var followerCleanupJobs = followerCleanupJobsTmp; FieldUtils.writeDeclaredField(leaderChannel, "cleanupJobs", leaderCleanupJobs, true); FieldUtils.writeDeclaredField(followerChannel, "cleanupJobs", followerCleanupJobs, @@ -769,9 +771,11 @@ public void handleBrokerDeletionEventTest() verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any()); verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0, leaderCleanupJobs.size()); + assertEquals(0, followerCleanupJobs.size()); + }); - assertEquals(0, leaderCleanupJobs.size()); - assertEquals(0, followerCleanupJobs.size()); validateMonitorCounters(leaderChannel, 1, 0, @@ -797,8 +801,12 @@ public void handleBrokerDeletionEventTest() verify(leaderCleanupJobs, times(2)).computeIfAbsent(eq(broker), any()); verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); - assertEquals(1, leaderCleanupJobs.size()); - assertEquals(0, followerCleanupJobs.size()); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(1, leaderCleanupJobs.size()); + assertEquals(0, followerCleanupJobs.size()); + }); + validateMonitorCounters(leaderChannel, 1, 0, @@ -814,8 +822,12 @@ public void handleBrokerDeletionEventTest() verify(leaderCleanupJobs, times(2)).computeIfAbsent(eq(broker), any()); verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); - assertEquals(0, leaderCleanupJobs.size()); - assertEquals(0, followerCleanupJobs.size()); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0, leaderCleanupJobs.size()); + assertEquals(0, followerCleanupJobs.size()); + }); + validateMonitorCounters(leaderChannel, 1, 0, @@ -833,8 +845,11 @@ public void handleBrokerDeletionEventTest() verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any()); verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); - assertEquals(1, leaderCleanupJobs.size()); - assertEquals(0, followerCleanupJobs.size()); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(1, leaderCleanupJobs.size()); + assertEquals(0, followerCleanupJobs.size()); + }); + validateMonitorCounters(leaderChannel, 1, 0, @@ -852,8 +867,11 @@ public void handleBrokerDeletionEventTest() verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any()); verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); - assertEquals(0, leaderCleanupJobs.size()); - assertEquals(0, followerCleanupJobs.size()); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0, leaderCleanupJobs.size()); + assertEquals(0, followerCleanupJobs.size()); + }); + validateMonitorCounters(leaderChannel, 2, 0, @@ -878,8 +896,11 @@ public void handleBrokerDeletionEventTest() verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any()); verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); - assertEquals(0, leaderCleanupJobs.size()); - assertEquals(0, followerCleanupJobs.size()); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0, leaderCleanupJobs.size()); + assertEquals(0, followerCleanupJobs.size()); + }); + validateMonitorCounters(leaderChannel, 2, 0,