Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192: Implement extensible load manager #19102

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5ccd54c
Implement broker registry for new load manager
Demogorgon314 Dec 7, 2022
da8c830
Add extensible load manager impl
Demogorgon314 Dec 19, 2022
a724902
Update
Demogorgon314 Dec 30, 2022
90422da
Return empty broker when has broker filter exception
Demogorgon314 Jan 4, 2023
ca074f9
Do not copy broker candidates
Demogorgon314 Jan 4, 2023
2e6149c
Add todo comments
Demogorgon314 Jan 4, 2023
e512ca5
Change to Optional.empty() back
Demogorgon314 Jan 4, 2023
7210bd8
Use try finally
Demogorgon314 Jan 4, 2023
5562d2f
Create system namespace in test
Demogorgon314 Jan 5, 2023
51be8fb
Revert: Return empty broker when has broker filter exception
Demogorgon314 Jan 17, 2023
3765839
Add setNamespaceBundleAffinity method
Demogorgon314 Jan 17, 2023
e23a6a4
Fix compilation issue
Demogorgon314 Jan 17, 2023
68993a1
Merge branch 'master' into Demogorgon314/Implement_extensible_load_ma…
Demogorgon314 Feb 1, 2023
fb156a5
Merge master into current branch
Demogorgon314 Feb 1, 2023
ef6d0cf
Use set instead of collection
Demogorgon314 Feb 1, 2023
d31f27f
Merge branch 'master' into Demogorgon314/Implement_extensible_load_ma…
Demogorgon314 Feb 1, 2023
053cd07
Fix test
Demogorgon314 Feb 1, 2023
0833b43
Remove synchronized modifier
Demogorgon314 Feb 2, 2023
b03c1bb
Close the load data store
Demogorgon314 Feb 2, 2023
9ac2d58
Start leader election service before load manager start
Demogorgon314 Feb 2, 2023
070827b
Fix test
Demogorgon314 Feb 3, 2023
b8af243
Fix code style
Demogorgon314 Feb 3, 2023
3684d5c
Use failed future
Demogorgon314 Feb 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
Expand Down Expand Up @@ -818,6 +819,17 @@ public void start() throws PulsarServerException {
this.webSocketService.setLocalCluster(clusterData);
}

// Start the leader election service
startLeaderElectionService();

// By starting the Load manager service, the broker will also become visible
// to the rest of the brokers by creating the registration z-node. This needs
// to be done only when the broker is fully operative.
//
// The load manager service and its service unit state channel need to be initialized first
// (namespace service depends on load manager)
this.startLoadManagementService();
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved

// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
this.nsService.initialize();

