diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java index 4adc6aa1ce46d9..35f4b6817f1314 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java @@ -18,9 +18,10 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.filter; -import java.util.List; +import java.util.Map; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; /** * Filter out unqualified Brokers, which are not entered into LoadBalancer for decision-making. @@ -35,10 +36,11 @@ public interface BrokerFilter { /** * Filter out unqualified brokers based on implementation. * - * @param brokers The full brokers. + * @param brokers The full broker and lookup data. * @param context The load manager context. * @return Filtered broker list. */ - List filter(List brokers, LoadManagerContext context) throws BrokerFilterException; + Map filter(Map brokers, LoadManagerContext context) + throws BrokerFilterException; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java new file mode 100644 index 00000000000000..869fb049a3cd85 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.filter; + +import com.github.zafarkhaja.semver.Version; +import java.util.Iterator; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException; +import org.apache.pulsar.broker.loadbalance.BrokerFilterException; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; + +/** + * Filter by broker version. + */ +@Slf4j +public class BrokerVersionFilter implements BrokerFilter { + + public static final String FILTER_NAME = "broker_version_filter"; + + + /** + * From the given set of available broker candidates, filter those old brokers using the version numbers. + * + * @param brokers The currently available brokers that have not already been filtered. + * @param context The load manager context. + * + */ + @Override + public Map filter(Map brokers, LoadManagerContext context) + throws BrokerFilterException { + ServiceConfiguration conf = context.brokerConfiguration(); + if (!conf.isPreferLaterVersions() || brokers.isEmpty()) { + return brokers; + } + + Version latestVersion; + try { + latestVersion = getLatestVersionNumber(brokers); + if (log.isDebugEnabled()) { + log.debug("Latest broker version found was [{}]", latestVersion); + } + } catch (Exception ex) { + log.warn("Disabling PreferLaterVersions feature; reason: " + ex.getMessage()); + throw new BrokerFilterBadVersionException("Cannot determine newest broker version: " + ex.getMessage()); + } + + int numBrokersLatestVersion = 0; + int numBrokersOlderVersion = 0; + + Iterator> brokerIterator = brokers.entrySet().iterator(); + while (brokerIterator.hasNext()) { + Map.Entry next = brokerIterator.next(); + String brokerId = next.getKey(); + String version = next.getValue().brokerVersion(); + Version brokerVersionVersion = Version.valueOf(version); + if (brokerVersionVersion.equals(latestVersion)) { + log.debug("Broker [{}] is running the latest version ([{}])", brokerId, version); + numBrokersLatestVersion++; + } else { + log.info("Broker [{}] is running an older version ([{}]); latest version is [{}]", + brokerId, version, latestVersion); + numBrokersOlderVersion++; + brokerIterator.remove(); + } + } + if (numBrokersOlderVersion == 0) { + log.info("All {} brokers are running the latest version [{}]", numBrokersLatestVersion, latestVersion); + } + return brokers; + } + + /** + * Get the most recent broker version number from the broker lookup data of all the running brokers. + * The version number is from the build artifact in the pom and got added to the package when it was built by Maven + * + * @param brokerMap + * The BrokerId -> BrokerLookupData Map. + * @return The most recent broker version + * @throws BrokerFilterBadVersionException + * If the most recent version is undefined (e.g., a bad broker version was encountered or a broker + * does not have a version string in its lookup data. + */ + public Version getLatestVersionNumber(Map brokerMap) + throws BrokerFilterBadVersionException { + + if (brokerMap.size() == 0) { + throw new BrokerFilterBadVersionException( + "Unable to determine latest version since broker version map was empty"); + } + + Version latestVersion = null; + for (Map.Entry entry : brokerMap.entrySet()) { + String brokerId = entry.getKey(); + String version = entry.getValue().brokerVersion(); + if (null == version || version.length() == 0) { + log.warn("No version string in lookup data for broker [{}]; disabling PreferLaterVersions feature", + brokerId); + // Trigger the load manager to reset all the brokers to the original set + throw new BrokerFilterBadVersionException("No version string in lookup data for broker \"" + + brokerId + "\""); + } + Version brokerVersionVersion; + try { + brokerVersionVersion = Version.valueOf(version); + } catch (Exception x) { + log.warn("Invalid version string in lookup data for broker [{}]: [{}];" + + " disabling PreferLaterVersions feature", + brokerId, version); + // Trigger the load manager to reset all the brokers to the original set + throw new BrokerFilterBadVersionException("Invalid version string in lookup data for broker \"" + + brokerId + "\": \"" + version + "\")"); + } + + if (latestVersion == null) { + latestVersion = brokerVersionVersion; + } else if (Version.BUILD_AWARE_ORDER.compare(latestVersion, brokerVersionVersion) < 0) { + latestVersion = brokerVersionVersion; + } + } + + return latestVersion; + } + + @Override + public String name() { + return FILTER_NAME; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java new file mode 100644 index 00000000000000..1fcc3836a6fac4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.filter; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException; +import org.apache.pulsar.broker.loadbalance.BrokerFilterException; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; +import org.testng.annotations.Test; + +/** + * Unit test for {@link BrokerVersionFilter}. + */ +@Test(groups = "broker") +public class BrokerVersionFilterTest { + + + @Test + public void testFilterEmptyBrokerList() throws BrokerFilterException { + BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter(); + Map result = brokerVersionFilter.filter(new HashMap<>(), getContext()); + assertTrue(result.isEmpty()); + } + + @Test + public void testDisabledFilter() throws BrokerFilterException { + LoadManagerContext context = getContext(); + ServiceConfiguration configuration = new ServiceConfiguration(); + configuration.setPreferLaterVersions(false); + doReturn(configuration).when(context).brokerConfiguration(); + + Map originalBrokers = Map.of( + "localhost:6650", getLookupData("2.10.0"), + "localhost:6651", getLookupData("2.10.1") + ); + Map brokers = new HashMap<>(originalBrokers); + BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter(); + Map result = brokerVersionFilter.filter(brokers, context); + assertEquals(result, originalBrokers); + } + + @Test + public void testFilter() throws BrokerFilterException { + Map originalBrokers = Map.of( + "localhost:6650", getLookupData("2.10.0"), + "localhost:6651", getLookupData("2.10.1"), + "localhost:6652", getLookupData("2.10.1"), + "localhost:6653", getLookupData("2.10.1") + ); + BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter(); + Map result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), getContext()); + assertEquals(result, Map.of( + "localhost:6651", getLookupData("2.10.1"), + "localhost:6652", getLookupData("2.10.1"), + "localhost:6653", getLookupData("2.10.1") + )); + + originalBrokers = Map.of( + "localhost:6650", getLookupData("2.10.0"), + "localhost:6651", getLookupData("2.10.1-SNAPSHOT"), + "localhost:6652", getLookupData("2.10.1"), + "localhost:6653", getLookupData("2.10.1") + ); + result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), getContext()); + + assertEquals(result, Map.of( + "localhost:6652", getLookupData("2.10.1"), + "localhost:6653", getLookupData("2.10.1") + )); + + originalBrokers = Map.of( + "localhost:6650", getLookupData("2.10.0"), + "localhost:6651", getLookupData("2.10.1-SNAPSHOT"), + "localhost:6652", getLookupData("2.10.1"), + "localhost:6653", getLookupData("2.10.2-SNAPSHOT") + ); + + result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), getContext()); + assertEquals(result, Map.of( + "localhost:6653", getLookupData("2.10.2-SNAPSHOT") + )); + + } + + @Test(expectedExceptions = BrokerFilterBadVersionException.class) + public void testInvalidVersionString() throws BrokerFilterException { + Map originalBrokers = Map.of( + "localhost:6650", getLookupData("xxx") + ); + BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter(); + brokerVersionFilter.filter(new HashMap<>(originalBrokers), getContext()); + } + + public LoadManagerContext getContext() { + LoadManagerContext mockContext = mock(LoadManagerContext.class); + ServiceConfiguration configuration = new ServiceConfiguration(); + configuration.setPreferLaterVersions(true); + doReturn(configuration).when(mockContext).brokerConfiguration(); + return mockContext; + } + + public BrokerLookupData getLookupData(String version) { + String webServiceUrl = "http://localhost:8080"; + String webServiceUrlTls = "https://localhoss:8081"; + String pulsarServiceUrl = "pulsar://localhost:6650"; + String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651"; + Map advertisedListeners = new HashMap<>(); + Map protocols = new HashMap<>(){{ + put("kafka", "9092"); + }}; + return new BrokerLookupData( + webServiceUrl, webServiceUrlTls, pulsarServiceUrl, + pulsarServiceUrlTls, advertisedListeners, protocols, true, true, version); + } +}