Skip to content

Commit

Permalink
KAFKA-16845 Migrate ReplicationQuotasTestRig to new test infra (apache#17089)
Browse files Browse the repository at this point in the history

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
wernerdv authored Oct 24, 2024
1 parent 5311839 commit 1eb7644
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 88 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2484,6 +2484,7 @@ project(':tools') {
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.main.output
testImplementation project(':storage').sourceSets.test.output
testImplementation project(':test-common')
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
Expand Down
2 changes: 2 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@

<subpackage name="tools">
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metadata.properties" />
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.server.util" />
Expand Down Expand Up @@ -316,6 +317,7 @@
<allow pkg="org.apache.kafka.tools" />
<allow pkg="org.apache.kafka.tools.api" />
<allow pkg="org.apache.kafka.tools.filter" />
<allow pkg="org.apache.kafka.image" />

<subpackage name="consumer">
<allow pkg="org.apache.kafka.tools"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
package org.apache.kafka.tools.other;

import kafka.log.UnifiedLog;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.QuorumTestHarness;
import kafka.utils.EmptyTestInfo;
import kafka.server.BrokerServer;
import kafka.server.KafkaBroker;
import kafka.utils.TestUtils;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
Expand All @@ -33,9 +31,13 @@
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.tools.reassign.ReassignPartitionsCommand;

Expand All @@ -57,13 +59,11 @@
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
Expand All @@ -74,8 +74,6 @@
import javax.imageio.ImageIO;

import scala.Option;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;
Expand All @@ -85,7 +83,7 @@
* Test rig for measuring throttling performance. Configure the parameters for a set of experiments, then execute them
* and view the html output file, with charts, that are produced. You can also render the charts to the screen if
* you wish.
*
* <p>
* Currently you'll need about 40GB of disk space to run these experiments (largest data written x2). Tune the msgSize
* & #partitions and throttle to adjust this.
*/
Expand Down Expand Up @@ -129,7 +127,6 @@ public static void main(String[] args) {
static void run(ExperimentDef config, Journal journal, boolean displayChartsOnScreen) {
Experiment experiment = new Experiment();
try {
experiment.setUp(new EmptyTestInfo());
experiment.run(config, journal, displayChartsOnScreen);
journal.footer();
} catch (Exception e) {
Expand Down Expand Up @@ -159,69 +156,93 @@ public ExperimentDef(String name, int brokers, int partitions, long throttle, in
}
}

static class Experiment extends QuorumTestHarness {
static class Experiment {
static final String TOPIC_NAME = "my-topic";

String experimentName = "unset";
List<KafkaServer> servers;
Map<Integer, List<Double>> leaderRates = new HashMap<>();
Map<Integer, List<Double>> followerRates = new HashMap<>();
KafkaClusterTestKit cluster;
Admin adminClient;

void startBrokers(List<Integer> brokerIds) {
void startBrokers(int numBrokerNodes) {
System.out.println("Starting Brokers");
servers = brokerIds.stream().map(i -> createBrokerConfig(i, zkConnect()))
.map(c -> TestUtils.createServer(KafkaConfig.fromProps(c), Time.SYSTEM))
.collect(Collectors.toList());

TestUtils.waitUntilBrokerMetadataIsPropagated(seq(servers), DEFAULT_MAX_WAIT_MS);
String brokerList = TestUtils.plaintextBootstrapServers(seq(servers));
adminClient = Admin.create(Collections.singletonMap(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList
));

try {
cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder()
.setNumControllerNodes(1)
.setNumBrokerNodes(numBrokerNodes)
.build()
).build();
cluster.format();
cluster.startup();
cluster.waitForReadyBrokers();
} catch (Exception e) {
throw new RuntimeException("Failed to start test Kafka cluster", e);
}

adminClient = Admin.create(cluster.clientProperties());
}

@Override public void tearDown() {
public void tearDown() {
Utils.closeQuietly(adminClient, "adminClient");
TestUtils.shutdownServers(seq(servers), true);
super.tearDown();
try {
cluster.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@SuppressWarnings("unchecked")
public void run(ExperimentDef config, Journal journal, boolean displayChartsOnScreen) throws Exception {
experimentName = config.name;
List<Integer> brokers = IntStream.rangeClosed(100, 100 + config.brokers).boxed().collect(Collectors.toList());
int shift = Math.round(config.brokers / 2f);

IntSupplier nextReplicaRoundRobin = new IntSupplier() {
int count = 0;

@Override public int getAsInt() {
count++;
return 100 + (count + shift) % config.brokers;
return (count + shift) % config.brokers;
}
};

Map<Integer, Seq<Integer>> replicas = IntStream.rangeClosed(0, config.partitions).boxed().collect(Collectors.toMap(
Map<Integer, List<Integer>> replicas = IntStream.rangeClosed(0, config.partitions - 1).boxed().collect(Collectors.toMap(
Function.identity(),
partition -> seq(Collections.singleton(nextReplicaRoundRobin.getAsInt()))
partition -> Collections.singletonList(nextReplicaRoundRobin.getAsInt())
));

startBrokers(brokers);
TestUtils.createTopic(zkClient(), TOPIC_NAME, (scala.collection.Map) CollectionConverters.asScala(replicas), seq(servers));
startBrokers(config.brokers);
adminClient.createTopics(Collections.singleton(new NewTopic(TOPIC_NAME, replicas))).all().get();

TestUtils.waitUntilTrue(
() -> cluster.brokers().values().stream().allMatch(server -> {
TopicImage image = server.metadataCache().currentImage().topics().getTopic(TOPIC_NAME);
return image != null && image.partitions().values().stream().allMatch(PartitionRegistration::hasLeader);
}),
() -> "Timed out waiting for topic listing",
DEFAULT_MAX_WAIT_MS,
500L
);

System.out.println("Writing Data");
KafkaProducer<byte[], byte[]> producer = createProducer(TestUtils.plaintextBootstrapServers(seq(servers)));

for (int x = 0; x < config.msgsPerPartition; x++) {
for (int partition = 0; partition < config.partitions; partition++) {
producer.send(new ProducerRecord<>(TOPIC_NAME, partition, null, new byte[config.msgSize]));
try (KafkaProducer<byte[], byte[]> producer = createProducer()) {
for (int x = 0; x < config.msgsPerPartition; x++) {
for (int partition = 0; partition < config.partitions; partition++) {
producer.send(new ProducerRecord<>(TOPIC_NAME, partition, null, new byte[config.msgSize]));
}
}
}

System.out.println("Generating Reassignment");
Map<TopicPartition, List<Integer>> newAssignment = ReassignPartitionsCommand.generateAssignment(adminClient,
json(TOPIC_NAME), brokers.stream().map(Object::toString).collect(Collectors.joining(",")), true).getKey();
Map<TopicPartition, List<Integer>> newAssignment = ReassignPartitionsCommand.generateAssignment(
adminClient,
json(TOPIC_NAME),
cluster.brokers().values().stream()
.map(server -> String.valueOf(server.replicaManager().localBrokerId()))
.collect(Collectors.joining(",")),
true
).getKey();

System.out.println("Starting Reassignment");
long start = System.currentTimeMillis();
Expand All @@ -246,20 +267,21 @@ public void run(ExperimentDef config, Journal journal, boolean displayChartsOnSc

void validateAllOffsetsMatch(ExperimentDef config) {
//Validate that offsets are correct in all brokers
for (KafkaServer broker : servers) {
for (KafkaBroker broker : cluster.brokers().values()) {
for (int partitionId = 0; partitionId < config.partitions; partitionId++) {
long offset = broker.getLogManager().getLog(new TopicPartition(TOPIC_NAME, partitionId), false).map(UnifiedLog::logEndOffset).getOrElse(() -> -1L);
long offset = broker.logManager().getLog(new TopicPartition(TOPIC_NAME, partitionId), false)
.map(UnifiedLog::logEndOffset).getOrElse(() -> -1L);
if (offset >= 0 && offset != config.msgsPerPartition) {
throw new RuntimeException(
"Run failed as offsets did not match for partition " + partitionId + " on broker " + broker.config().brokerId() + ". " +
"Run failed as offsets did not match for partition " + partitionId + " on broker " + broker.config().nodeId() + ". " +
"Expected " + config.msgsPerPartition + " but was " + offset + "."
);
}
}
}
}

void logOutput(ExperimentDef config, Map<Integer, Seq<Integer>> replicas, Map<TopicPartition, List<Integer>> newAssignment) throws Exception {
void logOutput(ExperimentDef config, Map<Integer, List<Integer>> replicas, Map<TopicPartition, List<Integer>> newAssignment) throws Exception {
List<TopicPartitionInfo> actual = adminClient.describeTopics(Collections.singleton(TOPIC_NAME))
.allTopicNames().get().get(TOPIC_NAME).partitions();

Expand All @@ -272,7 +294,7 @@ void logOutput(ExperimentDef config, Map<Integer, Seq<Integer>> replicas, Map<To
System.out.println("The replicas are " + new TreeMap<>(replicas).entrySet().stream().map(e -> "\n" + e).collect(Collectors.joining()));
System.out.println("This is the current replica assignment:\n" + curAssignment);
System.out.println("proposed assignment is: \n" + newAssignment);
System.out.println("This is the assignment we ended up with" + curAssignment);
System.out.println("This is the assignment we ended up with " + curAssignment);

//Test Stats
System.out.println("numBrokers: " + config.brokers);
Expand Down Expand Up @@ -341,29 +363,29 @@ XYSeriesCollection addDataToChart(Map<Integer, List<Double>> data) {
return dataset;
}

void record(Map<Integer, List<Double>> rates, int brokerId, Double currentRate) {
List<Double> leaderRatesBroker = rates.getOrDefault(brokerId, new ArrayList<>());
void record(Map<Integer, List<Double>> rates, int nodeId, Double currentRate) {
List<Double> leaderRatesBroker = rates.getOrDefault(nodeId, new ArrayList<>());
leaderRatesBroker.add(currentRate);
rates.put(brokerId, leaderRatesBroker);
rates.put(nodeId, leaderRatesBroker);
}

void printRateMetrics() {
for (KafkaServer broker : servers) {
for (BrokerServer broker : cluster.brokers().values()) {
double leaderRate = measuredRate(broker, QuotaType.LEADER_REPLICATION);
if (broker.config().brokerId() == 100)
LOGGER.info("waiting... Leader rate on 101 is " + leaderRate);
record(leaderRates, broker.config().brokerId(), leaderRate);
if (broker.config().nodeId() == 0)
LOGGER.info("waiting... Leader rate on 1 is {}", leaderRate);
record(leaderRates, broker.config().nodeId(), leaderRate);
if (leaderRate > 0)
LOGGER.trace("Leader Rate on " + broker.config().brokerId() + " is " + leaderRate);
LOGGER.trace("Leader Rate on {} is {}", broker.config().nodeId(), leaderRate);

double followerRate = measuredRate(broker, QuotaType.FOLLOWER_REPLICATION);
record(followerRates, broker.config().brokerId(), followerRate);
record(followerRates, broker.config().nodeId(), followerRate);
if (followerRate > 0)
LOGGER.trace("Follower Rate on " + broker.config().brokerId() + " is " + followerRate);
LOGGER.trace("Follower Rate on {} is {}", broker.config().nodeId(), followerRate);
}
}

private double measuredRate(KafkaServer broker, QuotaType repType) {
private double measuredRate(KafkaBroker broker, QuotaType repType) {
MetricName metricName = broker.metrics().metricName("byte-rate", repType.toString());
return broker.metrics().metrics().containsKey(metricName)
? (double) broker.metrics().metrics().get(metricName).metricValue()
Expand All @@ -375,10 +397,10 @@ String json(String... topic) {
return "{\"topics\": [" + topicStr + "],\"version\":1}";
}

KafkaProducer<byte[], byte[]> createProducer(String brokerList) {
KafkaProducer<byte[], byte[]> createProducer() {
return TestUtils.createProducer(
brokerList,
0,
cluster.bootstrapServers(),
1,
60 * 1000L,
1024L * 1024L,
Integer.MAX_VALUE,
Expand All @@ -395,31 +417,6 @@ KafkaProducer<byte[], byte[]> createProducer(String brokerList) {
false
);
}

/**
 * Builds the {@link Properties} for one test broker by delegating to the Scala
 * test helper {@code TestUtils.createBrokerConfig}.
 *
 * <p>NOTE(review): the long run of positional flags below maps onto that
 * helper's parameter list, which is not visible here — confirm each position
 * against {@code kafka.utils.TestUtils} before reordering or changing any of
 * them. Three listener ports are drawn via {@code TestUtils.RandomPort()} to
 * avoid clashes between concurrently started brokers.
 *
 * @param brokerId  numeric id assigned to this broker
 * @param zkConnect ZooKeeper connection string the broker registers with
 * @return broker properties suitable for constructing a KafkaConfig
 */
Properties createBrokerConfig(int brokerId, String zkConnect) {
return TestUtils.createBrokerConfig(
brokerId,
zkConnect,
false, // shorten test time
true,
TestUtils.RandomPort(),
Option.empty(),
Option.empty(),
Option.empty(),
true,
false,
TestUtils.RandomPort(),
false,
TestUtils.RandomPort(),
false,
TestUtils.RandomPort(),
Option.empty(),
3,
false,
1,
(short) 1,
false);
}
}

static class Journal {
Expand Down Expand Up @@ -476,8 +473,4 @@ String path() {
return log.getAbsolutePath();
}
}

/**
 * Converts a Java {@link Collection} into a Scala {@code Seq} via
 * {@code CollectionConverters.asScala(...).toSeq()}, so this Java test rig can
 * call Scala-side Kafka test utilities that expect Scala collections.
 *
 * @param seq the Java collection to convert
 * @return the equivalent Scala {@code Seq}
 */
private static <T> Seq<T> seq(Collection<T> seq) {
return CollectionConverters.asScala(seq).toSeq();
}
}

0 comments on commit 1eb7644

Please sign in to comment.