Skip to content

Commit

Permalink
[feat][broker] PIP-368: Support lookup based on the lookup properties (
Browse files Browse the repository at this point in the history
…#23223)

PIP: #23075

### Motivation

This is the implementation for the PIP: #23075
Currently, the lookup process uses only the topic name as its parameter. However, to enhance this process, it's
beneficial for clients to provide additional information. This could be done by introducing the `lookupProperties` field
in the client configuration. Clients can then share these properties with the broker during lookup.

On the broker side, the broker could also contain some properties that are used for the lookup. We can also support the
lookupProperties for the broker. The broker can use these properties to make a better decision on which broker to
return.

Here is the rack-aware lookup scenario for using the client properties for the lookup:
Assuming there are two brokers that broker-0 configures the lookup property "rack" with "A" and broker-1 configures the
lookup property "rack" with "B". By using the lookup properties, clients can supply rack information during the lookup,
enabling the broker to identify and connect them to the nearest broker within the same rack. If a client that configures
the "rack" property with "A" connects to a lookup broker, the customized load manager can determine broker-0 as the
owner broker since the broker and the client have the same rack property.

### Modifications

- Add new configuration `lookupProperties` to the client. While looking up the broker, the client will send the properties
to the broker through `CommandLookupTopic` request.
- Add `properties` field to the `CommandLookupTopic`.
- Add `lookupProperties` to the `LookupOptions`. The Load Manager implementation can access
the `properties` through `LookupOptions` to make a better decision on which broker to return.
- Introduce a new broker configuration `lookupPropertyPrefix`. Any broker configuration properties that start with the `lookupPropertyPrefix`
will be included into the `BrokerLookupData` and be persisted in the metadata store. The broker can use these properties
during the lookup.

Co-authored-by: Yunze Xu <[email protected]>
  • Loading branch information
RobertIndie and BewareMyPower authored Aug 28, 2024
1 parent d9bd6b0 commit 9a97c84
Show file tree
Hide file tree
Showing 20 changed files with 220 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -2946,6 +2948,13 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
@com.fasterxml.jackson.annotation.JsonIgnore
private Properties properties = new Properties();

@FieldContext(
category = CATEGORY_SERVER,
doc = "The properties whose name starts with this prefix will be uploaded to the metadata store for "
+ " the topic lookup"
)
private String lookupPropertyPrefix = "lookup.";

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
Expand Down Expand Up @@ -3743,4 +3752,14 @@ public int getTopicOrderedExecutorThreadNum() {
public boolean isSystemTopicAndTopicLevelPoliciesEnabled() {
return topicLevelPoliciesEnabled && systemTopicEnabled;
}

public Map<String, String> lookupProperties() {
final var map = new HashMap<String, String>();
properties.forEach((key, value) -> {
if (key instanceof String && value instanceof String && ((String) key).startsWith(lookupPropertyPrefix)) {
map.put((String) key, (String) value);
}
});
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public BrokerRegistryImpl(PulsarService pulsar) {
pulsar.getConfiguration().isEnableNonPersistentTopics(),
conf.getLoadManagerClassName(),
System.currentTimeMillis(),
pulsar.getBrokerVersion());
pulsar.getBrokerVersion(),
pulsar.getConfig().lookupProperties());
this.state = State.Init;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public record BrokerLookupData (String webServiceUrl,
boolean nonPersistentTopicsEnabled,
String loadManagerClassName,
long startTimestamp,
String brokerVersion) implements ServiceLookupData {
String brokerVersion,
Map<String, String> properties) implements ServiceLookupData {
@Override
public String getWebServiceUrl() {
return this.webServiceUrl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.Encoded;
Expand Down Expand Up @@ -180,7 +182,7 @@ protected String internalGetNamespaceBundle(TopicName topicName) {
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId,
authenticationData, requestId, null);
authenticationData, requestId, null, Collections.emptyMap());
}

/**
Expand Down Expand Up @@ -208,7 +210,8 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
boolean authoritative, String clientAppId,
AuthenticationDataSource authenticationData,
long requestId, final String advertisedListenerName) {
long requestId, final String advertisedListenerName,
Map<String, String> properties) {

final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
Expand Down Expand Up @@ -299,6 +302,7 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
.authoritative(authoritative)
.advertisedListenerName(advertisedListenerName)
.loadTopicsInBundle(true)
.properties(properties)
.build();
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
.thenAccept(lookupResult -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.namespace;

import java.util.Map;
import lombok.Builder;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -46,6 +47,7 @@ public class LookupOptions {
private final boolean requestHttps;

private final String advertisedListenerName;
private final Map<String, String> properties;

public boolean hasAdvertisedListenerName() {
return StringUtils.isNotBlank(advertisedListenerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -544,9 +545,19 @@ protected void handleLookup(CommandLookupTopic lookup) {
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
final Map<String, String> properties;
if (lookup.getPropertiesCount() > 0) {
properties = new HashMap<>();
for (int i = 0; i < lookup.getPropertiesCount(); i++) {
final var keyValue = lookup.getPropertyAt(i);
properties.put(keyValue.getKey(), keyValue.getValue());
}
} else {
properties = Collections.emptyMap();
}
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
getPrincipal(), getAuthenticationData(),
requestId, advertisedListenerName).handle((lookupResponse, ex) -> {
requestId, advertisedListenerName, properties).handle((lookupResponse, ex) -> {
if (ex == null) {
writeAndFlush(lookupResponse);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -58,7 +59,8 @@ public void testConstructors() throws PulsarServerException, URISyntaxException
BrokerLookupData lookupData = new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(),"3.0");
ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(),"3.0",
Collections.emptyMap());
assertEquals(webServiceUrl, lookupData.webServiceUrl());
assertEquals(webServiceUrlTls, lookupData.webServiceUrlTls());
assertEquals(pulsarServiceUrl, lookupData.pulsarServiceUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -136,6 +137,6 @@ public BrokerLookupData getLookupData(String version, String loadManagerClassNam
return new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
loadManagerClassName, -1, version);
loadManagerClassName, -1, version, Collections.emptyMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -218,7 +219,7 @@ public BrokerLookupData getLookupData(boolean persistentTopicsEnabled,
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols,
persistentTopicsEnabled, nonPersistentTopicsEnabled,
ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(), "3.0.0");
ExtensibleLoadManagerImpl.class.getName(), System.currentTimeMillis(), "3.0.0", Collections.emptyMap());
}

public LoadManagerContext getContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.testng.annotations.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -106,6 +108,6 @@ public BrokerLookupData getLookupData(String broker, String loadManagerClassName
return new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
loadManagerClassName, startTimeStamp, "3.0.0");
loadManagerClassName, startTimeStamp, "3.0.0", Collections.emptyMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -697,7 +698,7 @@ public BrokerLookupData getLookupData() {
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols,
true, true,
conf.getLoadManagerClassName(), System.currentTimeMillis(), "3.0.0");
conf.getLoadManagerClassName(), System.currentTimeMillis(), "3.0.0", Collections.emptyMap());
}

private void setIsolationPolicies(SimpleResourceAllocationPolicies policies,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.testng.Assert;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-api")
public class LookupPropertiesTest extends MultiBrokerBaseTest {

private static final String BROKER_KEY = "lookup.broker.id";
private static final String CLIENT_KEY = "broker.id";

@Override
protected void startBroker() throws Exception {
addCustomConfigs(conf, 0);
super.startBroker();
}

@Override
protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) {
return addCustomConfigs(getDefaultConf(), additionalBrokerIndex + 10);
}

private static ServiceConfiguration addCustomConfigs(ServiceConfiguration config, int index) {
config.setDefaultNumberOfNamespaceBundles(16);
config.setLoadBalancerAutoBundleSplitEnabled(false);
config.setLoadManagerClassName(BrokerIdAwareLoadManager.class.getName());
config.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(100);
config.setLoadBalancerDebugModeEnabled(true);
config.setBrokerShutdownTimeoutMs(1000);
final var properties = new Properties();
properties.setProperty(BROKER_KEY, "broker-" + index);
config.setProperties(properties);
return config;
}

@Test
public void testLookupProperty() throws Exception {
final var topic = "test-lookup-property";
admin.topics().createPartitionedTopic(topic, 16);
@Cleanup final var client = (PulsarClientImpl) PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.lookupProperties(
Collections.singletonMap(CLIENT_KEY, "broker-10")) // broker-10 refers to additionalBrokers[0]
.build();
@Cleanup final var producer = (PartitionedProducerImpl<byte[]>) client.newProducer().topic(topic).create();
Assert.assertNotNull(producer);
final var connections = producer.getProducers().stream().map(ProducerImpl::getClientCnx)
.collect(Collectors.toSet());
Assert.assertEquals(connections.size(), 1);
final var port = ((InetSocketAddress) connections.stream().findAny().orElseThrow().ctx().channel()
.remoteAddress()).getPort();
Assert.assertEquals(port, additionalBrokers.get(0).getBrokerListenPort().orElseThrow());
}

public static class BrokerIdAwareLoadManager extends ExtensibleLoadManagerImpl {
@Override
public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle, Set<String> excludeBrokerSet,
LookupOptions options) {
final var clientId = options.getProperties() == null ? null : options.getProperties().get(CLIENT_KEY);
if (clientId == null) {
return super.selectAsync(bundle, excludeBrokerSet, options);
}
return getBrokerRegistry().getAvailableBrokerLookupDataAsync().thenCompose(brokerLookupDataMap -> {
final var optBroker = brokerLookupDataMap.entrySet().stream().filter(entry -> {
final var brokerId = entry.getValue().properties().get(BROKER_KEY);
return brokerId != null && brokerId.equals(clientId);
}).findAny();
return optBroker.map(Map.Entry::getKey).map(Optional::of).map(CompletableFuture::completedFuture)
.orElseGet(() -> super.selectAsync(bundle, excludeBrokerSet, options));
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -393,4 +394,17 @@ public void testTopicNameCacheConfiguration() throws Exception {
assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2);
assertEquals(conf.getTopicNameCacheMaxCapacity(), 100);
}

@Test
public void testLookupProperties() throws Exception {
var confFile = "lookup.key1=value1\nkey=value\nlookup.key2=value2";
var conf = (ServiceConfiguration) PulsarConfigurationLoader.create(
new ByteArrayInputStream(confFile.getBytes()), ServiceConfiguration.class);
assertEquals(conf.lookupProperties(), Map.of("lookup.key1", "value1", "lookup.key2", "value2"));

confFile = confFile + "\nlookupPropertyPrefix=lookup.key2";
conf = PulsarConfigurationLoader.create(new ByteArrayInputStream(confFile.getBytes()),
ServiceConfiguration.class);
assertEquals(conf.lookupProperties(), Map.of("lookup.key2", "value2"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -668,4 +668,16 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
* @return the client builder instance
*/
ClientBuilder autoCertRefreshSeconds(int autoCertRefreshSeconds);

/**
* Set the properties used for topic lookup.
* <p>
* When the broker performs topic lookup, these lookup properties will be taken into consideration in a customized
* load manager.
* <p>
* Note: The lookup properties are only used in topic lookup when:
* - The protocol is binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://"
* - The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface
*/
ClientBuilder lookupProperties(Map<String, String> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId);
ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId,
client.getConfiguration().getLookupProperties());
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
if (t != null) {
// lookup failed
Expand Down
Loading

0 comments on commit 9a97c84

Please sign in to comment.