From 64ca6c219ca9f4cb00c2726c5542b7590653af0e Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 9 May 2023 21:16:18 +0800 Subject: [PATCH 1/7] Move files --- .../bench => app/benchmark/balancer}/BalancerBenchmark.java | 2 +- .../bench => app/benchmark/balancer}/CostProfilingImpl.java | 2 +- .../bench => app/benchmark/balancer}/ExperimentBuilderImpl.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename app/src/main/java/org/astraea/{balancer/bench => app/benchmark/balancer}/BalancerBenchmark.java (98%) rename app/src/main/java/org/astraea/{balancer/bench => app/benchmark/balancer}/CostProfilingImpl.java (99%) rename app/src/main/java/org/astraea/{balancer/bench => app/benchmark/balancer}/ExperimentBuilderImpl.java (98%) diff --git a/app/src/main/java/org/astraea/balancer/bench/BalancerBenchmark.java b/app/src/main/java/org/astraea/app/benchmark/balancer/BalancerBenchmark.java similarity index 98% rename from app/src/main/java/org/astraea/balancer/bench/BalancerBenchmark.java rename to app/src/main/java/org/astraea/app/benchmark/balancer/BalancerBenchmark.java index de516066da..cd2b1e8d9d 100644 --- a/app/src/main/java/org/astraea/balancer/bench/BalancerBenchmark.java +++ b/app/src/main/java/org/astraea/app/benchmark/balancer/BalancerBenchmark.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.balancer.bench; +package org.astraea.app.benchmark.balancer; import java.time.Duration; import java.util.Comparator; diff --git a/app/src/main/java/org/astraea/balancer/bench/CostProfilingImpl.java b/app/src/main/java/org/astraea/app/benchmark/balancer/CostProfilingImpl.java similarity index 99% rename from app/src/main/java/org/astraea/balancer/bench/CostProfilingImpl.java rename to app/src/main/java/org/astraea/app/benchmark/balancer/CostProfilingImpl.java index c9ecfedfa2..6ba9bc6329 100644 --- a/app/src/main/java/org/astraea/balancer/bench/CostProfilingImpl.java +++ b/app/src/main/java/org/astraea/app/benchmark/balancer/CostProfilingImpl.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.balancer.bench; +package org.astraea.app.benchmark.balancer; import java.time.Duration; import java.util.Collections; diff --git a/app/src/main/java/org/astraea/balancer/bench/ExperimentBuilderImpl.java b/app/src/main/java/org/astraea/app/benchmark/balancer/ExperimentBuilderImpl.java similarity index 98% rename from app/src/main/java/org/astraea/balancer/bench/ExperimentBuilderImpl.java rename to app/src/main/java/org/astraea/app/benchmark/balancer/ExperimentBuilderImpl.java index 05c3ba3d46..d62d1c32ed 100644 --- a/app/src/main/java/org/astraea/balancer/bench/ExperimentBuilderImpl.java +++ b/app/src/main/java/org/astraea/app/benchmark/balancer/ExperimentBuilderImpl.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.balancer.bench; +package org.astraea.app.benchmark.balancer; import java.time.Duration; import java.util.Set; From 549ea30aba5aeff818757eac7c80fad01020b036 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 9 May 2023 21:52:32 +0800 Subject: [PATCH 2/7] [BALANCER] Implementation of `BalancerBenchmarkApp` --- app/src/main/java/org/astraea/app/App.java | 5 +- .../app/benchmark/BalancerBenchmarkApp.java | 503 ++++++++++++++++++ 2 files changed, 507 insertions(+), 1 deletion(-) create mode 100644 app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java diff --git a/app/src/main/java/org/astraea/app/App.java b/app/src/main/java/org/astraea/app/App.java index 481501bfff..9acbf8e44f 100644 --- a/app/src/main/java/org/astraea/app/App.java +++ b/app/src/main/java/org/astraea/app/App.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import org.astraea.app.automation.Automation; +import org.astraea.app.benchmark.BalancerBenchmarkApp; import org.astraea.app.performance.Performance; import org.astraea.app.publisher.MetricPublisher; import org.astraea.app.version.Version; @@ -39,7 +40,9 @@ public class App { "version", Version.class, "metric_publisher", - MetricPublisher.class); + MetricPublisher.class, + "balancer_benchmark", + BalancerBenchmarkApp.class); static void execute(Map> mains, List args) throws Throwable { diff --git a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java new file mode 100644 index 0000000000..e5441c2534 --- /dev/null +++ b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.benchmark; + +import com.beust.jcommander.Parameter; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.astraea.app.argument.Argument; +import org.astraea.app.argument.PathField; +import org.astraea.app.argument.PositiveIntegerField; +import org.astraea.app.benchmark.balancer.BalancerBenchmark; +import org.astraea.common.Configuration; +import org.astraea.common.Utils; +import org.astraea.common.VersionUtils; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.balancer.Balancer; +import org.astraea.common.balancer.BalancerProblemFormat; +import org.astraea.common.cost.ClusterCost; +import org.astraea.common.json.JsonConverter; +import org.astraea.common.json.TypeRef; +import org.astraea.common.metrics.BeanObject; +import org.astraea.common.metrics.ClusterBean; +import org.astraea.common.metrics.HasBeanObject; + +public class BalancerBenchmarkApp { + + public void execute(String[] args) { + var name = args.length > 0 ? args[0] : "help"; + var arguments = Arrays.stream(args).skip(1).toArray(String[]::new); + var benchmarks = + Map.of( + "experiment", + () -> runExperiment(Argument.parse(new ExperimentArgument(), arguments)), + "cost_profiling", + () -> runCostProfiling(Argument.parse(new CostProfilingArgument(), arguments))); + + Runnable help = + () -> { + if (!name.equalsIgnoreCase("help")) System.out.println("Unknown benchmark name: " + name); + System.out.printf("Usage: %s [args ...]%n", benchmarks.keySet()); + }; + + // try to run the specified benchmark or cry for help + benchmarks.getOrDefault(name, help).run(); + } + + void runExperiment(ExperimentArgument argument) { + var cluster = argument.fetchClusterInfo(); + var beans = argument.fetchClusterBean(); + var problem = argument.fetchBalancerProblem(); + var balancer = + Utils.construct( + (Class) Utils.packException(() -> Class.forName(problem.balancer)), + Configuration.of(problem.balancerConfig)); + + System.out.println(optimizationSummary(argument, cluster, beans, problem)); + + var result = + BalancerBenchmark.experiment() + .setBalancer(balancer) + .setClusterInfo(cluster) + .setClusterBean(beans) + .setAlgorithmConfig(problem.parse()) + .setExperimentTrials(argument.trials) + .setExecutionTimeout(problem.timeout) + .start() + .toCompletableFuture() + .join(); + + System.out.println(experimentSummary(result)); + } + + void runCostProfiling(CostProfilingArgument argument) { + var cluster = argument.fetchClusterInfo(); + var beans = argument.fetchClusterBean(); + var problem = argument.fetchBalancerProblem(); + var balancer = + Utils.construct( + (Class) Utils.packException(() -> Class.forName(problem.balancer)), + Configuration.of(problem.balancerConfig)); + + System.out.println(optimizationSummary(argument, cluster, beans, problem)); + + var result = + BalancerBenchmark.costProfiling() + .setBalancer(balancer) + .setClusterInfo(cluster) + .setClusterBean(beans) + .setAlgorithmConfig(problem.parse()) + .setExecutionTimeout(problem.timeout) + .start() + .toCompletableFuture() + .join(); + + System.out.println(costProfilingSummary(argument, result)); + } + + private String optimizationSummary( + CommonArgument arg, ClusterInfo info, ClusterBean bean, BalancerProblemFormat optimization) { + var format = + """ + Balancer Benchmark + =============================== + + * Version: %s + * Build Time: %s + * Revision: %s + * Author: %s + + ## Balancing Problem + + ```json + %s + ``` + + * Execution: %s + * Balancer: %s + * Balancer Configuration: + %s + * Cluster Cost Function: %s + * Move Cost Function: %s + * Cost Function Configuration: + %s + + ## ClusterInfo Summary + + * ClusterId: %s + * Topics: %d + * Partition: %d + * Replicas: %d + * Broker Count: %d + + ## ClusterBean Summary + + * Total Metrics: %d + * Avg Metrics Per Broker: %f + * Broker Count: %d + * Metrics Start From: %s + * Metrics End at: %s + * Recorded Duration: %s + + """; + + var metricStart = + bean.all().values().stream() + .flatMap(Collection::stream) + .map(HasBeanObject::beanObject) + .mapToLong(BeanObject::createdTimestamp) + .min() + .stream() + .mapToObj( + time -> + ZonedDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault()) + .toLocalDateTime()) + .findFirst() + .orElse(null); + var metricEnd = + bean.all().values().stream() + .flatMap(Collection::stream) + .map(HasBeanObject::beanObject) + .mapToLong(BeanObject::createdTimestamp) + .max() + .stream() + .mapToObj( + time -> + ZonedDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault()) + .toLocalDateTime()) + .findFirst() + .orElse(null); + var duration = + metricStart != null && metricEnd != null + ? Duration.between(metricStart, metricEnd) + : Duration.ZERO; + + return String.format( + format, + // astraeae version + VersionUtils.VERSION, + VersionUtils.DATE, + VersionUtils.REVISION, + VersionUtils.BUILDER, + // Balancer Problem Summary + arg.fetchBalancerProblemJson(), + optimization.timeout, + optimization.balancer, + Optional.of( + optimization.balancerConfig.entrySet().stream() + .map(e -> String.format(" * \"%s\": %s", e.getKey(), e.getValue())) + .collect(Collectors.joining(System.lineSeparator()))) + .filter(Predicate.not(String::isEmpty)) + .orElse(" * no config"), + optimization.parse().clusterCostFunction().toString(), + optimization.parse().moveCostFunction().toString(), + Optional.of( + optimization.costConfig.entrySet().stream() + .map(e -> String.format(" * \"%s\": %s", e.getKey(), e.getValue())) + .collect(Collectors.joining(System.lineSeparator()))) + .filter(Predicate.not(String::isEmpty)) + .orElse(" * no config"), + // ClusterInfo Summary + info.clusterId(), + info.topicNames().size(), + info.topicPartitions().size(), + info.replicas().size(), + info.brokers().size(), + // ClusterBean Summary + bean.all().values().stream().mapToInt(Collection::size).sum(), + (double) bean.all().values().stream().mapToInt(Collection::size).sum() + / bean.brokerIds().size(), + bean.brokerIds().size(), + metricStart, + metricEnd, + duration); + } + + private String experimentSummary(BalancerBenchmark.ExperimentResult result) { + var format = + """ + Balancer Experiment Result + =============================== + + * Attempted Trials: %d + * Solution Found Trials: %d + * No Solution Found Trials: %d + + ## ClusterCost Detail + + * Initial ClusterCost: %f + > %s + * Best ClusterCost: %s + > %s + + ## Statistics + + * Initial Cost: %d + * Min Cost: %s + * Average Cost: %s + * Max Cost: %s + * Cost Variance: %s + + ## All Cost Values + + ``` + %s + ``` + + """; + + var count = result.costSummary().getCount(); + + return String.format( + format, + // Trials + result.trials(), + result.costs().size(), + result.trials() - result.costs().size(), + // Cost Detail + result.initial(), + result.initial().value(), + result.bestCost().map(Object::toString).orElse("no usable solution found"), + result + .bestCost() + .map(ClusterCost::value) + .map(Object::toString) + .orElse("no usable solution found"), + // Cost Statistics + result.initial().value(), + count > 0 ? result.costSummary().getMin() : -1, + count > 0 ? result.costSummary().getAverage() : -1, + count > 0 ? result.costSummary().getMax() : -1, + result.variance().orElse(-1), + // All values + result.costs().stream() + .mapToDouble(ClusterCost::value) + .sorted() + .mapToObj(Double::toString) + .collect(Collectors.joining(System.lineSeparator()))); + } + + private String costProfilingSummary( + CostProfilingArgument arg, BalancerBenchmark.CostProfilingResult result) { + var format = + """ + Balancer Cost Profiling Result + =============================== + + * Initial Cost Value: %f + > %s + + * Best Cost Value: %s + > %s + + ## Runtime Statistics + + * Execution Time: %s + * Average Iteration Time: %.3f ms + * Average Balancer Operation Time: %.3f ms + * Average ClusterCost Processing Time: %.3f ms + * Average MoveCost Processing Time: %.3f ms + * Total ClusterCost Evaluation: %d + * Total MoveCost Evaluation: %d + + ## Detail + + * Cost Profiling Result (ClusterCost Only) in CSV: %s + * Cost Profiling Result (All) in CSV: %s + """; + + // use the move cost evaluation count as the number of iteration(an optimization attempt) been + // performed. we are not using cluster cost since some balancer implementation won't perform + // cluster cost evaluation if it knows the solution is infeasible. + var iterations = result.moveCostProcessingTimeNs().getCount(); + var time = System.currentTimeMillis(); + var randomName = Utils.randomString(4); + + var csvClusterCostFilename = "cost-profiling-" + time + "-" + randomName + ".csv"; + var csv0 = Path.of(arg.exportFolder.toAbsolutePath().toString(), csvClusterCostFilename); + exportCsv( + csv0, + result.costTimeSeries().entrySet().stream() + .sorted(Map.Entry.comparingByKey(Comparator.comparingLong(x -> x))) + .map(e -> List.of(e.getKey(), e.getValue().value()))); + + var csvVerboseFilename = "cost-profiling-" + time + "-" + randomName + "-verbose.csv"; + var csv1 = Path.of(arg.exportFolder.toAbsolutePath().toString(), csvVerboseFilename); + exportCsv( + csv1, + Stream.concat( + result.costTimeSeries().entrySet().stream() + // time, cluster-cost-value, move-cost-overflow, cluster-cost, move-cost + .map(e -> List.of(e.getKey(), e.getValue().value(), "", e.getValue(), "")), + result.moveCostTimeSeries().entrySet().stream() + // time, cluster-cost-value, move-cost-overflow, cluster-cost, move-cost + .map( + e -> + List.of( + e.getKey(), "", e.getValue().overflow() ? 1 : 0, "", e.getValue()))) + .sorted(Comparator.comparingLong(x -> (long) x.get(0)))); + + return String.format( + format, + // summary + result.initial().value(), + result.initial(), + result + .plan() + .map(Balancer.Plan::proposalClusterCost) + .map(ClusterCost::value) + .map(Object::toString) + .orElse("no solution found"), + result + .plan() + .map(Balancer.Plan::proposalClusterCost) + .map(Object::toString) + .orElse("no solution found"), + // runtime statistics + result.executionTime(), + result.executionTime().dividedBy(iterations).toNanos() / 1e6, + result + .executionTime() + .minusNanos(result.clusterCostProcessingTimeNs().getSum()) + .minusNanos(result.moveCostProcessingTimeNs().getSum()) + .dividedBy(iterations) + .toNanos() + / 1e6, + result.clusterCostProcessingTimeNs().getAverage() / 1e6, + result.moveCostProcessingTimeNs().getAverage() / 1e6, + result.clusterCostProcessingTimeNs().getCount(), + result.moveCostProcessingTimeNs().getCount(), + // details + csv0, + csv1); + } + + static void exportCsv(Path location, Stream> timeSeries) { + try (var writer = Utils.packException(() -> Files.newBufferedWriter(location))) { + var iterator = timeSeries.iterator(); + + while (iterator.hasNext()) { + boolean commas = false; + for (var item : iterator.next()) { + if (commas) writer.write(","); + var s = item.toString(); + // deal with commas inside the field according to RFC 4180 + if (s.contains(",")) { + // the field should be wrapped by double-quotes + writer.write("\""); + // any double-quote inside the field will be covert into two double-quotes + writer.write(s.replace("\"", "\"\"")); + // the field should be wrapped by double-quotes + writer.write("\""); + } else { + writer.write(s); + } + commas = true; + } + writer.newLine(); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static void main(String[] args) { + new BalancerBenchmarkApp().execute(args); + } + + static class CommonArgument { + @Parameter( + names = {"--cluster.info"}, + description = "String: path to the serialized cluster info file", + required = true, + converter = PathField.class) + Path serializedClusterInfo; + + @Parameter( + names = {"--cluster.bean"}, + description = "String: path to the serialized cluster bean file", + required = true, + converter = PathField.class) + Path serializedClusterBean; + + @Parameter( + names = {"--optimization.config"}, + description = + "String: path to the json file containing the optimization problem definition.", + required = true, + converter = PathField.class) + Path optimizationConfig; + + ClusterInfo fetchClusterInfo() { + try (var reader = Files.newInputStream(serializedClusterInfo)) { + throw new UnsupportedOperationException(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + ClusterBean fetchClusterBean() { + try (var reader = Files.newInputStream(serializedClusterBean)) { + throw new UnsupportedOperationException(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + String fetchBalancerProblemJson() { + var bytes = Utils.packException(() -> Files.readAllBytes(optimizationConfig)); + return new String(bytes); + } + + BalancerProblemFormat fetchBalancerProblem() { + return JsonConverter.defaultConverter() + .fromJson(fetchBalancerProblemJson(), TypeRef.of(BalancerProblemFormat.class)); + } + } + + static class ExperimentArgument extends CommonArgument { + @Parameter( + names = {"--trials"}, + description = "Integer: the number of experiments to perform.", + required = true, + converter = PositiveIntegerField.class) + int trials; + } + + static class CostProfilingArgument extends CommonArgument { + @Parameter( + names = {"--export.folder"}, + description = "String: the directory to store experiment result.", + converter = PathField.class) + Path exportFolder = Path.of(System.getProperty("java.io.tmpdir")); + } +} From eb93f0c6ea1f866f08660357b73abfad4e52b89e Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Sat, 13 May 2023 11:49:12 +0800 Subject: [PATCH 3/7] Unit Test --- .../app/benchmark/BalancerBenchmarkApp.java | 17 +- .../benchmark/BalancerBenchmarkAppTest.java | 168 ++++++++++++++++++ 2 files changed, 177 insertions(+), 8 deletions(-) create mode 100644 app/src/test/java/org/astraea/app/benchmark/BalancerBenchmarkAppTest.java diff --git a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java index e5441c2534..9fd3b1b7b4 100644 --- a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java +++ b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java @@ -65,8 +65,9 @@ public void execute(String[] args) { Runnable help = () -> { - if (!name.equalsIgnoreCase("help")) System.out.println("Unknown benchmark name: " + name); System.out.printf("Usage: %s [args ...]%n", benchmarks.keySet()); + if (!name.equalsIgnoreCase("help")) + throw new IllegalArgumentException("Unknown benchmark name: " + name); }; // try to run the specified benchmark or cry for help @@ -203,7 +204,7 @@ private String optimizationSummary( return String.format( format, - // astraeae version + // astraea version VersionUtils.VERSION, VersionUtils.DATE, VersionUtils.REVISION, @@ -237,8 +238,8 @@ private String optimizationSummary( (double) bean.all().values().stream().mapToInt(Collection::size).sum() / bean.brokerIds().size(), bean.brokerIds().size(), - metricStart, - metricEnd, + metricStart != null ? metricStart : "no metric", + metricEnd != null ? metricEnd : "no metric", duration); } @@ -261,7 +262,7 @@ private String experimentSummary(BalancerBenchmark.ExperimentResult result) { ## Statistics - * Initial Cost: %d + * Initial Cost: %f * Min Cost: %s * Average Cost: %s * Max Cost: %s @@ -284,14 +285,14 @@ private String experimentSummary(BalancerBenchmark.ExperimentResult result) { result.costs().size(), result.trials() - result.costs().size(), // Cost Detail - result.initial(), result.initial().value(), - result.bestCost().map(Object::toString).orElse("no usable solution found"), + result.initial(), result .bestCost() .map(ClusterCost::value) .map(Object::toString) .orElse("no usable solution found"), + result.bestCost().map(Object::toString).orElse("no usable solution found"), // Cost Statistics result.initial().value(), count > 0 ? result.costSummary().getMin() : -1, @@ -338,7 +339,7 @@ private String costProfilingSummary( // use the move cost evaluation count as the number of iteration(an optimization attempt) been // performed. we are not using cluster cost since some balancer implementation won't perform // cluster cost evaluation if it knows the solution is infeasible. - var iterations = result.moveCostProcessingTimeNs().getCount(); + var iterations = Math.max(1, result.moveCostProcessingTimeNs().getCount()); var time = System.currentTimeMillis(); var randomName = Utils.randomString(4); diff --git a/app/src/test/java/org/astraea/app/benchmark/BalancerBenchmarkAppTest.java b/app/src/test/java/org/astraea/app/benchmark/BalancerBenchmarkAppTest.java new file mode 100644 index 0000000000..9953c801c8 --- /dev/null +++ b/app/src/test/java/org/astraea/app/benchmark/BalancerBenchmarkAppTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.benchmark; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; +import java.util.regex.Pattern; +import org.astraea.common.Configuration; +import org.astraea.common.VersionUtils; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.balancer.AlgorithmConfig; +import org.astraea.common.balancer.Balancer; +import org.astraea.common.balancer.BalancerProblemFormat; +import org.astraea.common.cost.ClusterCost; +import org.astraea.common.cost.ReplicaLeaderCost; +import org.astraea.common.metrics.ClusterBean; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class BalancerBenchmarkAppTest { + + private PrintStream original; + private ByteArrayOutputStream output = new ByteArrayOutputStream(); + + @BeforeEach + void setOutput() { + original = System.out; + System.setOut(new PrintStream(output)); + } + + @AfterEach + void recoverOutput() { + System.setOut(original); + } + + @Test + void testExecuteHelp() { + var help = new String[] {"help"}; + var noBench = new String[] {"no_such"}; + Assertions.assertDoesNotThrow(() -> new BalancerBenchmarkApp().execute(help)); + Assertions.assertThrows( + IllegalArgumentException.class, () -> new BalancerBenchmarkApp().execute(noBench)); + } + + @Test + void testExecuteExperiment() { + var args = + new BalancerBenchmarkApp.ExperimentArgument() { + + @Override + ClusterInfo fetchClusterInfo() { + return ClusterInfo.empty(); + } + + @Override + ClusterBean fetchClusterBean() { + return ClusterBean.EMPTY; + } + + @Override + String fetchBalancerProblemJson() { + return "BALANCER_PROBLEM"; + } + + @Override + BalancerProblemFormat fetchBalancerProblem() { + var bpf = new BalancerProblemFormat(); + bpf.balancer = NoOpBalancer.class.getName(); + bpf.clusterCosts = List.of(costWeight(ReplicaLeaderCost.class.getName(), 1)); + return bpf; + } + }; + args.trials = 1; + new BalancerBenchmarkApp().runExperiment(args); + + var stdout = output.toString(); + Assertions.assertTrue(stdout.contains("Version: " + VersionUtils.VERSION)); + Assertions.assertTrue(stdout.contains("Balancer: " + NoOpBalancer.class.getName())); + Assertions.assertTrue(stdout.contains("BALANCER_PROBLEM")); + Assertions.assertTrue(stdout.contains("Attempted Trials: 1")); + Assertions.assertTrue(stdout.contains("MOCKED_RESULT")); + } + + @Test + void testExecuteCostProfiling() { + var args = + new BalancerBenchmarkApp.CostProfilingArgument() { + + @Override + ClusterInfo fetchClusterInfo() { + return ClusterInfo.empty(); + } + + @Override + ClusterBean fetchClusterBean() { + return ClusterBean.EMPTY; + } + + @Override + String fetchBalancerProblemJson() { + return "BALANCER_PROBLEM"; + } + + @Override + BalancerProblemFormat fetchBalancerProblem() { + var bpf = new BalancerProblemFormat(); + bpf.balancer = NoOpBalancer.class.getName(); + bpf.clusterCosts = List.of(costWeight(ReplicaLeaderCost.class.getName(), 1)); + return bpf; + } + }; + new BalancerBenchmarkApp().runCostProfiling(args); + + var stdout = output.toString(); + Assertions.assertTrue(stdout.contains("Version: " + VersionUtils.VERSION)); + Assertions.assertTrue(stdout.contains("Balancer: " + NoOpBalancer.class.getName())); + Assertions.assertTrue(stdout.contains("BALANCER_PROBLEM")); + Assertions.assertTrue(stdout.contains("MOCKED_RESULT")); + + var matcher = Pattern.compile(": (.+\\.csv)").matcher(stdout); + Assertions.assertTrue(matcher.find()); + Assertions.assertTrue(Files.exists(Path.of(matcher.group(1)))); + Assertions.assertTrue(matcher.find()); + Assertions.assertTrue(Files.exists(Path.of(matcher.group(1)))); + } + + private static BalancerProblemFormat.CostWeight costWeight(String cost, double weight) { + var cw = new BalancerProblemFormat.CostWeight(); + cw.cost = cost; + cw.weight = weight; + return cw; + } + + private static class NoOpBalancer implements Balancer { + public NoOpBalancer(Configuration config) {} + + @Override + public Optional offer(AlgorithmConfig config) { + return Optional.of( + new Plan( + config.clusterInfo(), + config.clusterCostFunction().clusterCost(config.clusterInfo(), config.clusterBean()), + config.clusterInfo(), + ClusterCost.of(ThreadLocalRandom.current().nextDouble(), () -> "MOCKED_RESULT"))); + } + } +} From f7fffee03ac5a562802e0da3c6a1c4a7d4b8fcf8 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Sat, 13 May 2023 12:14:25 +0800 Subject: [PATCH 4/7] Revise --- .../org/astraea/app/benchmark/BalancerBenchmarkApp.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java index 9fd3b1b7b4..88d75ad708 100644 --- a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java +++ b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java @@ -83,7 +83,8 @@ void runExperiment(ExperimentArgument argument) { (Class) Utils.packException(() -> Class.forName(problem.balancer)), Configuration.of(problem.balancerConfig)); - System.out.println(optimizationSummary(argument, cluster, beans, problem)); + System.out.println("Running Experiment..."); + System.out.println(); var result = BalancerBenchmark.experiment() @@ -97,6 +98,7 @@ void runExperiment(ExperimentArgument argument) { .toCompletableFuture() .join(); + System.out.println(optimizationSummary(argument, cluster, beans, problem)); System.out.println(experimentSummary(result)); } @@ -109,7 +111,8 @@ void runCostProfiling(CostProfilingArgument argument) { (Class) Utils.packException(() -> Class.forName(problem.balancer)), Configuration.of(problem.balancerConfig)); - System.out.println(optimizationSummary(argument, cluster, beans, problem)); + System.out.println("Running CostProfiling..."); + System.out.println(); var result = BalancerBenchmark.costProfiling() @@ -122,6 +125,7 @@ void runCostProfiling(CostProfilingArgument argument) { .toCompletableFuture() .join(); + System.out.println(optimizationSummary(argument, cluster, beans, problem)); System.out.println(costProfilingSummary(argument, result)); } From 9d48a9214a6795f065266623d5162cda012a9e82 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 23 May 2023 16:51:30 +0800 Subject: [PATCH 5/7] Fix merge --- .../java/org/astraea/app/benchmark/BalancerBenchmarkApp.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java index 88d75ad708..756449ff75 100644 --- a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java +++ b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java @@ -81,7 +81,7 @@ void runExperiment(ExperimentArgument argument) { var balancer = Utils.construct( (Class) Utils.packException(() -> Class.forName(problem.balancer)), - Configuration.of(problem.balancerConfig)); + new Configuration(problem.balancerConfig)); System.out.println("Running Experiment..."); System.out.println(); @@ -109,7 +109,7 @@ void runCostProfiling(CostProfilingArgument argument) { var balancer = Utils.construct( (Class) Utils.packException(() -> Class.forName(problem.balancer)), - Configuration.of(problem.balancerConfig)); + new Configuration(problem.balancerConfig)); System.out.println("Running CostProfiling..."); System.out.println(); From d750e53b56b181dbc3af77ce1db58222c4e42924 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 23 May 2023 16:54:12 +0800 Subject: [PATCH 6/7] Use Java 15 `String::formatted` method --- .../app/benchmark/BalancerBenchmarkApp.java | 326 +++++++++--------- 1 file changed, 160 insertions(+), 166 deletions(-) diff --git a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java index 756449ff75..567e7a2d77 100644 --- a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java +++ b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java @@ -131,8 +131,39 @@ void runCostProfiling(CostProfilingArgument argument) { private String optimizationSummary( CommonArgument arg, ClusterInfo info, ClusterBean bean, BalancerProblemFormat optimization) { - var format = - """ + + var metricStart = + bean.all().values().stream() + .flatMap(Collection::stream) + .map(HasBeanObject::beanObject) + .mapToLong(BeanObject::createdTimestamp) + .min() + .stream() + .mapToObj( + time -> + ZonedDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault()) + .toLocalDateTime()) + .findFirst() + .orElse(null); + var metricEnd = + bean.all().values().stream() + .flatMap(Collection::stream) + .map(HasBeanObject::beanObject) + .mapToLong(BeanObject::createdTimestamp) + .max() + .stream() + .mapToObj( + time -> + ZonedDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault()) + .toLocalDateTime()) + .findFirst() + .orElse(null); + var duration = + metricStart != null && metricEnd != null + ? Duration.between(metricStart, metricEnd) + : Duration.ZERO; + + return """ Balancer Benchmark =============================== @@ -173,83 +204,52 @@ private String optimizationSummary( * Metrics End at: %s * Recorded Duration: %s - """; - - var metricStart = - bean.all().values().stream() - .flatMap(Collection::stream) - .map(HasBeanObject::beanObject) - .mapToLong(BeanObject::createdTimestamp) - .min() - .stream() - .mapToObj( - time -> - ZonedDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault()) - .toLocalDateTime()) - .findFirst() - .orElse(null); - var metricEnd = - bean.all().values().stream() - .flatMap(Collection::stream) - .map(HasBeanObject::beanObject) - .mapToLong(BeanObject::createdTimestamp) - .max() - .stream() - .mapToObj( - time -> - ZonedDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault()) - .toLocalDateTime()) - .findFirst() - .orElse(null); - var duration = - metricStart != null && metricEnd != null - ? Duration.between(metricStart, metricEnd) - : Duration.ZERO; - - return String.format( - format, - // astraea version - VersionUtils.VERSION, - VersionUtils.DATE, - VersionUtils.REVISION, - VersionUtils.BUILDER, - // Balancer Problem Summary - arg.fetchBalancerProblemJson(), - optimization.timeout, - optimization.balancer, - Optional.of( - optimization.balancerConfig.entrySet().stream() - .map(e -> String.format(" * \"%s\": %s", e.getKey(), e.getValue())) - .collect(Collectors.joining(System.lineSeparator()))) - .filter(Predicate.not(String::isEmpty)) - .orElse(" * no config"), - optimization.parse().clusterCostFunction().toString(), - optimization.parse().moveCostFunction().toString(), - Optional.of( - optimization.costConfig.entrySet().stream() - .map(e -> String.format(" * \"%s\": %s", e.getKey(), e.getValue())) - .collect(Collectors.joining(System.lineSeparator()))) - .filter(Predicate.not(String::isEmpty)) - .orElse(" * no config"), - // ClusterInfo Summary - info.clusterId(), - info.topicNames().size(), - info.topicPartitions().size(), - info.replicas().size(), - info.brokers().size(), - // ClusterBean Summary - bean.all().values().stream().mapToInt(Collection::size).sum(), - (double) bean.all().values().stream().mapToInt(Collection::size).sum() - / bean.brokerIds().size(), - bean.brokerIds().size(), - metricStart != null ? metricStart : "no metric", - metricEnd != null ? metricEnd : "no metric", - duration); + """ + .formatted( + // astraea version + VersionUtils.VERSION, + VersionUtils.DATE, + VersionUtils.REVISION, + VersionUtils.BUILDER, + // Balancer Problem Summary + arg.fetchBalancerProblemJson(), + optimization.timeout, + optimization.balancer, + Optional.of( + optimization.balancerConfig.entrySet().stream() + .map(e -> String.format(" * \"%s\": %s", e.getKey(), e.getValue())) + .collect(Collectors.joining(System.lineSeparator()))) + .filter(Predicate.not(String::isEmpty)) + .orElse(" * no config"), + optimization.parse().clusterCostFunction().toString(), + optimization.parse().moveCostFunction().toString(), + Optional.of( + optimization.costConfig.entrySet().stream() + .map(e -> String.format(" * \"%s\": %s", e.getKey(), e.getValue())) + .collect(Collectors.joining(System.lineSeparator()))) + .filter(Predicate.not(String::isEmpty)) + .orElse(" * no config"), + // ClusterInfo Summary + info.clusterId(), + info.topicNames().size(), + info.topicPartitions().size(), + info.replicas().size(), + info.brokers().size(), + // ClusterBean Summary + bean.all().values().stream().mapToInt(Collection::size).sum(), + (double) bean.all().values().stream().mapToInt(Collection::size).sum() + / bean.brokerIds().size(), + bean.brokerIds().size(), + metricStart != null ? metricStart : "no metric", + metricEnd != null ? metricEnd : "no metric", + duration); } private String experimentSummary(BalancerBenchmark.ExperimentResult result) { - var format = - """ + + var count = result.costSummary().getCount(); + + return """ Balancer Experiment Result =============================== @@ -278,67 +278,37 @@ private String experimentSummary(BalancerBenchmark.ExperimentResult result) { %s ``` - """; - - var count = result.costSummary().getCount(); - - return String.format( - format, - // Trials - result.trials(), - result.costs().size(), - result.trials() - result.costs().size(), - // Cost Detail - result.initial().value(), - result.initial(), - result - .bestCost() - .map(ClusterCost::value) - .map(Object::toString) - .orElse("no usable solution found"), - result.bestCost().map(Object::toString).orElse("no usable solution found"), - // Cost Statistics - result.initial().value(), - count > 0 ? result.costSummary().getMin() : -1, - count > 0 ? result.costSummary().getAverage() : -1, - count > 0 ? result.costSummary().getMax() : -1, - result.variance().orElse(-1), - // All values - result.costs().stream() - .mapToDouble(ClusterCost::value) - .sorted() - .mapToObj(Double::toString) - .collect(Collectors.joining(System.lineSeparator()))); + """ + .formatted( + // Trials + result.trials(), + result.costs().size(), + result.trials() - result.costs().size(), + // Cost Detail + result.initial().value(), + result.initial(), + result + .bestCost() + .map(ClusterCost::value) + .map(Object::toString) + .orElse("no usable solution found"), + result.bestCost().map(Object::toString).orElse("no usable solution found"), + // Cost Statistics + result.initial().value(), + count > 0 ? result.costSummary().getMin() : -1, + count > 0 ? result.costSummary().getAverage() : -1, + count > 0 ? result.costSummary().getMax() : -1, + result.variance().orElse(-1), + // All values + result.costs().stream() + .mapToDouble(ClusterCost::value) + .sorted() + .mapToObj(Double::toString) + .collect(Collectors.joining(System.lineSeparator()))); } private String costProfilingSummary( CostProfilingArgument arg, BalancerBenchmark.CostProfilingResult result) { - var format = - """ - Balancer Cost Profiling Result - =============================== - - * Initial Cost Value: %f - > %s - - * Best Cost Value: %s - > %s - - ## Runtime Statistics - - * Execution Time: %s - * Average Iteration Time: %.3f ms - * Average Balancer Operation Time: %.3f ms - * Average ClusterCost Processing Time: %.3f ms - * Average MoveCost Processing Time: %.3f ms - * Total ClusterCost Evaluation: %d - * Total MoveCost Evaluation: %d - - ## Detail - - * Cost Profiling Result (ClusterCost Only) in CSV: %s - * Cost Profiling Result (All) in CSV: %s - """; // use the move cost evaluation count as the number of iteration(an optimization attempt) been // performed. we are not using cluster cost since some balancer implementation won't perform @@ -371,39 +341,63 @@ private String costProfilingSummary( e.getKey(), "", e.getValue().overflow() ? 1 : 0, "", e.getValue()))) .sorted(Comparator.comparingLong(x -> (long) x.get(0)))); - return String.format( - format, - // summary - result.initial().value(), - result.initial(), - result - .plan() - .map(Balancer.Plan::proposalClusterCost) - .map(ClusterCost::value) - .map(Object::toString) - .orElse("no solution found"), - result - .plan() - .map(Balancer.Plan::proposalClusterCost) - .map(Object::toString) - .orElse("no solution found"), - // runtime statistics - result.executionTime(), - result.executionTime().dividedBy(iterations).toNanos() / 1e6, - result - .executionTime() - .minusNanos(result.clusterCostProcessingTimeNs().getSum()) - .minusNanos(result.moveCostProcessingTimeNs().getSum()) - .dividedBy(iterations) - .toNanos() - / 1e6, - result.clusterCostProcessingTimeNs().getAverage() / 1e6, - result.moveCostProcessingTimeNs().getAverage() / 1e6, - result.clusterCostProcessingTimeNs().getCount(), - result.moveCostProcessingTimeNs().getCount(), - // details - csv0, - csv1); + return """ + Balancer Cost Profiling Result + =============================== + + * Initial Cost Value: %f + > %s + + * Best Cost Value: %s + > %s + + ## Runtime Statistics + + * Execution Time: %s + * Average Iteration Time: %.3f ms + * Average Balancer Operation Time: %.3f ms + * Average ClusterCost Processing Time: %.3f ms + * Average MoveCost Processing Time: %.3f ms + * Total ClusterCost Evaluation: %d + * Total MoveCost Evaluation: %d + + ## Detail + + * Cost Profiling Result (ClusterCost Only) in CSV: %s + * Cost Profiling Result (All) in CSV: %s + """ + .formatted( + // summary + result.initial().value(), + result.initial(), + result + .plan() + .map(Balancer.Plan::proposalClusterCost) + .map(ClusterCost::value) + .map(Object::toString) + .orElse("no solution found"), + result + .plan() + .map(Balancer.Plan::proposalClusterCost) + .map(Object::toString) + .orElse("no solution found"), + // runtime statistics + result.executionTime(), + result.executionTime().dividedBy(iterations).toNanos() / 1e6, + result + .executionTime() + .minusNanos(result.clusterCostProcessingTimeNs().getSum()) + .minusNanos(result.moveCostProcessingTimeNs().getSum()) + .dividedBy(iterations) + .toNanos() + / 1e6, + result.clusterCostProcessingTimeNs().getAverage() / 1e6, + result.moveCostProcessingTimeNs().getAverage() / 1e6, + result.clusterCostProcessingTimeNs().getCount(), + result.moveCostProcessingTimeNs().getCount(), + // details + csv0, + csv1); } static void exportCsv(Path location, Stream> timeSeries) { From 26cd10677d19270ca6d98c5e4f6a4ff4f64f47b9 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Fri, 2 Jun 2023 17:18:06 +0800 Subject: [PATCH 7/7] Put on deserialization logic --- .../app/benchmark/BalancerBenchmarkApp.java | 68 +++++++++++++++---- .../benchmark/BalancerBenchmarkAppTest.java | 21 ++++-- 2 files changed, 70 insertions(+), 19 deletions(-) diff --git a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java index 567e7a2d77..05004673b6 100644 --- a/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java +++ b/app/src/main/java/org/astraea/app/benchmark/BalancerBenchmarkApp.java @@ -38,6 +38,7 @@ import org.astraea.app.argument.PathField; import org.astraea.app.argument.PositiveIntegerField; import org.astraea.app.benchmark.balancer.BalancerBenchmark; +import org.astraea.common.ByteUtils; import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.VersionUtils; @@ -50,11 +51,13 @@ import org.astraea.common.metrics.BeanObject; import org.astraea.common.metrics.ClusterBean; import org.astraea.common.metrics.HasBeanObject; +import org.astraea.common.metrics.collector.MetricSensor; +import org.astraea.common.metrics.collector.MetricStore; public class BalancerBenchmarkApp { public void execute(String[] args) { - var name = args.length > 0 ? args[0] : "help"; + var name = args.length > 0 ? args[0] : ""; var arguments = Arrays.stream(args).skip(1).toArray(String[]::new); var benchmarks = Map.of( @@ -66,7 +69,8 @@ public void execute(String[] args) { Runnable help = () -> { System.out.printf("Usage: %s [args ...]%n", benchmarks.keySet()); - if (!name.equalsIgnoreCase("help")) + if (name.isEmpty()) throw new IllegalArgumentException("No argument specified"); + else if (!name.equalsIgnoreCase("help")) throw new IllegalArgumentException("Unknown benchmark name: " + name); }; @@ -76,12 +80,19 @@ public void execute(String[] args) { void runExperiment(ExperimentArgument argument) { var cluster = argument.fetchClusterInfo(); - var beans = argument.fetchClusterBean(); + var beans = argument.fetchBeanObjects(); var problem = argument.fetchBalancerProblem(); + var configs = problem.parse(); var balancer = Utils.construct( (Class) Utils.packException(() -> Class.forName(problem.balancer)), new Configuration(problem.balancerConfig)); + var clusterBean = + toClusterBean( + List.of( + configs.clusterCostFunction().metricSensor(), + configs.moveCostFunction().metricSensor()), + beans); System.out.println("Running Experiment..."); System.out.println(); @@ -90,26 +101,33 @@ void runExperiment(ExperimentArgument argument) { BalancerBenchmark.experiment() .setBalancer(balancer) .setClusterInfo(cluster) - .setClusterBean(beans) - .setAlgorithmConfig(problem.parse()) + .setClusterBean(clusterBean) + .setAlgorithmConfig(configs) .setExperimentTrials(argument.trials) .setExecutionTimeout(problem.timeout) .start() .toCompletableFuture() .join(); - System.out.println(optimizationSummary(argument, cluster, beans, problem)); + System.out.println(optimizationSummary(argument, cluster, clusterBean, problem)); System.out.println(experimentSummary(result)); } void runCostProfiling(CostProfilingArgument argument) { var cluster = argument.fetchClusterInfo(); - var beans = argument.fetchClusterBean(); + var beans = argument.fetchBeanObjects(); var problem = argument.fetchBalancerProblem(); + var configs = problem.parse(); var balancer = Utils.construct( (Class) Utils.packException(() -> Class.forName(problem.balancer)), new Configuration(problem.balancerConfig)); + var clusterBean = + toClusterBean( + List.of( + configs.clusterCostFunction().metricSensor(), + configs.moveCostFunction().metricSensor()), + beans); System.out.println("Running CostProfiling..."); System.out.println(); @@ -118,14 +136,14 @@ void runCostProfiling(CostProfilingArgument argument) { BalancerBenchmark.costProfiling() .setBalancer(balancer) .setClusterInfo(cluster) - .setClusterBean(beans) + .setClusterBean(clusterBean) .setAlgorithmConfig(problem.parse()) .setExecutionTimeout(problem.timeout) .start() .toCompletableFuture() .join(); - System.out.println(optimizationSummary(argument, cluster, beans, problem)); + System.out.println(optimizationSummary(argument, cluster, clusterBean, problem)); System.out.println(costProfilingSummary(argument, result)); } @@ -429,6 +447,28 @@ static void exportCsv(Path location, Stream> timeSeries) { } } + static ClusterBean toClusterBean( + Collection sensors, Map> beans) { + try (var store = + MetricStore.builder() + .receivers( + List.of( + MetricStore.Receiver.fixed( + beans.entrySet().stream() + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, Map.Entry::getValue))))) + .sensorsSupplier( + () -> + sensors.stream() + .collect(Collectors.toUnmodifiableMap(x -> x, x -> (id, err) -> {}))) + .build()) { + for (int i = 0; i < 3 && store.clusterBean().all().isEmpty(); i++) + Utils.sleep(Duration.ofSeconds(1)); + return store.clusterBean(); + } + } + public static void main(String[] args) { new BalancerBenchmarkApp().execute(args); } @@ -457,16 +497,16 @@ static class CommonArgument { Path optimizationConfig; ClusterInfo fetchClusterInfo() { - try (var reader = Files.newInputStream(serializedClusterInfo)) { - throw new UnsupportedOperationException(); + try (var stream = Files.newInputStream(serializedClusterInfo)) { + return ByteUtils.readClusterInfo(stream.readAllBytes()); } catch (IOException e) { throw new UncheckedIOException(e); } } - ClusterBean fetchClusterBean() { - try (var reader = Files.newInputStream(serializedClusterBean)) { - throw new UnsupportedOperationException(); + Map> fetchBeanObjects() { + try (var stream = Files.newInputStream(serializedClusterBean)) { + return ByteUtils.readBeanObjects(stream.readAllBytes()); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/app/src/test/java/org/astraea/app/benchmark/BalancerBenchmarkAppTest.java b/app/src/test/java/org/astraea/app/benchmark/BalancerBenchmarkAppTest.java index 9953c801c8..9e226a357f 100644 --- a/app/src/test/java/org/astraea/app/benchmark/BalancerBenchmarkAppTest.java +++ b/app/src/test/java/org/astraea/app/benchmark/BalancerBenchmarkAppTest.java @@ -21,6 +21,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Pattern; @@ -32,7 +33,7 @@ import org.astraea.common.balancer.BalancerProblemFormat; import org.astraea.common.cost.ClusterCost; import org.astraea.common.cost.ReplicaLeaderCost; -import org.astraea.common.metrics.ClusterBean; +import org.astraea.common.metrics.BeanObject; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -74,8 +75,8 @@ ClusterInfo fetchClusterInfo() { } @Override - ClusterBean fetchClusterBean() { - return ClusterBean.EMPTY; + Map> fetchBeanObjects() { + return Map.of(); } @Override @@ -113,8 +114,8 @@ ClusterInfo fetchClusterInfo() { } @Override - ClusterBean fetchClusterBean() { - return ClusterBean.EMPTY; + Map> fetchBeanObjects() { + return Map.of(); } @Override @@ -145,6 +146,15 @@ BalancerProblemFormat fetchBalancerProblem() { Assertions.assertTrue(Files.exists(Path.of(matcher.group(1)))); } + @Test + void testArgument() { + Assertions.assertThrows( + IllegalArgumentException.class, () -> BalancerBenchmarkApp.main(new String[] {})); + Assertions.assertThrows( + IllegalArgumentException.class, () -> BalancerBenchmarkApp.main(new String[] {"Bad"})); + Assertions.assertDoesNotThrow(() -> BalancerBenchmarkApp.main(new String[] {"help"})); + } + private static BalancerProblemFormat.CostWeight costWeight(String cost, double weight) { var cw = new BalancerProblemFormat.CostWeight(); cw.cost = cost; @@ -159,6 +169,7 @@ public NoOpBalancer(Configuration config) {} public Optional offer(AlgorithmConfig config) { return Optional.of( new Plan( + config.clusterBean(), config.clusterInfo(), config.clusterCostFunction().clusterCost(config.clusterInfo(), config.clusterBean()), config.clusterInfo(),