Skip to content

Commit

Permalink
[fix][broker] Handle heartbeat namespace in ExtensibleLoadManager (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 authored Jun 14, 2023
1 parent 441d0ef commit 3069464
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -84,6 +85,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
Expand Down Expand Up @@ -365,56 +367,99 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit

final String bundle = serviceUnit.toString();

CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
return dedupeLookupRequest(bundle, k -> {
final CompletableFuture<Optional<String>> owner;
// Assign the bundle to channel owner if is internal topic, to avoid circular references.
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
owner = serviceUnitStateChannel.getChannelOwnerAsync();
} else {
owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
// If the bundle not assign yet, select and publish assign event to channel.
if (broker.isEmpty()) {
return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
assignCounter.incrementSuccess();
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
.thenApply(Optional::of);
} else {
throw new IllegalStateException(
"Failed to select the new owner broker for bundle: " + bundle);
}
});
owner = getOwnerAsync(serviceUnit, bundle, false).thenApply(Optional::ofNullable);
}
return getBrokerLookupData(owner, bundle);
});
}

private CompletableFuture<String> getOwnerAsync(
ServiceUnitId serviceUnit, String bundle, boolean ownByLocalBrokerIfAbsent) {
return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
// If the bundle not assign yet, select and publish assign event to channel.
if (broker.isEmpty()) {
CompletableFuture<Optional<String>> selectedBroker;
if (ownByLocalBrokerIfAbsent) {
String brokerId = this.brokerRegistry.getBrokerId();
selectedBroker = CompletableFuture.completedFuture(Optional.of(brokerId));
} else {
selectedBroker = this.selectAsync(serviceUnit);
}
return selectedBroker.thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
assignCounter.incrementSuccess();
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
}
assignCounter.incrementSkip();
// Already assigned, return it.
return CompletableFuture.completedFuture(broker);
throw new IllegalStateException(
"Failed to select the new owner broker for bundle: " + bundle);
});
}
assignCounter.incrementSkip();
// Already assigned, return it.
return CompletableFuture.completedFuture(broker.get());
});
}

return owner.thenCompose(broker -> {
if (broker.isEmpty()) {
String errorMsg = String.format(
"Failed to get or assign the owner for bundle:%s", bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(broker.get());
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(brokerLookupData);
}));
private CompletableFuture<Optional<BrokerLookupData>> getBrokerLookupData(
CompletableFuture<Optional<String>> owner,
String bundle) {
return owner.thenCompose(broker -> {
if (broker.isEmpty()) {
String errorMsg = String.format(
"Failed to get or assign the owner for bundle:%s", bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(broker.get());
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(brokerLookupData);
}));
}

/**
* Method to get the current owner of the <code>NamespaceBundle</code>
* or set the local broker as the owner if absent.
*
* @param namespaceBundle the <code>NamespaceBundle</code>
* @return The ephemeral node data showing the current ownership info in <code>ServiceUnitStateChannel</code>
*/
public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId());
final String bundle = namespaceBundle.toString();
return dedupeLookupRequest(bundle, k -> {
final CompletableFuture<String> owner =
this.getOwnerAsync(namespaceBundle, bundle, true);
return getBrokerLookupData(owner.thenApply(Optional::ofNullable), bundle);
}).thenApply(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
throw new IllegalStateException(
"Failed to get the broker lookup data for bundle: " + bundle);
}
return brokerLookupData.get().toNamespaceEphemeralData();
});
}

