Skip to content

Commit

Permalink
[fix][broker] Fix broker load manager class filter NPE (#20350)
Browse files Browse the repository at this point in the history
PIP: #16691

### Motivation
When upgrading the pulsar version and changing the pulsar load manager to `ExtensibleLoadManagerImpl` it might cause NPE. The root cause is the old version of pulsar does not contain the `loadManagerClassName` field.
```
2023-05-18T05:42:50,557+0000 [pulsar-io-4-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:51345] connected with role=[[email protected]](mailto:[email protected]) using authMethod=token, clientVersion=Pulsar Go 0.9.0, clientProtocolVersion=18, proxyVersion=null
2023-05-18T05:42:50,558+0000 [pulsar-io-4-1] WARN  org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup [[email protected]](mailto:[email protected]) for topic persistent://xxx with error java.lang.NullPointerException: Cannot invoke “String.equals(Object)” because the return value of “org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData.getLoadManagerClassName()” is null
java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke “String.equals(Object)” because the return value of “org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData.getLoadManagerClassName()” is null
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1194) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:385) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$6(ExtensibleLoadManagerImpl.java:336) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$10(ExtensibleLoadManagerImpl.java:333) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:409) ~[io.streamnative-pulsar-common-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:243) ~[io.streamnative-pulsar-common-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.assign(ExtensibleLoadManagerImpl.java:327) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper.findBrokerServiceUrl(ExtensibleLoadManagerWrapper.java:66) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.broker.namespace.NamespaceService.lambda$getBrokerServiceUrlAsync$0(NamespaceService.java:191) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
```

### Modifications

* Add null check when using`getLoadManagerClassName`.
* Add test to cover this case.
* Add `RedirectManager` unit test.
  • Loading branch information
Demogorgon314 authored May 22, 2023
1 parent a9f2f28 commit b7f0004
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import java.util.Objects;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
Expand All @@ -43,7 +44,9 @@ public Map<String, BrokerLookupData> filter(
}
brokers.entrySet().removeIf(entry -> {
BrokerLookupData v = entry.getValue();
return !v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());
// The load manager class name can be null if the cluster has old version of broker.
return !Objects.equals(v.getLoadManagerClassName(),
context.brokerConfiguration().getLoadManagerClassName());
});
return brokers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.pulsar.broker.loadbalance.extensions.manager;

import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -48,6 +50,12 @@ public RedirectManager(PulsarService pulsar) {
this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
}

@VisibleForTesting
public RedirectManager(PulsarService pulsar, LockManager<BrokerLookupData> brokerLookupDataLockManager) {
this.pulsar = pulsar;
this.brokerLookupDataLockManager = brokerLookupDataLockManager;
}

