Skip to content

Commit

Permalink
Feat/es support pk from key (#162)
Browse files Browse the repository at this point in the history
* ElasticSearch Document Primary Key

The ES sink connector misses the feature of choosing the key from the Key or Header. No SMT would help move data from the Key into the Value payload so that the connector can work in the scenarios where the Key or a Header carries information to be used as part of the ElasticSearch document primary key.

The change refines the TransformAndExtractPK to take the Key and Headers. It adds tests that were missing for PrimaryKeyExtractor, JsonPayloadExtractor and TransformAndExtractPK

* Improve the code complexity

Co-authored-by: David Sloan <[email protected]>

* Improve the test for json payload to mix in OptionValues and reduce the code required

Make the _key/_value/_header a constant.

* Avoid deserialising the key as json if there is no _key path in the primary keys list

* Enhances the functionality of PK path extraction by allowing the path to be specified as _key or nested paths like _key.fieldA.fieldB. This change broadens the scope of supported incoming types, ensuring compatibility with all Kafka Connect Struct types, as well as schemaless input. It provides more flexibility and robustness in handling diverse data formats for primary key extraction.

* Fix the unit tests and the handling of bytes/string

* Remove unused import

---------

Co-authored-by: stheppi <[email protected]>
Co-authored-by: David Sloan <[email protected]>
  • Loading branch information
3 people authored Nov 25, 2024
1 parent fac4ffd commit 0fe0531
Show file tree
Hide file tree
Showing 16 changed files with 2,756 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,13 @@ class ElasticJsonWriter(client: KElasticClient, settings: ElasticSettings)
)
} else {
TransformAndExtractPK(
kcqlValue.fields,
kcqlValue.primaryKeysPath,
kcqlValue,
r.valueSchema(),
r.value(),
kcql.hasRetainStructure,
r.keySchema(),
r.key(),
r.headers(),
)
}
val idFromPk = pks.mkString(settings.pkJoinerSeparator)
Expand Down Expand Up @@ -211,15 +213,13 @@ class ElasticJsonWriter(client: KElasticClient, settings: ElasticSettings)
pks.mkString(settings.pkJoinerSeparator)
}

private case class KcqlValues(
fields: Seq[Field],
ignoredFields: Seq[Field],
primaryKeysPath: Seq[Vector[String]],
behaviorOnNullValues: NullValueBehavior,
)

}

/**
  * Settings derived from a single KCQL mapping, used when writing records to Elasticsearch.
  *
  * @param fields               the fields selected for the indexed document
  * @param ignoredFields        the fields to leave out of the indexed document
  * @param primaryKeysPath      parsed primary-key paths; each path is a chain of field names
  *                             (presumably rooted at _key/_value/_header — confirm against KCQL parsing)
  * @param behaviorOnNullValues how a record with a null value is handled
  */
