From 4f1c2f880ed3eb33e7f5e9b6eb7abe94df94856c Mon Sep 17 00:00:00 2001 From: Badai Aqrandista Date: Sat, 22 Aug 2020 23:27:21 +1000 Subject: [PATCH] Fix error thrown by ConsoleConsumerTest and CustomDeserializerTest --- .../scala/kafka/tools/ConsoleConsumer.scala | 20 ++++++++----------- .../kafka/tools/ConsoleConsumerTest.scala | 12 ++++++++--- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index ad88cd6a3883..4de15b9f373f 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -503,8 +503,12 @@ class DefaultMessageFormatter extends MessageFormatter { output.write(lineSeparator) } - def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String): Unit = { - output.write(deserialize(deserializer, sourceBytes, topic)) + def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = { + val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral) + val convertedBytes = deserializer + .map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString)) + .getOrElse(nonNullBytes) + convertedBytes } import consumerRecord._ @@ -546,12 +550,12 @@ class DefaultMessageFormatter extends MessageFormatter { } if (printKey) { - write(keyDeserializer, key, topic) + output.write(deserialize(keyDeserializer, key, topic)) writeSeparator(columnSeparator = printValue) } if (printValue) { - write(valueDeserializer, value, topic) + output.write(deserialize(valueDeserializer, value, topic)) output.write(lineSeparator) } } @@ -565,14 +569,6 @@ class DefaultMessageFormatter extends MessageFormatter { newConfigs.asJava } - private def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = { - val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral) - val convertedBytes = deserializer - .map(d => utfBytes(d.deserialize(topic, nonNullBytes).toString)) - .getOrElse(nonNullBytes) - convertedBytes - } - private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8) private def getByteProperty(configs: Map[String, _], key: String): Array[Byte] = { diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index e930ad5644d7..8387201f0739 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -518,18 +518,24 @@ class ConsoleConsumerTest { formatter.configure(configs) out = new ByteArrayOutputStream() formatter.writeTo(record, new PrintStream(out)) - assertEquals("key\tvalue\t0\n", out.toString) + assertEquals("Partition:0\tkey\tvalue\n", out.toString) configs.put("print.timestamp", "true") formatter.configure(configs) out = new ByteArrayOutputStream() formatter.writeTo(record, new PrintStream(out)) - assertEquals("NO_TIMESTAMP\tkey\tvalue\t0\n", out.toString) + assertEquals("NO_TIMESTAMP\tPartition:0\tkey\tvalue\n", out.toString) + + configs.put("print.offset", "true") + formatter.configure(configs) + out = new ByteArrayOutputStream() + formatter.writeTo(record, new PrintStream(out)) + assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString) out = new ByteArrayOutputStream() val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, 321L, -1, -1, "key".getBytes, "value".getBytes) formatter.writeTo(record2, new PrintStream(out)) - assertEquals("CreateTime:123\tkey\tvalue\t0\n", out.toString) + assertEquals("CreateTime:123\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString) formatter.close() }