Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Gracefully shut down load balancer extension #20315

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,17 @@ public void closeMetadataServiceSession() throws Exception {
localMetadataStore.close();
}

private void closeLeaderElectionService() throws Exception {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService().close();
} else {
if (this.leaderElectionService != null) {
this.leaderElectionService.close();
this.leaderElectionService = null;
}
}
}

@Override
public void close() throws PulsarServerException {
try {
Expand Down Expand Up @@ -502,10 +513,7 @@ public CompletableFuture<Void> closeAsync() {
this.bkClientFactory = null;
}

if (this.leaderElectionService != null) {
this.leaderElectionService.close();
this.leaderElectionService = null;
}
closeLeaderElectionService();

if (adminClient != null) {
adminClient.close();
Expand Down Expand Up @@ -1316,7 +1324,11 @@ public boolean isRunning() {
* @return a reference of the current <code>LeaderElectionService</code> instance.
*/
public LeaderElectionService getLeaderElectionService() {
return this.leaderElectionService;
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Demogorgon314 Plz confirm this change.

return ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService();
} else {
return this.leaderElectionService;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -380,12 +381,22 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
}

public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
return selectAsync(bundle, Collections.emptySet());
}

public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
Set<String> excludeBrokerSet) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenCompose(availableBrokers -> {
LoadManagerContext context = this.getContext();

Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);
if (!excludeBrokerSet.isEmpty()) {
for (String exclude : excludeBrokerSet) {
availableBrokerCandidates.remove(exclude);
}
}

// Filter out brokers that do not meet the rules.
List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
Expand Down Expand Up @@ -685,4 +696,10 @@ private void monitor() {
log.error("Failed to get the channel ownership.", e);
}
}

public void disableBroker() throws Exception {
serviceUnitStateChannel.cleanOwnerships();
leaderElectionService.close();
brokerRegistry.unregister();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> to

@Override
public void disableBroker() throws Exception {
this.loadManager.getBrokerRegistry().unregister();
this.loadManager.disableBroker();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,9 @@ public interface ServiceUnitStateChannel extends Closeable {
* Cancels the ownership monitor.
*/
void cancelOwnershipMonitor();

/**
* Cleans the service unit ownerships from the current broker's channel.
*/
void cleanOwnerships();
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
Expand All @@ -110,6 +109,10 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {

public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD;
private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 * 1000; // 30sec

private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;
private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
public static final long VERSION_ID_INIT = 1; // initial versionId
private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
Expand Down Expand Up @@ -257,6 +260,11 @@ public void cancelOwnershipMonitor() {
}
}

@Override
public void cleanOwnerships() {
doCleanup(lookupServiceAddress);
}

public synchronized void start() throws PulsarServerException {
if (!validateChannelState(LeaderElectionServiceStarted, false)) {
throw new IllegalStateException("Invalid channel state:" + channelState.name());
Expand Down Expand Up @@ -284,7 +292,7 @@ public synchronized void start() throws PulsarServerException {
}
}
PulsarClusterMetadataSetup.createNamespaceIfAbsent
(pulsar.getPulsarResources(), NamespaceName.SYSTEM_NAMESPACE, config.getClusterName());
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName());

producer = pulsar.getClient().newProducer(schema)
.enableBatching(true)
Expand Down Expand Up @@ -694,6 +702,8 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTargetBroker(data.dstBroker())) {
log(null, serviceUnit, data, null);
lastOwnEventHandledAt = System.currentTimeMillis();
} else if (data.force() && isTargetBroker(data.sourceBroker())) {
closeServiceUnit(serviceUnit);
}
}

Expand Down Expand Up @@ -1108,21 +1118,23 @@ private void scheduleCleanup(String broker, long delayInSecs) {


private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
String selectedBroker) {
String selectedBroker,
String inactiveBroker) {
if (orphanData.state() == Splitting) {
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
true, getNextVersionId(orphanData));
} else {
return new ServiceUnitStateData(Owned, selectedBroker, true, getNextVersionId(orphanData));
return new ServiceUnitStateData(Owned, selectedBroker, inactiveBroker,
true, getNextVersionId(orphanData));
}
}

private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData) {

Optional<String> selectedBroker = selectBroker(serviceUnit);
private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
if (selectedBroker.isPresent()) {
var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker.get());
var override = getOverrideInactiveBrokerStateData(
orphanData, selectedBroker.get(), inactiveBroker);
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
Expand All @@ -1140,26 +1152,69 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa
}
}

protected void waitForCleanups(boolean excludeSystemTopics, int timeoutInMillis) {
long started = System.currentTimeMillis();
while (System.currentTimeMillis() - started < timeoutInMillis) {
boolean cleaned = true;
for (var etr : tableview.entrySet()) {
var serviceUnit = etr.getKey();
var data = etr.getValue();

if (excludeSystemTopics && serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
continue;
}

if (data.state() == Owned && data.dstBroker().equals(lookupServiceAddress)) {
cleaned = false;
break;
}
}
if (cleaned) {
try {
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS);
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
log.warn("Interrupted while gracefully waiting for the cleanup convergence.");
}
break;
} else {
try {
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
} catch (InterruptedException e) {
log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}",
lookupServiceAddress);
}
}
}
}

