Skip to content

Commit

Permalink
Fix error thrown by ConsoleConsumerTest and CustomDeserializerTest
Browse files Browse the repository at this point in the history
  • Loading branch information
badaiaqrandista committed Oct 2, 2020
1 parent b112a26 commit 4f1c2f8
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
20 changes: 8 additions & 12 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,12 @@ class DefaultMessageFormatter extends MessageFormatter {
output.write(lineSeparator)
}

def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String): Unit = {
output.write(deserialize(deserializer, sourceBytes, topic))
def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = {
val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral)
val convertedBytes = deserializer
.map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString))
.getOrElse(nonNullBytes)
convertedBytes
}

import consumerRecord._
Expand Down Expand Up @@ -546,12 +550,12 @@ class DefaultMessageFormatter extends MessageFormatter {
}

if (printKey) {
write(keyDeserializer, key, topic)
output.write(deserialize(keyDeserializer, key, topic))
writeSeparator(columnSeparator = printValue)
}

if (printValue) {
write(valueDeserializer, value, topic)
output.write(deserialize(valueDeserializer, value, topic))
output.write(lineSeparator)
}
}
Expand All @@ -565,14 +569,6 @@ class DefaultMessageFormatter extends MessageFormatter {
newConfigs.asJava
}

private def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = {
val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral)
val convertedBytes = deserializer
.map(d => utfBytes(d.deserialize(topic, nonNullBytes).toString))
.getOrElse(nonNullBytes)
convertedBytes
}

private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8)

private def getByteProperty(configs: Map[String, _], key: String): Array[Byte] = {
Expand Down
12 changes: 9 additions & 3 deletions core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -518,18 +518,24 @@ class ConsoleConsumerTest {
formatter.configure(configs)
out = new ByteArrayOutputStream()
formatter.writeTo(record, new PrintStream(out))
assertEquals("key\tvalue\t0\n", out.toString)
assertEquals("Partition:0\tkey\tvalue\n", out.toString)

configs.put("print.timestamp", "true")
formatter.configure(configs)
out = new ByteArrayOutputStream()
formatter.writeTo(record, new PrintStream(out))
assertEquals("NO_TIMESTAMP\tkey\tvalue\t0\n", out.toString)
assertEquals("NO_TIMESTAMP\tPartition:0\tkey\tvalue\n", out.toString)

configs.put("print.offset", "true")
formatter.configure(configs)
out = new ByteArrayOutputStream()
formatter.writeTo(record, new PrintStream(out))
assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString)

out = new ByteArrayOutputStream()
val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, 321L, -1, -1, "key".getBytes, "value".getBytes)
formatter.writeTo(record2, new PrintStream(out))
assertEquals("CreateTime:123\tkey\tvalue\t0\n", out.toString)
assertEquals("CreateTime:123\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString)
formatter.close()
}

Expand Down

0 comments on commit 4f1c2f8

Please sign in to comment.