diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 5fc9920d0f215d..2f74319df9166f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -382,6 +382,17 @@ public void closeMetadataServiceSession() throws Exception { localMetadataStore.close(); } + private void closeLeaderElectionService() throws Exception { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService().close(); + } else { + if (this.leaderElectionService != null) { + this.leaderElectionService.close(); + this.leaderElectionService = null; + } + } + } + @Override public void close() throws PulsarServerException { try { @@ -502,10 +513,7 @@ public CompletableFuture closeAsync() { this.bkClientFactory = null; } - if (this.leaderElectionService != null) { - this.leaderElectionService.close(); - this.leaderElectionService = null; - } + closeLeaderElectionService(); if (adminClient != null) { adminClient.close(); @@ -1316,7 +1324,11 @@ public boolean isRunning() { * @return a reference of the current LeaderElectionService instance. */ public LeaderElectionService getLeaderElectionService() { - return this.leaderElectionService; + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + return ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService(); + } else { + return this.leaderElectionService; + } } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 4ebf537f7a8a85..3c7fdf060c386c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -380,12 +380,22 @@ public CompletableFuture> assign(Optional> selectAsync(ServiceUnitId bundle) { + return selectAsync(bundle, Optional.empty()); + } + + public CompletableFuture> selectAsync(ServiceUnitId bundle, + Optional> excludeBrokerSet) { BrokerRegistry brokerRegistry = getBrokerRegistry(); return brokerRegistry.getAvailableBrokerLookupDataAsync() .thenCompose(availableBrokers -> { LoadManagerContext context = this.getContext(); Map availableBrokerCandidates = new HashMap<>(availableBrokers); + if (excludeBrokerSet.isPresent()) { + for (String exclude : excludeBrokerSet.get()) { + availableBrokerCandidates.remove(exclude); + } + } // Filter out brokers that do not meet the rules. List filterPipeline = getBrokerFilterPipeline(); @@ -685,4 +695,10 @@ private void monitor() { log.error("Failed to get the channel ownership.", e); } } + + public void disableBroker() throws Exception { + serviceUnitStateChannel.cleanOwnerships(); + leaderElectionService.close(); + brokerRegistry.unregister(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index 1eabbe620e2131..18e949537dedbf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -74,7 +74,7 @@ public CompletableFuture checkOwnershipAsync(Optional to @Override public void disableBroker() throws Exception { - this.loadManager.getBrokerRegistry().unregister(); + this.loadManager.disableBroker(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java index 4782a31fe0c56d..6e75fe91a914ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java @@ -206,4 +206,9 @@ public interface ServiceUnitStateChannel extends Closeable { * Cancels the ownership monitor. */ void cancelOwnershipMonitor(); + + /** + * Cleans the service unit ownerships from the current broker's channel. + */ + void cleanOwnerships(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index b4c4e7fd5d42a2..265925b321ae4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -110,6 +110,10 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD; private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 * 1000; // 30sec + + private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000; + private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100; + private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; public static final long VERSION_ID_INIT = 1; // initial versionId private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60; public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins @@ -694,6 +698,8 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.dstBroker())) { log(null, serviceUnit, data, null); lastOwnEventHandledAt = System.currentTimeMillis(); + } else if (data.force() && isTargetBroker(data.sourceBroker())) { + closeServiceUnit(serviceUnit); } } @@ -1114,13 +1120,13 @@ private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitState Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, getNextVersionId(orphanData)); } else { - return new ServiceUnitStateData(Owned, selectedBroker, true, getNextVersionId(orphanData)); + return new ServiceUnitStateData(Owned, selectedBroker, orphanData.dstBroker(), + true, getNextVersionId(orphanData)); } } - private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData) { - - Optional selectedBroker = selectBroker(serviceUnit); + private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) { + Optional selectedBroker = selectBroker(serviceUnit, inactiveBroker); if (selectedBroker.isPresent()) { var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker.get()); log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}", @@ -1140,8 +1146,37 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa } } + public void cleanOwnerships() { + doCleanup(lookupServiceAddress); + long started = System.currentTimeMillis(); + while (System.currentTimeMillis() - started < OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS) { + boolean cleaned = true; + for (var data : tableview.values()) { + if (data.state() == Owned && data.dstBroker().equals(lookupServiceAddress)) { + cleaned = false; + break; + } + } + if (cleaned) { + try { + MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS); + } catch (InterruptedException e) { + log.warn("Interrupted while gracefully waiting for the cleanup convergence."); + } + break; + } else { + try { + MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS); + } catch (InterruptedException e) { + log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}", + lookupServiceAddress); + } + } + } + } + - private void doCleanup(String broker) { + private synchronized void doCleanup(String broker) { long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; @@ -1153,13 +1188,13 @@ private void doCleanup(String broker) { var state = state(stateData); if (StringUtils.equals(broker, stateData.dstBroker())) { if (isActiveState(state)) { - overrideOwnership(serviceUnit, stateData); + overrideOwnership(serviceUnit, stateData, broker); orphanServiceUnitCleanupCnt++; } } else if (StringUtils.equals(broker, stateData.sourceBroker())) { if (isInFlightState(state)) { - overrideOwnership(serviceUnit, stateData); + overrideOwnership(serviceUnit, stateData, broker); orphanServiceUnitCleanupCnt++; } } @@ -1194,9 +1229,9 @@ private void doCleanup(String broker) { } - private Optional selectBroker(String serviceUnit) { + private Optional selectBroker(String serviceUnit, String inactiveBroker) { try { - return loadManager.selectAsync(getNamespaceBundle(serviceUnit)) + return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Optional.of(Set.of(inactiveBroker))) .get(inFlightStateWaitingTimeInMillis, MILLISECONDS); } catch (Throwable e) { log.error("Failed to select a broker for serviceUnit:{}", serviceUnit); @@ -1204,8 +1239,10 @@ private Optional selectBroker(String serviceUnit) { return Optional.empty(); } - private Optional getRollForwardStateData(String serviceUnit, long nextVersionId) { - Optional selectedBroker = selectBroker(serviceUnit); + private Optional getRollForwardStateData(String serviceUnit, + String inactiveBroker, + long nextVersionId) { + Optional selectedBroker = selectBroker(serviceUnit, inactiveBroker); if (selectedBroker.isEmpty()) { return Optional.empty(); } @@ -1220,7 +1257,7 @@ private Optional getOverrideInFlightStateData( var state = orphanData.state(); switch (state) { case Assigning: { - return getRollForwardStateData(serviceUnit, nextVersionId); + return getRollForwardStateData(serviceUnit, orphanData.dstBroker(), nextVersionId); } case Splitting: { return Optional.of(new ServiceUnitStateData(Splitting, @@ -1233,7 +1270,7 @@ private Optional getOverrideInFlightStateData( // rollback to the src return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true, nextVersionId)); } else { - return getRollForwardStateData(serviceUnit, nextVersionId); + return getRollForwardStateData(serviceUnit, orphanData.sourceBroker(), nextVersionId); } } default: { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index b71eeb4745b872..b131aff68d9096 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -880,6 +880,51 @@ SplitDecision.Reason.Unknown, new AtomicLong(6)) assertEquals(actual, expected); } + @Test + public void testDisableBroker() throws Exception { + // Test rollback to modular load manager. + ServiceConfiguration defaultConf = getDefaultConf(); + defaultConf.setAllowAutoTopicCreation(true); + defaultConf.setForceDeleteNamespaceAllowed(true); + defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + defaultConf.setLoadBalancerSheddingEnabled(false); + try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) { + var pulsar3 = additionalPulsarTestContext.getPulsarService(); + ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(pulsar3.getLoadManager().get(), "loadManager", true)); + String topic = "persistent://public/default/test"; + + String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic); + TopicName topicName = TopicName.get("test"); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) { + admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), + pulsar3.getLookupServiceAddress()); + lookupResult1 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + } + String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic); + String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + + assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl()); + assertEquals(lookupResult1, lookupResult2); + assertEquals(lookupResult1, lookupResult3); + + + assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertTrue(ternaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + + ternaryLoadManager.disableBroker(); + + assertFalse(ternaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + if (primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()) { + assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + } else { + assertTrue(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + } + } + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 77c80187a63e15..75d13a8be57c0b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -515,7 +515,7 @@ public void transferTestWhenDestBrokerFails() // recovered, check the monitor update state : Assigned -> Owned doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1))) - .when(loadManager).selectAsync(any()); + .when(loadManager).selectAsync(any(), any()); FieldUtils.writeDeclaredField(channel2, "producer", producer, true); FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1 , true); @@ -735,7 +735,7 @@ public void handleBrokerDeletionEventTest() var owner1 = channel1.getOwnerAsync(bundle1); var owner2 = channel2.getOwnerAsync(bundle2); doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))) - .when(loadManager).selectAsync(any()); + .when(loadManager).selectAsync(any(), any()); assertTrue(owner1.get().isEmpty()); assertTrue(owner2.get().isEmpty()); @@ -1101,7 +1101,7 @@ public void assignTestWhenDestBrokerProducerFails() FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 3 * 1000, true); doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))) - .when(loadManager).selectAsync(any()); + .when(loadManager).selectAsync(any(), any()); channel1.publishAssignEventAsync(bundle, lookupServiceAddress2); // channel1 is broken. the assign won't be complete. waitUntilState(channel1, bundle); @@ -1440,7 +1440,7 @@ public void testOverrideInactiveBrokerStateData() // test stable metadata state doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))) - .when(loadManager).selectAsync(any()); + .when(loadManager).selectAsync(any(), any()); leaderChannel.handleMetadataSessionEvent(SessionReestablished); followerChannel.handleMetadataSessionEvent(SessionReestablished); FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", @@ -1505,7 +1505,7 @@ public void testOverrideOrphanStateData() // test stable metadata state doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))) - .when(loadManager).selectAsync(any()); + .when(loadManager).selectAsync(any(), any()); FieldUtils.writeDeclaredField(leaderChannel, "inFlightStateWaitingTimeInMillis", -1, true); FieldUtils.writeDeclaredField(followerChannel, "inFlightStateWaitingTimeInMillis",