diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index 68a233d217..b681d17718 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -41,6 +43,7 @@ import org.astraea.common.generated.admin.TopicOuterClass; import org.astraea.common.generated.admin.TopicPartitionOuterClass; import org.astraea.common.metrics.BeanObject; +import org.astraea.common.metrics.HasBeanObject; public final class ByteUtils { @@ -170,24 +173,7 @@ public static byte[] toBytes(boolean value) { /** Serialize BeanObject by protocol buffer. The unsupported value will be ignored. */ public static byte[] toBytes(BeanObject value) { - var beanBuilder = BeanObjectOuterClass.BeanObject.newBuilder(); - beanBuilder.setDomain(value.domainName()); - beanBuilder.putAllProperties(value.properties()); - value - .attributes() - .forEach( - (key, val) -> { - try { - beanBuilder.putAttributes(key, primitive(val)); - } catch (SerializationException ignore) { - // Bean attribute may contain non-primitive value. e.g. TimeUnit, Byte. - } - }); - beanBuilder.setCreatedTimestamp( - Timestamp.newBuilder() - .setSeconds(value.createdTimestamp() / 1000) - .setNanos((int) (value.createdTimestamp() % 1000) * 1000000)); - return beanBuilder.build().toByteArray(); + return toOuterClass(value).toByteArray(); } /** Serialize ClusterInfo by protocol buffer. */ @@ -201,6 +187,33 @@ public static byte[] toBytes(ClusterInfo value) { .toByteArray(); } + public static byte[] toBytes(Map> values) { + var mapOfBeanObjects = + values.entrySet().stream() + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, + e -> + e.getValue().stream() + .map(HasBeanObject::beanObject) + // convert BeanObject to protocol buffer + .map(ByteUtils::toOuterClass) + .toList())); + + return BeanObjectOuterClass.MapOfBeanObjects.newBuilder() + .putAllAllBeans( + mapOfBeanObjects.entrySet().stream() + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, + Objects -> + BeanObjectOuterClass.MapOfBeanObjects.BeanObjects.newBuilder() + .addAllBeanObjects(Objects.getValue()) + .build()))) + .build() + .toByteArray(); + } + public static int readInt(ReadableByteChannel channel) { return Utils.packException( () -> { @@ -287,6 +300,36 @@ public static BeanObject readBeanObject(byte[] bytes) throws SerializationExcept } } + /** Deserialize to a map with Integer keys and list of BeanObject values using protocol buffer */ + public static Map> readBeanObjects(byte[] bytes) { + try { + var outerClusterBean = BeanObjectOuterClass.MapOfBeanObjects.parseFrom(bytes); + return outerClusterBean.getAllBeansMap().entrySet().stream() + .collect( + Collectors.toUnmodifiableMap( + k -> k.getKey(), + v -> + v.getValue().getBeanObjectsList().stream() + .map( + i -> + new BeanObject( + i.getDomain(), + i.getPropertiesMap(), + i.getAttributesMap().entrySet().stream() + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, + e -> + Objects.requireNonNull( + toObject(e.getValue())))), + i.getCreatedTimestamp().getSeconds() * 1000 + + i.getCreatedTimestamp().getNanos() / 1000000)) + .toList())); + } catch (InvalidProtocolBufferException ex) { + throw new SerializationException(ex); + } + } + /** Deserialize to ClusterInfo with protocol buffer */ public static ClusterInfo readClusterInfo(byte[] bytes) { try { @@ -368,6 +411,31 @@ private static ReplicaOuterClass.Replica toOuterClass(Replica replica) { .build(); } + private static BeanObjectOuterClass.BeanObject toOuterClass(BeanObject beanObject) { + var beanBuilder = BeanObjectOuterClass.BeanObject.newBuilder(); + beanBuilder.setDomain(beanObject.domainName()); + beanBuilder.putAllProperties(beanObject.properties()); + beanObject + .attributes() + .forEach( + (key, val) -> { + try { + beanBuilder.putAttributes(key, primitive(val)); + } catch (SerializationException ignore) { + // Bean attribute may contain non-primitive value. e.g. TimeUnit, Byte. + } + }); + return beanBuilder + // the following code sets the created timestamp field using + // the recommended + // style by protobuf documentation. + .setCreatedTimestamp( + Timestamp.newBuilder() + .setSeconds(beanObject.createdTimestamp() / 1000) + .setNanos((int) (beanObject.createdTimestamp() % 1000 * 1000000))) + .build(); + } + // -------------------------Deserialize From ProtoBuf Outer Class----------------------------- // private static Broker.DataFolder toDataFolder(BrokerOuterClass.Broker.DataFolder dataFolder) { diff --git a/common/src/main/proto/org/astraea/common/generated/BeanObject.proto b/common/src/main/proto/org/astraea/common/generated/BeanObject.proto index a1669bb67b..0577576b42 100644 --- a/common/src/main/proto/org/astraea/common/generated/BeanObject.proto +++ b/common/src/main/proto/org/astraea/common/generated/BeanObject.proto @@ -11,3 +11,12 @@ message BeanObject { map attributes = 3; google.protobuf.Timestamp createdTimestamp = 4; } + + +message MapOfBeanObjects { + message BeanObjects { + repeated BeanObject beanObjects = 1; + } + + map allBeans = 1; +} \ No newline at end of file diff --git a/common/src/test/java/org/astraea/common/ByteUtilsTest.java b/common/src/test/java/org/astraea/common/ByteUtilsTest.java index 03e7a0cc76..862d04cc49 100644 --- a/common/src/test/java/org/astraea/common/ByteUtilsTest.java +++ b/common/src/test/java/org/astraea/common/ByteUtilsTest.java @@ -18,9 +18,15 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.List; +import java.util.Map; import java.util.Set; import org.astraea.common.admin.Admin; import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.metrics.BeanObject; +import org.astraea.common.metrics.ClusterBean; +import org.astraea.common.metrics.broker.HasGauge; +import org.astraea.common.metrics.broker.LogMetrics; import org.astraea.it.Service; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -113,4 +119,36 @@ void testReadAndToBytesEmptyClusterInfo() { Assertions.assertEquals(clusterInfo.topics(), deserializedClusterInfo.topics()); Assertions.assertEquals(clusterInfo.replicas(), deserializedClusterInfo.replicas()); } + + @Test + void testReadAndToBytesBeanObjects() { + BeanObject testBeanObject = + new BeanObject( + "kafka.log", + Map.of( + "name", + LogMetrics.Log.SIZE.metricName(), + "type", + "Log", + "topic", + "testBeans", + "partition", + "0"), + Map.of("Value", 100)); + var clusterBean = ClusterBean.of(Map.of(1, List.of(HasGauge.of(testBeanObject)))); + + var bytes = ByteUtils.toBytes(clusterBean.all()); + var deserializedClusterBean = ByteUtils.readBeanObjects(bytes); + + Assertions.assertEquals(1, deserializedClusterBean.size()); + Assertions.assertEquals( + testBeanObject.domainName(), deserializedClusterBean.get(1).get(0).domainName()); + Assertions.assertEquals( + testBeanObject.createdTimestamp(), + deserializedClusterBean.get(1).get(0).createdTimestamp()); + Assertions.assertEquals( + testBeanObject.properties(), deserializedClusterBean.get(1).get(0).properties()); + Assertions.assertEquals( + testBeanObject.attributes(), deserializedClusterBean.get(1).get(0).attributes()); + } }