Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Dec 30, 2022
1 parent d48b52a commit 3d54c15
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
Expand Down Expand Up @@ -802,7 +802,7 @@ public void start() throws PulsarServerException {
}
brokerService.start();

if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
// Init system namespace for extensible load manager
this.createNamespaceIfNotExists(this.getConfiguration().getClusterName(),
SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE);
Expand Down Expand Up @@ -842,6 +842,9 @@ public void start() throws PulsarServerException {
// By starting the Load manager service, the broker will also become visible
// to the rest of the broker by creating the registration z-node. This needs
// to be done only when the broker is fully operative.
//
// The load manager service and its service unit state channel need to be initialized first
// (namespace service depends on load manager)
this.startLoadManagementService();

// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
Expand Down Expand Up @@ -1129,7 +1132,8 @@ protected void closeLocalMetadataStore() throws Exception {
}

protected void startLeaderElectionService() {
if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
return;
}
this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
Expand Down Expand Up @@ -1236,7 +1240,7 @@ protected void startLoadManagementService() throws PulsarServerException {
LOG.info("Starting load management service ...");
this.loadManager.get().start();

if (config.isLoadBalancerEnabled()) {
if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
Expand Down Expand Up @@ -63,11 +64,11 @@ public interface LoadManager {

default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
return null;
throw new UnsupportedOperationException();
}

default CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
return null;
throw new UnsupportedOperationException();
}

/**
Expand Down Expand Up @@ -155,7 +156,7 @@ static LoadManager create(final PulsarService pulsar) {
final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
casted.initialize(pulsar);
return casted;
} else if (loadManagerInstance instanceof ExtensibleLoadManagerImpl) {
} else if (loadManagerInstance instanceof ExtensibleLoadManager) {
final LoadManager casted =
new ExtensibleLoadManagerWrapper((ExtensibleLoadManagerImpl) loadManagerInstance);
casted.initialize(pulsar);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -61,7 +60,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@Getter
private List<BrokerFilter> brokerFilterPipeline;

private final AtomicBoolean started = new AtomicBoolean(false);
private volatile boolean started = false;

private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
lookupRequests = ConcurrentOpenHashMap.<String,
Expand All @@ -73,6 +72,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
*/
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = (brokers, bundle, context) -> {
if (brokers.isEmpty()) {
return Optional.empty();
Expand All @@ -81,9 +81,13 @@ public ExtensibleLoadManagerImpl() {
};
}

public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}

@Override
public void start() throws PulsarServerException {
if (this.started.get()) {
public synchronized void start() throws PulsarServerException {
if (this.started) {
return;
}
this.brokerRegistry = new BrokerRegistryImpl(pulsar);
Expand All @@ -101,8 +105,7 @@ public void start() throws PulsarServerException {
// TODO: Start load data reporter.

// TODO: Start unload scheduler and bundle split scheduler

this.started.set(true);
this.started = true;
}

@Override
Expand All @@ -129,10 +132,11 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
.thenApply(Optional::of);
} else {
throw new IllegalStateException(
"Failed to discover(select) the new owner broker for bundle: " + bundle);
"Failed to select the new owner broker for bundle: " + bundle);
}
});
}
Expand Down Expand Up @@ -204,8 +208,8 @@ public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> to
}

