Skip to content

Commit

Permalink
apply toList under app/src/main. (#1755)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaohengstudent authored May 17, 2023
1 parent 94aa36a commit 12bcb0b
Show file tree
Hide file tree
Showing 16 changed files with 33 additions and 57 deletions.
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/backup/Backup.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void restoreDistribution(ClusterInfo clusterInfo, String bootstrapServers
Comparator.comparing(
replica -> !replica.isLeader()))
.map(replica -> replica.nodeInfo().id())
.collect(Collectors.toUnmodifiableList()))))))
.toList())))))
.configs(topic.config().raw())
.run()
.toCompletableFuture()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ static List<ConsumerThread> create(
consumerSupplier) {
if (consumers == 0) return List.of();
var closeLatches =
IntStream.range(0, consumers)
.mapToObj(ignored -> new CountDownLatch(1))
.collect(Collectors.toUnmodifiableList());
IntStream.range(0, consumers).mapToObj(ignored -> new CountDownLatch(1)).toList();
var executors = Executors.newFixedThreadPool(consumers);
// monitor
CompletableFuture.runAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.astraea.common.Configuration;
import org.astraea.common.DataUnit;
Expand All @@ -49,15 +48,13 @@ static DataGenerator of(
RecordGenerator.builder()
.batchSize(argument.transactionSize)
.keyTableSeed(argument.recordKeyTableSeed)
.keyRange(
LongStream.rangeClosed(0, 10000).boxed().collect(Collectors.toUnmodifiableList()))
.keyRange(LongStream.rangeClosed(0, 10000).boxed().toList())
.keyDistribution(argument.keyDistributionType.create(10000, keyDistConfig))
.keySizeDistribution(
argument.keySizeDistributionType.create(
(int) argument.keySize.bytes(), keySizeDistConfig))
.valueTableSeed(argument.recordValueTableSeed)
.valueRange(
LongStream.rangeClosed(0, 10000).boxed().collect(Collectors.toUnmodifiableList()))
.valueRange(LongStream.rangeClosed(0, 10000).boxed().toList())
.valueDistribution(argument.valueDistributionType.create(10000, valueDistConfig))
.valueSizeDistribution(
argument.valueDistributionType.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.astraea.common.Utils;
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerConfigs;
Expand Down Expand Up @@ -52,7 +51,7 @@ static List<MonkeyThread> play(List<ConsumerThread> consumerThreads, Performance
return unsubscribeMonkey(consumerThreads, entry.getValue());
}
})
.collect(Collectors.toUnmodifiableList());
.toList();
}

