Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter #9099

Merged
174 changes: 112 additions & 62 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,23 @@ object ConsoleConsumer extends Logging {
.ofType(classOf[String])
.defaultsTo(classOf[DefaultMessageFormatter].getName)
val messageFormatterArgOpt = parser.accepts("property",
"The properties to initialize the message formatter. Default properties include:\n" +
"\tprint.timestamp=true|false\n" +
"\tprint.key=true|false\n" +
"\tprint.value=true|false\n" +
"\tkey.separator=<key.separator>\n" +
"\tline.separator=<line.separator>\n" +
"\tkey.deserializer=<key.deserializer>\n" +
"\tvalue.deserializer=<value.deserializer>\n" +
"\nUsers can also pass in customized properties for their formatter; more specifically, users " +
"can pass in properties keyed with \'key.deserializer.\' and \'value.deserializer.\' prefixes to configure their deserializers.")
"""The properties to initialize the message formatter. Default properties include:
| print.timestamp=true|false
| print.key=true|false
| print.offset=true|false
| print.partition=true|false
| print.headers=true|false
| print.value=true|false
| key.separator=<key.separator>
| line.separator=<line.separator>
| headers.separator=<line.separator>
| null.literal=<null.literal>
| key.deserializer=<key.deserializer>
| value.deserializer=<value.deserializer>
| header.deserializer=<header.deserializer>
|
|Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers."""
.stripMargin)
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
Expand Down Expand Up @@ -459,48 +466,32 @@ class DefaultMessageFormatter extends MessageFormatter {
var printKey = false
var printValue = true
var printPartition = false
var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
var printOffset = false
var printHeaders = false
var keySeparator = utfBytes("\t")
var lineSeparator = utfBytes("\n")
var headersSeparator = utfBytes(",")
var nullLiteral = utfBytes("null")

var keyDeserializer: Option[Deserializer[_]] = None
var valueDeserializer: Option[Deserializer[_]] = None
var headersDeserializer: Option[Deserializer[_]] = None

override def configure(configs: Map[String, _]): Unit = {
val props = new java.util.Properties()
configs.asScala.forKeyValue { (key, value) => props.put(key, value.toString) }
if (props.containsKey("print.timestamp"))
printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
if (props.containsKey("print.key"))
printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
if (props.containsKey("print.value"))
printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true")
if (props.containsKey("print.partition"))
printPartition = props.getProperty("print.partition").trim.equalsIgnoreCase("true")
if (props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
if (props.containsKey("line.separator"))
lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("key.deserializer")) {
keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor()
.newInstance().asInstanceOf[Deserializer[_]])
keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true)
}
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("value.deserializer")) {
valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor()
.newInstance().asInstanceOf[Deserializer[_]])
valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false)
}
}

private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = {
val newProps = new Properties()
props.asScala.forKeyValue { (key, value) =>
if (key.startsWith(prefix) && key.length > prefix.length)
newProps.put(key.substring(prefix.length), value)
}
newProps
getPropertyIfExists(configs, "print.timestamp", getBoolProperty).foreach(printTimestamp = _)
getPropertyIfExists(configs, "print.key", getBoolProperty).foreach(printKey = _)
getPropertyIfExists(configs, "print.offset", getBoolProperty).foreach(printOffset = _)
getPropertyIfExists(configs, "print.partition", getBoolProperty).foreach(printPartition = _)
getPropertyIfExists(configs, "print.headers", getBoolProperty).foreach(printHeaders = _)
getPropertyIfExists(configs, "print.value", getBoolProperty).foreach(printValue = _)
getPropertyIfExists(configs, "key.separator", getByteProperty).foreach(keySeparator = _)
getPropertyIfExists(configs, "line.separator", getByteProperty).foreach(lineSeparator = _)
getPropertyIfExists(configs, "headers.separator", getByteProperty).foreach(headersSeparator = _)
getPropertyIfExists(configs, "null.literal", getByteProperty).foreach(nullLiteral = _)

keyDeserializer = getPropertyIfExists(configs, "key.deserializer", getDeserializerProperty(true))
valueDeserializer = getPropertyIfExists(configs, "value.deserializer", getDeserializerProperty(false))
headersDeserializer = getPropertyIfExists(configs, "headers.deserializer", getDeserializerProperty(false))
}

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
Expand All @@ -512,37 +503,96 @@ class DefaultMessageFormatter extends MessageFormatter {
output.write(lineSeparator)
}

