diff --git a/README.md b/README.md index a5022ed3cf..ffee3952d1 100644 --- a/README.md +++ b/README.md @@ -15,9 +15,8 @@ This project offers many kafka tools to simplify the life for kafka users. 1. [Kafka quick start](#kafka-cluster-quick-start): set up a true kafka cluster in one minute 2. [Kafka performance](#Performance-Benchmark): check producing/consuming performance. 3. [Kafka metric explorer](#kafka-metric-explorer): utility for accessing kafka Mbean metrics via JMX. -4. [Kafka partition score](#Kafka-partition-score): score all broker's partitions. -5. [Kafka replica syncing monitor](#Kafka-replica-syncing-monitor): Tracking replica syncing progress. -6. [Astraea Web Server 中文文件連結](./docs/web_server/README.md) +4. [Kafka replica syncing monitor](#Kafka-replica-syncing-monitor): Tracking replica syncing progress. +5. [Astraea Web Server 中文文件連結](./docs/web_server/README.md) [Github packages](https://github.com/orgs/skiptests/packages?repo_name=astraea) offers the docker image to run mentioned tools ```shell @@ -254,22 +253,6 @@ Run the tool from release --- -## Kafka Partition Score - -This tool will score the partition on brokers, the higher score the heavier load. - -### Start scoring partitions on broker address "192.168.103.39:9092" - -```shell -./gradlew run --args="score --bootstrap.servers 192.168.103.39:9092" -``` - -### Partition Score Configurations - -1. --bootstrap.servers: the server to connect to -2. --exclude.internal.topics: True if you want to ignore internal topics like _consumer_offsets while counting score. -3. --hide.balanced: True if you want to hide topics and partitions thar already balanced.:q - ## Kafka Replica Syncing Monitor This tool will track partition replica syncing progress. This tool can be used to observe @@ -316,4 +299,4 @@ $ ./gradlew run --args="monitor --bootstrap.servers 192.168.103.39:9092" 2. --interval: the frequency(time interval in second) to check replica state, support floating point value. (default: 1 second) 3. --prop.file: the path to a file that containing the properties to be passed to kafka admin. 4. --topic: topics to track (default: track all non-synced partition by default) -5. --track: keep track even if all the replicas are synced. Also attempts to discover any non-synced replicas. (default: false) \ No newline at end of file +5. --track: keep track even if all the replicas are synced. Also attempts to discover any non-synced replicas. (default: false) diff --git a/app/src/main/java/org/astraea/app/App.java b/app/src/main/java/org/astraea/app/App.java index b6e23e3212..c117782288 100644 --- a/app/src/main/java/org/astraea/app/App.java +++ b/app/src/main/java/org/astraea/app/App.java @@ -23,7 +23,6 @@ import java.util.Map; import org.astraea.app.admin.ReplicaSyncingMonitor; import org.astraea.app.automation.Automation; -import org.astraea.app.cost.topic.PartitionScore; import org.astraea.app.metrics.MetricExplorer; import org.astraea.app.performance.Performance; import org.astraea.app.web.WebService; @@ -32,7 +31,6 @@ public class App { private static final Map> MAIN_CLASSES = Map.of( "metrics", MetricExplorer.class, - "score", PartitionScore.class, "performance", Performance.class, "monitor", ReplicaSyncingMonitor.class, "automation", Automation.class, diff --git a/app/src/main/java/org/astraea/app/cost/topic/CalculateUtils.java b/app/src/main/java/org/astraea/app/cost/topic/CalculateUtils.java deleted file mode 100644 index 54ee0dc89c..0000000000 --- a/app/src/main/java/org/astraea/app/cost/topic/CalculateUtils.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.cost.topic; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.function.Function; -import java.util.stream.Collectors; -import org.astraea.app.admin.TopicPartition; - -public class CalculateUtils { - public static Map> getLoad( - Map> brokerPartitionSize, - Map retentionMillis) { - Map> brokerPartitionLoad = new HashMap<>(); - - brokerPartitionSize - .keySet() - .forEach( - (broker) -> { - Map partitionLoad = - brokerPartitionSize.get(broker).keySet().stream() - .filter(partition -> retentionMillis.containsKey(partition.topic())) - .collect( - Collectors.toMap( - Function.identity(), - partition -> - (double) brokerPartitionSize.get(broker).get(partition) - / retentionMillis.get(partition.topic()))); - brokerPartitionLoad.put(broker, partitionLoad); - }); - return brokerPartitionLoad; - } - - private static Double countSum(Set in) { - return in.stream().mapToDouble(i -> i).sum(); - } - - public static Map> getScore( - Map> load) { - Map brokerLoad = new HashMap<>(); - Map partitionLoad = new HashMap<>(); - Map partitionSD = new HashMap<>(); - Map partitionMean = new HashMap<>(); - Map> brokerPartitionScore = new HashMap<>(); - double brokerSD; - Set loadSet = new HashSet<>(); - Set LoadSQR = new HashSet<>(); - - load.keySet() - .forEach( - broker -> { - load.get(broker) - .keySet() - .forEach( - tp -> { - loadSet.add(load.get(broker).get(tp)); - partitionLoad.put(tp, load.get(broker).get(tp)); - LoadSQR.add(Math.pow(load.get(broker).get(tp), 2)); - }); - var loadSum = countSum(loadSet); - var partitionNum = load.get(broker).keySet().size(); - brokerLoad.put(broker, loadSum); - var mean = loadSum / load.get(broker).size(); - partitionMean.put(broker, loadSum / load.get(broker).size()); - var SD = - Math.pow((countSum(LoadSQR) - mean * mean * partitionNum) / partitionNum, 0.5); - partitionSD.put(broker, SD); - loadSet.clear(); - LoadSQR.clear(); - }); - - brokerLoad - .keySet() - .forEach( - broker -> { - loadSet.add(brokerLoad.get(broker)); - LoadSQR.add(Math.pow(brokerLoad.get(broker), 2)); - }); - var brokerLoadMean = countSum(loadSet) / brokerLoad.keySet().size(); - var brokerLoadSQR = countSum(LoadSQR); - brokerSD = - Math.pow( - (brokerLoadSQR - brokerLoadMean * brokerLoadMean * brokerLoad.keySet().size()) - / brokerLoad.keySet().size(), - 0.5); - load.keySet() - .forEach( - broker -> { - var partitionScore = new TreeMap(); - load.get(broker) - .keySet() - .forEach( - topicPartition -> { - if (brokerLoad.get(broker) - brokerLoadMean > 0) { - partitionScore.put( - topicPartition, - Math.round( - (((brokerLoad.get(broker) - brokerLoadMean) / brokerSD) - * ((partitionLoad.get(topicPartition) - - partitionMean.get(broker)) - / partitionSD.get(broker)) - * 60.0) - * 100.0) - / 100.0); - } else { - partitionScore.put(topicPartition, 0.0); - } - brokerPartitionScore.put(broker, partitionScore); - }); - }); - return brokerPartitionScore; - } -} diff --git a/app/src/main/java/org/astraea/app/cost/topic/GetPartitionInf.java b/app/src/main/java/org/astraea/app/cost/topic/GetPartitionInf.java deleted file mode 100644 index a8b39918a1..0000000000 --- a/app/src/main/java/org/astraea/app/cost/topic/GetPartitionInf.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.cost.topic; - -import java.util.HashMap; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.TreeMap; -import java.util.stream.Collectors; -import org.astraea.app.admin.Admin; -import org.astraea.app.admin.TopicPartition; - -public class GetPartitionInf { - static Map> getSize(Admin client) { - Map> brokerPartitionSize = new HashMap<>(); - client - .brokerIds() - .forEach( - (broker) -> { - var partitionSize = new TreeMap(); - client - .replicas(client.topicNames()) - .forEach( - (tp, assignment) -> { - assignment.forEach( - partition -> { - if (partition.broker() == broker) - partitionSize.put(tp, (int) partition.size()); - }); - brokerPartitionSize.put(broker, partitionSize); - }); - }); - return brokerPartitionSize; - } - - static Map getRetentionMillis(Admin client) { - return client.topics().entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entry -> - entry - .getValue() - .value("retention.ms") - .map(Integer::parseInt) - .orElseThrow( - () -> new NoSuchElementException("retention.ms does not exist")))); - } -} diff --git a/app/src/main/java/org/astraea/app/cost/topic/PartitionScore.java b/app/src/main/java/org/astraea/app/cost/topic/PartitionScore.java deleted file mode 100644 index 59fd058279..0000000000 --- a/app/src/main/java/org/astraea/app/cost/topic/PartitionScore.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.cost.topic; - -import com.beust.jcommander.Parameter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import org.astraea.app.admin.Admin; -import org.astraea.app.admin.TopicPartition; -import org.astraea.app.argument.BooleanField; - -public class PartitionScore { - public static void printScore( - Map> score, Argument argument) { - List partitionGood = new ArrayList<>(); - Map brokerGood = new HashMap<>(); - score - .keySet() - .forEach( - broker -> { - brokerGood.put(broker, true); - score - .get(broker) - .keySet() - .forEach( - tp -> { - if (score.get(broker).get(tp) > 0) brokerGood.put(broker, false); - }); - - if (!brokerGood.get(broker)) { - System.out.println("\nbroker: " + broker); - score - .get(broker) - .keySet() - .forEach( - tp -> { - if (score.get(broker).get(tp) > 0) { - System.out.println(tp + ": " + score.get(broker).get(tp)); - } else { - partitionGood.add(tp); - } - }); - } - }); - if (!argument.hideBalanced) { - System.out.println( - "\nThe following brokers are balanced: " - + brokerGood.entrySet().stream() - .filter(Map.Entry::getValue) - .map(Map.Entry::getKey) - .collect(Collectors.toSet())); - - System.out.println( - "The following partitions are balanced: " - + partitionGood.stream() - .sorted() - .map(String::valueOf) - .collect(Collectors.joining(", ", "[", "]"))); - } - } - - public static Map> execute(Argument argument, Admin admin) { - var internalTopic = - Set.of( - "__consumer_offsets", - "_confluent-command", - "_confluent-metrics", - "_confluent-telemetry-metrics", - "__transaction_state"); - var brokerPartitionSize = GetPartitionInf.getSize(admin); - var retentionMillis = GetPartitionInf.getRetentionMillis(admin); - if (argument.excludeInternalTopic) internalTopic.forEach(retentionMillis::remove); - var load = CalculateUtils.getLoad(brokerPartitionSize, retentionMillis); - return CalculateUtils.getScore(load); - } - - public static void main(String[] args) { - var argument = org.astraea.app.argument.Argument.parse(new Argument(), args); - var admin = Admin.of(argument.bootstrapServers()); - var score = execute(argument, admin); - printScore(score, argument); - } - - static class Argument extends org.astraea.app.argument.Argument { - @Parameter( - names = {"--exclude.internal.topics"}, - description = - "True if you want to ignore internal topics like _consumer_offsets while counting score.", - validateWith = BooleanField.class, - converter = BooleanField.class) - boolean excludeInternalTopic = false; - - @Parameter( - names = {"--hide.balanced"}, - description = "True if you want to hide topics and partitions thar already balanced.", - validateWith = BooleanField.class, - converter = BooleanField.class) - boolean hideBalanced = false; - } -} diff --git a/app/src/test/java/org/astraea/app/cost/topic/CalculateUtilsTest.java b/app/src/test/java/org/astraea/app/cost/topic/CalculateUtilsTest.java deleted file mode 100644 index ce8ca6645a..0000000000 --- a/app/src/test/java/org/astraea/app/cost/topic/CalculateUtilsTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.cost.topic; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.util.HashMap; -import java.util.Map; -import org.astraea.app.admin.TopicPartition; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -public class CalculateUtilsTest { - static Map> fakeBrokerPartitionSize = new HashMap<>(); - static Map fakeRetentionMillis = new HashMap<>(); - static Map> fakePartitionLoad = new HashMap<>(); - - @BeforeAll - static void setup() { - // set partition size and retention time - Map fakePartitionSize = new HashMap<>(); - fakePartitionSize.put(TopicPartition.of("test0", 0), 100000000); - fakePartitionSize.put(TopicPartition.of("test0", 1), 200000000); - fakePartitionSize.put(TopicPartition.of("test0", 2), 300000000); - fakePartitionSize.put(TopicPartition.of("test0", 3), 400000000); - fakeBrokerPartitionSize.put(0, fakePartitionSize); - fakePartitionSize = new HashMap<>(); - fakePartitionSize.put(TopicPartition.of("test1", 0), 500000000); - fakePartitionSize.put(TopicPartition.of("test1", 1), 600000000); - fakePartitionSize.put(TopicPartition.of("test1", 2), 700000000); - fakePartitionSize.put(TopicPartition.of("test1", 3), 800000000); - fakeBrokerPartitionSize.put(1, fakePartitionSize); - fakeRetentionMillis.put("test0", 604800000); - fakeRetentionMillis.put("test1", 604800000); - - // set partition load - fakePartitionLoad = new HashMap<>(); - Map fakeTopicLoad = new HashMap<>(); - fakeTopicLoad.put(TopicPartition.of("test0", 0), 0.1); - fakeTopicLoad.put(TopicPartition.of("test0", 1), 0.2); - fakeTopicLoad.put(TopicPartition.of("test0", 2), 0.3); - fakeTopicLoad.put(TopicPartition.of("test0", 3), 0.4); - fakePartitionLoad.put(0, fakeTopicLoad); - fakeTopicLoad = new HashMap<>(); - fakeTopicLoad.put(TopicPartition.of("test1", 0), 1.5); - fakeTopicLoad.put(TopicPartition.of("test1", 1), 1.6); - fakeTopicLoad.put(TopicPartition.of("test1", 2), 1.7); - fakeTopicLoad.put(TopicPartition.of("test1", 3), 1.8); - fakePartitionLoad.put(1, fakeTopicLoad); - } - - @Test - void testGetLoad() { - var Load = CalculateUtils.getLoad(fakeBrokerPartitionSize, fakeRetentionMillis); - assertEquals(2, Load.size()); - assertEquals(4, Load.get(0).size()); - assertEquals(4, Load.get(1).size()); - assertEquals(0.17, round(Load.get(0).get(TopicPartition.of("test0", 0)))); - assertEquals(0.33, round(Load.get(0).get(TopicPartition.of("test0", 1)))); - assertEquals(0.50, round(Load.get(0).get(TopicPartition.of("test0", 2)))); - assertEquals(0.66, round(Load.get(0).get(TopicPartition.of("test0", 3)))); - assertEquals(0.83, round(Load.get(1).get(TopicPartition.of("test1", 0)))); - assertEquals(0.99, round(Load.get(1).get(TopicPartition.of("test1", 1)))); - assertEquals(1.16, round(Load.get(1).get(TopicPartition.of("test1", 2)))); - assertEquals(1.32, round(Load.get(1).get(TopicPartition.of("test1", 3)))); - } - - @Test - void testGetScore() { - var Score = CalculateUtils.getScore(fakePartitionLoad); - assertEquals(2, Score.size()); - assertEquals(4, Score.get(0).size()); - assertEquals(4, Score.get(1).size()); - assertEquals(0.0, round(Score.get(0).get(TopicPartition.of("test0", 0)))); - assertEquals(0.0, round(Score.get(0).get(TopicPartition.of("test0", 1)))); - assertEquals(0.0, round(Score.get(0).get(TopicPartition.of("test0", 2)))); - assertEquals(0.0, round(Score.get(0).get(TopicPartition.of("test0", 3)))); - assertEquals(-80.50, round(Score.get(1).get(TopicPartition.of("test1", 0)))); - assertEquals(-26.83, round(Score.get(1).get(TopicPartition.of("test1", 1)))); - assertEquals(26.83, round(Score.get(1).get(TopicPartition.of("test1", 2)))); - assertEquals(80.50, round(Score.get(1).get(TopicPartition.of("test1", 3)))); - } - - double round(double score) { - return Math.round(100 * score) / 100.0; - } -} diff --git a/app/src/test/java/org/astraea/app/cost/topic/GetPartitionInfTest.java b/app/src/test/java/org/astraea/app/cost/topic/GetPartitionInfTest.java deleted file mode 100644 index 48f1508ccb..0000000000 --- a/app/src/test/java/org/astraea/app/cost/topic/GetPartitionInfTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.cost.topic; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import org.astraea.app.admin.Admin; -import org.astraea.app.common.Utils; -import org.astraea.app.producer.Producer; -import org.astraea.app.producer.Serializer; -import org.astraea.app.service.RequireBrokerCluster; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -public class GetPartitionInfTest extends RequireBrokerCluster { - static Admin admin; - - @BeforeAll - static void setup() throws ExecutionException, InterruptedException { - admin = Admin.of(bootstrapServers()); - Map topicName = new HashMap<>(); - topicName.put(0, "testPartitionScore0"); - topicName.put(1, "testPartitionScore1"); - topicName.put(2, "testPartitionScore2"); - - try (var admin = Admin.of(bootstrapServers())) { - admin - .creator() - .topic(topicName.get(0)) - .numberOfPartitions(4) - .numberOfReplicas((short) 1) - .create(); - admin - .creator() - .topic(topicName.get(1)) - .numberOfPartitions(4) - .numberOfReplicas((short) 1) - .create(); - admin - .creator() - .topic(topicName.get(2)) - .numberOfPartitions(4) - .numberOfReplicas((short) 1) - .create(); - // wait for topic - Utils.sleep(Duration.ofSeconds(5)); - } - var producer = - Producer.builder() - .bootstrapServers(bootstrapServers()) - .keySerializer(Serializer.STRING) - .build(); - int size = 10000; - for (int t = 0; t <= 2; t++) { - for (int p = 0; p <= 3; p++) { - producer - .sender() - .topic(topicName.get(t)) - .partition(p) - .value(new byte[size]) - .run() - .toCompletableFuture() - .get(); - } - size += 10000; - } - } - - @Test - void testGetSize() { - var brokerPartitionSize = GetPartitionInf.getSize(admin); - assertEquals(3, brokerPartitionSize.size()); - assertEquals( - 3 * 4, - brokerPartitionSize.get(0).size() - + brokerPartitionSize.get(1).size() - + brokerPartitionSize.get(2).size()); - } - - @Test - void testGetRetentionMillis() { - var brokerPartitionRetentionMillis = GetPartitionInf.getRetentionMillis(admin); - assertEquals(3, brokerPartitionRetentionMillis.size()); - } -} diff --git a/app/src/test/java/org/astraea/app/cost/topic/PartitionScoreTest.java b/app/src/test/java/org/astraea/app/cost/topic/PartitionScoreTest.java deleted file mode 100644 index 3298c59197..0000000000 --- a/app/src/test/java/org/astraea/app/cost/topic/PartitionScoreTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.cost.topic; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import org.astraea.app.admin.Admin; -import org.astraea.app.common.Utils; -import org.astraea.app.producer.Producer; -import org.astraea.app.producer.Serializer; -import org.astraea.app.service.RequireBrokerCluster; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -public class PartitionScoreTest extends RequireBrokerCluster { - static Admin admin; - - @BeforeAll - static void setup() throws ExecutionException, InterruptedException { - admin = Admin.of(bootstrapServers()); - Map topicName = new HashMap<>(); - topicName.put(0, "testPartitionScore0"); - topicName.put(1, "testPartitionScore1"); - topicName.put(2, "__consumer_offsets"); - try (var admin = Admin.of(bootstrapServers())) { - admin - .creator() - .topic(topicName.get(0)) - .numberOfPartitions(4) - .numberOfReplicas((short) 1) - .create(); - admin - .creator() - .topic(topicName.get(1)) - .numberOfPartitions(4) - .numberOfReplicas((short) 1) - .create(); - admin - .creator() - .topic(topicName.get(2)) - .numberOfPartitions(4) - .numberOfReplicas((short) 1) - .create(); - // wait for topic - Utils.sleep(Duration.ofSeconds(5)); - } - var producer = - Producer.builder() - .bootstrapServers(bootstrapServers()) - .keySerializer(Serializer.STRING) - .build(); - int size = 10000; - for (int t = 0; t <= 1; t++) { - for (int p = 0; p <= 3; p++) { - producer - .sender() - .topic(topicName.get(t)) - .partition(p) - .value(new byte[size]) - .run() - .toCompletableFuture() - .get(); - } - size += 10000; - } - producer.close(); - } - - @Test - void testGetScore() { - PartitionScore.Argument argument = new PartitionScore.Argument(); - argument.excludeInternalTopic = false; - var score = PartitionScore.execute(argument, admin); - assertEquals(3, score.size()); - assertEquals(3 * 4, score.get(0).size() + score.get(1).size() + score.get(2).size()); - argument.excludeInternalTopic = true; - score = PartitionScore.execute(argument, admin); - assertEquals(3, score.size()); - assertEquals(2 * 4, score.get(0).size() + score.get(1).size() + score.get(2).size()); - } -}