diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 0164b51d2c2b0..ae0fb1a9f283c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -87,6 +87,7 @@ import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.lookup.v1.TopicLookup; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.protocol.ProtocolHandlers; @@ -818,6 +819,17 @@ public void start() throws PulsarServerException { this.webSocketService.setLocalCluster(clusterData); } + // Start the leader election service + startLeaderElectionService(); + + // By starting the Load manager service, the broker will also become visible + // to the rest of the broker by creating the registration z-node. This needs + // to be done only when the broker is fully operative. + // + // The load manager service and its service unit state channel need to be initialized first + // (namespace service depends on load manager) + this.startLoadManagementService(); + // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info. this.nsService.initialize(); @@ -828,9 +840,6 @@ public void start() throws PulsarServerException { this.topicPoliciesService.start(); - // Start the leader election service - startLeaderElectionService(); - // Register heartbeat and bootstrap namespaces. this.nsService.registerBootstrapNamespaces(); @@ -859,11 +868,6 @@ public void start() throws PulsarServerException { this.metricsGenerator = new MetricsGenerator(this); - // By starting the Load manager service, the broker will also become visible - // to the rest of the broker by creating the registration z-node. This needs - // to be done only when the broker is fully operative. - this.startLoadManagementService(); - // Initialize the message protocol handlers. // start the protocol handlers only after the broker is ready, // so that the protocol handlers can access broker service properly. @@ -1103,6 +1107,10 @@ protected void closeLocalMetadataStore() throws Exception { } protected void startLeaderElectionService() { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService."); + return; + } this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(), state -> { if (state == LeaderElectionState.Leading) { @@ -1207,7 +1215,7 @@ protected void startLoadManagementService() throws PulsarServerException { LOG.info("Starting load management service ..."); this.loadManager.get().start(); - if (config.isLoadBalancerEnabled()) { + if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { LOG.info("Starting load balancer"); if (this.loadReportTask == null) { long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index b4df5d31968dd..17bff57b85c42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -25,8 +25,12 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.Reflections; @@ -58,6 +62,15 @@ public interface LoadManager { */ Optional getLeastLoaded(ServiceUnitId su) throws Exception; + default CompletableFuture> findBrokerServiceUrl( + Optional topic, ServiceUnitId bundle) { + throw new UnsupportedOperationException(); + } + + default CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId bundle) { + throw new UnsupportedOperationException(); + } + /** * Generate the load report. */ @@ -145,6 +158,11 @@ static LoadManager create(final PulsarService pulsar) { final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance); casted.initialize(pulsar); return casted; + } else if (loadManagerInstance instanceof ExtensibleLoadManager) { + final LoadManager casted = + new ExtensibleLoadManagerWrapper((ExtensibleLoadManagerImpl) loadManagerInstance); + casted.initialize(pulsar); + return casted; } } catch (Exception e) { LOG.warn("Error when trying to create load manager: ", e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java index bb66bf731f417..b7da70d1cf1de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java @@ -64,6 +64,15 @@ public interface ExtensibleLoadManager extends Closeable { */ CompletableFuture> assign(Optional topic, ServiceUnitId serviceUnit); + /** + * Check the incoming service unit is owned by the current broker. + * + * @param topic The optional topic, some method won't provide topic var in this param. + * @param serviceUnit The service unit (e.g. bundle). + * @return The broker lookup data. + */ + CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId serviceUnit); + /** * Close the load manager. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java new file mode 100644 index 0000000000000..d95bacd157e7f --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.BrokerFilterException; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; + +@Slf4j +public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { + + public static final String BROKER_LOAD_DATA_STORE_TOPIC = TopicName.get( + TopicDomain.non_persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, + "loadbalancer-broker-load-data").toString(); + + public static final String TOP_BUNDLES_LOAD_DATA_STORE_TOPIC = TopicName.get( + TopicDomain.non_persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, + "loadbalancer-top-bundles-load-data").toString(); + + private PulsarService pulsar; + + private ServiceConfiguration conf; + + @Getter + private BrokerRegistry brokerRegistry; + + private ServiceUnitStateChannel serviceUnitStateChannel; + + private LoadDataStore brokerLoadDataStore; + private LoadDataStore topBundlesLoadDataStore; + + @Getter + private LoadManagerContext context; + + @Getter + private final BrokerSelectionStrategy brokerSelectionStrategy; + + @Getter + private final List brokerFilterPipeline; + + private boolean started = false; + + private final ConcurrentOpenHashMap>> + lookupRequests = ConcurrentOpenHashMap.>>newBuilder() + .build(); + + /** + * Life cycle: Constructor -> initialize -> start -> close. + */ + public ExtensibleLoadManagerImpl() { + this.brokerFilterPipeline = new ArrayList<>(); + // TODO: Make brokerSelectionStrategy configurable. + this.brokerSelectionStrategy = new LeastResourceUsageWithWeight(); + } + + public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) { + return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName()); + } + + @Override + public void start() throws PulsarServerException { + if (this.started) { + return; + } + this.brokerRegistry = new BrokerRegistryImpl(pulsar); + this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); + this.brokerRegistry.start(); + this.serviceUnitStateChannel.start(); + + try { + this.brokerLoadDataStore = LoadDataStoreFactory + .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); + this.topBundlesLoadDataStore = LoadDataStoreFactory + .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); + } catch (LoadDataStoreException e) { + throw new PulsarServerException(e); + } + + this.context = LoadManagerContextImpl.builder() + .configuration(conf) + .brokerRegistry(brokerRegistry) + .brokerLoadDataStore(brokerLoadDataStore) + .topBundleLoadDataStore(topBundlesLoadDataStore).build(); + // TODO: Start load data reporter. + + // TODO: Start unload scheduler and bundle split scheduler + this.started = true; + } + + @Override + public void initialize(PulsarService pulsar) { + this.pulsar = pulsar; + this.conf = pulsar.getConfiguration(); + } + + @Override + public CompletableFuture> assign(Optional topic, + ServiceUnitId serviceUnit) { + + final String bundle = serviceUnit.toString(); + + CompletableFuture> future = lookupRequests.computeIfAbsent(bundle, k -> { + final CompletableFuture> owner; + // Assign the bundle to channel owner if is internal topic, to avoid circular references. + if (topic.isPresent() && isInternalTopic(topic.get().toString())) { + owner = serviceUnitStateChannel.getChannelOwnerAsync(); + } else { + owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> { + // If the bundle not assign yet, select and publish assign event to channel. + if (broker.isEmpty()) { + return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> { + if (brokerOpt.isPresent()) { + log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle); + return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get()) + .thenApply(Optional::of); + } else { + throw new IllegalStateException( + "Failed to select the new owner broker for bundle: " + bundle); + } + }); + } + // Already assigned, return it. + return CompletableFuture.completedFuture(broker); + }); + } + + return owner.thenCompose(broker -> { + if (broker.isEmpty()) { + String errorMsg = String.format( + "Failed to look up a broker registry:%s for bundle:%s", broker, bundle); + log.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return CompletableFuture.completedFuture(broker.get()); + }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> { + if (brokerLookupData.isEmpty()) { + String errorMsg = String.format( + "Failed to look up a broker registry:%s for bundle:%s", broker, bundle); + log.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return CompletableFuture.completedFuture(brokerLookupData); + })); + }); + future.whenComplete((r, t) -> lookupRequests.remove(bundle)); + return future; + } + + private CompletableFuture> selectAsync(ServiceUnitId bundle) { + BrokerRegistry brokerRegistry = getBrokerRegistry(); + return brokerRegistry.getAvailableBrokerLookupDataAsync() + .thenCompose(availableBrokers -> { + // TODO: Support isolation policies + LoadManagerContext context = this.getContext(); + + Map availableBrokerCandidates = new HashMap<>(availableBrokers); + + // Filter out brokers that do not meet the rules. + List filterPipeline = getBrokerFilterPipeline(); + for (final BrokerFilter filter : filterPipeline) { + try { + filter.filter(availableBrokerCandidates, context); + } catch (BrokerFilterException e) { + // TODO: We may need to revisit this error case. + log.error("Failed to filter out brokers.", e); + availableBrokerCandidates = availableBrokers; + } + } + if (availableBrokerCandidates.isEmpty()) { + return CompletableFuture.completedFuture(Optional.empty()); + } + Set candidateBrokers = availableBrokerCandidates.keySet(); + + return CompletableFuture.completedFuture( + getBrokerSelectionStrategy().select(candidateBrokers, bundle, context)); + }); + } + + @Override + public CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId bundleUnit) { + final String bundle = bundleUnit.toString(); + CompletableFuture> owner; + if (topic.isPresent() && isInternalTopic(topic.get().toString())) { + owner = serviceUnitStateChannel.getChannelOwnerAsync(); + } else { + owner = serviceUnitStateChannel.getOwnerAsync(bundle); + } + + return owner.thenApply(broker -> brokerRegistry.getBrokerId().equals(broker.orElse(null))); + } + + @Override + public void close() throws PulsarServerException { + if (!this.started) { + return; + } + try { + this.brokerLoadDataStore.close(); + this.topBundlesLoadDataStore.close(); + } catch (IOException ex) { + throw new PulsarServerException(ex); + } finally { + try { + this.brokerRegistry.close(); + } finally { + try { + this.serviceUnitStateChannel.close(); + } finally { + this.started = false; + } + } + } + } + + private boolean isInternalTopic(String topic) { + return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC) + || topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC) + || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java new file mode 100644 index 0000000000000..6d5797eed6663 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.loadbalance.ResourceUnit; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; + +public class ExtensibleLoadManagerWrapper implements LoadManager { + + private PulsarService pulsar; + + private final ExtensibleLoadManagerImpl loadManager; + + public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) { + this.loadManager = loadManager; + } + + @Override + public void start() throws PulsarServerException { + loadManager.start(); + } + + @Override + public void initialize(PulsarService pulsar) { + loadManager.initialize(pulsar); + this.pulsar = pulsar; + } + + @Override + public boolean isCentralized() { + return true; + } + + @Override + public CompletableFuture> findBrokerServiceUrl( + Optional topic, ServiceUnitId bundle) { + return loadManager.assign(topic, bundle) + .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult)); + } + + @Override + public CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId bundle) { + return loadManager.checkOwnershipAsync(topic, bundle); + } + + @Override + public void disableBroker() throws Exception { + this.loadManager.getBrokerRegistry().unregister(); + } + + @Override + public Set getAvailableBrokers() throws Exception { + return getAvailableBrokersAsync() + .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + } + + @Override + public CompletableFuture> getAvailableBrokersAsync() { + return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new); + } + + @Override + public String setNamespaceBundleAffinity(String bundle, String broker) { + // TODO: Add namespace bundle affinity support. + return null; + } + + @Override + public void stop() throws PulsarServerException { + this.loadManager.close(); + } + + + @Override + public Optional getLeastLoaded(ServiceUnitId su) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public LoadManagerReport generateLoadReport() { + throw new UnsupportedOperationException(); + } + + @Override + public void setLoadReportForceUpdateFlag() { + throw new UnsupportedOperationException(); + } + + @Override + public void writeLoadReportOnZookeeper() throws Exception { + // No-op, this operation is not useful, the load data reporter will automatically write. + throw new UnsupportedOperationException(); + } + + @Override + public void writeResourceQuotasToZooKeeper() throws Exception { + // No-op, this operation is not useful, the load data reporter will automatically write. + throw new UnsupportedOperationException(); + } + + @Override + public List getLoadBalancingMetrics() { + // TODO: Add metrics. + return null; + } + + @Override + public void doLoadShedding() { + throw new UnsupportedOperationException(); + } + + @Override + public void doNamespaceBundleSplit() { + throw new UnsupportedOperationException(); + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java new file mode 100644 index 0000000000000..5f78b88e22c6e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import lombok.Builder; +import lombok.Setter; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; + +@Setter +@Builder +public class LoadManagerContextImpl implements LoadManagerContext { + + private LoadDataStore brokerLoadDataStore; + + private LoadDataStore topBundleLoadDataStore; + + private BrokerRegistry brokerRegistry; + + private ServiceConfiguration configuration; + + + @Override + public ServiceConfiguration brokerConfiguration() { + return this.configuration; + } + + @Override + public LoadDataStore brokerLoadDataStore() { + return this.brokerLoadDataStore; + } + + @Override + public LoadDataStore topBundleLoadDataStore() { + return this.topBundleLoadDataStore; + } + + @Override + public BrokerRegistry brokerRegistry() { + return this.brokerRegistry; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java index 238c433e1776f..fece425e75fd9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java @@ -116,9 +116,9 @@ public interface ServiceUnitStateChannel extends Closeable { * the future object will complete and return the owner broker. * Sub-case2: If the assigned broker does not take the ownership in time, * the future object will time out. - * Case 3: If none of them, it returns null. + * Case 3: If none of them, it returns Optional.empty(). */ - CompletableFuture getOwnerAsync(String serviceUnit); + CompletableFuture> getOwnerAsync(String serviceUnit); /** * Asynchronously publishes the service unit assignment event to the system topic in this channel. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 38e8afa50f302..37dfe6090bbcf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -63,6 +63,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; @@ -276,21 +277,23 @@ private boolean isChannelOwner() { } } - public CompletableFuture getOwnerAsync(String serviceUnit) { + public CompletableFuture> getOwnerAsync(String serviceUnit) { validateChannelState(Started, true); ServiceUnitStateData data = tableview.get(serviceUnit); if (data == null) { - return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(Optional.empty()); } switch (data.state()) { case Owned, Splitting -> { - return CompletableFuture.completedFuture(data.broker()); + return CompletableFuture.completedFuture(Optional.of(data.broker())); } case Assigned, Released -> { - return deferGetOwnerRequest(serviceUnit); + return deferGetOwnerRequest(serviceUnit).thenApply(Optional::of); } default -> { - return null; + String errorMsg = String.format("Failed to process service unit state data: %s when get owner.", data); + log.error(errorMsg); + return FutureUtil.failedFuture(new IllegalStateException(errorMsg)); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java index 4adc6aa1ce46d..0a76446d3ce6f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java @@ -18,9 +18,10 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.filter; -import java.util.List; +import java.util.Map; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; /** * Filter out unqualified Brokers, which are not entered into LoadBalancer for decision-making. @@ -39,6 +40,7 @@ public interface BrokerFilter { * @param context The load manager context. * @return Filtered broker list. */ - List filter(List brokers, LoadManagerContext context) throws BrokerFilterException; + Map filter(Map brokers, LoadManagerContext context) + throws BrokerFilterException; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java index 3b62777f1b626..e0a9122383c22 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.strategy; -import java.util.List; import java.util.Optional; +import java.util.Set; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -38,6 +38,6 @@ public interface BrokerSelectionStrategy { * @param context * The context contains information needed for selection (load data, config, and etc). */ - Optional select(List brokers, ServiceUnitId bundle, LoadManagerContext context); + Optional select(Set brokers, ServiceUnitId bundle, LoadManagerContext context); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java index f48bab54f895a..678927dac9293 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; @@ -79,7 +78,8 @@ private double getMaxResourceUsageWithWeight(final String broker, final BrokerLo * @return The name of the selected broker as it appears on ZooKeeper. */ @Override - public Optional select(List candidates, ServiceUnitId bundleToAssign, LoadManagerContext context) { + public Optional select( + Set candidates, ServiceUnitId bundleToAssign, LoadManagerContext context) { var conf = context.brokerConfiguration(); if (candidates.isEmpty()) { log.info("There are no available brokers as candidates at this point for bundle: {}", bundleToAssign); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java index d2349768352e3..846b528045a13 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java @@ -16,4 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.extensions.strategy; \ No newline at end of file +package org.apache.pulsar.broker.loadbalance.extensions.strategy; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 84bce75bf5a73..abbabcd3b00a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -55,6 +55,7 @@ import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.ResourceUnit; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -175,7 +176,14 @@ public CompletableFuture> getBrokerServiceUrlAsync(TopicN long startTime = System.nanoTime(); CompletableFuture> future = getBundleAsync(topic) - .thenCompose(bundle -> findBrokerServiceUrl(bundle, options)); + .thenCompose(bundle -> { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle); + } else { + // TODO: Add unit tests cover it. + return findBrokerServiceUrl(bundle, options); + } + }); future.thenAccept(optResult -> { lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); @@ -232,16 +240,18 @@ public CompletableFuture> getWebServiceUrlAsync(ServiceUnitId suNa LOG.debug("Getting web service URL of topic: {} - options: {}", name, options); } return getBundleAsync(name) - .thenCompose(namespaceBundle -> internalGetWebServiceUrl(namespaceBundle, options)); + .thenCompose(namespaceBundle -> + internalGetWebServiceUrl(Optional.of(name), namespaceBundle, options)); } if (suName instanceof NamespaceName) { return getFullBundleAsync((NamespaceName) suName) - .thenCompose(namespaceBundle -> internalGetWebServiceUrl(namespaceBundle, options)); + .thenCompose(namespaceBundle -> + internalGetWebServiceUrl(Optional.empty(), namespaceBundle, options)); } if (suName instanceof NamespaceBundle) { - return internalGetWebServiceUrl((NamespaceBundle) suName, options); + return internalGetWebServiceUrl(Optional.empty(), (NamespaceBundle) suName, options); } throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName()); @@ -257,9 +267,14 @@ public Optional getWebServiceUrl(ServiceUnitId suName, LookupOptions option .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS); } - private CompletableFuture> internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) { + private CompletableFuture> internalGetWebServiceUrl(Optional topic, + NamespaceBundle bundle, + LookupOptions options) { - return findBrokerServiceUrl(bundle, options).thenApply(lookupResult -> { + return (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config) + ? loadManager.get().findBrokerServiceUrl(topic, bundle) : + // TODO: Add unit tests cover it. + findBrokerServiceUrl(bundle, options)).thenApply(lookupResult -> { if (lookupResult.isPresent()) { try { LookupData lookupData = lookupResult.get().getLookupData(); @@ -1024,6 +1039,10 @@ public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) } if (suName instanceof NamespaceBundle) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + return loadManager.get().checkOwnershipAsync(Optional.empty(), suName); + } + // TODO: Add unit tests cover it. return CompletableFuture.completedFuture( ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName)); } @@ -1046,6 +1065,11 @@ public boolean isServiceUnitActive(TopicName topicName) { } public CompletableFuture isServiceUnitActiveAsync(TopicName topicName) { + // TODO: Add unit tests cover it. + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + return getBundleAsync(topicName) + .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); + } Optional> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName)); if (!res.isPresent()) { return CompletableFuture.completedFuture(false); @@ -1059,15 +1083,30 @@ private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { } private CompletableFuture isNamespaceOwnedAsync(NamespaceName fqnn) { + // TODO: Add unit tests cover it. + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + return getFullBundleAsync(fqnn) + .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.empty(), bundle)); + } return getFullBundleAsync(fqnn) .thenApply(bundle -> ownershipCache.getOwnedBundle(bundle) != null); } private CompletableFuture isTopicOwnedAsync(TopicName topic) { + // TODO: Add unit tests cover it. + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + return getBundleAsync(topic) + .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle)); + } return getBundleAsync(topic).thenApply(bundle -> ownershipCache.isNamespaceBundleOwned(bundle)); } public CompletableFuture checkTopicOwnership(TopicName topicName) { + // TODO: Add unit tests cover it. + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + return getBundleAsync(topicName) + .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); + } return getBundleAsync(topicName) .thenCompose(ownershipCache::checkOwnershipAsync); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java new file mode 100644 index 0000000000000..d650567be8b3d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.loadbalance.BrokerFilterException; +import org.apache.pulsar.broker.loadbalance.LeaderBroker; +import org.apache.pulsar.broker.loadbalance.LeaderElectionService; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter; +import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.broker.namespace.LookupOptions; +import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.broker.resources.TenantResources; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.impl.TableViewImpl; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Unit test for {@link ExtensibleLoadManagerImpl}. + */ +@Slf4j +@Test(groups = "broker") +public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { + + private PulsarService pulsar1; + private PulsarService pulsar2; + + private PulsarTestContext additionalPulsarTestContext; + + private PulsarResources resources; + + private ExtensibleLoadManagerImpl primaryLoadManager; + + private ExtensibleLoadManagerImpl secondaryLoadManager; + + private ServiceUnitStateChannelImpl channel1; + private ServiceUnitStateChannelImpl channel2; + + @BeforeClass + public void setup() throws Exception { + conf.setAllowAutoTopicCreation(true); + conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + super.internalSetup(conf); + pulsar1 = pulsar; + ServiceConfiguration defaultConf = getDefaultConf(); + defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf); + pulsar2 = additionalPulsarTestContext.getPulsarService(); + + ExtensibleLoadManagerWrapper primaryLoadManagerWrapper = + (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get(); + primaryLoadManager = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(primaryLoadManagerWrapper, "loadManager", true)); + FieldUtils.writeField(primaryLoadManagerWrapper, "loadManager", primaryLoadManager, true); + + ExtensibleLoadManagerWrapper secondaryLoadManagerWrapper = + (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get(); + secondaryLoadManager = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(secondaryLoadManagerWrapper, "loadManager", true)); + FieldUtils.writeField(secondaryLoadManagerWrapper, "loadManager", secondaryLoadManager, true); + + channel1 = (ServiceUnitStateChannelImpl) + FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true); + channel2 = (ServiceUnitStateChannelImpl) + FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true); + + } + + protected void beforePulsarStart(PulsarService pulsar) throws Exception { + if (resources == null) { + MetadataStoreExtended localStore = pulsar.createLocalMetadataStore(null); + MetadataStoreExtended configStore = (MetadataStoreExtended) pulsar.createConfigurationMetadataStore(null); + resources = new PulsarResources(localStore, configStore); + } + this.createNamespaceIfNotExists(resources, NamespaceName.SYSTEM_NAMESPACE.getTenant(), + NamespaceName.SYSTEM_NAMESPACE); + } + + protected void createNamespaceIfNotExists(PulsarResources resources, + String publicTenant, + NamespaceName ns) throws Exception { + TenantResources tr = resources.getTenantResources(); + NamespaceResources nsr = resources.getNamespaceResources(); + + if (!tr.tenantExists(publicTenant)) { + tr.createTenant(publicTenant, + TenantInfo.builder() + .adminRoles(Sets.newHashSet(conf.getSuperUserRoles())) + .allowedClusters(Sets.newHashSet(conf.getClusterName())) + .build()); + } + + if (!nsr.namespaceExists(ns)) { + Policies nsp = new Policies(); + nsp.replication_clusters = Collections.singleton(conf.getClusterName()); + nsr.createPolicies(ns, nsp); + } + } + + @Override + protected void cleanup() throws Exception { + pulsar1 = null; + pulsar2.close(); + super.internalCleanup(); + this.additionalPulsarTestContext.close(); + } + + @BeforeMethod + protected void initializeState() throws IllegalAccessException { + reset(primaryLoadManager, secondaryLoadManager); + cleanTableView(channel1); + cleanTableView(channel2); + } + + @Test + public void testAssignInternalTopic() throws Exception { + Optional brokerLookupData1 = primaryLoadManager.assign( + Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)), + getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get(); + Optional brokerLookupData2 = secondaryLoadManager.assign( + Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)), + getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get(); + assertEquals(brokerLookupData1, brokerLookupData2); + assertTrue(brokerLookupData1.isPresent()); + + LeaderElectionService leaderElectionService = (LeaderElectionService) + FieldUtils.readField(channel1, "leaderElectionService", true); + Optional currentLeader = leaderElectionService.getCurrentLeader(); + assertTrue(currentLeader.isPresent()); + assertEquals(brokerLookupData1.get().getWebServiceUrl(), currentLeader.get().getServiceUrl()); + } + + @Test + public void testAssign() throws Exception { + TopicName topicName = TopicName.get("test-assign"); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + assertTrue(brokerLookupData.isPresent()); + log.info("Assign the bundle {} to {}", bundle, brokerLookupData); + // Should get owner info from channel. + Optional brokerLookupData1 = secondaryLoadManager.assign(Optional.empty(), bundle).get(); + assertEquals(brokerLookupData, brokerLookupData1); + + verify(primaryLoadManager, times(1)).getBrokerSelectionStrategy(); + verify(secondaryLoadManager, times(0)).getBrokerSelectionStrategy(); + + Optional lookupResult = pulsar2.getNamespaceService() + .getBrokerServiceUrlAsync(topicName, null).get(); + assertTrue(lookupResult.isPresent()); + assertEquals(lookupResult.get().getLookupData().getHttpUrl(), brokerLookupData.get().getWebServiceUrl()); + + Optional webServiceUrl = pulsar2.getNamespaceService() + .getWebServiceUrl(bundle, LookupOptions.builder().requestHttps(false).build()); + assertTrue(webServiceUrl.isPresent()); + assertEquals(webServiceUrl.get().toString(), brokerLookupData.get().getWebServiceUrl()); + } + + @Test + public void testCheckOwnershipAsync() throws Exception { + NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get("test-check-ownership")).get(); + // 1. The bundle is never assigned. + assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + + // 2. Assign the bundle to a broker. + Optional lookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + assertTrue(lookupData.isPresent()); + if (lookupData.get().getPulsarServiceUrl().equals(pulsar1.getBrokerServiceUrl())) { + assertTrue(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + } else { + assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertTrue(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + } + + } + + @Test + public void testFilter() throws Exception { + TopicName topicName = TopicName.get("test-filter"); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + + doReturn(List.of(new BrokerFilter() { + @Override + public String name() { + return "Mock broker filter"; + } + + @Override + public Map filter(Map brokers, + LoadManagerContext context) { + brokers.remove(pulsar1.getLookupServiceAddress()); + return brokers; + } + })).when(primaryLoadManager).getBrokerFilterPipeline(); + + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + assertTrue(brokerLookupData.isPresent()); + assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress()); + } + + @Test + public void testFilterHasException() throws Exception { + TopicName topicName = TopicName.get("test-filter-has-exception"); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + + doReturn(List.of(new BrokerFilter() { + @Override + public String name() { + return "Mock broker filter"; + } + + @Override + public Map filter(Map brokers, + LoadManagerContext context) throws BrokerFilterException { + brokers.clear(); + throw new BrokerFilterException("Test"); + } + })).when(primaryLoadManager).getBrokerFilterPipeline(); + + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + assertTrue(brokerLookupData.isPresent()); + } + + private static void cleanTableView(ServiceUnitStateChannel channel) + throws IllegalAccessException { + var tv = (TableViewImpl) + FieldUtils.readField(channel, "tableview", true); + var cache = (ConcurrentMap) + FieldUtils.readField(tv, "data", true); + cache.clear(); + } + + private CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { + return pulsar.getNamespaceService().getBundleAsync(topic); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index bc85403b7cd9b..1c0a4f376338b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -160,7 +159,7 @@ public void channelOwnerTest() throws Exception { assertEquals(newChannelOwner1, newChannelOwner2); assertNotEquals(channelOwner1, newChannelOwner1); - if (newChannelOwner1.equals(lookupServiceAddress1)) { + if (newChannelOwner1.equals(Optional.of(lookupServiceAddress1))) { assertTrue(channel1.isChannelOwnerAsync().get(2, TimeUnit.SECONDS)); assertFalse(channel2.isChannelOwnerAsync().get(2, TimeUnit.SECONDS)); } else { @@ -282,8 +281,8 @@ public void assignmentTest() var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); - assertNull(owner1.get()); - assertNull(owner2.get()); + assertTrue(owner1.get().isEmpty()); + assertTrue(owner2.get().isEmpty()); var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2); @@ -329,15 +328,15 @@ public void assignmentTestWhenOneAssignmentFails() var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); - assertNull(owner1.get()); - assertNull(owner2.get()); + assertTrue(owner1.get().isEmpty()); + assertTrue(owner2.get().isEmpty()); - owner1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); - owner2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2); - assertTrue(owner1.isCompletedExceptionally()); - assertNotNull(owner2); - String ownerAddr2 = owner2.get(5, TimeUnit.SECONDS); - assertEquals(ownerAddr2, lookupServiceAddress2); + var owner3 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); + var owner4 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2); + assertTrue(owner3.isCompletedExceptionally()); + assertNotNull(owner4); + String ownerAddrOpt2 = owner4.get(5, TimeUnit.SECONDS); + assertEquals(ownerAddrOpt2, lookupServiceAddress2); waitUntilNewOwner(channel1, bundle, lookupServiceAddress2); assertEquals(0, getOwnerRequests1.size()); assertEquals(0, getOwnerRequests2.size()); @@ -352,8 +351,8 @@ public void unloadTest() var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); - assertNull(owner1.get()); - assertNull(owner2.get()); + assertTrue(owner1.get().isEmpty()); + assertTrue(owner2.get().isEmpty()); channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); @@ -363,7 +362,7 @@ public void unloadTest() var ownerAddr2 = channel2.getOwnerAsync(bundle).get(); assertEquals(ownerAddr1, ownerAddr2); - assertEquals(ownerAddr1, lookupServiceAddress1); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1)); Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2)); channel1.publishUnloadEventAsync(unload); @@ -374,7 +373,7 @@ public void unloadTest() ownerAddr1 = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS); ownerAddr2 = channel2.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS); assertEquals(ownerAddr1, ownerAddr2); - assertEquals(ownerAddr1, lookupServiceAddress2); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2)); } @Test(priority = 5) @@ -393,7 +392,7 @@ public void unloadTestWhenDestBrokerFails() var ownerAddr2 = channel2.getOwnerAsync(bundle).get(); assertEquals(ownerAddr1, ownerAddr2); - assertEquals(ownerAddr1, lookupServiceAddress1); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1)); var producer = (Producer) FieldUtils.readDeclaredField(channel1, "producer", true); @@ -454,10 +453,11 @@ public void splitTest() throws Exception { waitUntilNewOwner(channel2, bundle, lookupServiceAddress1); var ownerAddr1 = channel1.getOwnerAsync(bundle).get(); var ownerAddr2 = channel2.getOwnerAsync(bundle).get(); - assertEquals(ownerAddr1, lookupServiceAddress1); - assertEquals(ownerAddr2, lookupServiceAddress1); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1)); + assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1)); + assertTrue(ownerAddr1.isPresent()); - Split split = new Split(bundle, ownerAddr1, new HashMap<>()); + Split split = new Split(bundle, ownerAddr1.get(), new HashMap<>()); channel1.publishSplitEventAsync(split); waitUntilNewOwner(channel1, bundle, null); @@ -545,8 +545,8 @@ public void handleBrokerDeletionEventTest() var owner1 = channel1.getOwnerAsync(bundle1); var owner2 = channel2.getOwnerAsync(bundle2); - assertNull(owner1.get()); - assertNull(owner2.get()); + assertTrue(owner1.get().isEmpty()); + assertTrue(owner2.get().isEmpty()); String broker = lookupServiceAddress1; channel1.publishAssignEventAsync(bundle1, broker); @@ -701,8 +701,8 @@ public void conflictAndCompactionTest() throws ExecutionException, InterruptedEx producer.newMessage().key(bundle).send(); var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); - assertNull(owner1.get()); - assertNull(owner2.get()); + assertTrue(owner1.get().isEmpty()); + assertTrue(owner2.get().isEmpty()); var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); assertNotNull(assigned1); @@ -724,8 +724,8 @@ public void conflictAndCompactionTest() throws ExecutionException, InterruptedEx } assertNotNull(ex); assertEquals(TimeoutException.class, ex.getCause().getClass()); - assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(bundle).get()); - assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get()); + assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(bundle).get()); + assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(bundle).get()); var compactor = spy (pulsar1.getStrategicCompactor()); Field strategicCompactorField = FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true); @@ -743,7 +743,7 @@ public void conflictAndCompactionTest() throws ExecutionException, InterruptedEx .pollInterval(200, TimeUnit.MILLISECONDS) .atMost(5, TimeUnit.SECONDS) .untilAsserted(() -> assertEquals( - channel3.getOwnerAsync(bundle).get(), lookupServiceAddress1)); + channel3.getOwnerAsync(bundle).get(), Optional.of(lookupServiceAddress1))); channel3.close(); FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 30 * 1000, true); @@ -787,10 +787,8 @@ private static void waitUntilNewChannelOwner(ServiceUnitStateChannel channel, St if (!owner.isDone()) { return false; } - if (oldOwner == null) { - return owner != null; - } - return !oldOwner.equals(owner); + + return !StringUtils.equals(oldOwner, owner.get().orElse(null)); }); } @@ -799,11 +797,11 @@ private static void waitUntilOwnerChanges(ServiceUnitStateChannel channel, Strin .pollInterval(200, TimeUnit.MILLISECONDS) .atMost(10, TimeUnit.SECONDS) .until(() -> { // wait until true - CompletableFuture owner = channel.getOwnerAsync(serviceUnit); + CompletableFuture> owner = channel.getOwnerAsync(serviceUnit); if (!owner.isDone()) { return false; } - return !StringUtils.equals(oldOwner, owner.get()); + return !StringUtils.equals(oldOwner, owner.get().orElse(null)); }); } @@ -813,11 +811,11 @@ private static void waitUntilNewOwner(ServiceUnitStateChannel channel, String se .atMost(15, TimeUnit.SECONDS) .until(() -> { // wait until true try { - CompletableFuture owner = channel.getOwnerAsync(serviceUnit); + CompletableFuture> owner = channel.getOwnerAsync(serviceUnit); if (!owner.isDone()) { return false; } - return StringUtils.equals(newOwner, owner.get()); + return StringUtils.equals(newOwner, owner.get().orElse(null)); } catch (Exception e) { return false; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java index ef0e65762f1ad..2856dde892a8f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java @@ -24,9 +24,8 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -78,7 +77,7 @@ public void testSelect() { LeastResourceUsageWithWeight strategy = new LeastResourceUsageWithWeight(); // Should choice broker from broker1 2 3. - List candidates = new ArrayList<>(); + Set candidates = new HashSet<>(); candidates.add("1"); candidates.add("2"); candidates.add("3"); @@ -125,7 +124,7 @@ public void testArithmeticException() LeastResourceUsageWithWeight strategy = new LeastResourceUsageWithWeight(); // Should choice broker from broker1 2 3. - List candidates = new ArrayList<>(); + Set candidates = new HashSet<>(); candidates.add("1"); candidates.add("2"); candidates.add("3"); @@ -141,7 +140,7 @@ public void testNoLoadDataBrokers() { LeastResourceUsageWithWeight strategy = new LeastResourceUsageWithWeight(); - List candidates = new ArrayList<>(); + Set candidates = new HashSet<>(); var brokerLoadDataStore = ctx.brokerLoadDataStore(); brokerLoadDataStore.pushAsync("1", createBrokerData(ctx,50, 100)); brokerLoadDataStore.pushAsync("2", createBrokerData(ctx,100, 100)); @@ -189,7 +188,7 @@ private void updateLoad(LoadManagerContext ctx, String broker, double usage) { 1, 1, 1, 1, ctx.brokerConfiguration()); } - public LoadManagerContext getContext() { + public static LoadManagerContext getContext() { var ctx = mock(LoadManagerContext.class); var conf = new ServiceConfiguration(); conf.setLoadBalancerCPUResourceWeight(1.0);