Skip to content

Commit

Permalink
KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessa…
Browse files Browse the repository at this point in the history
…geFormatter (apache#9099)

Implementation of KIP-431 - Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter

Reviewers: David Jacot <[email protected]>, Bill Bejeck <[email protected]>
  • Loading branch information
badaiaqrandista authored and javierfreire committed Oct 8, 2020
1 parent becfa88 commit 31f90ce
Show file tree
Hide file tree
Showing 3 changed files with 358 additions and 65 deletions.
174 changes: 112 additions & 62 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,23 @@ object ConsoleConsumer extends Logging {
.ofType(classOf[String])
.defaultsTo(classOf[DefaultMessageFormatter].getName)
val messageFormatterArgOpt = parser.accepts("property",
"The properties to initialize the message formatter. Default properties include:\n" +
"\tprint.timestamp=true|false\n" +
"\tprint.key=true|false\n" +
"\tprint.value=true|false\n" +
"\tkey.separator=<key.separator>\n" +
"\tline.separator=<line.separator>\n" +
"\tkey.deserializer=<key.deserializer>\n" +
"\tvalue.deserializer=<value.deserializer>\n" +
"\nUsers can also pass in customized properties for their formatter; more specifically, users " +
"can pass in properties keyed with \'key.deserializer.\' and \'value.deserializer.\' prefixes to configure their deserializers.")
"""The properties to initialize the message formatter. Default properties include:
| print.timestamp=true|false
| print.key=true|false
| print.offset=true|false
| print.partition=true|false
| print.headers=true|false
| print.value=true|false
| key.separator=<key.separator>
| line.separator=<line.separator>
| headers.separator=<headers.separator>
| null.literal=<null.literal>
| key.deserializer=<key.deserializer>
| value.deserializer=<value.deserializer>
| headers.deserializer=<headers.deserializer>
|
|Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers."""
.stripMargin)
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
Expand Down Expand Up @@ -459,48 +466,32 @@ class DefaultMessageFormatter extends MessageFormatter {
var printKey = false
var printValue = true
var printPartition = false
var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
var printOffset = false
var printHeaders = false
var keySeparator = utfBytes("\t")
var lineSeparator = utfBytes("\n")
var headersSeparator = utfBytes(",")
var nullLiteral = utfBytes("null")

var keyDeserializer: Option[Deserializer[_]] = None
var valueDeserializer: Option[Deserializer[_]] = None
var headersDeserializer: Option[Deserializer[_]] = None

override def configure(configs: Map[String, _]): Unit = {
val props = new java.util.Properties()
configs.asScala.forKeyValue { (key, value) => props.put(key, value.toString) }
if (props.containsKey("print.timestamp"))
printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
if (props.containsKey("print.key"))
printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
if (props.containsKey("print.value"))
printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true")
if (props.containsKey("print.partition"))
printPartition = props.getProperty("print.partition").trim.equalsIgnoreCase("true")
if (props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
if (props.containsKey("line.separator"))
lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("key.deserializer")) {
keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor()
.newInstance().asInstanceOf[Deserializer[_]])
keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true)
}
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("value.deserializer")) {
valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor()
.newInstance().asInstanceOf[Deserializer[_]])
valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false)
}
}

