diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 9450c2a9cc467..95882cfb21b3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -124,7 +124,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; - private static final Set INTERNAL_TOPICS = + public static final Set INTERNAL_TOPICS = Set.of(BROKER_LOAD_DATA_STORE_TOPIC, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC); @VisibleForTesting @@ -146,7 +146,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS @Getter private IsolationPoliciesHelper isolationPoliciesHelper; + @Getter private LoadDataStore brokerLoadDataStore; + + @Getter private LoadDataStore topBundlesLoadDataStore; private LoadManagerScheduler unloadScheduler; @@ -259,6 +262,7 @@ public enum Role { Follower } + @Getter private volatile Role role; /** @@ -903,6 +907,7 @@ synchronized void playFollower() { } unloadScheduler.close(); serviceUnitStateChannel.cancelOwnershipMonitor(); + closeInternalTopics(); brokerLoadDataStore.init(); topBundlesLoadDataStore.close(); topBundlesLoadDataStore.startProducer(); @@ -1006,12 +1011,13 @@ public void disableBroker() throws Exception { private void closeInternalTopics() { List> futures = new ArrayList<>(); for (String name : INTERNAL_TOPICS) { - futures.add(pulsar.getBrokerService().getTopicIfExists(name) - .thenAccept(topicOptional -> topicOptional.ifPresent(topic -> topic.close(true))) - .exceptionally(__ -> { - log.warn("Failed to close internal topic:{}", name); - return null; - })); + pulsar.getBrokerService() + .getTopicReference(name) + .ifPresent(topic -> futures.add(topic.close(true) + .exceptionally(__ -> { + log.warn("Failed to close internal topic:{}", name); + return null; + }))); } try { FutureUtil.waitForAll(futures) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 4a9b80c798f86..69a65caf2943c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -102,7 +102,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; -import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.namespace.LookupOptions; @@ -122,7 +121,6 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.LookupService; -import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -134,6 +132,7 @@ import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.awaitility.Awaitility; @@ -1186,55 +1185,49 @@ private void assertLookupSLANamespaceOwner(PulsarService pulsar, assertEquals(result, expectedBrokerServiceUrl); } - @Test(priority = 10) - public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Exception { - var topBundlesLoadDataStorePrimary = - (LoadDataStore) FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true); - var serviceUnitStateChannelPrimary = - (ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(primaryLoadManager, - "serviceUnitStateChannel", true); - var tvPrimary = - (TableViewImpl) FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimary, "tableView", true); - - var topBundlesLoadDataStoreSecondary = - (LoadDataStore) FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", true); - var tvSecondary = - (TableViewImpl) FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary, "tableView", true); - - if (serviceUnitStateChannelPrimary.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) { - assertNotNull(tvPrimary); - assertNull(tvSecondary); - } else { - assertNull(tvPrimary); - assertNotNull(tvSecondary); + + private void makePrimaryAsLeader() throws Exception { + log.info("makePrimaryAsLeader"); + if (channel2.isChannelOwner()) { + pulsar2.getLeaderElectionService().close(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertTrue(channel1.isChannelOwner()); + }); + pulsar2.getLeaderElectionService().start(); } - restartBroker(); - pulsar1 = pulsar; - setPrimaryLoadManager(); - admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, - Sets.newHashSet(this.conf.getClusterName())); - - var serviceUnitStateChannelPrimaryNew = - (ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(primaryLoadManager, - "serviceUnitStateChannel", true); - var topBundlesLoadDataStorePrimaryNew = - (LoadDataStore) FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore" - , true); - Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { - assertFalse(serviceUnitStateChannelPrimaryNew.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)); - assertNotNull(FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary, "tableView" - , true)); - assertNull(FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimaryNew, "tableView" - , true)); - } - ); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertTrue(channel1.isChannelOwner()); + }); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertFalse(channel2.isChannelOwner()); + }); } - @Test - public void testRoleChange() throws Exception { - var topBundlesLoadDataStorePrimary = (LoadDataStore) - FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true); + private void makeSecondaryAsLeader() throws Exception { + log.info("makeSecondaryAsLeader"); + if (channel1.isChannelOwner()) { + pulsar1.getLeaderElectionService().close(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertTrue(channel2.isChannelOwner()); + }); + pulsar1.getLeaderElectionService().start(); + } + + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertTrue(channel2.isChannelOwner()); + }); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertFalse(channel1.isChannelOwner()); + }); + } + + @Test(timeOut = 30 * 1000) + public void testRoleChangeIdempotency() throws Exception { + + makePrimaryAsLeader(); + + var topBundlesLoadDataStorePrimary = primaryLoadManager.getTopBundlesLoadDataStore(); var topBundlesLoadDataStorePrimarySpy = spy(topBundlesLoadDataStorePrimary); AtomicInteger countPri = new AtomicInteger(3); AtomicInteger countPri2 = new AtomicInteger(3); @@ -1255,8 +1248,7 @@ public void testRoleChange() throws Exception { return null; }).when(topBundlesLoadDataStorePrimarySpy).closeTableView(); - var topBundlesLoadDataStoreSecondary = (LoadDataStore) - FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", true); + var topBundlesLoadDataStoreSecondary = secondaryLoadManager.getTopBundlesLoadDataStore(); var topBundlesLoadDataStoreSecondarySpy = spy(topBundlesLoadDataStoreSecondary); AtomicInteger countSec = new AtomicInteger(3); AtomicInteger countSec2 = new AtomicInteger(3); @@ -1284,51 +1276,30 @@ public void testRoleChange() throws Exception { topBundlesLoadDataStoreSecondarySpy, true); - if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) { - primaryLoadManager.playLeader(); - secondaryLoadManager.playFollower(); - verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView(); - verify(topBundlesLoadDataStorePrimarySpy, times(5)).closeTableView(); - verify(topBundlesLoadDataStoreSecondarySpy, times(0)).startTableView(); - verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView(); - } else { - primaryLoadManager.playFollower(); - secondaryLoadManager.playLeader(); - verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView(); - verify(topBundlesLoadDataStoreSecondarySpy, times(5)).closeTableView(); - verify(topBundlesLoadDataStorePrimarySpy, times(0)).startTableView(); - verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView(); - } + + primaryLoadManager.playLeader(); + secondaryLoadManager.playFollower(); + verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView(); + verify(topBundlesLoadDataStorePrimarySpy, times(5)).closeTableView(); + verify(topBundlesLoadDataStoreSecondarySpy, times(0)).startTableView(); + verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView(); + primaryLoadManager.playFollower(); secondaryLoadManager.playFollower(); + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + primaryLoadManager.getRole()); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + secondaryLoadManager.getRole()); - if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) { - assertEquals(ExtensibleLoadManagerImpl.Role.Leader, - FieldUtils.readDeclaredField(primaryLoadManager, "role", true)); - assertEquals(ExtensibleLoadManagerImpl.Role.Follower, - FieldUtils.readDeclaredField(secondaryLoadManager, "role", true)); - } else { - assertEquals(ExtensibleLoadManagerImpl.Role.Follower, - FieldUtils.readDeclaredField(primaryLoadManager, "role", true)); - assertEquals(ExtensibleLoadManagerImpl.Role.Leader, - FieldUtils.readDeclaredField(secondaryLoadManager, "role", true)); - } primaryLoadManager.playLeader(); secondaryLoadManager.playLeader(); + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + primaryLoadManager.getRole()); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + secondaryLoadManager.getRole()); - if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) { - assertEquals(ExtensibleLoadManagerImpl.Role.Leader, - FieldUtils.readDeclaredField(primaryLoadManager, "role", true)); - assertEquals(ExtensibleLoadManagerImpl.Role.Follower, - FieldUtils.readDeclaredField(secondaryLoadManager, "role", true)); - } else { - assertEquals(ExtensibleLoadManagerImpl.Role.Follower, - FieldUtils.readDeclaredField(primaryLoadManager, "role", true)); - assertEquals(ExtensibleLoadManagerImpl.Role.Leader, - FieldUtils.readDeclaredField(secondaryLoadManager, "role", true)); - } } finally { FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true); @@ -1336,6 +1307,98 @@ public void testRoleChange() throws Exception { topBundlesLoadDataStoreSecondary, true); } } + @Test(timeOut = 30 * 1000) + public void testRoleChange() throws Exception { + makePrimaryAsLeader(); + + var leader = primaryLoadManager; + var follower = secondaryLoadManager; + + BrokerLoadData brokerLoadExpected = new BrokerLoadData(); + SystemResourceUsage usage = new SystemResourceUsage(); + var cpu = new ResourceUsage(1.0, 100.0); + String key = "b1"; + usage.setCpu(cpu); + brokerLoadExpected.update(usage, 0, 0, 0, 0, 0, 0, conf); + String bundle = "public/default/0x00000000_0xffffffff"; + TopBundlesLoadData topBundlesExpected = new TopBundlesLoadData(); + topBundlesExpected.getTopBundlesLoadData().clear(); + topBundlesExpected.getTopBundlesLoadData().add(new TopBundlesLoadData.BundleLoadData(bundle, new NamespaceBundleStats())); + + follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected); + follower.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected); + + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + + assertNotNull(FieldUtils.readDeclaredField(leader.getTopBundlesLoadDataStore(), "tableView", true)); + assertNull(FieldUtils.readDeclaredField(follower.getTopBundlesLoadDataStore(), "tableView", true)); + + for (String internalTopic : ExtensibleLoadManagerImpl.INTERNAL_TOPICS) { + assertTrue(leader.pulsar.getBrokerService().getTopicReference(internalTopic) + .isPresent()); + assertTrue(follower.pulsar.getBrokerService().getTopicReference(internalTopic) + .isEmpty()); + + assertTrue(leader.pulsar.getNamespaceService() + .isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get()); + assertFalse(follower.pulsar.getNamespaceService() + .isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get()); + } + + var actualBrokerLoadLeader = leader.getBrokerLoadDataStore().get(key); + if (actualBrokerLoadLeader.isPresent()) { + assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected); + } + + var actualTopBundlesLeader = leader.getTopBundlesLoadDataStore().get(bundle); + if (actualTopBundlesLeader.isPresent()) { + assertEquals(actualTopBundlesLeader.get(), topBundlesExpected); + } + + var actualBrokerLoadFollower = follower.getBrokerLoadDataStore().get(key); + if (actualBrokerLoadFollower.isPresent()) { + assertEquals(actualBrokerLoadFollower.get(), brokerLoadExpected); + } + }); + + makeSecondaryAsLeader(); + + var leader2 = secondaryLoadManager; + var follower2 = primaryLoadManager; + + brokerLoadExpected.update(usage, 1, 0, 0, 0, 0, 0, conf); + topBundlesExpected.getTopBundlesLoadData().get(0).stats().msgRateIn = 1; + + follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected); + follower.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected); + + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + assertNotNull(FieldUtils.readDeclaredField(leader2.getTopBundlesLoadDataStore(), "tableView", true)); + assertNull(FieldUtils.readDeclaredField(follower2.getTopBundlesLoadDataStore(), "tableView", true)); + + for (String internalTopic : ExtensibleLoadManagerImpl.INTERNAL_TOPICS) { + assertTrue(leader2.pulsar.getBrokerService().getTopicReference(internalTopic) + .isPresent()); + assertTrue(follower2.pulsar.getBrokerService().getTopicReference(internalTopic) + .isEmpty()); + + assertTrue(leader2.pulsar.getNamespaceService() + .isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get()); + assertFalse(follower2.pulsar.getNamespaceService() + .isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get()); + } + + + var actualBrokerLoadLeader = leader2.getBrokerLoadDataStore().get(key); + assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected); + + var actualTopBundlesLeader = leader2.getTopBundlesLoadDataStore().get(bundle); + assertEquals(actualTopBundlesLeader.get(), topBundlesExpected); + + var actualBrokerLoadFollower = follower2.getBrokerLoadDataStore().get(key); + assertEquals(actualBrokerLoadFollower.get(), brokerLoadExpected); + }); + } @Test public void testGetMetrics() throws Exception {