Skip to content

Commit

Permalink
[COMMON] Use toList instead of Collectors.toUnmodifiableList (opensou…
Browse files Browse the repository at this point in the history
  • Loading branch information
brandboat authored Jun 17, 2023
1 parent b1c11a2 commit cdde1b0
Show file tree
Hide file tree
Showing 37 changed files with 94 additions and 190 deletions.
3 changes: 1 addition & 2 deletions common/src/main/java/org/astraea/common/DataUnit.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
* Utility related to data and unit, this class dedicated to easing the pain of converting and
Expand Down Expand Up @@ -131,5 +130,5 @@ public DataSize of(long measurement) {
Arrays.stream(DataUnit.values())
.filter(x -> x.candidateUnitForToString)
.sorted(Comparator.comparing(x -> x.bits))
.collect(Collectors.toUnmodifiableList());
.toList();
}
2 changes: 1 addition & 1 deletion common/src/main/java/org/astraea/common/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ default CompletionStage<Set<String>> topicNames(List<TopicChecker> checkers) {
FutureUtils.sequence(
checkers.stream()
.map(checker -> checker.test(this, topicNames).toCompletableFuture())
.collect(Collectors.toUnmodifiableList()))
.toList())
.thenApply(
all ->
all.stream()
Expand Down
30 changes: 9 additions & 21 deletions common/src/main/java/org/astraea/common/admin/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,7 @@ static Map<TopicPartition, Long> leaderSize(ClusterInfo clusterInfo) {
* @return A list of {@link Replica}.
*/
default List<Replica> replicaLeaders() {
return replicaStream()
.filter(Replica::isLeader)
.filter(Replica::isOnline)
.collect(Collectors.toUnmodifiableList());
return replicaStream().filter(Replica::isLeader).filter(Replica::isOnline).toList();
}

/**
Expand All @@ -182,10 +179,7 @@ default List<Replica> replicaLeaders() {
* @return A list of {@link Replica}.
*/
default List<Replica> replicaLeaders(String topic) {
return replicaStream(topic)
.filter(Replica::isLeader)
.filter(Replica::isOnline)
.collect(Collectors.toUnmodifiableList());
return replicaStream(topic).filter(Replica::isLeader).filter(Replica::isOnline).toList();
}

/**
Expand All @@ -195,10 +189,7 @@ default List<Replica> replicaLeaders(String topic) {
* @return A list of {@link Replica}.
*/
default List<Replica> replicaLeaders(int broker) {
return replicaStream(broker)
.filter(Replica::isLeader)
.filter(Replica::isOnline)
.collect(Collectors.toUnmodifiableList());
return replicaStream(broker).filter(Replica::isLeader).filter(Replica::isOnline).toList();
}

/**
Expand All @@ -207,10 +198,7 @@ default List<Replica> replicaLeaders(int broker) {
* @return A list of {@link Replica}.
*/
default List<Replica> replicaLeaders(BrokerTopic brokerTopic) {
return replicaStream(brokerTopic)
.filter(Replica::isLeader)
.filter(Replica::isOnline)
.collect(Collectors.toUnmodifiableList());
return replicaStream(brokerTopic).filter(Replica::isLeader).filter(Replica::isOnline).toList();
}

/**
Expand All @@ -235,7 +223,7 @@ default Optional<Replica> replicaLeader(TopicPartition topicPartition) {
* @return A list of {@link Replica}.
*/
default List<Replica> availableReplicas(String topic) {
return replicaStream(topic).filter(Replica::isOnline).collect(Collectors.toUnmodifiableList());
return replicaStream(topic).filter(Replica::isOnline).toList();
}

// ---------------------[for replicas]---------------------//
Expand All @@ -244,7 +232,7 @@ default List<Replica> availableReplicas(String topic) {
* @return all replicas cached by this cluster info.
*/
default List<Replica> replicas() {
return replicaStream().collect(Collectors.toUnmodifiableList());
return replicaStream().toList();
}

/**
Expand All @@ -254,7 +242,7 @@ default List<Replica> replicas() {
* @return A list of {@link Replica}.
*/
default List<Replica> replicas(String topic) {
return replicaStream(topic).collect(Collectors.toUnmodifiableList());
return replicaStream(topic).toList();
}

/**
Expand All @@ -265,15 +253,15 @@ default List<Replica> replicas(String topic) {
* @return A list of {@link Replica}.
*/
default List<Replica> replicas(TopicPartition topicPartition) {
return replicaStream(topicPartition).collect(Collectors.toUnmodifiableList());
return replicaStream(topicPartition).toList();
}

/**
* @param replica to search
* @return the replica matched to input replica
*/
default List<Replica> replicas(TopicPartitionReplica replica) {
return replicaStream(replica).collect(Collectors.toUnmodifiableList());
return replicaStream(replica).toList();
}

// ---------------------[others]---------------------//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
import org.astraea.common.EnumInfo;

public enum TransactionState implements EnumInfo {
Expand Down Expand Up @@ -63,6 +62,6 @@ public static TransactionState of(String value) {
}

public static Collection<TransactionState> all() {
return Arrays.stream(TransactionState.values()).collect(Collectors.toUnmodifiableList());
return Arrays.stream(TransactionState.values()).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ public static SubscriptionInfo from(ConsumerPartitionAssignor.Subscription subsc
var ownPartitions =
subscription.ownedPartitions() == null
? List.<TopicPartition>of()
: subscription.ownedPartitions().stream()
.map(TopicPartition::from)
.collect(Collectors.toUnmodifiableList());
: subscription.ownedPartitions().stream().map(TopicPartition::from).toList();

var kafkaUserData = subscription.userData();
// convert ByteBuffer into Map<String,String>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
import org.astraea.common.ByteUtils;
import org.astraea.common.DataSize;
Expand Down Expand Up @@ -70,7 +69,7 @@ public void append(Record<byte[], byte[]> record) {
? ByteString.EMPTY
: ByteString.copyFrom(header.value()))
.build())
.collect(Collectors.toUnmodifiableList()))
.toList())
.build()
.writeDelimitedTo(outputStream));
count.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void noMetricCheck(ClusterBean clusterBean) {
clusterBean.all().entrySet().stream()
.filter(e -> e.getValue().size() == 0)
.map(Map.Entry::getKey)
.collect(Collectors.toUnmodifiableList());
.toList();
if (!noMetricBrokers.isEmpty())
throw new NoSufficientMetricsException(
this,
Expand Down
11 changes: 4 additions & 7 deletions common/src/main/java/org/astraea/common/cost/Normalizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,10 @@ static Normalizer minMax(boolean positive) {
double max = values.stream().max(comparator).orElse(0.0);
double min = values.stream().min(comparator).orElse(0.0);
// there is nothing to rescale, so we just all same values
if (max == min)
return IntStream.range(0, values.size())
.mapToObj(ignored -> 1.0)
.collect(Collectors.toUnmodifiableList());
if (max == min) return IntStream.range(0, values.size()).mapToObj(ignored -> 1.0).toList();
return values.stream()
.map(value -> (positive ? value - min : max - value) / (max - min))
.collect(Collectors.toUnmodifiableList());
.toList();
};
}

Expand All @@ -76,7 +73,7 @@ static Normalizer minMax(boolean positive) {
static Normalizer proportion() {
return values -> {
var sum = values.stream().mapToDouble(i -> i).sum();
return values.stream().map(v -> v / sum).collect(Collectors.toUnmodifiableList());
return values.stream().map(v -> v / sum).toList();
};
}

Expand Down Expand Up @@ -105,7 +102,7 @@ static Normalizer TScore() {
}
return Math.round(score * 100) / 100.0;
})
.collect(Collectors.toUnmodifiableList());
.toList();
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public Collection<BeanObject> beans(
return Stream.empty();
}
})
.collect(Collectors.toUnmodifiableList()));
.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.astraea.common.metrics.client.admin;

