From 12bcb0ba8b85a173b72a416acc23c84dc9293732 Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Thu, 18 May 2023 02:18:11 +0800 Subject: [PATCH] apply toList under app/src/main. (#1755) --- .../java/org/astraea/app/backup/Backup.java | 2 +- .../app/performance/ConsumerThread.java | 4 +--- .../app/performance/DataGenerator.java | 7 ++----- .../astraea/app/performance/MonkeyThread.java | 3 +-- .../astraea/app/performance/Performance.java | 19 ++++++------------- .../app/performance/ProducerThread.java | 4 +--- .../org/astraea/app/web/BalancerHandler.java | 2 +- .../java/org/astraea/app/web/BeanHandler.java | 11 ++++------- .../org/astraea/app/web/BrokerHandler.java | 4 ++-- .../java/org/astraea/app/web/Channel.java | 2 +- .../org/astraea/app/web/GroupHandler.java | 7 +++---- .../org/astraea/app/web/ProducerHandler.java | 5 ++--- .../org/astraea/app/web/QuotaHandler.java | 2 +- .../astraea/app/web/ReassignmentHandler.java | 8 +++----- .../org/astraea/app/web/TopicHandler.java | 6 +++--- .../astraea/app/web/TransactionHandler.java | 4 +--- 16 files changed, 33 insertions(+), 57 deletions(-) diff --git a/app/src/main/java/org/astraea/app/backup/Backup.java b/app/src/main/java/org/astraea/app/backup/Backup.java index 0becc9f4cd..4b92ee152c 100644 --- a/app/src/main/java/org/astraea/app/backup/Backup.java +++ b/app/src/main/java/org/astraea/app/backup/Backup.java @@ -51,7 +51,7 @@ public void restoreDistribution(ClusterInfo clusterInfo, String bootstrapServers Comparator.comparing( replica -> !replica.isLeader())) .map(replica -> replica.nodeInfo().id()) - .collect(Collectors.toUnmodifiableList())))))) + .toList()))))) .configs(topic.config().raw()) .run() .toCompletableFuture() diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index 3b65aec33b..2f268adb80 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -73,9 +73,7 @@ static List create( consumerSupplier) { if (consumers == 0) return List.of(); var closeLatches = - IntStream.range(0, consumers) - .mapToObj(ignored -> new CountDownLatch(1)) - .collect(Collectors.toUnmodifiableList()); + IntStream.range(0, consumers).mapToObj(ignored -> new CountDownLatch(1)).toList(); var executors = Executors.newFixedThreadPool(consumers); // monitor CompletableFuture.runAsync( diff --git a/app/src/main/java/org/astraea/app/performance/DataGenerator.java b/app/src/main/java/org/astraea/app/performance/DataGenerator.java index 7b73729513..1e28b650e2 100644 --- a/app/src/main/java/org/astraea/app/performance/DataGenerator.java +++ b/app/src/main/java/org/astraea/app/performance/DataGenerator.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.LongStream; import org.astraea.common.Configuration; import org.astraea.common.DataUnit; @@ -49,15 +48,13 @@ static DataGenerator of( RecordGenerator.builder() .batchSize(argument.transactionSize) .keyTableSeed(argument.recordKeyTableSeed) - .keyRange( - LongStream.rangeClosed(0, 10000).boxed().collect(Collectors.toUnmodifiableList())) + .keyRange(LongStream.rangeClosed(0, 10000).boxed().toList()) .keyDistribution(argument.keyDistributionType.create(10000, keyDistConfig)) .keySizeDistribution( argument.keySizeDistributionType.create( (int) argument.keySize.bytes(), keySizeDistConfig)) .valueTableSeed(argument.recordValueTableSeed) - .valueRange( - LongStream.rangeClosed(0, 10000).boxed().collect(Collectors.toUnmodifiableList())) + .valueRange(LongStream.rangeClosed(0, 10000).boxed().toList()) .valueDistribution(argument.valueDistributionType.create(10000, valueDistConfig)) .valueSizeDistribution( argument.valueDistributionType.create( diff --git a/app/src/main/java/org/astraea/app/performance/MonkeyThread.java b/app/src/main/java/org/astraea/app/performance/MonkeyThread.java index d7908ec6e7..5a7d573615 100644 --- a/app/src/main/java/org/astraea/app/performance/MonkeyThread.java +++ b/app/src/main/java/org/astraea/app/performance/MonkeyThread.java @@ -22,7 +22,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import org.astraea.common.Utils; import org.astraea.common.consumer.Consumer; import org.astraea.common.consumer.ConsumerConfigs; @@ -52,7 +51,7 @@ static List play(List consumerThreads, Performance return unsubscribeMonkey(consumerThreads, entry.getValue()); } }) - .collect(Collectors.toUnmodifiableList()); + .toList(); } private static MonkeyThread killMonkey(List consumerThreads, Duration frequency) { diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index 07067472e0..f3a8b3f6d1 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -76,7 +76,7 @@ public static List execute(final Argument param) { var blockingQueues = IntStream.range(0, param.producers) .mapToObj(i -> new ArrayBlockingQueue>>(3000)) - .collect(Collectors.toUnmodifiableList()); + .toList(); // ensure topics are existent System.out.println("checking topics: " + String.join(",", param.topics)); param.checkTopics(); @@ -123,10 +123,7 @@ public static List execute(final Argument param) { var current = Report.recordsConsumedTotal(); if (blockingQueues.stream().allMatch(Collection::isEmpty)) { - var unfinishedProducers = - producerThreads.stream() - .filter(p -> !p.closed()) - .collect(Collectors.toUnmodifiableList()); + var unfinishedProducers = producerThreads.stream().filter(p -> !p.closed()).toList(); unfinishedProducers.forEach(AbstractThread::close); } @@ -388,7 +385,7 @@ else if (specifiedByBroker) { .filter(replica -> specifyBrokers.contains(replica.nodeInfo().id())) .map(replica -> TopicPartition.of(replica.topic(), replica.partition())) .distinct() - .collect(Collectors.toUnmodifiableList()); + .toList(); if (selections.isEmpty()) throw new IllegalArgumentException( "No partition match the specify.brokers requirement"); @@ -426,8 +423,7 @@ else if (specifiedByBroker) { "The following topic/partitions are nonexistent in the cluster: " + notExist); } - final var selection = - specifyPartitions.stream().distinct().collect(Collectors.toUnmodifiableList()); + final var selection = specifyPartitions.stream().distinct().toList(); return () -> selection.get(ThreadLocalRandom.current().nextInt(selection.size())); } else if (throttle) { // TODO: The functions of throttle and select partitioner should not conflict with each @@ -444,15 +440,12 @@ else if (specifiedByBroker) { .replicaStream() .map(Replica::topicPartition) .distinct() - .collect(Collectors.toUnmodifiableList()); + .toList(); return () -> selection.get(ThreadLocalRandom.current().nextInt(selection.size())); } } else { final var selection = - topics.stream() - .map(topic -> TopicPartition.of(topic, -1)) - .distinct() - .collect(Collectors.toUnmodifiableList()); + topics.stream().map(topic -> TopicPartition.of(topic, -1)).distinct().toList(); return () -> selection.get(ThreadLocalRandom.current().nextInt(selection.size())); } } diff --git a/app/src/main/java/org/astraea/app/performance/ProducerThread.java b/app/src/main/java/org/astraea/app/performance/ProducerThread.java index 1218c99055..777e50aace 100644 --- a/app/src/main/java/org/astraea/app/performance/ProducerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ProducerThread.java @@ -51,9 +51,7 @@ static List create( var producers = queues.size(); if (producers <= 0) return List.of(); var closeLatches = - IntStream.range(0, producers) - .mapToObj(ignored -> new CountDownLatch(1)) - .collect(Collectors.toUnmodifiableList()); + IntStream.range(0, producers).mapToObj(ignored -> new CountDownLatch(1)).toList(); var executors = Executors.newFixedThreadPool(producers); // monitor CompletableFuture.runAsync( diff --git a/app/src/main/java/org/astraea/app/web/BalancerHandler.java b/app/src/main/java/org/astraea/app/web/BalancerHandler.java index 9fbd533f36..ae12056b8e 100644 --- a/app/src/main/java/org/astraea/app/web/BalancerHandler.java +++ b/app/src/main/java/org/astraea/app/web/BalancerHandler.java @@ -179,7 +179,7 @@ private PlanExecutionProgress progress(String taskId) { tp -> Change.from( contextCluster.replicas(tp), solution.proposal().replicas(tp))) - .collect(Collectors.toUnmodifiableList()); + .toList(); var report = (Supplier) () -> diff --git a/app/src/main/java/org/astraea/app/web/BeanHandler.java b/app/src/main/java/org/astraea/app/web/BeanHandler.java index ed88814c60..473640c648 100644 --- a/app/src/main/java/org/astraea/app/web/BeanHandler.java +++ b/app/src/main/java/org/astraea/app/web/BeanHandler.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.concurrent.CompletionStage; import java.util.function.Function; -import java.util.stream.Collectors; import org.astraea.common.admin.Admin; import org.astraea.common.metrics.BeanObject; import org.astraea.common.metrics.BeanQuery; @@ -48,12 +47,10 @@ public CompletionStage get(Channel channel) { try (var client = JndiClient.of(b.host(), jmxPorts.apply(b.id()))) { return new NodeBean( b.host(), - client.beans(builder.build()).stream() - .map(Bean::new) - .collect(Collectors.toUnmodifiableList())); + client.beans(builder.build()).stream().map(Bean::new).toList()); } }) - .collect(Collectors.toUnmodifiableList()))); + .toList())); } static class Property implements Response { @@ -86,11 +83,11 @@ static class Bean implements Response { this.properties = obj.properties().entrySet().stream() .map(e -> new Property(e.getKey(), e.getValue())) - .collect(Collectors.toUnmodifiableList()); + .toList(); this.attributes = obj.attributes().entrySet().stream() .map(e -> new Attribute(e.getKey(), e.getValue().toString())) - .collect(Collectors.toUnmodifiableList()); + .toList(); } } diff --git a/app/src/main/java/org/astraea/app/web/BrokerHandler.java b/app/src/main/java/org/astraea/app/web/BrokerHandler.java index af92a5b7b6..849fb4d333 100644 --- a/app/src/main/java/org/astraea/app/web/BrokerHandler.java +++ b/app/src/main/java/org/astraea/app/web/BrokerHandler.java @@ -64,7 +64,7 @@ public CompletionStage get(Channel channel) { brokers.stream() .filter(b -> ids.contains(b.id())) .map(Broker::new) - .collect(Collectors.toList()))) + .toList())) .thenApply( brokers -> { if (brokers.isEmpty()) throw new NoSuchElementException("no brokers are found"); @@ -96,7 +96,7 @@ static class Broker implements Response { .entrySet() .stream() .map(e -> new Topic(e.getKey(), e.getValue().size())) - .collect(Collectors.toUnmodifiableList()); + .toList(); this.configs = broker.config().raw(); } } diff --git a/app/src/main/java/org/astraea/app/web/Channel.java b/app/src/main/java/org/astraea/app/web/Channel.java index e772b303ab..c0b071fd40 100644 --- a/app/src/main/java/org/astraea/app/web/Channel.java +++ b/app/src/main/java/org/astraea/app/web/Channel.java @@ -159,7 +159,7 @@ static Channel of(HttpExchange exchange) { Arrays.stream(uri.getPath().split("/")) .map(String::trim) .filter(s -> !s.isEmpty()) - .collect(Collectors.toUnmodifiableList()); + .toList(); // form: /resource/target if (allPaths.size() == 1) return Optional.empty(); else if (allPaths.size() == 2) return Optional.of(allPaths.get(1)); diff --git a/app/src/main/java/org/astraea/app/web/GroupHandler.java b/app/src/main/java/org/astraea/app/web/GroupHandler.java index acbe3ec7cb..c5ec35757f 100644 --- a/app/src/main/java/org/astraea/app/web/GroupHandler.java +++ b/app/src/main/java/org/astraea/app/web/GroupHandler.java @@ -144,10 +144,9 @@ public CompletionStage get(Channel channel) { empty()) .filter(Optional::isPresent) .map(Optional::get) - .collect( - Collectors.toUnmodifiableList()))) - .collect(Collectors.toUnmodifiableList()))) - .collect(Collectors.toUnmodifiableList()))) + .toList())) + .toList())) + .toList())) .thenApply( groups -> { if (channel.target().isPresent() && groups.size() == 1) return groups.get(0); diff --git a/app/src/main/java/org/astraea/app/web/ProducerHandler.java b/app/src/main/java/org/astraea/app/web/ProducerHandler.java index ab6620418c..0419bbb1c5 100644 --- a/app/src/main/java/org/astraea/app/web/ProducerHandler.java +++ b/app/src/main/java/org/astraea/app/web/ProducerHandler.java @@ -60,7 +60,7 @@ public CompletionStage get(Channel channel) { .entrySet() .stream() .map(e -> new Partition(e.getKey(), e.getValue())) - .collect(Collectors.toUnmodifiableList()))); + .toList())); } static class ProducerState implements Response { @@ -88,8 +88,7 @@ static class Partition implements Response { Collection states) { this.topic = tp.topic(); this.partition = tp.partition(); - this.states = - states.stream().map(ProducerState::new).collect(Collectors.toUnmodifiableList()); + this.states = states.stream().map(ProducerState::new).toList(); } } diff --git a/app/src/main/java/org/astraea/app/web/QuotaHandler.java b/app/src/main/java/org/astraea/app/web/QuotaHandler.java index 3c292d492c..828dda20d0 100644 --- a/app/src/main/java/org/astraea/app/web/QuotaHandler.java +++ b/app/src/main/java/org/astraea/app/web/QuotaHandler.java @@ -167,7 +167,7 @@ static class Quotas implements Response { final List quotas; Quotas(Collection quotas) { - this.quotas = quotas.stream().map(Quota::new).collect(Collectors.toUnmodifiableList()); + this.quotas = quotas.stream().map(Quota::new).toList(); } } diff --git a/app/src/main/java/org/astraea/app/web/ReassignmentHandler.java b/app/src/main/java/org/astraea/app/web/ReassignmentHandler.java index 05312ee909..fee8c4823e 100644 --- a/app/src/main/java/org/astraea/app/web/ReassignmentHandler.java +++ b/app/src/main/java/org/astraea/app/web/ReassignmentHandler.java @@ -90,9 +90,7 @@ public CompletionStage post(Channel channel) { if (excludedBroker.isEmpty()) return CompletableFuture.completedFuture(Response.BAD_REQUEST); var availableBrokers = - brokers.stream() - .filter(b -> b.id() != exclude) - .collect(Collectors.toList()); + brokers.stream().filter(b -> b.id() != exclude).toList(); var partitions = excludedBroker.get().topicPartitions().stream() .filter( @@ -131,7 +129,7 @@ public CompletionStage post(Channel channel) { Stream.of(process2Folders, process2Nodes, processExclude) .flatMap(Function.identity()) .map(CompletionStage::toCompletableFuture) - .collect(Collectors.toUnmodifiableList())) + .toList()) .thenApply( rs -> { if (!rs.isEmpty() && rs.stream().allMatch(r -> r == Response.ACCEPT)) @@ -161,7 +159,7 @@ public CompletionStage get(Channel channel) { r -> new AddingReplica( r, leaderSizes.getOrDefault(r.topicPartition(), 0L))) - .collect(Collectors.toUnmodifiableList())); + .toList()); }); } diff --git a/app/src/main/java/org/astraea/app/web/TopicHandler.java b/app/src/main/java/org/astraea/app/web/TopicHandler.java index d4320a3033..34243bcb17 100644 --- a/app/src/main/java/org/astraea/app/web/TopicHandler.java +++ b/app/src/main/java/org/astraea/app/web/TopicHandler.java @@ -116,7 +116,7 @@ private CompletionStage get( .filter(replica -> replica.topic().equals(p.topic())) .filter(replica -> replica.partition() == p.partition()) .map(Replica::new) - .collect(Collectors.toUnmodifiableList())), + .toList()), Collectors.toList()))); // topic name -> group ids var gs = @@ -149,7 +149,7 @@ private CompletionStage get( gs.getOrDefault(topic.name(), Set.of()), ps.get(topic.name()), topic.config().raw())) - .collect(Collectors.toUnmodifiableList())); + .toList()); }); } @@ -197,7 +197,7 @@ public CompletionStage post(Channel channel) { new Topics( topicNames.stream() .map(t -> new TopicInfo(t, Set.of(), List.of(), Map.of())) - .collect(Collectors.toUnmodifiableList()))); + .toList())); } @Override diff --git a/app/src/main/java/org/astraea/app/web/TransactionHandler.java b/app/src/main/java/org/astraea/app/web/TransactionHandler.java index 837ea0c62b..ec45a81fca 100644 --- a/app/src/main/java/org/astraea/app/web/TransactionHandler.java +++ b/app/src/main/java/org/astraea/app/web/TransactionHandler.java @@ -49,9 +49,7 @@ public CompletionStage get(Channel channel) { .thenCompose(admin::transactions) .thenApply( transactions -> - transactions.stream() - .map(t -> new Transaction(t.transactionId(), t)) - .collect(Collectors.toUnmodifiableList())) + transactions.stream().map(t -> new Transaction(t.transactionId(), t)).toList()) .thenApply( transactions -> { if (channel.target().isPresent() && transactions.size() == 1)