Skip to content

Commit

Permalink
[SERIALIZATION] Serialize Borker info in Clusterinfo (#1721)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaohengstudent authored May 27, 2023
1 parent 6b4e546 commit 5114183
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 118 deletions.
218 changes: 137 additions & 81 deletions common/src/main/java/org/astraea/common/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@
import org.astraea.common.admin.Topic;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.generated.BeanObjectOuterClass;
import org.astraea.common.generated.ClusterInfoOuterClass;
import org.astraea.common.generated.PrimitiveOuterClass;
import org.astraea.common.generated.admin.BrokerOuterClass;
import org.astraea.common.generated.admin.ClusterInfoOuterClass;
import org.astraea.common.generated.admin.ReplicaOuterClass;
import org.astraea.common.generated.admin.TopicOuterClass;
import org.astraea.common.generated.admin.TopicPartitionOuterClass;
import org.astraea.common.metrics.BeanObject;

public final class ByteUtils {
Expand Down Expand Up @@ -186,58 +190,13 @@ public static byte[] toBytes(BeanObject value) {
return beanBuilder.build().toByteArray();
}

// TODO: Due to the change of NodeInfo to Broker. This and the test should be updated.
/** Serialize ClusterInfo by protocol buffer. */
public static byte[] toBytes(ClusterInfo value) {
return ClusterInfoOuterClass.ClusterInfo.newBuilder()
.setClusterId(value.clusterId())
.addAllNodeInfo(
value.brokers().stream()
.map(
nodeInfo ->
ClusterInfoOuterClass.ClusterInfo.NodeInfo.newBuilder()
.setId(nodeInfo.id())
.setHost(nodeInfo.host())
.setPort(nodeInfo.port())
.build())
.collect(Collectors.toList()))
.addAllTopic(
value.topics().values().stream()
.map(
topicClass ->
ClusterInfoOuterClass.ClusterInfo.Topic.newBuilder()
.setName(topicClass.name())
.putAllConfig(topicClass.config().raw())
.setInternal(topicClass.internal())
.addAllPartition(
topicClass.topicPartitions().stream()
.map(TopicPartition::partition)
.collect(Collectors.toList()))
.build())
.collect(Collectors.toList()))
.addAllReplica(
value.replicas().stream()
.map(
replica ->
ClusterInfoOuterClass.ClusterInfo.Replica.newBuilder()
.setTopic(replica.topic())
.setPartition(replica.partition())
.setNodeInfo(
ClusterInfoOuterClass.ClusterInfo.NodeInfo.newBuilder()
.setId(replica.broker().id())
.setHost(replica.broker().host())
.setPort(replica.broker().port())
.build())
.setLag(replica.lag())
.setSize(replica.size())
.setIsLeader(replica.isLeader())
.setIsSync(replica.isSync())
.setIsFuture(replica.isFuture())
.setIsOffline(replica.isOffline())
.setIsPreferredLeader(replica.isPreferredLeader())
.setPath(replica.path())
.build())
.collect(Collectors.toList()))
.addAllBroker(value.brokers().stream().map(ByteUtils::toOuterClass).toList())
.addAllTopic(value.topics().values().stream().map(ByteUtils::toOuterClass).toList())
.addAllReplica(value.replicas().stream().map(ByteUtils::toOuterClass).toList())
.build()
.toByteArray();
}
Expand Down Expand Up @@ -328,51 +287,148 @@ public static BeanObject readBeanObject(byte[] bytes) throws SerializationExcept
}
}