private static MonkeyThread killMonkey(List<ConsumerThread> consumerThreads, Duration frequency) {
Expand Down
19 changes: 6 additions & 13 deletions app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static List<String> execute(final Argument param) {
var blockingQueues =
IntStream.range(0, param.producers)
.mapToObj(i -> new ArrayBlockingQueue<List<Record<byte[], byte[]>>>(3000))
.collect(Collectors.toUnmodifiableList());
.toList();
// ensure topics are existent
System.out.println("checking topics: " + String.join(",", param.topics));
param.checkTopics();
Expand Down Expand Up @@ -123,10 +123,7 @@ public static List<String> execute(final Argument param) {
var current = Report.recordsConsumedTotal();

if (blockingQueues.stream().allMatch(Collection::isEmpty)) {
var unfinishedProducers =
producerThreads.stream()
.filter(p -> !p.closed())
.collect(Collectors.toUnmodifiableList());
var unfinishedProducers = producerThreads.stream().filter(p -> !p.closed()).toList();
unfinishedProducers.forEach(AbstractThread::close);
}

Expand Down Expand Up @@ -388,7 +385,7 @@ else if (specifiedByBroker) {
.filter(replica -> specifyBrokers.contains(replica.nodeInfo().id()))
.map(replica -> TopicPartition.of(replica.topic(), replica.partition()))
.distinct()
.collect(Collectors.toUnmodifiableList());
.toList();
if (selections.isEmpty())
throw new IllegalArgumentException(
"No partition match the specify.brokers requirement");
Expand Down Expand Up @@ -426,8 +423,7 @@ else if (specifiedByBroker) {
"The following topic/partitions are nonexistent in the cluster: " + notExist);
}

final var selection =
specifyPartitions.stream().distinct().collect(Collectors.toUnmodifiableList());
final var selection = specifyPartitions.stream().distinct().toList();
return () -> selection.get(ThreadLocalRandom.current().nextInt(selection.size()));
} else if (throttle) {
// TODO: The functions of throttle and select partitioner should not conflict with each
Expand All @@ -444,15 +440,12 @@ else if (specifiedByBroker) {
.replicaStream()
.map(Replica::topicPartition)
.distinct()
.collect(Collectors.toUnmodifiableList());
.toList();
return () -> selection.get(ThreadLocalRandom.current().nextInt(selection.size()));
}
} else {
final var selection =
topics.stream()
.map(topic -> TopicPartition.of(topic, -1))
.distinct()
.collect(Collectors.toUnmodifiableList());
topics.stream().map(topic -> TopicPartition.of(topic, -1)).distinct().toList();
return () -> selection.get(ThreadLocalRandom.current().nextInt(selection.size()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ static List<ProducerThread> create(
var producers = queues.size();
if (producers <= 0) return List.of();
var closeLatches =
IntStream.range(0, producers)
.mapToObj(ignored -> new CountDownLatch(1))
.collect(Collectors.toUnmodifiableList());
IntStream.range(0, producers).mapToObj(ignored -> new CountDownLatch(1)).toList();
var executors = Executors.newFixedThreadPool(producers);
// monitor
CompletableFuture.runAsync(
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private PlanExecutionProgress progress(String taskId) {
tp ->
Change.from(
contextCluster.replicas(tp), solution.proposal().replicas(tp)))
.collect(Collectors.toUnmodifiableList());
.toList();
var report =
(Supplier<PlanReport>)
() ->
Expand Down
11 changes: 4 additions & 7 deletions app/src/main/java/org/astraea/app/web/BeanHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.astraea.common.admin.Admin;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.BeanQuery;
Expand Down Expand Up @@ -48,12 +47,10 @@ public CompletionStage<Response> get(Channel channel) {
try (var client = JndiClient.of(b.host(), jmxPorts.apply(b.id()))) {
return new NodeBean(
b.host(),
client.beans(builder.build()).stream()
.map(Bean::new)
.collect(Collectors.toUnmodifiableList()));
client.beans(builder.build()).stream().map(Bean::new).toList());
}
})
.collect(Collectors.toUnmodifiableList())));
.toList()));
}

static class Property implements Response {
Expand Down Expand Up @@ -86,11 +83,11 @@ static class Bean implements Response {
this.properties =
obj.properties().entrySet().stream()
.map(e -> new Property(e.getKey(), e.getValue()))
.collect(Collectors.toUnmodifiableList());
.toList();
this.attributes =
obj.attributes().entrySet().stream()
.map(e -> new Attribute(e.getKey(), e.getValue().toString()))
.collect(Collectors.toUnmodifiableList());
.toList();
}
}

Expand Down
4 changes: 2 additions & 2 deletions app/src/main/java/org/astraea/app/web/BrokerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public CompletionStage<Response> get(Channel channel) {
brokers.stream()
.filter(b -> ids.contains(b.id()))
.map(Broker::new)
.collect(Collectors.toList())))
.toList()))
.thenApply(
brokers -> {
if (brokers.isEmpty()) throw new NoSuchElementException("no brokers are found");
Expand Down Expand Up @@ -96,7 +96,7 @@ static class Broker implements Response {
.entrySet()
.stream()
.map(e -> new Topic(e.getKey(), e.getValue().size()))
.collect(Collectors.toUnmodifiableList());
.toList();
this.configs = broker.config().raw();
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/web/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ static Channel of(HttpExchange exchange) {
Arrays.stream(uri.getPath().split("/"))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toUnmodifiableList());
.toList();
// form: /resource/target
if (allPaths.size() == 1) return Optional.empty();
else if (allPaths.size() == 2) return Optional.of(allPaths.get(1));
Expand Down
7 changes: 3 additions & 4 deletions app/src/main/java/org/astraea/app/web/GroupHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ public CompletionStage<Response> get(Channel channel) {
empty())
.filter(Optional::isPresent)
.map(Optional::get)
.collect(
Collectors.toUnmodifiableList())))
.collect(Collectors.toUnmodifiableList())))
.collect(Collectors.toUnmodifiableList())))
.toList()))
.toList()))
.toList()))
.thenApply(
groups -> {
if (channel.target().isPresent() && groups.size() == 1) return groups.get(0);
Expand Down
5 changes: 2 additions & 3 deletions app/src/main/java/org/astraea/app/web/ProducerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public CompletionStage<Partitions> get(Channel channel) {
.entrySet()
.stream()
.map(e -> new Partition(e.getKey(), e.getValue()))
.collect(Collectors.toUnmodifiableList())));
.toList()));
}

