From a544ee768a2982a4869899d16e398fec745b35de Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Thu, 18 Aug 2022 11:56:27 +0800 Subject: [PATCH] Revise `RebalancePlanProposal` (#539) * revise proposal return type remove `Optional` * add javadoc * fix * fix javadoc * ShufflePlanGenerator should return just one plan when unable to propose * fix merge --- .../app/balancer/BalanceProcessDemo.java | 5 +- .../app/balancer/RebalancePlanProposal.java | 29 ++--------- .../generator/RebalancePlanGenerator.java | 14 ++++++ .../generator/ShufflePlanGenerator.java | 50 ++++++++++--------- .../balancer/RebalancePlanProposalTest.java | 27 +--------- .../generator/ShufflePlanGeneratorTest.java | 16 ++++-- 6 files changed, 60 insertions(+), 81 deletions(-) diff --git a/app/src/main/java/org/astraea/app/balancer/BalanceProcessDemo.java b/app/src/main/java/org/astraea/app/balancer/BalanceProcessDemo.java index d83c962ac4..469efa2a4a 100644 --- a/app/src/main/java/org/astraea/app/balancer/BalanceProcessDemo.java +++ b/app/src/main/java/org/astraea/app/balancer/BalanceProcessDemo.java @@ -50,10 +50,7 @@ public static void main(String[] args) { // TODO: implement one interface to select the best plan from many plan ,see #544 var rebalancePlan = rebalancePlanGenerator.generate(clusterInfo).findFirst().orElseThrow(); System.out.println(rebalancePlan); - var targetAllocation = - rebalancePlan - .rebalancePlan() - .orElseThrow(() -> new IllegalStateException("No suitable plan found")); + var targetAllocation = rebalancePlan.rebalancePlan(); System.out.println("[Target Allocation]"); System.out.println(ClusterLogAllocation.toString(targetAllocation)); diff --git a/app/src/main/java/org/astraea/app/balancer/RebalancePlanProposal.java b/app/src/main/java/org/astraea/app/balancer/RebalancePlanProposal.java index 12d5256a85..f8c38e44f5 100644 --- a/app/src/main/java/org/astraea/app/balancer/RebalancePlanProposal.java +++ b/app/src/main/java/org/astraea/app/balancer/RebalancePlanProposal.java @@ -20,12 +20,11 @@ import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.Optional; import org.astraea.app.balancer.log.ClusterLogAllocation; public interface RebalancePlanProposal { - Optional rebalancePlan(); + ClusterLogAllocation rebalancePlan(); List info(); @@ -40,45 +39,27 @@ class Build { List info = Collections.synchronizedList(new ArrayList<>()); List warnings = Collections.synchronizedList(new ArrayList<>()); - // guard by this - private boolean built = false; - - public synchronized Build noRebalancePlan() { - ensureNotBuiltYet(); - this.allocation = null; - return this; - } - public synchronized Build withRebalancePlan(ClusterLogAllocation clusterLogAllocation) { - ensureNotBuiltYet(); this.allocation = Objects.requireNonNull(clusterLogAllocation); return this; } public synchronized Build addWarning(String warning) { - ensureNotBuiltYet(); this.warnings.add(warning); return this; } public synchronized Build addInfo(String info) { - ensureNotBuiltYet(); this.info.add(info); return this; } - private synchronized void ensureNotBuiltYet() { - if (built) throw new IllegalStateException("This builder already built."); - } - public synchronized RebalancePlanProposal build() { - final var allocationRef = allocation; + final var allocationRef = + Objects.requireNonNull(allocation, () -> "No log allocation specified for this proposal"); final var infoRef = info; final var warningRef = warnings; - ensureNotBuiltYet(); - - built = true; allocation = null; info = null; warnings = null; @@ -86,8 +67,8 @@ public synchronized RebalancePlanProposal build() { return new RebalancePlanProposal() { @Override - public Optional rebalancePlan() { - return Optional.ofNullable(allocationRef); + public ClusterLogAllocation rebalancePlan() { + return allocationRef; } @Override diff --git a/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java b/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java index 26c46ca242..0f30dd4625 100644 --- a/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java +++ b/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java @@ -29,6 +29,13 @@ public interface RebalancePlanGenerator { * same plan for the same input argument. There can be some randomization that takes part in this * process. * + *

If the generator implementation thinks it can't find any rebalance proposal(which the plan + * might improve the cluster). Then the implementation should return a Stream with exactly one + * rebalance plan proposal in it, where the proposed allocation will be exactly the same as the + * {@code baseAllocation} parameter. This means there is no movement or alteration will occur. And + * the implementation should place some detailed information in the info/warning/error string, to + * indicate the reason for no meaningful plan. + * * @param clusterInfo the cluster state, implementation can take advantage of the data inside to * proposal the plan it feels confident to improve the cluster. * @return a {@link Stream} generating rebalance plan regarding the given {@link ClusterInfo} @@ -42,6 +49,13 @@ default Stream generate(ClusterInfo clusterInfo) { * same plan for the same input argument. There can be some randomization that takes part in this * process. * + *

If the generator implementation thinks it can't find any rebalance proposal(which the plan + * might improve the cluster). Then the implementation should return a Stream with exactly one + * rebalance plan proposal in it, where the proposed allocation will be exactly the same as the + * {@code baseAllocation} parameter. This means there is no movement or alteration that will + * occur. And The implementation should place some detailed information in the info/warning/error + * string, to indicate the reason for no meaningful plan. + * * @param clusterInfo the cluster state, implementation can take advantage of the data inside to * proposal the plan it feels confident to improve the cluster. * @param baseAllocation the cluster log allocation as the based of proposal generation. diff --git a/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java b/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java index 0de7f2d778..a098c53c02 100644 --- a/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java +++ b/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java @@ -63,32 +63,36 @@ public ShufflePlanGenerator(Supplier numberOfShuffle) { @Override public Stream generate( ClusterInfo clusterInfo, ClusterLogAllocation baseAllocation) { + final var brokerIds = + clusterInfo.nodes().stream().map(NodeInfo::id).collect(Collectors.toUnmodifiableSet()); + + if (brokerIds.size() == 0) { + return Stream.of( + RebalancePlanProposal.builder() + .withRebalancePlan(baseAllocation) + .addWarning("There is no broker") + .build()); + } + + if (brokerIds.size() == 1) { + return Stream.of( + RebalancePlanProposal.builder() + .withRebalancePlan(baseAllocation) + .addWarning("Only one broker exists, unable to do some migration") + .build()); + } + + if (clusterInfo.topics().size() == 0) { + return Stream.of( + RebalancePlanProposal.builder() + .withRebalancePlan(baseAllocation) + .addWarning("No non-ignored topic to working on.") + .build()); + } + return Stream.generate( () -> { final var rebalancePlanBuilder = RebalancePlanProposal.builder(); - final var brokerIds = - clusterInfo.nodes().stream() - .map(NodeInfo::id) - .collect(Collectors.toUnmodifiableSet()); - - if (brokerIds.size() == 0) - return rebalancePlanBuilder - .addWarning("Why there is no broker?") - .noRebalancePlan() - .build(); - - if (brokerIds.size() == 1) - return rebalancePlanBuilder - .addWarning("Only one broker exists. There is no reason to rebalance.") - .noRebalancePlan() - .build(); - - if (clusterInfo.topics().size() == 0) - return rebalancePlanBuilder - .addWarning("No non-ignored topic to working on.") - .noRebalancePlan() - .build(); - final var shuffleCount = numberOfShuffle.get(); rebalancePlanBuilder.addInfo( diff --git a/app/src/test/java/org/astraea/app/balancer/RebalancePlanProposalTest.java b/app/src/test/java/org/astraea/app/balancer/RebalancePlanProposalTest.java index e8e8235215..bd48c226b9 100644 --- a/app/src/test/java/org/astraea/app/balancer/RebalancePlanProposalTest.java +++ b/app/src/test/java/org/astraea/app/balancer/RebalancePlanProposalTest.java @@ -38,8 +38,7 @@ void testBuild() { .addWarning("Warning2") .build(); - Assertions.assertTrue(build.rebalancePlan().isPresent()); - final var thatAllocation = build.rebalancePlan().orElseThrow(); + final var thatAllocation = build.rebalancePlan(); final var thisTps = thisAllocation.topicPartitions(); final var thatTps = thatAllocation.topicPartitions(); Assertions.assertEquals(thisTps, thatTps); @@ -54,28 +53,4 @@ void testBuild() { Assertions.assertEquals("Warning1", build.warnings().get(1)); Assertions.assertEquals("Warning2", build.warnings().get(2)); } - - @Test - void testNoBuildTwice() { - // A builder should only build once. If a builder can build multiple times then it will have to - // do much copy work once a new build is requested. This will harm performance. - final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 10); - final var logAllocation = ClusterLogAllocation.of(fakeClusterInfo); - final var build = RebalancePlanProposal.builder().withRebalancePlan(logAllocation); - - Assertions.assertDoesNotThrow(build::build); - Assertions.assertThrows(IllegalStateException.class, build::build); - } - - @Test - void testNoModifyAfterBuild() { - final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 10); - final var logAllocation = ClusterLogAllocation.of(fakeClusterInfo); - final var build = RebalancePlanProposal.builder().withRebalancePlan(logAllocation); - - RebalancePlanProposal proposal = build.build(); - Assertions.assertThrows( - IllegalStateException.class, () -> build.addInfo("modify after built.")); - Assertions.assertFalse(proposal.info().contains("modify after built.")); - } } diff --git a/app/src/test/java/org/astraea/app/balancer/generator/ShufflePlanGeneratorTest.java b/app/src/test/java/org/astraea/app/balancer/generator/ShufflePlanGeneratorTest.java index a2b1d583c4..ee52531d84 100644 --- a/app/src/test/java/org/astraea/app/balancer/generator/ShufflePlanGeneratorTest.java +++ b/app/src/test/java/org/astraea/app/balancer/generator/ShufflePlanGeneratorTest.java @@ -56,7 +56,7 @@ void testMovement(int shuffle) { .limit(100) .forEach( proposal -> { - final var that = proposal.rebalancePlan().orElseThrow(); + final var that = proposal.rebalancePlan(); final var thisTps = allocation.topicPartitions(); final var thatTps = that.topicPartitions(); final var thisMap = @@ -78,8 +78,12 @@ void testNoNodes() { final var proposal = shufflePlanGenerator.generate(fakeClusterInfo).iterator().next(); System.out.println(proposal); - Assertions.assertFalse(proposal.rebalancePlan().isPresent()); + Assertions.assertTrue( + ClusterLogAllocation.findNonFulfilledAllocation( + ClusterLogAllocation.of(fakeClusterInfo), proposal.rebalancePlan()) + .isEmpty()); Assertions.assertTrue(proposal.warnings().size() >= 1); + Assertions.assertEquals(1, shufflePlanGenerator.generate(fakeClusterInfo).limit(10).count()); } @Test @@ -90,8 +94,8 @@ void testOneNode() { final var proposal = shufflePlanGenerator.generate(fakeClusterInfo).iterator().next(); System.out.println(proposal); - Assertions.assertFalse(proposal.rebalancePlan().isPresent()); Assertions.assertTrue(proposal.warnings().size() >= 1); + Assertions.assertEquals(1, shufflePlanGenerator.generate(fakeClusterInfo).limit(10).count()); } @Test @@ -102,8 +106,12 @@ void testNoTopic() { final var proposal = shufflePlanGenerator.generate(fakeClusterInfo).iterator().next(); System.out.println(proposal); - Assertions.assertFalse(proposal.rebalancePlan().isPresent()); + Assertions.assertTrue( + ClusterLogAllocation.findNonFulfilledAllocation( + ClusterLogAllocation.of(fakeClusterInfo), proposal.rebalancePlan()) + .isEmpty()); Assertions.assertTrue(proposal.warnings().size() >= 1); + Assertions.assertEquals(1, shufflePlanGenerator.generate(fakeClusterInfo).limit(10).count()); } @ParameterizedTest(name = "[{0}] {1} nodes, {2} topics, {3} partitions, {4} replicas")