private void doCleanup(String broker) {
private synchronized void doCleanup(String broker) {
long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();

Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
for (var etr : tableview.entrySet()) {
var stateData = etr.getValue();
var serviceUnit = etr.getKey();
var state = state(stateData);
if (StringUtils.equals(broker, stateData.dstBroker())) {
if (isActiveState(state)) {
overrideOwnership(serviceUnit, stateData);
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
orphanSystemServiceUnits.put(serviceUnit, stateData);
} else {
overrideOwnership(serviceUnit, stateData, broker);
}
orphanServiceUnitCleanupCnt++;
}

} else if (StringUtils.equals(broker, stateData.sourceBroker())) {
if (isInFlightState(state)) {
overrideOwnership(serviceUnit, stateData);
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
orphanSystemServiceUnits.put(serviceUnit, stateData);
} else {
overrideOwnership(serviceUnit, stateData, broker);
}
orphanServiceUnitCleanupCnt++;
}
}
Expand All @@ -1168,14 +1223,28 @@ private void doCleanup(String broker) {
try {
producer.flush();
} catch (PulsarClientException e) {
log.error("Failed to flush the in-flight messages.", e);
log.error("Failed to flush the in-flight non-system bundle override messages.", e);
}


if (orphanServiceUnitCleanupCnt > 0) {
waitForCleanups(true, OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS);
this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt;
this.totalInactiveBrokerCleanupCnt++;
}

// clean system bundles in the end
for (var orphanSystemServiceUnit : orphanSystemServiceUnits.entrySet()) {
log.info("Overriding orphan system service unit:{}", orphanSystemServiceUnit.getKey());
overrideOwnership(orphanSystemServiceUnit.getKey(), orphanSystemServiceUnit.getValue(), broker);
}

try {
producer.flush();
} catch (PulsarClientException e) {
log.error("Failed to flush the in-flight system bundle override messages.", e);
}

double cleanupTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));

Expand All @@ -1194,18 +1263,20 @@ private void doCleanup(String broker) {

}

private Optional<String> selectBroker(String serviceUnit) {
private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
try {
return loadManager.selectAsync(getNamespaceBundle(serviceUnit))
return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Set.of(inactiveBroker))
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
} catch (Throwable e) {
log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);
}
return Optional.empty();
}

private Optional<ServiceUnitStateData> getRollForwardStateData(String serviceUnit, long nextVersionId) {
Optional<String> selectedBroker = selectBroker(serviceUnit);
private Optional<ServiceUnitStateData> getRollForwardStateData(String serviceUnit,
String inactiveBroker,
long nextVersionId) {
Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
if (selectedBroker.isEmpty()) {
return Optional.empty();
}
Expand All @@ -1220,7 +1291,7 @@ private Optional<ServiceUnitStateData> getOverrideInFlightStateData(
var state = orphanData.state();
switch (state) {
case Assigning: {
return getRollForwardStateData(serviceUnit, nextVersionId);
return getRollForwardStateData(serviceUnit, orphanData.dstBroker(), nextVersionId);
}
case Splitting: {
return Optional.of(new ServiceUnitStateData(Splitting,
Expand All @@ -1233,7 +1304,7 @@ private Optional<ServiceUnitStateData> getOverrideInFlightStateData(
// rollback to the src
return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true, nextVersionId));
} else {
return getRollForwardStateData(serviceUnit, nextVersionId);
return getRollForwardStateData(serviceUnit, orphanData.sourceBroker(), nextVersionId);
}
}
default: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,51 @@ SplitDecision.Reason.Unknown, new AtomicLong(6))
assertEquals(actual, expected);
}

@Test
public void testDisableBroker() throws Exception {
// Test rollback to modular load manager.
ServiceConfiguration defaultConf = getDefaultConf();
defaultConf.setAllowAutoTopicCreation(true);
defaultConf.setForceDeleteNamespaceAllowed(true);
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
defaultConf.setLoadBalancerSheddingEnabled(false);
try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
var pulsar3 = additionalPulsarTestContext.getPulsarService();
ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(pulsar3.getLoadManager().get(), "loadManager", true));
String topic = "persistent://public/default/test";

String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
TopicName topicName = TopicName.get("test");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) {
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(),
pulsar3.getLookupServiceAddress());
lookupResult1 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
}
String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic);

assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
assertEquals(lookupResult1, lookupResult2);
assertEquals(lookupResult1, lookupResult3);


assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
assertTrue(ternaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());

ternaryLoadManager.disableBroker();

assertFalse(ternaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
if (primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()) {
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
} else {
assertTrue(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
}
}
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down
Loading