static class ProducerState implements Response {
Expand Down Expand Up @@ -88,8 +88,7 @@ static class Partition implements Response {
Collection<org.astraea.common.admin.ProducerState> states) {
this.topic = tp.topic();
this.partition = tp.partition();
this.states =
states.stream().map(ProducerState::new).collect(Collectors.toUnmodifiableList());
this.states = states.stream().map(ProducerState::new).toList();
}
}

Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/web/QuotaHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ static class Quotas implements Response {
final List<Quota> quotas;

Quotas(Collection<org.astraea.common.admin.Quota> quotas) {
this.quotas = quotas.stream().map(Quota::new).collect(Collectors.toUnmodifiableList());
this.quotas = quotas.stream().map(Quota::new).toList();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ public CompletionStage<Response> post(Channel channel) {
if (excludedBroker.isEmpty())
return CompletableFuture.completedFuture(Response.BAD_REQUEST);
var availableBrokers =
brokers.stream()
.filter(b -> b.id() != exclude)
.collect(Collectors.toList());
brokers.stream().filter(b -> b.id() != exclude).toList();
var partitions =
excludedBroker.get().topicPartitions().stream()
.filter(
Expand Down Expand Up @@ -131,7 +129,7 @@ public CompletionStage<Response> post(Channel channel) {
Stream.of(process2Folders, process2Nodes, processExclude)
.flatMap(Function.identity())
.map(CompletionStage::toCompletableFuture)
.collect(Collectors.toUnmodifiableList()))
.toList())
.thenApply(
rs -> {
if (!rs.isEmpty() && rs.stream().allMatch(r -> r == Response.ACCEPT))
Expand Down Expand Up @@ -161,7 +159,7 @@ public CompletionStage<Reassignments> get(Channel channel) {
r ->
new AddingReplica(
r, leaderSizes.getOrDefault(r.topicPartition(), 0L)))
.collect(Collectors.toUnmodifiableList()));
.toList());
});
}

Expand Down
6 changes: 3 additions & 3 deletions app/src/main/java/org/astraea/app/web/TopicHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private CompletionStage<Topics> get(
.filter(replica -> replica.topic().equals(p.topic()))
.filter(replica -> replica.partition() == p.partition())
.map(Replica::new)
.collect(Collectors.toUnmodifiableList())),
.toList()),
Collectors.toList())));
// topic name -> group ids
var gs =
Expand Down Expand Up @@ -149,7 +149,7 @@ private CompletionStage<Topics> get(
gs.getOrDefault(topic.name(), Set.of()),
ps.get(topic.name()),
topic.config().raw()))
.collect(Collectors.toUnmodifiableList()));
.toList());
});
}

Expand Down Expand Up @@ -197,7 +197,7 @@ public CompletionStage<Topics> post(Channel channel) {
new Topics(
topicNames.stream()
.map(t -> new TopicInfo(t, Set.of(), List.of(), Map.of()))
.collect(Collectors.toUnmodifiableList())));
.toList()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ public CompletionStage<Response> get(Channel channel) {
.thenCompose(admin::transactions)
.thenApply(
transactions ->
transactions.stream()
.map(t -> new Transaction(t.transactionId(), t))
.collect(Collectors.toUnmodifiableList()))
transactions.stream().map(t -> new Transaction(t.transactionId(), t)).toList())
.thenApply(
transactions -> {
if (channel.target().isPresent() && transactions.size() == 1)
Expand Down

0 comments on commit 12bcb0b

Please sign in to comment.