Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/skiptests/astraea into addP…
Browse files Browse the repository at this point in the history
…artitionMigrateTimeCost
  • Loading branch information
qoo332001 committed May 22, 2023
2 parents e3dd72e + 9613d6b commit 38031ee
Show file tree
Hide file tree
Showing 37 changed files with 1,378 additions and 238 deletions.
4 changes: 2 additions & 2 deletions app/src/main/java/org/astraea/app/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ static void execute(Map<String, Class<?>> mains, List<String> args) throws Throw
method.invoke(null, (Object) args.subList(1, args.size()).toArray(String[]::new));
} catch (InvocationTargetException targetException) {
// Print out ParameterException, don't throw.
if (targetException.getTargetException() instanceof ParameterException) {
System.out.println(targetException.getTargetException().getMessage());
if (targetException.getTargetException() instanceof ParameterException exception) {
System.out.println(exception.getMessage());
} else {
throw targetException.getTargetException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package org.astraea.app.argument;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PositiveIntegerListField extends PositiveNumberListField<Integer> {
@Override
public List<Integer> convert(String value) {
return Stream.of(value.split(SEPARATOR)).map(Integer::valueOf).collect(Collectors.toList());
return Stream.of(value.split(SEPARATOR)).map(Integer::valueOf).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package org.astraea.app.argument;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PositiveShortListField extends PositiveNumberListField<Short> {
@Override
public List<Short> convert(String value) {
return Stream.of(value.split(SEPARATOR)).map(Short::valueOf).collect(Collectors.toList());
return Stream.of(value.split(SEPARATOR)).map(Short::valueOf).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package org.astraea.app.argument;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StringListField extends ListField<String> {
@Override
public List<String> convert(String value) {
return Stream.of(value.split(SEPARATOR)).collect(Collectors.toList());
return Stream.of(value.split(SEPARATOR)).toList();
}
}
5 changes: 2 additions & 3 deletions app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
Expand Down Expand Up @@ -284,11 +283,11 @@ static Change from(Collection<Replica> before, Collection<Replica> after) {
before.stream()
.sorted(Comparator.comparing(Replica::isPreferredLeader).reversed())
.map(r -> new Placement(r, Optional.of(r.size())))
.collect(Collectors.toList()),
.toList(),
after.stream()
.sorted(Comparator.comparing(Replica::isPreferredLeader).reversed())
.map(r -> new Placement(r, Optional.empty()))
.collect(Collectors.toList()));
.toList());
}

Change(String topic, int partition, List<Placement> before, List<Placement> after) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public CompletionStage<Response> post(Channel channel) {
.filter(
b -> b.topicPartitions().contains(tp))
.map(NodeInfo::id)
.collect(Collectors.toList());
.toList();
if (!ids.isEmpty()) return ids;
return List.of(
availableBrokers
Expand Down
16 changes: 6 additions & 10 deletions app/src/main/java/org/astraea/app/web/RecordHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.astraea.app.web;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

import java.time.Duration;
import java.util.Base64;
Expand Down Expand Up @@ -163,8 +162,7 @@ public CompletionStage<Response> get(Channel channel) {
// visible for testing
GetResponse get(Consumer<byte[], byte[]> consumer, int limit, Duration timeout) {
try {
return new GetResponse(
consumer, consumer.poll(timeout).stream().map(Record::new).collect(toList()));
return new GetResponse(consumer, consumer.poll(timeout).stream().map(Record::new).toList());
} catch (Exception e) {
consumer.close();
throw e;
Expand All @@ -190,9 +188,7 @@ public CompletionStage<Response> post(Channel channel) {
() -> {
try {
return producer.send(
records.stream()
.map(record -> createRecord(producer, record))
.collect(toList()));
records.stream().map(record -> createRecord(producer, record)).toList());
} finally {
if (producer.transactional()) {
producer.close();
Expand All @@ -214,7 +210,7 @@ public CompletionStage<Response> post(Channel channel) {
return Response.for404("missing result");
}))
.map(CompletionStage::toCompletableFuture)
.collect(toList())));
.toList()));

if (postRequest.async()) return CompletableFuture.completedFuture(Response.ACCEPT);
return CompletableFuture.completedFuture(
Expand Down Expand Up @@ -410,8 +406,8 @@ public String json() {
@Override
public void onComplete(Throwable error) {
try {
if (error == null && consumer instanceof SubscribedConsumer) {
((SubscribedConsumer<byte[], byte[]>) consumer).commitOffsets(Duration.ofSeconds(5));
if (error == null && consumer instanceof SubscribedConsumer subscribedConsumer) {
subscribedConsumer.commitOffsets(Duration.ofSeconds(5));
}
} finally {
consumer.close();
Expand All @@ -438,7 +434,7 @@ static class Record {
timestamp = record.timestamp();
serializedKeySize = record.serializedKeySize();
serializedValueSize = record.serializedValueSize();
headers = record.headers().stream().map(Header::new).collect(toList());
headers = record.headers().stream().map(Header::new).toList();
key = record.key();
value = record.value();
leaderEpoch = record.leaderEpoch().orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ public CompletionStage<Result> apply(Admin admin) {
admin.waitPartitionLeaderSynced(
Map.of(topicName, partitions), Duration.ofSeconds(4)))
.thenCompose(ignored -> admin.brokers())
.thenApply(
brokers -> brokers.stream().map(NodeInfo::id).sorted().collect(Collectors.toList()))
.thenApply(brokers -> brokers.stream().map(NodeInfo::id).sorted().toList())
.thenCompose(
brokerIds -> {
var distribution =
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/web/TopicHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public CompletionStage<Topics> post(Channel channel) {
.thenApply(ignored -> null)
.toCompletableFuture();
})
.collect(Collectors.toList()))
.toList())
.thenCompose(ignored -> get(topicNames, null, id -> true))
.exceptionally(
ignored ->
Expand Down
6 changes: 2 additions & 4 deletions app/src/test/java/org/astraea/app/EnumInfoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.astraea.app;

import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.astraea.common.EnumInfo;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -104,14 +103,13 @@ void testProductionClass() {
Assertions.assertTrue(productionClasses.size() > 100);
Assertions.assertTrue(
productionClasses.stream().allMatch(x -> x.getPackageName().startsWith("org.astraea")));
System.out.println(
productionClasses.stream().filter(Class::isEnum).collect(Collectors.toList()));
System.out.println(productionClasses.stream().filter(Class::isEnum).toList());
}

@Test
void testEnumClassProvider() {
var enumClassProvider = new EnumClassProvider();
var enumCls = enumClassProvider.provideArguments(null).collect(Collectors.toList());
var enumCls = enumClassProvider.provideArguments(null).toList();
Assertions.assertTrue(enumCls.size() > 0);
Assertions.assertTrue(enumCls.stream().map(x -> (Class<?>) x.get()[0]).allMatch(Class::isEnum));
}
Expand Down
4 changes: 2 additions & 2 deletions app/src/test/java/org/astraea/app/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ public static List<Class<?>> getProductionClass() {
FileUtils.listFiles(mainDir.toFile(), new String[] {"class"}, true).stream()
.map(File::toPath)
.map(mainDir::relativize)
.collect(Collectors.toList());
.toList();

var classNames =
dirFiles.stream()
.map(Path::toString)
.map(FilenameUtils::removeExtension)
.map(x -> x.replace(File.separatorChar, '.'))
.collect(Collectors.toList());
.toList();

return classNames.stream()
.map(x -> Utils.packException(() -> Class.forName(x)))
Expand Down
39 changes: 14 additions & 25 deletions app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,7 @@ private static Set<String> createAndProduceTopic(
.join();
if (skewed) {
Utils.sleep(Duration.ofSeconds(1));
var placement =
service.dataFolders().keySet().stream()
.limit(replicas)
.collect(Collectors.toUnmodifiableList());
var placement = service.dataFolders().keySet().stream().limit(replicas).toList();
admin
.moveToBrokers(
admin.topicPartitions(Set.of(topic)).toCompletableFuture().join().stream()
Expand Down Expand Up @@ -227,27 +224,19 @@ private static Set<String> createAndProduceTopic(
void testBestPlan() {
try (var admin = Admin.of(SERVICE.bootstrapServers())) {
var currentClusterInfo =
ClusterInfo.of(
"fake",
List.of(NodeInfo.of(10, "host", 22), NodeInfo.of(11, "host", 22)),
Map.of(),
List.of(
Replica.builder()
.topic("topic")
.partition(0)
.nodeInfo(NodeInfo.of(10, "host", 22))
.lag(0)
.size(100)
.isLeader(true)
.isSync(true)
.isFuture(false)
.isOffline(false)
.isPreferredLeader(true)
.path("/tmp/aa")
.build()));
ClusterInfo.builder()
.addNode(Set.of(1, 2))
.addFolders(
Map.ofEntries(Map.entry(1, Set.of("/folder")), Map.entry(2, Set.of("/folder"))))
.addTopic("topic", 1, (short) 1)
.build();

HasClusterCost clusterCostFunction =
(clusterInfo, clusterBean) -> () -> clusterInfo == currentClusterInfo ? 100D : 10D;
(clusterInfo, clusterBean) ->
() ->
ClusterInfo.findNonFulfilledAllocation(currentClusterInfo, clusterInfo).isEmpty()
? 100D
: 10D;
HasMoveCost moveCostFunction = HasMoveCost.EMPTY;
HasMoveCost failMoveCostFunction = (before, after, clusterBean) -> () -> true;

Expand Down Expand Up @@ -955,13 +944,13 @@ void testChangeOrder() {
.mapToObj(partition -> Map.entry(ThreadLocalRandom.current().nextInt(), partition))
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(Collectors.toUnmodifiableList());
.toList();
var destPlacement =
IntStream.range(0, 10)
.mapToObj(partition -> Map.entry(ThreadLocalRandom.current().nextInt(), partition))
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(Collectors.toUnmodifiableList());
.toList();
var base =
ClusterInfo.builder()
.addNode(Set.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
Expand Down
4 changes: 1 addition & 3 deletions app/src/test/java/org/astraea/app/web/GroupHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
Expand Down Expand Up @@ -237,8 +236,7 @@ void testDeleteGroup() {
try (var admin = Admin.of(SERVICE.bootstrapServers())) {
var handler = new GroupHandler(admin);

var groupIds =
IntStream.range(0, 3).mapToObj(x -> Utils.randomString(10)).collect(Collectors.toList());
var groupIds = IntStream.range(0, 3).mapToObj(x -> Utils.randomString(10)).toList();
groupIds.forEach(
groupId -> {
try (var consumer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ void testListProducers() {
handler.get(Channel.EMPTY).toCompletableFuture().join());
Assertions.assertNotEquals(0, result.partitions.size());

var partitions =
result.partitions.stream()
.filter(t -> t.topic.equals(topicName))
.collect(Collectors.toUnmodifiableList());
var partitions = result.partitions.stream().filter(t -> t.topic.equals(topicName)).toList();
Assertions.assertEquals(1, partitions.size());
Assertions.assertEquals(topicName, partitions.iterator().next().topic);
Assertions.assertEquals(0, partitions.iterator().next().partition);
Expand Down
13 changes: 5 additions & 8 deletions app/src/test/java/org/astraea/app/web/RecordHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.app.web.RecordHandler.Metadata;
Expand Down Expand Up @@ -150,8 +149,7 @@ void testPostRawString() {
.keyDeserializer(Deserializer.STRING)
.valueDeserializer(Deserializer.STRING)
.build()) {
var records =
consumer.poll(Duration.ofSeconds(5)).stream().collect(Collectors.toUnmodifiableList());
var records = consumer.poll(Duration.ofSeconds(5)).stream().toList();
Assertions.assertEquals(1, records.size());
Assertions.assertEquals(0, records.get(0).partition());
Assertions.assertEquals("abc", records.get(0).key());
Expand Down Expand Up @@ -209,8 +207,7 @@ void testPost(boolean isTransaction) {
.keyDeserializer(Deserializer.STRING)
.valueDeserializer(Deserializer.INTEGER)
.build()) {
var records =
consumer.poll(Duration.ofSeconds(10)).stream().collect(Collectors.toUnmodifiableList());
var records = consumer.poll(Duration.ofSeconds(10)).stream().toList();
Assertions.assertEquals(2, records.size());

var record = records.get(0);
Expand Down Expand Up @@ -820,7 +817,7 @@ void testDelete() {
var records =
Stream.of(0, 0, 1, 1, 1, 2, 2, 2, 2)
.map(x -> Record.builder().topic(topicName).partition(x).value(new byte[100]).build())
.collect(Collectors.toList());
.toList();
producer.send(records);
producer.flush();

Expand Down Expand Up @@ -904,7 +901,7 @@ void testDeleteOffset() {
var records =
Stream.of(0, 0, 1, 1, 1, 2, 2, 2, 2)
.map(x -> Record.builder().topic(topicName).partition(x).value(new byte[100]).build())
.collect(Collectors.toList());
.toList();
producer.send(records);
producer.flush();

Expand Down Expand Up @@ -961,7 +958,7 @@ void testDeletePartition() {
var records =
Stream.of(0, 0, 1, 1, 1, 2, 2, 2, 2)
.map(x -> Record.builder().topic(topicName).partition(x).value(new byte[100]).build())
.collect(Collectors.toList());
.toList();
producer.send(records);
producer.flush();

Expand Down
3 changes: 1 addition & 2 deletions app/src/test/java/org/astraea/app/web/RequestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.astraea.app.TestUtils;
import org.astraea.app.web.RecordHandler.PostRecord;
Expand Down Expand Up @@ -60,7 +59,7 @@ private static List<Class<?>> requestClasses() {
return TestUtils.getProductionClass().stream()
.filter(Request.class::isAssignableFrom)
.filter(c -> !c.isInterface())
.collect(Collectors.toList());
.toList();
}

public static class RequestClassProvider implements ArgumentsProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,7 @@ void testThrottleSomeLogs() {
handler.get(Channel.EMPTY).toCompletableFuture().join());

var topic =
throttleSetting.topics.stream()
.filter(t -> t.name.get().equals(topicName))
.collect(Collectors.toList());
throttleSetting.topics.stream().filter(t -> t.name.get().equals(topicName)).toList();
Assertions.assertEquals(2, topic.size());

var leader =
Expand Down Expand Up @@ -228,9 +226,7 @@ void testThrottleEveryLog() {
ThrottleHandler.ThrottleSetting.class,
handler.get(Channel.EMPTY).toCompletableFuture().join());
var topic =
throttleSetting.topics.stream()
.filter(t -> t.name.get().equals(topicName))
.collect(Collectors.toList());
throttleSetting.topics.stream().filter(t -> t.name.get().equals(topicName)).toList();
Assertions.assertEquals(9, topic.size());

IntStream.range(0, 3)
Expand Down
Loading

0 comments on commit 38031ee

Please sign in to comment.