From 2d262efb00521f824e28cbeb1a1c9d9aeecde123 Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Fri, 22 Sep 2023 11:50:20 -0700 Subject: [PATCH] [MINOR] QuorumController tests use testToImage (#14405) --- .../controller/QuorumControllerTest.java | 82 ++++++++++++++++++- .../kafka/metadata/RecordTestUtils.java | 30 +++++++ 2 files changed, 108 insertions(+), 4 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 3a70312ca1c7a..61d4338106883 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -90,12 +91,32 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker; +import org.apache.kafka.image.AclsDelta; +import org.apache.kafka.image.AclsImage; +import org.apache.kafka.image.ClientQuotasDelta; +import org.apache.kafka.image.ClientQuotasImage; +import org.apache.kafka.image.ClusterDelta; +import org.apache.kafka.image.ClusterImage; +import org.apache.kafka.image.ConfigurationsDelta; +import org.apache.kafka.image.ConfigurationsImage; +import org.apache.kafka.image.DelegationTokenDelta; +import org.apache.kafka.image.DelegationTokenImage; +import org.apache.kafka.image.FeaturesDelta; +import org.apache.kafka.image.FeaturesImage; +import org.apache.kafka.image.ProducerIdsDelta; +import org.apache.kafka.image.ProducerIdsImage; +import org.apache.kafka.image.ScramDelta; +import org.apache.kafka.image.ScramImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationFencingChange; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.FinalizedControllerFeatures; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair; +import org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper; import org.apache.kafka.metadata.authorizer.StandardAuthorizer; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metadata.migration.ZkMigrationState; @@ -109,7 +130,6 @@ import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.fault.FaultHandlerException; import org.apache.kafka.snapshot.FileRawSnapshotReader; -import org.apache.kafka.snapshot.RawSnapshotReader; import org.apache.kafka.snapshot.Snapshots; import org.apache.kafka.test.TestUtils; import org.apache.kafka.timeline.SnapshotRegistry; @@ -171,6 +191,8 @@ public void testConfigurationOperations() throws Throwable { setBrokerId(0). setClusterId(logEnv.clusterId())).get(); testConfigurationOperations(controlEnv.activeController()); + + testToImages(logEnv.allRecords()); } } @@ -212,6 +234,8 @@ public void testDelayedConfigurationOperations() throws Throwable { setBrokerId(0). setClusterId(logEnv.clusterId())).get(); testDelayedConfigurationOperations(logEnv, controlEnv.activeController()); + + testToImages(logEnv.allRecords()); } } @@ -325,6 +349,8 @@ public void testFenceMultipleBrokers() throws Throwable { // Check that there are imbalaned partitions assertTrue(active.replicationControl().arePartitionLeadersImbalanced()); + + testToImages(logEnv.allRecords()); } } @@ -469,6 +495,8 @@ public void testBalancePartitionLeaders() throws Throwable { TimeUnit.MILLISECONDS.convert(leaderImbalanceCheckIntervalNs * 10, TimeUnit.NANOSECONDS), "Leaders were not balanced after unfencing all of the brokers" ); + + testToImages(logEnv.allRecords()); } } @@ -506,7 +534,6 @@ public void testNoOpRecordWriteAfterTimeout() throws Throwable { "High watermark was not established" ); - final long firstHighWatermark = localLogManager.highWatermark().getAsLong(); TestUtils.waitForCondition( () -> localLogManager.highWatermark().getAsLong() > firstHighWatermark, @@ -585,6 +612,8 @@ public void testUnregisterBroker() throws Throwable { return iterator.next(); }); assertEquals(0, topicPartitionFuture.get().partitionId()); + + testToImages(logEnv.allRecords()); } } @@ -604,7 +633,6 @@ private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures( public void testSnapshotSaveAndLoad() throws Throwable { final int numBrokers = 4; Map brokerEpochs = new HashMap<>(); - RawSnapshotReader reader = null; Uuid fooId; try ( LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). @@ -655,6 +683,8 @@ public void testSnapshotSaveAndLoad() throws Throwable { new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); controlEnv.close(); assertEquals(generateTestRecords(fooId, brokerEpochs), logEnv.allRecords()); + + testToImages(logEnv.allRecords()); } } @@ -784,6 +814,8 @@ public void testTimeouts() throws Throwable { assertYieldsTimeout(electLeadersFuture); assertYieldsTimeout(alterReassignmentsFuture); assertYieldsTimeout(listReassignmentsFuture); + + testToImages(logEnv.allRecords()); } } @@ -833,6 +865,8 @@ public void testEarlyControllerResults() throws Throwable { electLeadersFuture.get(); alterReassignmentsFuture.get(); countDownLatch.countDown(); + + testToImages(logEnv.allRecords()); } } @@ -939,6 +973,8 @@ public void testMissingInMemorySnapshot() throws Exception { partitionsWithReplica2 ) ); + + testToImages(logEnv.allRecords()); } } @@ -980,6 +1016,8 @@ public void testConfigResourceExistenceChecker() throws Throwable { // Topic bar does not exist, so this should throw an exception. assertThrows(UnknownTopicOrPartitionException.class, () -> checker.accept(new ConfigResource(TOPIC, "bar"))); + + testToImages(logEnv.allRecords()); } } @@ -1136,6 +1174,8 @@ public void testUpgradeFromPreProductionVersion() throws Exception { // were already records present. assertEquals(Collections.emptyMap(), active.configurationControl(). getConfigs(new ConfigResource(BROKER, ""))); + + testToImages(logEnv.allRecords()); } } @@ -1177,6 +1217,8 @@ public void testInsertBootstrapRecordsToEmptyLog() throws Exception { return resultOrError.isResult() && Collections.singletonMap("foo", "bar").equals(resultOrError.result()); }, "Failed to see expected config change from bootstrap metadata"); + + testToImages(logEnv.allRecords()); } } @@ -1252,8 +1294,10 @@ public ZkMigrationState checkBootstrapZkMigrationRecord( build(); ) { QuorumController active = controlEnv.activeController(); - return active.appendReadEvent("read migration state", OptionalLong.empty(), + ZkMigrationState zkMigrationState = active.appendReadEvent("read migration state", OptionalLong.empty(), () -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS); + testToImages(logEnv.allRecords()); + return zkMigrationState; } } @@ -1278,6 +1322,8 @@ public void testUpgradeMigrationStateFrom34() throws Exception { assertEquals(active.featureControl().zkMigrationState(), ZkMigrationState.MIGRATION); assertFalse(active.featureControl().inPreMigrationMode()); } + + testToImages(logEnv.allRecords()); } } @@ -1419,6 +1465,34 @@ public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Excepti assertEquals(ZkMigrationState.NONE, active.appendReadEvent("read migration state", OptionalLong.empty(), () -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS)); assertThrows(FaultHandlerException.class, controlEnv::close); + + testToImages(logEnv.allRecords()); + } + } + + /** + * Tests all intermediate images lead to the same final image for each image & delta type. + * @param fromRecords + */ + @SuppressWarnings("unchecked") + private static void testToImages(List fromRecords) { + List> testMatrix = Arrays.asList( + new ImageDeltaPair<>(() -> AclsImage.EMPTY, AclsDelta::new), + new ImageDeltaPair<>(() -> ClientQuotasImage.EMPTY, ClientQuotasDelta::new), + new ImageDeltaPair<>(() -> ClusterImage.EMPTY, ClusterDelta::new), + new ImageDeltaPair<>(() -> ConfigurationsImage.EMPTY, ConfigurationsDelta::new), + new ImageDeltaPair<>(() -> DelegationTokenImage.EMPTY, DelegationTokenDelta::new), + new ImageDeltaPair<>(() -> FeaturesImage.EMPTY, FeaturesDelta::new), + new ImageDeltaPair<>(() -> ProducerIdsImage.EMPTY, ProducerIdsDelta::new), + new ImageDeltaPair<>(() -> ScramImage.EMPTY, ScramDelta::new), + new ImageDeltaPair<>(() -> TopicsImage.EMPTY, TopicsDelta::new) + ); + + // test from empty image stopping each of the various intermediate images along the way + for (ImageDeltaPair pair : testMatrix) { + new TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + (Supplier) pair.imageSupplier(), (Function) pair.deltaCreator() + ).test(fromRecords); } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java index 14f626e93ebe8..2dcccb6dda25e 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java @@ -121,6 +121,24 @@ public static Optional recordAtIndexAs( } } + public static class ImageDeltaPair { + private final Supplier imageSupplier; + private final Function deltaCreator; + + public ImageDeltaPair(Supplier imageSupplier, Function deltaCreator) { + this.imageSupplier = imageSupplier; + this.deltaCreator = deltaCreator; + } + + public Supplier imageSupplier() { + return imageSupplier; + } + + public Function deltaCreator() { + return deltaCreator; + } + } + public static class TestThroughAllIntermediateImagesLeadingToFinalImageHelper { private final Supplier emptyImageSupplier; private final Function deltaUponImageCreator; @@ -187,6 +205,18 @@ public void test(I finalImage, List fromRecords) { } } } + + /** + * Tests applying records in all variations of batch sizes will result in the same image as applying all records in one batch. + * @param fromRecords The list of records to apply. + */ + public void test(List fromRecords) { + D finalImageDelta = createDeltaUponImage(getEmptyImage()); + RecordTestUtils.replayAll(finalImageDelta, fromRecords); + I finalImage = createImageByApplyingDelta(finalImageDelta); + + test(finalImage, fromRecords); + } } /**