public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
return brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenCompose(availableBrokers -> {
Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
Expand All @@ -69,7 +77,7 @@ public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookup

public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log);
boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log);
return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
if (lookupDataMap.isEmpty()) {
String errorMsg = "No available broker found.";
Expand All @@ -89,17 +97,18 @@ public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync()
log.warn(errorMsg);
throw new IllegalStateException(errorMsg);
}
if (latestServiceLookupData.get().getLoadManagerClassName().equals(currentLMClassName)) {

if (Objects.equals(latestServiceLookupData.get().getLoadManagerClassName(), currentLMClassName)) {
if (debug) {
log.info("We don't need to redirect, current load manager class name: {}",
log.info("No need to redirect, current load manager class name: {}",
currentLMClassName);
}
return Optional.empty();
}
var serviceLookupDataObj = latestServiceLookupData.get();
var candidateBrokers = new ArrayList<ServiceLookupData>();
lookupDataMap.forEach((key, value) -> {
if (value.getLoadManagerClassName().equals(serviceLookupDataObj.getLoadManagerClassName())) {
if (Objects.equals(value.getLoadManagerClassName(), serviceLookupDataObj.getLoadManagerClassName())) {
candidateBrokers.add(value);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.Objects;
import java.util.Set;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilter;
Expand All @@ -32,8 +33,8 @@ public void filter(Set<String> brokers, BundleData bundleToAssign,
LoadData loadData,
ServiceConfiguration conf) throws BrokerFilterException {
loadData.getBrokerData().forEach((key, value) -> {
if (!value.getLocalData().getLoadManagerClassName()
.equals(conf.getLoadManagerClassName())) {
// The load manager class name can be null if the cluster has old version of broker.
if (!Objects.equals(value.getLocalData().getLoadManagerClassName(), conf.getLoadManagerClassName())) {
brokers.remove(key);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public void test() throws BrokerFilterException {
"broker1", getLookupData("3.0.0", ExtensibleLoadManagerImpl.class.getName()),
"broker2", getLookupData("3.0.0", ExtensibleLoadManagerImpl.class.getName()),
"broker3", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName()),
"broker4", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName())
"broker4", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName()),
"broker5", getLookupData("3.0.0", null)
);

Map<String, BrokerLookupData> result = filter.filter(new HashMap<>(originalBrokers), null, context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.manager;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;


/**
* Unit test {@link RedirectManager}.
*/
public class RedirectManagerTest {

@Test
public void testFindRedirectLookupResultAsync() throws ExecutionException, InterruptedException {
PulsarService pulsar = mock(PulsarService.class);
ServiceConfiguration configuration = new ServiceConfiguration();
when(pulsar.getConfiguration()).thenReturn(configuration);
RedirectManager redirectManager = spy(new RedirectManager(pulsar, null));

configuration.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
configuration.setLoadBalancerDebugModeEnabled(true);

// Test 1: No load manager class name found.
doReturn(CompletableFuture.completedFuture(
new HashMap<>(){{
put("broker-1", getLookupData("broker-1", null, 10));
put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 1));
}}
)).when(redirectManager).getAvailableBrokerLookupDataAsync();

// Should redirect to broker-1, since broker-1 has the latest load manager, even though the class name is null.
Optional<LookupResult> lookupResult = redirectManager.findRedirectLookupResultAsync().get();
assertTrue(lookupResult.isPresent());
assertTrue(lookupResult.get().getLookupData().getBrokerUrl().contains("broker-1"));

// Test 2: Should redirect to broker-1, since the latest broker are using ExtensibleLoadManagerImpl
doReturn(CompletableFuture.completedFuture(
new HashMap<>(){{
put("broker-1", getLookupData("broker-1", ExtensibleLoadManagerImpl.class.getName(), 10));
put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 1));
}}
)).when(redirectManager).getAvailableBrokerLookupDataAsync();

lookupResult = redirectManager.findRedirectLookupResultAsync().get();
assertTrue(lookupResult.isPresent());
assertTrue(lookupResult.get().getLookupData().getBrokerUrl().contains("broker-1"));


// Test 3: Should not redirect, since current broker are using ModularLoadManagerImpl
doReturn(CompletableFuture.completedFuture(
new HashMap<>(){{
put("broker-1", getLookupData("broker-1", ExtensibleLoadManagerImpl.class.getName(), 10));
put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 100));
}}
)).when(redirectManager).getAvailableBrokerLookupDataAsync();

lookupResult = redirectManager.findRedirectLookupResultAsync().get();
assertFalse(lookupResult.isPresent());
}


public BrokerLookupData getLookupData(String broker, String loadManagerClassName, long startTimeStamp) {
String webServiceUrl = "http://" + broker + ":8080";
String webServiceUrlTls = "https://" + broker + ":8081";
String pulsarServiceUrl = "pulsar://" + broker + ":6650";
String pulsarServiceUrlTls = "pulsar+ssl://" + broker + ":6651";
Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
Map<String, String> protocols = new HashMap<>(){{
put("kafka", "9092");
}};
return new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
loadManagerClassName, startTimeStamp, "3.0.0");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,20 @@ public void test() throws BrokerFilterException {

LocalBrokerData localBrokerData1 = new LocalBrokerData();
localBrokerData1.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());

LocalBrokerData localBrokerData2 = new LocalBrokerData();
localBrokerData2.setLoadManagerClassName(null);
loadData.getBrokerData().put("broker1", new BrokerData(localBrokerData));
loadData.getBrokerData().put("broker2", new BrokerData(localBrokerData1));
loadData.getBrokerData().put("broker3", new BrokerData(localBrokerData2));

ServiceConfiguration conf = new ServiceConfiguration();
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());

Set<String> brokers = new HashSet<>(){{
add("broker1");
add("broker2");
add("broker3");
}};
filter.filter(brokers, null, loadData, conf);

Expand All @@ -64,6 +69,7 @@ public void test() throws BrokerFilterException {
brokers = new HashSet<>(){{
add("broker1");
add("broker2");
add("broker3");
}};
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
filter.filter(brokers, null, loadData, conf);
Expand Down

0 comments on commit b7f0004

Please sign in to comment.