Skip to content

Commit

Permalink
[MINOR] QuorumController tests use testToImage (apache#14405)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahuang98 authored Sep 22, 2023
1 parent 5bdea94 commit 2d262ef
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -90,12 +91,32 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
import org.apache.kafka.image.AclsDelta;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasDelta;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterDelta;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.ConfigurationsDelta;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.DelegationTokenDelta;
import org.apache.kafka.image.DelegationTokenImage;
import org.apache.kafka.image.FeaturesDelta;
import org.apache.kafka.image.FeaturesImage;
import org.apache.kafka.image.ProducerIdsDelta;
import org.apache.kafka.image.ProducerIdsImage;
import org.apache.kafka.image.ScramDelta;
import org.apache.kafka.image.ScramImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
import org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
Expand All @@ -109,7 +130,6 @@
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.snapshot.FileRawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry;
Expand Down Expand Up @@ -171,6 +191,8 @@ public void testConfigurationOperations() throws Throwable {
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController());

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -212,6 +234,8 @@ public void testDelayedConfigurationOperations() throws Throwable {
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testDelayedConfigurationOperations(logEnv, controlEnv.activeController());

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -325,6 +349,8 @@ public void testFenceMultipleBrokers() throws Throwable {

// Check that there are imbalaned partitions
assertTrue(active.replicationControl().arePartitionLeadersImbalanced());

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -469,6 +495,8 @@ public void testBalancePartitionLeaders() throws Throwable {
TimeUnit.MILLISECONDS.convert(leaderImbalanceCheckIntervalNs * 10, TimeUnit.NANOSECONDS),
"Leaders were not balanced after unfencing all of the brokers"
);

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -506,7 +534,6 @@ public void testNoOpRecordWriteAfterTimeout() throws Throwable {
"High watermark was not established"
);


final long firstHighWatermark = localLogManager.highWatermark().getAsLong();
TestUtils.waitForCondition(
() -> localLogManager.highWatermark().getAsLong() > firstHighWatermark,
Expand Down Expand Up @@ -585,6 +612,8 @@ public void testUnregisterBroker() throws Throwable {
return iterator.next();
});
assertEquals(0, topicPartitionFuture.get().partitionId());

testToImages(logEnv.allRecords());
}
}

Expand All @@ -604,7 +633,6 @@ private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures(
public void testSnapshotSaveAndLoad() throws Throwable {
final int numBrokers = 4;
Map<Integer, Long> brokerEpochs = new HashMap<>();
RawSnapshotReader reader = null;
Uuid fooId;
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
Expand Down Expand Up @@ -655,6 +683,8 @@ public void testSnapshotSaveAndLoad() throws Throwable {
new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
controlEnv.close();
assertEquals(generateTestRecords(fooId, brokerEpochs), logEnv.allRecords());

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -784,6 +814,8 @@ public void testTimeouts() throws Throwable {
assertYieldsTimeout(electLeadersFuture);
assertYieldsTimeout(alterReassignmentsFuture);
assertYieldsTimeout(listReassignmentsFuture);

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -833,6 +865,8 @@ public void testEarlyControllerResults() throws Throwable {
electLeadersFuture.get();
alterReassignmentsFuture.get();
countDownLatch.countDown();

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -939,6 +973,8 @@ public void testMissingInMemorySnapshot() throws Exception {
partitionsWithReplica2
)
);

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -980,6 +1016,8 @@ public void testConfigResourceExistenceChecker() throws Throwable {
// Topic bar does not exist, so this should throw an exception.
assertThrows(UnknownTopicOrPartitionException.class,
() -> checker.accept(new ConfigResource(TOPIC, "bar")));

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -1136,6 +1174,8 @@ public void testUpgradeFromPreProductionVersion() throws Exception {
// were already records present.
assertEquals(Collections.emptyMap(), active.configurationControl().
getConfigs(new ConfigResource(BROKER, "")));

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -1177,6 +1217,8 @@ public void testInsertBootstrapRecordsToEmptyLog() throws Exception {
return resultOrError.isResult() &&
Collections.singletonMap("foo", "bar").equals(resultOrError.result());
}, "Failed to see expected config change from bootstrap metadata");

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -1252,8 +1294,10 @@ public ZkMigrationState checkBootstrapZkMigrationRecord(
build();
) {
QuorumController active = controlEnv.activeController();
return active.appendReadEvent("read migration state", OptionalLong.empty(),
ZkMigrationState zkMigrationState = active.appendReadEvent("read migration state", OptionalLong.empty(),
() -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS);
testToImages(logEnv.allRecords());
return zkMigrationState;
}
}

Expand All @@ -1278,6 +1322,8 @@ public void testUpgradeMigrationStateFrom34() throws Exception {
assertEquals(active.featureControl().zkMigrationState(), ZkMigrationState.MIGRATION);
assertFalse(active.featureControl().inPreMigrationMode());
}

testToImages(logEnv.allRecords());
}
}

Expand Down Expand Up @@ -1419,6 +1465,34 @@ public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Excepti
assertEquals(ZkMigrationState.NONE, active.appendReadEvent("read migration state", OptionalLong.empty(),
() -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS));
assertThrows(FaultHandlerException.class, controlEnv::close);

testToImages(logEnv.allRecords());
}
}

/**
* Tests all intermediate images lead to the same final image for each image & delta type.
* @param fromRecords
*/
@SuppressWarnings("unchecked")
private static void testToImages(List<ApiMessageAndVersion> fromRecords) {
List<ImageDeltaPair<?, ?>> testMatrix = Arrays.asList(
new ImageDeltaPair<>(() -> AclsImage.EMPTY, AclsDelta::new),
new ImageDeltaPair<>(() -> ClientQuotasImage.EMPTY, ClientQuotasDelta::new),
new ImageDeltaPair<>(() -> ClusterImage.EMPTY, ClusterDelta::new),
new ImageDeltaPair<>(() -> ConfigurationsImage.EMPTY, ConfigurationsDelta::new),
new ImageDeltaPair<>(() -> DelegationTokenImage.EMPTY, DelegationTokenDelta::new),
new ImageDeltaPair<>(() -> FeaturesImage.EMPTY, FeaturesDelta::new),
new ImageDeltaPair<>(() -> ProducerIdsImage.EMPTY, ProducerIdsDelta::new),
new ImageDeltaPair<>(() -> ScramImage.EMPTY, ScramDelta::new),
new ImageDeltaPair<>(() -> TopicsImage.EMPTY, TopicsDelta::new)
);

// test from empty image stopping each of the various intermediate images along the way
for (ImageDeltaPair<?, ?> pair : testMatrix) {
new TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
(Supplier<Object>) pair.imageSupplier(), (Function<Object, Object>) pair.deltaCreator()
).test(fromRecords);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ public static <T extends ApiMessage> Optional<T> recordAtIndexAs(
}
}

public static class ImageDeltaPair<I, D> {
private final Supplier<I> imageSupplier;
private final Function<I, D> deltaCreator;

public ImageDeltaPair(Supplier<I> imageSupplier, Function<I, D> deltaCreator) {
this.imageSupplier = imageSupplier;
this.deltaCreator = deltaCreator;
}

public Supplier<I> imageSupplier() {
return imageSupplier;
}

public Function<I, D> deltaCreator() {
return deltaCreator;
}
}

public static class TestThroughAllIntermediateImagesLeadingToFinalImageHelper<D, I> {
private final Supplier<I> emptyImageSupplier;
private final Function<I, D> deltaUponImageCreator;
Expand Down Expand Up @@ -187,6 +205,18 @@ public void test(I finalImage, List<ApiMessageAndVersion> fromRecords) {
}
}
}

/**
* Tests applying records in all variations of batch sizes will result in the same image as applying all records in one batch.
* @param fromRecords The list of records to apply.
*/
public void test(List<ApiMessageAndVersion> fromRecords) {
D finalImageDelta = createDeltaUponImage(getEmptyImage());
RecordTestUtils.replayAll(finalImageDelta, fromRecords);
I finalImage = createImageByApplyingDelta(finalImageDelta);

test(finalImage, fromRecords);
}
}

/**
Expand Down

0 comments on commit 2d262ef

Please sign in to comment.