diff --git a/app/src/main/java/org/astraea/app/App.java b/app/src/main/java/org/astraea/app/App.java index 481501bfff..118b048af1 100644 --- a/app/src/main/java/org/astraea/app/App.java +++ b/app/src/main/java/org/astraea/app/App.java @@ -70,8 +70,8 @@ static void execute(Map> mains, List args) throws Throw method.invoke(null, (Object) args.subList(1, args.size()).toArray(String[]::new)); } catch (InvocationTargetException targetException) { // Print out ParameterException, don't throw. - if (targetException.getTargetException() instanceof ParameterException) { - System.out.println(targetException.getTargetException().getMessage()); + if (targetException.getTargetException() instanceof ParameterException exception) { + System.out.println(exception.getMessage()); } else { throw targetException.getTargetException(); } diff --git a/app/src/main/java/org/astraea/app/argument/PositiveIntegerListField.java b/app/src/main/java/org/astraea/app/argument/PositiveIntegerListField.java index 09dc5943b1..e2038ec71d 100644 --- a/app/src/main/java/org/astraea/app/argument/PositiveIntegerListField.java +++ b/app/src/main/java/org/astraea/app/argument/PositiveIntegerListField.java @@ -17,12 +17,11 @@ package org.astraea.app.argument; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.Stream; public class PositiveIntegerListField extends PositiveNumberListField { @Override public List convert(String value) { - return Stream.of(value.split(SEPARATOR)).map(Integer::valueOf).collect(Collectors.toList()); + return Stream.of(value.split(SEPARATOR)).map(Integer::valueOf).toList(); } } diff --git a/app/src/main/java/org/astraea/app/argument/PositiveShortListField.java b/app/src/main/java/org/astraea/app/argument/PositiveShortListField.java index 6baf4f3592..93475db64c 100644 --- a/app/src/main/java/org/astraea/app/argument/PositiveShortListField.java +++ b/app/src/main/java/org/astraea/app/argument/PositiveShortListField.java @@ -17,12 +17,11 @@ package org.astraea.app.argument; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.Stream; public class PositiveShortListField extends PositiveNumberListField { @Override public List convert(String value) { - return Stream.of(value.split(SEPARATOR)).map(Short::valueOf).collect(Collectors.toList()); + return Stream.of(value.split(SEPARATOR)).map(Short::valueOf).toList(); } } diff --git a/app/src/main/java/org/astraea/app/argument/StringListField.java b/app/src/main/java/org/astraea/app/argument/StringListField.java index 98b02621a6..731fb423b3 100644 --- a/app/src/main/java/org/astraea/app/argument/StringListField.java +++ b/app/src/main/java/org/astraea/app/argument/StringListField.java @@ -17,12 +17,11 @@ package org.astraea.app.argument; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.Stream; public class StringListField extends ListField { @Override public List convert(String value) { - return Stream.of(value.split(SEPARATOR)).collect(Collectors.toList()); + return Stream.of(value.split(SEPARATOR)).toList(); } } diff --git a/app/src/main/java/org/astraea/app/web/BalancerHandler.java b/app/src/main/java/org/astraea/app/web/BalancerHandler.java index eaebb83a3c..9b88995175 100644 --- a/app/src/main/java/org/astraea/app/web/BalancerHandler.java +++ b/app/src/main/java/org/astraea/app/web/BalancerHandler.java @@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; @@ -284,11 +283,11 @@ static Change from(Collection before, Collection after) { before.stream() .sorted(Comparator.comparing(Replica::isPreferredLeader).reversed()) .map(r -> new Placement(r, Optional.of(r.size()))) - .collect(Collectors.toList()), + .toList(), after.stream() .sorted(Comparator.comparing(Replica::isPreferredLeader).reversed()) .map(r -> new Placement(r, Optional.empty())) - .collect(Collectors.toList())); + .toList()); } Change(String topic, int partition, List before, List after) { diff --git a/app/src/main/java/org/astraea/app/web/ReassignmentHandler.java b/app/src/main/java/org/astraea/app/web/ReassignmentHandler.java index fee8c4823e..f798c05362 100644 --- a/app/src/main/java/org/astraea/app/web/ReassignmentHandler.java +++ b/app/src/main/java/org/astraea/app/web/ReassignmentHandler.java @@ -111,7 +111,7 @@ public CompletionStage post(Channel channel) { .filter( b -> b.topicPartitions().contains(tp)) .map(NodeInfo::id) - .collect(Collectors.toList()); + .toList(); if (!ids.isEmpty()) return ids; return List.of( availableBrokers diff --git a/app/src/main/java/org/astraea/app/web/RecordHandler.java b/app/src/main/java/org/astraea/app/web/RecordHandler.java index 8f3a70178f..a2eda70c95 100644 --- a/app/src/main/java/org/astraea/app/web/RecordHandler.java +++ b/app/src/main/java/org/astraea/app/web/RecordHandler.java @@ -17,7 +17,6 @@ package org.astraea.app.web; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.toList; import java.time.Duration; import java.util.Base64; @@ -163,8 +162,7 @@ public CompletionStage get(Channel channel) { // visible for testing GetResponse get(Consumer consumer, int limit, Duration timeout) { try { - return new GetResponse( - consumer, consumer.poll(timeout).stream().map(Record::new).collect(toList())); + return new GetResponse(consumer, consumer.poll(timeout).stream().map(Record::new).toList()); } catch (Exception e) { consumer.close(); throw e; @@ -190,9 +188,7 @@ public CompletionStage post(Channel channel) { () -> { try { return producer.send( - records.stream() - .map(record -> createRecord(producer, record)) - .collect(toList())); + records.stream().map(record -> createRecord(producer, record)).toList()); } finally { if (producer.transactional()) { producer.close(); @@ -214,7 +210,7 @@ public CompletionStage post(Channel channel) { return Response.for404("missing result"); })) .map(CompletionStage::toCompletableFuture) - .collect(toList()))); + .toList())); if (postRequest.async()) return CompletableFuture.completedFuture(Response.ACCEPT); return CompletableFuture.completedFuture( @@ -410,8 +406,8 @@ public String json() { @Override public void onComplete(Throwable error) { try { - if (error == null && consumer instanceof SubscribedConsumer) { - ((SubscribedConsumer) consumer).commitOffsets(Duration.ofSeconds(5)); + if (error == null && consumer instanceof SubscribedConsumer subscribedConsumer) { + subscribedConsumer.commitOffsets(Duration.ofSeconds(5)); } } finally { consumer.close(); @@ -438,7 +434,7 @@ static class Record { timestamp = record.timestamp(); serializedKeySize = record.serializedKeySize(); serializedValueSize = record.serializedValueSize(); - headers = record.headers().stream().map(Header::new).collect(toList()); + headers = record.headers().stream().map(Header::new).toList(); key = record.key(); value = record.value(); leaderEpoch = record.leaderEpoch().orElse(null); diff --git a/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java b/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java index 7e45b7700a..7fa6eb074c 100644 --- a/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java +++ b/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java @@ -62,8 +62,7 @@ public CompletionStage apply(Admin admin) { admin.waitPartitionLeaderSynced( Map.of(topicName, partitions), Duration.ofSeconds(4))) .thenCompose(ignored -> admin.brokers()) - .thenApply( - brokers -> brokers.stream().map(NodeInfo::id).sorted().collect(Collectors.toList())) + .thenApply(brokers -> brokers.stream().map(NodeInfo::id).sorted().toList()) .thenCompose( brokerIds -> { var distribution = diff --git a/app/src/main/java/org/astraea/app/web/TopicHandler.java b/app/src/main/java/org/astraea/app/web/TopicHandler.java index 34243bcb17..1e7beec0f2 100644 --- a/app/src/main/java/org/astraea/app/web/TopicHandler.java +++ b/app/src/main/java/org/astraea/app/web/TopicHandler.java @@ -190,7 +190,7 @@ public CompletionStage post(Channel channel) { .thenApply(ignored -> null) .toCompletableFuture(); }) - .collect(Collectors.toList())) + .toList()) .thenCompose(ignored -> get(topicNames, null, id -> true)) .exceptionally( ignored -> diff --git a/app/src/test/java/org/astraea/app/EnumInfoTest.java b/app/src/test/java/org/astraea/app/EnumInfoTest.java index 310ba1fcf1..82bdd57b3f 100644 --- a/app/src/test/java/org/astraea/app/EnumInfoTest.java +++ b/app/src/test/java/org/astraea/app/EnumInfoTest.java @@ -17,7 +17,6 @@ package org.astraea.app; import java.util.Arrays; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.astraea.common.EnumInfo; import org.junit.jupiter.api.Assertions; @@ -104,14 +103,13 @@ void testProductionClass() { Assertions.assertTrue(productionClasses.size() > 100); Assertions.assertTrue( productionClasses.stream().allMatch(x -> x.getPackageName().startsWith("org.astraea"))); - System.out.println( - productionClasses.stream().filter(Class::isEnum).collect(Collectors.toList())); + System.out.println(productionClasses.stream().filter(Class::isEnum).toList()); } @Test void testEnumClassProvider() { var enumClassProvider = new EnumClassProvider(); - var enumCls = enumClassProvider.provideArguments(null).collect(Collectors.toList()); + var enumCls = enumClassProvider.provideArguments(null).toList(); Assertions.assertTrue(enumCls.size() > 0); Assertions.assertTrue(enumCls.stream().map(x -> (Class) x.get()[0]).allMatch(Class::isEnum)); } diff --git a/app/src/test/java/org/astraea/app/TestUtils.java b/app/src/test/java/org/astraea/app/TestUtils.java index 426cf62c52..26074ba091 100644 --- a/app/src/test/java/org/astraea/app/TestUtils.java +++ b/app/src/test/java/org/astraea/app/TestUtils.java @@ -46,14 +46,14 @@ public static List> getProductionClass() { FileUtils.listFiles(mainDir.toFile(), new String[] {"class"}, true).stream() .map(File::toPath) .map(mainDir::relativize) - .collect(Collectors.toList()); + .toList(); var classNames = dirFiles.stream() .map(Path::toString) .map(FilenameUtils::removeExtension) .map(x -> x.replace(File.separatorChar, '.')) - .collect(Collectors.toList()); + .toList(); return classNames.stream() .map(x -> Utils.packException(() -> Class.forName(x))) diff --git a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java index 73c9ba3aed..f6727111cb 100644 --- a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java @@ -193,10 +193,7 @@ private static Set createAndProduceTopic( .join(); if (skewed) { Utils.sleep(Duration.ofSeconds(1)); - var placement = - service.dataFolders().keySet().stream() - .limit(replicas) - .collect(Collectors.toUnmodifiableList()); + var placement = service.dataFolders().keySet().stream().limit(replicas).toList(); admin .moveToBrokers( admin.topicPartitions(Set.of(topic)).toCompletableFuture().join().stream() @@ -227,27 +224,19 @@ private static Set createAndProduceTopic( void testBestPlan() { try (var admin = Admin.of(SERVICE.bootstrapServers())) { var currentClusterInfo = - ClusterInfo.of( - "fake", - List.of(NodeInfo.of(10, "host", 22), NodeInfo.of(11, "host", 22)), - Map.of(), - List.of( - Replica.builder() - .topic("topic") - .partition(0) - .nodeInfo(NodeInfo.of(10, "host", 22)) - .lag(0) - .size(100) - .isLeader(true) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(true) - .path("/tmp/aa") - .build())); + ClusterInfo.builder() + .addNode(Set.of(1, 2)) + .addFolders( + Map.ofEntries(Map.entry(1, Set.of("/folder")), Map.entry(2, Set.of("/folder")))) + .addTopic("topic", 1, (short) 1) + .build(); HasClusterCost clusterCostFunction = - (clusterInfo, clusterBean) -> () -> clusterInfo == currentClusterInfo ? 100D : 10D; + (clusterInfo, clusterBean) -> + () -> + ClusterInfo.findNonFulfilledAllocation(currentClusterInfo, clusterInfo).isEmpty() + ? 100D + : 10D; HasMoveCost moveCostFunction = HasMoveCost.EMPTY; HasMoveCost failMoveCostFunction = (before, after, clusterBean) -> () -> true; @@ -955,13 +944,13 @@ void testChangeOrder() { .mapToObj(partition -> Map.entry(ThreadLocalRandom.current().nextInt(), partition)) .sorted(Map.Entry.comparingByKey()) .map(Map.Entry::getValue) - .collect(Collectors.toUnmodifiableList()); + .toList(); var destPlacement = IntStream.range(0, 10) .mapToObj(partition -> Map.entry(ThreadLocalRandom.current().nextInt(), partition)) .sorted(Map.Entry.comparingByKey()) .map(Map.Entry::getValue) - .collect(Collectors.toUnmodifiableList()); + .toList(); var base = ClusterInfo.builder() .addNode(Set.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) diff --git a/app/src/test/java/org/astraea/app/web/GroupHandlerTest.java b/app/src/test/java/org/astraea/app/web/GroupHandlerTest.java index 5bc4c86639..de375d0c2b 100644 --- a/app/src/test/java/org/astraea/app/web/GroupHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/GroupHandlerTest.java @@ -22,7 +22,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionException; -import java.util.stream.Collectors; import java.util.stream.IntStream; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; @@ -237,8 +236,7 @@ void testDeleteGroup() { try (var admin = Admin.of(SERVICE.bootstrapServers())) { var handler = new GroupHandler(admin); - var groupIds = - IntStream.range(0, 3).mapToObj(x -> Utils.randomString(10)).collect(Collectors.toList()); + var groupIds = IntStream.range(0, 3).mapToObj(x -> Utils.randomString(10)).toList(); groupIds.forEach( groupId -> { try (var consumer = diff --git a/app/src/test/java/org/astraea/app/web/ProducerHandlerTest.java b/app/src/test/java/org/astraea/app/web/ProducerHandlerTest.java index 631b044733..ee3ed3e0a0 100644 --- a/app/src/test/java/org/astraea/app/web/ProducerHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/ProducerHandlerTest.java @@ -55,10 +55,7 @@ void testListProducers() { handler.get(Channel.EMPTY).toCompletableFuture().join()); Assertions.assertNotEquals(0, result.partitions.size()); - var partitions = - result.partitions.stream() - .filter(t -> t.topic.equals(topicName)) - .collect(Collectors.toUnmodifiableList()); + var partitions = result.partitions.stream().filter(t -> t.topic.equals(topicName)).toList(); Assertions.assertEquals(1, partitions.size()); Assertions.assertEquals(topicName, partitions.iterator().next().topic); Assertions.assertEquals(0, partitions.iterator().next().partition); diff --git a/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java b/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java index ed44664d15..7673b035e1 100644 --- a/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java @@ -38,7 +38,6 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.astraea.app.web.RecordHandler.Metadata; @@ -150,8 +149,7 @@ void testPostRawString() { .keyDeserializer(Deserializer.STRING) .valueDeserializer(Deserializer.STRING) .build()) { - var records = - consumer.poll(Duration.ofSeconds(5)).stream().collect(Collectors.toUnmodifiableList()); + var records = consumer.poll(Duration.ofSeconds(5)).stream().toList(); Assertions.assertEquals(1, records.size()); Assertions.assertEquals(0, records.get(0).partition()); Assertions.assertEquals("abc", records.get(0).key()); @@ -209,8 +207,7 @@ void testPost(boolean isTransaction) { .keyDeserializer(Deserializer.STRING) .valueDeserializer(Deserializer.INTEGER) .build()) { - var records = - consumer.poll(Duration.ofSeconds(10)).stream().collect(Collectors.toUnmodifiableList()); + var records = consumer.poll(Duration.ofSeconds(10)).stream().toList(); Assertions.assertEquals(2, records.size()); var record = records.get(0); @@ -820,7 +817,7 @@ void testDelete() { var records = Stream.of(0, 0, 1, 1, 1, 2, 2, 2, 2) .map(x -> Record.builder().topic(topicName).partition(x).value(new byte[100]).build()) - .collect(Collectors.toList()); + .toList(); producer.send(records); producer.flush(); @@ -904,7 +901,7 @@ void testDeleteOffset() { var records = Stream.of(0, 0, 1, 1, 1, 2, 2, 2, 2) .map(x -> Record.builder().topic(topicName).partition(x).value(new byte[100]).build()) - .collect(Collectors.toList()); + .toList(); producer.send(records); producer.flush(); @@ -961,7 +958,7 @@ void testDeletePartition() { var records = Stream.of(0, 0, 1, 1, 1, 2, 2, 2, 2) .map(x -> Record.builder().topic(topicName).partition(x).value(new byte[100]).build()) - .collect(Collectors.toList()); + .toList(); producer.send(records); producer.flush(); diff --git a/app/src/test/java/org/astraea/app/web/RequestTest.java b/app/src/test/java/org/astraea/app/web/RequestTest.java index 0aa3fd8b0e..74457b3e26 100644 --- a/app/src/test/java/org/astraea/app/web/RequestTest.java +++ b/app/src/test/java/org/astraea/app/web/RequestTest.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.astraea.app.TestUtils; import org.astraea.app.web.RecordHandler.PostRecord; @@ -60,7 +59,7 @@ private static List> requestClasses() { return TestUtils.getProductionClass().stream() .filter(Request.class::isAssignableFrom) .filter(c -> !c.isInterface()) - .collect(Collectors.toList()); + .toList(); } public static class RequestClassProvider implements ArgumentsProvider { diff --git a/app/src/test/java/org/astraea/app/web/ThrottleHandlerTest.java b/app/src/test/java/org/astraea/app/web/ThrottleHandlerTest.java index c00a850fa5..35ebc92214 100644 --- a/app/src/test/java/org/astraea/app/web/ThrottleHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/ThrottleHandlerTest.java @@ -176,9 +176,7 @@ void testThrottleSomeLogs() { handler.get(Channel.EMPTY).toCompletableFuture().join()); var topic = - throttleSetting.topics.stream() - .filter(t -> t.name.get().equals(topicName)) - .collect(Collectors.toList()); + throttleSetting.topics.stream().filter(t -> t.name.get().equals(topicName)).toList(); Assertions.assertEquals(2, topic.size()); var leader = @@ -228,9 +226,7 @@ void testThrottleEveryLog() { ThrottleHandler.ThrottleSetting.class, handler.get(Channel.EMPTY).toCompletableFuture().join()); var topic = - throttleSetting.topics.stream() - .filter(t -> t.name.get().equals(topicName)) - .collect(Collectors.toList()); + throttleSetting.topics.stream().filter(t -> t.name.get().equals(topicName)).toList(); Assertions.assertEquals(9, topic.size()); IntStream.range(0, 3) diff --git a/app/src/test/java/org/astraea/app/web/TopicHandlerForProbabilityTest.java b/app/src/test/java/org/astraea/app/web/TopicHandlerForProbabilityTest.java index 0617fa3757..71ca06ce5a 100644 --- a/app/src/test/java/org/astraea/app/web/TopicHandlerForProbabilityTest.java +++ b/app/src/test/java/org/astraea/app/web/TopicHandlerForProbabilityTest.java @@ -69,8 +69,7 @@ void testCreateTopicByProbability() { .partitions.stream() .flatMap(p -> p.replicas.stream()) .collect(Collectors.groupingBy(r -> r.broker)); - var numberOfReplicas = - groupByBroker.values().stream().map(List::size).collect(Collectors.toList()); + var numberOfReplicas = groupByBroker.values().stream().map(List::size).toList(); replica0 += numberOfReplicas.get(0); replica1 += numberOfReplicas.get(1); replica2 += numberOfReplicas.size() == 3 ? numberOfReplicas.get(2) : 0; diff --git a/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java b/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java index 6aa023deff..972394489e 100644 --- a/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java @@ -322,8 +322,7 @@ void testCreateTopicWithReplicas() { @Test void testDeleteTopic() { - var topicNames = - IntStream.range(0, 3).mapToObj(x -> Utils.randomString(10)).collect(Collectors.toList()); + var topicNames = IntStream.range(0, 3).mapToObj(x -> Utils.randomString(10)).toList(); try (var admin = Admin.of(SERVICE.bootstrapServers())) { var handler = new TopicHandler(admin); for (var name : topicNames) diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index 38a509fe52..29c304a041 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -17,6 +17,7 @@ package org.astraea.common; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Timestamp; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; @@ -178,6 +179,10 @@ public static byte[] toBytes(BeanObject value) { // Bean attribute may contain non-primitive value. e.g. TimeUnit, Byte. } }); + beanBuilder.setCreatedTimestamp( + Timestamp.newBuilder() + .setSeconds(value.createdTimestamp() / 1000) + .setNanos((int) (value.createdTimestamp() % 1000) * 1000000)); return beanBuilder.build().toByteArray(); } @@ -313,7 +318,9 @@ public static BeanObject readBeanObject(byte[] bytes) throws SerializationExcept outerBean.getAttributesMap().entrySet().stream() .collect( Collectors.toUnmodifiableMap( - Map.Entry::getKey, e -> Objects.requireNonNull(toObject(e.getValue()))))); + Map.Entry::getKey, e -> Objects.requireNonNull(toObject(e.getValue())))), + outerBean.getCreatedTimestamp().getSeconds() * 1000 + + outerBean.getCreatedTimestamp().getNanos() / 1000000); } catch (InvalidProtocolBufferException ex) { // Pack exception thrown by protoBuf to Serialization exception. throw new SerializationException(ex); diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java b/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java index c06919747a..1aa93e1c40 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -92,6 +93,18 @@ public ClusterInfoBuilder addNode(Set brokerIds) { }); } + /** + * Remove specific brokers from the cluster state. + * + * @param toRemove id to remove + * @return this + */ + public ClusterInfoBuilder removeNodes(Predicate toRemove) { + return applyNodes( + (nodes, replicas) -> + nodes.stream().filter(node -> toRemove.negate().test(node.id())).toList()); + } + /** * Add some fake folders to a specific broker. * diff --git a/common/src/main/java/org/astraea/common/assignor/Generator.java b/common/src/main/java/org/astraea/common/assignor/Generator.java new file mode 100644 index 0000000000..1cda8bdbaf --- /dev/null +++ b/common/src/main/java/org/astraea/common/assignor/Generator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.assignor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import org.astraea.common.admin.TopicPartition; + +@FunctionalInterface +public interface Generator { + + Map> get(); + + static Generator randomGenerator( + Map subscription, + Map partitionCost, + Hint hints) { + return () -> { + Map> combinator = + subscription.keySet().stream() + .map(c -> Map.entry(c, new ArrayList())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + List candidates; + + for (var tp : partitionCost.keySet()) { + candidates = hints.get(combinator, tp); + if (candidates.isEmpty()) candidates = subscription.keySet().stream().toList(); + + combinator + .get(candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()))) + .add(tp); + } + + return combinator; + }; + } +} diff --git a/common/src/main/java/org/astraea/common/assignor/Hint.java b/common/src/main/java/org/astraea/common/assignor/Hint.java new file mode 100644 index 0000000000..87a7e41061 --- /dev/null +++ b/common/src/main/java/org/astraea/common/assignor/Hint.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.assignor; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.astraea.common.admin.TopicPartition; + +@FunctionalInterface +public interface Hint { + List get(Map> currentAssignment, TopicPartition tp); + + static Hint of(Set hints) { + return (currentAssignment, tp) -> + hints.stream() + .map(h -> h.get(currentAssignment, tp)) + .reduce((l1, l2) -> l1.stream().filter(l2::contains).toList()) + .get(); + } + + static Hint lowCostHint( + Map subscriptions, Map partitionCost) { + return (currentAssignment, tp) -> { + var candidates = + currentAssignment.entrySet().stream() + .filter(e -> subscriptions.get(e.getKey()).topics().contains(tp.topic())) + .map( + e -> + Map.entry( + e.getKey(), e.getValue().stream().mapToDouble(partitionCost::get).sum())) + .sorted(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .toList(); + + return candidates.stream().limit((long) Math.ceil(candidates.size() / 2.0)).toList(); + }; + } + + static Hint incompatibleHint( + Map subscriptions, + Map> incompatibilities) { + return (currentAssignment, tp) -> { + var subscriber = + subscriptions.entrySet().stream() + .filter(e -> e.getValue().topics().contains(tp.topic())) + .map(Map.Entry::getKey) + .toList(); + if (incompatibilities.get(tp).isEmpty()) return subscriber; + + var candidates = + currentAssignment.entrySet().stream() + .filter(e -> subscriber.contains(e.getKey())) + .map( + e -> + Map.entry( + e.getKey(), + e.getValue().stream() + .filter(p -> incompatibilities.get(p).contains(tp)) + .count())) + .collect( + Collectors.groupingBy( + Map.Entry::getValue, + Collectors.mapping(Map.Entry::getKey, Collectors.toList()))) + .entrySet() + .stream() + .min(Map.Entry.comparingByKey()) + .get() + .getValue(); + + return candidates.isEmpty() ? List.of() : candidates; + }; + } +} diff --git a/common/src/main/java/org/astraea/common/assignor/Limiter.java b/common/src/main/java/org/astraea/common/assignor/Limiter.java new file mode 100644 index 0000000000..a782eaaa6f --- /dev/null +++ b/common/src/main/java/org/astraea/common/assignor/Limiter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.assignor; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.astraea.common.admin.TopicPartition; + +@FunctionalInterface +public interface Limiter { + + boolean check(Map> condition); + + static Limiter of(Set limiters) { + return (combinator) -> limiters.stream().allMatch(l -> l.check(combinator)); + } + + static Limiter incompatibleLimiter(Map> incompatible) { + return (combinator) -> + combinator.entrySet().stream() + .map( + e -> { + var consumer = e.getKey(); + var tps = e.getValue(); + + var unsuitable = + incompatible.entrySet().stream() + .filter(entry -> tps.contains(entry.getKey())) + .flatMap(entry -> entry.getValue().stream()) + .collect(Collectors.toUnmodifiableSet()); + return Map.entry(consumer, tps.stream().filter(unsuitable::contains).count()); + }) + .mapToDouble(Map.Entry::getValue) + .sum() + == 0; + } + + static Limiter skewCostLimiter( + Map partitionCost, Map subscriptions) { + var tmpConsumerCost = + subscriptions.keySet().stream() + .map(c -> Map.entry(c, 0.0)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + partitionCost.entrySet().stream() + .sorted(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .forEach( + tp -> { + var minCostConsumer = + tmpConsumerCost.entrySet().stream().min(Map.Entry.comparingByValue()).get(); + minCostConsumer.setValue(minCostConsumer.getValue() + partitionCost.get(tp)); + }); + var standardDeviation = + (Function, Double>) + (vs) -> { + var average = vs.stream().mapToDouble(c -> c).average().getAsDouble(); + return Math.sqrt( + vs.stream().mapToDouble(v -> Math.pow(v - average, 2)).average().getAsDouble()); + }; + var limit = standardDeviation.apply(tmpConsumerCost.values()); + + return (combinator) -> { + var sd = + standardDeviation.apply( + combinator.values().stream() + .map(tps -> tps.stream().mapToDouble(partitionCost::get).sum()) + .collect(Collectors.toSet())); + + return sd < limit; + }; + } +} diff --git a/common/src/main/java/org/astraea/common/assignor/Shuffler.java b/common/src/main/java/org/astraea/common/assignor/Shuffler.java new file mode 100644 index 0000000000..19ff5c7260 --- /dev/null +++ b/common/src/main/java/org/astraea/common/assignor/Shuffler.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.assignor; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.astraea.common.Configuration; +import org.astraea.common.admin.TopicPartition; + +public interface Shuffler { + Map> shuffle(); + + static Shuffler randomShuffler( + Map subscriptions, + Map partitionCost, + Map> incompatible, + Configuration config) { + var limiters = + Limiter.of( + Set.of( + Limiter.skewCostLimiter(partitionCost, subscriptions), + Limiter.incompatibleLimiter(incompatible))); + var hints = + Hint.of( + Set.of( + Hint.lowCostHint(subscriptions, partitionCost), + Hint.incompatibleHint(subscriptions, incompatible))); + var generator = Generator.randomGenerator(subscriptions, partitionCost, hints); + var shuffleTime = config.duration("shuffle.time").get().toMillis(); + var standardDeviation = + (Function>, Double>) + (combinator) -> { + var costPerConsumer = + combinator.entrySet().stream() + .map( + e -> + Map.entry( + e.getKey(), + e.getValue().stream().mapToDouble(partitionCost::get).sum())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + var avg = + costPerConsumer.values().stream().mapToDouble(d -> d).average().getAsDouble(); + + return Math.sqrt( + costPerConsumer.values().stream() + .mapToDouble(d -> Math.pow(d - avg, 2)) + .average() + .getAsDouble()); + }; + var rejectedCombinators = new HashSet>>(); + + return () -> { + Map> result = null; + var start = System.currentTimeMillis(); + + while (System.currentTimeMillis() - start < shuffleTime) { + var combinator = generator.get(); + if (limiters.check(combinator)) { + result = combinator; + break; + } + rejectedCombinators.add(combinator); + } + + return result == null + ? rejectedCombinators.stream() + .map(c -> Map.entry(c, standardDeviation.apply(c))) + .min(Map.Entry.comparingByValue()) + .get() + .getKey() + : result; + }; + } +} diff --git a/common/src/main/java/org/astraea/common/balancer/AlgorithmConfig.java b/common/src/main/java/org/astraea/common/balancer/AlgorithmConfig.java index 00f73572ef..cbfa9e4a19 100644 --- a/common/src/main/java/org/astraea/common/balancer/AlgorithmConfig.java +++ b/common/src/main/java/org/astraea/common/balancer/AlgorithmConfig.java @@ -27,59 +27,41 @@ import org.astraea.common.cost.HasMoveCost; import org.astraea.common.metrics.ClusterBean; -/** The generic algorithm parameter for resolving the Kafka rebalance problem. */ -public interface AlgorithmConfig { - - static Builder builder() { +/** + * The generic algorithm parameter for resolving the Kafka rebalance problem. + * + * @param executionId a String indicate the name of this execution. This information is used for + * debug and logging usage. + * @param clusterCostFunction the cluster cost function for this problem. + * @param moveCostFunction the movement cost functions for this problem + * @param balancerConfig the configuration of this balancer run + * @param clusterInfo the initial cluster state of this optimization problem + * @param clusterBean the metrics of the associated cluster and optimization problem + * @param timeout the execution limit of this optimization problem + */ +public record AlgorithmConfig( + String executionId, + HasClusterCost clusterCostFunction, + HasMoveCost moveCostFunction, + Configuration balancerConfig, + ClusterInfo clusterInfo, + ClusterBean clusterBean, + Duration timeout) { + + public static Builder builder() { return new Builder(null); } - static Builder builder(AlgorithmConfig config) { + public static Builder builder(AlgorithmConfig config) { return new Builder(config); } - /** - * @return a String indicate the name of this execution. This information is used for debug and - * logging usage. - */ - String executionId(); - - /** - * @return the cluster cost function for this problem. - */ - HasClusterCost clusterCostFunction(); - - /** - * @return the movement cost functions for this problem - */ - HasMoveCost moveCostFunction(); - - /** - * @return the configuration of this balancer run - */ - Configuration balancerConfig(); - - /** - * @return the initial cluster state of this optimization problem - */ - ClusterInfo clusterInfo(); - - /** - * @return the metrics of the associated cluster and optimization problem - */ - ClusterBean clusterBean(); - - /** - * @return the execution limit of this optimization problem - */ - Duration timeout(); - - class Builder { + public static class Builder { private String executionId = "noname-" + UUID.randomUUID(); private HasClusterCost clusterCostFunction; private HasMoveCost moveCostFunction = HasMoveCost.EMPTY; - private Map balancerConfig = new HashMap<>(); + private final Map balancerConfig = new HashMap<>(); private ClusterInfo clusterInfo; private ClusterBean clusterBean = ClusterBean.EMPTY; @@ -184,44 +166,14 @@ public Builder timeout(Duration timeout) { } public AlgorithmConfig build() { - var config = new Configuration(balancerConfig); - - return new AlgorithmConfig() { - @Override - public String executionId() { - return executionId; - } - - @Override - public HasClusterCost clusterCostFunction() { - return clusterCostFunction; - } - - @Override - public HasMoveCost moveCostFunction() { - return moveCostFunction; - } - - @Override - public Configuration balancerConfig() { - return config; - } - - @Override - public ClusterInfo clusterInfo() { - return clusterInfo; - } - - @Override - public ClusterBean clusterBean() { - return clusterBean; - } - - @Override - public Duration timeout() { - return timeout; - } - }; + return new AlgorithmConfig( + executionId, + clusterCostFunction, + moveCostFunction, + new Configuration(balancerConfig), + clusterInfo, + clusterBean, + timeout); } } } diff --git a/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java b/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java index 01ebcd26ff..da74c14425 100644 --- a/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java +++ b/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java @@ -35,9 +35,53 @@ private BalancerConfigs() {} public static final String BALANCER_ALLOWED_TOPICS_REGEX = "balancer.allowed.topics.regex"; /** - * A regular expression indicates which brokers are eligible for moving loading. When specified, a - * broker with an id that doesn't match this expression cannot accept a partition from the other - * broker or move its partition to other brokers. + * This configuration indicates the balancing mode for each broker. + * + *

This configuration requires a string with a series of key-value pairs, each pair is + * separated by a comma, and the key and value are separated by a colon. + * (brokerId_A|"default"):(mode),(brokerId_B):(mode), ... The key indicates the integer id + * for a broker. And the value indicates the balancing mode for the associated broker. When the + * key is a string value "default"(without the double quotes), it indicates the + * associated balancing mode should be the default mode for the rest of the brokers that are not + * addressed in the configuration. By default, all the brokers use "balancing" mode. + * + *

Possible balancing modes

+ * + *
    + *
  • balancing: The broker will participate in the load balancing process. The + * replica assignment for this broker is eligible for changes. + *
  • demoted: The broker should become empty after the rebalance. This mode + * allows the user to clear all the loadings for certain brokers, enabling a graceful + * removal of those brokers. Note to the balancer implementation: A broker in this mode + * assumes it will be out of service after the balancing is finished. Therefore, when + * evaluating the cluster cost, the brokers to demote should be excluded. However, these + * brokers will be included in the move cost evaluation. Since these brokers are still part + * of the cluster right now, and move cost focusing on the cost associated during the + * ongoing balancing process itself. + *
  • excluded: The broker will not participate in the load balancing process. The + * replica assignment for this broker is not eligible for changes. It will neither accept + * replicas from other brokers nor reassign replicas to other brokers. + *
+ * + *

Flag Interaction:

+ * + *
    + *
  1. When this flag is used in conjunction with {@link + * BalancerConfigs#BALANCER_ALLOWED_TOPICS_REGEX}, if a demoted broker contains partition + * from those forbidden topics, an exception should be raised. + *
+ * + *

Limitation:

+ * + *
    + *
  1. Demoting a broker may be infeasible if there are not enough brokers to fit the required + * replica factor for a specific partition. This situation is more likely to occur if there + * are many excluded brokers that reject accepting new replicas. If such a case + * is detected, an exception should be raised. + *
  2. Any broker with ongoing replica-move-in, replica-move-out, or inter-folder movement + * cannot be the demoting target. An exception will be raised if any of the demoting brokers + * have such ongoing events. * + *
*/ - public static final String BALANCER_ALLOWED_BROKERS_REGEX = "balancer.allowed.brokers.regex"; + public static final String BALANCER_BROKER_BALANCING_MODE = "balancer.broker.balancing.mode"; } diff --git a/common/src/main/java/org/astraea/common/balancer/BalancerUtils.java b/common/src/main/java/org/astraea/common/balancer/BalancerUtils.java new file mode 100644 index 0000000000..d846905f0b --- /dev/null +++ b/common/src/main/java/org/astraea/common/balancer/BalancerUtils.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.balancer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.astraea.common.EnumInfo; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.NodeInfo; +import org.astraea.common.admin.Replica; + +public final class BalancerUtils { + + private BalancerUtils() {} + + public static Map balancingMode(ClusterInfo cluster, String config) { + var num = Pattern.compile("[0-9]+"); + + var map = + Arrays.stream(config.split(",")) + .filter(Predicate.not(String::isEmpty)) + .map(x -> x.split(":")) + .collect( + Collectors.toUnmodifiableMap( + s -> (Object) (num.matcher(s[0]).find() ? Integer.parseInt(s[0]) : s[0]), + s -> + switch (s[1]) { + case "balancing" -> BalancingModes.BALANCING; + case "demoted" -> BalancingModes.DEMOTED; + case "excluded" -> BalancingModes.EXCLUDED; + default -> throw new IllegalArgumentException( + "Unsupported balancing mode: " + s[1]); + })); + + Function mode = + (id) -> map.getOrDefault(id, map.getOrDefault("default", BalancingModes.BALANCING)); + + return cluster.brokers().stream() + .map(NodeInfo::id) + .collect(Collectors.toUnmodifiableMap(Function.identity(), mode)); + } + + /** + * Verify there is no logic conflict between {@link BalancerConfigs#BALANCER_ALLOWED_TOPICS_REGEX} + * and {@link BalancerConfigs#BALANCER_BROKER_BALANCING_MODE}. It also performs other common + * validness checks to the cluster. + */ + public static void verifyClearBrokerValidness( + ClusterInfo cluster, Predicate isDemoted, Predicate allowedTopics) { + var disallowedTopicsToClear = + cluster.topicPartitionReplicas().stream() + .filter(tpr -> isDemoted.test(tpr.brokerId())) + .filter(tpr -> !allowedTopics.test(tpr.topic())) + .collect(Collectors.toUnmodifiableSet()); + if (!disallowedTopicsToClear.isEmpty()) + throw new IllegalArgumentException( + "Attempts to clear some brokers, but some of them contain topics that forbidden from being changed due to \"" + + BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX + + "\": " + + disallowedTopicsToClear); + + var ongoingEventReplica = + cluster.replicas().stream() + .filter(r -> isDemoted.test(r.nodeInfo().id())) + .filter(r -> r.isAdding() || r.isRemoving() || r.isFuture()) + .map(Replica::topicPartitionReplica) + .collect(Collectors.toUnmodifiableSet()); + if (!ongoingEventReplica.isEmpty()) + throw new IllegalArgumentException( + "Attempts to clear broker with ongoing migration event (adding/removing/future replica): " + + ongoingEventReplica); + } + + /** + * Move all the replicas at the demoting broker to other allowed brokers. BE CAREFUL, The + * implementation made no assumption for MoveCost or ClusterCost of the returned ClusterInfo. + * Be aware of this limitation before using it as the starting point for a solution search. Some + * balancer implementation might have trouble finding answer when starting at a state where the + * MoveCost is already violated. + */ + public static ClusterInfo clearedCluster( + ClusterInfo initial, Predicate clearBrokers, Predicate allowedBrokers) { + final var allowed = + initial.nodes().stream() + .filter(node -> allowedBrokers.test(node.id())) + .filter(node -> Predicate.not(clearBrokers).test(node.id())) + .collect(Collectors.toUnmodifiableSet()); + final var nextBroker = Stream.generate(() -> allowed).flatMap(Collection::stream).iterator(); + final var nextBrokerFolder = + initial.brokerFolders().entrySet().stream() + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, + x -> Stream.generate(x::getValue).flatMap(Collection::stream).iterator())); + + var trackingReplicaList = + initial.topicPartitions().stream() + .collect( + Collectors.toUnmodifiableMap( + tp -> tp, + tp -> + initial.replicas(tp).stream() + .map(Replica::nodeInfo) + .collect(Collectors.toSet()))); + return ClusterInfo.builder(initial) + .mapLog( + replica -> { + if (!clearBrokers.test(replica.nodeInfo().id())) return replica; + var currentReplicaList = trackingReplicaList.get(replica.topicPartition()); + var broker = + IntStream.range(0, allowed.size()) + .mapToObj(i -> nextBroker.next()) + .filter(b -> !currentReplicaList.contains(b)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "Unable to clear replica " + + replica.topicPartitionReplica() + + " for broker " + + replica.nodeInfo().id() + + ", the allowed destination brokers are " + + allowed.stream() + .map(NodeInfo::id) + .collect(Collectors.toUnmodifiableSet()) + + " but all of them already hosting a replica for this partition. " + + "There is no broker can adopt this replica.")); + var folder = nextBrokerFolder.get(broker.id()).next(); + + // update the tracking list. have to do this to avoid putting two replicas from the + // same tp to one broker. + currentReplicaList.remove(replica.nodeInfo()); + currentReplicaList.add(broker); + + return Replica.builder(replica).nodeInfo(broker).path(folder).build(); + }) + .build(); + } + + public enum BalancingModes implements EnumInfo { + BALANCING, + DEMOTED, + EXCLUDED; + + public static BalancingModes ofAlias(String alias) { + return EnumInfo.ignoreCaseEnum(BalancingModes.class, alias); + } + + @Override + public String alias() { + return name(); + } + + @Override + public String toString() { + return alias(); + } + } +} diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java index 63181b8dec..67136915a3 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.DoubleAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -32,6 +33,7 @@ import org.astraea.common.balancer.AlgorithmConfig; import org.astraea.common.balancer.Balancer; import org.astraea.common.balancer.BalancerConfigs; +import org.astraea.common.balancer.BalancerUtils; import org.astraea.common.balancer.tweakers.ShuffleTweaker; import org.astraea.common.cost.ClusterCost; import org.astraea.common.metrics.MBeanRegister; @@ -140,26 +142,38 @@ public Optional offer(AlgorithmConfig config) { .regexString(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX) .map(Pattern::asMatchPredicate) .orElse((ignore) -> true); - final var allowedBrokers = - config - .balancerConfig() - .regexString(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX) - .map(Pattern::asMatchPredicate) - .>map( - predicate -> (brokerId) -> predicate.test(Integer.toString(brokerId))) - .orElse((ignore) -> true); + final var balancingMode = + BalancerUtils.balancingMode( + config.clusterInfo(), + config + .balancerConfig() + .string(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE) + .orElse("")); + final Predicate isBalancing = + id -> balancingMode.get(id) == BalancerUtils.BalancingModes.BALANCING; + final Predicate isDemoted = + id -> balancingMode.get(id) == BalancerUtils.BalancingModes.DEMOTED; + final var hasDemoted = + balancingMode.values().stream().anyMatch(i -> i == BalancerUtils.BalancingModes.DEMOTED); + BalancerUtils.verifyClearBrokerValidness(config.clusterInfo(), isDemoted, allowedTopics); - final var currentClusterInfo = config.clusterInfo(); + final var currentClusterInfo = + BalancerUtils.clearedCluster(config.clusterInfo(), isDemoted, isBalancing); final var clusterBean = config.clusterBean(); final var allocationTweaker = ShuffleTweaker.builder() .numberOfShuffle(() -> ThreadLocalRandom.current().nextInt(minStep, maxStep)) .allowedTopics(allowedTopics) - .allowedBrokers(allowedBrokers) + .allowedBrokers(isBalancing) .build(); - final var clusterCostFunction = config.clusterCostFunction(); final var moveCostFunction = config.moveCostFunction(); - final var initialCost = clusterCostFunction.clusterCost(currentClusterInfo, clusterBean); + final Function evaluateCost = + (cluster) -> { + final var filteredCluster = + hasDemoted ? ClusterInfo.builder(cluster).removeNodes(isDemoted).build() : cluster; + return config.clusterCostFunction().clusterCost(filteredCluster, clusterBean); + }; + final var initialCost = evaluateCost.apply(currentClusterInfo); final var loop = new AtomicInteger(iteration); final var start = System.currentTimeMillis(); @@ -183,7 +197,7 @@ public Optional offer(AlgorithmConfig config) { config.clusterInfo(), initialCost, newAllocation, - clusterCostFunction.clusterCost(newAllocation, clusterBean))) + evaluateCost.apply(newAllocation))) .filter(plan -> plan.proposalClusterCost().value() < currentCost.value()) .findFirst(); var currentCost = initialCost; @@ -212,6 +226,25 @@ public Optional offer(AlgorithmConfig config) { currentCost = currentSolution.get().proposalClusterCost(); currentAllocation = currentSolution.get().proposal(); } - return currentSolution; + return currentSolution.or( + () -> { + // With demotion, the implementation detail start search from a demoted state. It is + // possible + // that the start state is already the ideal answer. In this case, it is directly + // returned. + if (hasDemoted + && initialCost.value() == 0.0 + && !moveCostFunction + .moveCost(config.clusterInfo(), currentClusterInfo, clusterBean) + .overflow()) { + return Optional.of( + new Plan( + config.clusterInfo(), + config.clusterCostFunction().clusterCost(config.clusterInfo(), clusterBean), + currentClusterInfo, + initialCost)); + } + return Optional.empty(); + }); } } diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java index 8ae831209b..378816217e 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java @@ -21,13 +21,17 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; import java.util.function.Predicate; import java.util.regex.Pattern; import org.astraea.common.Utils; +import org.astraea.common.admin.ClusterInfo; import org.astraea.common.balancer.AlgorithmConfig; import org.astraea.common.balancer.Balancer; import org.astraea.common.balancer.BalancerConfigs; +import org.astraea.common.balancer.BalancerUtils; import org.astraea.common.balancer.tweakers.ShuffleTweaker; +import org.astraea.common.cost.ClusterCost; /** This algorithm proposes rebalance plan by tweaking the log allocation once. */ public class SingleStepBalancer implements Balancer { @@ -68,27 +72,39 @@ public Optional offer(AlgorithmConfig config) { .regexString(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX) .map(Pattern::asMatchPredicate) .orElse((ignore) -> true); - final var allowedBrokers = - config - .balancerConfig() - .regexString(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX) - .map(Pattern::asMatchPredicate) - .>map( - predicate -> (brokerId) -> predicate.test(Integer.toString(brokerId))) - .orElse((ignore) -> true); + final var balancingMode = + BalancerUtils.balancingMode( + config.clusterInfo(), + config + .balancerConfig() + .string(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE) + .orElse("")); + final Predicate isBalancing = + id -> balancingMode.get(id) == BalancerUtils.BalancingModes.BALANCING; + final Predicate isDemoted = + id -> balancingMode.get(id) == BalancerUtils.BalancingModes.DEMOTED; + final var hasDemoted = + balancingMode.values().stream().anyMatch(i -> i == BalancerUtils.BalancingModes.DEMOTED); + BalancerUtils.verifyClearBrokerValidness(config.clusterInfo(), isDemoted, allowedTopics); - final var currentClusterInfo = config.clusterInfo(); + final var currentClusterInfo = + BalancerUtils.clearedCluster(config.clusterInfo(), isDemoted, isBalancing); final var clusterBean = config.clusterBean(); final var allocationTweaker = ShuffleTweaker.builder() .numberOfShuffle(() -> ThreadLocalRandom.current().nextInt(minStep, maxStep)) .allowedTopics(allowedTopics) - .allowedBrokers(allowedBrokers) + .allowedBrokers(isBalancing) .build(); - final var clusterCostFunction = config.clusterCostFunction(); final var moveCostFunction = config.moveCostFunction(); - final var currentCost = - config.clusterCostFunction().clusterCost(currentClusterInfo, clusterBean); + + final Function evaluateCost = + (cluster) -> { + final var filteredCluster = + hasDemoted ? ClusterInfo.builder(cluster).removeNodes(isDemoted).build() : cluster; + return config.clusterCostFunction().clusterCost(filteredCluster, clusterBean); + }; + final var currentCost = evaluateCost.apply(currentClusterInfo); var start = System.currentTimeMillis(); return allocationTweaker @@ -108,8 +124,28 @@ public Optional offer(AlgorithmConfig config) { config.clusterInfo(), currentCost, newAllocation, - clusterCostFunction.clusterCost(newAllocation, clusterBean))) + evaluateCost.apply(newAllocation))) .filter(plan -> plan.proposalClusterCost().value() < currentCost.value()) - .min(Comparator.comparing(plan -> plan.proposalClusterCost().value())); + .min(Comparator.comparing(plan -> plan.proposalClusterCost().value())) + .or( + () -> { + // With demotion, the implementation detail start search from a demoted state. It is + // possible + // that the start state is already the ideal answer. In this case, it is directly + // returned. + if (hasDemoted + && currentCost.value() == 0.0 + && !moveCostFunction + .moveCost(config.clusterInfo(), currentClusterInfo, clusterBean) + .overflow()) { + return Optional.of( + new Plan( + config.clusterInfo(), + config.clusterCostFunction().clusterCost(config.clusterInfo(), clusterBean), + currentClusterInfo, + currentCost)); + } + return Optional.empty(); + }); } } diff --git a/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java b/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java index 1e994b9f35..ac2db0446e 100644 --- a/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java +++ b/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java @@ -17,7 +17,6 @@ package org.astraea.common.cost.utils; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -44,7 +43,7 @@ public List fetch(BeanObjectClient client, ClusterBean LogMetrics.Log.SIZE.fetch(client), ClusterMetrics.Partition.REPLICAS_COUNT.fetch(client)) .flatMap(Collection::stream) - .collect(Collectors.toUnmodifiableList()); + .toList(); } /** @@ -61,6 +60,22 @@ public static ClusterInfo metricViewCluster(ClusterBean clusterBean) { .filter(id -> id != -1) .map(id -> NodeInfo.of(id, "", -1)) .collect(Collectors.toUnmodifiableMap(NodeInfo::id, x -> x)); + var replicaMap = + clusterBean.brokerIds().stream() + .collect( + Collectors.toUnmodifiableMap( + broker -> broker, + broker -> + clusterBean + .brokerMetrics(broker, LogMetrics.Log.Gauge.class) + .filter(x -> LogMetrics.Log.SIZE.metricName().equals(x.metricsName())) + .filter(x -> x.partitionIndex().isPresent()) + .collect( + Collectors.toUnmodifiableMap( + x -> x.partitionIndex().orElseThrow(), + x -> x, + (a, b) -> + a.createdTimestamp() > b.createdTimestamp() ? a : b)))); var replicas = clusterBean.brokerTopics().stream() .filter(bt -> bt.broker() != -1) @@ -70,50 +85,40 @@ public static ClusterInfo metricViewCluster(ClusterBean clusterBean) { var partitions = clusterBean .brokerTopicMetrics(bt, ClusterMetrics.PartitionMetric.class) - .sorted( - Comparator.comparingLong(HasBeanObject::createdTimestamp).reversed()) + .collect( + Collectors.toUnmodifiableMap( + ClusterMetrics.PartitionMetric::topicPartition, + x -> x, + (a, b) -> a.createdTimestamp() > b.createdTimestamp() ? a : b)) + .values() + .stream() .collect( Collectors.toUnmodifiableMap( ClusterMetrics.PartitionMetric::topicPartition, m -> { var tp = m.topicPartition(); - var size = - clusterBean - .brokerMetrics(broker, LogMetrics.Log.Gauge.class) - .filter(x -> x.partition() == tp.partition()) - .filter(x -> x.topic().equals(tp.topic())) - .filter( - x -> - LogMetrics.Log.SIZE - .metricName() - .equals(x.metricsName())) - .max( - Comparator.comparingLong( - HasBeanObject::createdTimestamp)) - .orElseThrow( - () -> - new IllegalStateException( - "Partition " - + tp - + " detected, but its size metric doesn't exists. " - + "Maybe the given cluster bean is partially sampled")) - .value(); + var size = replicaMap.get(broker).get(tp); + if (size == null) + throw new IllegalStateException( + "Partition " + + tp + + " detected, but its size metric doesn't exists. " + + "Maybe the given cluster bean is partially sampled"); var build = Replica.builder() .topic(tp.topic()) .partition(tp.partition()) .nodeInfo(nodes.get(broker)) .path("") - .size(size); + .size(size.value()); var isLeader = m.value() != 0; return isLeader ? build.buildLeader() : build.buildInSyncFollower(); - }, - (latest, earlier) -> latest)); + })); return partitions.values().stream(); }) - .collect(Collectors.toUnmodifiableList()); + .toList(); var clusterId = clusterBean.all().entrySet().stream() .filter(e -> e.getKey() != -1) diff --git a/common/src/main/proto/org/astraea/common/generated/BeanObject.proto b/common/src/main/proto/org/astraea/common/generated/BeanObject.proto index c6797f2c25..a1669bb67b 100644 --- a/common/src/main/proto/org/astraea/common/generated/BeanObject.proto +++ b/common/src/main/proto/org/astraea/common/generated/BeanObject.proto @@ -2,10 +2,12 @@ syntax = "proto3"; package org.astraea.common.generated; +import "google/protobuf/timestamp.proto"; import "org/astraea/common/generated/Primitive.proto"; message BeanObject { string domain = 1; map properties = 2; map attributes = 3; + google.protobuf.Timestamp createdTimestamp = 4; } diff --git a/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java b/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java index b7d2851632..0f2e5244d3 100644 --- a/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java +++ b/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java @@ -341,4 +341,23 @@ void testFakeBrokerInteraction(int id, String host, int port) { Assertions.assertEquals(node0, node1); Assertions.assertNotEquals(node0, node2); } + + @Test + void testRemoveNodes() { + var base = ClusterInfo.builder().addNode(Set.of(1, 2, 3, 4, 5, 6, 7, 8, 9)).build(); + Assertions.assertEquals( + Set.of(1, 2, 3), + ClusterInfo.builder(base) + .removeNodes(x -> Set.of(4, 5, 6, 7, 8, 9).contains(x)) + .build() + .nodes() + .stream() + .map(NodeInfo::id) + .collect(Collectors.toSet())); + Assertions.assertEquals( + Set.of(1, 3, 5, 7, 9), + ClusterInfo.builder(base).removeNodes(x -> x % 2 == 0).build().nodes().stream() + .map(NodeInfo::id) + .collect(Collectors.toSet())); + } } diff --git a/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java b/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java index 693c2be146..e3c2e66b35 100644 --- a/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java +++ b/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java @@ -17,17 +17,23 @@ package org.astraea.common.balancer; import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Replica; import org.astraea.common.cost.ClusterCost; import org.astraea.common.cost.HasClusterCost; +import org.astraea.common.cost.ReplicaLeaderCost; import org.astraea.common.metrics.ClusterBean; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -121,7 +127,7 @@ public void testBalancerAllowedTopicsRegex() { } @Test - public void testBalancerAllowedBrokersRegex() { + public void testBalancingMode() { final var balancer = Utils.construct(balancerClass, Configuration.EMPTY); final var cluster = cluster(10, 10, 10, (short) 5); @@ -134,7 +140,7 @@ public void testBalancerAllowedBrokersRegex() { .clusterCost(decreasingCost()) .timeout(Duration.ofSeconds(2)) .configs(customConfig.raw()) - .config(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX, "[0-9]*") + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "default:balancing") .build()); AssertionsHelper.assertSomeMovement(cluster, plan.orElseThrow().proposal(), testName); } @@ -148,7 +154,7 @@ public void testBalancerAllowedBrokersRegex() { .clusterCost(decreasingCost()) .timeout(Duration.ofSeconds(2)) .configs(customConfig.raw()) - .config(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX, "NoMatch") + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "default:excluded") .build()); // since nothing can be moved. It is ok to return no plan. if (plan.isPresent()) { @@ -160,8 +166,10 @@ public void testBalancerAllowedBrokersRegex() { { var testName = "[test some match]"; var allowedBrokers = IntStream.range(1, 6).boxed().collect(Collectors.toUnmodifiableSet()); - var rawRegex = - allowedBrokers.stream().map(Object::toString).collect(Collectors.joining("|", "(", ")")); + var config = + allowedBrokers.stream() + .map(i -> i + ":balancing") + .collect(Collectors.joining(",", "default:excluded,", "")); var plan = balancer.offer( AlgorithmConfig.builder() @@ -169,13 +177,345 @@ public void testBalancerAllowedBrokersRegex() { .clusterCost(decreasingCost()) .timeout(Duration.ofSeconds(2)) .configs(customConfig.raw()) - .config(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX, rawRegex) + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, config) .build()); AssertionsHelper.assertOnlyAllowedBrokerMovement( cluster, plan.orElseThrow().proposal(), allowedBrokers::contains, testName); } } + @Test + public void testBalancingModeDemoted() { + final var balancer = Utils.construct(balancerClass, Configuration.EMPTY); + final var cluster = cluster(10, 30, 10, (short) 5); + + { + var testName = "[test all clear]"; + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "default:demoted") + .build()), + testName); + } + + { + var testName = "[test some clear]"; + var plan = + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted,1:demoted,2:demoted") + .build()); + Assertions.assertTrue(plan.isPresent(), testName); + var finalCluster = plan.get().proposal(); + Assertions.assertTrue(cluster.replicas().stream().anyMatch(x -> x.nodeInfo().id() == 0)); + Assertions.assertTrue(cluster.replicas().stream().anyMatch(x -> x.nodeInfo().id() == 1)); + Assertions.assertTrue(cluster.replicas().stream().anyMatch(x -> x.nodeInfo().id() == 2)); + Assertions.assertTrue( + finalCluster.replicas().stream().noneMatch(x -> x.nodeInfo().id() == 0)); + Assertions.assertTrue( + finalCluster.replicas().stream().noneMatch(x -> x.nodeInfo().id() == 1)); + Assertions.assertTrue( + finalCluster.replicas().stream().noneMatch(x -> x.nodeInfo().id() == 2)); + AssertionsHelper.assertBrokerEmpty( + finalCluster, (x) -> Set.of(0, 1, 2).contains(x), testName); + } + + { + var testName = "[test replication factor violation]"; + // 6 brokers, clear 3 brokers, remain 3 brokers, topic with replication factor 3 can fit this + // cluster. + var noViolatedCluster = cluster(6, 10, 10, (short) 3); + Assertions.assertDoesNotThrow( + () -> { + var solution = + balancer + .offer( + AlgorithmConfig.builder() + .clusterInfo(noViolatedCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted,1:demoted,2:demoted") + .build()) + .orElseThrow() + .proposal(); + AssertionsHelper.assertBrokerEmpty( + solution, (x) -> Set.of(0, 1, 2).contains(x), testName); + }, + testName); + + // 5 brokers, clear 3 brokers, remain 2 brokers, topic with replication factor 3 CANNOT fit + // this cluster. + var violatedCluster = cluster(5, 10, 10, (short) 3); + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(violatedCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted,1:demoted,2:demoted") + .build())); + } + + { + var testName = "[test if allowed topics is used, clear disallow topic will raise an error]"; + var base = + ClusterInfo.builder() + .addNode(Set.of(1, 2, 3)) + .addFolders( + Map.ofEntries( + Map.entry(1, Set.of("/folder")), + Map.entry(2, Set.of("/folder")), + Map.entry(3, Set.of("/folder")))) + .build(); + var node12 = Stream.of(1, 2).map(base::node).iterator(); + var node13 = Stream.of(1, 3).map(base::node).iterator(); + var node123 = Stream.of(1, 2, 3).map(base::node).iterator(); + var testCluster = + ClusterInfo.builder(base) + .addTopic("OK", 1, (short) 1, r -> Replica.builder(r).nodeInfo(base.node(1)).build()) + .addTopic( + "OK_SKIP", 2, (short) 1, r -> Replica.builder(r).nodeInfo(node12.next()).build()) + .addTopic( + "Replica", 1, (short) 2, r -> Replica.builder(r).nodeInfo(node13.next()).build()) + .addTopic( + "Partition", + 3, + (short) 1, + r -> Replica.builder(r).nodeInfo(node123.next()).build()) + .build(); + + Assertions.assertDoesNotThrow( + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(testCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // allow anything other than "OK" topic + .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, "(?!OK).*") + // clear broker 3 + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "3:demoted") + // this won't raise an error since that topic didn't locate at 3 + .build()), + testName); + Assertions.assertDoesNotThrow( + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(testCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // allow anything other than "OK" topic + .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, "(?!OK_SKIP).*") + // clear broker 3 + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "3:demoted") + // this won't raise an error since that topic didn't locate at 3 + .build()), + testName); + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(testCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // allow anything other than "Replica" topic + .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, "(?!Replica).*") + // clear broker 3 + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "3:demoted") + // this will raise an error since that topic has a replica at 3 + .build()), + testName); + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(testCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // allow anything other than "Replica" topic + .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, "(?!Partition).*") + // clear broker 3 + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "3:demoted") + // this will raise an error since that topic has a partition at 3 + .build()), + testName); + } + + { + var testName = "[test if allowed brokers is used, disallowed broker won't be altered]"; + var solution = + balancer + .offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // clear broker 0 + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted," + + + // allow broker 1,2,3,4,5,6 + "1:balancing,2:balancing,3:balancing,4:balancing,5:balancing,6:balancing,default:excluded") + // this will be ok since any replica at 0 can move to 1~6 without breaking + // replica factors + .build()) + .orElseThrow() + .proposal(); + var before = cluster.topicPartitionReplicas(); + var after = solution.topicPartitionReplicas(); + var changed = + after.stream() + .filter(Predicate.not(before::contains)) + .collect(Collectors.toUnmodifiableSet()); + Assertions.assertTrue(after.stream().noneMatch(r -> r.brokerId() == 0), testName); + Assertions.assertTrue( + changed.stream().allMatch(r -> Set.of(1, 2, 3, 4, 5, 6).contains(r.brokerId())), + testName); + } + + { + var testName = + "[test if allowed brokers is used, insufficient allowed broker to fit replica factor requirement will raise an error]"; + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // clear broker 0, allow broker 1 + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted,1:balancing,default:excluded") + // this will raise an error if a partition has replicas at both 0 and 1. In + // this case, there is no allowed broker to adopt replica from 0, since the + // only allowed broker already has one replica on it. we cannot assign two + // replicas to one broker. + .build()), + testName); + } + + { + var testName = "[if replica on clear broker is adding/removing/future, raise an exception]"; + var adding = + ClusterInfo.builder(cluster) + .mapLog(r -> r.nodeInfo().id() != 0 ? r : Replica.builder(r).isAdding(true).build()) + .build(); + var removing = + ClusterInfo.builder(cluster) + .mapLog(r -> r.nodeInfo().id() != 0 ? r : Replica.builder(r).isRemoving(true).build()) + .build(); + var future = + ClusterInfo.builder(cluster) + .mapLog(r -> r.nodeInfo().id() != 0 ? r : Replica.builder(r).isFuture(true).build()) + .build(); + for (var cc : List.of(adding, removing, future)) { + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cc) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(1)) + .configs(customConfig.raw()) + // clear broker 0 allow broker 1,2,3,4,5,6 + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted," + + "1:balancing,2:balancing,3:balancing,4:balancing,5:balancing,6:balancing") + .build()), + testName); + } + for (var cc : List.of(adding, removing, future)) { + Assertions.assertDoesNotThrow( + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cc) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(1)) + .configs(customConfig.raw()) + // clear broker 1 allow broker 0,2,3,4,5,6,7 + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "1:demoted," + + "0:balancing,2:balancing,3:balancing,4:balancing,5:balancing,6:balancing," + + "7:balancing,default:excluded") + // adding/removing/future at 0 not 1, unrelated so no error + .build()), + testName); + } + } + + { + // Some balancer implementations have such logic flaw: + // 1. The initial state[A] cannot be solution. + // 2. There are brokers that need to be demoted. + // 3. The load on those brokers been redistributed to other brokers. Creating the start + // state[B] for the solution search. + // 4. The start state[B] solution is actually the best solution. + // 5. Balancer think the start state[B] is the initial state[A]. And cannot be a solution(as + // mentioned in 1). + // 6. In fact, the start state[B] doesn't equal to the initial state[A]. Since there is a + // cleaning work performed at step 3. + // 7. Balancer cannot find any solution that is better than the start state(4) and therefore + // returns no solution. + var testName = + "[If the cluster after clear is the best solution, balancer should be able to return it]"; + var testCluster = + ClusterInfo.builder() + .addNode(Set.of(1, 2)) + .addFolders( + Map.ofEntries(Map.entry(1, Set.of("/folder")), Map.entry(2, Set.of("/folder")))) + .addTopic("topic", 100, (short) 1) + .build(); + Assertions.assertNotEquals( + Optional.empty(), + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(testCluster) + .clusterBean(ClusterBean.EMPTY) + .clusterCost(new ReplicaLeaderCost()) + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "1:demoted") + .timeout(Duration.ofSeconds(2)) + .build()), + testName); + } + } + private static ClusterInfo cluster(int nodes, int topics, int partitions, short replicas) { var builder = ClusterInfo.builder() @@ -258,5 +598,16 @@ static void assertOnlyAllowedBrokerMovement( }); }); } + + static void assertBrokerEmpty(ClusterInfo target, Predicate clearBroker, String name) { + var violated = + target + .replicaStream() + .filter(i -> clearBroker.test(i.nodeInfo().id())) + .collect(Collectors.toUnmodifiableSet()); + Assertions.assertTrue( + violated.isEmpty(), + name + ": the following replica should move to somewhere else " + violated); + } } } diff --git a/common/src/test/java/org/astraea/common/balancer/BalancerUtilsTest.java b/common/src/test/java/org/astraea/common/balancer/BalancerUtilsTest.java new file mode 100644 index 0000000000..bf27b63426 --- /dev/null +++ b/common/src/test/java/org/astraea/common/balancer/BalancerUtilsTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.balancer; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Replica; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class BalancerUtilsTest { + + @Test + void testBalancingMode() { + var cluster = ClusterInfo.builder().addNode(Set.of(1, 2, 3, 4, 5)).build(); + + Assertions.assertThrows(Exception.class, () -> BalancerUtils.balancingMode(cluster, "bad")); + Assertions.assertThrows(Exception.class, () -> BalancerUtils.balancingMode(cluster, "bad:bad")); + Assertions.assertThrows( + Exception.class, () -> BalancerUtils.balancingMode(cluster, "bad:bad:bad")); + Assertions.assertThrows( + Exception.class, () -> BalancerUtils.balancingMode(cluster, "1:balancing,bad:bad")); + Assertions.assertThrows( + Exception.class, () -> BalancerUtils.balancingMode(cluster, "1:balancing,bad:bad:bad")); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.balancingMode(cluster, "1:balancing,2:demoted,3:excluded,4:oops")); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.balancingMode(cluster, "1:balancing,2:demoted,3:excluded,4:")); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.balancingMode(cluster, "1:balancing,2:demoted,3:excluded,1:")); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.balancingMode(cluster, "1:balancing,2:demoted,3:excluded,:")); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.balancingMode(cluster, "1:balancing,2:demoted,3:excluded,::")); + Assertions.assertThrows(Exception.class, () -> BalancerUtils.balancingMode(cluster, "1:")); + Assertions.assertThrows( + Exception.class, () -> BalancerUtils.balancingMode(cluster, "1:balancing,1:balancing")); + + Assertions.assertDoesNotThrow( + () -> BalancerUtils.balancingMode(cluster, "reserved_usage:balancing"), + "Intentionally reserved this usage"); + + Assertions.assertEquals( + BalancerUtils.BalancingModes.BALANCING, + BalancerUtils.balancingMode(cluster, "").get(1), + "default"); + Assertions.assertEquals( + BalancerUtils.BalancingModes.DEMOTED, + BalancerUtils.balancingMode(cluster, "1:demoted").get(1), + "value"); + Assertions.assertEquals( + BalancerUtils.BalancingModes.DEMOTED, + BalancerUtils.balancingMode(cluster, "default:demoted").get(5), + "user defined default"); + Assertions.assertEquals( + BalancerUtils.BalancingModes.EXCLUDED, + BalancerUtils.balancingMode(cluster, "3:excluded,4:excluded").get(3)); + Assertions.assertEquals( + BalancerUtils.BalancingModes.EXCLUDED, + BalancerUtils.balancingMode(cluster, "3:excluded,4:excluded").get(4)); + Assertions.assertEquals( + BalancerUtils.BalancingModes.BALANCING, + BalancerUtils.balancingMode(cluster, "3:excluded,4:excluded,1:balancing").get(1)); + Assertions.assertEquals( + Set.of(1, 2, 3, 4, 5), BalancerUtils.balancingMode(cluster, "").keySet()); + } + + @Test + void testVerifyClearBrokerValidness() { + var base = + ClusterInfo.builder() + .addNode(Set.of(1, 2, 3)) + .addFolders( + Map.ofEntries( + Map.entry(1, Set.of("/folder")), + Map.entry(2, Set.of("/folder")), + Map.entry(3, Set.of("/folder")))) + .build(); + var iter = Stream.of(1, 2, 3).map(base::node).iterator(); + var cluster = + ClusterInfo.builder(base) + .addTopic("A", 1, (short) 1, r -> Replica.builder(r).nodeInfo(iter.next()).build()) + .addTopic("B", 1, (short) 1, r -> Replica.builder(r).nodeInfo(iter.next()).build()) + .addTopic("C", 1, (short) 1, r -> Replica.builder(r).nodeInfo(iter.next()).build()) + .build(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 1, t -> !t.equals("A"))); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 2, t -> !t.equals("B"))); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 3, t -> !t.equals("C"))); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 1, t -> t.equals("A"))); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 2, t -> t.equals("B"))); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 3, t -> t.equals("C"))); + + var hasAdding = + ClusterInfo.builder(cluster).mapLog(r -> Replica.builder(r).isAdding(true).build()).build(); + var hasRemoving = + ClusterInfo.builder(cluster) + .mapLog(r -> Replica.builder(r).isRemoving(true).build()) + .build(); + var hasFuture = + ClusterInfo.builder(cluster).mapLog(r -> Replica.builder(r).isFuture(true).build()).build(); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BalancerUtils.verifyClearBrokerValidness(hasAdding, x -> true, x -> true)); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BalancerUtils.verifyClearBrokerValidness(hasRemoving, x -> true, x -> true)); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BalancerUtils.verifyClearBrokerValidness(hasFuture, x -> true, x -> true)); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(hasAdding, x -> false, x -> true)); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(hasRemoving, x -> false, x -> true)); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(hasFuture, x -> false, x -> true)); + } + + @Test + void testClearedCluster() { + var cluster = + ClusterInfo.builder() + .addNode(Set.of(1, 2)) + .addFolders(Map.of(1, Set.of("/folder"))) + .addFolders(Map.of(2, Set.of("/folder"))) + .addTopic("topic", 100, (short) 2) + .addNode(Set.of(3, 4)) + .addFolders(Map.of(3, Set.of("/folder"))) + .addFolders(Map.of(4, Set.of("/folder"))) + .build(); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.clearedCluster(cluster, id -> id == 1 || id == 2, id -> id == 3), + "Insufficient brokers to meet replica factor requirement"); + var clearedCluster = + Assertions.assertDoesNotThrow( + () -> + BalancerUtils.clearedCluster( + cluster, id -> id == 1 || id == 2, id -> id == 3 || id == 4)); + + Assertions.assertEquals( + List.of(), clearedCluster.replicas().stream().filter(x -> x.nodeInfo().id() == 1).toList()); + Assertions.assertEquals( + List.of(), clearedCluster.replicas().stream().filter(x -> x.nodeInfo().id() == 2).toList()); + Assertions.assertNotEquals( + List.of(), clearedCluster.replicas().stream().filter(x -> x.nodeInfo().id() == 3).toList()); + Assertions.assertNotEquals( + List.of(), clearedCluster.replicas().stream().filter(x -> x.nodeInfo().id() == 4).toList()); + + var sameCluster = + Assertions.assertDoesNotThrow( + () -> BalancerUtils.clearedCluster(cluster, id -> false, id -> true)); + Assertions.assertEquals( + Set.of(), + ClusterInfo.findNonFulfilledAllocation(cluster, sameCluster), + "Nothing to clear, nothing to change"); + + var aCluster = + Assertions.assertDoesNotThrow( + () -> BalancerUtils.clearedCluster(cluster, id -> id == 1, id -> id == 3)); + Assertions.assertEquals( + 0, aCluster.replicas().stream().filter(r -> r.nodeInfo().id() == 1).count(), "Demoted"); + Assertions.assertEquals( + 100, + aCluster.replicas().stream().filter(r -> r.nodeInfo().id() == 2).count(), + "Not allowed or cleared"); + Assertions.assertEquals( + 100, + aCluster.replicas().stream().filter(r -> r.nodeInfo().id() == 3).count(), + "Accept replicas broker demoted broker"); + Assertions.assertEquals( + 0, aCluster.replicas().stream().filter(r -> r.nodeInfo().id() == 4).count(), "Not allowed"); + } +} diff --git a/common/src/test/java/org/astraea/common/serializer/BeanObjectSerializerTest.java b/common/src/test/java/org/astraea/common/serializer/BeanObjectSerializerTest.java index a486ad4ace..f9546c2828 100644 --- a/common/src/test/java/org/astraea/common/serializer/BeanObjectSerializerTest.java +++ b/common/src/test/java/org/astraea/common/serializer/BeanObjectSerializerTest.java @@ -44,7 +44,8 @@ public void testSerializationDeserialization() { true, "String", "str"); - var bean = new BeanObject(domain, properties, attributes); + var time = System.currentTimeMillis(); + var bean = new BeanObject(domain, properties, attributes, time); // Valid arguments should not throw Assertions.assertDoesNotThrow( @@ -58,6 +59,7 @@ public void testSerializationDeserialization() { Assertions.assertEquals("domain", beanObj.domainName()); Assertions.assertEquals(properties, beanObj.properties()); Assertions.assertEquals(attributes, beanObj.attributes()); + Assertions.assertEquals(time, beanObj.createdTimestamp()); } @Test diff --git a/docs/web_server/web_api_balancer_chinese.md b/docs/web_server/web_api_balancer_chinese.md index 2aa124856c..050928ca0b 100644 --- a/docs/web_server/web_api_balancer_chinese.md +++ b/docs/web_server/web_api_balancer_chinese.md @@ -26,10 +26,10 @@ POST /balancer `balancerConfig` 是 balancer 實作開放給使用者設定的內部演算法行為參數,我們有針對常用情境的 balancer config 規範出一些固定的名稱, 參數是否支援要看 Balancer 實作本身。當指定的參數不被 balancer 實作支援時,該實作可能會丟出錯誤提示使用者。 -| config key | config value | -|--------------------------------|-------------------------------------------------------------------------------------------------------------------| -| balancer.allowed.topics.regex | 一個正則表達式,表達允許進行搬移操作的 topic 名稱,當沒有指定的時候,代表沒有任何限制,所有 topic 都可以做搬移 | -| balancer.allowed.brokers.regex | 一個正則表達式,表達允許進行搬移操作的 broker 編號,當沒有指定的時候,代表沒有任何限制,所有 broker 都可以做負載更動。當有指定時,只有那些 broker 編號有匹配此正則表達式的 broker 能進行負載的更動 | +| config key | config value | +|--------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| balancer.allowed.topics.regex | 一個正則表達式,表達允許進行搬移操作的 topic 名稱,當沒有指定的時候,代表沒有任何限制,所有 topic 都可以做搬移 | +| balancer.broker.balancing.mode | 這個參數指定每個 Broker 要採取的負載平衡設定,目前擁有的模式包含 `balancing`, `demoted` 和 `excluded`。`balancing` 代表特定節點要參予負載平衡的過程,該節點身上的負載可以被變動。`demoted` 代表特定節點身上的負載必須要全部移除,這個功能能夠協助使用者優雅地下線一個節點。`excluded` 代表特定節點不能夠參予負載平衡的過程,節點不能新增或移除負載。這個參數的格式是一系列的 key/value pair 的字串,每個 pair 之間透過逗號 "," 間隔,而 key/value 之間透過冒號 ":" 間隔,如 `(brokerId_A |"default"):(mode),(brokerId_B):(mode), ...`,其中 `key` 欄位代表這個是描述某 id 節點的設定,而對應的 `value` 欄位則是該節點要套用的負載平衡模式(`balancing`, `demoted` 或 `excluded`),另外 `key` 欄位可以填寫特殊字串 `default`,代表沒有被設定所提及的節點應該使用的負載平衡模式,比如填寫 `default:excluded` 可以使設定沒有提到的節點不參予負載平衡的過程,預設的 `default` 模式是 `balancing`,意即所有設定沒有提到的節點都將參予負載平衡的過程。 | costConfig: