Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Explicitly close LB internal topics when playing a follower (ExtensibleLoadManagerImpl only) #23144

Merged
merged 1 commit into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

private static final Set<String> INTERNAL_TOPICS =
public static final Set<String> INTERNAL_TOPICS =
Set.of(BROKER_LOAD_DATA_STORE_TOPIC, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC);

@VisibleForTesting
Expand All @@ -146,7 +146,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
@Getter
private IsolationPoliciesHelper isolationPoliciesHelper;

@Getter
private LoadDataStore<BrokerLoadData> brokerLoadDataStore;

@Getter
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;

private LoadManagerScheduler unloadScheduler;
Expand Down Expand Up @@ -259,6 +262,7 @@ public enum Role {
Follower
}

@Getter
private volatile Role role;

/**
Expand Down Expand Up @@ -903,6 +907,7 @@ synchronized void playFollower() {
}
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
closeInternalTopics();
brokerLoadDataStore.init();
topBundlesLoadDataStore.close();
topBundlesLoadDataStore.startProducer();
Expand Down Expand Up @@ -1006,12 +1011,13 @@ public void disableBroker() throws Exception {
private void closeInternalTopics() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String name : INTERNAL_TOPICS) {
futures.add(pulsar.getBrokerService().getTopicIfExists(name)
.thenAccept(topicOptional -> topicOptional.ifPresent(topic -> topic.close(true)))
.exceptionally(__ -> {
log.warn("Failed to close internal topic:{}", name);
return null;
}));
pulsar.getBrokerService()
.getTopicReference(name)
.ifPresent(topic -> futures.add(topic.close(true)
.exceptionally(__ -> {
log.warn("Failed to close internal topic:{}", name);
return null;
})));
}
try {
FutureUtil.waitForAll(futures)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
Expand All @@ -122,7 +121,6 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
Expand All @@ -134,6 +132,7 @@
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -1186,55 +1185,49 @@ private void assertLookupSLANamespaceOwner(PulsarService pulsar,
assertEquals(result, expectedBrokerServiceUrl);
}

@Test(priority = 10)
public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Exception {
var topBundlesLoadDataStorePrimary =
(LoadDataStore) FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true);
var serviceUnitStateChannelPrimary =
(ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(primaryLoadManager,
"serviceUnitStateChannel", true);
var tvPrimary =
(TableViewImpl) FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimary, "tableView", true);

var topBundlesLoadDataStoreSecondary =
(LoadDataStore) FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", true);
var tvSecondary =
(TableViewImpl) FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary, "tableView", true);

if (serviceUnitStateChannelPrimary.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
assertNotNull(tvPrimary);
assertNull(tvSecondary);
} else {
assertNull(tvPrimary);
assertNotNull(tvSecondary);

private void makePrimaryAsLeader() throws Exception {
log.info("makePrimaryAsLeader");
if (channel2.isChannelOwner()) {
pulsar2.getLeaderElectionService().close();
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(channel1.isChannelOwner());
});
pulsar2.getLeaderElectionService().start();
}

restartBroker();
pulsar1 = pulsar;
setPrimaryLoadManager();
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
Sets.newHashSet(this.conf.getClusterName()));

var serviceUnitStateChannelPrimaryNew =
(ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(primaryLoadManager,
"serviceUnitStateChannel", true);
var topBundlesLoadDataStorePrimaryNew =
(LoadDataStore) FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore"
, true);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
assertFalse(serviceUnitStateChannelPrimaryNew.isChannelOwnerAsync().get(5, TimeUnit.SECONDS));
assertNotNull(FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary, "tableView"
, true));
assertNull(FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimaryNew, "tableView"
, true));
}
);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(channel1.isChannelOwner());
});
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertFalse(channel2.isChannelOwner());
});
}

@Test
public void testRoleChange() throws Exception {
var topBundlesLoadDataStorePrimary = (LoadDataStore<TopBundlesLoadData>)
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true);
private void makeSecondaryAsLeader() throws Exception {
log.info("makeSecondaryAsLeader");
if (channel1.isChannelOwner()) {
pulsar1.getLeaderElectionService().close();
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(channel2.isChannelOwner());
});
pulsar1.getLeaderElectionService().start();
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(channel2.isChannelOwner());
});
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertFalse(channel1.isChannelOwner());
});
}

@Test(timeOut = 30 * 1000)
public void testRoleChangeIdempotency() throws Exception {

makePrimaryAsLeader();

var topBundlesLoadDataStorePrimary = primaryLoadManager.getTopBundlesLoadDataStore();
var topBundlesLoadDataStorePrimarySpy = spy(topBundlesLoadDataStorePrimary);
AtomicInteger countPri = new AtomicInteger(3);
AtomicInteger countPri2 = new AtomicInteger(3);
Expand All @@ -1255,8 +1248,7 @@ public void testRoleChange() throws Exception {
return null;
}).when(topBundlesLoadDataStorePrimarySpy).closeTableView();

var topBundlesLoadDataStoreSecondary = (LoadDataStore<TopBundlesLoadData>)
FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", true);
var topBundlesLoadDataStoreSecondary = secondaryLoadManager.getTopBundlesLoadDataStore();
var topBundlesLoadDataStoreSecondarySpy = spy(topBundlesLoadDataStoreSecondary);
AtomicInteger countSec = new AtomicInteger(3);
AtomicInteger countSec2 = new AtomicInteger(3);
Expand Down Expand Up @@ -1284,58 +1276,129 @@ public void testRoleChange() throws Exception {
topBundlesLoadDataStoreSecondarySpy, true);


if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
primaryLoadManager.playLeader();
secondaryLoadManager.playFollower();
verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(5)).closeTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(0)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView();
} else {
primaryLoadManager.playFollower();
secondaryLoadManager.playLeader();
verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(5)).closeTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(0)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView();
}

primaryLoadManager.playLeader();
secondaryLoadManager.playFollower();
verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(5)).closeTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(0)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView();


primaryLoadManager.playFollower();
secondaryLoadManager.playFollower();
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
primaryLoadManager.getRole());
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
secondaryLoadManager.getRole());

if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
} else {
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
}

primaryLoadManager.playLeader();
secondaryLoadManager.playLeader();
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
primaryLoadManager.getRole());
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
secondaryLoadManager.getRole());

if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
} else {
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
}
} finally {
FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore",
topBundlesLoadDataStorePrimary, true);
FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore",
topBundlesLoadDataStoreSecondary, true);
}
}
@Test(timeOut = 30 * 1000)
public void testRoleChange() throws Exception {
makePrimaryAsLeader();

var leader = primaryLoadManager;
var follower = secondaryLoadManager;

BrokerLoadData brokerLoadExpected = new BrokerLoadData();
SystemResourceUsage usage = new SystemResourceUsage();
var cpu = new ResourceUsage(1.0, 100.0);
String key = "b1";
usage.setCpu(cpu);
brokerLoadExpected.update(usage, 0, 0, 0, 0, 0, 0, conf);
String bundle = "public/default/0x00000000_0xffffffff";
TopBundlesLoadData topBundlesExpected = new TopBundlesLoadData();
topBundlesExpected.getTopBundlesLoadData().clear();
topBundlesExpected.getTopBundlesLoadData().add(new TopBundlesLoadData.BundleLoadData(bundle, new NamespaceBundleStats()));

follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected);
follower.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected);

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {

assertNotNull(FieldUtils.readDeclaredField(leader.getTopBundlesLoadDataStore(), "tableView", true));
assertNull(FieldUtils.readDeclaredField(follower.getTopBundlesLoadDataStore(), "tableView", true));

for (String internalTopic : ExtensibleLoadManagerImpl.INTERNAL_TOPICS) {
assertTrue(leader.pulsar.getBrokerService().getTopicReference(internalTopic)
.isPresent());
assertTrue(follower.pulsar.getBrokerService().getTopicReference(internalTopic)
.isEmpty());

assertTrue(leader.pulsar.getNamespaceService()
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
assertFalse(follower.pulsar.getNamespaceService()
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
}

var actualBrokerLoadLeader = leader.getBrokerLoadDataStore().get(key);
if (actualBrokerLoadLeader.isPresent()) {
assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected);
}

var actualTopBundlesLeader = leader.getTopBundlesLoadDataStore().get(bundle);
if (actualTopBundlesLeader.isPresent()) {
assertEquals(actualTopBundlesLeader.get(), topBundlesExpected);
}

var actualBrokerLoadFollower = follower.getBrokerLoadDataStore().get(key);
if (actualBrokerLoadFollower.isPresent()) {
assertEquals(actualBrokerLoadFollower.get(), brokerLoadExpected);
}
});

makeSecondaryAsLeader();

var leader2 = secondaryLoadManager;
var follower2 = primaryLoadManager;

brokerLoadExpected.update(usage, 1, 0, 0, 0, 0, 0, conf);
topBundlesExpected.getTopBundlesLoadData().get(0).stats().msgRateIn = 1;

follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected);
follower.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected);

Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertNotNull(FieldUtils.readDeclaredField(leader2.getTopBundlesLoadDataStore(), "tableView", true));
assertNull(FieldUtils.readDeclaredField(follower2.getTopBundlesLoadDataStore(), "tableView", true));

for (String internalTopic : ExtensibleLoadManagerImpl.INTERNAL_TOPICS) {
assertTrue(leader2.pulsar.getBrokerService().getTopicReference(internalTopic)
.isPresent());
assertTrue(follower2.pulsar.getBrokerService().getTopicReference(internalTopic)
.isEmpty());

assertTrue(leader2.pulsar.getNamespaceService()
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
assertFalse(follower2.pulsar.getNamespaceService()
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
}


var actualBrokerLoadLeader = leader2.getBrokerLoadDataStore().get(key);
assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected);

var actualTopBundlesLeader = leader2.getTopBundlesLoadDataStore().get(bundle);
assertEquals(actualTopBundlesLeader.get(), topBundlesExpected);

var actualBrokerLoadFollower = follower2.getBrokerLoadDataStore().get(key);
assertEquals(actualBrokerLoadFollower.get(), brokerLoadExpected);
});
}

@Test
public void testGetMetrics() throws Exception {
Expand Down
Loading