Expand All @@ -828,9 +840,6 @@ public void start() throws PulsarServerException {

this.topicPoliciesService.start();

// Start the leader election service
startLeaderElectionService();

// Register heartbeat and bootstrap namespaces.
this.nsService.registerBootstrapNamespaces();

Expand Down Expand Up @@ -859,11 +868,6 @@ public void start() throws PulsarServerException {

this.metricsGenerator = new MetricsGenerator(this);

// By starting the Load manager service, the broker will also become visible
// to the rest of the broker by creating the registration z-node. This needs
// to be done only when the broker is fully operative.
this.startLoadManagementService();

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
// so that the protocol handlers can access broker service properly.
Expand Down Expand Up @@ -1103,6 +1107,10 @@ protected void closeLocalMetadataStore() throws Exception {
}

protected void startLeaderElectionService() {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
return;
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved
}
this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
state -> {
if (state == LeaderElectionState.Leading) {
Expand Down Expand Up @@ -1207,7 +1215,7 @@ protected void startLoadManagementService() throws PulsarServerException {
LOG.info("Starting load management service ...");
this.loadManager.get().start();

if (config.isLoadBalancerEnabled()) {
if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.Reflections;
Expand Down Expand Up @@ -58,6 +62,15 @@ public interface LoadManager {
*/
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;

/**
 * Optional lookup hook; judging by its name and return type it resolves the broker serving a
 * bundle (NOTE(review): exact lookup semantics are defined by the overriding implementation —
 * confirm there).
 *
 * <p>The default implementation is unsupported. It throws synchronously from the call itself
 * instead of returning an exceptionally-completed future, so callers that only attach
 * future callbacks will not observe the failure.
 *
 * @param topic the optional topic the lookup is made for
 * @param bundle the bundle (service unit) to look up
 * @return never returns normally in the default implementation
 * @throws UnsupportedOperationException always, unless overridden
 */
default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
throw new UnsupportedOperationException();
}

/**
 * Optional ownership-check hook for a bundle (NOTE(review): the precise ownership semantics are
 * defined by the overriding implementation — confirm there).
 *
 * <p>The default implementation is unsupported. It throws synchronously from the call itself
 * instead of returning an exceptionally-completed future.
 *
 * @param topic the optional topic the check is made for
 * @param bundle the bundle (service unit) to check
 * @return never returns normally in the default implementation
 * @throws UnsupportedOperationException always, unless overridden
 */
default CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
throw new UnsupportedOperationException();
}

/**
* Generate the load report.
*/
Expand Down Expand Up @@ -145,6 +158,11 @@ static LoadManager create(final PulsarService pulsar) {
final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
casted.initialize(pulsar);
return casted;
} else if (loadManagerInstance instanceof ExtensibleLoadManager) {
final LoadManager casted =
new ExtensibleLoadManagerWrapper((ExtensibleLoadManagerImpl) loadManagerInstance);
casted.initialize(pulsar);
return casted;
}
} catch (Exception e) {
LOG.warn("Error when trying to create load manager: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ public interface ExtensibleLoadManager extends Closeable {
*/
CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);

/**
 * Check whether the incoming service unit is owned by the current broker.
 *
 * @param topic The optional topic; some callers do not have a topic and pass an empty value.
 * @param serviceUnit The service unit (e.g. bundle).
 * @return A future holding {@code true} if the current broker owns the service unit,
 *         {@code false} otherwise.
 */
CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);

/**
* Close the load manager.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;

@Slf4j
public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

/**
 * Full name of the non-persistent topic {@code loadbalancer-broker-load-data} in the system
 * namespace. {@code start()} creates the broker load data store on this topic, and
 * {@code isInternalTopic} treats topics starting with this name as internal.
 */
public static final String BROKER_LOAD_DATA_STORE_TOPIC = TopicName.get(
TopicDomain.non_persistent.value(),
NamespaceName.SYSTEM_NAMESPACE,
"loadbalancer-broker-load-data").toString();

/**
 * Full name of the non-persistent topic {@code loadbalancer-top-bundles-load-data} in the system
 * namespace. {@code start()} creates the top-bundles load data store on this topic, and
 * {@code isInternalTopic} treats topics starting with this name as internal.
 */
public static final String TOP_BUNDLES_LOAD_DATA_STORE_TOPIC = TopicName.get(
TopicDomain.non_persistent.value(),
NamespaceName.SYSTEM_NAMESPACE,
"loadbalancer-top-bundles-load-data").toString();

// Owning broker service; assigned in initialize() and used by start() to build the components below.
private PulsarService pulsar;

// Broker configuration, read from pulsar.getConfiguration() in initialize().
private ServiceConfiguration conf;

// Created and started in start(), closed in close(); null before start() has run.
@Getter
private BrokerRegistry brokerRegistry;

// Source of bundle ownership information; created and started in start(), closed in close().
private ServiceUnitStateChannel serviceUnitStateChannel;

// Load data stores created in start() on the two *_LOAD_DATA_STORE_TOPIC topics and closed in close().
private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved

// Built at the end of start() from the configuration, broker registry and both load data stores.
@Getter
private LoadManagerContext context;

// Strategy used by selectAsync() to pick one broker among the candidates; set in the constructor.
@Getter
private final BrokerSelectionStrategy brokerSelectionStrategy;

// Filters applied in order by selectAsync(); the constructor creates it as an empty list.
@Getter
private final List<BrokerFilter> brokerFilterPipeline;

// Set to true at the end of start() and back to false in close(); both methods use it as a guard.
// Plain (non-volatile) field with no synchronization around it.
private boolean started = false;

// In-flight assign() futures keyed by the bundle string, so concurrent requests for the same
// bundle share one future. assign() removes the entry once the future completes.
private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
lookupRequests = ConcurrentOpenHashMap.<String,
CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
.build();

/**
 * Creates a load manager with an empty broker filter pipeline and a
 * {@link LeastResourceUsageWithWeight} broker selection strategy.
 *
 * <p>Life cycle: Constructor -> initialize -> start -> close.
 */
public ExtensibleLoadManagerImpl() {
    // TODO: Make brokerSelectionStrategy configurable.
    this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
    this.brokerFilterPipeline = new ArrayList<>();
}

/**
 * Tells whether the given configuration selects this class as the load manager.
 *
 * @param conf the broker configuration to inspect
 * @return {@code true} if the configured load manager class name equals the fully-qualified
 *         name of {@code ExtensibleLoadManagerImpl}; {@code false} otherwise (including when
 *         the configured name is {@code null})
 */
public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
    final String extensionClassName = ExtensibleLoadManagerImpl.class.getName();
    final String configuredClassName = conf.getLoadManagerClassName();
    return extensionClassName.equals(configuredClassName);
}

/**
 * Starts the load manager: creates and starts the broker registry and the service unit state
 * channel, creates both load data stores, and builds the load manager context.
 *
 * <p>Calling it again after a successful start is a no-op.
 *
 * @throws PulsarServerException if a component fails to start or a load data store cannot be
 *                               created
 */
@Override
public void start() throws PulsarServerException {
    if (started) {
        return;
    }

    // The registry is started before the channel; keep this order.
    brokerRegistry = new BrokerRegistryImpl(pulsar);
    serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
    brokerRegistry.start();
    serviceUnitStateChannel.start();

    try {
        brokerLoadDataStore = LoadDataStoreFactory.create(
                pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
        topBundlesLoadDataStore = LoadDataStoreFactory.create(
                pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
    } catch (LoadDataStoreException storeFailure) {
        throw new PulsarServerException(storeFailure);
    }

    context = LoadManagerContextImpl.builder()
            .configuration(conf)
            .brokerRegistry(brokerRegistry)
            .brokerLoadDataStore(brokerLoadDataStore)
            .topBundleLoadDataStore(topBundlesLoadDataStore)
            .build();
    // TODO: Start load data reporter.

    // TODO: Start unload scheduler and bundle split scheduler

    // Only mark as started once every component above is in place.
    started = true;
}

@Override
public void initialize(PulsarService pulsar) {
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
ServiceUnitId serviceUnit) {

final String bundle = serviceUnit.toString();

CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
final CompletableFuture<Optional<String>> owner;
// Assign the bundle to channel owner if is internal topic, to avoid circular references.
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
owner = serviceUnitStateChannel.getChannelOwnerAsync();
} else {
owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
// If the bundle not assign yet, select and publish assign event to channel.
if (broker.isEmpty()) {
return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
.thenApply(Optional::of);
} else {
throw new IllegalStateException(
"Failed to select the new owner broker for bundle: " + bundle);
}
});
}
// Already assigned, return it.
return CompletableFuture.completedFuture(broker);
});
}

