diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 4aa71c04ed2ad..4de15b9f373f5 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -221,16 +221,23 @@ object ConsoleConsumer extends Logging { .ofType(classOf[String]) .defaultsTo(classOf[DefaultMessageFormatter].getName) val messageFormatterArgOpt = parser.accepts("property", - "The properties to initialize the message formatter. Default properties include:\n" + - "\tprint.timestamp=true|false\n" + - "\tprint.key=true|false\n" + - "\tprint.value=true|false\n" + - "\tkey.separator=\n" + - "\tline.separator=\n" + - "\tkey.deserializer=\n" + - "\tvalue.deserializer=\n" + - "\nUsers can also pass in customized properties for their formatter; more specifically, users " + - "can pass in properties keyed with \'key.deserializer.\' and \'value.deserializer.\' prefixes to configure their deserializers.") + """The properties to initialize the message formatter. Default properties include: + | print.timestamp=true|false + | print.key=true|false + | print.offset=true|false + | print.partition=true|false + | print.headers=true|false + | print.value=true|false + | key.separator= + | line.separator= + | headers.separator= + | null.literal= + | key.deserializer= + | value.deserializer= + | headers.deserializer= + | + |Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' 
prefixes to configure their deserializers.""" + .stripMargin) .withRequiredArg .describedAs("prop") .ofType(classOf[String]) @@ -459,48 +466,32 @@ class DefaultMessageFormatter extends MessageFormatter { var printKey = false var printValue = true var printPartition = false - var keySeparator = "\t".getBytes(StandardCharsets.UTF_8) - var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8) + var printOffset = false + var printHeaders = false + var keySeparator = utfBytes("\t") + var lineSeparator = utfBytes("\n") + var headersSeparator = utfBytes(",") + var nullLiteral = utfBytes("null") var keyDeserializer: Option[Deserializer[_]] = None var valueDeserializer: Option[Deserializer[_]] = None + var headersDeserializer: Option[Deserializer[_]] = None override def configure(configs: Map[String, _]): Unit = { - val props = new java.util.Properties() - configs.asScala.forKeyValue { (key, value) => props.put(key, value.toString) } - if (props.containsKey("print.timestamp")) - printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true") - if (props.containsKey("print.key")) - printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true") - if (props.containsKey("print.value")) - printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true") - if (props.containsKey("print.partition")) - printPartition = props.getProperty("print.partition").trim.equalsIgnoreCase("true") - if (props.containsKey("key.separator")) - keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8) - if (props.containsKey("line.separator")) - lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8) - // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` - if (props.containsKey("key.deserializer")) { - keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor() - .newInstance().asInstanceOf[Deserializer[_]]) - 
keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true) - } - // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` - if (props.containsKey("value.deserializer")) { - valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor() - .newInstance().asInstanceOf[Deserializer[_]]) - valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false) - } - } - - private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = { - val newProps = new Properties() - props.asScala.forKeyValue { (key, value) => - if (key.startsWith(prefix) && key.length > prefix.length) - newProps.put(key.substring(prefix.length), value) - } - newProps + getPropertyIfExists(configs, "print.timestamp", getBoolProperty).foreach(printTimestamp = _) + getPropertyIfExists(configs, "print.key", getBoolProperty).foreach(printKey = _) + getPropertyIfExists(configs, "print.offset", getBoolProperty).foreach(printOffset = _) + getPropertyIfExists(configs, "print.partition", getBoolProperty).foreach(printPartition = _) + getPropertyIfExists(configs, "print.headers", getBoolProperty).foreach(printHeaders = _) + getPropertyIfExists(configs, "print.value", getBoolProperty).foreach(printValue = _) + getPropertyIfExists(configs, "key.separator", getByteProperty).foreach(keySeparator = _) + getPropertyIfExists(configs, "line.separator", getByteProperty).foreach(lineSeparator = _) + getPropertyIfExists(configs, "headers.separator", getByteProperty).foreach(headersSeparator = _) + getPropertyIfExists(configs, "null.literal", getByteProperty).foreach(nullLiteral = _) + + keyDeserializer = getPropertyIfExists(configs, "key.deserializer", getDeserializerProperty(true)) + valueDeserializer = getPropertyIfExists(configs, "value.deserializer", getDeserializerProperty(false)) + headersDeserializer = 
getPropertyIfExists(configs, "headers.deserializer", getDeserializerProperty(false)) } def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { @@ -512,37 +503,96 @@ class DefaultMessageFormatter extends MessageFormatter { output.write(lineSeparator) } - def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String): Unit = { - val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8)) - val convertedBytes = deserializer.map(_.deserialize(topic, consumerRecord.headers, nonNullBytes).toString. - getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes) - output.write(convertedBytes) + def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = { + val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral) + val convertedBytes = deserializer + .map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString)) + .getOrElse(nonNullBytes) + convertedBytes } import consumerRecord._ if (printTimestamp) { if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) - output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8)) + output.write(utfBytes(s"$timestampType:$timestamp")) else - output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8)) - writeSeparator(printKey || printValue) + output.write(utfBytes("NO_TIMESTAMP")) + writeSeparator(columnSeparator = printOffset || printPartition || printHeaders || printKey || printValue) + } + + if (printPartition) { + output.write(utfBytes("Partition:")) + output.write(utfBytes(partition().toString)) + writeSeparator(columnSeparator = printOffset || printHeaders || printKey || printValue) + } + + if (printOffset) { + output.write(utfBytes("Offset:")) + output.write(utfBytes(offset().toString)) + writeSeparator(columnSeparator = printHeaders || printKey || printValue) + } + + if (printHeaders) { + val headersIt = headers().iterator.asScala + if 
(headersIt.hasNext) { + headersIt.foreach { header => + output.write(utfBytes(header.key() + ":")) + output.write(deserialize(headersDeserializer, header.value(), topic)) + if (headersIt.hasNext) { + output.write(headersSeparator) + } + } + } else { + output.write(utfBytes("NO_HEADERS")) + } + writeSeparator(columnSeparator = printKey || printValue) } if (printKey) { - write(keyDeserializer, key, topic) - writeSeparator(printValue) + output.write(deserialize(keyDeserializer, key, topic)) + writeSeparator(columnSeparator = printValue) } if (printValue) { - write(valueDeserializer, value, topic) - writeSeparator(printPartition) + output.write(deserialize(valueDeserializer, value, topic)) + output.write(lineSeparator) } + } - if (printPartition) { - output.write(s"$partition".getBytes(StandardCharsets.UTF_8)) - output.write(lineSeparator) + private def propertiesWithKeyPrefixStripped(prefix: String, configs: Map[String, _]): Map[String, _] = { + val newConfigs = collection.mutable.Map[String, Any]() + configs.asScala.foreach { case (key, value) => + if (key.startsWith(prefix) && key.length > prefix.length) + newConfigs.put(key.substring(prefix.length), value) } + newConfigs.asJava + } + + private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8) + + private def getByteProperty(configs: Map[String, _], key: String): Array[Byte] = { + utfBytes(configs.get(key).asInstanceOf[String]) + } + + private def getBoolProperty(configs: Map[String, _], key: String): Boolean = { + configs.get(key).asInstanceOf[String].trim.equalsIgnoreCase("true") + } + + private def getDeserializerProperty(isKey: Boolean)(configs: Map[String, _], propertyName: String): Deserializer[_] = { + val deserializer = Class.forName(configs.get(propertyName).asInstanceOf[String]).getDeclaredConstructor().newInstance().asInstanceOf[Deserializer[_]] + val deserializerConfig = propertiesWithKeyPrefixStripped(propertyName + ".", configs) + .asScala + .asJava + deserializer.configure(deserializerConfig, isKey) + 
deserializer + } + + private def getPropertyIfExists[T](configs: Map[String, _], key: String, getter: (Map[String, _], String) => T): Option[T] = { + if (configs.containsKey(key)) + Some(getter(configs, key)) + else + None } } diff --git a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala new file mode 100644 index 0000000000000..27442856515ab --- /dev/null +++ b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package unit.kafka.tools + +import java.io.{ByteArrayOutputStream, Closeable, PrintStream} +import java.nio.charset.StandardCharsets +import java.util + +import kafka.tools.DefaultMessageFormatter +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} +import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.serialization.Deserializer +import org.junit.Assert._ +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters + +import scala.jdk.CollectionConverters._ + +@RunWith(value = classOf[Parameterized]) +class DefaultMessageFormatterTest(name: String, record: ConsumerRecord[Array[Byte], Array[Byte]], properties: Map[String, String], expected: String) { + import DefaultMessageFormatterTest._ + + @Test + def testWriteRecord()= { + withResource(new ByteArrayOutputStream()) { baos => + withResource(new PrintStream(baos)) { ps => + val formatter = buildFormatter(properties) + formatter.writeTo(record, ps) + val actual = new String(baos.toByteArray(), StandardCharsets.UTF_8) + assertEquals(expected, actual) + + } + } + } +} + +object DefaultMessageFormatterTest { + @Parameters(name = "Test {index} - {0}") + def parameters: java.util.Collection[Array[Object]] = { + Seq( + Array( + "print nothing", + consumerRecord(), + Map("print.value" -> "false"), + ""), + Array( + "print key", + consumerRecord(), + Map("print.key" -> "true", + "print.value" -> "false"), + "someKey\n"), + Array( + "print value", + consumerRecord(), + Map(), + "someValue\n"), + Array( + "print empty timestamp", + consumerRecord(timestampType = TimestampType.NO_TIMESTAMP_TYPE), + Map("print.timestamp" -> "true", + "print.value" -> "false"), + "NO_TIMESTAMP\n"), + Array( + "print log append time timestamp", + consumerRecord(timestampType = 
TimestampType.LOG_APPEND_TIME), + Map("print.timestamp" -> "true", + "print.value" -> "false"), + "LogAppendTime:1234\n"), + Array( + "print create time timestamp", + consumerRecord(timestampType = TimestampType.CREATE_TIME), + Map("print.timestamp" -> "true", + "print.value" -> "false"), + "CreateTime:1234\n"), + Array( + "print partition", + consumerRecord(), + Map("print.partition" -> "true", + "print.value" -> "false"), + "Partition:9\n"), + Array( + "print offset", + consumerRecord(), + Map("print.offset" -> "true", + "print.value" -> "false"), + "Offset:9876\n"), + Array( + "print headers", + consumerRecord(), + Map("print.headers" -> "true", + "print.value" -> "false"), + "h1:v1,h2:v2\n"), + Array( + "print empty headers", + consumerRecord(headers = Nil), + Map("print.headers" -> "true", + "print.value" -> "false"), + "NO_HEADERS\n"), + Array( + "print all possible fields with default delimiters", + consumerRecord(), + Map("print.key" -> "true", + "print.timestamp" -> "true", + "print.partition" -> "true", + "print.offset" -> "true", + "print.headers" -> "true", + "print.value" -> "true"), + "CreateTime:1234\tPartition:9\tOffset:9876\th1:v1,h2:v2\tsomeKey\tsomeValue\n"), + Array( + "print all possible fields with custom delimiters", + consumerRecord(), + Map("key.separator" -> "|", + "line.separator" -> "^", + "headers.separator" -> "#", + "print.key" -> "true", + "print.timestamp" -> "true", + "print.partition" -> "true", + "print.offset" -> "true", + "print.headers" -> "true", + "print.value" -> "true"), + "CreateTime:1234|Partition:9|Offset:9876|h1:v1#h2:v2|someKey|someValue^"), + Array( + "print key with custom deserializer", + consumerRecord(), + Map("print.key" -> "true", + "print.headers" -> "true", + "print.value" -> "true", + "key.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), + "h1:v1,h2:v2\tSOMEKEY\tsomeValue\n"), + Array( + "print value with custom deserializer", + consumerRecord(), + Map("print.key" -> "true", + "print.headers" -> 
"true", + "print.value" -> "true", + "value.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), + "h1:v1,h2:v2\tsomeKey\tSOMEVALUE\n"), + Array( + "print headers with custom deserializer", + consumerRecord(), + Map("print.key" -> "true", + "print.headers" -> "true", + "print.value" -> "true", + "headers.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), + "h1:V1,h2:V2\tsomeKey\tsomeValue\n"), + Array( + "print key and value", + consumerRecord(), + Map("print.key" -> "true", + "print.value" -> "true"), + "someKey\tsomeValue\n"), + Array( + "print fields in the beginning, middle and the end", + consumerRecord(), + Map("print.key" -> "true", + "print.value" -> "true", + "print.partition" -> "true"), + "Partition:9\tsomeKey\tsomeValue\n"), + Array( + "null value without custom null literal", + consumerRecord(value = null), + Map("print.key" -> "true"), + "someKey\tnull\n"), + Array( + "null value with custom null literal", + consumerRecord(value = null), + Map("print.key" -> "true", + "null.literal" -> "NULL"), + "someKey\tNULL\n"), + ).asJava + } + + private def buildFormatter(propsToSet: Map[String, String]): DefaultMessageFormatter = { + val formatter = new DefaultMessageFormatter() + formatter.configure(propsToSet.asJava) + formatter + } + + + private def header(key: String, value: String) = { + new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8)) + } + + private def consumerRecord(key: String = "someKey", + value: String = "someValue", + headers: Iterable[Header] = Seq(header("h1", "v1"), header("h2", "v2")), + partition: Int = 9, + offset: Long = 9876, + timestamp: Long = 1234, + timestampType: TimestampType = TimestampType.CREATE_TIME) = { + new ConsumerRecord[Array[Byte], Array[Byte]]( + "someTopic", + partition, + offset, + timestamp, + timestampType, + 0L, + 0, + 0, + if (key == null) null else key.getBytes(StandardCharsets.UTF_8), + if (value == null) null else value.getBytes(StandardCharsets.UTF_8), + new 
RecordHeaders(headers.asJava)) + } + + private def withResource[Resource <: Closeable, Result](resource: Resource)(handler: Resource => Result): Result = { + try { + handler(resource) + } finally { + resource.close() + } + } +} + +class UpperCaseDeserializer extends Deserializer[String] { + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {} + override def deserialize(topic: String, data: Array[Byte]): String = new String(data, StandardCharsets.UTF_8).toUpperCase + override def close(): Unit = {} +} diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index e930ad5644d79..8387201f07398 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -518,18 +518,24 @@ class ConsoleConsumerTest { formatter.configure(configs) out = new ByteArrayOutputStream() formatter.writeTo(record, new PrintStream(out)) - assertEquals("key\tvalue\t0\n", out.toString) + assertEquals("Partition:0\tkey\tvalue\n", out.toString) configs.put("print.timestamp", "true") formatter.configure(configs) out = new ByteArrayOutputStream() formatter.writeTo(record, new PrintStream(out)) - assertEquals("NO_TIMESTAMP\tkey\tvalue\t0\n", out.toString) + assertEquals("NO_TIMESTAMP\tPartition:0\tkey\tvalue\n", out.toString) + + configs.put("print.offset", "true") + formatter.configure(configs) + out = new ByteArrayOutputStream() + formatter.writeTo(record, new PrintStream(out)) + assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString) out = new ByteArrayOutputStream() val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, 321L, -1, -1, "key".getBytes, "value".getBytes) formatter.writeTo(record2, new PrintStream(out)) - assertEquals("CreateTime:123\tkey\tvalue\t0\n", out.toString) + assertEquals("CreateTime:123\tPartition:0\tOffset:123\tkey\tvalue\n", 
out.toString) formatter.close() }