Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SERIALIZATION] add serialization and deserialization functionality for ClusterBean #1769

Merged
104 changes: 86 additions & 18 deletions common/src/main/java/org/astraea/common/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -41,6 +43,7 @@
import org.astraea.common.generated.admin.TopicOuterClass;
import org.astraea.common.generated.admin.TopicPartitionOuterClass;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.HasBeanObject;

public final class ByteUtils {

Expand Down Expand Up @@ -170,24 +173,7 @@ public static byte[] toBytes(boolean value) {

/** Serialize BeanObject by protocol buffer. The unsupported value will be ignored. */
public static byte[] toBytes(BeanObject value) {
var beanBuilder = BeanObjectOuterClass.BeanObject.newBuilder();
beanBuilder.setDomain(value.domainName());
beanBuilder.putAllProperties(value.properties());
value
.attributes()
.forEach(
(key, val) -> {
try {
beanBuilder.putAttributes(key, primitive(val));
} catch (SerializationException ignore) {
// Bean attribute may contain non-primitive value. e.g. TimeUnit, Byte.
}
});
beanBuilder.setCreatedTimestamp(
Timestamp.newBuilder()
.setSeconds(value.createdTimestamp() / 1000)
.setNanos((int) (value.createdTimestamp() % 1000) * 1000000));
return beanBuilder.build().toByteArray();
return toOuterClass(value).toByteArray();
}

/** Serialize ClusterInfo by protocol buffer. */
Expand All @@ -201,6 +187,33 @@ public static byte[] toBytes(ClusterInfo value) {
.toByteArray();
}

public static byte[] toBytes(Map<Integer, Collection<HasBeanObject>> values) {
var mapOfBeanObjects =
values.entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey,
e ->
e.getValue().stream()
.map(HasBeanObject::beanObject)
// convert BeanObject to protocol buffer
.map(ByteUtils::toOuterClass)
.toList()));

return BeanObjectOuterClass.MapOfBeanObjects.newBuilder()
.putAllAllBeans(
mapOfBeanObjects.entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey,
Objects ->
BeanObjectOuterClass.MapOfBeanObjects.BeanObjects.newBuilder()
.addAllBeanObjects(Objects.getValue())
.build())))
.build()
.toByteArray();
}

public static int readInt(ReadableByteChannel channel) {
return Utils.packException(
() -> {
Expand Down Expand Up @@ -287,6 +300,36 @@ public static BeanObject readBeanObject(byte[] bytes) throws SerializationExcept
}
}

/** Deserialize to a map with Integer keys and list of BeanObject values using protocol buffer */
public static Map<Integer, List<BeanObject>> readBeanObjects(byte[] bytes) {
try {
var outerClusterBean = BeanObjectOuterClass.MapOfBeanObjects.parseFrom(bytes);
return outerClusterBean.getAllBeansMap().entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
k -> k.getKey(),
v ->
v.getValue().getBeanObjectsList().stream()
.map(
i ->
new BeanObject(
i.getDomain(),
i.getPropertiesMap(),
i.getAttributesMap().entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey,
e ->
Objects.requireNonNull(
toObject(e.getValue())))),
i.getCreatedTimestamp().getSeconds() * 1000
Haser0305 marked this conversation as resolved.
Show resolved Hide resolved
+ i.getCreatedTimestamp().getNanos() / 1000000))
.toList()));
} catch (InvalidProtocolBufferException ex) {
throw new SerializationException(ex);
}
}

/** Deserialize to ClusterInfo with protocol buffer */
public static ClusterInfo readClusterInfo(byte[] bytes) {
try {
Expand Down Expand Up @@ -368,6 +411,31 @@ private static ReplicaOuterClass.Replica toOuterClass(Replica replica) {
.build();
}

private static BeanObjectOuterClass.BeanObject toOuterClass(BeanObject beanObject) {
var beanBuilder = BeanObjectOuterClass.BeanObject.newBuilder();
beanBuilder.setDomain(beanObject.domainName());
beanBuilder.putAllProperties(beanObject.properties());
beanObject
.attributes()
.forEach(
(key, val) -> {
try {
beanBuilder.putAttributes(key, primitive(val));
} catch (SerializationException ignore) {
// Bean attribute may contain non-primitive value. e.g. TimeUnit, Byte.
}
});
return beanBuilder
// the following code sets the created timestamp field using
// the recommended
// style by protobuf documentation.
.setCreatedTimestamp(
Timestamp.newBuilder()
.setSeconds(beanObject.createdTimestamp() / 1000)
.setNanos((int) (beanObject.createdTimestamp() % 1000 * 1000000)))
.build();
}

// -------------------------Deserialize From ProtoBuf Outer Class----------------------------- //

private static Broker.DataFolder toDataFolder(BrokerOuterClass.Broker.DataFolder dataFolder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,12 @@ message BeanObject {
map<string, Primitive> attributes = 3;
google.protobuf.Timestamp createdTimestamp = 4;
}


message MapOfBeanObjects {
message BeanObjects {
repeated BeanObject beanObjects = 1;
}

map<int32, BeanObjects> allBeans = 1;
}
38 changes: 38 additions & 0 deletions common/src/test/java/org/astraea/common/ByteUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.ClusterBean;
import org.astraea.common.metrics.broker.HasGauge;
import org.astraea.common.metrics.broker.LogMetrics;
import org.astraea.it.Service;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -113,4 +119,36 @@ void testReadAndToBytesEmptyClusterInfo() {
Assertions.assertEquals(clusterInfo.topics(), deserializedClusterInfo.topics());
Assertions.assertEquals(clusterInfo.replicas(), deserializedClusterInfo.replicas());
}

@Test
void testReadAndToBytesBeanObjects() {
BeanObject testBeanObject =
new BeanObject(
"kafka.log",
Map.of(
"name",
LogMetrics.Log.SIZE.metricName(),
"type",
"Log",
"topic",
"testBeans",
"partition",
"0"),
Map.of("Value", 100));
var clusterBean = ClusterBean.of(Map.of(1, List.of(HasGauge.of(testBeanObject))));

var bytes = ByteUtils.toBytes(clusterBean.all());
var deserializedClusterBean = ByteUtils.readBeanObjects(bytes);

Assertions.assertEquals(1, deserializedClusterBean.size());
Assertions.assertEquals(
testBeanObject.domainName(), deserializedClusterBean.get(1).get(0).domainName());
Assertions.assertEquals(
testBeanObject.createdTimestamp(),
deserializedClusterBean.get(1).get(0).createdTimestamp());
Assertions.assertEquals(
testBeanObject.properties(), deserializedClusterBean.get(1).get(0).properties());
Assertions.assertEquals(
testBeanObject.attributes(), deserializedClusterBean.get(1).get(0).attributes());
}
}