From 9a97c843a46e23a0811e2172991cd00a3af642c0 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 28 Aug 2024 09:56:05 +0800 Subject: [PATCH] [feat][broker] PIP-368: Support lookup based on the lookup properties (#23223) PIP: https://github.com/apache/pulsar/pull/23075 ### Motivation This is the implementation for the PIP: https://github.com/apache/pulsar/pull/23075 Currently, the lookup process uses only the topic name as its parameter. However, to enhance this process, it's beneficial for clients to provide additional information. This could be done by introducing the `lookupProperties` field in the client configuration. Clients can then share these properties with the broker during lookup. On the broker side, the broker could also contain some properties that are used for the lookup. We can also support the lookupProperties for the broker. The broker can use these properties to make a better decision on which broker to return. Here is the rack-aware lookup scenario for using the client properties for the lookup: Assuming there are two brokers that broker-0 configures the lookup property "rack" with "A" and broker-1 configures the lookup property "rack" with "B". By using the lookup properties, clients can supply rack information during the lookup, enabling the broker to identify and connect them to the nearest broker within the same rack. If a client that configures the "rack" property with "A" connects to a lookup broker, the customized load manager can determine broker-0 as the owner broker since the broker and the client have the same rack property. ### Modifications - Add new configuration `lookupProperties` to the client. While looking up the broker, the client will send the properties to the broker through `CommandLookupTopic` request. - Add `properties` field to the `CommandLookupTopic`. - Add `lookupProperties` to the `LookupOptions`. The Load Manager implementation can access the `properties` through `LookupOptions` to make a better decision on which broker to return. - Introduce a new broker configuration `lookupPropertyPrefix`. Any broker configuration properties that start with the `lookupPropertyPrefix` will be included into the `BrokerLookupData` and be persisted in the metadata store. The broker can use these properties during the lookup. Co-authored-by: Yunze Xu --- .../pulsar/broker/ServiceConfiguration.java | 19 +++ .../extensions/BrokerRegistryImpl.java | 3 +- .../extensions/data/BrokerLookupData.java | 3 +- .../pulsar/broker/lookup/TopicLookupBase.java | 8 +- .../broker/namespace/LookupOptions.java | 2 + .../pulsar/broker/service/ServerCnx.java | 13 ++- .../extensions/data/BrokerLookupDataTest.java | 4 +- .../filter/BrokerFilterTestBase.java | 3 +- .../BrokerIsolationPoliciesFilterTest.java | 3 +- .../manager/RedirectManagerTest.java | 4 +- .../scheduler/TransferShedderTest.java | 3 +- .../client/api/LookupPropertiesTest.java | 110 ++++++++++++++++++ .../naming/ServiceConfigurationTest.java | 14 +++ .../pulsar/client/api/ClientBuilder.java | 12 ++ .../client/impl/BinaryProtoLookupService.java | 3 +- .../pulsar/client/impl/ClientBuilderImpl.java | 6 + .../impl/conf/ClientConfigurationData.java | 11 ++ .../impl/BinaryProtoLookupServiceTest.java | 2 + .../pulsar/common/protocol/Commands.java | 8 +- pulsar-common/src/main/proto/PulsarApi.proto | 2 + 20 files changed, 220 insertions(+), 13 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c836879b075f1..6488ace991e2f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -21,9 +21,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -2946,6 +2948,13 @@ public double getLoadBalancerBandwidthOutResourceWeight() { @com.fasterxml.jackson.annotation.JsonIgnore private Properties properties = new Properties(); + @FieldContext( + category = CATEGORY_SERVER, + doc = "The properties whose name starts with this prefix will be uploaded to the metadata store for " + + " the topic lookup" + ) + private String lookupPropertyPrefix = "lookup."; + @FieldContext( dynamic = true, category = CATEGORY_SERVER, @@ -3743,4 +3752,14 @@ public int getTopicOrderedExecutorThreadNum() { public boolean isSystemTopicAndTopicLevelPoliciesEnabled() { return topicLevelPoliciesEnabled && systemTopicEnabled; } + + public Map lookupProperties() { + final var map = new HashMap(); + properties.forEach((key, value) -> { + if (key instanceof String && value instanceof String && ((String) key).startsWith(lookupPropertyPrefix)) { + map.put((String) key, (String) value); + } + }); + return map; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 18e30ddf922d0..5db11d40c33ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -94,7 +94,8 @@ public BrokerRegistryImpl(PulsarService pulsar) { pulsar.getConfiguration().isEnableNonPersistentTopics(), conf.getLoadManagerClassName(), System.currentTimeMillis(), - pulsar.getBrokerVersion()); + pulsar.getBrokerVersion(), + pulsar.getConfig().lookupProperties()); this.state = State.Init; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java index 50a2b70404039..5d982076bd609 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java @@ -41,7 +41,8 @@ public record BrokerLookupData (String webServiceUrl, boolean nonPersistentTopicsEnabled, String loadManagerClassName, long startTimestamp, - String brokerVersion) implements ServiceLookupData { + String brokerVersion, + Map properties) implements ServiceLookupData { @Override public String getWebServiceUrl() { return this.webServiceUrl(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 9a05c3d992aaf..42f145d32aab1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -24,6 +24,8 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.ws.rs.Encoded; @@ -180,7 +182,7 @@ protected String internalGetNamespaceBundle(TopicName topicName) { public static CompletableFuture lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) { return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId, - authenticationData, requestId, null); + authenticationData, requestId, null, Collections.emptyMap()); } /** @@ -208,7 +210,8 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe public static CompletableFuture lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, - long requestId, final String advertisedListenerName) { + long requestId, final String advertisedListenerName, + Map properties) { final CompletableFuture validationFuture = new CompletableFuture<>(); final CompletableFuture lookupfuture = new CompletableFuture<>(); @@ -299,6 +302,7 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe .authoritative(authoritative) .advertisedListenerName(advertisedListenerName) .loadTopicsInBundle(true) + .properties(properties) .build(); pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options) .thenAccept(lookupResult -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java index 431266682c51c..be5450646329d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.namespace; +import java.util.Map; import lombok.Builder; import lombok.Data; import org.apache.commons.lang3.StringUtils; @@ -46,6 +47,7 @@ public class LookupOptions { private final boolean requestHttps; private final String advertisedListenerName; + private final Map properties; public boolean hasAdvertisedListenerName() { return StringUtils.isNotBlank(advertisedListenerName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2f9e9b2a1ac2d..d1fe9776e079d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -47,6 +47,7 @@ import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -544,9 +545,19 @@ protected void handleLookup(CommandLookupTopic lookup) { isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply( isAuthorized -> { if (isAuthorized) { + final Map properties; + if (lookup.getPropertiesCount() > 0) { + properties = new HashMap<>(); + for (int i = 0; i < lookup.getPropertiesCount(); i++) { + final var keyValue = lookup.getPropertyAt(i); + properties.put(keyValue.getKey(), keyValue.getValue()); + } + } else { + properties = Collections.emptyMap(); + } lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative, getPrincipal(), getAuthenticationData(), - requestId, advertisedListenerName).handle((lookupResponse, ex) -> { + requestId, advertisedListenerName, properties).handle((lookupResponse, ex) -> { if (ex == null) { writeAndFlush(lookupResponse); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java index 66e8c917d1fc5..0a9742fd76175 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java @@ -24,6 +24,7 @@ import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -58,7 +59,8 @@ public void testConstructors() throws PulsarServerException, URISyntaxException BrokerLookupData lookupData = new BrokerLookupData( webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, advertisedListeners, protocols, true, true, - ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(),"3.0"); + ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(),"3.0", + Collections.emptyMap()); assertEquals(webServiceUrl, lookupData.webServiceUrl()); assertEquals(webServiceUrlTls, lookupData.webServiceUrlTls()); assertEquals(pulsarServiceUrl, lookupData.pulsarServiceUrl()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java index a120ef473e9a5..ab0065e0aa5ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.mock; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -136,6 +137,6 @@ public BrokerLookupData getLookupData(String version, String loadManagerClassNam return new BrokerLookupData( webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, advertisedListeners, protocols, true, true, - loadManagerClassName, -1, version); + loadManagerClassName, -1, version, Collections.emptyMap()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java index 87aaf4bac7fae..d3553bd25d1fa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java @@ -28,6 +28,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -218,7 +219,7 @@ public BrokerLookupData getLookupData(boolean persistentTopicsEnabled, webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, advertisedListeners, protocols, persistentTopicsEnabled, nonPersistentTopicsEnabled, - ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(), "3.0.0"); + ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(), "3.0.0", Collections.emptyMap()); } public LoadManagerContext getContext() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java index cbf77b59d5ad6..f2e9cf86868e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java @@ -33,6 +33,8 @@ import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; import org.testng.annotations.Test; + +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -106,6 +108,6 @@ public BrokerLookupData getLookupData(String broker, String loadManagerClassName return new BrokerLookupData( webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, advertisedListeners, protocols, true, true, - loadManagerClassName, startTimeStamp, "3.0.0"); + loadManagerClassName, startTimeStamp, "3.0.0", Collections.emptyMap()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index efca2880949f2..48bef15b5f80a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -697,7 +698,7 @@ public BrokerLookupData getLookupData() { webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, advertisedListeners, protocols, true, true, - conf.getLoadManagerClassName(), System.currentTimeMillis(), "3.0.0"); + conf.getLoadManagerClassName(), System.currentTimeMillis(), "3.0.0", Collections.emptyMap()); } private void setIsolationPolicies(SimpleResourceAllocationPolicies policies, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java new file mode 100644 index 0000000000000..cb8b2d1e526af --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.MultiBrokerBaseTest; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.namespace.LookupOptions; +import org.apache.pulsar.client.impl.PartitionedProducerImpl; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class LookupPropertiesTest extends MultiBrokerBaseTest { + + private static final String BROKER_KEY = "lookup.broker.id"; + private static final String CLIENT_KEY = "broker.id"; + + @Override + protected void startBroker() throws Exception { + addCustomConfigs(conf, 0); + super.startBroker(); + } + + @Override + protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) { + return addCustomConfigs(getDefaultConf(), additionalBrokerIndex + 10); + } + + private static ServiceConfiguration addCustomConfigs(ServiceConfiguration config, int index) { + config.setDefaultNumberOfNamespaceBundles(16); + config.setLoadBalancerAutoBundleSplitEnabled(false); + config.setLoadManagerClassName(BrokerIdAwareLoadManager.class.getName()); + config.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(100); + config.setLoadBalancerDebugModeEnabled(true); + config.setBrokerShutdownTimeoutMs(1000); + final var properties = new Properties(); + properties.setProperty(BROKER_KEY, "broker-" + index); + config.setProperties(properties); + return config; + } + + @Test + public void testLookupProperty() throws Exception { + final var topic = "test-lookup-property"; + admin.topics().createPartitionedTopic(topic, 16); + @Cleanup final var client = (PulsarClientImpl) PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .lookupProperties( + Collections.singletonMap(CLIENT_KEY, "broker-10")) // broker-10 refers to additionalBrokers[0] + .build(); + @Cleanup final var producer = (PartitionedProducerImpl) client.newProducer().topic(topic).create(); + Assert.assertNotNull(producer); + final var connections = producer.getProducers().stream().map(ProducerImpl::getClientCnx) + .collect(Collectors.toSet()); + Assert.assertEquals(connections.size(), 1); + final var port = ((InetSocketAddress) connections.stream().findAny().orElseThrow().ctx().channel() + .remoteAddress()).getPort(); + Assert.assertEquals(port, additionalBrokers.get(0).getBrokerListenPort().orElseThrow()); + } + + public static class BrokerIdAwareLoadManager extends ExtensibleLoadManagerImpl { + @Override + public CompletableFuture> selectAsync(ServiceUnitId bundle, Set excludeBrokerSet, + LookupOptions options) { + final var clientId = options.getProperties() == null ? null : options.getProperties().get(CLIENT_KEY); + if (clientId == null) { + return super.selectAsync(bundle, excludeBrokerSet, options); + } + return getBrokerRegistry().getAvailableBrokerLookupDataAsync().thenCompose(brokerLookupDataMap -> { + final var optBroker = brokerLookupDataMap.entrySet().stream().filter(entry -> { + final var brokerId = entry.getValue().properties().get(BROKER_KEY); + return brokerId != null && brokerId.equals(clientId); + }).findAny(); + return optBroker.map(Map.Entry::getKey).map(Optional::of).map(CompletableFuture::completedFuture) + .orElseGet(() -> super.selectAsync(bundle, excludeBrokerSet, options)); + }); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 77bb36eb68de1..5972c6f724d8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -35,6 +35,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -393,4 +394,17 @@ public void testTopicNameCacheConfiguration() throws Exception { assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2); assertEquals(conf.getTopicNameCacheMaxCapacity(), 100); } + + @Test + public void testLookupProperties() throws Exception { + var confFile = "lookup.key1=value1\nkey=value\nlookup.key2=value2"; + var conf = (ServiceConfiguration) PulsarConfigurationLoader.create( + new ByteArrayInputStream(confFile.getBytes()), ServiceConfiguration.class); + assertEquals(conf.lookupProperties(), Map.of("lookup.key1", "value1", "lookup.key2", "value2")); + + confFile = confFile + "\nlookupPropertyPrefix=lookup.key2"; + conf = PulsarConfigurationLoader.create(new ByteArrayInputStream(confFile.getBytes()), + ServiceConfiguration.class); + assertEquals(conf.lookupProperties(), Map.of("lookup.key2", "value2")); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 4adf7d89b0e33..73ad555165c05 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -668,4 +668,16 @@ ClientBuilder authentication(String authPluginClassName, Map aut * @return the client builder instance */ ClientBuilder autoCertRefreshSeconds(int autoCertRefreshSeconds); + + /** + * Set the properties used for topic lookup. + *

+ * When the broker performs topic lookup, these lookup properties will be taken into consideration in a customized + * load manager. + *

+ * Note: The lookup properties are only used in topic lookup when: + * - The protocol is binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://" + * - The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface + */ + ClientBuilder lookupProperties(Map properties); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 6ee6fafde1c25..9dd04acce7ee3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -178,7 +178,8 @@ private CompletableFuture findBroker(InetSocketAddress socket client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { long requestId = client.newRequestId(); - ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId); + ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId, + client.getConfiguration().getLookupProperties()); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { if (t != null) { // lookup failed diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index d9edc53b50e37..6923218676743 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -476,4 +476,10 @@ public ClientBuilder description(String description) { conf.setDescription(description); return this; } + + @Override + public ClientBuilder lookupProperties(Map properties) { + conf.setLookupProperties(properties); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index e2713644af641..c1c2e75925502 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -28,6 +28,7 @@ import java.net.URI; import java.time.Clock; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -412,6 +413,8 @@ public class ClientConfigurationData implements Serializable, Cloneable { ) private String description; + private Map lookupProperties; + private transient OpenTelemetry openTelemetry; /** @@ -477,4 +480,12 @@ public String getSocks5ProxyUsername() { public String getSocks5ProxyPassword() { return Objects.nonNull(socks5ProxyPassword) ? socks5ProxyPassword : System.getProperty("socks5Proxy.password"); } + + public void setLookupProperties(Map lookupProperties) { + this.lookupProperties = Collections.unmodifiableMap(lookupProperties); + } + + public Map getLookupProperties() { + return (lookupProperties == null) ? Collections.emptyMap() : Collections.unmodifiableMap(lookupProperties); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index 983cd21a7a9d8..f691215b04e08 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -70,6 +70,8 @@ public void setup() throws Exception { doReturn(cnxPool).when(client).getCnxPool(); doReturn(clientConfig).when(client).getConfiguration(); doReturn(1L).when(client).newRequestId(); + ClientConfigurationData data = new ClientConfigurationData(); + doReturn(data).when(client).getConfiguration(); lookup = spy( new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class))); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 224e093baf112..3fb2fd5ad3d25 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -936,10 +936,11 @@ public static ByteBuf newPartitionMetadataResponse(int partitions, long requestI } public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) { - return newLookup(topic, null, authoritative, requestId); + return newLookup(topic, null, authoritative, requestId, null); } - public static ByteBuf newLookup(String topic, String listenerName, boolean authoritative, long requestId) { + public static ByteBuf newLookup(String topic, String listenerName, boolean authoritative, long requestId, + Map properties) { BaseCommand cmd = localCmd(Type.LOOKUP); CommandLookupTopic lookup = cmd.setLookupTopic() .setTopic(topic) @@ -948,6 +949,9 @@ public static ByteBuf newLookup(String topic, String listenerName, boolean autho if (StringUtils.isNotBlank(listenerName)) { lookup.setAdvertisedListenerName(listenerName); } + if (properties != null) { + properties.forEach((key, value) -> lookup.addProperty().setKey(key).setValue(value)); + } return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 5067ed64079c9..19658c5e57ff9 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -446,6 +446,8 @@ message CommandLookupTopic { optional string original_auth_method = 6; // optional string advertised_listener_name = 7; + // The properties used for topic lookup + repeated KeyValue properties = 8; } message CommandLookupTopicResponse {