diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c836879b075f1..6488ace991e2f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -21,9 +21,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -2946,6 +2948,13 @@ public double getLoadBalancerBandwidthOutResourceWeight() { @com.fasterxml.jackson.annotation.JsonIgnore private Properties properties = new Properties(); + @FieldContext( + category = CATEGORY_SERVER, + doc = "The properties whose name starts with this prefix will be uploaded to the metadata store for " + + " the topic lookup" + ) + private String lookupPropertyPrefix = "lookup."; + @FieldContext( dynamic = true, category = CATEGORY_SERVER, @@ -3743,4 +3752,14 @@ public int getTopicOrderedExecutorThreadNum() { public boolean isSystemTopicAndTopicLevelPoliciesEnabled() { return topicLevelPoliciesEnabled && systemTopicEnabled; } + + public Map lookupProperties() { + final var map = new HashMap(); + properties.forEach((key, value) -> { + if (key instanceof String && value instanceof String && ((String) key).startsWith(lookupPropertyPrefix)) { + map.put((String) key, (String) value); + } + }); + return map; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 18e30ddf922d0..5db11d40c33ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -94,7 +94,8 @@ public BrokerRegistryImpl(PulsarService pulsar) { pulsar.getConfiguration().isEnableNonPersistentTopics(), conf.getLoadManagerClassName(), System.currentTimeMillis(), - pulsar.getBrokerVersion()); + pulsar.getBrokerVersion(), + pulsar.getConfig().lookupProperties()); this.state = State.Init; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java index 50a2b70404039..5d982076bd609 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java @@ -41,7 +41,8 @@ public record BrokerLookupData (String webServiceUrl, boolean nonPersistentTopicsEnabled, String loadManagerClassName, long startTimestamp, - String brokerVersion) implements ServiceLookupData { + String brokerVersion, + Map properties) implements ServiceLookupData { @Override public String getWebServiceUrl() { return this.webServiceUrl(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 9a05c3d992aaf..42f145d32aab1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -24,6 +24,8 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.ws.rs.Encoded; @@ -180,7 +182,7 @@ protected String internalGetNamespaceBundle(TopicName topicName) { public static CompletableFuture lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) { return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId, - authenticationData, requestId, null); + authenticationData, requestId, null, Collections.emptyMap()); } /** @@ -208,7 +210,8 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe public static CompletableFuture lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, - long requestId, final String advertisedListenerName) { + long requestId, final String advertisedListenerName, + Map properties) { final CompletableFuture validationFuture = new CompletableFuture<>(); final CompletableFuture lookupfuture = new CompletableFuture<>(); @@ -299,6 +302,7 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe .authoritative(authoritative) .advertisedListenerName(advertisedListenerName) .loadTopicsInBundle(true) + .properties(properties) .build(); pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options) .thenAccept(lookupResult -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java index 431266682c51c..be5450646329d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.namespace; +import java.util.Map; import lombok.Builder; import lombok.Data; import org.apache.commons.lang3.StringUtils; @@ -46,6 +47,7 @@ public class LookupOptions { private final boolean requestHttps; private final String advertisedListenerName; + private final Map properties; public boolean hasAdvertisedListenerName() { return StringUtils.isNotBlank(advertisedListenerName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2f9e9b2a1ac2d..d1fe9776e079d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -47,6 +47,7 @@ import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -544,9 +545,19 @@ protected void handleLookup(CommandLookupTopic lookup) { isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply( isAuthorized -> { if (isAuthorized) { + final Map properties; + if (lookup.getPropertiesCount() > 0) { + properties = new HashMap<>(); + for (int i = 0; i < lookup.getPropertiesCount(); i++) { + final var keyValue = lookup.getPropertyAt(i); + properties.put(keyValue.getKey(), keyValue.getValue()); + } + } else { + properties = Collections.emptyMap(); + } lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative, getPrincipal(), getAuthenticationData(), - requestId, advertisedListenerName).handle((lookupResponse, ex) -> { + requestId, advertisedListenerName, properties).handle((lookupResponse, ex) -> { if (ex == null) { writeAndFlush(lookupResponse); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java index 66e8c917d1fc5..0a9742fd76175 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java @@ -24,6 +24,7 @@ import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -58,7 +59,8 @@ public void testConstructors() throws PulsarServerException, URISyntaxException BrokerLookupData lookupData = new BrokerLookupData( webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, advertisedListeners, protocols, true, true, - ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(),"3.0"); + ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(),"3.0", + Collections.emptyMap()); assertEquals(webServiceUrl, lookupData.webServiceUrl()); assertEquals(webServiceUrlTls, lookupData.webServiceUrlTls()); assertEquals(pulsarServiceUrl, lookupData.pulsarServiceUrl()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java index a120ef473e9a5..ab0065e0aa5ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.mock; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -136,6 +137,6 @@ public BrokerLookupData getLookupData(String version, String loadManagerClassNam return new BrokerLookupData( webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, advertisedListeners, protocols, true, true, - loadManagerClassName, -1, version); + loadManagerClassName, -1, version, Collections.emptyMap()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java index 87aaf4bac7fae..d3553bd25d1fa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java @@ -28,6 +28,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -218,7 +219,7 @@ public BrokerLookupData getLookupData(boolean persistentTopicsEnabled, webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, advertisedListeners, protocols, persistentTopicsEnabled, nonPersistentTopicsEnabled, - ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(), "3.0.0"); + ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(), "3.0.0", Collections.emptyMap()); } public LoadManagerContext getContext() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java index cbf77b59d5ad6..f2e9cf86868e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java @@ -33,6 +33,8 @@ import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; import org.testng.annotations.Test; + +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -106,6 +108,6 @@ public BrokerLookupData getLookupData(String broker, String loadManagerClassName return new BrokerLookupData( webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, advertisedListeners, protocols, true, true, - loadManagerClassName, startTimeStamp, "3.0.0"); + loadManagerClassName, startTimeStamp, "3.0.0", Collections.emptyMap()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index efca2880949f2..48bef15b5f80a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -697,7 +698,7 @@ public BrokerLookupData getLookupData() { webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls, advertisedListeners, protocols, true, true, - conf.getLoadManagerClassName(), System.currentTimeMillis(), "3.0.0"); + conf.getLoadManagerClassName(), System.currentTimeMillis(), "3.0.0", Collections.emptyMap()); } private void setIsolationPolicies(SimpleResourceAllocationPolicies policies, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java new file mode 100644 index 0000000000000..cb8b2d1e526af --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.MultiBrokerBaseTest; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.namespace.LookupOptions; +import org.apache.pulsar.client.impl.PartitionedProducerImpl; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class LookupPropertiesTest extends MultiBrokerBaseTest { + + private static final String BROKER_KEY = "lookup.broker.id"; + private static final String CLIENT_KEY = "broker.id"; + + @Override + protected void startBroker() throws Exception { + addCustomConfigs(conf, 0); + super.startBroker(); + } + + @Override + protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) { + return addCustomConfigs(getDefaultConf(), additionalBrokerIndex + 10); + } + + private static ServiceConfiguration addCustomConfigs(ServiceConfiguration config, int index) { + config.setDefaultNumberOfNamespaceBundles(16); + config.setLoadBalancerAutoBundleSplitEnabled(false); + config.setLoadManagerClassName(BrokerIdAwareLoadManager.class.getName()); + config.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(100); + config.setLoadBalancerDebugModeEnabled(true); + config.setBrokerShutdownTimeoutMs(1000); + final var properties = new Properties(); + properties.setProperty(BROKER_KEY, "broker-" + index); + config.setProperties(properties); + return config; + } + + @Test + public void testLookupProperty() throws Exception { + final var topic = "test-lookup-property"; + admin.topics().createPartitionedTopic(topic, 16); + @Cleanup final var client = (PulsarClientImpl) PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .lookupProperties( + Collections.singletonMap(CLIENT_KEY, "broker-10")) // broker-10 refers to additionalBrokers[0] + .build(); + @Cleanup final var producer = (PartitionedProducerImpl) client.newProducer().topic(topic).create(); + Assert.assertNotNull(producer); + final var connections = producer.getProducers().stream().map(ProducerImpl::getClientCnx) + .collect(Collectors.toSet()); + Assert.assertEquals(connections.size(), 1); + final var port = ((InetSocketAddress) connections.stream().findAny().orElseThrow().ctx().channel() + .remoteAddress()).getPort(); + Assert.assertEquals(port, additionalBrokers.get(0).getBrokerListenPort().orElseThrow()); + } + + public static class BrokerIdAwareLoadManager extends ExtensibleLoadManagerImpl { + @Override + public CompletableFuture> selectAsync(ServiceUnitId bundle, Set excludeBrokerSet, + LookupOptions options) { + final var clientId = options.getProperties() == null ? null : options.getProperties().get(CLIENT_KEY); + if (clientId == null) { + return super.selectAsync(bundle, excludeBrokerSet, options); + } + return getBrokerRegistry().getAvailableBrokerLookupDataAsync().thenCompose(brokerLookupDataMap -> { + final var optBroker = brokerLookupDataMap.entrySet().stream().filter(entry -> { + final var brokerId = entry.getValue().properties().get(BROKER_KEY); + return brokerId != null && brokerId.equals(clientId); + }).findAny(); + return optBroker.map(Map.Entry::getKey).map(Optional::of).map(CompletableFuture::completedFuture) + .orElseGet(() -> super.selectAsync(bundle, excludeBrokerSet, options)); + }); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 77bb36eb68de1..5972c6f724d8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -35,6 +35,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -393,4 +394,17 @@ public void testTopicNameCacheConfiguration() throws Exception { assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2); assertEquals(conf.getTopicNameCacheMaxCapacity(), 100); } + + @Test + public void testLookupProperties() throws Exception { + var confFile = "lookup.key1=value1\nkey=value\nlookup.key2=value2"; + var conf = (ServiceConfiguration) PulsarConfigurationLoader.create( + new ByteArrayInputStream(confFile.getBytes()), ServiceConfiguration.class); + assertEquals(conf.lookupProperties(), Map.of("lookup.key1", "value1", "lookup.key2", "value2")); + + confFile = confFile + "\nlookupPropertyPrefix=lookup.key2"; + conf = PulsarConfigurationLoader.create(new ByteArrayInputStream(confFile.getBytes()), + ServiceConfiguration.class); + assertEquals(conf.lookupProperties(), Map.of("lookup.key2", "value2")); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 4adf7d89b0e33..73ad555165c05 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -668,4 +668,16 @@ ClientBuilder authentication(String authPluginClassName, Map aut * @return the client builder instance */ ClientBuilder autoCertRefreshSeconds(int autoCertRefreshSeconds); + + /** + * Set the properties used for topic lookup. + *

+ * When the broker performs topic lookup, these lookup properties will be taken into consideration in a customized + * load manager. + *

+ * Note: The lookup properties are only used in topic lookup when: + * - The protocol is binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://" + * - The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface + */ + ClientBuilder lookupProperties(Map properties); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 6ee6fafde1c25..9dd04acce7ee3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -178,7 +178,8 @@ private CompletableFuture findBroker(InetSocketAddress socket client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { long requestId = client.newRequestId(); - ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId); + ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId, + client.getConfiguration().getLookupProperties()); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { if (t != null) { // lookup failed diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index d9edc53b50e37..6923218676743 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -476,4 +476,10 @@ public ClientBuilder description(String description) { conf.setDescription(description); return this; } + + @Override + public ClientBuilder lookupProperties(Map properties) { + conf.setLookupProperties(properties); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index e2713644af641..c1c2e75925502 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -28,6 +28,7 @@ import java.net.URI; import java.time.Clock; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -412,6 +413,8 @@ public class ClientConfigurationData implements Serializable, Cloneable { ) private String description; + private Map lookupProperties; + private transient OpenTelemetry openTelemetry; /** @@ -477,4 +480,12 @@ public String getSocks5ProxyUsername() { public String getSocks5ProxyPassword() { return Objects.nonNull(socks5ProxyPassword) ? socks5ProxyPassword : System.getProperty("socks5Proxy.password"); } + + public void setLookupProperties(Map lookupProperties) { + this.lookupProperties = Collections.unmodifiableMap(lookupProperties); + } + + public Map getLookupProperties() { + return (lookupProperties == null) ? Collections.emptyMap() : Collections.unmodifiableMap(lookupProperties); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index 983cd21a7a9d8..f691215b04e08 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -70,6 +70,8 @@ public void setup() throws Exception { doReturn(cnxPool).when(client).getCnxPool(); doReturn(clientConfig).when(client).getConfiguration(); doReturn(1L).when(client).newRequestId(); + ClientConfigurationData data = new ClientConfigurationData(); + doReturn(data).when(client).getConfiguration(); lookup = spy( new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class))); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 224e093baf112..3fb2fd5ad3d25 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -936,10 +936,11 @@ public static ByteBuf newPartitionMetadataResponse(int partitions, long requestI } public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) { - return newLookup(topic, null, authoritative, requestId); + return newLookup(topic, null, authoritative, requestId, null); } - public static ByteBuf newLookup(String topic, String listenerName, boolean authoritative, long requestId) { + public static ByteBuf newLookup(String topic, String listenerName, boolean authoritative, long requestId, + Map properties) { BaseCommand cmd = localCmd(Type.LOOKUP); CommandLookupTopic lookup = cmd.setLookupTopic() .setTopic(topic) @@ -948,6 +949,9 @@ public static ByteBuf newLookup(String topic, String listenerName, boolean autho if (StringUtils.isNotBlank(listenerName)) { lookup.setAdvertisedListenerName(listenerName); } + if (properties != null) { + properties.forEach((key, value) -> lookup.addProperty().setKey(key).setValue(value)); + } return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 5067ed64079c9..19658c5e57ff9 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -446,6 +446,8 @@ message CommandLookupTopic { optional string original_auth_method = 6; // optional string advertised_listener_name = 7; + // The properties used for topic lookup + repeated KeyValue properties = 8; } message CommandLookupTopicResponse {