From 6f55e5b6406cd80725792dedf31b3f391935e0fb Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 9 Jun 2023 23:18:34 +0800 Subject: [PATCH 1/4] Handle get owned namespaces admin API --- .../extensions/ExtensibleLoadManagerImpl.java | 26 +++++++++ .../channel/ServiceUnitStateChannelImpl.java | 15 ++--- .../broker/namespace/NamespaceService.java | 57 +++++++++++++++---- .../ExtensibleLoadManagerImplTest.java | 54 +++++++++++++++++- 4 files changed, 127 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 531ab18938a1e..9addf4d57d93b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -37,14 +37,17 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; @@ -180,6 +183,23 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { .build(); private final CountDownLatch initWaiter = new CountDownLatch(1); + /** + * Get all the bundles that are owned by this broker. + */ + public Set getOwnedServiceUnits() { + var entrySet = serviceUnitStateChannel.getOwnershipEntrySet(); + return entrySet.stream() + .filter(entry -> { + var stateData = entry.getValue(); + return stateData.state() == ServiceUnitState.Owned + && StringUtils.isNotBlank(stateData.dstBroker()) + && stateData.dstBroker().equals(brokerRegistry.getBrokerId()); + }).map(entry -> { + var bundle = entry.getKey(); + return this.getNamespaceBundle(bundle); + }).collect(Collectors.toSet()); + } + public enum Role { Leader, Follower @@ -714,6 +734,12 @@ private void monitor() { } } + public NamespaceBundle getNamespaceBundle(String bundle) { + final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); + return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange); + } + public void disableBroker() throws Exception { serviceUnitStateChannel.cleanOwnerships(); leaderElectionService.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 48192b5dc7cd8..adf8aaa23bc96 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -78,7 +78,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener; import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; -import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.client.api.CompressionType; @@ -703,7 +702,7 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { stateChangeListeners.notify(serviceUnit, data, null); if (isTargetBroker(data.dstBroker())) { log(null, serviceUnit, data, null); - pulsar.getNamespaceService().onNamespaceBundleOwned(getNamespaceBundle(serviceUnit)); + pulsar.getNamespaceService().onNamespaceBundleOwned(loadManager.getNamespaceBundle(serviceUnit)); lastOwnEventHandledAt = System.currentTimeMillis(); } else if (data.force() && isTargetBroker(data.sourceBroker())) { closeServiceUnit(serviceUnit); @@ -803,12 +802,6 @@ private boolean isTargetBroker(String broker) { return broker.equals(lookupServiceAddress); } - private NamespaceBundle getNamespaceBundle(String bundle) { - final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); - final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); - return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange); - } - private CompletableFuture deferGetOwnerRequest(String serviceUnit) { return getOwnerRequests .computeIfAbsent(serviceUnit, k -> { @@ -829,7 +822,7 @@ private CompletableFuture deferGetOwnerRequest(String serviceUnit) { private CompletableFuture closeServiceUnit(String serviceUnit) { long startTime = System.nanoTime(); MutableInt unloadedTopics = new MutableInt(); - NamespaceBundle bundle = getNamespaceBundle(serviceUnit); + NamespaceBundle bundle = loadManager.getNamespaceBundle(serviceUnit); return pulsar.getBrokerService().unloadServiceUnit( bundle, true, @@ -860,7 +853,7 @@ private CompletableFuture splitServiceUnit(String serviceUnit, ServiceUnit long startTime = System.nanoTime(); NamespaceService namespaceService = pulsar.getNamespaceService(); NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory(); - NamespaceBundle bundle = getNamespaceBundle(serviceUnit); + NamespaceBundle bundle = loadManager.getNamespaceBundle(serviceUnit); CompletableFuture completionFuture = new CompletableFuture<>(); Map> bundleToDestBroker = data.splitServiceUnitToDestBroker(); List boundaries = null; @@ -1275,7 +1268,7 @@ private synchronized void doCleanup(String broker) { private Optional selectBroker(String serviceUnit, String inactiveBroker) { try { - return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Set.of(inactiveBroker)) + return loadManager.selectAsync(loadManager.getNamespaceBundle(serviceUnit), Set.of(inactiveBroker)) .get(inFlightStateWaitingTimeInMillis, MILLISECONDS); } catch (Throwable e) { log.error("Failed to select a broker for serviceUnit:{}", serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 69c25d7c6d394..76b4e093f7dc4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -757,21 +757,42 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle) { } public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, Optional destinationBroker) { - if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { - return ExtensibleLoadManagerImpl.get(loadManager.get()) - .unloadNamespaceBundleAsync(bundle, destinationBroker); - } + // unload namespace bundle - return unloadNamespaceBundle(bundle, config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); + return unloadNamespaceBundle(bundle, destinationBroker, + config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); + } + + public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, + Optional destinationBroker, + long timeout, + TimeUnit timeoutUnit) { + return unloadNamespaceBundle(bundle, destinationBroker, timeout, timeoutUnit, true); + } + + public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, + long timeout, + TimeUnit timeoutUnit) { + return unloadNamespaceBundle(bundle, Optional.empty(), timeout, timeoutUnit, true); } - public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, long timeout, TimeUnit timeoutUnit) { - return unloadNamespaceBundle(bundle, timeout, timeoutUnit, true); + public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, + long timeout, + TimeUnit timeoutUnit, + boolean closeWithoutWaitingClientDisconnect) { + return unloadNamespaceBundle(bundle, Optional.empty(), timeout, + timeoutUnit, closeWithoutWaitingClientDisconnect); } - public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, long timeout, + public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, + Optional destinationBroker, + long timeout, TimeUnit timeoutUnit, boolean closeWithoutWaitingClientDisconnect) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + return ExtensibleLoadManagerImpl.get(loadManager.get()) + .unloadNamespaceBundleAsync(bundle, destinationBroker); + } // unload namespace bundle OwnedBundle ob = ownershipCache.getOwnedBundle(bundle); if (ob == null) { @@ -790,13 +811,23 @@ public CompletableFuture> getOwnedNameSpac .getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName()) .thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new)) .thenCompose(namespaceIsolationPolicies -> { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + ExtensibleLoadManagerImpl extensibleLoadManager = + ExtensibleLoadManagerImpl.get(loadManager.get()); + var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream() + .collect(Collectors.toMap(NamespaceBundle::toString, + bundle -> getNamespaceOwnershipStatus(true, + namespaceIsolationPolicies.getPolicyByNamespace( + bundle.getNamespaceObject())))); + return CompletableFuture.completedFuture(statusMap); + } Collection> futures = ownershipCache.getOwnedBundlesAsync().values(); return FutureUtil.waitForAll(futures) .thenApply(__ -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toMap(bundle -> bundle.getNamespaceBundle().toString(), - bundle -> getNamespaceOwnershipStatus(bundle, + bundle -> getNamespaceOwnershipStatus(bundle.isActive(), namespaceIsolationPolicies.getPolicyByNamespace( bundle.getNamespaceBundle().getNamespaceObject())) )) @@ -804,10 +835,10 @@ public CompletableFuture> getOwnedNameSpac }); } - private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj, + private NamespaceOwnershipStatus getNamespaceOwnershipStatus(boolean isActive, NamespaceIsolationPolicy nsIsolationPolicy) { NamespaceOwnershipStatus nsOwnedStatus = new NamespaceOwnershipStatus(BrokerAssignment.shared, false, - nsObj.isActive()); + isActive); if (nsIsolationPolicy == null) { // no matching policy found, this namespace must be an uncontrolled one and using shared broker return nsOwnedStatus; @@ -1103,6 +1134,10 @@ public OwnershipCache getOwnershipCache() { } public Set getOwnedServiceUnits() { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); + return extensibleLoadManager.getOwnedServiceUnits(); + } return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle) .collect(Collectors.toSet()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 1181c18ce40fe..84535f9692be9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -96,8 +96,10 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.stats.Metrics; @@ -568,7 +570,7 @@ public void testDeployAndRollbackLoadManager() throws Exception { assertEquals(lookupResult1, lookupResult2); assertEquals(lookupResult1, lookupResult3); - NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get("test")).get(); + NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get(); LookupOptions options = LookupOptions.builder() .authoritative(false) .requestHttps(false) @@ -964,10 +966,10 @@ public void testDisableBroker() throws Exception { var pulsar3 = additionalPulsarTestContext.getPulsarService(); ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl) FieldUtils.readField(pulsar3.getLoadManager().get(), "loadManager", true)); - String topic = "persistent://public/default/test"; + String topic = "persistent://" + defaultTestNamespace +"/test"; String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic); - TopicName topicName = TopicName.get("test"); + TopicName topicName = TopicName.get(topic); NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) { admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), @@ -1035,6 +1037,52 @@ public void testListTopic() throws Exception { admin.namespaces().deleteNamespace(namespace, true); } + @Test(timeOut = 30 * 1000) + public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws PulsarAdminException { + Set ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits(); + log.info("Owned service units: {}", ownedServiceUnitsByPulsar1); + assertTrue(ownedServiceUnitsByPulsar1.isEmpty()); + Set ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits(); + log.info("Owned service units: {}", ownedServiceUnitsByPulsar2); + assertTrue(ownedServiceUnitsByPulsar2.isEmpty()); + Map ownedNamespacesByPulsar1 = + admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress()); + Map ownedNamespacesByPulsar2 = + admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress()); + assertTrue(ownedNamespacesByPulsar1.isEmpty()); + assertTrue(ownedNamespacesByPulsar2.isEmpty()); + + String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units"; + admin.topics().createPartitionedTopic(topic, 1); + NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).join(); + CompletableFuture> owner = primaryLoadManager.assign(Optional.empty(), bundle); + assertFalse(owner.join().isEmpty()); + + BrokerLookupData brokerLookupData = owner.join().get(); + if (brokerLookupData.getWebServiceUrl().equals(pulsar1.getWebServiceAddress())) { + assertOwnedServiceUnits(pulsar1, primaryLoadManager, bundle); + } else { + assertOwnedServiceUnits(pulsar2, secondaryLoadManager, bundle); + } + } + + private void assertOwnedServiceUnits( + PulsarService pulsar, + ExtensibleLoadManagerImpl extensibleLoadManager, + NamespaceBundle bundle) throws PulsarAdminException { + Awaitility.await().untilAsserted(() -> { + Set ownedBundles = extensibleLoadManager.getOwnedServiceUnits(); + assertTrue(ownedBundles.contains(bundle)); + }); + Map ownedNamespaces = + admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar.getLookupServiceAddress()); + assertTrue(ownedNamespaces.containsKey(bundle.toString())); + NamespaceOwnershipStatus status = ownedNamespaces.get(bundle.toString()); + assertTrue(status.is_active); + assertFalse(status.is_controlled); + assertEquals(status.broker_assignment, BrokerAssignment.shared); + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override From 13186315c9ffa87cddf8f804b42246b6c800976c Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Sat, 10 Jun 2023 09:14:04 +0800 Subject: [PATCH 2/4] Address comments --- .../extensions/ExtensibleLoadManagerImpl.java | 11 +++-------- .../channel/ServiceUnitStateChannelImpl.java | 11 +++++++---- .../broker/loadbalance/impl/LoadManagerShared.java | 6 ++++++ 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 9addf4d57d93b..69a17c7810bb2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -23,6 +23,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin; +import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; @@ -173,7 +174,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private final SplitCounter splitCounter = new SplitCounter(); // record unload metrics - private final AtomicReference> unloadMetrics = new AtomicReference(); + private final AtomicReference> unloadMetrics = new AtomicReference<>(); // record split metrics private final AtomicReference> splitMetrics = new AtomicReference<>(); @@ -196,7 +197,7 @@ public Set getOwnedServiceUnits() { && stateData.dstBroker().equals(brokerRegistry.getBrokerId()); }).map(entry -> { var bundle = entry.getKey(); - return this.getNamespaceBundle(bundle); + return getNamespaceBundle(pulsar, bundle); }).collect(Collectors.toSet()); } @@ -734,12 +735,6 @@ private void monitor() { } } - public NamespaceBundle getNamespaceBundle(String bundle) { - final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); - final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); - return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange); - } - public void disableBroker() throws Exception { serviceUnitStateChannel.cleanOwnerships(); leaderElectionService.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index adf8aaa23bc96..99c538e6ecfa3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -78,6 +78,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener; import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; +import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.client.api.CompressionType; @@ -702,7 +703,8 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { stateChangeListeners.notify(serviceUnit, data, null); if (isTargetBroker(data.dstBroker())) { log(null, serviceUnit, data, null); - pulsar.getNamespaceService().onNamespaceBundleOwned(loadManager.getNamespaceBundle(serviceUnit)); + pulsar.getNamespaceService() + .onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit)); lastOwnEventHandledAt = System.currentTimeMillis(); } else if (data.force() && isTargetBroker(data.sourceBroker())) { closeServiceUnit(serviceUnit); @@ -822,7 +824,7 @@ private CompletableFuture deferGetOwnerRequest(String serviceUnit) { private CompletableFuture closeServiceUnit(String serviceUnit) { long startTime = System.nanoTime(); MutableInt unloadedTopics = new MutableInt(); - NamespaceBundle bundle = loadManager.getNamespaceBundle(serviceUnit); + NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit); return pulsar.getBrokerService().unloadServiceUnit( bundle, true, @@ -853,7 +855,7 @@ private CompletableFuture splitServiceUnit(String serviceUnit, ServiceUnit long startTime = System.nanoTime(); NamespaceService namespaceService = pulsar.getNamespaceService(); NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory(); - NamespaceBundle bundle = loadManager.getNamespaceBundle(serviceUnit); + NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit); CompletableFuture completionFuture = new CompletableFuture<>(); Map> bundleToDestBroker = data.splitServiceUnitToDestBroker(); List boundaries = null; @@ -1268,7 +1270,8 @@ private synchronized void doCleanup(String broker) { private Optional selectBroker(String serviceUnit, String inactiveBroker) { try { - return loadManager.selectAsync(loadManager.getNamespaceBundle(serviceUnit), Set.of(inactiveBroker)) + return loadManager.selectAsync( + LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), Set.of(inactiveBroker)) .get(inFlightStateWaitingTimeInMillis, MILLISECONDS); } catch (Throwable e) { log.error("Failed to select a broker for serviceUnit:{}", serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index 6818ae03b5280..33f346adbe0c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -711,4 +711,10 @@ public static void refreshBrokerToFailureDomainMap(PulsarService pulsar, LOG.warn("Failed to get domain-list for cluster {}", e.getMessage()); } } + + public static NamespaceBundle getNamespaceBundle(PulsarService pulsar, String bundle) { + final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); + return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange); + } } From 7fb52e855de993ef9bfa6ffc0ebaaaf97184e66d Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Sat, 10 Jun 2023 17:10:32 +0800 Subject: [PATCH 3/4] Define local value for broker id --- .../loadbalance/extensions/ExtensibleLoadManagerImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 69a17c7810bb2..ea2085f8c330f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -189,12 +189,13 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { */ public Set getOwnedServiceUnits() { var entrySet = serviceUnitStateChannel.getOwnershipEntrySet(); + var brokerId = brokerRegistry.getBrokerId(); return entrySet.stream() .filter(entry -> { var stateData = entry.getValue(); return stateData.state() == ServiceUnitState.Owned && StringUtils.isNotBlank(stateData.dstBroker()) - && stateData.dstBroker().equals(brokerRegistry.getBrokerId()); + && stateData.dstBroker().equals(brokerId); }).map(entry -> { var bundle = entry.getKey(); return getNamespaceBundle(pulsar, bundle); From c59f34bbe56a9fa4cffdc26bdc7d93d4e915119d Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 12 Jun 2023 15:34:57 +0800 Subject: [PATCH 4/4] Specifies the type --- .../loadbalance/extensions/ExtensibleLoadManagerImpl.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index ea2085f8c330f..4de6ccaf79c14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -51,6 +51,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; @@ -188,8 +189,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { * Get all the bundles that are owned by this broker. */ public Set getOwnedServiceUnits() { - var entrySet = serviceUnitStateChannel.getOwnershipEntrySet(); - var brokerId = brokerRegistry.getBrokerId(); + Set> entrySet = serviceUnitStateChannel.getOwnershipEntrySet(); + String brokerId = brokerRegistry.getBrokerId(); return entrySet.stream() .filter(entry -> { var stateData = entry.getValue();