return owner.thenCompose(broker -> {
if (broker.isEmpty()) {
String errorMsg = String.format(
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
return CompletableFuture.completedFuture(broker.get());
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(brokerLookupData);
}));
});
future.whenComplete((r, t) -> lookupRequests.remove(bundle));
return future;
}

private CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenCompose(availableBrokers -> {
// TODO: Support isolation policies
LoadManagerContext context = this.getContext();

Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);

// Filter out brokers that do not meet the rules.
List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
for (final BrokerFilter filter : filterPipeline) {
try {
filter.filter(availableBrokerCandidates, context);
} catch (BrokerFilterException e) {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers.", e);
availableBrokerCandidates = availableBrokers;
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved
}
}
if (availableBrokerCandidates.isEmpty()) {
return CompletableFuture.completedFuture(Optional.empty());
}
Set<String> candidateBrokers = availableBrokerCandidates.keySet();

return CompletableFuture.completedFuture(
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context));
});
}

/**
 * Checks whether the current broker owns the given bundle.
 *
 * <p>For internal topics the channel owner is used as the owner; otherwise the owner recorded
 * for the bundle in the service unit state channel is used.
 *
 * @param topic      the optional topic the check is made for
 * @param bundleUnit the bundle to check
 * @return a future holding {@code true} if the owner equals this broker's id, {@code false}
 *         otherwise (including when there is no owner)
 */
@Override
public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
    final String bundle = bundleUnit.toString();
    final boolean internalTopic = topic.isPresent() && isInternalTopic(topic.get().toString());

    final CompletableFuture<Optional<String>> ownerFuture = internalTopic
            ? serviceUnitStateChannel.getChannelOwnerAsync()
            : serviceUnitStateChannel.getOwnerAsync(bundle);

    return ownerFuture.thenApply(ownerBroker -> {
        final String currentBrokerId = brokerRegistry.getBrokerId();
        // An absent owner maps to null, which never equals the current broker id.
        return currentBrokerId.equals(ownerBroker.orElse(null));
    });
}

@Override
public void close() throws PulsarServerException {
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved
if (!this.started) {
return;
}
try {
this.brokerLoadDataStore.close();
this.topBundlesLoadDataStore.close();
} catch (IOException ex) {
throw new PulsarServerException(ex);
} finally {
try {
this.brokerRegistry.close();
} finally {
try {
this.serviceUnitStateChannel.close();
} finally {
this.started = false;
}
}
}
}

/**
 * Tells whether a topic is one of the load manager's own topics: the service unit state channel
 * topic, the broker load data topic, or the top-bundles load data topic. The match is a prefix
 * match on the topic name.
 *
 * @param topic the full topic name to test
 * @return {@code true} if the name starts with one of the internal topic names
 */
private boolean isInternalTopic(String topic) {
    if (topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)) {
        return true;
    }
    if (topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)) {
        return true;
    }
    return topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}
}
Loading