Skip to content

Commit

Permalink
ZOLLAK2-211 initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Wölfel committed Dec 22, 2023
1 parent 428298a commit 41c178d
Show file tree
Hide file tree
Showing 16 changed files with 770 additions and 4 deletions.
38 changes: 34 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,39 @@ buildNumber.properties
.mvn/timing.properties
# https://github.com/takari/maven-wrapper#usage-without-binary-jar
.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

# Eclipse m2e generated files
# Eclipse Core
.project
# JDT-specific (Eclipse Java Development Tools)
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
56 changes: 56 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@

# Interception Kafka Connect Converter

This repo allows a user to reference an existing Kafka Connect Converter and inject one or more interceptors to alter the
connect Schema before the conversion is performed.

This class is aligned with the Confluent converter configuration.

i.e.
```
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.enhanced.avro.schema.support: false
value.converter.schema.registry.url: http://schema-registry.strimzi.svc.cluster.local:8081
value.converter.schemas.enable: "true"
```

could be intercepted by using this configuration

```
value.converter: com.tiki.lakehouse.kafka.connect.converter.InterceptionConverter
value.converter.wrapped.class: io.confluent.connect.avro.AvroConverter
value.converter.wrapped.config.enhanced.avro.schema.support: false
value.converter.wrapped.config.schema.registry.url: http://schema-registry.strimzi.svc.cluster.local:8081
value.converter.wrapped.config.schemas.enable: "true"
value.converter.interceptors: com.tiki.lakehouse.kafka.connect.converter.interceptors.InitConnectDecimalFix
```

## Interceptors

- ### com.tiki.lakehouse.kafka.connect.converter.interceptors.InitConnectDecimalFix

Fixes a missing property of the connect schema regarding Decimals (``org.apache.kafka.connect.data.Decimal``).