// TODO: Due to the change of NodeInfo to Broker. This and the test should be updated.
/** Deserialize to ClusterInfo with protocol buffer */
public static ClusterInfo readClusterInfo(byte[] bytes) {
try {
var outerClusterInfo = ClusterInfoOuterClass.ClusterInfo.parseFrom(bytes);
return ClusterInfo.of(
outerClusterInfo.getClusterId(),
outerClusterInfo.getNodeInfoList().stream()
.map(nodeInfo -> Broker.of(nodeInfo.getId(), nodeInfo.getHost(), nodeInfo.getPort()))
.collect(Collectors.toList()),
outerClusterInfo.getBrokerList().stream().map(ByteUtils::toBroker).toList(),
outerClusterInfo.getTopicList().stream()
.map(
protoTopic ->
new Topic(
protoTopic.getName(),
new Config(protoTopic.getConfigMap()),
protoTopic.getInternal(),
Set.copyOf(protoTopic.getPartitionList())))
.map(ByteUtils::toTopic)
.collect(Collectors.toMap(Topic::name, Function.identity())),
outerClusterInfo.getReplicaList().stream()
.map(
replica ->
Replica.builder()
.topic(replica.getTopic())
.partition(replica.getPartition())
.broker(
Broker.of(
replica.getNodeInfo().getId(),
replica.getNodeInfo().getHost(),
replica.getNodeInfo().getPort()))
.lag(replica.getLag())
.size(replica.getSize())
.isLeader(replica.getIsLeader())
.isSync(replica.getIsSync())
.isFuture(replica.getIsFuture())
.isOffline(replica.getIsOffline())
.isPreferredLeader(replica.getIsPreferredLeader())
.path(replica.getPath())
.build())
.collect(Collectors.toList()));
outerClusterInfo.getReplicaList().stream().map(ByteUtils::toReplica).toList());
} catch (InvalidProtocolBufferException ex) {
throw new SerializationException(ex);
}
}

// ---------------------------Serialize To ProtoBuf Outer Class------------------------------- //

private static BrokerOuterClass.Broker.DataFolder toOuterClass(Broker.DataFolder dataFolder) {
return BrokerOuterClass.Broker.DataFolder.newBuilder()
.setPath(dataFolder.path())
.putAllPartitionSizes(
dataFolder.partitionSizes().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue)))
.putAllOrphanPartitionSizes(
dataFolder.orphanPartitionSizes().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue)))
.build();
}

private static TopicPartitionOuterClass.TopicPartition toOuterClass(
TopicPartition topicPartition) {
return TopicPartitionOuterClass.TopicPartition.newBuilder()
.setPartition(topicPartition.partition())
.setTopic(topicPartition.topic())
.build();
}

private static BrokerOuterClass.Broker toOuterClass(Broker broker) {
return BrokerOuterClass.Broker.newBuilder()
.setId(broker.id())
.setHost(broker.host())
.setPort(broker.port())
.setIsController(broker.isController())
.putAllConfig(broker.config().raw())
.addAllDataFolder(broker.dataFolders().stream().map(ByteUtils::toOuterClass).toList())
.addAllTopicPartitions(
broker.topicPartitions().stream().map(ByteUtils::toOuterClass).toList())
.addAllTopicPartitionLeaders(
broker.topicPartitionLeaders().stream().map(ByteUtils::toOuterClass).toList())
.build();
}

private static TopicOuterClass.Topic toOuterClass(Topic topic) {
return TopicOuterClass.Topic.newBuilder()
.setName(topic.name())
.putAllConfig(topic.config().raw())
.setInternal(topic.internal())
.addAllPartitionIds(topic.partitionIds())
.build();
}

private static ReplicaOuterClass.Replica toOuterClass(Replica replica) {
return ReplicaOuterClass.Replica.newBuilder()
.setTopic(replica.topic())
.setPartition(replica.partition())
.setBroker(toOuterClass(replica.broker()))
.setLag(replica.lag())
.setSize(replica.size())
.setIsInternal(replica.isInternal())
.setIsLeader(replica.isLeader())
.setIsAdding(replica.isAdding())
.setIsRemoving(replica.isRemoving())
.setIsSync(replica.isSync())
.setIsFuture(replica.isFuture())
.setIsOffline(replica.isOffline())
.setIsPreferredLeader(replica.isPreferredLeader())
.setPath(replica.path())
.build();
}

// -------------------------Deserialize From ProtoBuf Outer Class----------------------------- //