case class KcqlValues(
fields: Seq[Field],
ignoredFields: Seq[Field],
primaryKeysPath: Seq[Vector[String]],
behaviorOnNullValues: NullValueBehavior,
)
/** Evidence instance that renders a Jackson `JsonNode` to its JSON string form for indexing. */
case object IndexableJsonNode extends Indexable[JsonNode] {
  override def json(node: JsonNode): String = node.toString
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.elastic6

import cats.implicits.catsSyntaxEitherId
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.BigIntegerNode
import com.fasterxml.jackson.databind.node.BooleanNode
import com.fasterxml.jackson.databind.node.DecimalNode
import com.fasterxml.jackson.databind.node.DoubleNode
import com.fasterxml.jackson.databind.node.FloatNode
import com.fasterxml.jackson.databind.node.IntNode
import com.fasterxml.jackson.databind.node.LongNode
import com.fasterxml.jackson.databind.node.TextNode
import io.lenses.json.sql.JacksonJson
import io.lenses.streamreactor.connect.json.SimpleJsonConverter
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct

import java.nio.ByteBuffer
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.jdk.CollectionConverters.MapHasAsScala
import scala.util.Failure
import scala.util.Success
import scala.util.Try

object JsonPayloadExtractor {
  // Shared converter for Connect Structs; lazy so it is only built when first needed.
  lazy val simpleJsonConverter = new SimpleJsonConverter()

  /**
    * Converts a Kafka Connect value (record key, value or header payload) into a Jackson [[JsonNode]].
    *
    * When a schema is present, dispatch is driven by its declared type; when the schema is
    * null the runtime type of the value is inspected instead (schemaless records).
    *
    * @param value  the Connect value; may be null
    * @param schema the Connect schema, or null for schemaless input
    * @return Right(None) if the value is null, Right(Some(node)) on success,
    *         Left(error) if the value cannot be represented as JSON
    */
  def extractJsonNode(value: Any, schema: Schema): Either[String, Option[JsonNode]] =
    (Option(value), Option(schema).map(_.`type`())) match {
      case (None, _)                            => Right(None)
      case (Some(_), Some(Schema.Type.BYTES))   => handleBytes(value)
      case (Some(_), Some(Schema.Type.STRING))  => handleString(value)
      case (Some(_), Some(Schema.Type.INT8))    => handleLong(value)
      case (Some(_), Some(Schema.Type.INT16))   => handleLong(value)
      case (Some(_), Some(Schema.Type.INT32))   => handleLong(value)
      case (Some(_), Some(Schema.Type.INT64))   => handleLong(value)
      case (Some(_), Some(Schema.Type.FLOAT32)) => handleFloat(value)
      case (Some(_), Some(Schema.Type.FLOAT64)) => handleDouble(value)
      case (Some(_), Some(Schema.Type.STRUCT))  => handleStruct(value)
      case (Some(_), Some(Schema.Type.BOOLEAN)) => handleBoolean(value)
      case (Some(_), Some(Schema.Type.ARRAY))   => handleArray(value)
      case (Some(_), Some(Schema.Type.MAP))     => handleMap(value)
      case (Some(_), Some(other))               => Left(s"Unsupported Schema type: $other")
      case (Some(v), None)                      => handleSchemaLess(v)
    }

  /**
    * Folds array-like items into a Jackson ArrayNode, stopping conversion at the first
    * element that fails. Replaces the previous non-local `return` from inside a lambda,
    * which is implemented via a control-flow exception (NonLocalReturnControl) and is
    * deprecated/removed in Scala 3; it also breaks if the loop body is ever wrapped in Try.
    */
  private def buildArrayNode(items: Iterator[_]): Either[String, Option[JsonNode]] = {
    val arrayNode = JacksonJson.mapper.createArrayNode()
    val firstError = items.foldLeft(Option.empty[String]) {
      case (err @ Some(_), _) => err // a previous element failed: skip the rest
      case (None, item) =>
        extractJsonNode(item, null) match {
          case Right(Some(node)) => arrayNode.add(node); None
          case Right(None)       => None // null elements are skipped, as before
          case Left(err)         => Some(err)
        }
    }
    firstError.toLeft(Some(arrayNode))
  }

  // Accepts the three array representations Connect may hand us; the per-element logic
  // is shared in buildArrayNode (previously triplicated).
  private def handleArray(value: Any): Either[String, Option[JsonNode]] =
    value match {
      case l: Iterable[_]              => buildArrayNode(l.iterator)
      case jc: java.util.Collection[_] => buildArrayNode(jc.asScala.iterator)
      case a: Array[_]                 => buildArrayNode(a.iterator)
      case other                       => Left(s"Expected array but got: $other")
    }

  // Builds an ObjectNode from a java Map, stopping at the first entry whose value
  // cannot be converted (same fold-based short-circuit as buildArrayNode).
  private def handleMap(value: Any): Either[String, Option[JsonNode]] =
    value match {
      case m: java.util.Map[_, _] =>
        // unchecked cast mirrors the original contract: map keys are expected to be strings
        val entries = m.asInstanceOf[java.util.Map[String, Any]].asScala
        val mapNode = JacksonJson.mapper.createObjectNode()
        val firstError = entries.foldLeft(Option.empty[String]) {
          case (err @ Some(_), _) => err
          case (None, (key, v)) =>
            extractJsonNode(v, null) match {
              case Right(Some(node)) => mapNode.set(key, node); None
              case Right(None)       => None // null entries are skipped
              case Left(err)         => Some(err)
            }
        }
        firstError.toLeft(Some(mapNode))
      case other => Left(s"Expected map but got: $other")
    }

  private def handleBoolean(value: Any): Either[String, Option[JsonNode]] =
    value match {
      case b: Boolean => Some(BooleanNode.valueOf(b)).asRight[String]
      case other      => Left(s"Expected boolean but got: $other")
    }

  // FLOAT64 schema: both Float and Double are accepted, widened to DoubleNode.
  private def handleDouble(value: Any): Either[String, Option[JsonNode]] =
    value match {
      case f: Float  => Some(DoubleNode.valueOf(f.toDouble)).asRight[String]
      case d: Double => Some(DoubleNode.valueOf(d)).asRight[String]
      case other     => Left(s"Expected double but got: $other")
    }

  // FLOAT32 schema: a Double is narrowed to Float, matching the original behavior.
  private def handleFloat(value: Any): Either[String, Option[JsonNode]] =
    value match {
      case f: Float  => Some(FloatNode.valueOf(f)).asRight[String]
      case d: Double => Some(FloatNode.valueOf(d.toFloat)).asRight[String]
      case other     => Left(s"Expected float but got: $other")
    }

  // All integral schema types (INT8..INT64) are widened to LongNode.
  private def handleLong(value: Any): Either[String, Option[JsonNode]] =
    value match {
      case b: Byte  => Some(LongNode.valueOf(b.toLong)).asRight[String]
      case s: Short => Some(LongNode.valueOf(s.toLong)).asRight[String]
      case i: Int   => Some(LongNode.valueOf(i.toLong)).asRight[String]
      case l: Long  => Some(LongNode.valueOf(l)).asRight[String]
      case other    => Left(s"Expected long but got: $other")
    }

  // BYTES payloads are expected to contain JSON; a ByteBuffer is drained without
  // assuming a backing array.
  private def handleBytes(value: Any): Either[String, Option[JsonNode]] =
    value match {
      case bytes: Array[Byte] =>
        tryReadJson(bytes).map(Some(_))
      case byteBuffer: ByteBuffer =>
        val bytes = new Array[Byte](byteBuffer.remaining())
        byteBuffer.get(bytes)
        tryReadJson(bytes).map(Some(_))
      case other => Left(s"Expected byte array or ByteBuffer but got: $other")
    }

  // A string is parsed as JSON when possible; otherwise it is carried as a text node.
  private def handleString(value: Any): Either[String, Option[JsonNode]] =
    value match {
      case s: String => Right(Some(tryParseJson(s).getOrElse(TextNode.valueOf(s))))
      case other     => Left(s"Expected string but got: $other")
    }

  private def handleStruct(value: Any): Either[String, Option[JsonNode]] =
    value match {
      case struct: Struct =>
        Try(simpleJsonConverter.fromConnectData(struct.schema(), struct)) match {
          case Success(jsonNode) => Right(Some(jsonNode))
          case Failure(e)        => Left(s"Failed to convert Struct to JsonNode: ${e.getMessage}")
        }
      case other => Left(s"Expected Struct but got: $other")
    }

  /**
    * Schemaless path: dispatch on the runtime type. Maps go through Jackson's tree
    * conversion; numbers keep their natural JSON width (Byte/Short/Int -> IntNode,
    * Long -> LongNode, etc.), matching the original behavior.
    */
  private def handleSchemaLess(value: Any): Either[String, Option[JsonNode]] =
    value match {
      case m: java.util.Map[_, _] =>
        Try {
          val map = m.asInstanceOf[java.util.Map[String, Any]]
          JacksonJson.mapper.valueToTree[JsonNode](map)
        } match {
          case Success(node) => Right(Some(node))
          case Failure(e)    => Left(s"Failed to convert Map to JsonNode: ${e.getMessage}")
        }

      case s: String =>
        Right(Some(tryParseJson(s).getOrElse(TextNode.valueOf(s))))
      case b: Array[Byte]                   => tryReadJson(b).map(Some(_))
      case b: Byte                          => IntNode.valueOf(b.toInt).asRight[String].map(Some(_))
      case s: Short                         => IntNode.valueOf(s.toInt).asRight[String].map(Some(_))
      case i: Int                           => IntNode.valueOf(i).asRight[String].map(Some(_))
      case l: Long                          => LongNode.valueOf(l).asRight[String].map(Some(_))
      case f: Float                         => FloatNode.valueOf(f).asRight[String].map(Some(_))
      case double: Double                   => DoubleNode.valueOf(double).asRight[String].map(Some(_))
      case bigDecimal: BigDecimal           => DecimalNode.valueOf(bigDecimal.bigDecimal).asRight[String].map(Some(_))
      case bigDecimal: java.math.BigDecimal => DecimalNode.valueOf(bigDecimal).asRight[String].map(Some(_))
      case boolean: Boolean                 => BooleanNode.valueOf(boolean).asRight[String].map(Some(_))
      case bi: BigInt                       => BigIntegerNode.valueOf(bi.bigInteger).asRight[String].map(Some(_))
      case bi: java.math.BigInteger         => BigIntegerNode.valueOf(bi).asRight[String].map(Some(_))
      case other                            => Left(s"Unsupported value type: ${other.getClass.getName}")
    }

  private def tryParseJson(str: String): Either[String, JsonNode] =
    Try(JacksonJson.asJson(str)) match {
      case Success(json) => Right(json)
      case Failure(e)    => Left(s"Invalid JSON string: ${e.getMessage}")
    }

  private def tryReadJson(bytes: Array[Byte]): Either[String, JsonNode] =
    Try(JacksonJson.mapper.readTree(bytes)) match {
      case Success(json) => Right(json)
      case Failure(e)    => Left(s"Invalid JSON bytes: ${e.getMessage}")
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.elastic6

/**
  * Well-known path prefixes naming the parts of a Kafka message in primary-key
  * path expressions (e.g. `_key` or nested forms like `_key.fieldA.fieldB`).
  */
object KafkaMessageParts {
val Key = "_key"
val Value = "_value"
val Header = "_header"
val Topic = "_topic"
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.jdk.CollectionConverters.ListHasAsScala

object PrimaryKeyExtractor {
def extract(node: JsonNode, path: Vector[String]): Any = {
def extract(node: JsonNode, path: Vector[String], prefix: String = ""): Any = {
@tailrec
def innerExtract(n: JsonNode, p: Vector[String]): Any = {
def checkValidPath(): Unit =
if (p.nonEmpty) {
throw new IllegalArgumentException(
s"Invalid field selection for '${path.mkString(".")}'. It doesn't resolve to a primitive field",
s"Invalid field selection for '$prefix${path.mkString(".")}'. It doesn't resolve to a primitive field",
)
}

Expand Down Expand Up @@ -79,24 +79,24 @@ object PrimaryKeyExtractor {
case node: ObjectNode =>
if (p.isEmpty) {
throw new IllegalArgumentException(
s"Invalid field selection for '${path.mkString(".")}'. The path is not resolving to a primitive field",
s"Invalid field selection for '$prefix${path.mkString(".")}'. The path is not resolving to a primitive field",
)
}
val childNode = Option(node.get(p.head)).getOrElse {
throw new IllegalArgumentException(
s"Invalid field selection for '${path.mkString(".")}'. Can't find ${p.head} field. Field found are:${node.fieldNames().asScala.mkString(",")}",
s"Invalid field selection for '$prefix${path.mkString(".")}'. Can't find ${p.head} field. Field found are:${node.fieldNames().asScala.mkString(",")}",
)
}

innerExtract(childNode, p.tail)
case _: ArrayNode =>
throw new IllegalArgumentException(
s"Invalid field selection for '${path.mkString(".")}'. The path is involving an array structure",
s"Invalid field selection for '$prefix${path.mkString(".")}'. The path is involving an array structure",
)

case other =>
throw new IllegalArgumentException(
s"Invalid field selection for '${path.mkString(".")}'. $other is not handled",
s"Invalid field selection for '$prefix${path.mkString(".")}'. $other is not handled",
)
}
}
Expand Down
Loading

0 comments on commit 0fe0531

Please sign in to comment.