-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature/Option-to-import-JSON-as-is #18
feature/Option-to-import-JSON-as-is #18
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello @ilikutle,
Thanks a lot for working on this issue and sending the pull request! We really appreciate it!
I have submitted some request change suggestions. Please have a look. Let me know if you have questions or want to discuss them.
Could you please also add a small test for this? You can check out how we use embedded-kafka library on KafkaTopicDataImportIT class. You can create another similar class (KafkaTopicDataImporterAsJsonIT) with a single test, and update the user properties accordingly.
doc/user_guide/user_guide.md
Outdated
The first two columns are used to store the metadata about Kafka topic partition | ||
```sql | ||
CREATE OR REPLACE TABLE <schema_name>.<table_name> ( | ||
-- These columns match the Kafka topic schema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-- These columns match the Kafka topic schema | |
-- Single column as JSON string for Kafka topic record |
@@ -49,6 +49,7 @@ object KafkaTopicDataImporter extends LazyLogging { | |||
val maxRecords = kafkaProperties.getMaxRecordsPerRun() | |||
val minRecords = kafkaProperties.getMinRecordsPerRun() | |||
val timeout = kafkaProperties.getPollTimeoutMs() | |||
val singleColJson = kafkaProperties.getSingleColJson() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val singleColJson = kafkaProperties.getSingleColJson() | |
val singleColumnJson = kafkaProperties.getSingleColJson() |
val avroRow = AvroRow(record.value()).getValues().map(_.asInstanceOf[AnyRef]) | ||
val exasolRow: Seq[Object] = avroRow ++ metadata | ||
iterator.emit(exasolRow: _*) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add one private function for a row?
Suggestion:
val exasolRow: Seq[Object] = getAvroRow(singleColumnJson) ++ metadata
iterator.emit(exasolRow: _*)
where the getAvroRow
function contains the if else part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor typo in the changes doc, then looks good!
Co-authored-by: Muhammet Orazov <[email protected]>
#16 fix