private static Broker.DataFolder toDataFolder(BrokerOuterClass.Broker.DataFolder dataFolder) {
return new Broker.DataFolder(
dataFolder.getPath(),
dataFolder.getPartitionSizesMap().entrySet().stream()
.collect(
Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)),
dataFolder.getOrphanPartitionSizesMap().entrySet().stream()
.collect(
Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)));
}

private static TopicPartition toTopicPartition(
TopicPartitionOuterClass.TopicPartition topicPartition) {
return new TopicPartition(topicPartition.getTopic(), topicPartition.getPartition());
}

private static Broker toBroker(BrokerOuterClass.Broker broker) {
return new Broker(
broker.getId(),
broker.getHost(),
broker.getPort(),
broker.getIsController(),
new Config(broker.getConfigMap()),
broker.getDataFolderList().stream().map(ByteUtils::toDataFolder).toList(),
broker.getTopicPartitionsList().stream()
.map(ByteUtils::toTopicPartition)
.collect(Collectors.toSet()),
broker.getTopicPartitionLeadersList().stream()
.map(ByteUtils::toTopicPartition)
.collect(Collectors.toSet()));
}

private static Topic toTopic(TopicOuterClass.Topic topic) {
return new Topic(
topic.getName(),
new Config(topic.getConfigMap()),
topic.getInternal(),
Set.copyOf(topic.getPartitionIdsList()));
}

private static Replica toReplica(ReplicaOuterClass.Replica replica) {
return Replica.builder()
.topic(replica.getTopic())
.partition(replica.getPartition())
.broker(toBroker(replica.getBroker()))
.lag(replica.getLag())
.size(replica.getSize())
.isInternal(replica.getIsInternal())
.isLeader(replica.getIsLeader())
.isAdding(replica.getIsAdding())
.isRemoving(replica.getIsRemoving())
.isSync(replica.getIsSync())
.isFuture(replica.getIsFuture())
.isOffline(replica.getIsOffline())
.isPreferredLeader(replica.getIsPreferredLeader())
.path(replica.getPath())
.build();
}

// --------------------------------ProtoBuf Primitive----------------------------------------- //

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
syntax = "proto3";

package org.astraea.common.generated.admin;

import "org/astraea/common/generated/admin/TopicPartition.proto";

message Broker {
int32 id = 1;
string host = 2;
int32 port = 3;
bool isController = 4;
map<string, string> config = 5;
repeated DataFolder dataFolder = 6;
repeated TopicPartition topicPartitions = 7;
repeated TopicPartition topicPartitionLeaders = 8;

message DataFolder {
string path = 1;
map<string, int64> partitionSizes = 2;
map<string, int64> orphanPartitionSizes = 3;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package org.astraea.common.generated.admin;

import "org/astraea/common/generated/admin/Broker.proto";
import "org/astraea/common/generated/admin/Topic.proto";
import "org/astraea/common/generated/admin/Replica.proto";

message ClusterInfo {
string clusterId = 1;
repeated Broker broker = 2;
repeated Topic topic = 3;
repeated Replica replica = 4;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
syntax = "proto3";

package org.astraea.common.generated.admin;

import "org/astraea/common/generated/admin/Broker.proto";

message Replica {
string topic = 1;
int32 partition = 2;
Broker broker = 3;
bool isLeader = 7;
bool isSync = 10;
bool isOffline = 12;
bool isAdding = 8;
bool isRemoving = 9;
bool isFuture = 11;
bool isPreferredLeader = 13;
int64 lag = 4;
int64 size = 5;
string path = 14;
bool isInternal = 6;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
syntax = "proto3";

package org.astraea.common.generated.admin;

message Topic {
string name = 1;
map<string, string> config = 2;
bool internal = 3;
repeated int32 partitionIds = 4;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";

package org.astraea.common.generated.admin;

message TopicPartition {
int32 partition = 1;
string topic = 2;
}
Loading

0 comments on commit 5114183

Please sign in to comment.