diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 4de6ccaf79c14..a9a6e6138bda9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -84,6 +85,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; +import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; @@ -365,56 +367,99 @@ public CompletableFuture> assign(Optional> future = lookupRequests.computeIfAbsent(bundle, k -> { + return dedupeLookupRequest(bundle, k -> { final CompletableFuture> owner; // Assign the bundle to channel owner if is internal topic, to avoid circular references. if (topic.isPresent() && isInternalTopic(topic.get().toString())) { owner = serviceUnitStateChannel.getChannelOwnerAsync(); } else { - owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> { - // If the bundle not assign yet, select and publish assign event to channel. - if (broker.isEmpty()) { - return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> { - if (brokerOpt.isPresent()) { - assignCounter.incrementSuccess(); - log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle); - return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get()) - .thenApply(Optional::of); - } else { - throw new IllegalStateException( - "Failed to select the new owner broker for bundle: " + bundle); - } - }); + owner = getOwnerAsync(serviceUnit, bundle, false).thenApply(Optional::ofNullable); + } + return getBrokerLookupData(owner, bundle); + }); + } + + private CompletableFuture getOwnerAsync( + ServiceUnitId serviceUnit, String bundle, boolean ownByLocalBrokerIfAbsent) { + return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> { + // If the bundle not assign yet, select and publish assign event to channel. + if (broker.isEmpty()) { + CompletableFuture> selectedBroker; + if (ownByLocalBrokerIfAbsent) { + String brokerId = this.brokerRegistry.getBrokerId(); + selectedBroker = CompletableFuture.completedFuture(Optional.of(brokerId)); + } else { + selectedBroker = this.selectAsync(serviceUnit); + } + return selectedBroker.thenCompose(brokerOpt -> { + if (brokerOpt.isPresent()) { + assignCounter.incrementSuccess(); + log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle); + return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get()); } - assignCounter.incrementSkip(); - // Already assigned, return it. - return CompletableFuture.completedFuture(broker); + throw new IllegalStateException( + "Failed to select the new owner broker for bundle: " + bundle); }); } + assignCounter.incrementSkip(); + // Already assigned, return it. + return CompletableFuture.completedFuture(broker.get()); + }); + } - return owner.thenCompose(broker -> { - if (broker.isEmpty()) { - String errorMsg = String.format( - "Failed to get or assign the owner for bundle:%s", bundle); - log.error(errorMsg); - throw new IllegalStateException(errorMsg); - } - return CompletableFuture.completedFuture(broker.get()); - }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> { - if (brokerLookupData.isEmpty()) { - String errorMsg = String.format( - "Failed to look up a broker registry:%s for bundle:%s", broker, bundle); - log.error(errorMsg); - throw new IllegalStateException(errorMsg); - } - return CompletableFuture.completedFuture(brokerLookupData); - })); + private CompletableFuture> getBrokerLookupData( + CompletableFuture> owner, + String bundle) { + return owner.thenCompose(broker -> { + if (broker.isEmpty()) { + String errorMsg = String.format( + "Failed to get or assign the owner for bundle:%s", bundle); + log.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return CompletableFuture.completedFuture(broker.get()); + }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> { + if (brokerLookupData.isEmpty()) { + String errorMsg = String.format( + "Failed to look up a broker registry:%s for bundle:%s", broker, bundle); + log.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return CompletableFuture.completedFuture(brokerLookupData); + })); + } + + /** + * Method to get the current owner of the NamespaceBundle + * or set the local broker as the owner if absent. + * + * @param namespaceBundle the NamespaceBundle + * @return The ephemeral node data showing the current ownership info in ServiceUnitStateChannel + */ + public CompletableFuture tryAcquiringOwnership(NamespaceBundle namespaceBundle) { + log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId()); + final String bundle = namespaceBundle.toString(); + return dedupeLookupRequest(bundle, k -> { + final CompletableFuture owner = + this.getOwnerAsync(namespaceBundle, bundle, true); + return getBrokerLookupData(owner.thenApply(Optional::ofNullable), bundle); + }).thenApply(brokerLookupData -> { + if (brokerLookupData.isEmpty()) { + throw new IllegalStateException( + "Failed to get the broker lookup data for bundle: " + bundle); + } + return brokerLookupData.get().toNamespaceEphemeralData(); }); + } + + private CompletableFuture> dedupeLookupRequest( + String key, Function>> provider) { + CompletableFuture> future = lookupRequests.computeIfAbsent(key, provider); future.whenComplete((r, t) -> { if (t != null) { assignCounter.incrementFailure(); } - lookupRequests.remove(bundle); + lookupRequests.remove(key); } ); return future; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 99c538e6ecfa3..fbf3534e7d440 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1192,6 +1192,11 @@ private synchronized void doCleanup(String broker) { log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); + String heartbeatNamespace = + NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration()) + .toString(); + String heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), + pulsar.getConfiguration()).toString(); Map orphanSystemServiceUnits = new HashMap<>(); for (var etr : tableview.entrySet()) { @@ -1202,6 +1207,19 @@ private synchronized void doCleanup(String broker) { if (isActiveState(state)) { if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) { orphanSystemServiceUnits.put(serviceUnit, stateData); + } else if (serviceUnit.startsWith(heartbeatNamespace) + || serviceUnit.startsWith(heartbeatNamespaceV2)) { + // Skip the heartbeat namespace + log.info("Skip override heartbeat namespace bundle" + + " serviceUnit:{}, stateData:{}", serviceUnit, stateData); + tombstoneAsync(serviceUnit).whenComplete((__, e) -> { + if (e != null) { + log.error("Failed cleaning the heartbeat namespace ownership serviceUnit:{}, " + + "stateData:{}, cleanupErrorCnt:{}.", + serviceUnit, stateData, + totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e); + } + }); } else { overrideOwnership(serviceUnit, stateData, broker); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 76b4e093f7dc4..9be8d4938e3e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -367,7 +367,14 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro // all pre-registered namespace is assumed to have bundles disabled nsFullBundle = bundleFactory.getFullBundle(nsname); // v2 namespace will always use full bundle object - NamespaceEphemeralData otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get(); + final NamespaceEphemeralData otherData; + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + ExtensibleLoadManagerImpl loadManager = ExtensibleLoadManagerImpl.get(this.loadManager.get()); + otherData = loadManager.tryAcquiringOwnership(nsFullBundle).get(); + } else { + otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get(); + } + if (StringUtils.equals(pulsar.getBrokerServiceUrl(), otherData.getNativeUrl()) || StringUtils.equals(pulsar.getBrokerServiceUrlTls(), otherData.getNativeUrlTls())) { if (nsFullBundle != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 84535f9692be9..c2c60831955a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -89,6 +89,8 @@ import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener; +import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.TableViewImpl; @@ -96,6 +98,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; @@ -1038,19 +1041,48 @@ public void testListTopic() throws Exception { } @Test(timeOut = 30 * 1000) - public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws PulsarAdminException { + public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception { + NamespaceName heartbeatNamespacePulsar1V1 = + NamespaceService.getHeartbeatNamespace(pulsar1.getAdvertisedAddress(), pulsar1.getConfiguration()); + NamespaceName heartbeatNamespacePulsar1V2 = + NamespaceService.getHeartbeatNamespaceV2(pulsar1.getAdvertisedAddress(), pulsar1.getConfiguration()); + + NamespaceName heartbeatNamespacePulsar2V1 = + NamespaceService.getHeartbeatNamespace(pulsar2.getAdvertisedAddress(), pulsar2.getConfiguration()); + NamespaceName heartbeatNamespacePulsar2V2 = + NamespaceService.getHeartbeatNamespaceV2(pulsar2.getAdvertisedAddress(), pulsar2.getConfiguration()); + + NamespaceBundle bundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory() + .getFullBundle(heartbeatNamespacePulsar1V1); + NamespaceBundle bundle2 = pulsar1.getNamespaceService().getNamespaceBundleFactory() + .getFullBundle(heartbeatNamespacePulsar1V2); + + NamespaceBundle bundle3 = pulsar2.getNamespaceService().getNamespaceBundleFactory() + .getFullBundle(heartbeatNamespacePulsar2V1); + NamespaceBundle bundle4 = pulsar2.getNamespaceService().getNamespaceBundleFactory() + .getFullBundle(heartbeatNamespacePulsar2V2); + Set ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits(); log.info("Owned service units: {}", ownedServiceUnitsByPulsar1); - assertTrue(ownedServiceUnitsByPulsar1.isEmpty()); + // heartbeat namespace bundle will own by pulsar1 + assertEquals(ownedServiceUnitsByPulsar1.size(), 2); + assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1)); + assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2)); Set ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits(); log.info("Owned service units: {}", ownedServiceUnitsByPulsar2); - assertTrue(ownedServiceUnitsByPulsar2.isEmpty()); + assertEquals(ownedServiceUnitsByPulsar2.size(), 2); + assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3)); + assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4)); Map ownedNamespacesByPulsar1 = admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress()); Map ownedNamespacesByPulsar2 = admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress()); - assertTrue(ownedNamespacesByPulsar1.isEmpty()); - assertTrue(ownedNamespacesByPulsar2.isEmpty()); + assertEquals(ownedNamespacesByPulsar1.size(), 2); + assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString())); + assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString())); + assertEquals(ownedNamespacesByPulsar2.size(), 2); + assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString())); + assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString())); String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units"; admin.topics().createPartitionedTopic(topic, 1); @@ -1083,6 +1115,23 @@ private void assertOwnedServiceUnits( assertEquals(status.broker_assignment, BrokerAssignment.shared); } + @Test(timeOut = 30 * 1000) + public void testTryAcquiringOwnership() + throws PulsarAdminException, ExecutionException, InterruptedException { + final String namespace = "public/testTryAcquiringOwnership"; + admin.namespaces().createNamespace(namespace, 1); + String topic = "persistent://" + namespace + "/test"; + NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get(); + NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get(); + assertEquals(namespaceEphemeralData.getNativeUrl(), pulsar1.getBrokerServiceUrl()); + admin.namespaces().deleteNamespace(namespace, true); + } + + @Test(timeOut = 30 * 1000) + public void testHealthcheck() throws PulsarAdminException { + admin.brokers().healthcheck(TopicVersion.V2); + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override