diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index 405c01f3c6..9f6f01dc92 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -341,29 +341,11 @@ public static ClusterInfo readClusterInfo(byte[] bytes) { outerClusterInfo.getTopicList().stream() .map( protoTopic -> - new Topic() { - @Override - public String name() { - return protoTopic.getName(); - } - - @Override - public Config config() { - return new Config(protoTopic.getConfigMap()); - } - - @Override - public boolean internal() { - return protoTopic.getInternal(); - } - - @Override - public Set topicPartitions() { - return protoTopic.getPartitionList().stream() - .map(tp -> TopicPartition.of(protoTopic.getName(), tp)) - .collect(Collectors.toSet()); - } - }) + new Topic( + protoTopic.getName(), + new Config(protoTopic.getConfigMap()), + protoTopic.getInternal(), + Set.copyOf(protoTopic.getPartitionList()))) .collect(Collectors.toMap(Topic::name, Function.identity())), outerClusterInfo.getReplicaList().stream() .map( diff --git a/common/src/main/java/org/astraea/common/admin/OptimizedClusterInfo.java b/common/src/main/java/org/astraea/common/admin/OptimizedClusterInfo.java index e034b9368e..e463d2bc45 100644 --- a/common/src/main/java/org/astraea/common/admin/OptimizedClusterInfo.java +++ b/common/src/main/java/org/astraea/common/admin/OptimizedClusterInfo.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -56,45 +55,18 @@ class OptimizedClusterInfo implements ClusterInfo { .distinct() .map( topic -> - new Topic() { - @Override - public String name() { - return topic; - } - - @Override - public Config config() { - return Optional.ofNullable(topics.get(name())) + new Topic( + topic, + Optional.ofNullable(topics.get(topic)) .map(Topic::config) - .orElse(Config.EMPTY); - } - - @Override - public boolean internal() { - return Optional.ofNullable(topics.get(name())) + .orElse(Config.EMPTY), + Optional.ofNullable(topics.get(topic)) .map(Topic::internal) - .orElse(false); - } - - @Override - public Set topicPartitions() { - return OptimizedClusterInfo.this.replicas(topic).stream() - .map(Replica::topicPartition) - .collect(Collectors.toUnmodifiableSet()); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (obj == null || getClass() != obj.getClass()) return false; - var objTopic = (Topic) obj; - return Objects.equals(name(), objTopic.name()) - && config().raw().equals(objTopic.config().raw()) - && internal() == objTopic.internal() - && topicPartitions().equals(objTopic.topicPartitions()); - } - }) - .collect(Collectors.toUnmodifiableMap(t -> t.name(), t -> t))); + .orElse(false), + OptimizedClusterInfo.this.replicas(topic).stream() + .map(r -> r.topicPartition().partition()) + .collect(Collectors.toUnmodifiableSet()))) + .collect(Collectors.toUnmodifiableMap(Topic::name, t -> t))); this.byBrokerTopic = Lazy.of( () -> diff --git a/common/src/main/java/org/astraea/common/admin/Topic.java b/common/src/main/java/org/astraea/common/admin/Topic.java index f7e7b8ff19..9db850eb3d 100644 --- a/common/src/main/java/org/astraea/common/admin/Topic.java +++ b/common/src/main/java/org/astraea/common/admin/Topic.java @@ -19,56 +19,32 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartitionInfo; -public interface Topic { +/** + * @param name topic name + * @param config topic configs. it includes both default configs and override configs + * @param internal true if this topic is internal (system) topic + * @param partitionIds partition id related to this topic + */ +public record Topic(String name, Config config, boolean internal, Set partitionIds) { static Topic of( String name, org.apache.kafka.clients.admin.TopicDescription topicDescription, Map kafkaConfig) { - - var config = new Config(kafkaConfig); - var topicPartitions = + return new Topic( + name, + new Config(kafkaConfig), + topicDescription.isInternal(), topicDescription.partitions().stream() - .map(p -> TopicPartition.of(name, p.partition())) - .collect(Collectors.toUnmodifiableSet()); - return new Topic() { - @Override - public String name() { - return name; - } - - @Override - public Config config() { - return config; - } - - @Override - public boolean internal() { - return topicDescription.isInternal(); - } - - @Override - public Set topicPartitions() { - return topicPartitions; - } - }; + .map(TopicPartitionInfo::partition) + .collect(Collectors.toUnmodifiableSet())); } - /** - * @return topic name - */ - String name(); - - /** - * @return config used by this topic - */ - Config config(); - - /** - * @return true if this topic is internal (system) topic - */ - boolean internal(); - - Set topicPartitions(); + public Set topicPartitions() { + return partitionIds.stream() + .map(id -> new TopicPartition(name, id)) + .collect(Collectors.toUnmodifiableSet()); + } }