Skip to content

Commit

Permalink
[improve][broker] PIP-192: Implement extensible load manager (#19102)
Browse files Browse the repository at this point in the history
PIP: #16691

### Motivation

Implement extensible load manager.

### Modifications

For the PIP-192, this PR adds `ExtensibleLoadManagerImpl` and unit tests.

This PR also changes:
1. Added `CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
            Optional<ServiceUnitId> topic, ServiceUnitId bundle)` to `LoadManager ` interface.
2. Added `CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle)` to `LoadManager` interface.
3. Change `CompletableFuture<String> getOwnerAsync(String serviceUnit)` to `CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit)` to unify the result.
  • Loading branch information
Demogorgon314 authored Feb 3, 2023
1 parent 31fe347 commit c3126c6
Show file tree
Hide file tree
Showing 16 changed files with 905 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
Expand Down Expand Up @@ -818,6 +819,17 @@ public void start() throws PulsarServerException {
this.webSocketService.setLocalCluster(clusterData);
}

// Start the leader election service
startLeaderElectionService();

// By starting the Load manager service, the broker will also become visible
// to the rest of the broker by creating the registration z-node. This needs
// to be done only when the broker is fully operative.
//
// The load manager service and its service unit state channel need to be initialized first
// (namespace service depends on load manager)
this.startLoadManagementService();

// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
this.nsService.initialize();

Expand All @@ -828,9 +840,6 @@ public void start() throws PulsarServerException {

this.topicPoliciesService.start();

// Start the leader election service
startLeaderElectionService();

// Register heartbeat and bootstrap namespaces.
this.nsService.registerBootstrapNamespaces();

Expand Down Expand Up @@ -859,11 +868,6 @@ public void start() throws PulsarServerException {

this.metricsGenerator = new MetricsGenerator(this);

// By starting the Load manager service, the broker will also become visible
// to the rest of the broker by creating the registration z-node. This needs
// to be done only when the broker is fully operative.
this.startLoadManagementService();

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
// so that the protocol handlers can access broker service properly.
Expand Down Expand Up @@ -1103,6 +1107,10 @@ protected void closeLocalMetadataStore() throws Exception {
}

protected void startLeaderElectionService() {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
return;
}
this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
state -> {
if (state == LeaderElectionState.Leading) {
Expand Down Expand Up @@ -1207,7 +1215,7 @@ protected void startLoadManagementService() throws PulsarServerException {
LOG.info("Starting load management service ...");
this.loadManager.get().start();

if (config.isLoadBalancerEnabled()) {
if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.Reflections;
Expand Down Expand Up @@ -58,6 +62,15 @@ public interface LoadManager {
*/
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;

default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
throw new UnsupportedOperationException();
}

default CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
throw new UnsupportedOperationException();
}

/**
* Generate the load report.
*/
Expand Down Expand Up @@ -145,6 +158,11 @@ static LoadManager create(final PulsarService pulsar) {
final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
casted.initialize(pulsar);
return casted;
} else if (loadManagerInstance instanceof ExtensibleLoadManager) {
final LoadManager casted =
new ExtensibleLoadManagerWrapper((ExtensibleLoadManagerImpl) loadManagerInstance);
casted.initialize(pulsar);
return casted;
}
} catch (Exception e) {
LOG.warn("Error when trying to create load manager: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ public interface ExtensibleLoadManager extends Closeable {
*/
CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);

/**
* Check the incoming service unit is owned by the current broker.
*
* @param topic The optional topic, some method won't provide topic var in this param.
* @param serviceUnit The service unit (e.g. bundle).
* @return The broker lookup data.
*/
CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);

/**
* Close the load manager.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;

@Slf4j
public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

public static final String BROKER_LOAD_DATA_STORE_TOPIC = TopicName.get(
TopicDomain.non_persistent.value(),
NamespaceName.SYSTEM_NAMESPACE,
"loadbalancer-broker-load-data").toString();

public static final String TOP_BUNDLES_LOAD_DATA_STORE_TOPIC = TopicName.get(
TopicDomain.non_persistent.value(),
NamespaceName.SYSTEM_NAMESPACE,
"loadbalancer-top-bundles-load-data").toString();

private PulsarService pulsar;

private ServiceConfiguration conf;

@Getter
private BrokerRegistry brokerRegistry;

private ServiceUnitStateChannel serviceUnitStateChannel;

private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;

@Getter
private LoadManagerContext context;

@Getter
private final BrokerSelectionStrategy brokerSelectionStrategy;

@Getter
private final List<BrokerFilter> brokerFilterPipeline;

private boolean started = false;

private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
lookupRequests = ConcurrentOpenHashMap.<String,
CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
.build();

/**
* Life cycle: Constructor -> initialize -> start -> close.
*/
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
}

public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}

@Override
public void start() throws PulsarServerException {
if (this.started) {
return;
}
this.brokerRegistry = new BrokerRegistryImpl(pulsar);
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
this.brokerRegistry.start();
this.serviceUnitStateChannel.start();

try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
this.topBundlesLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
} catch (LoadDataStoreException e) {
throw new PulsarServerException(e);
}

this.context = LoadManagerContextImpl.builder()
.configuration(conf)
.brokerRegistry(brokerRegistry)
.brokerLoadDataStore(brokerLoadDataStore)
.topBundleLoadDataStore(topBundlesLoadDataStore).build();
// TODO: Start load data reporter.

// TODO: Start unload scheduler and bundle split scheduler
this.started = true;
}

@Override
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
}

@Override
public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
ServiceUnitId serviceUnit) {

final String bundle = serviceUnit.toString();

CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
final CompletableFuture<Optional<String>> owner;
// Assign the bundle to channel owner if is internal topic, to avoid circular references.
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
owner = serviceUnitStateChannel.getChannelOwnerAsync();
} else {
owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
// If the bundle not assign yet, select and publish assign event to channel.
if (broker.isEmpty()) {
return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
.thenApply(Optional::of);
} else {
throw new IllegalStateException(
"Failed to select the new owner broker for bundle: " + bundle);
}
});
}
// Already assigned, return it.
return CompletableFuture.completedFuture(broker);
});
}

