Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add topic name as a column in the Kafka Input format #14857

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class KafkaInputFormat implements InputFormat
{
private static final String DEFAULT_HEADER_COLUMN_PREFIX = "kafka.header.";
private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kafka.timestamp";
private static final String DEFAULT_TOPIC_COLUMN_NAME = "kafka.topic";
private static final String DEFAULT_KEY_COLUMN_NAME = "kafka.key";
public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp";

Expand All @@ -54,14 +55,16 @@ public class KafkaInputFormat implements InputFormat
private final String headerColumnPrefix;
private final String keyColumnName;
private final String timestampColumnName;
private final String topicColumnName;

public KafkaInputFormat(
@JsonProperty("headerFormat") @Nullable KafkaHeaderFormat headerFormat,
@JsonProperty("keyFormat") @Nullable InputFormat keyFormat,
@JsonProperty("valueFormat") InputFormat valueFormat,
@JsonProperty("headerColumnPrefix") @Nullable String headerColumnPrefix,
@JsonProperty("keyColumnName") @Nullable String keyColumnName,
@JsonProperty("timestampColumnName") @Nullable String timestampColumnName
@JsonProperty("timestampColumnName") @Nullable String timestampColumnName,
@JsonProperty("topicColumnName") @Nullable String topicColumnName
)
{
this.headerFormat = headerFormat;
Expand All @@ -70,6 +73,7 @@ public KafkaInputFormat(
this.headerColumnPrefix = headerColumnPrefix != null ? headerColumnPrefix : DEFAULT_HEADER_COLUMN_PREFIX;
this.keyColumnName = keyColumnName != null ? keyColumnName : DEFAULT_KEY_COLUMN_NAME;
this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME;
this.topicColumnName = topicColumnName != null ? topicColumnName : DEFAULT_TOPIC_COLUMN_NAME;
}

@Override
Expand Down Expand Up @@ -116,7 +120,8 @@ record ->
temporaryDirectory
),
keyColumnName,
timestampColumnName
timestampColumnName,
topicColumnName
);
}

Expand Down Expand Up @@ -161,6 +166,13 @@ public String getTimestampColumnName()
return timestampColumnName;
}

@Nullable
@JsonProperty
public String getTopicColumnName()
{
return topicColumnName;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -176,14 +188,15 @@ public boolean equals(Object o)
&& Objects.equals(keyFormat, that.keyFormat)
&& Objects.equals(headerColumnPrefix, that.headerColumnPrefix)
&& Objects.equals(keyColumnName, that.keyColumnName)
&& Objects.equals(timestampColumnName, that.timestampColumnName);
&& Objects.equals(timestampColumnName, that.timestampColumnName)
&& Objects.equals(topicColumnName, that.topicColumnName);
}