import java.util.Collection;
import java.util.stream.Collectors;
import org.astraea.common.Utils;
import org.astraea.common.metrics.BeanQuery;
import org.astraea.common.metrics.MBeanClient;
Expand Down Expand Up @@ -51,14 +50,10 @@ public class AdminMetrics {
* @return key is broker id, and value is associated to broker metrics recorded by all consumers
*/
public static Collection<HasNodeMetrics> node(MBeanClient mBeanClient) {
return mBeanClient.beans(NODE_QUERY).stream()
.map(b -> (HasNodeMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
return mBeanClient.beans(NODE_QUERY).stream().map(b -> (HasNodeMetrics) () -> b).toList();
}

public static Collection<HasSelectorMetrics> admin(MBeanClient mBeanClient) {
return mBeanClient.beans(ADMIN_QUERY).stream()
.map(b -> (HasSelectorMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
return mBeanClient.beans(ADMIN_QUERY).stream().map(b -> (HasSelectorMetrics) () -> b).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,24 @@ public static List<AppInfo> appInfo(MBeanClient client) {
* @return key is broker id, and value is associated to broker metrics recorded by all consumers
*/
public static Collection<HasNodeMetrics> node(MBeanClient mBeanClient) {
return mBeanClient.beans(NODE_QUERY).stream()
.map(b -> (HasNodeMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
return mBeanClient.beans(NODE_QUERY).stream().map(b -> (HasNodeMetrics) () -> b).toList();
}

public static Collection<HasConsumerCoordinatorMetrics> coordinator(MBeanClient mBeanClient) {
return mBeanClient.beans(COORDINATOR_QUERY).stream()
.map(b -> (HasConsumerCoordinatorMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
.toList();
}

public static Collection<HasConsumerFetchMetrics> fetch(MBeanClient mBeanClient) {
return mBeanClient.beans(FETCH_QUERY).stream()
.map(b -> (HasConsumerFetchMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
.toList();
}

public static Collection<HasConsumerMetrics> consumer(MBeanClient mBeanClient) {
return mBeanClient.beans(CONSUMER_QUERY).stream()
.map(b -> (HasConsumerMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
.toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ public static List<AppInfo> appInfo(MBeanClient client) {
* @return key is broker id, and value is associated to broker metrics recorded by all producers
*/
public static Collection<HasNodeMetrics> node(MBeanClient mBeanClient) {
return mBeanClient.beans(NODE_QUERY).stream()
.map(b -> (HasNodeMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
return mBeanClient.beans(NODE_QUERY).stream().map(b -> (HasNodeMetrics) () -> b).toList();
}

/**
Expand All @@ -87,13 +85,13 @@ public static Collection<HasNodeMetrics> node(MBeanClient mBeanClient) {
public static Collection<HasProducerTopicMetrics> topic(MBeanClient mBeanClient) {
return mBeanClient.beans(TOPIC_QUERY).stream()
.map(b -> (HasProducerTopicMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
.toList();
}

public static Collection<HasProducerMetrics> producer(MBeanClient mBeanClient) {
return mBeanClient.beans(PRODUCER_QUERY).stream()
.map(b -> (HasProducerMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
.toList();
}

private ProducerMetrics() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Collection<BeanObject> beans(
&& e.getValue()
.matcher(storedEntry.properties().get(e.getKey()))
.matches()))
.collect(Collectors.toUnmodifiableList());
.toList();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,25 +120,21 @@ public class ConnectorMetrics {
public static List<SourceTaskInfo> sourceTaskInfo(MBeanClient client) {
return client.beans(SOURCE_TASK_INFO_QUERY).stream()
.map(b -> (SourceTaskInfo) () -> b)
.collect(Collectors.toUnmodifiableList());
.toList();
}

public static List<SinkTaskInfo> sinkTaskInfo(MBeanClient client) {
return client.beans(SINK_TASK_INFO_QUERY).stream()
.map(b -> (SinkTaskInfo) () -> b)
.collect(Collectors.toUnmodifiableList());
return client.beans(SINK_TASK_INFO_QUERY).stream().map(b -> (SinkTaskInfo) () -> b).toList();
}

public static List<TaskError> taskError(MBeanClient client) {
return client.beans(TASK_ERROR_QUERY).stream()
.map(b -> (TaskError) () -> b)
.collect(Collectors.toUnmodifiableList());
return client.beans(TASK_ERROR_QUERY).stream().map(b -> (TaskError) () -> b).toList();
}

public static List<ConnectorTaskInfo> connectorTaskInfo(MBeanClient client) {
return client.beans(CONNECTOR_TASK_INFO_QUERY).stream()
.map(b -> (ConnectorTaskInfo) () -> b)
.collect(Collectors.toUnmodifiableList());
.toList();
}

public static List<AppInfo> appInfo(MBeanClient client) {
Expand All @@ -150,42 +146,34 @@ public static List<AppInfo> appInfo(MBeanClient client) {
public static List<ConnectCoordinatorInfo> coordinatorInfo(MBeanClient client) {
return client.beans(COORDINATOR_INFO_QUERY).stream()
.map(b -> (ConnectCoordinatorInfo) () -> b)
.collect(Collectors.toUnmodifiableList());
.toList();
}

public static List<ConnectorInfo> connectorInfo(MBeanClient client) {
return client.beans(CONNECTOR_INFO_QUERY).stream()
.map(b -> (ConnectorInfo) () -> b)
.collect(Collectors.toUnmodifiableList());
return client.beans(CONNECTOR_INFO_QUERY).stream().map(b -> (ConnectorInfo) () -> b).toList();
}

public static List<ConnectWorkerRebalanceInfo> workerRebalanceInfo(MBeanClient client) {
return client.beans(WORKER_REBALANCE_INFO_QUERY).stream()
.map(b -> (ConnectWorkerRebalanceInfo) () -> b)
.collect(Collectors.toUnmodifiableList());
.toList();
}

public static List<HasNodeMetrics> nodeInfo(MBeanClient client) {
return client.beans(NODE_INFO_QUERY).stream()
.map(b -> (HasNodeMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
return client.beans(NODE_INFO_QUERY).stream().map(b -> (HasNodeMetrics) () -> b).toList();
}

public static List<ConnectWorkerInfo> workerInfo(MBeanClient client) {
return client.beans(WORKER_INFO_QUERY).stream()
.map(b -> (ConnectWorkerInfo) () -> b)
.collect(Collectors.toUnmodifiableList());
return client.beans(WORKER_INFO_QUERY).stream().map(b -> (ConnectWorkerInfo) () -> b).toList();
}

public static List<ConnectWorkerConnectorInfo> workerConnectorInfo(MBeanClient client) {
return client.beans(WORKER_CONNECTOR_INFO_QUERY).stream()
.map(b -> (ConnectWorkerConnectorInfo) () -> b)
.collect(Collectors.toUnmodifiableList());
.toList();
}

public static List<HasSelectorMetrics> connector(MBeanClient client) {
return client.beans(CONNECTOR_QUERY).stream()
.map(b -> (HasSelectorMetrics) () -> b)
.collect(Collectors.toUnmodifiableList());
return client.beans(CONNECTOR_QUERY).stream().map(b -> (HasSelectorMetrics) () -> b).toList();
}
}
Loading

0 comments on commit cdde1b0

Please sign in to comment.