private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = {
val newProps = new Properties()
props.asScala.forKeyValue { (key, value) =>
if (key.startsWith(prefix) && key.length > prefix.length)
newProps.put(key.substring(prefix.length), value)
}
newProps
getPropertyIfExists(configs, "print.timestamp", getBoolProperty).foreach(printTimestamp = _)
getPropertyIfExists(configs, "print.key", getBoolProperty).foreach(printKey = _)
getPropertyIfExists(configs, "print.offset", getBoolProperty).foreach(printOffset = _)
getPropertyIfExists(configs, "print.partition", getBoolProperty).foreach(printPartition = _)
getPropertyIfExists(configs, "print.headers", getBoolProperty).foreach(printHeaders = _)
getPropertyIfExists(configs, "print.value", getBoolProperty).foreach(printValue = _)
getPropertyIfExists(configs, "key.separator", getByteProperty).foreach(keySeparator = _)
getPropertyIfExists(configs, "line.separator", getByteProperty).foreach(lineSeparator = _)
getPropertyIfExists(configs, "headers.separator", getByteProperty).foreach(headersSeparator = _)
getPropertyIfExists(configs, "null.literal", getByteProperty).foreach(nullLiteral = _)

keyDeserializer = getPropertyIfExists(configs, "key.deserializer", getDeserializerProperty(true))
valueDeserializer = getPropertyIfExists(configs, "value.deserializer", getDeserializerProperty(false))
headersDeserializer = getPropertyIfExists(configs, "headers.deserializer", getDeserializerProperty(false))
}

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
Expand All @@ -512,37 +503,96 @@ class DefaultMessageFormatter extends MessageFormatter {
output.write(lineSeparator)
}

def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String): Unit = {
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
val convertedBytes = deserializer.map(_.deserialize(topic, consumerRecord.headers, nonNullBytes).toString.
getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
output.write(convertedBytes)
// Renders one raw field (key, value or header value) as the bytes to print.
// A null payload is replaced by `nullLiteral` *before* the optional deserializer
// runs, so a configured deserializer receives the literal's bytes, not null.
// With a deserializer, the decoded object's `toString` is UTF-8 encoded;
// without one, the (possibly substituted) bytes are returned unchanged.
def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = {
  val payload = if (sourceBytes == null) nullLiteral else sourceBytes
  deserializer match {
    case Some(d) => utfBytes(d.deserialize(topic, consumerRecord.headers, payload).toString)
    case None => payload
  }
}

import consumerRecord._

if (printTimestamp) {
if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8))
output.write(utfBytes(s"$timestampType:$timestamp"))
else
output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8))
writeSeparator(printKey || printValue)
output.write(utfBytes("NO_TIMESTAMP"))
writeSeparator(columnSeparator = printOffset || printPartition || printHeaders || printKey || printValue)
}

if (printPartition) {
output.write(utfBytes("Partition:"))
output.write(utfBytes(partition().toString))
writeSeparator(columnSeparator = printOffset || printHeaders || printKey || printValue)
}

if (printOffset) {
output.write(utfBytes("Offset:"))
output.write(utfBytes(offset().toString))
writeSeparator(columnSeparator = printHeaders || printKey || printValue)
}

if (printHeaders) {
val headersIt = headers().iterator.asScala
if (headersIt.hasNext) {
headersIt.foreach { header =>
output.write(utfBytes(header.key() + ":"))
output.write(deserialize(headersDeserializer, header.value(), topic))
if (headersIt.hasNext) {
output.write(headersSeparator)
}
}
} else {
output.write(utfBytes("NO_HEADERS"))
}
writeSeparator(columnSeparator = printKey || printValue)
}

if (printKey) {
write(keyDeserializer, key, topic)
writeSeparator(printValue)
output.write(deserialize(keyDeserializer, key, topic))
writeSeparator(columnSeparator = printValue)
}

if (printValue) {
write(valueDeserializer, value, topic)
writeSeparator(printPartition)
output.write(deserialize(valueDeserializer, value, topic))
output.write(lineSeparator)
}
}

