feat: Implement tombstone message handling in JDBC sink connector
- Add support for handling tombstone messages in the JDBC sink connector.
- Implement the ability to delete rows based on tombstone messages.
- Introduce a new parameter, `delete.enabled`, to control delete behavior.
- Align functionality with the documented approach for
  processing tombstones, similar to the Confluent JDBC connector's behavior.
- Add a new integration test setup for the Oracle database.
- Implement integration tests for PostgreSQL and Oracle
  to test insert and delete operations.
- Add sink config validation for the `delete.enabled` config.
- Update the docs to include the new sink connector config.

Signed-off-by: Joel Hanson <[email protected]>
Joel-hanson authored and Joel Hanson committed Jun 11, 2024
1 parent 667202b commit 4a10120
Showing 20 changed files with 1,553 additions and 82 deletions.
5 changes: 4 additions & 1 deletion build.gradle.kts
@@ -2,7 +2,7 @@ import java.io.FileOutputStream
 import java.net.URL

 /*
- * Copyright 2021 Aiven Oy and jdbc-connector-for-apache-kafka contributors
+ * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka contributors
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -163,6 +163,7 @@ dependencies {

 runtimeOnly("org.xerial:sqlite-jdbc:3.46.0.0")
 runtimeOnly("org.postgresql:postgresql:42.7.3")
+runtimeOnly("com.oracle.database.jdbc:ojdbc8:23.4.0.24.05")
 runtimeOnly("net.sourceforge.jtds:jtds:1.3.1")
 runtimeOnly("net.snowflake:snowflake-jdbc:3.16.0")
 runtimeOnly("com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11")
@@ -201,6 +202,8 @@ dependencies {
 integrationTestImplementation("org.testcontainers:kafka:$testcontainersVersion") // this is not Kafka version
 integrationTestImplementation("org.testcontainers:testcontainers:$testcontainersVersion")
 integrationTestImplementation("org.testcontainers:postgresql:$testcontainersVersion")
+integrationTestImplementation("org.testcontainers:oracle-free:$testcontainersVersion")
+
 integrationTestImplementation("org.awaitility:awaitility:$awaitilityVersion")
 integrationTestImplementation("org.assertj:assertj-db:2.0.2")

19 changes: 19 additions & 0 deletions docs/sink-connector-config-options.rst
@@ -83,6 +83,25 @@ Writes
* Valid Values: [0,...]
* Importance: medium

``delete.enabled``
Enable deletion of rows based on tombstone messages.

* Type: boolean
* Default: false
* Importance: medium

Note:

A tombstone message has:

- a non-null key
- a null value

When ``delete.enabled`` is set to ``true``, the JDBC sink connector
deletes the row referenced by the key of each tombstone message it
receives. This requires ``pk.mode`` to be ``record_key`` so that the
connector can identify the rows to delete.
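
For example, a sink connector configured to process deletes might use
properties like the following (the connection details and topic name are
illustrative)::

    connector.class=io.aiven.connect.jdbc.JdbcSinkConnector
    topics=orders
    connection.url=jdbc:postgresql://localhost:5432/test
    pk.mode=record_key
    delete.enabled=true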

Data Mapping
^^^^^^^^^^^^

20 changes: 20 additions & 0 deletions docs/sink-connector.md
@@ -182,6 +182,26 @@ the same.

To use this mode, set `pk.mode=record_value`.

## Deletion Handling

### Tombstone Messages

A tombstone message is a special type of record in Kafka that signifies
the deletion of a key. It has:

- a non-null **key**
- a null **value**

Tombstone messages are typically used in compacted topics to indicate
that the key should be removed from the downstream system.
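
For illustration, a tombstone can be produced from plain Java by sending a
record whose value is `null` (the broker address, topic, and serializers
below are assumptions for the sketch):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneProducerExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Non-null key, null value: this record is a tombstone for key "order-42".
            producer.send(new ProducerRecord<>("orders", "order-42", null));
        }
    }
}
```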

When `delete.enabled` is set to `true`, the JDBC sink connector
deletes the row referenced by the key of each tombstone message it
receives. This requires `pk.mode` to be `record_key` so that the
connector can identify the rows to delete.

To enable deletion handling, set `delete.enabled=true`.

## Table Auto-Creation and Auto-Evolution

### Auto-Creation
77 changes: 77 additions & 0 deletions AbstractOracleIT.java
@@ -0,0 +1,77 @@
/*
* Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.jdbc.oracle;

import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import io.aiven.kafka.connect.jdbc.AbstractIT;

import oracle.jdbc.pool.OracleDataSource;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.oracle.OracleContainer;
import org.testcontainers.utility.DockerImageName;

public class AbstractOracleIT extends AbstractIT {

public static final String DEFAULT_ORACLE_TAG = "slim-faststart";
private static final DockerImageName DEFAULT_ORACLE_IMAGE_NAME =
DockerImageName.parse("gvenzl/oracle-free")
.withTag(DEFAULT_ORACLE_TAG);
@Container
protected final OracleContainer oracleContainer = new OracleContainer(DEFAULT_ORACLE_IMAGE_NAME);

protected void executeSqlStatement(final String sqlStatement) throws SQLException {
try (final Connection connection = getDatasource().getConnection();
final Statement statement = connection.createStatement()) {
statement.executeUpdate(sqlStatement);
}
}

protected DataSource getDatasource() throws SQLException {
final OracleDataSource dataSource = new OracleDataSource();
dataSource.setServerName(oracleContainer.getHost());
// Assuming the default Oracle port is 1521
dataSource.setPortNumber(oracleContainer.getMappedPort(1521));
// Or use setDatabaseName() if that's how your Oracle is configured
dataSource.setServiceName(oracleContainer.getDatabaseName());
dataSource.setUser(oracleContainer.getUsername());
dataSource.setPassword(oracleContainer.getPassword());
dataSource.setDriverType("thin");
return dataSource;
}


protected Map<String, String> basicConnectorConfig() {
final HashMap<String, String> config = new HashMap<>();
config.put("tasks.max", "1");
config.put("connection.url", oracleContainer.getJdbcUrl());
config.put("connection.user", oracleContainer.getUsername());
config.put("connection.password", oracleContainer.getPassword());
config.put("dialect.name", "OracleDatabaseDialect");
config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
return config;
}
}
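
A sketch of how a concrete test might build on this base class (the class
and topic names below are hypothetical; the actual tests added by this
commit are in the other changed files):

package io.aiven.kafka.connect.jdbc.oracle;

import java.util.Map;

// Hypothetical sketch, not part of this commit: a concrete integration
// test could extend AbstractOracleIT and enable delete handling like this.
public class OracleDeleteSketchIT extends AbstractOracleIT {

    protected Map<String, String> deleteEnabledConnectorConfig() {
        final Map<String, String> config = basicConnectorConfig();
        config.put("topics", "oracle_delete_topic"); // illustrative topic name
        config.put("pk.mode", "record_key");         // required when deletes are enabled
        config.put("delete.enabled", "true");        // delete rows for tombstone messages
        return config;
    }
}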
