Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SERIALIZATION] Serialize Borker info in Clusterinfo #1721

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 137 additions & 81 deletions common/src/main/java/org/astraea/common/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@
import org.astraea.common.admin.Topic;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.generated.BeanObjectOuterClass;
import org.astraea.common.generated.ClusterInfoOuterClass;
import org.astraea.common.generated.PrimitiveOuterClass;
import org.astraea.common.generated.admin.BrokerOuterClass;
import org.astraea.common.generated.admin.ClusterInfoOuterClass;
import org.astraea.common.generated.admin.ReplicaOuterClass;
import org.astraea.common.generated.admin.TopicOuterClass;
import org.astraea.common.generated.admin.TopicPartitionOuterClass;
import org.astraea.common.metrics.BeanObject;

public final class ByteUtils {
Expand Down Expand Up @@ -186,58 +190,13 @@ public static byte[] toBytes(BeanObject value) {
return beanBuilder.build().toByteArray();
}

// TODO: Due to the change of NodeInfo to Broker. This and the test should be updated.
/** Serialize ClusterInfo by protocol buffer. */
public static byte[] toBytes(ClusterInfo value) {
return ClusterInfoOuterClass.ClusterInfo.newBuilder()
.setClusterId(value.clusterId())
.addAllNodeInfo(
value.brokers().stream()
.map(
nodeInfo ->
ClusterInfoOuterClass.ClusterInfo.NodeInfo.newBuilder()
.setId(nodeInfo.id())
.setHost(nodeInfo.host())
.setPort(nodeInfo.port())
.build())
.collect(Collectors.toList()))
.addAllTopic(
value.topics().values().stream()
.map(
topicClass ->
ClusterInfoOuterClass.ClusterInfo.Topic.newBuilder()
.setName(topicClass.name())
.putAllConfig(topicClass.config().raw())
.setInternal(topicClass.internal())
.addAllPartition(
topicClass.topicPartitions().stream()
.map(TopicPartition::partition)
.collect(Collectors.toList()))
.build())
.collect(Collectors.toList()))
.addAllReplica(
value.replicas().stream()
.map(
replica ->
ClusterInfoOuterClass.ClusterInfo.Replica.newBuilder()
.setTopic(replica.topic())
.setPartition(replica.partition())
.setNodeInfo(
ClusterInfoOuterClass.ClusterInfo.NodeInfo.newBuilder()
.setId(replica.broker().id())
.setHost(replica.broker().host())
.setPort(replica.broker().port())
.build())
.setLag(replica.lag())
.setSize(replica.size())
.setIsLeader(replica.isLeader())
.setIsSync(replica.isSync())
.setIsFuture(replica.isFuture())
.setIsOffline(replica.isOffline())
.setIsPreferredLeader(replica.isPreferredLeader())
.setPath(replica.path())
.build())
.collect(Collectors.toList()))
.addAllBroker(value.brokers().stream().map(ByteUtils::toOuterClass).toList())
.addAllTopic(value.topics().values().stream().map(ByteUtils::toOuterClass).toList())
.addAllReplica(value.replicas().stream().map(ByteUtils::toOuterClass).toList())
.build()
.toByteArray();
}
Expand Down Expand Up @@ -328,51 +287,148 @@ public static BeanObject readBeanObject(byte[] bytes) throws SerializationExcept
}
}