private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
String key, Function<String, CompletableFuture<Optional<BrokerLookupData>>> provider) {
CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(key, provider);
future.whenComplete((r, t) -> {
if (t != null) {
assignCounter.incrementFailure();
}
lookupRequests.remove(bundle);
lookupRequests.remove(key);
}
);
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,11 @@ private synchronized void doCleanup(String broker) {
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
String heartbeatNamespace =
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
.toString();
String heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
pulsar.getConfiguration()).toString();

Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
for (var etr : tableview.entrySet()) {
Expand All @@ -1202,6 +1207,19 @@ private synchronized void doCleanup(String broker) {
if (isActiveState(state)) {
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
orphanSystemServiceUnits.put(serviceUnit, stateData);
} else if (serviceUnit.startsWith(heartbeatNamespace)
|| serviceUnit.startsWith(heartbeatNamespaceV2)) {
// Skip the heartbeat namespace
log.info("Skip override heartbeat namespace bundle"
+ " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
if (e != null) {
log.error("Failed cleaning the heartbeat namespace ownership serviceUnit:{}, "
+ "stateData:{}, cleanupErrorCnt:{}.",
serviceUnit, stateData,
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
}
});
} else {
overrideOwnership(serviceUnit, stateData, broker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,14 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
// all pre-registered namespace is assumed to have bundles disabled
nsFullBundle = bundleFactory.getFullBundle(nsname);
// v2 namespace will always use full bundle object
NamespaceEphemeralData otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
final NamespaceEphemeralData otherData;
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
ExtensibleLoadManagerImpl loadManager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
otherData = loadManager.tryAcquiringOwnership(nsFullBundle).get();
} else {
otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
}

if (StringUtils.equals(pulsar.getBrokerServiceUrl(), otherData.getNativeUrl())
|| StringUtils.equals(pulsar.getBrokerServiceUrlTls(), otherData.getNativeUrlTls())) {
if (nsFullBundle != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,16 @@
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -1038,19 +1041,48 @@ public void testListTopic() throws Exception {
}

@Test(timeOut = 30 * 1000)
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws PulsarAdminException {
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception {
NamespaceName heartbeatNamespacePulsar1V1 =
NamespaceService.getHeartbeatNamespace(pulsar1.getAdvertisedAddress(), pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar1V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getAdvertisedAddress(), pulsar1.getConfiguration());

NamespaceName heartbeatNamespacePulsar2V1 =
NamespaceService.getHeartbeatNamespace(pulsar2.getAdvertisedAddress(), pulsar2.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getAdvertisedAddress(), pulsar2.getConfiguration());

NamespaceBundle bundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar1V1);
NamespaceBundle bundle2 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar1V2);

NamespaceBundle bundle3 = pulsar2.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar2V1);
NamespaceBundle bundle4 = pulsar2.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar2V2);

Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
assertTrue(ownedServiceUnitsByPulsar1.isEmpty());
// heartbeat namespace bundle will own by pulsar1
assertEquals(ownedServiceUnitsByPulsar1.size(), 2);
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
assertTrue(ownedServiceUnitsByPulsar2.isEmpty());
assertEquals(ownedServiceUnitsByPulsar2.size(), 2);
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
assertTrue(ownedNamespacesByPulsar1.isEmpty());
assertTrue(ownedNamespacesByPulsar2.isEmpty());
assertEquals(ownedNamespacesByPulsar1.size(), 2);
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
assertEquals(ownedNamespacesByPulsar2.size(), 2);
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString()));
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString()));

String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units";
admin.topics().createPartitionedTopic(topic, 1);
Expand Down Expand Up @@ -1083,6 +1115,23 @@ private void assertOwnedServiceUnits(
assertEquals(status.broker_assignment, BrokerAssignment.shared);
}

@Test(timeOut = 30 * 1000)
public void testTryAcquiringOwnership()
throws PulsarAdminException, ExecutionException, InterruptedException {
final String namespace = "public/testTryAcquiringOwnership";
admin.namespaces().createNamespace(namespace, 1);
String topic = "persistent://" + namespace + "/test";
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
assertEquals(namespaceEphemeralData.getNativeUrl(), pulsar1.getBrokerServiceUrl());
admin.namespaces().deleteNamespace(namespace, true);
}

@Test(timeOut = 30 * 1000)
public void testHealthcheck() throws PulsarAdminException {
admin.brokers().healthcheck(TopicVersion.V2);
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down

0 comments on commit 3069464

Please sign in to comment.