return owner.thenCompose(broker -> {
if (broker.isEmpty()) {
String errorMsg = String.format(
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(broker.get());
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(brokerLookupData);
}));
});
future.whenComplete((r, t) -> lookupRequests.remove(bundle));
return future;
}

private CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenCompose(availableBrokers -> {
// TODO: Support isolation policies
LoadManagerContext context = this.getContext();

Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);

// Filter out brokers that do not meet the rules.
List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
for (final BrokerFilter filter : filterPipeline) {
try {
filter.filter(availableBrokerCandidates, context);
} catch (BrokerFilterException e) {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers.", e);
availableBrokerCandidates = availableBrokers;
}
}
if (availableBrokerCandidates.isEmpty()) {
return CompletableFuture.completedFuture(Optional.empty());
}
Set<String> candidateBrokers = availableBrokerCandidates.keySet();

return CompletableFuture.completedFuture(
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context));
});
}

@Override
public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
final String bundle = bundleUnit.toString();
CompletableFuture<Optional<String>> owner;
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
owner = serviceUnitStateChannel.getChannelOwnerAsync();
} else {
owner = serviceUnitStateChannel.getOwnerAsync(bundle);
}

return owner.thenApply(broker -> brokerRegistry.getBrokerId().equals(broker.orElse(null)));
}

@Override
public void close() throws PulsarServerException {
if (!this.started) {
return;
}
try {
this.brokerLoadDataStore.close();
this.topBundlesLoadDataStore.close();
} catch (IOException ex) {
throw new PulsarServerException(ex);
} finally {
try {
this.brokerRegistry.close();
} finally {
try {
this.serviceUnitStateChannel.close();
} finally {
this.started = false;
}
}
}
}

private boolean isInternalTopic(String topic) {
return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}
}
Loading

0 comments on commit c3126c6

Please sign in to comment.