Custom Kafka Connect SMT implementation that allows using JSLT (https://github.com/schibsted/jslt) as transformation logic

willhaben/wh-kafka-connect-jslt-transform

Kafka Connect JSLT Single Message Transform

This is an implementation of the Kafka Connect SMT interface to offer transformation capabilities using the Schibsted JSLT library.

Build

The library uses Gradle to build the JAR.

  1. Install the latest Java SDK.
  2. Check out the Git repository and change to its root folder.
  3. Execute ./gradlew build

The JAR can then be found in the build/libs/ subfolder.

Install

After the JAR has been built as described above, copy it into one of the directories listed in the plugin.path property of the Connect worker configuration file on your Kafka Connect instance.

Make sure to do this on all Kafka Connect worker nodes!

See the Confluent Kafka Connect Plugins User Guide for more details.

Usage

Connector Configuration

The transformer expects the jslt attribute in the connector config JSON. In the following example, the value is a single dot, which is the JSLT identity expression: the record passes through unchanged. The string can be replaced with any valid JSLT expression.

Note that JSON does not support multi-line strings, so line breaks and double quotes inside the JSLT expression must be escaped with a backslash (a line break becomes \n and " becomes \").

{
  "name": ...,
  "config": {
    ...,
    "transforms": "jsltTransform",
    "transforms.jsltTransform.type": "at.willhaben.kafka.connect.transforms.jslt.JsltTransform$Value",
    "transforms.jsltTransform.jslt": "."
  }
}
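Escaping a multi-line JSLT expression by hand is error-prone, so it can help to let a JSON serializer do it. The following Python sketch is illustrative only and not part of this project: the connector name my-connector and the helper build_connector_config are hypothetical, and all other connector settings are omitted.

```python
import json

# A multi-line JSLT expression containing line breaks and double quotes.
# It adds a constant field and copies all other fields unchanged.
JSLT_EXPRESSION = """{
  "producer_team": "us.california.burbank.disney",
  * : .
}"""


def build_connector_config(name: str, jslt: str) -> str:
    """Return the connector config as a JSON string with the JSLT escaped."""
    config = {
        "name": name,  # hypothetical connector name
        "config": {
            # Other connector settings (connector.class, topics, ...) omitted.
            "transforms": "jsltTransform",
            "transforms.jsltTransform.type":
                "at.willhaben.kafka.connect.transforms.jslt.JsltTransform$Value",
            "transforms.jsltTransform.jslt": jslt,
        },
    }
    # json.dumps escapes line breaks as \n and double quotes as \".
    return json.dumps(config, indent=2)


if __name__ == "__main__":
    print(build_connector_config("my-connector", JSLT_EXPRESSION))
```

Parsing the generated JSON again returns the original multi-line expression, so the JSLT source can be kept in a readable form and escaped only when the connector config is generated.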

Note that the transformer only supports structured input. Make sure a converter class (e.g. AvroConverter or JsonConverter) is configured that provides the data in a structured format, for example "value.converter": "io.confluent.connect.avro.AvroConverter".

To learn more about JSLT, see the JSLT repository: https://github.com/schibsted/jslt

Example JSLT

The following JSLT takes a nested structure as input and

  • adds a new constant field producer_team
  • maps the values of the field customer_status from a numeric representation to a code, using custom logic
  • pseudonymizes the customer_id field
  • adds a list of objects field locations based on the separate invoice/delivery address fields of the input
  • extracts and flattens the two attributes customer_type and customer_class

Input Record

{
  "customer_id": 123456,
  "customer_status": 2,
  "invoice_address_street": "McDuck Manor",
  "invoice_address_zip_code": "1312",
  "invoice_address_city": "Duckburg",
  "delivery_address_street": "Webfoot Walk",
  "delivery_address_zip_code": "1313",
  "delivery_address_city": "Duckburg",
  "attributes": {
    "customer_type": "C2C",
    "customer_class": "A",
    "last_order": "2022-03-10T18:25:43.511Z"
  }
}

JSLT

// Function declarations must precede the main expression.
def map_status(status)
  if ($status == 0)
    "ACTIVE"
  else if ($status == 1)
    "INACTIVE"
  else
    "UNDEFINED"

{
  "customer_id": sha256-hex(.customer_id),
  "customer_status_code": map_status(.customer_status),
  "locations": [
    {
      "zip_code": .invoice_address_zip_code,
      "city": .invoice_address_city
    },
    {
      "zip_code": .delivery_address_zip_code,
      "city": .delivery_address_city
    }
  ],
  "customer_type": .attributes.customer_type,
  "customer_class": .attributes.customer_class,
  "producer_team": "us.california.burbank.disney"
}

Output Record

{
  "customer_id": "8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92",
  "customer_status_code": "UNDEFINED",
  "locations": [
    {
      "zip_code": "1312",
      "city": "Duckburg"
    },
    {
      "zip_code": "1313",
      "city": "Duckburg"
    }
  ],
  "customer_type": "C2C",
  "customer_class": "A",
  "producer_team": "us.california.burbank.disney"
}
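To check the mapping logic without a running Kafka Connect instance, the same transformation can be mirrored in plain Python. This is only a sketch of the equivalent logic, not how the transform is implemented. It assumes the numeric customer_id is hashed via its decimal string form, which matches the hash shown in the output record. The function names are chosen to mirror the JSLT.

```python
import hashlib


def map_status(status):
    """Mirror of the JSLT map_status function."""
    if status == 0:
        return "ACTIVE"
    if status == 1:
        return "INACTIVE"
    return "UNDEFINED"


def transform(record):
    """Plain-Python equivalent of the JSLT expression."""
    # Assumption: the id is hashed via its decimal string representation.
    id_text = str(record["customer_id"])
    attributes = record["attributes"]
    return {
        "customer_id": hashlib.sha256(id_text.encode("utf-8")).hexdigest(),
        "customer_status_code": map_status(record["customer_status"]),
        # Collect the two flat address groups into a list of objects.
        "locations": [
            {
                "zip_code": record[prefix + "_address_zip_code"],
                "city": record[prefix + "_address_city"],
            }
            for prefix in ("invoice", "delivery")
        ],
        # Flatten the nested attributes.
        "customer_type": attributes["customer_type"],
        "customer_class": attributes["customer_class"],
        "producer_team": "us.california.burbank.disney",
    }
```

Following the branches of map_status literally, a customer_status of 1 maps to "INACTIVE", while any value other than 0 or 1 (such as 2) maps to "UNDEFINED".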

Additional Notes

The JSLT library works with the JsonNode class to perform the transformation.

This class uses quite generic data types, so the conversion to and from JsonNode can change the data type of the returned values. This can be an issue especially with floating-point values!
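One way such a change can show up is when a 32-bit float is widened to a 64-bit double during conversion. Whether this happens in a given pipeline depends on the schema and converter in use, so the following Python sketch only illustrates the general effect under that assumption. The helper widen_float32 is hypothetical and unrelated to the transform's code.

```python
import struct


def widen_float32(value: float) -> float:
    """Round a value to 32-bit precision, then read it back as a 64-bit double."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


if __name__ == "__main__":
    original = 0.1
    widened = widen_float32(original)
    # The widened value no longer compares equal to the original double.
    print(original, widened, original == widened)
```

Values that are exactly representable in 32 bits, such as 0.5, survive the round trip unchanged. Values such as 0.1 do not, so downstream consumers that compare or print floating-point fields may see slightly different numbers.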