Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Handle get owned namespaces admin API in ExtensibleLoadManager #20552

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
Expand Down Expand Up @@ -180,6 +183,23 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
.build();
private final CountDownLatch initWaiter = new CountDownLatch(1);

/**
* Get all the bundles that are owned by this broker.
*/
public Set<NamespaceBundle> getOwnedServiceUnits() {
var entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
return entrySet.stream()
.filter(entry -> {
var stateData = entry.getValue();
return stateData.state() == ServiceUnitState.Owned
&& StringUtils.isNotBlank(stateData.dstBroker())
&& stateData.dstBroker().equals(brokerRegistry.getBrokerId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: should we define a local var brokerId to avoid repeated registry access?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is repeated registry access will reduce performance?

Copy link
Contributor

@heesung-sn heesung-sn Jun 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. Afaik, the local variables will be likely in the CPU registers, which can be faster. brokerRegistry.getBrokerId() will need to deference, which could cause page hit/miss and stack getBrokerId() and deference again.

I am not sure if the modern java compiler optimizes this code with local var or not.

}).map(entry -> {
var bundle = entry.getKey();
return this.getNamespaceBundle(bundle);
}).collect(Collectors.toSet());
}

public enum Role {
Leader,
Follower
Expand Down Expand Up @@ -714,6 +734,12 @@ private void monitor() {
}
}

public NamespaceBundle getNamespaceBundle(String bundle) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:move this to a static LoadManagerShared method.

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
}

