Add topic name as a column in the Kafka Input format #14857
@@ -683,7 +700,8 @@ public void testMissingTimestampThrowsException() throws IOException
while (iterator.hasNext()) {
Throwable t = Assert.assertThrows(ParseException.class, () -> iterator.next());
Assert.assertEquals(
- "Timestamp[null] is unparseable! Event: {foo=x, kafka.newts.timestamp=1624492800000, kafka.newkey.key=sampleKey, root_baz=4, bar=null, kafka...",
+ "Timestamp[null] is unparseable! Event: {kafka.newtopic.topic=sample, foo=x, kafka.newts"
Nit: Maybe break the line right before kafka.newts.timestamp or before Event:
@@ -59,6 +59,7 @@ public class KafkaInputFormatTest
{
private KafkaRecordEntity inputEntity;
private final long timestamp = DateTimes.of("2021-06-24").getMillis();
+ private final String TOPIC = "sample";
Suggested change:
- private final String TOPIC = "sample";
+ private static final String TOPIC = "sample";
// Add kafka record topic to the mergelist, we will skip record topic if the same key exists already in
// the header list
Nit, suggested change:
- // Add kafka record topic to the mergelist, we will skip record topic if the same key exists already in
- // the header list
+ // Add kafka record topic to the mergelist, only if the key doesn't already exist
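The merge rule this comment describes (add the topic only when no header already uses the same key) can be sketched roughly as below. This is an illustration, not the PR's code: the class `TopicMergeSketch`, the method `mergeTopic`, and the column name `kafka.topic` are hypothetical names chosen for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicMergeSketch
{
  // Hypothetical column name, used only for this illustration.
  static final String TOPIC_COLUMN = "kafka.topic";

  /**
   * Returns a copy of the header map with the record topic added under
   * TOPIC_COLUMN, unless the headers already contain that key.
   */
  static Map<String, Object> mergeTopic(Map<String, Object> headers, String topic)
  {
    Map<String, Object> merged = new LinkedHashMap<>(headers);
    // putIfAbsent leaves an existing non-null header value untouched.
    merged.putIfAbsent(TOPIC_COLUMN, topic);
    return merged;
  }

  public static void main(String[] args)
  {
    Map<String, Object> headers = new LinkedHashMap<>();
    headers.put("kafka.header.encoding", "UTF-8");
    System.out.println(mergeTopic(headers, "sample"));
  }
}
```

One caveat of `putIfAbsent`: a key that is present but mapped to `null` is treated as absent, so the topic would be written in that case.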
This should be very helpful for debugging! Left some minor comments.
Edit: For ingestion from a single topic, this column would hold redundant data, leading to unnecessary storage costs. How can we avoid populating it in such cases while still populating the other metadata columns such as kafka.header.x and kafka.timestamp?
We should also include a release note in the PR description and update the docs.
Does it make sense to update the API docs in this PR also?
@kfaraz - I suppose you just wouldn't add the column to your dimension spec. It's like the input format exposing 11 columns instead of 10, where you can choose to ingest only 10 of them. @vogievetsky - maybe we can make the topic column optional in the console, or detect in the console whether topicPattern is set and only then ask for the Kafka topic column name?
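The idea in this reply (the input format exposes every column, and an explicit dimension list decides which ones are stored) can be sketched as follows. This is an illustration of the suggestion only, under the assumption that only explicitly listed dimensions are ingested; it does not model how Druid actually resolves dimensions. The names `DimensionSelectionSketch` and `selectDimensions`, and the column name `kafka.topic`, are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DimensionSelectionSketch
{
  /**
   * Keeps only the columns named in the dimension list. Columns the input
   * format exposes but the list omits (here, the topic) are dropped.
   */
  static Map<String, Object> selectDimensions(Map<String, Object> parsedRow, List<String> dimensions)
  {
    Map<String, Object> ingested = new LinkedHashMap<>();
    for (String dimension : dimensions) {
      if (parsedRow.containsKey(dimension)) {
        ingested.put(dimension, parsedRow.get(dimension));
      }
    }
    return ingested;
  }

  public static void main(String[] args)
  {
    Map<String, Object> parsedRow = new LinkedHashMap<>();
    parsedRow.put("kafka.topic", "sample");            // hypothetical topic column
    parsedRow.put("kafka.timestamp", 1624492800000L);
    parsedRow.put("foo", "x");

    // The topic column is exposed by the row but not listed, so it is not kept.
    System.out.println(selectDimensions(parsedRow, Arrays.asList("kafka.timestamp", "foo")));
  }
}
```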
@vogievetsky - I will make the docs changes in a separate PR.
@kfaraz - one thing I can do is keep the default Kafka topic column name as null. So unless explicitly specified, this column will not be populated. What do you think?
Do you mean the DEFAULT_TOPIC_COLUMN_NAME would be Also, I guess what you had previously suggested wouldn't work, right? Because we seem to be adding the metadata columns even if they are not specified in the
@kfaraz - I read it a bit more. These fields are populated if you set the input format to Kafka. If you leave the input format the same as the underlying data, e.g. avro, these metadata fields are not populated. If you set the input format to Kafka and still do not want to ingest this column, there are the following ways
Now that we know the above, I think it's OK not to do special handling for the topic column.
LGTM 🚀
This PR adds a way to store the topic name in a column. Such a column can be used to distinguish messages coming from different topics in multi-topic ingestion.
Release notes
You can now optionally ingest the name of the Kafka topic into the datasource as a column. This is particularly helpful when a datasource receives data from multiple Kafka topics.
This PR has: