diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 3a76842289d979..520a76e32e9892 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -22,8 +22,10 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER; +import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; @@ -87,6 +89,7 @@ import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; import org.apache.pulsar.broker.lookup.v1.TopicLookup; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.protocol.ProtocolHandlers; @@ -94,7 +97,9 @@ import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager; import org.apache.pulsar.broker.resources.ClusterResources; +import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.broker.rest.Topics; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer; @@ -137,8 +142,11 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.protocol.schema.SchemaStorage; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown; @@ -794,6 +802,12 @@ public void start() throws PulsarServerException { } brokerService.start(); + if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) { + // Init system namespace for extensible load manager + this.createNamespaceIfNotExists(this.getConfiguration().getClusterName(), + SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE); + } + // Load additional servlets this.brokerAdditionalServlets = AdditionalServlets.load(config); @@ -825,6 +839,11 @@ public void start() throws PulsarServerException { this.webSocketService.setLocalCluster(clusterData); } + // By starting the Load manager service, the broker will also become visible + // to the rest of the broker by creating the registration z-node. This needs + // to be done only when the broker is fully operative. + this.startLoadManagementService(); + // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info. this.nsService.initialize(); @@ -866,11 +885,6 @@ public void start() throws PulsarServerException { this.metricsGenerator = new MetricsGenerator(this); - // By starting the Load manager service, the broker will also become visible - // to the rest of the broker by creating the registration z-node. This needs - // to be done only when the broker is fully operative. - this.startLoadManagementService(); - // Initialize the message protocol handlers. // start the protocol handlers only after the broker is ready, // so that the protocol handlers can access broker service properly. @@ -925,6 +939,36 @@ public void start() throws PulsarServerException { } } + protected void createNamespaceIfNotExists(String cluster, String publicTenant, NamespaceName ns) throws Exception { + ClusterResources cr = this.getPulsarResources().getClusterResources(); + TenantResources tr = this.getPulsarResources().getTenantResources(); + NamespaceResources nsr = this.getPulsarResources().getNamespaceResources(); + + if (!cr.clusterExists(cluster)) { + cr.createCluster(cluster, + ClusterData.builder() + .serviceUrl(this.getWebServiceAddress()) + .serviceUrlTls(this.getWebServiceAddressTls()) + .brokerServiceUrl(this.getBrokerServiceUrl()) + .brokerServiceUrlTls(this.getBrokerServiceUrlTls()) + .build()); + } + + if (!tr.tenantExists(publicTenant)) { + tr.createTenant(publicTenant, + TenantInfo.builder() + .adminRoles(Sets.newHashSet(config.getSuperUserRoles())) + .allowedClusters(Sets.newHashSet(cluster)) + .build()); + } + + if (!nsr.namespaceExists(ns)) { + Policies nsp = new Policies(); + nsp.replication_clusters = Collections.singleton(config.getClusterName()); + nsr.createPolicies(ns, nsp); + } + } + private synchronized void createMetricsServlet() { this.metricsServlet = new PulsarPrometheusMetricsServlet( this, config.isExposeTopicLevelMetricsInPrometheus(), @@ -1085,6 +1129,9 @@ protected void closeLocalMetadataStore() throws Exception { } protected void startLeaderElectionService() { + if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) { + return; + } this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(), state -> { if (state == LeaderElectionState.Leading) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index e34215d199648c..230d36cf938a42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -25,8 +25,11 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.Reflections; @@ -58,6 +61,15 @@ public interface LoadManager { */ Optional getLeastLoaded(ServiceUnitId su) throws Exception; + default CompletableFuture> findBrokerServiceUrl( + Optional topic, ServiceUnitId bundle) { + return null; + } + + default CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId bundle) { + return null; + } + /** * Generate the load report. */ @@ -143,6 +155,11 @@ static LoadManager create(final PulsarService pulsar) { final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance); casted.initialize(pulsar); return casted; + } else if (loadManagerInstance instanceof ExtensibleLoadManagerImpl) { + final LoadManager casted = + new ExtensibleLoadManagerWrapper((ExtensibleLoadManagerImpl) loadManagerInstance); + casted.initialize(pulsar); + return casted; } } catch (Exception e) { LOG.warn("Error when trying to create load manager: ", e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 25ff4c9fb891fc..f182448422872a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -82,7 +82,7 @@ public BrokerRegistryImpl(PulsarService pulsar) { this.listeners = new ArrayList<>(); this.brokerId = pulsar.getLookupServiceAddress(); this.brokerLookupData = new BrokerLookupData( - pulsar.getSafeWebServiceAddress(), + pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java index bb66bf731f417a..b7da70d1cf1de1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java @@ -64,6 +64,15 @@ public interface ExtensibleLoadManager extends Closeable { */ CompletableFuture> assign(Optional topic, ServiceUnitId serviceUnit); + /** + * Check the incoming service unit is owned by the current broker. + * + * @param topic The optional topic, some method won't provide topic var in this param. + * @param serviceUnit The service unit (e.g. bundle). + * @return The broker lookup data. + */ + CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId serviceUnit); + /** * Close the load manager. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java new file mode 100644 index 00000000000000..3a6251639583c9 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.BrokerFilterException; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; + +@Slf4j +public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { + + private PulsarService pulsar; + + private ServiceConfiguration conf; + + @Getter + private BrokerRegistry brokerRegistry; + + private ServiceUnitStateChannel serviceUnitStateChannel; + + @Getter + private LoadManagerContext context; + + @Getter + private final BrokerSelectionStrategy brokerSelectionStrategy; + + @Getter + private List brokerFilterPipeline; + + private final AtomicBoolean started = new AtomicBoolean(false); + + private final ConcurrentOpenHashMap>> + lookupRequests = ConcurrentOpenHashMap.>>newBuilder() + .build(); + + /** + * Life cycle: Constructor -> initialize -> start -> close. + */ + public ExtensibleLoadManagerImpl() { + this.brokerFilterPipeline = new ArrayList<>(); + this.brokerSelectionStrategy = (brokers, bundle, context) -> { + if (brokers.isEmpty()) { + return Optional.empty(); + } + return Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()))); + }; + } + + @Override + public void start() throws PulsarServerException { + if (this.started.get()) { + return; + } + this.brokerRegistry = new BrokerRegistryImpl(pulsar); + this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); + this.brokerRegistry.start(); + this.serviceUnitStateChannel.start(); + + // TODO: Start the load data store. + + this.context = LoadManagerContextImpl.builder() + .configuration(conf) + .brokerRegistry(brokerRegistry) + .brokerLoadDataStore(null) + .topBundleLoadDataStore(null).build(); + // TODO: Start load data reporter. + + // TODO: Start unload scheduler and bundle split scheduler + + this.started.set(true); + } + + @Override + public void initialize(PulsarService pulsar) { + this.pulsar = pulsar; + this.conf = pulsar.getConfiguration(); + } + + @Override + public CompletableFuture> assign(Optional topic, + ServiceUnitId serviceUnit) { + + final String bundle = serviceUnit.toString(); + + CompletableFuture> future = lookupRequests.computeIfAbsent(bundle, k -> { + final CompletableFuture> owner; + // Assign the bundle to channel owner if is internal topic, to avoid circular references. + if (topic.isPresent() && isInternalTopic(topic.get().toString())) { + owner = serviceUnitStateChannel.getChannelOwnerAsync(); + } else { + owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> { + // If the bundle not assign yet, select and publish assign event to channel. + if (broker.isEmpty()) { + return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> { + if (brokerOpt.isPresent()) { + log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle); + return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get()); + } else { + throw new IllegalStateException( + "Failed to discover(select) the new owner broker for bundle: " + bundle); + } + }); + } + // Already assigned, return it. + return CompletableFuture.completedFuture(broker); + }); + } + + return owner.thenCompose(broker -> { + if (broker.isEmpty()) { + String errorMsg = String.format( + "Failed to look up a broker registry:%s for bundle:%s", broker, bundle); + log.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return CompletableFuture.completedFuture(broker.get()); + }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> { + if (brokerLookupData.isEmpty()) { + String errorMsg = String.format( + "Failed to look up a broker registry:%s for bundle:%s", broker, bundle); + log.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return CompletableFuture.completedFuture(brokerLookupData); + })); + }); + future.whenComplete((r, t) -> lookupRequests.remove(bundle)); + return future; + } + + public CompletableFuture> selectAsync(ServiceUnitId bundle) { + BrokerRegistry brokerRegistry = getBrokerRegistry(); + return brokerRegistry.getAvailableBrokerLookupDataAsync() + .thenCompose(availableBrokers -> { + // TODO: Support isolation policies + LoadManagerContext context = this.getContext(); + + // Filter out brokers that do not meet the rules. + List filterPipeline = getBrokerFilterPipeline(); + Map availableBrokerCandidates = new HashMap<>(availableBrokers); + for (final BrokerFilter filter : filterPipeline) { + try { + filter.filter(availableBrokerCandidates, context); + } catch (BrokerFilterException e) { + availableBrokerCandidates = availableBrokers; + } + } + if (availableBrokerCandidates.isEmpty()) { + return CompletableFuture.completedFuture(Optional.empty()); + } + ArrayList candidateBrokers = new ArrayList<>(availableBrokerCandidates.keySet()); + + return CompletableFuture.completedFuture( + getBrokerSelectionStrategy().select(candidateBrokers, bundle, context)); + }); + } + + @Override + public CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId bundleUnit) { + final String bundle = bundleUnit.toString(); + CompletableFuture> owner; + if (topic.isPresent() && isInternalTopic(topic.get().toString())) { + owner = serviceUnitStateChannel.getChannelOwnerAsync(); + } else { + owner = serviceUnitStateChannel.getOwnerAsync(bundle); + } + + return owner.thenApply(broker -> brokerRegistry.getBrokerId().equals(broker.orElse(null))); + } + + @Override + public void close() throws PulsarServerException { + if (!this.started.get()) { + return; + } + try { + this.brokerRegistry.close(); + } catch (Exception e) { + throw new PulsarServerException(e); + } + this.serviceUnitStateChannel.close(); + this.started.set(false); + } + + private boolean isInternalTopic(String topic) { + return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java new file mode 100644 index 00000000000000..54c035f92766c2 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.loadbalance.ResourceUnit; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; + +public class ExtensibleLoadManagerWrapper implements LoadManager { + + private PulsarService pulsar; + + private final ExtensibleLoadManagerImpl loadManager; + + public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) { + this.loadManager = loadManager; + } + + @Override + public void start() throws PulsarServerException { + loadManager.start(); + } + + @Override + public void initialize(PulsarService pulsar) { + loadManager.initialize(pulsar); + this.pulsar = pulsar; + } + + @Override + public boolean isCentralized() { + return true; + } + + @Override + public CompletableFuture> findBrokerServiceUrl( + Optional topic, ServiceUnitId bundle) { + return loadManager.assign(topic, bundle) + .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult)); + } + + @Override + public CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId bundle) { + return loadManager.checkOwnershipAsync(topic, bundle); + } + + @Override + public void disableBroker() throws Exception { + this.loadManager.getBrokerRegistry().unregister(); + } + + @Override + public Set getAvailableBrokers() throws Exception { + return getAvailableBrokersAsync() + .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + } + + @Override + public CompletableFuture> getAvailableBrokersAsync() { + return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new); + } + + @Override + public void stop() throws PulsarServerException { + this.loadManager.close(); + } + + + @Override + public Optional getLeastLoaded(ServiceUnitId su) throws Exception { + return Optional.empty(); + } + + @Override + public LoadManagerReport generateLoadReport() { + return null; + } + + @Override + public void setLoadReportForceUpdateFlag() { + // No-op. + } + + @Override + public void writeLoadReportOnZookeeper() throws Exception { + // No-op, this operation is not useful, the load data reporter will automatically write. + } + + @Override + public void writeResourceQuotasToZooKeeper() throws Exception { + // No-op, this operation is not useful, the load data reporter will automatically write. + } + + @Override + public List getLoadBalancingMetrics() { + return null; + } + + @Override + public void doLoadShedding() { + // No-op. + } + + @Override + public void doNamespaceBundleSplit() { + // No-op. + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java new file mode 100644 index 00000000000000..5f78b88e22c6e2 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import lombok.Builder; +import lombok.Setter; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; + +@Setter +@Builder +public class LoadManagerContextImpl implements LoadManagerContext { + + private LoadDataStore brokerLoadDataStore; + + private LoadDataStore topBundleLoadDataStore; + + private BrokerRegistry brokerRegistry; + + private ServiceConfiguration configuration; + + + @Override + public ServiceConfiguration brokerConfiguration() { + return this.configuration; + } + + @Override + public LoadDataStore brokerLoadDataStore() { + return this.brokerLoadDataStore; + } + + @Override + public LoadDataStore topBundleLoadDataStore() { + return this.topBundleLoadDataStore; + } + + @Override + public BrokerRegistry brokerRegistry() { + return this.brokerRegistry; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java index 238c433e1776f1..0e28a31fa54b27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java @@ -118,7 +118,7 @@ public interface ServiceUnitStateChannel extends Closeable { * the future object will time out. * Case 3: If none of them, it returns null. */ - CompletableFuture getOwnerAsync(String serviceUnit); + CompletableFuture> getOwnerAsync(String serviceUnit); /** * Asynchronously publishes the service unit assignment event to the system topic in this channel. @@ -132,7 +132,7 @@ public interface ServiceUnitStateChannel extends Closeable { * case 2: If the assigned broker does not take the ownership in time, * the future object will time out. */ - CompletableFuture publishAssignEventAsync(String serviceUnit, String broker); + CompletableFuture> publishAssignEventAsync(String serviceUnit, String broker); /** * Asynchronously publishes the service unit unload event to the system topic in this channel. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index a476be974a30c1..81bfc02ed79d6f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -84,7 +84,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500; private final PulsarService pulsar; private final Schema schema; - private final ConcurrentOpenHashMap> getOwnerRequests; + private final ConcurrentOpenHashMap>> getOwnerRequests; private final String lookupServiceAddress; // TODO: define BrokerRegistry private final ConcurrentOpenHashMap> cleanupJobs; @@ -130,13 +130,13 @@ public ServiceUnitStateChannelImpl(PulsarService pulsar) { this.lookupServiceAddress = pulsar.getLookupServiceAddress(); this.schema = Schema.JSON(ServiceUnitStateData.class); this.getOwnerRequests = ConcurrentOpenHashMap.>newBuilder().build(); + CompletableFuture>>newBuilder().build(); this.cleanupJobs = ConcurrentOpenHashMap.>newBuilder().build(); this.inFlightStateWaitingTimeInMillis = MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS; this.maxCleanupDelayTimeInSecs = MAX_CLEAN_UP_DELAY_TIME_IN_SECS; this.minCleanupDelayTimeInSecs = MIN_CLEAN_UP_DELAY_TIME_IN_SECS; this.leaderElectionService = new LeaderElectionService( - pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), + pulsar.getCoordinationService(), pulsar.getLookupServiceAddress(), state -> { if (state == LeaderElectionState.Leading) { log.debug("This broker:{} is the leader now.", lookupServiceAddress); @@ -242,11 +242,12 @@ public CompletableFuture> getChannelOwnerAsync() { // TODO: discard this protocol prefix removal // by a util func that returns lookupServiceAddress(serviceUrl) if (leader.isPresent()) { - String broker = leader.get().getServiceUrl(); - broker = broker.substring(broker.lastIndexOf('/') + 1); - return Optional.of(broker); + return Optional.of(leader.get().getServiceUrl()); } else { - return Optional.empty(); + // When leader is empty, we should throw exception to notify is failed. + String msg = "There is no channel owner now."; + log.error(msg); + throw new IllegalStateException(msg); } } ); @@ -275,27 +276,31 @@ private boolean isChannelOwner() { } } - public CompletableFuture getOwnerAsync(String serviceUnit) { + public CompletableFuture> getOwnerAsync(String serviceUnit) { validateChannelState(Started, true); ServiceUnitStateData data = tableview.get(serviceUnit); if (data == null) { - return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(Optional.empty()); } switch (data.state()) { case Owned, Splitting -> { - return CompletableFuture.completedFuture(data.broker()); + return CompletableFuture.completedFuture(Optional.of(data.broker())); } case Assigned, Released -> { return deferGetOwnerRequest(serviceUnit); } default -> { - return null; + String errorMsg = String.format("Failed to process service unit state data: %s when get owner.", data); + log.error(errorMsg); + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new IllegalStateException(errorMsg)); + return future; } } } - public CompletableFuture publishAssignEventAsync(String serviceUnit, String broker) { - CompletableFuture getOwnerRequest = deferGetOwnerRequest(serviceUnit); + public CompletableFuture> publishAssignEventAsync(String serviceUnit, String broker) { + CompletableFuture> getOwnerRequest = deferGetOwnerRequest(serviceUnit); pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker)) .whenComplete((__, ex) -> { if (ex != null) { @@ -380,7 +385,7 @@ lookupServiceAddress, getLogEventTag(data), serviceUnit, private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { var getOwnerRequest = getOwnerRequests.remove(serviceUnit); if (getOwnerRequest != null) { - getOwnerRequest.complete(data.broker()); + getOwnerRequest.complete(Optional.of(data.broker())); } if (isTargetBroker(data.broker())) { log(null, serviceUnit, data, null); @@ -465,10 +470,10 @@ private NamespaceBundle getNamespaceBundle(String bundle) { return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange); } - private CompletableFuture deferGetOwnerRequest(String serviceUnit) { + private CompletableFuture> deferGetOwnerRequest(String serviceUnit) { return getOwnerRequests .computeIfAbsent(serviceUnit, k -> { - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture> future = new CompletableFuture<>(); future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS) .whenComplete((v, e) -> { if (e != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java index 4adc6aa1ce46d9..0a76446d3ce6f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java @@ -18,9 +18,10 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.filter; -import java.util.List; +import java.util.Map; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; /** * Filter out unqualified Brokers, which are not entered into LoadBalancer for decision-making. @@ -39,6 +40,7 @@ public interface BrokerFilter { * @param context The load manager context. * @return Filtered broker list. */ - List filter(List brokers, LoadManagerContext context) throws BrokerFilterException; + Map filter(Map brokers, LoadManagerContext context) + throws BrokerFilterException; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java new file mode 100644 index 00000000000000..3b62777f1b626c --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.strategy; + +import java.util.List; +import java.util.Optional; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.common.naming.ServiceUnitId; + +/** + * The broker selection strategy is designed to select the broker according to different implementations. + */ +public interface BrokerSelectionStrategy { + + /** + * Choose an appropriate broker according to different load balancing implementations. + * + * @param brokers + * The candidate brokers list. + * @param bundle + * The input bundle to select the owner broker + * @param context + * The context contains information needed for selection (load data, config, and etc). + */ + Optional select(List brokers, ServiceUnitId bundle, LoadManagerContext context); + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java new file mode 100644 index 00000000000000..846b528045a13d --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.strategy; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 4be64f4b7b50cf..3c67216e85b7de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -55,6 +55,7 @@ import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.ResourceUnit; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -171,11 +172,21 @@ public void initialize() { } } + public boolean isExtensibleLoadManager(){ + return loadManager.get() instanceof ExtensibleLoadManagerWrapper; + } + public CompletableFuture> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) { long startTime = System.nanoTime(); CompletableFuture> future = getBundleAsync(topic) - .thenCompose(bundle -> findBrokerServiceUrl(bundle, options)); + .thenCompose(bundle -> { + if (isExtensibleLoadManager()) { + return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle); + } else { + return findBrokerServiceUrl(bundle, options); + } + }); future.thenAccept(optResult -> { lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); @@ -232,16 +243,18 @@ public CompletableFuture> getWebServiceUrlAsync(ServiceUnitId suNa LOG.debug("Getting web service URL of topic: {} - options: {}", name, options); } return getBundleAsync(name) - .thenCompose(namespaceBundle -> internalGetWebServiceUrl(namespaceBundle, options)); + .thenCompose(namespaceBundle -> + internalGetWebServiceUrl(Optional.of(name), namespaceBundle, options)); } if (suName instanceof NamespaceName) { return getFullBundleAsync((NamespaceName) suName) - .thenCompose(namespaceBundle -> internalGetWebServiceUrl(namespaceBundle, options)); + .thenCompose(namespaceBundle -> + internalGetWebServiceUrl(Optional.empty(), namespaceBundle, options)); } if (suName instanceof NamespaceBundle) { - return internalGetWebServiceUrl((NamespaceBundle) suName, options); + return internalGetWebServiceUrl(Optional.empty(), (NamespaceBundle) suName, options); } throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName()); @@ -257,9 +270,13 @@ public Optional getWebServiceUrl(ServiceUnitId suName, LookupOptions option .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS); } - private CompletableFuture> internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) { + private CompletableFuture> internalGetWebServiceUrl(Optional topic, + NamespaceBundle bundle, + LookupOptions options) { - return findBrokerServiceUrl(bundle, options).thenApply(lookupResult -> { + return (isExtensibleLoadManager() + ? loadManager.get().findBrokerServiceUrl(topic, bundle) : + findBrokerServiceUrl(bundle, options)).thenApply(lookupResult -> { if (lookupResult.isPresent()) { try { LookupData lookupData = lookupResult.get().getLookupData(); @@ -1023,6 +1040,9 @@ public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) } if (suName instanceof NamespaceBundle) { + if (isExtensibleLoadManager()) { + return loadManager.get().checkOwnershipAsync(Optional.empty(), suName); + } return CompletableFuture.completedFuture( ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName)); } @@ -1045,6 +1065,10 @@ public boolean isServiceUnitActive(TopicName topicName) { } public CompletableFuture isServiceUnitActiveAsync(TopicName topicName) { + if (isExtensibleLoadManager()) { + return getBundleAsync(topicName) + .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); + } Optional> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName)); if (!res.isPresent()) { return CompletableFuture.completedFuture(false); @@ -1058,15 +1082,27 @@ private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { } private CompletableFuture isNamespaceOwnedAsync(NamespaceName fqnn) { + if (isExtensibleLoadManager()) { + return getFullBundleAsync(fqnn) + .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.empty(), bundle)); + } return getFullBundleAsync(fqnn) .thenApply(bundle -> ownershipCache.getOwnedBundle(bundle) != null); } private CompletableFuture isTopicOwnedAsync(TopicName topic) { + if (isExtensibleLoadManager()) { + return getBundleAsync(topic) + .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle)); + } return getBundleAsync(topic).thenApply(bundle -> ownershipCache.isNamespaceBundleOwned(bundle)); } public CompletableFuture checkTopicOwnership(TopicName topicName) { + if (isExtensibleLoadManager()) { + return getBundleAsync(topicName) + .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); + } return getBundleAsync(topicName) .thenCompose(ownershipCache::checkOwnershipAsync); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java new file mode 100644 index 00000000000000..45365dbfbaa754 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import lombok.extern.slf4j.Slf4j; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.loadbalance.BrokerFilterException; +import org.apache.pulsar.broker.loadbalance.LeaderBroker; +import org.apache.pulsar.broker.loadbalance.LeaderElectionService; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter; +import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.broker.namespace.LookupOptions; +import org.apache.pulsar.client.impl.TableViewImpl; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Unit test for {@link ExtensibleLoadManagerImpl}. + */ +@Slf4j +@Test(groups = "broker") +public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { + + private PulsarService pulsar1; + private PulsarService pulsar2; + + private ExtensibleLoadManagerImpl primaryLoadManager; + + private ExtensibleLoadManagerImpl secondaryLoadManager; + + private ServiceUnitStateChannelImpl channel1; + private ServiceUnitStateChannelImpl channel2; + + @BeforeClass + public void setup() throws Exception { + conf.setAllowAutoTopicCreation(true); + conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + super.internalSetup(conf); + pulsar1 = pulsar; + ServiceConfiguration defaultConf = getDefaultConf(); + defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + pulsar2 = startBrokerWithoutAuthorization(defaultConf); + + ExtensibleLoadManagerWrapper primaryLoadManagerWrapper = + (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get(); + primaryLoadManager = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(primaryLoadManagerWrapper, "loadManager", true)); + FieldUtils.writeField(primaryLoadManagerWrapper, "loadManager", primaryLoadManager, true); + + ExtensibleLoadManagerWrapper secondaryLoadManagerWrapper = + (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get(); + secondaryLoadManager = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(secondaryLoadManagerWrapper, "loadManager", true)); + FieldUtils.writeField(secondaryLoadManagerWrapper, "loadManager", secondaryLoadManager, true); + + channel1 = (ServiceUnitStateChannelImpl) + FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true); + channel2 = (ServiceUnitStateChannelImpl) + FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true); + + } + + @Override + protected void cleanup() throws Exception { + pulsar1 = null; + pulsar2.close(); + super.internalCleanup(); + } + + @BeforeMethod + protected void initializeState() throws IllegalAccessException { + reset(primaryLoadManager, secondaryLoadManager); + cleanTableView(channel1); + cleanTableView(channel2); + } + + @Test + public void testAssignInternalTopic() throws Exception { + Optional brokerLookupData1 = primaryLoadManager.assign( + Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)), + getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get(); + Optional brokerLookupData2 = secondaryLoadManager.assign( + Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)), + getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get(); + assertEquals(brokerLookupData1, brokerLookupData2); + assertTrue(brokerLookupData1.isPresent()); + + LeaderElectionService leaderElectionService = (LeaderElectionService) + FieldUtils.readField(channel1, "leaderElectionService", true); + Optional currentLeader = leaderElectionService.getCurrentLeader(); + assertTrue(currentLeader.isPresent()); + String webServiceUrl = brokerLookupData1.get().getWebServiceUrl(); + assertEquals(webServiceUrl.substring(webServiceUrl.lastIndexOf('/') + 1), + currentLeader.get().getServiceUrl()); + } + + @Test + public void testAssign() throws Exception { + TopicName topicName = TopicName.get("test-assign"); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + assertTrue(brokerLookupData.isPresent()); + log.info("Assign the bundle {} to {}", bundle, brokerLookupData); + // Should get owner info from channel. + Optional brokerLookupData1 = secondaryLoadManager.assign(Optional.empty(), bundle).get(); + assertEquals(brokerLookupData, brokerLookupData1); + + verify(primaryLoadManager, times(1)).getBrokerSelectionStrategy(); + verify(secondaryLoadManager, times(0)).getBrokerSelectionStrategy(); + + Optional lookupResult = pulsar2.getNamespaceService() + .getBrokerServiceUrlAsync(topicName, null).get(); + assertTrue(lookupResult.isPresent()); + assertEquals(lookupResult.get().getLookupData().getHttpUrl(), brokerLookupData.get().getWebServiceUrl()); + + Optional webServiceUrl = pulsar2.getNamespaceService() + .getWebServiceUrl(bundle, LookupOptions.builder().requestHttps(false).build()); + assertTrue(webServiceUrl.isPresent()); + assertEquals(webServiceUrl.get().toString(), brokerLookupData.get().getWebServiceUrl()); + } + + @Test + public void testCheckOwnershipAsync() throws Exception { + NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get("test-check-ownership")).get(); + // 1. The bundle is never assigned. + assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + + // 2. Assign the bundle to a broker. + Optional lookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + assertTrue(lookupData.isPresent()); + if (lookupData.get().getPulsarServiceUrl().equals(pulsar1.getBrokerServiceUrl())) { + assertTrue(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + } else { + assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + assertTrue(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); + } + + } + + @Test + public void testFilter() throws Exception { + TopicName topicName = TopicName.get("test-filter"); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + + doReturn(List.of(new BrokerFilter() { + @Override + public String name() { + return "Mock broker filter"; + } + + @Override + public Map filter(Map brokers, + LoadManagerContext context) { + brokers.remove(pulsar1.getLookupServiceAddress()); + return brokers; + } + })).when(primaryLoadManager).getBrokerFilterPipeline(); + + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + assertTrue(brokerLookupData.isPresent()); + assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress()); + } + + @Test + public void testFilterHasException() throws Exception { + TopicName topicName = TopicName.get("test-filter-has-exception"); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + + doReturn(List.of(new BrokerFilter() { + @Override + public String name() { + return "Mock broker filter"; + } + + @Override + public Map filter(Map brokers, + LoadManagerContext context) throws BrokerFilterException { + brokers.clear(); + throw new BrokerFilterException("Test"); + } + })).when(primaryLoadManager).getBrokerFilterPipeline(); + + Optional brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); + assertTrue(brokerLookupData.isPresent()); + } + + private static void cleanTableView(ServiceUnitStateChannel channel) + throws IllegalAccessException { + var tv = (TableViewImpl) + FieldUtils.readField(channel, "tableview", true); + var cache = (ConcurrentMap) + FieldUtils.readField(tv, "data", true); + cache.clear(); + } + + private CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { + return pulsar.getNamespaceService().getBundleAsync(topic); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index ad4d0cb2f0b566..12e90300f21ac6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -28,7 +28,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -269,8 +268,8 @@ public void assignmentTest() var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); - assertNull(owner1.get()); - assertNull(owner2.get()); + assertTrue(owner1.get().isEmpty()); + assertTrue(owner1.get().isEmpty()); var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2); @@ -278,8 +277,12 @@ public void assignmentTest() assertNotNull(assigned2); waitUntilOwnerChanges(channel1, bundle, null); waitUntilOwnerChanges(channel2, bundle, null); - String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS); - String assignedAddr2 = assigned2.get(5, TimeUnit.SECONDS); + Optional assignedAddrOpt1 = assigned1.get(5, TimeUnit.SECONDS); + Optional assignedAddrOpt2 = assigned2.get(5, TimeUnit.SECONDS); + assertTrue(assignedAddrOpt1.isPresent()); + assertTrue(assignedAddrOpt2.isPresent()); + String assignedAddr1 = assignedAddrOpt1.get(); + String assignedAddr2 = assignedAddrOpt2.get(); assertEquals(assignedAddr1, assignedAddr2); assertTrue(assignedAddr1.equals(lookupServiceAddress1) @@ -318,14 +321,16 @@ public void assignmentTestWhenOneAssignmentFails() var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); - assertNull(owner1.get()); - assertNull(owner2.get()); + assertTrue(owner1.get().isEmpty()); + assertTrue(owner2.get().isEmpty()); owner1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); owner2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2); assertTrue(owner1.isCompletedExceptionally()); assertNotNull(owner2); - String ownerAddr2 = owner2.get(5, TimeUnit.SECONDS); + Optional ownerAddrOpt2 = owner2.get(5, TimeUnit.SECONDS); + assertTrue(ownerAddrOpt2.isPresent()); + String ownerAddr2 = ownerAddrOpt2.get(); assertEquals(ownerAddr2, lookupServiceAddress2); waitUntilNewOwner(channel1, bundle, lookupServiceAddress2); assertEquals(0, getOwnerRequests1.size()); @@ -341,8 +346,8 @@ public void unloadTest() var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); - assertNull(owner1.get()); - assertNull(owner2.get()); + assertTrue(owner1.get().isEmpty()); + assertTrue(owner2.get().isEmpty()); channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); @@ -352,7 +357,7 @@ public void unloadTest() var ownerAddr2 = channel2.getOwnerAsync(bundle).get(); assertEquals(ownerAddr1, ownerAddr2); - assertEquals(ownerAddr1, lookupServiceAddress1); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1)); Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2)); channel1.publishUnloadEventAsync(unload); @@ -363,7 +368,7 @@ public void unloadTest() ownerAddr1 = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS); ownerAddr2 = channel2.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS); assertEquals(ownerAddr1, ownerAddr2); - assertEquals(ownerAddr1, lookupServiceAddress2); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2)); } @Test(priority = 5) @@ -382,7 +387,7 @@ public void unloadTestWhenDestBrokerFails() var ownerAddr2 = channel2.getOwnerAsync(bundle).get(); assertEquals(ownerAddr1, ownerAddr2); - assertEquals(ownerAddr1, lookupServiceAddress1); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1)); var producer = (Producer) FieldUtils.readDeclaredField(channel1, "producer", true); @@ -443,10 +448,11 @@ public void splitTest() throws Exception { waitUntilNewOwner(channel2, bundle, lookupServiceAddress1); var ownerAddr1 = channel1.getOwnerAsync(bundle).get(); var ownerAddr2 = channel2.getOwnerAsync(bundle).get(); - assertEquals(ownerAddr1, lookupServiceAddress1); - assertEquals(ownerAddr2, lookupServiceAddress1); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1)); + assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1)); + assertTrue(ownerAddr1.isPresent()); - Split split = new Split(bundle, ownerAddr1, new HashMap<>()); + Split split = new Split(bundle, ownerAddr1.get(), new HashMap<>()); channel1.publishSplitEventAsync(split); waitUntilNewOwner(channel1, bundle, null); @@ -534,8 +540,8 @@ public void handleBrokerDeletionEventTest() var owner1 = channel1.getOwnerAsync(bundle1); var owner2 = channel2.getOwnerAsync(bundle2); - assertNull(owner1.get()); - assertNull(owner2.get()); + assertTrue(owner1.get().isEmpty()); + assertTrue(owner2.get().isEmpty()); String broker = lookupServiceAddress1; channel1.publishAssignEventAsync(bundle1, broker); @@ -731,11 +737,11 @@ private static void waitUntilOwnerChanges(ServiceUnitStateChannel channel, Strin .pollInterval(200, TimeUnit.MILLISECONDS) .atMost(10, TimeUnit.SECONDS) .until(() -> { // wait until true - CompletableFuture owner = channel.getOwnerAsync(serviceUnit); + CompletableFuture> owner = channel.getOwnerAsync(serviceUnit); if (!owner.isDone()) { return false; } - return !StringUtils.equals(oldOwner, owner.get()); + return !StringUtils.equals(oldOwner, owner.get().orElse(null)); }); } @@ -745,11 +751,11 @@ private static void waitUntilNewOwner(ServiceUnitStateChannel channel, String se .atMost(15, TimeUnit.SECONDS) .until(() -> { // wait until true try { - CompletableFuture owner = channel.getOwnerAsync(serviceUnit); + CompletableFuture> owner = channel.getOwnerAsync(serviceUnit); if (!owner.isDone()) { return false; } - return StringUtils.equals(newOwner, owner.get()); + return StringUtils.equals(newOwner, owner.get().orElse(null)); } catch (Exception e) { return false; }