Skip to content

Commit

Permalink
[improve][broker] Handle get owned namespaces admin API in Extensible…
Browse files Browse the repository at this point in the history
…LoadManager (#20552)
  • Loading branch information
Demogorgon314 authored Jun 12, 2023
1 parent 51c2bb4 commit bad231e
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -37,16 +38,20 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
Expand Down Expand Up @@ -170,7 +175,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
private final SplitCounter splitCounter = new SplitCounter();

// record unload metrics
private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference();
private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference<>();
// record split metrics
private final AtomicReference<List<Metrics>> splitMetrics = new AtomicReference<>();

Expand All @@ -180,6 +185,24 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
.build();
private final CountDownLatch initWaiter = new CountDownLatch(1);

/**
* Get all the bundles that are owned by this broker.
*/
public Set<NamespaceBundle> getOwnedServiceUnits() {
Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
String brokerId = brokerRegistry.getBrokerId();
return entrySet.stream()
.filter(entry -> {
var stateData = entry.getValue();
return stateData.state() == ServiceUnitState.Owned
&& StringUtils.isNotBlank(stateData.dstBroker())
&& stateData.dstBroker().equals(brokerId);
}).map(entry -> {
var bundle = entry.getKey();
return getNamespaceBundle(pulsar, bundle);
}).collect(Collectors.toSet());
}

public enum Role {
Leader,
Follower
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
stateChangeListeners.notify(serviceUnit, data, null);
if (isTargetBroker(data.dstBroker())) {
log(null, serviceUnit, data, null);
pulsar.getNamespaceService().onNamespaceBundleOwned(getNamespaceBundle(serviceUnit));
pulsar.getNamespaceService()
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit));
lastOwnEventHandledAt = System.currentTimeMillis();
} else if (data.force() && isTargetBroker(data.sourceBroker())) {
closeServiceUnit(serviceUnit);
Expand Down Expand Up @@ -803,12 +804,6 @@ private boolean isTargetBroker(String broker) {
return broker.equals(lookupServiceAddress);
}

private NamespaceBundle getNamespaceBundle(String bundle) {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
Expand All @@ -829,7 +824,7 @@ private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
long startTime = System.nanoTime();
MutableInt unloadedTopics = new MutableInt();
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
return pulsar.getBrokerService().unloadServiceUnit(
bundle,
true,
Expand Down Expand Up @@ -860,7 +855,7 @@ private CompletableFuture<Void> splitServiceUnit(String serviceUnit, ServiceUnit
long startTime = System.nanoTime();
NamespaceService namespaceService = pulsar.getNamespaceService();
NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory();
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
Map<String, Optional<String>> bundleToDestBroker = data.splitServiceUnitToDestBroker();
List<Long> boundaries = null;
Expand Down Expand Up @@ -1275,7 +1270,8 @@ private synchronized void doCleanup(String broker) {

private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
try {
return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Set.of(inactiveBroker))
return loadManager.selectAsync(
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), Set.of(inactiveBroker))
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
} catch (Throwable e) {
log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,4 +711,10 @@ public static void refreshBrokerToFailureDomainMap(PulsarService pulsar,
LOG.warn("Failed to get domain-list for cluster {}", e.getMessage());
}
}

public static NamespaceBundle getNamespaceBundle(PulsarService pulsar, String bundle) {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -757,21 +757,42 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, Optional<String> destinationBroker) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
}

// unload namespace bundle
return unloadNamespaceBundle(bundle, config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
return unloadNamespaceBundle(bundle, destinationBroker,
config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
Optional<String> destinationBroker,
long timeout,
TimeUnit timeoutUnit) {
return unloadNamespaceBundle(bundle, destinationBroker, timeout, timeoutUnit, true);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
long timeout,
TimeUnit timeoutUnit) {
return unloadNamespaceBundle(bundle, Optional.empty(), timeout, timeoutUnit, true);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout, TimeUnit timeoutUnit) {
return unloadNamespaceBundle(bundle, timeout, timeoutUnit, true);
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
long timeout,
TimeUnit timeoutUnit,
boolean closeWithoutWaitingClientDisconnect) {
return unloadNamespaceBundle(bundle, Optional.empty(), timeout,
timeoutUnit, closeWithoutWaitingClientDisconnect);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout,
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
Optional<String> destinationBroker,
long timeout,
TimeUnit timeoutUnit,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
if (ob == null) {
Expand All @@ -790,24 +811,34 @@ public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpac
.getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
.thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
.thenCompose(namespaceIsolationPolicies -> {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream()
.collect(Collectors.toMap(NamespaceBundle::toString,
bundle -> getNamespaceOwnershipStatus(true,
namespaceIsolationPolicies.getPolicyByNamespace(
bundle.getNamespaceObject()))));
return CompletableFuture.completedFuture(statusMap);
}
Collection<CompletableFuture<OwnedBundle>> futures =
ownershipCache.getOwnedBundlesAsync().values();
return FutureUtil.waitForAll(futures)
.thenApply(__ -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toMap(bundle -> bundle.getNamespaceBundle().toString(),
bundle -> getNamespaceOwnershipStatus(bundle,
bundle -> getNamespaceOwnershipStatus(bundle.isActive(),
namespaceIsolationPolicies.getPolicyByNamespace(
bundle.getNamespaceBundle().getNamespaceObject()))
))
);
});
}

private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj,
private NamespaceOwnershipStatus getNamespaceOwnershipStatus(boolean isActive,
NamespaceIsolationPolicy nsIsolationPolicy) {
NamespaceOwnershipStatus nsOwnedStatus = new NamespaceOwnershipStatus(BrokerAssignment.shared, false,
nsObj.isActive());
isActive);
if (nsIsolationPolicy == null) {
// no matching policy found, this namespace must be an uncontrolled one and using shared broker
return nsOwnedStatus;
Expand Down Expand Up @@ -1103,6 +1134,10 @@ public OwnershipCache getOwnershipCache() {
}

public Set<NamespaceBundle> getOwnedServiceUnits() {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnedServiceUnits();
}
return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle)
.collect(Collectors.toSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
Expand Down Expand Up @@ -568,7 +570,7 @@ public void testDeployAndRollbackLoadManager() throws Exception {
assertEquals(lookupResult1, lookupResult2);
assertEquals(lookupResult1, lookupResult3);

NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get("test")).get();
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(false)
Expand Down Expand Up @@ -964,10 +966,10 @@ public void testDisableBroker() throws Exception {
var pulsar3 = additionalPulsarTestContext.getPulsarService();
ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(pulsar3.getLoadManager().get(), "loadManager", true));
String topic = "persistent://public/default/test";
String topic = "persistent://" + defaultTestNamespace +"/test";

String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
TopicName topicName = TopicName.get("test");
TopicName topicName = TopicName.get(topic);
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) {
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(),
Expand Down Expand Up @@ -1035,6 +1037,52 @@ public void testListTopic() throws Exception {
admin.namespaces().deleteNamespace(namespace, true);
}

@Test(timeOut = 30 * 1000)
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws PulsarAdminException {
Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
assertTrue(ownedServiceUnitsByPulsar1.isEmpty());
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
assertTrue(ownedServiceUnitsByPulsar2.isEmpty());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
assertTrue(ownedNamespacesByPulsar1.isEmpty());
assertTrue(ownedNamespacesByPulsar2.isEmpty());

String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units";
admin.topics().createPartitionedTopic(topic, 1);
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).join();
CompletableFuture<Optional<BrokerLookupData>> owner = primaryLoadManager.assign(Optional.empty(), bundle);
assertFalse(owner.join().isEmpty());

BrokerLookupData brokerLookupData = owner.join().get();
if (brokerLookupData.getWebServiceUrl().equals(pulsar1.getWebServiceAddress())) {
assertOwnedServiceUnits(pulsar1, primaryLoadManager, bundle);
} else {
assertOwnedServiceUnits(pulsar2, secondaryLoadManager, bundle);
}
}

private void assertOwnedServiceUnits(
PulsarService pulsar,
ExtensibleLoadManagerImpl extensibleLoadManager,
NamespaceBundle bundle) throws PulsarAdminException {
Awaitility.await().untilAsserted(() -> {
Set<NamespaceBundle> ownedBundles = extensibleLoadManager.getOwnedServiceUnits();
assertTrue(ownedBundles.contains(bundle));
});
Map<String, NamespaceOwnershipStatus> ownedNamespaces =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar.getLookupServiceAddress());
assertTrue(ownedNamespaces.containsKey(bundle.toString()));
NamespaceOwnershipStatus status = ownedNamespaces.get(bundle.toString());
assertTrue(status.is_active);
assertFalse(status.is_controlled);
assertEquals(status.broker_assignment, BrokerAssignment.shared);
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down

0 comments on commit bad231e

Please sign in to comment.