Skip to content

Commit

Permalink
[COMMON] rewrite admin.Topic by java 17 record (#1775)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored May 26, 2023
1 parent 6e6bafb commit 4c90349
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 104 deletions.
28 changes: 5 additions & 23 deletions common/src/main/java/org/astraea/common/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -341,29 +341,11 @@ public static ClusterInfo readClusterInfo(byte[] bytes) {
outerClusterInfo.getTopicList().stream()
.map(
protoTopic ->
new Topic() {
@Override
public String name() {
return protoTopic.getName();
}

@Override
public Config config() {
return new Config(protoTopic.getConfigMap());
}

@Override
public boolean internal() {
return protoTopic.getInternal();
}

@Override
public Set<TopicPartition> topicPartitions() {
return protoTopic.getPartitionList().stream()
.map(tp -> TopicPartition.of(protoTopic.getName(), tp))
.collect(Collectors.toSet());
}
})
new Topic(
protoTopic.getName(),
new Config(protoTopic.getConfigMap()),
protoTopic.getInternal(),
Set.copyOf(protoTopic.getPartitionList())))
.collect(Collectors.toMap(Topic::name, Function.identity())),
outerClusterInfo.getReplicaList().stream()
.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -56,45 +55,18 @@ class OptimizedClusterInfo implements ClusterInfo {
.distinct()
.map(
topic ->
new Topic() {
@Override
public String name() {
return topic;
}

@Override
public Config config() {
return Optional.ofNullable(topics.get(name()))
new Topic(
topic,
Optional.ofNullable(topics.get(topic))
.map(Topic::config)
.orElse(Config.EMPTY);
}

@Override
public boolean internal() {
return Optional.ofNullable(topics.get(name()))
.orElse(Config.EMPTY),
Optional.ofNullable(topics.get(topic))
.map(Topic::internal)
.orElse(false);
}

@Override
public Set<TopicPartition> topicPartitions() {
return OptimizedClusterInfo.this.replicas(topic).stream()
.map(Replica::topicPartition)
.collect(Collectors.toUnmodifiableSet());
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
var objTopic = (Topic) obj;
return Objects.equals(name(), objTopic.name())
&& config().raw().equals(objTopic.config().raw())
&& internal() == objTopic.internal()
&& topicPartitions().equals(objTopic.topicPartitions());
}
})
.collect(Collectors.toUnmodifiableMap(t -> t.name(), t -> t)));
.orElse(false),
OptimizedClusterInfo.this.replicas(topic).stream()
.map(r -> r.topicPartition().partition())
.collect(Collectors.toUnmodifiableSet())))
.collect(Collectors.toUnmodifiableMap(Topic::name, t -> t)));
this.byBrokerTopic =
Lazy.of(
() ->
Expand Down
62 changes: 19 additions & 43 deletions common/src/main/java/org/astraea/common/admin/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,32 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartitionInfo;

public interface Topic {
/**
* @param name topic name
* @param config topic configs. it includes both default configs and override configs
* @param internal true if this topic is internal (system) topic
* @param partitionIds partition id related to this topic
*/
public record Topic(String name, Config config, boolean internal, Set<Integer> partitionIds) {

static Topic of(
String name,
org.apache.kafka.clients.admin.TopicDescription topicDescription,
Map<String, String> kafkaConfig) {

var config = new Config(kafkaConfig);
var topicPartitions =
return new Topic(
name,
new Config(kafkaConfig),
topicDescription.isInternal(),
topicDescription.partitions().stream()
.map(p -> TopicPartition.of(name, p.partition()))
.collect(Collectors.toUnmodifiableSet());
return new Topic() {
@Override
public String name() {
return name;
}

@Override
public Config config() {
return config;
}

@Override
public boolean internal() {
return topicDescription.isInternal();
}

@Override
public Set<TopicPartition> topicPartitions() {
return topicPartitions;
}
};
.map(TopicPartitionInfo::partition)
.collect(Collectors.toUnmodifiableSet()));
}

/**
* @return topic name
*/
String name();

/**
* @return config used by this topic
*/
Config config();

/**
* @return true if this topic is internal (system) topic
*/
boolean internal();

Set<TopicPartition> topicPartitions();
public Set<TopicPartition> topicPartitions() {
return partitionIds.stream()
.map(id -> new TopicPartition(name, id))
.collect(Collectors.toUnmodifiableSet());
}
}

0 comments on commit 4c90349

Please sign in to comment.