@Override
public int hashCode()
{
return Objects.hash(headerFormat, valueFormat, keyFormat,
headerColumnPrefix, keyColumnName, timestampColumnName
headerColumnPrefix, keyColumnName, timestampColumnName, topicColumnName
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class KafkaInputReader implements InputEntityReader
private final InputEntityReader valueParser;
private final String keyColumnName;
private final String timestampColumnName;
private final String topicColumnName;

/**
*
Expand All @@ -74,7 +75,8 @@ public KafkaInputReader(
@Nullable Function<KafkaRecordEntity, InputEntityReader> keyParserSupplier,
InputEntityReader valueParser,
String keyColumnName,
String timestampColumnName
String timestampColumnName,
String topicColumnName
)
{
this.inputRowSchema = inputRowSchema;
Expand All @@ -84,6 +86,7 @@ public KafkaInputReader(
this.valueParser = valueParser;
this.keyColumnName = keyColumnName;
this.timestampColumnName = timestampColumnName;
this.topicColumnName = topicColumnName;
}

@Override
Expand Down Expand Up @@ -128,6 +131,9 @@ private Map<String, Object> extractHeader(KafkaRecordEntity record)
// the header list
mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());

// Add the Kafka record's topic to the merged header map, only if the key doesn't already exist
mergedHeaderMap.putIfAbsent(topicColumnName, record.getRecord().topic());

return mergedHeaderMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class KafkaInputFormatTest
{
private KafkaRecordEntity inputEntity;
private final long timestamp = DateTimes.of("2021-06-24").getMillis();
private static final String TOPIC = "sample";
private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(
new Header()
{
Expand Down Expand Up @@ -126,7 +127,8 @@ public void setUp()
),
"kafka.newheader.",
"kafka.newkey.key",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
);
}

Expand Down Expand Up @@ -166,7 +168,8 @@ public void testSerde() throws JsonProcessingException
),
"kafka.newheader.",
"kafka.newkey.key",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
);
Assert.assertEquals(format, kif);

Expand Down Expand Up @@ -209,7 +212,8 @@ public void testWithHeaderKeyAndValue() throws IOException
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
)
)
),
Expand All @@ -231,7 +235,8 @@ public void testWithHeaderKeyAndValue() throws IOException
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
),
row.getDimensions()
);
Expand All @@ -254,6 +259,10 @@ public void testWithHeaderKeyAndValue() throws IOException
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
Assert.assertEquals(
TOPIC,
Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
);
Assert.assertEquals(
"2021-06-25",
Iterables.getOnlyElement(row.getDimension("timestamp"))
Expand Down Expand Up @@ -302,7 +311,8 @@ public void testWithOutKey() throws IOException
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
)
)
),
Expand Down Expand Up @@ -478,7 +488,7 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException
null, null, false, //make sure JsonReader is used
false, false
),
"kafka.newheader.", "kafka.newkey.", "kafka.newts."
"kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic."
);

final InputEntityReader reader = localFormat.createReader(
Expand All @@ -489,7 +499,8 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException
ImmutableList.of(
"bar",
"foo",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
)
)
),
Expand Down Expand Up @@ -567,7 +578,8 @@ public void testWithMultipleMixedRecords() throws IOException
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
)
)
),
Expand Down Expand Up @@ -613,6 +625,10 @@ public void testWithMultipleMixedRecords() throws IOException
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
Assert.assertEquals(
TOPIC,
Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
);
Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("kafka.newheader.indexH")));


Expand Down Expand Up @@ -669,7 +685,8 @@ public void testMissingTimestampThrowsException() throws IOException
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
"kafka.newts.timestamp"
"kafka.newts.timestamp",
"kafka.newtopic.topic"
)
)
),
Expand All @@ -683,7 +700,8 @@ public void testMissingTimestampThrowsException() throws IOException
while (iterator.hasNext()) {
Throwable t = Assert.assertThrows(ParseException.class, () -> iterator.next());
Assert.assertEquals(
"Timestamp[null] is unparseable! Event: {foo=x, kafka.newts.timestamp=1624492800000, kafka.newkey.key=sampleKey, root_baz=4, bar=null, kafka...",
"Timestamp[null] is unparseable! Event: {kafka.newtopic.topic=sample, foo=x, kafka.newts"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Maybe break the line right before kafka.newts.timestamp or before Event:

+ ".timestamp=1624492800000, kafka.newkey.key=sampleKey...",
t.getMessage()
);
}
Expand Down Expand Up @@ -733,6 +751,7 @@ public void testWithSchemaDiscovery() throws IOException
final InputRow row = iterator.next();
Assert.assertEquals(
Arrays.asList(
"kafka.newtopic.topic",
"foo",
"kafka.newts.timestamp",
"kafka.newkey.key",
Expand Down Expand Up @@ -767,6 +786,10 @@ public void testWithSchemaDiscovery() throws IOException
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
Assert.assertEquals(
TOPIC,
Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
);
Assert.assertEquals(
"2021-06-25",
Iterables.getOnlyElement(row.getDimension("timestamp"))
Expand Down Expand Up @@ -834,6 +857,7 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException
Arrays.asList(
"bar",
"kafka.newheader.kafkapkc",
"kafka.newtopic.topic",
"foo",
"kafka.newts.timestamp",
"kafka.newkey.key",
Expand Down Expand Up @@ -866,6 +890,10 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
Assert.assertEquals(
TOPIC,
Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
);
Assert.assertEquals(
"2021-06-25",
Iterables.getOnlyElement(row.getDimension("timestamp"))
Expand All @@ -889,7 +917,7 @@ private KafkaRecordEntity makeInputEntity(byte[] key, byte[] payload, Headers he
{
return new KafkaRecordEntity(
new ConsumerRecord<>(
"sample",
TOPIC,
0,
0,
timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public byte[] value()
new KafkaStringHeaderFormat(null),
INPUT_FORMAT,
INPUT_FORMAT,
"kafka.testheader.", "kafka.key", "kafka.timestamp"
"kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic"
);

private static TestingCluster zkServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public void testSampleKafkaInputFormat()
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
null,
null
),

Expand Down
Loading