While using ``org.init.ohja.kafka.connect.odp.source.ODPSourceConnector`` you can observe the connect configuration below for decimals.
This configuration is missing the property ``connect.decimal.precision``, which Kafka Connect (using Confluent's ``io.confluent.connect.avro.AvroConverter``) needs to successfully convert decimals in an Avro message.

So:
```
{
"scale": "3",
"jcoType": "JCoBCDType",
"length": "7",
"decimals": "3"
}
```
will be converted into:
```
{
"scale": "3",
"jcoType": "JCoBCDType",
"length": "7",
"decimals": "3",
"connect.decimal.precision": "3"
}
```
by cloning the value of the property ``decimals``.
99 changes: 99 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<licenses>
<license>
<name>Apache License 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
<distribution>repo</distribution>
</license>
</licenses>

<artifactId>kafka-connect-interception-converter</artifactId>
<name>kafka-connect-interception-converter</name>

<groupId>com.tiki</groupId>
<version>0.1</version>
<url>https://tiki-institut.com/</url>
<packaging>jar</packaging>

<repositories>
<repository>
<id>central</id>
<url>https://repo.maven.apache.org/maven2</url>
</repository>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<properties>
<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>utf-8</project.reporting.outputEncoding>
<apache.connect.version>5.5.15</apache.connect.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${apache.connect.version}-ccs</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${apache.connect.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${apache.connect.version}</version>
<scope>provided</scope>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>src/main/assembly/dist-zip.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
29 changes: 29 additions & 0 deletions src/main/assembly/dist-zip.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>dist-zip</id>
<includeBaseDirectory>true</includeBaseDirectory>

<formats>
<format>zip</format>
</formats>
<fileSets>
<fileSet>
<directory>${project.basedir}/conf</directory>
</fileSet>
</fileSets>
<files>
<file>
<source>${project.build.directory}/${project.artifactId}-${project.version}.jar</source>
<outputDirectory>/</outputDirectory>
</file>
</files>
<dependencySets>
<dependencySet>
<outputDirectory>lib</outputDirectory>
<excludes>
<exclude>${project.groupId}:${project.artifactId}:jar:*</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.tiki.lakehouse.kafka.connect.converter;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.Comparator;
import java.util.function.Function;

public abstract class BaseInterceptor implements Interceptor {

    /**
     * Creates a copy of {@code schema} and hands the partially-built copy to
     * {@code cb} so the caller can adjust it (e.g. add parameters) before it
     * is built.
     *
     * <p>Metadata (optionality, default value, name, version, doc and
     * parameters) is carried over from the source schema. STRUCT fields are
     * copied in index order; ARRAY/MAP key and value schemas are reused as-is.
     *
     * @param schema the schema to clone; never modified by this method
     * @param cb     callback receiving the populated builder; must return the
     *               builder to build (usually the same instance)
     * @return the schema produced by {@code cb.apply(builder).build()}
     */
    protected Schema cloneSchema(Schema schema, Function<SchemaBuilder, SchemaBuilder> cb) {

        SchemaBuilder builder;
        switch (schema.type()) {
            case STRUCT:
                builder = structBuilderFrom(schema);
                break;
            case ARRAY:
                builder = SchemaBuilder.array(schema.valueSchema());
                break;
            case MAP:
                builder = SchemaBuilder.map(schema.keySchema(), schema.valueSchema());
                break;
            default:
                builder = SchemaBuilder.type(schema.type());
                break;
        }

        if (schema.isOptional()) {
            builder.optional();
        } else {
            builder.required();
        }

        var schemaDefaultValue = schema.defaultValue();
        if (schemaDefaultValue != null) builder.defaultValue(schemaDefaultValue);

        builder.name(schema.name());
        builder.version(schema.version());
        builder.doc(schema.doc());

        var schemaParameters = schema.parameters();
        if (schemaParameters != null) builder.parameters(schemaParameters);

        return cb.apply(builder).build();
    }

    /**
     * Builds a STRUCT builder holding the fields of {@code schema} in index
     * order.
     */
    private static SchemaBuilder structBuilderFrom(Schema schema) {
        var builder = SchemaBuilder.struct();
        // BUG FIX: sort a stream copy instead of calling sort() on
        // schema.fields() directly — the previous revision sorted the source
        // schema's own field list in place, mutating it as a side effect and
        // throwing UnsupportedOperationException if the list is unmodifiable.
        schema.fields().stream()
                .sorted(Comparator.comparingInt(Field::index))
                .forEach(f -> builder.field(f.name(), f.schema()));
        return builder;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.tiki.lakehouse.kafka.connect.converter;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

import java.util.List;
import java.util.Map;

public class InterceptionConverter implements Converter {

    /** Interceptors applied to the connect schema, in configuration order. */
    public List<Interceptor> interceptors;

    /** The converter that performs the actual (de)serialization. */
    public Converter wrappedConverter;

    public InterceptionConverter() {
    }

    /**
     * Instantiates and configures the wrapped converter (from
     * {@code wrapped.class} / {@code wrapped.config.*}) and the interceptor
     * chain ({@code interceptors}).
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        var myConfig = new InterceptionConverterConfig(configs);
        this.wrappedConverter = myConfig.wrappedConfiguredConverterInstance(isKey);
        this.interceptors = myConfig.interceptors();
    }

    /**
     * Runs the schema through every interceptor, then delegates serialization
     * to the wrapped converter.
     */
    @Override
    public byte[] fromConnectData(String s, Schema schema, Object o) {
        Schema result = schema;
        for (Interceptor interceptor : this.interceptors) {
            result = interceptor.Apply(result);
        }
        return this.wrappedConverter.fromConnectData(s, result, o);
    }

    /**
     * Delegates deserialization to the wrapped converter, then runs the
     * resulting schema through every interceptor.
     */
    @Override
    public SchemaAndValue toConnectData(String s, byte[] bytes) {

        var schemaAndValue = this.wrappedConverter.toConnectData(s, bytes);

        if (schemaAndValue == SchemaAndValue.NULL) {
            return schemaAndValue;
        }

        var schema = schemaAndValue.schema();
        for (Interceptor interceptor : this.interceptors) {
            // BUG FIX: the interceptor's return value was previously
            // discarded here, so interceptors had no effect on the
            // deserialization path (unlike fromConnectData above).
            schema = interceptor.Apply(schema);
        }

        return new SchemaAndValue(schema, schemaAndValue.value());
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.tiki.lakehouse.kafka.connect.converter;

import io.confluent.common.config.AbstractConfig;
import io.confluent.common.config.ConfigDef;
import org.apache.kafka.connect.storage.Converter;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class InterceptionConverterConfig extends AbstractConfig {

    /** Configuration key naming the converter class to wrap. */
    public static final String WRAPPED_CONVERTER_CLASS_CONFIG = "wrapped.class";
    public static final String WRAPPED_CONVERTER_CLASS_CONFIG_DOC =
            "The wrapped instance which shall be used for conversion.";

    /** Prefix under which the wrapped converter's own settings are nested. */
    public static final String WRAPPED_CONVERTER_CLASS_CONFIG_PREFIX = "wrapped.config.";

    /** Configuration key listing interceptor class names. */
    public static final String WRAPPED_CONVERTER_INTERCEPTORS_CONFIG = "interceptors";
    public static final String WRAPPED_CONVERTER_INTERCEPTORS_CONFIG_DOC =
            "Comma-separated list of class names used as interceptors.";

    public InterceptionConverterConfig(Map<?, ?> props) {
        super(baseConfigDef(), props);
    }

    /**
     * Instantiates the wrapped converter and configures it with every setting
     * found under the {@code wrapped.config.} prefix (prefix stripped).
     */
    public Converter wrappedConfiguredConverterInstance(Boolean isKey) {
        var converter = this.getConfiguredInstance(WRAPPED_CONVERTER_CLASS_CONFIG, Converter.class);
        var nestedConfigs = this.originalsWithPrefix(WRAPPED_CONVERTER_CLASS_CONFIG_PREFIX);
        converter.configure(nestedConfigs, isKey);
        return converter;
    }

    /** Instantiates the configured interceptor chain (empty list by default). */
    public List<Interceptor> interceptors() {
        return this.getConfiguredInstances(WRAPPED_CONVERTER_INTERCEPTORS_CONFIG, Interceptor.class);
    }

    /** Defines the two settings understood by this config. */
    public static ConfigDef baseConfigDef() {
        return new ConfigDef()
                .define(WRAPPED_CONVERTER_CLASS_CONFIG,
                        ConfigDef.Type.CLASS,
                        ConfigDef.Importance.HIGH,
                        WRAPPED_CONVERTER_CLASS_CONFIG_DOC)
                .define(WRAPPED_CONVERTER_INTERCEPTORS_CONFIG,
                        ConfigDef.Type.LIST,
                        Collections.emptyList(),
                        ConfigDef.Importance.LOW,
                        WRAPPED_CONVERTER_INTERCEPTORS_CONFIG_DOC);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.tiki.lakehouse.kafka.connect.converter;
import org.apache.kafka.connect.data.Schema;

/**
 * A hook that can replace a Kafka Connect {@link Schema} before serialization
 * or after deserialization when used with {@code InterceptionConverter}.
 */
public interface Interceptor {
    /**
     * Returns the schema to use in place of {@code target}.
     *
     * @param target the schema to inspect or replace
     * @return the schema the conversion should continue with
     */
    // NOTE(review): Java convention would name this method "apply" (lower
    // case); renaming now would break existing implementors and callers.
    Schema Apply(Schema target ) ;
}
Loading

0 comments on commit 41c178d

Please sign in to comment.