From 5ef3f3f2665b8372546f3498e24485d3f10b24eb Mon Sep 17 00:00:00 2001 From: Jules Ivanic Date: Thu, 17 Nov 2022 20:11:46 +0100 Subject: [PATCH] Add Bytes Serde (#108) Co-authored-by: Mathieu Dulac --- zio-kafka-test/src/test/scala/zio/kafka/serde/SerdeSpec.scala | 3 +++ zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala | 2 ++ 2 files changed, 5 insertions(+) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/serde/SerdeSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/serde/SerdeSpec.scala index 6772023b4..4bbfba115 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/serde/SerdeSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/serde/SerdeSpec.scala @@ -11,6 +11,8 @@ object SerdeSpec extends ZIOSpecDefault { val testDataStructureSerde = Serde.string.inmap[TestDataStructure](TestDataStructure.apply)(_.value) + private val anyBytes = Gen.listOf(Gen.byte).map(bytes => new org.apache.kafka.common.utils.Bytes(bytes.toArray)) + override def spec = suite("Serde")( testSerde(Serde.string, Gen.string), testSerde(Serde.int, Gen.int), @@ -19,6 +21,7 @@ object SerdeSpec extends ZIOSpecDefault { testSerde(Serde.double, Gen.double), testSerde(Serde.long, Gen.long), testSerde(Serde.uuid, Gen.uuid), + testSerde(Serde.bytes, anyBytes), testSerde(Serde.byteArray, Gen.listOf(Gen.byte).map(_.toArray)), suite("asOption")( test("serialize and deserialize None values to null and visa versa") { diff --git a/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala b/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala index b73bae48d..3c770bbb9 100644 --- a/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala +++ b/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala @@ -2,6 +2,7 @@ package zio.kafka.serde import org.apache.kafka.common.header.Headers import org.apache.kafka.common.serialization.{ Serde => KafkaSerde, Serdes => KafkaSerdes } +import org.apache.kafka.common.utils.Bytes import zio.{ RIO, ZIO } import java.nio.ByteBuffer @@ -15,6 +16,7 @@ private[zio] trait Serdes { lazy val double: Serde[Any, Double] = convertPrimitiveSerde(KafkaSerdes.Double()).inmap(Double2double)(double2Double) lazy val string: Serde[Any, String] = convertPrimitiveSerde(KafkaSerdes.String()) lazy val byteArray: Serde[Any, Array[Byte]] = convertPrimitiveSerde(KafkaSerdes.ByteArray()) + lazy val bytes: Serde[Any, Bytes] = convertPrimitiveSerde(KafkaSerdes.Bytes()) lazy val byteBuffer: Serde[Any, ByteBuffer] = convertPrimitiveSerde(KafkaSerdes.ByteBuffer()) lazy val uuid: Serde[Any, UUID] = convertPrimitiveSerde(KafkaSerdes.UUID())