diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java index 4ee28a5225a0d..07109b277ae98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.filter; import java.util.Map; +import java.util.Objects; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; @@ -43,7 +44,9 @@ public Map filter( } brokers.entrySet().removeIf(entry -> { BrokerLookupData v = entry.getValue(); - return !v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName()); + // The load manager class name can be null if the cluster has old version of broker. + return !Objects.equals(v.getLoadManagerClassName(), + context.brokerConfiguration().getLoadManagerClassName()); }); return brokers; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java index 4aff77937a5b4..3455b333b0ae7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java @@ -19,9 +19,11 @@ package org.apache.pulsar.broker.loadbalance.extensions.manager; import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -48,6 +50,12 @@ public RedirectManager(PulsarService pulsar) { this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class); } + @VisibleForTesting + public RedirectManager(PulsarService pulsar, LockManager brokerLookupDataLockManager) { + this.pulsar = pulsar; + this.brokerLookupDataLockManager = brokerLookupDataLockManager; + } + public CompletableFuture> getAvailableBrokerLookupDataAsync() { return brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenCompose(availableBrokers -> { Map map = new ConcurrentHashMap<>(); @@ -69,7 +77,7 @@ public CompletableFuture> getAvailableBrokerLookup public CompletableFuture> findRedirectLookupResultAsync() { String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName(); - boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log); + boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log); return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> { if (lookupDataMap.isEmpty()) { String errorMsg = "No available broker found."; @@ -89,9 +97,10 @@ public CompletableFuture> findRedirectLookupResultAsync() log.warn(errorMsg); throw new IllegalStateException(errorMsg); } - if (latestServiceLookupData.get().getLoadManagerClassName().equals(currentLMClassName)) { + + if (Objects.equals(latestServiceLookupData.get().getLoadManagerClassName(), currentLMClassName)) { if (debug) { - log.info("We don't need to redirect, current load manager class name: {}", + log.info("No need to redirect, current load manager class name: {}", currentLMClassName); } return Optional.empty(); @@ -99,7 +108,7 @@ public CompletableFuture> findRedirectLookupResultAsync() var serviceLookupDataObj = latestServiceLookupData.get(); var candidateBrokers = new ArrayList(); lookupDataMap.forEach((key, value) -> { - if (value.getLoadManagerClassName().equals(serviceLookupDataObj.getLoadManagerClassName())) { + if (Objects.equals(value.getLoadManagerClassName(), serviceLookupDataObj.getLoadManagerClassName())) { candidateBrokers.add(value); } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java index 5d6a56ba86960..13e3fdc537e79 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance.impl; +import java.util.Objects; import java.util.Set; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.BrokerFilter; @@ -32,8 +33,8 @@ public void filter(Set brokers, BundleData bundleToAssign, LoadData loadData, ServiceConfiguration conf) throws BrokerFilterException { loadData.getBrokerData().forEach((key, value) -> { - if (!value.getLocalData().getLoadManagerClassName() - .equals(conf.getLoadManagerClassName())) { + // The load manager class name can be null if the cluster has old version of broker. + if (!Objects.equals(value.getLocalData().getLoadManagerClassName(), conf.getLoadManagerClassName())) { brokers.remove(key); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java index 0169b57fe993e..4aef87cf63aa8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java @@ -44,7 +44,8 @@ public void test() throws BrokerFilterException { "broker1", getLookupData("3.0.0", ExtensibleLoadManagerImpl.class.getName()), "broker2", getLookupData("3.0.0", ExtensibleLoadManagerImpl.class.getName()), "broker3", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName()), - "broker4", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName()) + "broker4", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName()), + "broker5", getLookupData("3.0.0", null) ); Map result = filter.filter(new HashMap<>(originalBrokers), null, context); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java new file mode 100644 index 0000000000000..cbf77b59d5ad6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; +import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; +import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + + +/** + * Unit test {@link RedirectManager}. + */ +public class RedirectManagerTest { + + @Test + public void testFindRedirectLookupResultAsync() throws ExecutionException, InterruptedException { + PulsarService pulsar = mock(PulsarService.class); + ServiceConfiguration configuration = new ServiceConfiguration(); + when(pulsar.getConfiguration()).thenReturn(configuration); + RedirectManager redirectManager = spy(new RedirectManager(pulsar, null)); + + configuration.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + configuration.setLoadBalancerDebugModeEnabled(true); + + // Test 1: No load manager class name found. + doReturn(CompletableFuture.completedFuture( + new HashMap<>(){{ + put("broker-1", getLookupData("broker-1", null, 10)); + put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 1)); + }} + )).when(redirectManager).getAvailableBrokerLookupDataAsync(); + + // Should redirect to broker-1, since broker-1 has the latest load manager, even though the class name is null. + Optional lookupResult = redirectManager.findRedirectLookupResultAsync().get(); + assertTrue(lookupResult.isPresent()); + assertTrue(lookupResult.get().getLookupData().getBrokerUrl().contains("broker-1")); + + // Test 2: Should redirect to broker-1, since the latest broker are using ExtensibleLoadManagerImpl + doReturn(CompletableFuture.completedFuture( + new HashMap<>(){{ + put("broker-1", getLookupData("broker-1", ExtensibleLoadManagerImpl.class.getName(), 10)); + put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 1)); + }} + )).when(redirectManager).getAvailableBrokerLookupDataAsync(); + + lookupResult = redirectManager.findRedirectLookupResultAsync().get(); + assertTrue(lookupResult.isPresent()); + assertTrue(lookupResult.get().getLookupData().getBrokerUrl().contains("broker-1")); + + + // Test 3: Should not redirect, since current broker are using ModularLoadManagerImpl + doReturn(CompletableFuture.completedFuture( + new HashMap<>(){{ + put("broker-1", getLookupData("broker-1", ExtensibleLoadManagerImpl.class.getName(), 10)); + put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 100)); + }} + )).when(redirectManager).getAvailableBrokerLookupDataAsync(); + + lookupResult = redirectManager.findRedirectLookupResultAsync().get(); + assertFalse(lookupResult.isPresent()); + } + + + public BrokerLookupData getLookupData(String broker, String loadManagerClassName, long startTimeStamp) { + String webServiceUrl = "http://" + broker + ":8080"; + String webServiceUrlTls = "https://" + broker + ":8081"; + String pulsarServiceUrl = "pulsar://" + broker + ":6650"; + String pulsarServiceUrlTls = "pulsar+ssl://" + broker + ":6651"; + Map advertisedListeners = new HashMap<>(); + Map protocols = new HashMap<>(){{ + put("kafka", "9092"); + }}; + return new BrokerLookupData( + webServiceUrl, webServiceUrlTls, pulsarServiceUrl, + pulsarServiceUrlTls, advertisedListeners, protocols, true, true, + loadManagerClassName, startTimeStamp, "3.0.0"); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java index 856bbac029226..56332111f935b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java @@ -46,8 +46,12 @@ public void test() throws BrokerFilterException { LocalBrokerData localBrokerData1 = new LocalBrokerData(); localBrokerData1.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + + LocalBrokerData localBrokerData2 = new LocalBrokerData(); + localBrokerData2.setLoadManagerClassName(null); loadData.getBrokerData().put("broker1", new BrokerData(localBrokerData)); loadData.getBrokerData().put("broker2", new BrokerData(localBrokerData1)); + loadData.getBrokerData().put("broker3", new BrokerData(localBrokerData2)); ServiceConfiguration conf = new ServiceConfiguration(); conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); @@ -55,6 +59,7 @@ public void test() throws BrokerFilterException { Set brokers = new HashSet<>(){{ add("broker1"); add("broker2"); + add("broker3"); }}; filter.filter(brokers, null, loadData, conf); @@ -64,6 +69,7 @@ public void test() throws BrokerFilterException { brokers = new HashSet<>(){{ add("broker1"); add("broker2"); + add("broker3"); }}; conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); filter.filter(brokers, null, loadData, conf);