public void disableBroker() throws Exception {
serviceUnitStateChannel.cleanOwnerships();
leaderElectionService.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.CompressionType;
Expand Down Expand Up @@ -703,7 +702,7 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
stateChangeListeners.notify(serviceUnit, data, null);
if (isTargetBroker(data.dstBroker())) {
log(null, serviceUnit, data, null);
pulsar.getNamespaceService().onNamespaceBundleOwned(getNamespaceBundle(serviceUnit));
pulsar.getNamespaceService().onNamespaceBundleOwned(loadManager.getNamespaceBundle(serviceUnit));
lastOwnEventHandledAt = System.currentTimeMillis();
} else if (data.force() && isTargetBroker(data.sourceBroker())) {
closeServiceUnit(serviceUnit);
Expand Down Expand Up @@ -803,12 +802,6 @@ private boolean isTargetBroker(String broker) {
return broker.equals(lookupServiceAddress);
}

private NamespaceBundle getNamespaceBundle(String bundle) {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
Expand All @@ -829,7 +822,7 @@ private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
long startTime = System.nanoTime();
MutableInt unloadedTopics = new MutableInt();
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
NamespaceBundle bundle = loadManager.getNamespaceBundle(serviceUnit);
return pulsar.getBrokerService().unloadServiceUnit(
bundle,
true,
Expand Down Expand Up @@ -860,7 +853,7 @@ private CompletableFuture<Void> splitServiceUnit(String serviceUnit, ServiceUnit
long startTime = System.nanoTime();
NamespaceService namespaceService = pulsar.getNamespaceService();
NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory();
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
NamespaceBundle bundle = loadManager.getNamespaceBundle(serviceUnit);
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
Map<String, Optional<String>> bundleToDestBroker = data.splitServiceUnitToDestBroker();
List<Long> boundaries = null;
Expand Down Expand Up @@ -1275,7 +1268,7 @@ private synchronized void doCleanup(String broker) {

private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
try {
return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Set.of(inactiveBroker))
return loadManager.selectAsync(loadManager.getNamespaceBundle(serviceUnit), Set.of(inactiveBroker))
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
} catch (Throwable e) {
log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,21 +757,42 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, Optional<String> destinationBroker) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
}

// unload namespace bundle
return unloadNamespaceBundle(bundle, config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
return unloadNamespaceBundle(bundle, destinationBroker,
config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
Optional<String> destinationBroker,
long timeout,
TimeUnit timeoutUnit) {
return unloadNamespaceBundle(bundle, destinationBroker, timeout, timeoutUnit, true);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
long timeout,
TimeUnit timeoutUnit) {
return unloadNamespaceBundle(bundle, Optional.empty(), timeout, timeoutUnit, true);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout, TimeUnit timeoutUnit) {
return unloadNamespaceBundle(bundle, timeout, timeoutUnit, true);
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
long timeout,
TimeUnit timeoutUnit,
boolean closeWithoutWaitingClientDisconnect) {
return unloadNamespaceBundle(bundle, Optional.empty(), timeout,
timeoutUnit, closeWithoutWaitingClientDisconnect);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout,
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
Optional<String> destinationBroker,
long timeout,
TimeUnit timeoutUnit,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
if (ob == null) {
Expand All @@ -790,24 +811,34 @@ public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpac
.getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
.thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
.thenCompose(namespaceIsolationPolicies -> {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream()
.collect(Collectors.toMap(NamespaceBundle::toString,
bundle -> getNamespaceOwnershipStatus(true,
namespaceIsolationPolicies.getPolicyByNamespace(
bundle.getNamespaceObject()))));
return CompletableFuture.completedFuture(statusMap);
}
Collection<CompletableFuture<OwnedBundle>> futures =
ownershipCache.getOwnedBundlesAsync().values();
return FutureUtil.waitForAll(futures)
.thenApply(__ -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toMap(bundle -> bundle.getNamespaceBundle().toString(),
bundle -> getNamespaceOwnershipStatus(bundle,
bundle -> getNamespaceOwnershipStatus(bundle.isActive(),
namespaceIsolationPolicies.getPolicyByNamespace(
bundle.getNamespaceBundle().getNamespaceObject()))
))
);
});
}

private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj,
private NamespaceOwnershipStatus getNamespaceOwnershipStatus(boolean isActive,
NamespaceIsolationPolicy nsIsolationPolicy) {
NamespaceOwnershipStatus nsOwnedStatus = new NamespaceOwnershipStatus(BrokerAssignment.shared, false,
nsObj.isActive());
isActive);
if (nsIsolationPolicy == null) {
// no matching policy found, this namespace must be an uncontrolled one and using shared broker
return nsOwnedStatus;
Expand Down Expand Up @@ -1103,6 +1134,10 @@ public OwnershipCache getOwnershipCache() {
}

public Set<NamespaceBundle> getOwnedServiceUnits() {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, having all these references to a specific implementation (ExtensibleLoadManagerImpl) is a code smell.

We should rely on object oriented programming principals.
I am not going to block this patch, but I think that we are accumulating some tech debt that we will have to pay later

Copy link
Member Author

@Demogorgon314 Demogorgon314 Jun 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we must do some refactoring in the feature... We might need to do some abstract for NamespaceService.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that the new vs old LM logic variations in NamespaceService are more than what we originally estimated. Yes, we can define NamespaceServiceExtension extends NamespaceService.

If the community decides to only maintain the extension logic in the future(I assume this wont happen in the near future), I think we can clean the old LMlogic in NamespaceService too.

ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnedServiceUnits();
}
return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle)
.collect(Collectors.toSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
Expand Down Expand Up @@ -568,7 +570,7 @@ public void testDeployAndRollbackLoadManager() throws Exception {
assertEquals(lookupResult1, lookupResult2);
assertEquals(lookupResult1, lookupResult3);

NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get("test")).get();
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(false)
Expand Down Expand Up @@ -964,10 +966,10 @@ public void testDisableBroker() throws Exception {
var pulsar3 = additionalPulsarTestContext.getPulsarService();
ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(pulsar3.getLoadManager().get(), "loadManager", true));
String topic = "persistent://public/default/test";
String topic = "persistent://" + defaultTestNamespace +"/test";

String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
TopicName topicName = TopicName.get("test");
TopicName topicName = TopicName.get(topic);
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) {
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(),
Expand Down Expand Up @@ -1035,6 +1037,52 @@ public void testListTopic() throws Exception {
admin.namespaces().deleteNamespace(namespace, true);
}

@Test(timeOut = 30 * 1000)
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws PulsarAdminException {
Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
assertTrue(ownedServiceUnitsByPulsar1.isEmpty());
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
assertTrue(ownedServiceUnitsByPulsar2.isEmpty());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
assertTrue(ownedNamespacesByPulsar1.isEmpty());
assertTrue(ownedNamespacesByPulsar2.isEmpty());

String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units";
admin.topics().createPartitionedTopic(topic, 1);
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).join();
CompletableFuture<Optional<BrokerLookupData>> owner = primaryLoadManager.assign(Optional.empty(), bundle);
assertFalse(owner.join().isEmpty());

BrokerLookupData brokerLookupData = owner.join().get();
if (brokerLookupData.getWebServiceUrl().equals(pulsar1.getWebServiceAddress())) {
assertOwnedServiceUnits(pulsar1, primaryLoadManager, bundle);
} else {
assertOwnedServiceUnits(pulsar2, secondaryLoadManager, bundle);
}
}

private void assertOwnedServiceUnits(
PulsarService pulsar,
ExtensibleLoadManagerImpl extensibleLoadManager,
NamespaceBundle bundle) throws PulsarAdminException {
Awaitility.await().untilAsserted(() -> {
Set<NamespaceBundle> ownedBundles = extensibleLoadManager.getOwnedServiceUnits();
assertTrue(ownedBundles.contains(bundle));
});
Map<String, NamespaceOwnershipStatus> ownedNamespaces =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar.getLookupServiceAddress());
assertTrue(ownedNamespaces.containsKey(bundle.toString()));
NamespaceOwnershipStatus status = ownedNamespaces.get(bundle.toString());
assertTrue(status.is_active);
assertFalse(status.is_controlled);
assertEquals(status.broker_assignment, BrokerAssignment.shared);
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down