Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SERIALIZATION] add serialization and deserialization functionality for ClusterBean #1769

Merged
103 changes: 85 additions & 18 deletions common/src/main/java/org/astraea/common/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -41,6 +43,7 @@
import org.astraea.common.generated.admin.TopicOuterClass;
import org.astraea.common.generated.admin.TopicPartitionOuterClass;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.HasBeanObject;

public final class ByteUtils {

Expand Down Expand Up @@ -170,24 +173,7 @@ public static byte[] toBytes(boolean value) {

/** Serialize BeanObject by protocol buffer. The unsupported value will be ignored. */
public static byte[] toBytes(BeanObject value) {
var beanBuilder = BeanObjectOuterClass.BeanObject.newBuilder();
beanBuilder.setDomain(value.domainName());
beanBuilder.putAllProperties(value.properties());
value
.attributes()
.forEach(
(key, val) -> {
try {
beanBuilder.putAttributes(key, primitive(val));
} catch (SerializationException ignore) {
// Bean attribute may contain non-primitive value. e.g. TimeUnit, Byte.
}
});
beanBuilder.setCreatedTimestamp(
Timestamp.newBuilder()
.setSeconds(value.createdTimestamp() / 1000)
.setNanos((int) (value.createdTimestamp() % 1000) * 1000000));
return beanBuilder.build().toByteArray();
return toOuterClass(value).toByteArray();
}

/** Serialize ClusterInfo by protocol buffer. */
Expand All @@ -201,6 +187,33 @@ public static byte[] toBytes(ClusterInfo value) {
.toByteArray();
}

public static byte[] toBytes(Map<Integer, Collection<HasBeanObject>> values) {
var mapOfBeanObjects =
values.entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey,
e ->
e.getValue().stream()
.map(HasBeanObject::beanObject)
// convert BeanObject to protocol buffer
.map(ByteUtils::toOuterClass)
.toList()));

return BeanObjectOuterClass.MapOfBeanObjects.newBuilder()
.putAllAllBeans(
mapOfBeanObjects.entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey,
Objects ->
BeanObjectOuterClass.MapOfBeanObjects.BeanObjects.newBuilder()
.addAllBeanObjects(Objects.getValue())
.build())))
.build()
.toByteArray();
}

public static int readInt(ReadableByteChannel channel) {
return Utils.packException(
() -> {
Expand Down Expand Up @@ -368,6 +381,31 @@ private static ReplicaOuterClass.Replica toOuterClass(Replica replica) {
.build();
}

private static BeanObjectOuterClass.BeanObject toOuterClass(BeanObject beanObject) {
var beanBuilder = BeanObjectOuterClass.BeanObject.newBuilder();
beanBuilder.setDomain(beanObject.domainName());
beanBuilder.putAllProperties(beanObject.properties());
beanObject
.attributes()
.forEach(
(key, val) -> {
try {
beanBuilder.putAttributes(key, primitive(val));
} catch (SerializationException ignore) {
// Bean attribute may contain non-primitive value. e.g. TimeUnit, Byte.
}
});
return beanBuilder
// the following code sets the created timestamp field using
// the recommended
// style by protobuf documentation.
.setCreatedTimestamp(
Timestamp.newBuilder()
.setSeconds(beanObject.createdTimestamp() / 1000)
.setNanos((int) (beanObject.createdTimestamp() % 1000 * 1000000)))
.build();
}

// -------------------------Deserialize From ProtoBuf Outer Class----------------------------- //

private static Broker.DataFolder toDataFolder(BrokerOuterClass.Broker.DataFolder dataFolder) {
Expand Down Expand Up @@ -429,6 +467,35 @@ private static Replica toReplica(ReplicaOuterClass.Replica replica) {
.build();
}

public static Map<Integer, List<BeanObject>> readBeanObjects(byte[] bytes) {
try {
var outerClusterBean = BeanObjectOuterClass.MapOfBeanObjects.parseFrom(bytes);
return outerClusterBean.getAllBeansMap().entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
k -> k.getKey(),
v ->
v.getValue().getBeanObjectsList().stream()
.map(
i ->
new BeanObject(
i.getDomain(),
i.getPropertiesMap(),
i.getAttributesMap().entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey,
e ->
Objects.requireNonNull(
toObject(e.getValue())))),
i.getCreatedTimestamp().getSeconds() * 1000
Haser0305 marked this conversation as resolved.
Show resolved Hide resolved
+ i.getCreatedTimestamp().getNanos() / 1000000))
.toList()));
} catch (InvalidProtocolBufferException ex) {
throw new SerializationException(ex);
}
}

// --------------------------------ProtoBuf Primitive----------------------------------------- //

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,12 @@ message BeanObject {
map<string, Primitive> attributes = 3;
google.protobuf.Timestamp createdTimestamp = 4;
}


message MapOfBeanObjects {
message BeanObjects {
repeated BeanObject beanObjects = 1;
}

map<int32, BeanObjects> allBeans = 1;
}
38 changes: 38 additions & 0 deletions common/src/test/java/org/astraea/common/ByteUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.ClusterBean;
import org.astraea.common.metrics.broker.HasGauge;
import org.astraea.common.metrics.broker.LogMetrics;
import org.astraea.it.Service;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -113,4 +119,36 @@ void testReadAndToBytesEmptyClusterInfo() {
Assertions.assertEquals(clusterInfo.topics(), deserializedClusterInfo.topics());
Assertions.assertEquals(clusterInfo.replicas(), deserializedClusterInfo.replicas());
}

@Test
void testReadAndToBytesBeanObjects() {
BeanObject testBeanObject =
new BeanObject(
"kafka.log",
Map.of(
"name",
LogMetrics.Log.SIZE.metricName(),
"type",
"Log",
"topic",
"testBeans",
"partition",
"0"),
Map.of("Value", 100));
var clusterBean = ClusterBean.of(Map.of(1, List.of(HasGauge.of(testBeanObject))));

var bytes = ByteUtils.toBytes(clusterBean.all());
var deserializedClusterBean = ByteUtils.readBeanObjects(bytes);

Assertions.assertEquals(1, deserializedClusterBean.size());
Assertions.assertEquals(
testBeanObject.domainName(), deserializedClusterBean.get(1).get(0).domainName());
Assertions.assertEquals(
testBeanObject.createdTimestamp(),
deserializedClusterBean.get(1).get(0).createdTimestamp());
Assertions.assertEquals(
testBeanObject.properties(), deserializedClusterBean.get(1).get(0).properties());
Assertions.assertEquals(
testBeanObject.attributes(), deserializedClusterBean.get(1).get(0).attributes());
}
}