Skip to content

Commit

Permalink
Revise RebalancePlanProposal (opensource4you#539)
Browse files Browse the repository at this point in the history
* revise proposal return type

remove `Optional`

* add javadoc

* fix

* fix javadoc

* ShufflePlanGenerator should return just one plan when unable to propose

* fix merge
  • Loading branch information
garyparrot authored Aug 18, 2022
1 parent eff7f21 commit a544ee7
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ public static void main(String[] args) {
// TODO: implement one interface to select the best plan from many plan ,see #544
var rebalancePlan = rebalancePlanGenerator.generate(clusterInfo).findFirst().orElseThrow();
System.out.println(rebalancePlan);
var targetAllocation =
rebalancePlan
.rebalancePlan()
.orElseThrow(() -> new IllegalStateException("No suitable plan found"));
var targetAllocation = rebalancePlan.rebalancePlan();

System.out.println("[Target Allocation]");
System.out.println(ClusterLogAllocation.toString(targetAllocation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.astraea.app.balancer.log.ClusterLogAllocation;

public interface RebalancePlanProposal {

Optional<ClusterLogAllocation> rebalancePlan();
ClusterLogAllocation rebalancePlan();

List<String> info();

Expand All @@ -40,54 +39,36 @@ class Build {
List<String> info = Collections.synchronizedList(new ArrayList<>());
List<String> warnings = Collections.synchronizedList(new ArrayList<>());

// guard by this
private boolean built = false;

public synchronized Build noRebalancePlan() {
ensureNotBuiltYet();
this.allocation = null;
return this;
}

public synchronized Build withRebalancePlan(ClusterLogAllocation clusterLogAllocation) {
ensureNotBuiltYet();
this.allocation = Objects.requireNonNull(clusterLogAllocation);
return this;
}

public synchronized Build addWarning(String warning) {
ensureNotBuiltYet();
this.warnings.add(warning);
return this;
}

public synchronized Build addInfo(String info) {
ensureNotBuiltYet();
this.info.add(info);
return this;
}

private synchronized void ensureNotBuiltYet() {
if (built) throw new IllegalStateException("This builder already built.");
}

public synchronized RebalancePlanProposal build() {
final var allocationRef = allocation;
final var allocationRef =
Objects.requireNonNull(allocation, () -> "No log allocation specified for this proposal");
final var infoRef = info;
final var warningRef = warnings;

ensureNotBuiltYet();

built = true;
allocation = null;
info = null;
warnings = null;

return new RebalancePlanProposal() {

@Override
public Optional<ClusterLogAllocation> rebalancePlan() {
return Optional.ofNullable(allocationRef);
public ClusterLogAllocation rebalancePlan() {
return allocationRef;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ public interface RebalancePlanGenerator {
* same plan for the same input argument. There can be some randomization that takes part in this
* process.
*
* <p>If the generator implementation thinks it can't find any rebalance proposal(which the plan
* might improve the cluster). Then the implementation should return a Stream with exactly one
* rebalance plan proposal in it, where the proposed allocation will be exactly the same as the
* {@code baseAllocation} parameter. This means there is no movement or alteration will occur. And
* the implementation should place some detailed information in the info/warning/error string, to
* indicate the reason for no meaningful plan.
*
* @param clusterInfo the cluster state, implementation can take advantage of the data inside to
* proposal the plan it feels confident to improve the cluster.
* @return a {@link Stream} generating rebalance plan regarding the given {@link ClusterInfo}
Expand All @@ -42,6 +49,13 @@ default Stream<RebalancePlanProposal> generate(ClusterInfo clusterInfo) {
* same plan for the same input argument. There can be some randomization that takes part in this
* process.
*
* <p>If the generator implementation thinks it can't find any rebalance proposal(which the plan
* might improve the cluster). Then the implementation should return a Stream with exactly one
* rebalance plan proposal in it, where the proposed allocation will be exactly the same as the
* {@code baseAllocation} parameter. This means there is no movement or alteration that will
* occur. And The implementation should place some detailed information in the info/warning/error
* string, to indicate the reason for no meaningful plan.
*
* @param clusterInfo the cluster state, implementation can take advantage of the data inside to
* proposal the plan it feels confident to improve the cluster.
* @param baseAllocation the cluster log allocation as the based of proposal generation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,36 @@ public ShufflePlanGenerator(Supplier<Integer> numberOfShuffle) {
@Override
public Stream<RebalancePlanProposal> generate(
ClusterInfo clusterInfo, ClusterLogAllocation baseAllocation) {
final var brokerIds =
clusterInfo.nodes().stream().map(NodeInfo::id).collect(Collectors.toUnmodifiableSet());

if (brokerIds.size() == 0) {
return Stream.of(
RebalancePlanProposal.builder()
.withRebalancePlan(baseAllocation)
.addWarning("There is no broker")
.build());
}

if (brokerIds.size() == 1) {
return Stream.of(
RebalancePlanProposal.builder()
.withRebalancePlan(baseAllocation)
.addWarning("Only one broker exists, unable to do some migration")
.build());
}

if (clusterInfo.topics().size() == 0) {
return Stream.of(
RebalancePlanProposal.builder()
.withRebalancePlan(baseAllocation)
.addWarning("No non-ignored topic to working on.")
.build());
}

return Stream.generate(
() -> {
final var rebalancePlanBuilder = RebalancePlanProposal.builder();
final var brokerIds =
clusterInfo.nodes().stream()
.map(NodeInfo::id)
.collect(Collectors.toUnmodifiableSet());

if (brokerIds.size() == 0)
return rebalancePlanBuilder
.addWarning("Why there is no broker?")
.noRebalancePlan()
.build();

if (brokerIds.size() == 1)
return rebalancePlanBuilder
.addWarning("Only one broker exists. There is no reason to rebalance.")
.noRebalancePlan()
.build();

if (clusterInfo.topics().size() == 0)
return rebalancePlanBuilder
.addWarning("No non-ignored topic to working on.")
.noRebalancePlan()
.build();

final var shuffleCount = numberOfShuffle.get();

rebalancePlanBuilder.addInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ void testBuild() {
.addWarning("Warning2")
.build();

Assertions.assertTrue(build.rebalancePlan().isPresent());
final var thatAllocation = build.rebalancePlan().orElseThrow();
final var thatAllocation = build.rebalancePlan();
final var thisTps = thisAllocation.topicPartitions();
final var thatTps = thatAllocation.topicPartitions();
Assertions.assertEquals(thisTps, thatTps);
Expand All @@ -54,28 +53,4 @@ void testBuild() {
Assertions.assertEquals("Warning1", build.warnings().get(1));
Assertions.assertEquals("Warning2", build.warnings().get(2));
}

@Test
void testNoBuildTwice() {
// A builder should only build once. If a builder can build multiple times then it will have to
// do much copy work once a new build is requested. This will harm performance.
final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 10);
final var logAllocation = ClusterLogAllocation.of(fakeClusterInfo);
final var build = RebalancePlanProposal.builder().withRebalancePlan(logAllocation);

Assertions.assertDoesNotThrow(build::build);
Assertions.assertThrows(IllegalStateException.class, build::build);
}

@Test
void testNoModifyAfterBuild() {
final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 10);
final var logAllocation = ClusterLogAllocation.of(fakeClusterInfo);
final var build = RebalancePlanProposal.builder().withRebalancePlan(logAllocation);

RebalancePlanProposal proposal = build.build();
Assertions.assertThrows(
IllegalStateException.class, () -> build.addInfo("modify after built."));
Assertions.assertFalse(proposal.info().contains("modify after built."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void testMovement(int shuffle) {
.limit(100)
.forEach(
proposal -> {
final var that = proposal.rebalancePlan().orElseThrow();
final var that = proposal.rebalancePlan();
final var thisTps = allocation.topicPartitions();
final var thatTps = that.topicPartitions();
final var thisMap =
Expand All @@ -78,8 +78,12 @@ void testNoNodes() {
final var proposal = shufflePlanGenerator.generate(fakeClusterInfo).iterator().next();

System.out.println(proposal);
Assertions.assertFalse(proposal.rebalancePlan().isPresent());
Assertions.assertTrue(
ClusterLogAllocation.findNonFulfilledAllocation(
ClusterLogAllocation.of(fakeClusterInfo), proposal.rebalancePlan())
.isEmpty());
Assertions.assertTrue(proposal.warnings().size() >= 1);
Assertions.assertEquals(1, shufflePlanGenerator.generate(fakeClusterInfo).limit(10).count());
}

@Test
Expand All @@ -90,8 +94,8 @@ void testOneNode() {
final var proposal = shufflePlanGenerator.generate(fakeClusterInfo).iterator().next();

System.out.println(proposal);
Assertions.assertFalse(proposal.rebalancePlan().isPresent());
Assertions.assertTrue(proposal.warnings().size() >= 1);
Assertions.assertEquals(1, shufflePlanGenerator.generate(fakeClusterInfo).limit(10).count());
}

@Test
Expand All @@ -102,8 +106,12 @@ void testNoTopic() {
final var proposal = shufflePlanGenerator.generate(fakeClusterInfo).iterator().next();

System.out.println(proposal);
Assertions.assertFalse(proposal.rebalancePlan().isPresent());
Assertions.assertTrue(
ClusterLogAllocation.findNonFulfilledAllocation(
ClusterLogAllocation.of(fakeClusterInfo), proposal.rebalancePlan())
.isEmpty());
Assertions.assertTrue(proposal.warnings().size() >= 1);
Assertions.assertEquals(1, shufflePlanGenerator.generate(fakeClusterInfo).limit(10).count());
}

@ParameterizedTest(name = "[{0}] {1} nodes, {2} topics, {3} partitions, {4} replicas")
Expand Down

0 comments on commit a544ee7

Please sign in to comment.