// TODO: Due to the change of NodeInfo to Broker. This and the test should be updated.
/** Deserialize to ClusterInfo with protocol buffer */
public static ClusterInfo readClusterInfo(byte[] bytes) {
try {
var outerClusterInfo = ClusterInfoOuterClass.ClusterInfo.parseFrom(bytes);
return ClusterInfo.of(
outerClusterInfo.getClusterId(),
outerClusterInfo.getNodeInfoList().stream()
.map(nodeInfo -> Broker.of(nodeInfo.getId(), nodeInfo.getHost(), nodeInfo.getPort()))
.collect(Collectors.toList()),
outerClusterInfo.getBrokerList().stream().map(ByteUtils::toBroker).toList(),
outerClusterInfo.getTopicList().stream()
.map(
protoTopic ->
new Topic(
protoTopic.getName(),
new Config(protoTopic.getConfigMap()),
protoTopic.getInternal(),
Set.copyOf(protoTopic.getPartitionList())))
.map(ByteUtils::toTopic)
.collect(Collectors.toMap(Topic::name, Function.identity())),
outerClusterInfo.getReplicaList().stream()
.map(
replica ->
Replica.builder()
.topic(replica.getTopic())
.partition(replica.getPartition())
.broker(
Broker.of(
replica.getNodeInfo().getId(),
replica.getNodeInfo().getHost(),
replica.getNodeInfo().getPort()))
.lag(replica.getLag())
.size(replica.getSize())
.isLeader(replica.getIsLeader())
.isSync(replica.getIsSync())
.isFuture(replica.getIsFuture())
.isOffline(replica.getIsOffline())
.isPreferredLeader(replica.getIsPreferredLeader())
.path(replica.getPath())
.build())
.collect(Collectors.toList()));
outerClusterInfo.getReplicaList().stream().map(ByteUtils::toReplica).toList());
} catch (InvalidProtocolBufferException ex) {
throw new SerializationException(ex);
}
}

// ---------------------------Serialize To ProtoBuf Outer Class------------------------------- //

private static BrokerOuterClass.Broker.DataFolder toOuterClass(Broker.DataFolder dataFolder) {
return BrokerOuterClass.Broker.DataFolder.newBuilder()
.setPath(dataFolder.path())
.putAllPartitionSizes(
dataFolder.partitionSizes().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue)))
.putAllOrphanPartitionSizes(
dataFolder.orphanPartitionSizes().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue)))
.build();
}

private static TopicPartitionOuterClass.TopicPartition toOuterClass(
TopicPartition topicPartition) {
return TopicPartitionOuterClass.TopicPartition.newBuilder()
.setPartition(topicPartition.partition())
.setTopic(topicPartition.topic())
.build();
}

private static BrokerOuterClass.Broker toOuterClass(Broker broker) {
return BrokerOuterClass.Broker.newBuilder()
.setId(broker.id())
.setHost(broker.host())
.setPort(broker.port())
.setIsController(broker.isController())
.putAllConfig(broker.config().raw())
.addAllDataFolder(broker.dataFolders().stream().map(ByteUtils::toOuterClass).toList())
.addAllTopicPartitions(
broker.topicPartitions().stream().map(ByteUtils::toOuterClass).toList())
.addAllTopicPartitionLeaders(
broker.topicPartitionLeaders().stream().map(ByteUtils::toOuterClass).toList())
.build();
}

private static TopicOuterClass.Topic toOuterClass(Topic topic) {
return TopicOuterClass.Topic.newBuilder()
.setName(topic.name())
.putAllConfig(topic.config().raw())
.setInternal(topic.internal())
.addAllPartitionIds(topic.partitionIds())
.build();
}

private static ReplicaOuterClass.Replica toOuterClass(Replica replica) {
return ReplicaOuterClass.Replica.newBuilder()
.setTopic(replica.topic())
.setPartition(replica.partition())
.setBroker(toOuterClass(replica.broker()))
.setLag(replica.lag())
.setSize(replica.size())
.setIsInternal(replica.isInternal())
.setIsLeader(replica.isLeader())
.setIsAdding(replica.isAdding())
.setIsRemoving(replica.isRemoving())
.setIsSync(replica.isSync())
.setIsFuture(replica.isFuture())
.setIsOffline(replica.isOffline())
.setIsPreferredLeader(replica.isPreferredLeader())
.setPath(replica.path())
.build();
}

// -------------------------Deserialize From ProtoBuf Outer Class----------------------------- //

