Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/Option-to-import-JSON-as-is #18

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions doc/changes/changes_0.2.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Kafka Connector Extension 0.2.1, released DD-MM-YYYY

## Summary

This release includes new feature to import Kafka message
as whole JSON document in a single column in database

## Features

* #16: Option-to-import-JSON-as-is (PR #18)
21 changes: 20 additions & 1 deletion doc/user_guide/user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ according to your deployment setup.

## Prepare Exasol Table

### Avro preparation

You should create a corresponding table in Exasol that stores the data from
a Kafka topic.

Expand All @@ -181,8 +183,21 @@ CREATE OR REPLACE TABLE <schema_name>.<table_name> (
KAFKA_OFFSET DECIMAL(36, 0),
);
```
### JSON preparation

In case you want to add whole json document in one single column, (see **AS_JSON_DOC** on: [Optional
consumer properties](#optional-properties)) then create table like this:

```sql
CREATE OR REPLACE TABLE <schema_name>.<table_name> (
-- Single column as JSON string for Kafka topic record
JSON_DOC_COL VARCHAR(2000000),
-- Required for Kafka import UDF
KAFKA_PARTITION DECIMAL(18, 0),
KAFKA_OFFSET DECIMAL(36, 0),
```

The first two columns are used to store the metadata about Kafka topic partition
The last two columns are used to store the metadata about Kafka topic partition
and record offset inside a partition:

- KAFKA_PARTITION DECIMAL(18,0)
Expand Down Expand Up @@ -425,6 +440,10 @@ These are optional parameters with their default values.
* ``MAX_PARTITION_FETCH_BYTES`` - It is the maximum amount of data per
partition the server will return. The default value is **1048576**.

* ``AS_JSON_DOC`` - It defines the way the data will be imported into the database.
If set to **'true'** data will be imported as one JSON document in one column.
Default value is **'false'**

The following properties should be provided to enable a secure connection to the
Kafka clusters.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.exasol.cloudetl.kafka

import java.lang.{Integer => JInt}
import java.lang.{Long => JLong}

import com.exasol.ExaMetadata

import org.mockito.ArgumentMatchers._
import org.mockito.Mockito.times
import org.mockito.Mockito.verify

class KafkaTopicDataImporterAsJsonIT extends KafkaIntegrationTest {

test("run emits records from starting initial offset") {
val newProperties = properties ++ Map(
"AS_JSON_DOC" -> "true"
)
createCustomTopic(topic)
publishToKafka(topic, AvroRecord("{'Value':'abc'}", 3, 13))
publishToKafka(topic, AvroRecord("{'Value':'hello'}", 4, 14))
publishToKafka(topic, AvroRecord("{'Value':'xyz'}", 5, 15))

val iter = mockExasolIterator(newProperties, Seq(0), Seq(-1))
KafkaTopicDataImporter.run(mock[ExaMetadata], iter)

verify(iter, times(3)).emit(Seq(any[Object]): _*)
verify(iter, times(3)).emit(
anyString(),
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong]
)
verify(iter, times(1)).emit(
"{\"col_str\": \"{'Value':'abc'}\", \"col_int\": 3, \"col_long\": 13}",
JInt.valueOf(0),
JLong.valueOf(0)
)
verify(iter, times(1)).emit(
"{\"col_str\": \"{'Value':'hello'}\", \"col_int\": 4, \"col_long\": 14}",
JInt.valueOf(0),
JLong.valueOf(1)
)
verify(iter, times(1)).emit(
"{\"col_str\": \"{'Value':'xyz'}\", \"col_int\": 5, \"col_long\": 15}",
JInt.valueOf(0),
JLong.valueOf(2)
)
}

test("run emits records starting from provided offset") {
val newProperties = properties ++ Map(
"AS_JSON_DOC" -> "true"
)
createCustomTopic(topic)
publishToKafka(topic, AvroRecord("{'Value':'abc'}", 3, 13))
publishToKafka(topic, AvroRecord("{'Value':'hello'}", 4, 14))
publishToKafka(topic, AvroRecord("{'Value':'def'}", 7, 17))
publishToKafka(topic, AvroRecord("{'Value':'xyz'}", 13, 23))

// records at 0, 1 are already read, committed
val iter = mockExasolIterator(newProperties, Seq(0), Seq(1))
KafkaTopicDataImporter.run(mock[ExaMetadata], iter)

verify(iter, times(2)).emit(Seq(any[Object]): _*)
verify(iter, times(2)).emit(
anyString(),
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong]
)
verify(iter, times(1)).emit(
"{\"col_str\": \"{'Value':'def'}\", \"col_int\": 7, \"col_long\": 17}",
JInt.valueOf(0),
JLong.valueOf(2)
)
verify(iter, times(1)).emit(
"{\"col_str\": \"{'Value':'xyz'}\", \"col_int\": 13, \"col_long\": 23}",
JInt.valueOf(0),
JLong.valueOf(3)
)
}

test("run emits records within min / max records per run") {
val newProperties = properties ++ Map(
"MAX_POLL_RECORDS" -> "2",
"MIN_RECORDS_PER_RUN" -> "2",
"MAX_RECORDS_PER_RUN" -> "4",
"AS_JSON_DOC" -> "true"
)
createCustomTopic(topic)
publishToKafka(topic, AvroRecord("{'Value':'abc'}", 3, 13))
publishToKafka(topic, AvroRecord("{'Value':'hello'}", 4, 14))
publishToKafka(topic, AvroRecord("{'Value':'def'}", 7, 17))
publishToKafka(topic, AvroRecord("{'Value':'xyz'}", 13, 23))

// comsumer in two batches each with 2 records
val iter = mockExasolIterator(newProperties, Seq(0), Seq(-1))
KafkaTopicDataImporter.run(mock[ExaMetadata], iter)

verify(iter, times(4)).emit(Seq(any[Object]): _*)
verify(iter, times(4)).emit(
anyString(),
anyInt().asInstanceOf[JInt],
anyLong().asInstanceOf[JLong]
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ class KafkaConsumerProperties(private val properties: Map[String, String])
final def getGroupId(): String =
get(GROUP_ID.userPropertyName).fold(GROUP_ID.defaultValue)(identity)

/**
* Returns user provided boolean,
* if it is not provided by user
* returns default value of false.
*/
final def getSingleColJson(): Boolean =
isEnabled(AS_JSON_DOC)

/** Returns the user provided topic name. */
final def getTopic(): String =
getString(TOPIC_NAME)
Expand Down Expand Up @@ -342,6 +350,14 @@ object KafkaConsumerProperties extends CommonProperties {
"EXASOL_KAFKA_UDFS_CONSUMERS"
)

/**
*
* It is a boolean that defines whether data should be imported as
* JSON in single column when [[AS_JSON_DOC]] is set to 'true'
* or as avro message when [[AS_JSON_DOC]] is 'false' or not set
*/
private[kafka] final val AS_JSON_DOC: String = "AS_JSON_DOC"

/**
* This is the {@code max.poll.records} configuration setting.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.exasol.ExaMetadata
import com.exasol.common.avro.AvroRow

import com.typesafe.scalalogging.LazyLogging
import org.apache.avro.generic.GenericData
import org.apache.kafka.common.TopicPartition

/**
Expand All @@ -18,6 +19,13 @@ import org.apache.kafka.common.TopicPartition
*/
object KafkaTopicDataImporter extends LazyLogging {

private def getAvroRow(singleJson: Boolean, recordValue: GenericData.Record): Seq[Object] =
if (singleJson) {
Seq(s"$recordValue")
} else {
AvroRow(recordValue).getValues().map(_.asInstanceOf[AnyRef])
}

/**
* Consumes Kafka topic records and emits them into an Exasol table.
*
Expand Down Expand Up @@ -49,6 +57,7 @@ object KafkaTopicDataImporter extends LazyLogging {
val maxRecords = kafkaProperties.getMaxRecordsPerRun()
val minRecords = kafkaProperties.getMinRecordsPerRun()
val timeout = kafkaProperties.getPollTimeoutMs()
val singleColumnJson = kafkaProperties.getSingleColJson()

try {
var recordCount = 0
Expand All @@ -63,13 +72,16 @@ object KafkaTopicDataImporter extends LazyLogging {
s"'${record.offset()}' with key '${record.key()}' and " +
s"value '${record.value()}'"
)

val metadata: Seq[Object] = Seq(
record.partition().asInstanceOf[AnyRef],
record.offset().asInstanceOf[AnyRef]
)
val avroRow = AvroRow(record.value()).getValues().map(_.asInstanceOf[AnyRef])
val exasolRow: Seq[Object] = avroRow ++ metadata

val recordValue = record.value().asInstanceOf[GenericData.Record]
val exasolRow: Seq[Object] = getAvroRow(singleColumnJson, recordValue) ++ metadata
iterator.emit(exasolRow: _*)

}
logger.info(
s"Emitted total '$totalRecordCount' records for partition " +
Expand Down