@Override
public void close() throws PulsarServerException {
if (!this.started.get()) {
public synchronized void close() throws PulsarServerException {
if (!this.started) {
return;
}
try {
Expand All @@ -214,7 +218,7 @@ public void close() throws PulsarServerException {
throw new PulsarServerException(e);
}
this.serviceUnitStateChannel.close();
this.started.set(false);
this.started = false;
}

private boolean isInternalTopic(String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,42 +96,45 @@ public void stop() throws PulsarServerException {

@Override
public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
return Optional.empty();
throw new UnsupportedOperationException();
}

@Override
public LoadManagerReport generateLoadReport() {
return null;
throw new UnsupportedOperationException();
}

@Override
public void setLoadReportForceUpdateFlag() {
// No-op.
throw new UnsupportedOperationException();
}

@Override
public void writeLoadReportOnZookeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
throw new UnsupportedOperationException();
}

@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
throw new UnsupportedOperationException();
}

@Override
public List<Metrics> getLoadBalancingMetrics() {
// TODO: Add metrics.
return null;
}

@Override
public void doLoadShedding() {
// No-op.
throw new UnsupportedOperationException();
}

@Override
public void doNamespaceBundleSplit() {
// No-op.
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public interface ServiceUnitStateChannel extends Closeable {
* the future object will complete and return the owner broker.
* Sub-case2: If the assigned broker does not take the ownership in time,
* the future object will time out.
* Case 3: If none of them, it returns null.
* Case 3: If none of them, it returns Optional.empty().
*/
CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit);

Expand All @@ -132,7 +132,7 @@ public interface ServiceUnitStateChannel extends Closeable {
* case 2: If the assigned broker does not take the ownership in time,
* the future object will time out.
*/
CompletableFuture<Optional<String>> publishAssignEventAsync(String serviceUnit, String broker);
CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker);

/**
* Asynchronously publishes the service unit unload event to the system topic in this channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;
private final PulsarService pulsar;
private final Schema<ServiceUnitStateData> schema;
private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests;
private final ConcurrentOpenHashMap<String, CompletableFuture<String>> getOwnerRequests;
private final String lookupServiceAddress;
// TODO: define BrokerRegistry
private final ConcurrentOpenHashMap<String, CompletableFuture<Void>> cleanupJobs;
Expand Down Expand Up @@ -130,13 +130,13 @@ public ServiceUnitStateChannelImpl(PulsarService pulsar) {
this.lookupServiceAddress = pulsar.getLookupServiceAddress();
this.schema = Schema.JSON(ServiceUnitStateData.class);
this.getOwnerRequests = ConcurrentOpenHashMap.<String,
CompletableFuture<Optional<String>>>newBuilder().build();
CompletableFuture<String>>newBuilder().build();
this.cleanupJobs = ConcurrentOpenHashMap.<String, CompletableFuture<Void>>newBuilder().build();
this.inFlightStateWaitingTimeInMillis = MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS;
this.maxCleanupDelayTimeInSecs = MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
this.minCleanupDelayTimeInSecs = MIN_CLEAN_UP_DELAY_TIME_IN_SECS;
this.leaderElectionService = new LeaderElectionService(
pulsar.getCoordinationService(), pulsar.getLookupServiceAddress(),
pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(),
state -> {
if (state == LeaderElectionState.Leading) {
log.debug("This broker:{} is the leader now.", lookupServiceAddress);
Expand Down Expand Up @@ -242,7 +242,9 @@ public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
// TODO: discard this protocol prefix removal
// by a util func that returns lookupServiceAddress(serviceUrl)
if (leader.isPresent()) {
return Optional.of(leader.get().getServiceUrl());
String broker = leader.get().getServiceUrl();
broker = broker.substring(broker.lastIndexOf('/') + 1);
return Optional.of(broker);
} else {
// When leader is empty, we should throw exception to notify is failed.
String msg = "There is no channel owner now.";
Expand Down Expand Up @@ -287,7 +289,7 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
return CompletableFuture.completedFuture(Optional.of(data.broker()));
}
case Assigned, Released -> {
return deferGetOwnerRequest(serviceUnit);
return deferGetOwnerRequest(serviceUnit).thenApply(Optional::of);
}
default -> {
String errorMsg = String.format("Failed to process service unit state data: %s when get owner.", data);
Expand All @@ -299,8 +301,8 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
}
}

public CompletableFuture<Optional<String>> publishAssignEventAsync(String serviceUnit, String broker) {
CompletableFuture<Optional<String>> getOwnerRequest = deferGetOwnerRequest(serviceUnit);
public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker) {
CompletableFuture<String> getOwnerRequest = deferGetOwnerRequest(serviceUnit);
pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
.whenComplete((__, ex) -> {
if (ex != null) {
Expand Down Expand Up @@ -385,7 +387,7 @@ lookupServiceAddress, getLogEventTag(data), serviceUnit,
private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
getOwnerRequest.complete(Optional.of(data.broker()));
getOwnerRequest.complete(data.broker());
}
if (isTargetBroker(data.broker())) {
log(null, serviceUnit, data, null);
Expand Down Expand Up @@ -470,10 +472,10 @@ private NamespaceBundle getNamespaceBundle(String bundle) {
return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
}

private CompletableFuture<Optional<String>> deferGetOwnerRequest(String serviceUnit) {
private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
CompletableFuture<Optional<String>> future = new CompletableFuture<>();
CompletableFuture<String> future = new CompletableFuture<>();
future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
Expand Down Expand Up @@ -172,16 +172,12 @@ public void initialize() {
}
}

public boolean isExtensibleLoadManager(){
return loadManager.get() instanceof ExtensibleLoadManagerWrapper;
}

public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
long startTime = System.nanoTime();

CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
.thenCompose(bundle -> {
if (isExtensibleLoadManager()) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
} else {
return findBrokerServiceUrl(bundle, options);
Expand Down Expand Up @@ -274,7 +270,7 @@ private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(Optional<Servi
NamespaceBundle bundle,
LookupOptions options) {

return (isExtensibleLoadManager()
return (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
? loadManager.get().findBrokerServiceUrl(topic, bundle) :
findBrokerServiceUrl(bundle, options)).thenApply(lookupResult -> {
if (lookupResult.isPresent()) {
Expand Down Expand Up @@ -1040,7 +1036,7 @@ public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId suName)
}

if (suName instanceof NamespaceBundle) {
if (isExtensibleLoadManager()) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return loadManager.get().checkOwnershipAsync(Optional.empty(), suName);
}
return CompletableFuture.completedFuture(
Expand All @@ -1065,7 +1061,7 @@ public boolean isServiceUnitActive(TopicName topicName) {
}

public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
if (isExtensibleLoadManager()) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return getBundleAsync(topicName)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
Expand All @@ -1082,7 +1078,7 @@ private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
}

private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
if (isExtensibleLoadManager()) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return getFullBundleAsync(fqnn)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.empty(), bundle));
}
Expand All @@ -1091,15 +1087,15 @@ private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
}

private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {
if (isExtensibleLoadManager()) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return getBundleAsync(topic)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle));
}
return getBundleAsync(topic).thenApply(bundle -> ownershipCache.isNamespaceBundleOwned(bundle));
}

public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
if (isExtensibleLoadManager()) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return getBundleAsync(topicName)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
Expand Down
Loading

0 comments on commit 3d54c15

Please sign in to comment.