if (printPartition) {
output.write(s"$partition".getBytes(StandardCharsets.UTF_8))
output.write(lineSeparator)
/**
 * Selects the entries of `configs` whose key starts with `prefix` and returns them
 * with that prefix removed from the key.
 *
 * A key that is exactly equal to `prefix` (nothing after it) is dropped, as are all
 * keys that do not start with `prefix`. Values are passed through untouched.
 *
 * @param prefix  the key prefix to look for and strip
 * @param configs the full formatter configuration
 * @return a Java map view over a freshly built mutable map of the stripped entries
 */
private def propertiesWithKeyPrefixStripped(prefix: String, configs: Map[String, _]): Map[String, _] = {
  val stripped = collection.mutable.Map[String, Any]()
  for ((name, setting) <- configs.asScala if name.startsWith(prefix) && name.length > prefix.length)
    stripped += (name.substring(prefix.length) -> setting)
  stripped.asJava
}

/** Encodes `str` as a UTF-8 byte array. */
private def utfBytes(str: String): Array[Byte] = {
  val charset = StandardCharsets.UTF_8
  str.getBytes(charset)
}

/**
 * Reads `key` from `configs` and returns its textual form as UTF-8 bytes.
 *
 * The value is rendered with `toString` instead of being cast to `String`, so a
 * configuration map that carries non-String values does not fail with a
 * `ClassCastException`. For String values the result is unchanged.
 *
 * @param configs the formatter configuration
 * @param key     the property to read; it must be present and non-null, otherwise a
 *                `NullPointerException` is thrown (callers guard with `containsKey`)
 * @return the UTF-8 encoding of the property's string form (not trimmed, so
 *         whitespace-only separators are preserved)
 */
private def getByteProperty(configs: Map[String, _], key: String): Array[Byte] = {
  utfBytes(configs.get(key).toString)
}

/**
 * Reads `key` from `configs` as a boolean flag.
 *
 * The value is rendered with `toString` instead of being cast to `String`, so a
 * programmatically supplied `java.lang.Boolean` (or any other non-String value) is
 * accepted rather than failing with a `ClassCastException`. For String values the
 * result is unchanged.
 *
 * @param configs the formatter configuration
 * @param key     the property to read; it must be present and non-null, otherwise a
 *                `NullPointerException` is thrown (callers guard with `containsKey`)
 * @return `true` only when the trimmed string form equals "true" ignoring case;
 *         every other value yields `false`
 */
private def getBoolProperty(configs: Map[String, _], key: String): Boolean = {
  configs.get(key).toString.trim.equalsIgnoreCase("true")
}

/**
 * Instantiates and configures the deserializer whose class name is stored under
 * `propertyName`.
 *
 * The class is created through its no-argument constructor using
 * `getDeclaredConstructor().newInstance()`; the deprecated `Class.newInstance()` is
 * avoided because it rethrows checked constructor exceptions undeclared. With this
 * form a failing constructor surfaces as `InvocationTargetException`.
 *
 * The deserializer is then configured with every entry of `configs` whose key starts
 * with `propertyName + "."`, with that prefix stripped (for example
 * `key.deserializer.foo=bar` is passed on as `foo=bar`).
 *
 * @param isKey        forwarded to `Deserializer.configure` as its `isKey` flag
 * @param configs      the formatter configuration
 * @param propertyName the property holding the deserializer's fully qualified class name
 * @return the configured deserializer instance
 */
private def getDeserializerProperty(isKey: Boolean)(configs: Map[String, _], propertyName: String): Deserializer[_] = {
  val className = configs.get(propertyName).asInstanceOf[String]
  val deserializer = Class.forName(className)
    .getDeclaredConstructor()
    .newInstance()
    .asInstanceOf[Deserializer[_]]
  // propertiesWithKeyPrefixStripped already returns a java.util.Map, so no conversion is needed.
  val deserializerConfig = propertiesWithKeyPrefixStripped(propertyName + ".", configs)
  deserializer.configure(deserializerConfig, isKey)
  deserializer
}

/**
 * Applies `getter` to `configs` and `key` only when `configs` contains `key`.
 *
 * @param configs the formatter configuration
 * @param key     the property name to look up
 * @param getter  extracts and converts the property value; invoked at most once
 * @return `Some` wrapping the getter's result when the key is present (even if that
 *         result is null), `None` when the key is absent
 */
private def getPropertyIfExists[T](configs: Map[String, _], key: String, getter: (Map[String, _], String) => T): Option[T] = {
  configs.containsKey(key) match {
    case true => Some(getter(configs, key))
    case false => None
  }
}
}

Expand Down
Loading

0 comments on commit 31f90ce

Please sign in to comment.