private static Broker.DataFolder toDataFolder(BrokerOuterClass.Broker.DataFolder dataFolder) {
return new Broker.DataFolder(
dataFolder.getPath(),
dataFolder.getPartitionSizesMap().entrySet().stream()
.collect(
Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)),
dataFolder.getOrphanPartitionSizesMap().entrySet().stream()
.collect(
Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)));
}

private static TopicPartition toTopicPartition(
TopicPartitionOuterClass.TopicPartition topicPartition) {
return new TopicPartition(topicPartition.getTopic(), topicPartition.getPartition());
}

private static Broker toBroker(BrokerOuterClass.Broker broker) {
return new Broker(
broker.getId(),
broker.getHost(),
broker.getPort(),
broker.getIsController(),
new Config(broker.getConfigMap()),
broker.getDataFolderList().stream().map(ByteUtils::toDataFolder).toList(),
broker.getTopicPartitionsList().stream()
.map(ByteUtils::toTopicPartition)
.collect(Collectors.toSet()),
broker.getTopicPartitionLeadersList().stream()
.map(ByteUtils::toTopicPartition)
.collect(Collectors.toSet()));
}

private static Topic toTopic(TopicOuterClass.Topic topic) {
return new Topic(
topic.getName(),
new Config(topic.getConfigMap()),
topic.getInternal(),
Set.copyOf(topic.getPartitionIdsList()));
}

private static Replica toReplica(ReplicaOuterClass.Replica replica) {
return Replica.builder()
.topic(replica.getTopic())
.partition(replica.getPartition())
.broker(toBroker(replica.getBroker()))
.lag(replica.getLag())
.size(replica.getSize())
.isInternal(replica.getIsInternal())
.isLeader(replica.getIsLeader())
.isAdding(replica.getIsAdding())
.isRemoving(replica.getIsRemoving())
.isSync(replica.getIsSync())
.isFuture(replica.getIsFuture())
.isOffline(replica.getIsOffline())
.isPreferredLeader(replica.getIsPreferredLeader())
.path(replica.getPath())
.build();
}

// --------------------------------ProtoBuf Primitive----------------------------------------- //

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
syntax = "proto3";

package org.astraea.common.generated.admin;

import "org/astraea/common/generated/admin/TopicPartition.proto";

message Broker {
int32 id = 1;
string host = 2;
int32 port = 3;
bool isController = 4;
map<string, string> config = 5;
repeated DataFolder dataFolder = 6;
repeated TopicPartition topicPartitions = 7;
repeated TopicPartition topicPartitionLeaders = 8;

message DataFolder {
string path = 1;
map<string, int64> partitionSizes = 2;
map<string, int64> orphanPartitionSizes = 3;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package org.astraea.common.generated.admin;

import "org/astraea/common/generated/admin/Broker.proto";
import "org/astraea/common/generated/admin/Topic.proto";
import "org/astraea/common/generated/admin/Replica.proto";

message ClusterInfo {
string clusterId = 1;
repeated Broker broker = 2;
repeated Topic topic = 3;
repeated Replica replica = 4;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
syntax = "proto3";

package org.astraea.common.generated.admin;

import "org/astraea/common/generated/admin/Broker.proto";

message Replica {
string topic = 1;
int32 partition = 2;
Broker broker = 3;
bool isLeader = 7;
bool isSync = 10;
bool isOffline = 12;
bool isAdding = 8;
bool isRemoving = 9;
bool isFuture = 11;
bool isPreferredLeader = 13;
int64 lag = 4;
int64 size = 5;
string path = 14;
bool isInternal = 6;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
syntax = "proto3";

package org.astraea.common.generated.admin;

message Topic {
string name = 1;
map<string, string> config = 2;
bool internal = 3;
repeated int32 partitionIds = 4;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";

package org.astraea.common.generated.admin;

message TopicPartition {
int32 partition = 1;
string topic = 2;
}
Loading