def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String): Unit = {
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
val convertedBytes = deserializer.map(_.deserialize(topic, consumerRecord.headers, nonNullBytes).toString.
getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
output.write(convertedBytes)
def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = {
val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral)
val convertedBytes = deserializer
.map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString))
.getOrElse(nonNullBytes)
convertedBytes
}

import consumerRecord._

if (printTimestamp) {
if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8))
output.write(utfBytes(s"$timestampType:$timestamp"))
else
output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8))
writeSeparator(printKey || printValue)
output.write(utfBytes("NO_TIMESTAMP"))
writeSeparator(columnSeparator = printOffset || printPartition || printHeaders || printKey || printValue)
}

if (printPartition) {
output.write(utfBytes("Partition:"))
output.write(utfBytes(partition().toString))
writeSeparator(columnSeparator = printOffset || printHeaders || printKey || printValue)
}

if (printOffset) {
output.write(utfBytes("Offset:"))
output.write(utfBytes(offset().toString))
writeSeparator(columnSeparator = printHeaders || printKey || printValue)
}

if (printHeaders) {
val headersIt = headers().iterator.asScala
if (headersIt.hasNext) {
headersIt.foreach { header =>
output.write(utfBytes(header.key() + ":"))
output.write(deserialize(headersDeserializer, header.value(), topic))
if (headersIt.hasNext) {
output.write(headersSeparator)
}
}
} else {
output.write(utfBytes("NO_HEADERS"))
}
writeSeparator(columnSeparator = printKey || printValue)
}

if (printKey) {
write(keyDeserializer, key, topic)
writeSeparator(printValue)
output.write(deserialize(keyDeserializer, key, topic))
writeSeparator(columnSeparator = printValue)
}

if (printValue) {
write(valueDeserializer, value, topic)
writeSeparator(printPartition)
output.write(deserialize(valueDeserializer, value, topic))
output.write(lineSeparator)
}
}

if (printPartition) {
output.write(s"$partition".getBytes(StandardCharsets.UTF_8))
output.write(lineSeparator)
private def propertiesWithKeyPrefixStripped(prefix: String, configs: Map[String, _]): Map[String, _] = {
val newConfigs = collection.mutable.Map[String, Any]()
configs.asScala.foreach { case (key, value) =>
if (key.startsWith(prefix) && key.length > prefix.length)
newConfigs.put(key.substring(prefix.length), value)
}
newConfigs.asJava
}

private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8)

private def getByteProperty(configs: Map[String, _], key: String): Array[Byte] = {
utfBytes(configs.get(key).asInstanceOf[String])
}

private def getBoolProperty(configs: Map[String, _], key: String): Boolean = {
configs.get(key).asInstanceOf[String].trim.equalsIgnoreCase("true")
}

private def getDeserializerProperty(isKey: Boolean)(configs: Map[String, _], propertyName: String): Deserializer[_] = {
val deserializer = Class.forName(configs.get(propertyName).asInstanceOf[String]).newInstance().asInstanceOf[Deserializer[_]]
val deserializerConfig = propertiesWithKeyPrefixStripped(propertyName + ".", configs)
.asScala
.asJava
deserializer.configure(deserializerConfig, isKey)
deserializer
}

private def getPropertyIfExists[T](configs: Map[String, _], key: String, getter: (Map[String, _], String) => T): Option[T] = {
if (configs.containsKey(key))
Some(getter(configs, key))
else
None
}
}

Expand Down
Loading