From 4a101208daa10acd356263891ec6f522f1d31122 Mon Sep 17 00:00:00 2001
From: Joel Hanson
Date: Wed, 27 Mar 2024 21:13:09 +0530
Subject: [PATCH] feat: Implement tombstone message handling in JDBC sink connector

- Add support for handling tombstone messages in the JDBC sink connector.
- Implement the ability to delete rows based on tombstone messages.
- Introduce a new parameter `delete.enabled` to control delete behavior.
- Align functionality with the documented approach for processing tombstones,
  similar to the Confluent JDBC driver behavior.
- Add a new integration test with an Oracle database.
- Implement integration tests for PostgreSQL and Oracle databases to test
  insert and delete operations.
- Add new sink config validation for the `delete.enabled` config.
- Update the docs to include the new config for the sink connector.

Signed-off-by: Joel Hanson
Signed-off-by: Joel Hanson
---
 build.gradle.kts                              |   5 +-
 docs/sink-connector-config-options.rst        |  19 ++
 docs/sink-connector.md                        |  20 ++
 .../connect/jdbc/oracle/AbstractOracleIT.java |  77 +++++
 .../connect/jdbc/oracle/VerifyDeleteIT.java   | 263 ++++++++++++++++++
 .../connect/jdbc/oracle/VerifyInsertIT.java   | 213 ++++++++++++++
 .../jdbc/postgres/AbstractPostgresIT.java     |   6 +-
 .../PartitionedTableIntegrationTest.java      |   8 +-
 .../UnqualifiedTableNamesIntegrationTest.java |  30 +-
 .../connect/jdbc/postgres/VerifyDeleteIT.java | 263 ++++++++++++++++++
 .../connect/jdbc/postgres/VerifyInsertIT.java | 212 ++++++++++++++
 .../aiven/connect/jdbc/JdbcSinkConnector.java |   7 +-
 .../connect/jdbc/dialect/DatabaseDialect.java |  28 +-
 .../jdbc/dialect/GenericDatabaseDialect.java  |  26 +-
 .../connect/jdbc/sink/BufferedRecords.java    | 178 ++++++++----
 .../connect/jdbc/sink/JdbcSinkConfig.java     |  49 +++-
 .../jdbc/sink/PreparedStatementBinder.java    |   7 +-
 .../jdbc/sink/BufferedRecordsTest.java        |  93 +++++++
 .../connect/jdbc/sink/JdbcSinkConfigTest.java |  17 +-
 .../jdbc/sink/JdbcSinkConnectorTest.java      | 114 ++++++++
 20 files changed, 1553 insertions(+), 82 deletions(-)
 create mode 100644 src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/AbstractOracleIT.java
 create mode 100644 src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java
 create mode 100644 src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyInsertIT.java
 create mode 100644 src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/VerifyDeleteIT.java
 create mode 100644 src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/VerifyInsertIT.java
 create mode 100644 src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConnectorTest.java

diff --git a/build.gradle.kts b/build.gradle.kts
index 75480471..9bdf92e0 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -2,7 +2,7 @@ import java.io.FileOutputStream
 import java.net.URL
 
 /*
- * Copyright 2021 Aiven Oy and jdbc-connector-for-apache-kafka contributors
+ * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka contributors
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -163,6 +163,7 @@ dependencies {
     runtimeOnly("org.xerial:sqlite-jdbc:3.46.0.0")
     runtimeOnly("org.postgresql:postgresql:42.7.3")
+    runtimeOnly("com.oracle.database.jdbc:ojdbc8:23.4.0.24.05")
     runtimeOnly("net.sourceforge.jtds:jtds:1.3.1")
     runtimeOnly("net.snowflake:snowflake-jdbc:3.16.0")
     runtimeOnly("com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11")
@@ -201,6 +202,8 @@ dependencies {
     integrationTestImplementation("org.testcontainers:kafka:$testcontainersVersion") // this is not Kafka version
     integrationTestImplementation("org.testcontainers:testcontainers:$testcontainersVersion")
     integrationTestImplementation("org.testcontainers:postgresql:$testcontainersVersion")
+    integrationTestImplementation("org.testcontainers:oracle-free:$testcontainersVersion")
+
     integrationTestImplementation("org.awaitility:awaitility:$awaitilityVersion")
 
     integrationTestImplementation("org.assertj:assertj-db:2.0.2")
diff --git a/docs/sink-connector-config-options.rst b/docs/sink-connector-config-options.rst
index ca59fea6..b905d2e1 100644
--- a/docs/sink-connector-config-options.rst
+++ b/docs/sink-connector-config-options.rst
@@ -83,6 +83,25 @@ Writes
   * Valid Values: [0,...]
   * Importance: medium
 
+``delete.enabled``
+  Enable deletion of rows based on tombstone messages.
+
+  * Type: boolean
+  * Default: false
+  * Importance: medium
+
+  Note:
+
+  A tombstone message has:
+
+  - a non-null key
+  - a null value
+
+  When ``delete.enabled`` is set to ``true`` and a tombstone message is
+  received, the JDBC sink connector deletes the row referenced by the
+  message key. This requires ``pk.mode`` to be set to ``record_key`` so
+  that the connector can identify the rows to delete.
+
 Data Mapping
 ^^^^^^^^^^^^
 
diff --git a/docs/sink-connector.md b/docs/sink-connector.md
index b2e55a19..397cacf5 100644
--- a/docs/sink-connector.md
+++ b/docs/sink-connector.md
@@ -182,6 +182,26 @@ the same.
 
 To use this mode, set `pk.mode=record_value`.
 
+## Deletion Handling
+
+### Tombstone Messages
+
+A tombstone message is a special type of record in Kafka that signifies
+the deletion of a key. It has:
+
+- a non-null **key**
+- a null **value**
+
+Tombstone messages are typically used in compacted topics to indicate
+that the key should be removed from the downstream system.
+
+When `delete.enabled` is set to `true` and a tombstone message is
+received, the JDBC sink connector deletes the row referenced by the
+message key. This requires `pk.mode` to be set to `record_key` so that
+the connector can identify the rows to delete.
+
+To enable deletion handling, set `delete.enabled=true`.
+
 ## Table Auto-Creation and Auto-Evolution
 
 ### Auto-Creation
diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/AbstractOracleIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/AbstractOracleIT.java
new file mode 100644
index 00000000..f2700bcd
--- /dev/null
+++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/AbstractOracleIT.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
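The documentation above ties `delete.enabled` to `pk.mode=record_key`. For quick reference, the following is a minimal sketch of a sink connector configuration with tombstone-based deletes enabled, assembled the same way the new integration tests build their configs; the connection URL, topic, and key field name are illustrative placeholders rather than values taken from this patch.

```java
import java.util.HashMap;
import java.util.Map;

import io.aiven.connect.jdbc.JdbcSinkConnector;

// Sketch only: a minimal sink connector configuration with tombstone-based
// deletes enabled; connection URL, topic and key field name are placeholders.
public final class DeleteEnabledConfigExample {

    private DeleteEnabledConfigExample() {
    }

    public static Map<String, String> deleteEnabledSinkConfig() {
        final Map<String, String> config = new HashMap<>();
        config.put("connector.class", JdbcSinkConnector.class.getName());
        config.put("connection.url", "jdbc:postgresql://localhost:5432/example"); // placeholder
        config.put("topics", "sink_topic");
        // Deletes require the record key as the primary key so that a
        // tombstone's key identifies the row to remove.
        config.put("pk.mode", "record_key");
        config.put("pk.fields", "id"); // name assigned to the primitive record key
        config.put("delete.enabled", "true");
        return config;
    }
}
```

With this configuration, a record with a non-null key and a null value arriving on the topic is translated into a delete keyed on `id`, as described in the documentation hunks above.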
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.jdbc.oracle; + +import javax.sql.DataSource; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import io.aiven.kafka.connect.jdbc.AbstractIT; + +import oracle.jdbc.pool.OracleDataSource; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.oracle.OracleContainer; +import org.testcontainers.utility.DockerImageName; + +public class AbstractOracleIT extends AbstractIT { + + public static final String DEFAULT_ORACLE_TAG = "slim-faststart"; + private static final DockerImageName DEFAULT_ORACLE_IMAGE_NAME = + DockerImageName.parse("gvenzl/oracle-free") + .withTag(DEFAULT_ORACLE_TAG); + @Container + protected final OracleContainer oracleContainer = new OracleContainer(DEFAULT_ORACLE_IMAGE_NAME); + + protected void executeSqlStatement(final String sqlStatement) throws SQLException { + try (final Connection connection = getDatasource().getConnection(); + final Statement statement = connection.createStatement()) { + statement.executeUpdate(sqlStatement); + } + } + + protected DataSource getDatasource() throws SQLException { + final OracleDataSource dataSource = new OracleDataSource(); + dataSource.setServerName(oracleContainer.getHost()); + // Assuming the default Oracle port is 1521 + dataSource.setPortNumber(oracleContainer.getMappedPort(1521)); + // Or use setDatabaseName() if that's how your Oracle is configured + dataSource.setServiceName(oracleContainer.getDatabaseName()); + dataSource.setUser(oracleContainer.getUsername()); + dataSource.setPassword(oracleContainer.getPassword()); + dataSource.setDriverType("thin"); + return dataSource; + } + + + protected Map basicConnectorConfig() { + final HashMap config = new HashMap<>(); + config.put("tasks.max", "1"); + config.put("connection.url", oracleContainer.getJdbcUrl()); + config.put("connection.user", oracleContainer.getUsername()); + config.put("connection.password", oracleContainer.getPassword()); + config.put("dialect.name", "OracleDatabaseDialect"); + config.put("key.converter", "io.confluent.connect.avro.AvroConverter"); + config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl()); + config.put("value.converter", "io.confluent.connect.avro.AvroConverter"); + config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl()); + return config; + } +} diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java new file mode 100644 index 00000000..bafad76c --- /dev/null +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java @@ -0,0 +1,263 @@ +/* + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.jdbc.oracle; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import io.aiven.connect.jdbc.JdbcSinkConnector; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.assertj.db.type.Table; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.db.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class VerifyDeleteIT extends AbstractOracleIT { + + private static final String TEST_TOPIC_NAME = "SINK_TOPIC"; + private static final String CONNECTOR_NAME = "test-sink-connector"; + private static final int TEST_TOPIC_PARTITIONS = 1; + + private static final Schema VALUE_RECORD_SCHEMA = new Schema.Parser().parse("{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ]\n" + + "}"); + + private static final String CREATE_TABLE = String.format("CREATE TABLE \"%s\" (\n" + + " \"id\" NUMBER NOT NULL,\n" + + " \"name\" VARCHAR2(255) NOT NULL,\n" + + " \"value\" VARCHAR2(255) NOT NULL,\n" + + "PRIMARY KEY(\"id\")" + + ")", TEST_TOPIC_NAME); + + private Map sinkConnectorConfigForDelete() { + final Map config = basicConnectorConfig(); + config.put("name", CONNECTOR_NAME); + config.put("connector.class", JdbcSinkConnector.class.getName()); + config.put("topics", TEST_TOPIC_NAME); + config.put("pk.mode", "record_key"); + config.put("pk.fields", "id"); // assigned name for the primitive key + config.put("delete.enabled", String.valueOf(true)); + return config; + } + + private ProducerRecord createRecord( + final int id, final int partition, final String name, final String value) { + final GenericRecord record = new GenericData.Record(VALUE_RECORD_SCHEMA); + record.put("name", name); + record.put("value", value); + return new ProducerRecord<>(TEST_TOPIC_NAME, partition, String.valueOf(id), record); + } + + private ProducerRecord createTombstoneRecord( + final int id, final int partition) { + return new ProducerRecord<>(TEST_TOPIC_NAME, partition, String.valueOf(id), null); + } + + private void sendTestData(final int numberOfRecords) throws InterruptedException, ExecutionException { + final List> sendFutures = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) { + final String recordName = "user-" + i; + final String recordValue = "value-" + i; + final ProducerRecord msg = createRecord(i, partition, recordName, recordValue); + sendFutures.add(producer.send(msg)); + } + } + producer.flush(); + for (final Future sendFuture : sendFutures) { + sendFuture.get(); + } + } + + private void sendTestDataWithTombstone(final int numberOfRecords) throws InterruptedException, ExecutionException { + final List> sendFutures = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) { + 
final ProducerRecord record = createTombstoneRecord(i, partition); + sendFutures.add(producer.send(record)); + } + } + producer.flush(); + for (final Future sendFuture : sendFutures) { + sendFuture.get(); + } + } + + private void sendMixedTestDataWithTombstone(final int numberOfRecords, final int numberOfTombstoneRecords) + throws InterruptedException, ExecutionException { + final List> sendFutures = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) { + final String recordName = "user-" + i; + final String recordValue = "value-" + i; + final ProducerRecord msg = createRecord( + i, partition, recordName, recordValue); + sendFutures.add(producer.send(msg)); + if (i < numberOfTombstoneRecords) { + final ProducerRecord record = createTombstoneRecord(i, partition); + sendFutures.add(producer.send(record)); + } + } + } + + producer.flush(); + for (final Future sendFuture : sendFutures) { + sendFuture.get(); + } + } + + @BeforeEach + public void beforeEach() throws Exception { + executeSqlStatement(CREATE_TABLE); + createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name + } + + @Test + public void testDeleteTombstoneRecord() throws Exception { + // Test deleting records using tombstone records + connectRunner.createConnector(sinkConnectorConfigForDelete()); + sendTestData(3); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0") + .value().isEqualTo("1") + .value().isEqualTo("2"); + }); + + // Send test data to Kafka topic (including a tombstone record) + sendTestDataWithTombstone(1); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(2); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("1") + .value().isEqualTo("2"); + }); + } + + @Test + public void testWithJustTombstoneRecordInInsertMode() throws Exception { + // Test behavior with only tombstone records in insert mode + + final Map config = sinkConnectorConfigForDelete(); + connectRunner.createConnector(config); + + sendTestDataWithTombstone(2); + + // TODO: Instead of sleeping for a fixed interval, + // wait for the connector to commit offsets for the records we sent + Thread.sleep(5_000); // Give the connector at least five seconds to read our tombstone messages from Kafka + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0); + } + + @Test + public void testDeleteTombstoneRecordWithMultiMode() throws Exception { + // Test deleting records using tombstone records with multi-insert mode enabled + + final Map config = sinkConnectorConfigForDelete(); + config.put("insert.mode", "MULTI"); + connectRunner.createConnector(config); + + sendTestData(5); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(5); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0") + .value().isEqualTo("1") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + + sendTestDataWithTombstone(1); + + 
await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(4); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("1") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + } + + @Test + public void testWithJustTombstoneRecordWithMultiMode() throws Exception { + // Test behavior with only tombstone records in multi-insert mode + + final Map config = sinkConnectorConfigForDelete(); + config.put("insert.mode", "MULTI"); + connectRunner.createConnector(config); + + sendTestDataWithTombstone(2); + + // TODO: Instead of sleeping for a fixed interval, + // wait for the connector to commit offsets for the records we sent + Thread.sleep(5_000); // Give the connector at least five seconds to read our tombstone messages from Kafka + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0); + } + + @Test + public void testMixTombstoneRecordsWithMultiMode() throws Exception { + // Test behavior with mixed tombstone and insert records in multi-insert mode + + final Map config = sinkConnectorConfigForDelete(); + config.put("insert.mode", "MULTI"); + connectRunner.createConnector(config); + + sendMixedTestDataWithTombstone(5, 2); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + } +} diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyInsertIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyInsertIT.java new file mode 100644 index 00000000..7b5e7e38 --- /dev/null +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyInsertIT.java @@ -0,0 +1,213 @@ +/* + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.jdbc.oracle; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import io.aiven.connect.jdbc.JdbcSinkConnector; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.assertj.db.type.Table; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.db.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class VerifyInsertIT extends AbstractOracleIT { + + private static final String TEST_TOPIC_NAME = "SINK_TOPIC"; + private static final String CONNECTOR_NAME = "test-sink-connector"; + private static final int TEST_TOPIC_PARTITIONS = 1; + private static final Schema VALUE_RECORD_SCHEMA = new Schema.Parser().parse("{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"id\",\n" + + " \"type\": \"int\"\n" + + " },\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ]\n" + + "}"); + private static final String CREATE_TABLE_WITH_PK = String.format("CREATE TABLE \"%s\" (\n" + + " \"id\" NUMBER NOT NULL,\n" + + " \"name\" VARCHAR2(255) NOT NULL,\n" + + " \"value\" VARCHAR2(255) NOT NULL,\n" + + "PRIMARY KEY(\"id\")" + + ")", TEST_TOPIC_NAME); + private static final String CREATE_TABLE = String.format("CREATE TABLE \"%s\" (\n" + + " \"id\" NUMBER NOT NULL,\n" + + " \"name\" VARCHAR2(255) NOT NULL,\n" + + " \"value\" VARCHAR2(255) NOT NULL\n" + + ")", TEST_TOPIC_NAME); + + + private Map basicSinkConnectorConfig() { + final Map config = basicConnectorConfig(); + config.put("name", CONNECTOR_NAME); + config.put("connector.class", JdbcSinkConnector.class.getName()); + config.put("topics", TEST_TOPIC_NAME); + return config; + } + + private Map sinkConnectorConfigWith_PKModeRecordKey() { + final Map config = basicSinkConnectorConfig(); + config.put("pk.mode", "record_key"); + config.put("pk.fields", "id"); // assigned name for the primitive key + return config; + } + + private Map sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled() { + final Map config = sinkConnectorConfigWith_PKModeRecordKey(); + config.put("delete.enabled", String.valueOf(true)); + return config; + } + + private Map sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled_InsertModeMulti() { + final Map config = sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled(); + config.put("insert.mode", "MULTI"); + return config; + } + + private ProducerRecord createRecord( + final int id, final int partition, final String name, final String value) { + final GenericRecord record = new GenericData.Record(VALUE_RECORD_SCHEMA); + record.put("id", id); + record.put("name", name); + record.put("value", value); + return new ProducerRecord<>(TEST_TOPIC_NAME, partition, String.valueOf(id), record); + } + + private void sendTestData(final int numberOfRecords) throws InterruptedException, ExecutionException { + final List> sendFutures = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) { + final String recordName = "user-" + i; 
+ final String recordValue = "value-" + i; + final ProducerRecord msg = createRecord(i, partition, recordName, recordValue); + sendFutures.add(producer.send(msg)); + } + } + producer.flush(); + for (final Future sendFuture : sendFutures) { + sendFuture.get(); + } + } + + @BeforeEach + public void beforeEach() throws Exception { + createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name + } + + @Test + public void testSinkConnector() throws Exception { + // Test basic sink connector functionality + executeSqlStatement(CREATE_TABLE); + + // Start the sink connector + connectRunner.createConnector(basicSinkConnectorConfig()); + + // Send test data to Kafka topic + sendTestData(1); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0"); + }); + } + + @Test + public void testSinkWithPKModeRecordKeyConnector() throws Exception { + // Test sink connector functionality with primary key mode set to record key + executeSqlStatement(CREATE_TABLE_WITH_PK); + + // Start the sink connector + connectRunner.createConnector(sinkConnectorConfigWith_PKModeRecordKey()); + + // Send test data to Kafka topic + sendTestData(1); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0"); + }); + } + + @Test + public void testSinkConnectorInDeleteMode() throws Exception { + // Test sink connector functionality with delete mode enabled + executeSqlStatement(CREATE_TABLE); + + // Start the sink connector + connectRunner.createConnector(sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled()); + + // Send test data to Kafka topic + sendTestData(1); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0"); + }); + } + + @Test + public void testMultiInsertInDeleteMode() throws Exception { + // Test multi-insert functionality in delete mode + executeSqlStatement(CREATE_TABLE); + + // Start the sink connector + final Map config = sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled_InsertModeMulti(); + connectRunner.createConnector(config); + + sendTestData(5); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(5); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0") + .value().isEqualTo("1") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + } +} diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java index 2f319789..a28773c9 100644 --- a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Aiven Oy and jdbc-connector-for-apache-kafka project 
contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,10 +43,10 @@ public class AbstractPostgresIT extends AbstractIT { protected final PostgreSQLContainer postgreSqlContainer = new PostgreSQLContainer<>(DEFAULT_POSTGRES_IMAGE_NAME) .withStartupTimeout(CONTAINER_STARTUP_TIMEOUT); - protected void executeUpdate(final String updateStatement) throws SQLException { + protected void executeSqlStatement(final String sqlStatement) throws SQLException { try (final Connection connection = getDatasource().getConnection(); final Statement statement = connection.createStatement()) { - statement.executeUpdate(updateStatement); + statement.executeUpdate(sqlStatement); } } diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java index 9fe9d1ba..2f2d7c35 100644 --- a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,7 +71,7 @@ public class PartitionedTableIntegrationTest extends AbstractPostgresIT { @Test final void testBasicDelivery() throws ExecutionException, InterruptedException, SQLException { - executeUpdate(CREATE_TABLE); + executeSqlStatement(CREATE_TABLE); connectRunner.createConnector(basicSinkConnectorConfig()); sendTestData(1000); @@ -82,8 +82,8 @@ final void testBasicDelivery() throws ExecutionException, InterruptedException, @Test final void testBasicDeliveryForPartitionedTable() throws ExecutionException, InterruptedException, SQLException { - executeUpdate(CREATE_TABLE_WITH_PARTITION); - executeUpdate(CREATE_PARTITION); + executeSqlStatement(CREATE_TABLE_WITH_PARTITION); + executeSqlStatement(CREATE_PARTITION); connectRunner.createConnector(basicSinkConnectorConfig()); sendTestData(1000); diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java index efd2b6e1..09747131 100644 --- a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -69,10 +69,10 @@ public void testSingleTable() throws Exception { // Make sure that the topic starts empty assertEmptyPoll(Duration.ofSeconds(1)); - executeUpdate(createSchema(PREFERRED_SCHEMA)); - executeUpdate(setSearchPath(postgreSqlContainer.getDatabaseName())); - executeUpdate(CREATE_PREFERRED_TABLE); - executeUpdate(POPULATE_PREFERRED_TABLE); + executeSqlStatement(createSchema(PREFERRED_SCHEMA)); + executeSqlStatement(setSearchPath(postgreSqlContainer.getDatabaseName())); + executeSqlStatement(CREATE_PREFERRED_TABLE); + executeSqlStatement(POPULATE_PREFERRED_TABLE); connectRunner.createConnector(basicTimestampModeSourceConnectorConfig()); await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(100)) @@ -107,19 +107,19 @@ private void testMultipleTables(final Map connectorConfig) throw // Make sure that the topic starts empty assertEmptyPoll(Duration.ofSeconds(1)); - executeUpdate(createSchema(PREFERRED_SCHEMA)); - executeUpdate(createSchema(OTHER_SCHEMA)); - executeUpdate(setSearchPath(postgreSqlContainer.getDatabaseName())); - executeUpdate(CREATE_PREFERRED_TABLE); - executeUpdate(POPULATE_PREFERRED_TABLE); - executeUpdate(CREATE_OTHER_TABLE); - executeUpdate(POPULATE_OTHER_TABLE); + executeSqlStatement(createSchema(PREFERRED_SCHEMA)); + executeSqlStatement(createSchema(OTHER_SCHEMA)); + executeSqlStatement(setSearchPath(postgreSqlContainer.getDatabaseName())); + executeSqlStatement(CREATE_PREFERRED_TABLE); + executeSqlStatement(POPULATE_PREFERRED_TABLE); + executeSqlStatement(CREATE_OTHER_TABLE); + executeSqlStatement(POPULATE_OTHER_TABLE); connectRunner.createConnector(connectorConfig); await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(100)) .until(this::assertSingleNewRecordProduced); - executeUpdate(POPULATE_OTHER_TABLE); + executeSqlStatement(POPULATE_OTHER_TABLE); // Make sure that, even after adding another row to the other table, the connector // doesn't publish any new records @@ -127,7 +127,7 @@ private void testMultipleTables(final Map connectorConfig) throw // Add one more row to the preferred table, and verify that the connector // is able to read it - executeUpdate(POPULATE_PREFERRED_TABLE); + executeSqlStatement(POPULATE_PREFERRED_TABLE); await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(100)) .until(this::assertSingleNewRecordProduced); @@ -136,7 +136,7 @@ private void testMultipleTables(final Map connectorConfig) throw // Add one more row to the preferred table, and verify that the connector // is able to read it - executeUpdate(POPULATE_PREFERRED_TABLE); + executeSqlStatement(POPULATE_PREFERRED_TABLE); await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(100)) .until(this::assertSingleNewRecordProduced); diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/VerifyDeleteIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/VerifyDeleteIT.java new file mode 100644 index 00000000..1fd6ef5c --- /dev/null +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/VerifyDeleteIT.java @@ -0,0 +1,263 @@ +/* + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.jdbc.postgres; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import io.aiven.connect.jdbc.JdbcSinkConnector; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.assertj.db.type.Table; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.db.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class VerifyDeleteIT extends AbstractPostgresIT { + + private static final String TEST_TOPIC_NAME = "sink_topic"; + private static final String CONNECTOR_NAME = "test-sink-connector"; + private static final int TEST_TOPIC_PARTITIONS = 1; + + private static final Schema VALUE_RECORD_SCHEMA = new Schema.Parser().parse("{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ]\n" + + "}"); + + private static final String CREATE_TABLE = String.format("CREATE TABLE \"%s\" (\n" + + " \"id\" VARCHAR(30) NOT NULL,\n" + + " \"name\" VARCHAR(255) NOT NULL,\n" + + " \"value\" VARCHAR(255) NOT NULL,\n" + + "PRIMARY KEY(\"id\")" + + ")", TEST_TOPIC_NAME); + + private Map sinkConnectorConfigForDelete() { + final Map config = basicConnectorConfig(); + config.put("name", CONNECTOR_NAME); + config.put("connector.class", JdbcSinkConnector.class.getName()); + config.put("topics", TEST_TOPIC_NAME); + config.put("pk.mode", "record_key"); + config.put("pk.fields", "id"); // assigned name for the primitive key + config.put("delete.enabled", String.valueOf(true)); + return config; + } + + private ProducerRecord createRecord( + final int id, final int partition, final String name, final String value) { + final GenericRecord record = new GenericData.Record(VALUE_RECORD_SCHEMA); + record.put("name", name); + record.put("value", value); + return new ProducerRecord<>(TEST_TOPIC_NAME, partition, String.valueOf(id), record); + } + + private ProducerRecord createTombstoneRecord( + final int id, final int partition) { + return new ProducerRecord<>(TEST_TOPIC_NAME, partition, String.valueOf(id), null); + } + + private void sendTestData(final int numberOfRecords) throws InterruptedException, ExecutionException { + final List> sendFutures = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) { + final String recordName = "user-" + i; + final String recordValue = "value-" + i; + final ProducerRecord msg = createRecord(i, partition, recordName, recordValue); + sendFutures.add(producer.send(msg)); + } + } + producer.flush(); + for (final Future sendFuture : sendFutures) 
{ + sendFuture.get(); + } + } + + private void sendTestDataWithTombstone(final int numberOfRecords) throws InterruptedException, ExecutionException { + final List> sendFutures = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) { + final ProducerRecord record = createTombstoneRecord(i, partition); + sendFutures.add(producer.send(record)); + } + } + producer.flush(); + for (final Future sendFuture : sendFutures) { + sendFuture.get(); + } + } + + private void sendMixedTestDataWithTombstone(final int numberOfRecords, final int numberOfTombstoneRecords) + throws InterruptedException, ExecutionException { + final List> sendFutures = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) { + final String recordName = "user-" + i; + final String recordValue = "value-" + i; + final ProducerRecord msg = createRecord( + i, partition, recordName, recordValue); + sendFutures.add(producer.send(msg)); + if (i < numberOfTombstoneRecords) { + final ProducerRecord record = createTombstoneRecord(i, partition); + sendFutures.add(producer.send(record)); + } + } + } + + producer.flush(); + for (final Future sendFuture : sendFutures) { + sendFuture.get(); + } + } + + @BeforeEach + public void beforeEach() throws Exception { + executeSqlStatement(CREATE_TABLE); + createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name + } + + @Test + public void testDeleteTombstoneRecord() throws Exception { + // Test deleting records using tombstone records + connectRunner.createConnector(sinkConnectorConfigForDelete()); + sendTestData(3); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0") + .value().isEqualTo("1") + .value().isEqualTo("2"); + }); + + // Send test data to Kafka topic (including a tombstone record) + sendTestDataWithTombstone(1); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(2); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("1") + .value().isEqualTo("2"); + }); + } + + @Test + public void testWithJustTombstoneRecordInInsertMode() throws Exception { + // Test behavior with only tombstone records in insert mode + + final Map config = sinkConnectorConfigForDelete(); + connectRunner.createConnector(config); + + sendTestDataWithTombstone(2); + + // TODO: Instead of sleeping for a fixed interval, + // wait for the connector to commit offsets for the records we sent + Thread.sleep(5_000); // Give the connector at least five seconds to read our tombstone messages from Kafka + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0); + } + + @Test + public void testDeleteTombstoneRecordWithMultiMode() throws Exception { + // Test deleting records using tombstone records with multi-insert mode enabled + + final Map config = sinkConnectorConfigForDelete(); + config.put("insert.mode", "MULTI"); + connectRunner.createConnector(config); + + sendTestData(5); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), 
TEST_TOPIC_NAME)).hasNumberOfRows(5); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0") + .value().isEqualTo("1") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + + sendTestDataWithTombstone(1); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(4); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("1") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + } + + @Test + public void testWithJustTombstoneRecordWithMultiMode() throws Exception { + // Test behavior with only tombstone records in multi-insert mode + + final Map config = sinkConnectorConfigForDelete(); + config.put("insert.mode", "MULTI"); + connectRunner.createConnector(config); + + sendTestDataWithTombstone(2); + + // TODO: Instead of sleeping for a fixed interval, + // wait for the connector to commit offsets for the records we sent + Thread.sleep(5_000); // Give the connector at least five seconds to read our tombstone messages from Kafka + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0); + } + + @Test + public void testMixTombstoneRecordsWithMultiMode() throws Exception { + // Test behavior with mixed tombstone and insert records in multi-insert mode + + final Map config = sinkConnectorConfigForDelete(); + config.put("insert.mode", "MULTI"); + connectRunner.createConnector(config); + + sendMixedTestDataWithTombstone(5, 2); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + } +} diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/VerifyInsertIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/VerifyInsertIT.java new file mode 100644 index 00000000..c4436f64 --- /dev/null +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/VerifyInsertIT.java @@ -0,0 +1,212 @@ +/* + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.jdbc.postgres; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import io.aiven.connect.jdbc.JdbcSinkConnector; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.assertj.db.type.Table; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.db.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class VerifyInsertIT extends AbstractPostgresIT { + + private static final String TEST_TOPIC_NAME = "sink_topic"; + private static final String CONNECTOR_NAME = "test-sink-connector"; + private static final int TEST_TOPIC_PARTITIONS = 1; + private static final Schema VALUE_RECORD_SCHEMA = new Schema.Parser().parse("{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"id\",\n" + + " \"type\": \"int\"\n" + + " },\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ]\n" + + "}"); + private static final String CREATE_TABLE_WITH_PK = String.format("CREATE TABLE \"%s\" (\n" + + " \"id\" VARCHAR(30) NOT NULL PRIMARY KEY,\n" + + " \"name\" VARCHAR(255) NOT NULL,\n" + + " \"value\" VARCHAR(255) NOT NULL\n" + + ")", TEST_TOPIC_NAME); + private static final String CREATE_TABLE = String.format("CREATE TABLE \"%s\" (\n" + + " \"id\" VARCHAR(30) NOT NULL,\n" + + " \"name\" VARCHAR(255) NOT NULL,\n" + + " \"value\" VARCHAR(255) NOT NULL\n" + + ")", TEST_TOPIC_NAME); + + + private Map basicSinkConnectorConfig() { + final Map config = basicConnectorConfig(); + config.put("name", CONNECTOR_NAME); + config.put("connector.class", JdbcSinkConnector.class.getName()); + config.put("topics", TEST_TOPIC_NAME); + return config; + } + + private Map sinkConnectorConfigWith_PKModeRecordKey() { + final Map config = basicSinkConnectorConfig(); + config.put("pk.mode", "record_key"); + config.put("pk.fields", "id"); // assigned name for the primitive key + return config; + } + + private Map sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled() { + final Map config = sinkConnectorConfigWith_PKModeRecordKey(); + config.put("delete.enabled", String.valueOf(true)); + return config; + } + + private Map sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled_InsertModeMulti() { + final Map config = sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled(); + config.put("insert.mode", "MULTI"); + return config; + } + + private ProducerRecord createRecord( + final int id, final int partition, final String name, final String value) { + final GenericRecord record = new GenericData.Record(VALUE_RECORD_SCHEMA); + record.put("id", id); + record.put("name", name); + record.put("value", value); + return new ProducerRecord<>(TEST_TOPIC_NAME, partition, String.valueOf(id), record); + } + + private void sendTestData(final int numberOfRecords) throws InterruptedException, ExecutionException { + final List> sendFutures = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) { + final String recordName = "user-" + i; + 
final String recordValue = "value-" + i; + final ProducerRecord msg = createRecord(i, partition, recordName, recordValue); + sendFutures.add(producer.send(msg)); + } + } + producer.flush(); + for (final Future sendFuture : sendFutures) { + sendFuture.get(); + } + } + + @BeforeEach + public void beforeEach() throws Exception { + createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name + } + + @Test + public void testSinkConnector() throws Exception { + // Test basic sink connector functionality + executeSqlStatement(CREATE_TABLE); + + // Start the sink connector + connectRunner.createConnector(basicSinkConnectorConfig()); + + // Send test data to Kafka topic + sendTestData(1); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0"); + }); + } + + @Test + public void testSinkWithPKModeRecordKeyConnector() throws Exception { + // Test sink connector functionality with primary key mode set to record key + executeSqlStatement(CREATE_TABLE_WITH_PK); + + // Start the sink connector + connectRunner.createConnector(sinkConnectorConfigWith_PKModeRecordKey()); + + // Send test data to Kafka topic + sendTestData(1); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0"); + }); + } + + @Test + public void testSinkConnectorInDeleteMode() throws Exception { + // Test sink connector functionality with delete mode enabled + executeSqlStatement(CREATE_TABLE); + + // Start the sink connector + connectRunner.createConnector(sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled()); + + // Send test data to Kafka topic + sendTestData(1); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0"); + }); + } + + @Test + public void testMultiInsertInDeleteMode() throws Exception { + // Test multi-insert functionality in delete mode + executeSqlStatement(CREATE_TABLE); + + // Start the sink connector + final Map config = sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled_InsertModeMulti(); + connectRunner.createConnector(config); + + sendTestData(5); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(5); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0") + .value().isEqualTo("1") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + } +} diff --git a/src/main/java/io/aiven/connect/jdbc/JdbcSinkConnector.java b/src/main/java/io/aiven/connect/jdbc/JdbcSinkConnector.java index 4c84a82e..da59fb5f 100644 --- a/src/main/java/io/aiven/connect/jdbc/JdbcSinkConnector.java +++ b/src/main/java/io/aiven/connect/jdbc/JdbcSinkConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2016 
Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -69,7 +69,10 @@ public ConfigDef config() { @Override public Config validate(final Map connectorConfigs) { // TODO cross-fields validation here: pkFields against the pkMode - return super.validate(connectorConfigs); + final Config config = super.validate(connectorConfigs); + + JdbcSinkConfig.validateDeleteEnabled(config); + return config; } @Override diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java index 2a4befdc..3ca5ce82 100644 --- a/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java +++ b/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2018 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -360,6 +360,21 @@ default String buildInsertStatement(TableId table, return buildInsertStatement(table, keyColumns, nonKeyColumns); } + /** + * Build the DELETE prepared statement expression for the given table and its columns. + * + * @param table the identifier of the table; may not be null + * @param keyColumns the identifiers of the columns in the primary/unique key; may not be null + * but may be empty + * @return the DELETE statement; may not be null + * @throws UnsupportedOperationException if the dialect does not support delete + */ + default String buildDeleteStatement(TableId table, + int records, + Collection keyColumns) { + throw new UnsupportedOperationException(); + } + /** * Build the UPDATE prepared statement expression for the given table and its columns. Variables * for each key column should also appear in the WHERE clause of the statement. @@ -524,6 +539,17 @@ default void bindRecord(SinkRecord record) throws SQLException { * @throws SQLException if there is a problem binding values into the statement */ int bindRecord(int index, SinkRecord record) throws SQLException; + + /** + * Bind the values in the supplied tombstone record. + * + * @param record the sink record with values to be bound into the statement; never null + * @throws SQLException if there is a problem binding values into the statement + * @throws UnsupportedOperationException if binding values is not supported + */ + default void bindTombstoneRecord(SinkRecord record) throws SQLException, UnsupportedOperationException { + throw new UnsupportedOperationException(); + } } /** diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java index 1ccd2744..57acbec6 100644 --- a/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2018 Confluent Inc. 
* * Licensed under the Apache License, Version 2.0 (the "License"); @@ -1398,6 +1398,30 @@ public String buildMultiInsertStatement(final TableId table, return insertStatement + allRowsPlaceholder; } + @Override + public String buildDeleteStatement( + final TableId table, + final int records, + final Collection keyColumns + ) { + if (records < 1) { + throw new IllegalArgumentException("number of records must be a positive number, but got: " + records); + } + if (isEmpty(keyColumns)) { + throw new IllegalArgumentException("no columns specified"); + } + requireNonNull(table, "table must not be null"); + final ExpressionBuilder builder = expressionBuilder(); + builder.append("DELETE FROM "); + builder.append(table); + builder.append(" WHERE "); + builder.appendList() + .delimitedBy(" AND ") + .transformedBy(ExpressionBuilder.columnNamesWith(" = ?")) + .of(keyColumns); + return builder.toString(); + } + @Override public String buildUpdateStatement( final TableId table, diff --git a/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java b/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java index 9d93bdea..305db362 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2016 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -52,13 +52,17 @@ public class BufferedRecords { private final DbStructure dbStructure; private final Connection connection; - private List records = new ArrayList<>(); + private final List records = new ArrayList<>(); + private final List tombstoneRecords = new ArrayList<>(); private SchemaPair currentSchemaPair; private FieldsMetadata fieldsMetadata; private TableDefinition tableDefinition; private PreparedStatement preparedStatement; private StatementBinder preparedStatementBinder; + private PreparedStatement deletePreparedStatement; + private StatementBinder deletePreparedStatementBinder; + public BufferedRecords( final JdbcSinkConfig config, final TableId tableId, @@ -86,12 +90,18 @@ public List add(final SinkRecord record) throws SQLException { } final List flushed; - if (currentSchemaPair.equals(schemaPair)) { + // Skip the schemaPair check for all tombstone records or the current schema pair matches + if (record.value() == null || currentSchemaPair.equals(schemaPair)) { // Continue with current batch state - records.add(record); - if (records.size() >= config.batchSize) { - log.debug("Flushing buffered records after exceeding configured batch size {}.", - config.batchSize); + if (config.deleteEnabled && isTombstone(record)) { + tombstoneRecords.add(record); + } else { + records.add(record); + } + if (records.size() + tombstoneRecords.size() >= config.batchSize) { + log.debug("Flushing buffered records {} and tombstone records {} " + + "after exceeding the configured batch size of {}.", + records.size(), tombstoneRecords.size(), config.batchSize); flushed = flush(); } else { flushed = Collections.emptyList(); @@ -109,25 +119,33 @@ public List add(final SinkRecord record) throws SQLException { } private void prepareStatement() throws SQLException { - final String sql; - log.debug("Generating query for insert mode {} and {} records", config.insertMode, records.size()); - if (config.insertMode == MULTI) { - sql = getMultiInsertSql(); - } 
else {
-            sql = getInsertSql();
+        close();
+        if (!records.isEmpty()) {
+            final String insertSql = config.insertMode == MULTI ? getMultiInsertSql() : getInsertSql();
+            log.debug("Prepared SQL for insert mode {} with {} records: {}",
+                config.insertMode, records.size(), insertSql);
+            preparedStatement = connection.prepareStatement(insertSql);
+            preparedStatementBinder = dbDialect.statementBinder(
+                preparedStatement,
+                config.pkMode,
+                currentSchemaPair,
+                fieldsMetadata,
+                config.insertMode
+            );
         }
-        log.debug("Prepared SQL {} for insert mode {}", sql, config.insertMode);
-
-        close();
-        preparedStatement = connection.prepareStatement(sql);
-        preparedStatementBinder = dbDialect.statementBinder(
-            preparedStatement,
-            config.pkMode,
-            currentSchemaPair,
-            fieldsMetadata,
-            config.insertMode
-        );
+        if (!tombstoneRecords.isEmpty()) {
+            final String deleteSql = getDeleteSql();
+            log.debug("Prepared SQL for tombstone with {} records: {}", tombstoneRecords.size(), deleteSql);
+            deletePreparedStatement = connection.prepareStatement(deleteSql);
+            deletePreparedStatementBinder = dbDialect.statementBinder(
+                deletePreparedStatement,
+                config.pkMode,
+                currentSchemaPair,
+                fieldsMetadata,
+                config.insertMode
+            );
+        }
     }
 
     /**
@@ -153,66 +171,103 @@ private void reInitialize(final SchemaPair schemaPair) throws SQLException {
     }
 
     public List<SinkRecord> flush() throws SQLException {
-        if (records.isEmpty()) {
-            log.debug("Records is empty");
+        if (records.isEmpty() && tombstoneRecords.isEmpty()) {
+            log.debug("Records and tombstone records are empty.");
             return new ArrayList<>();
         }
+
         prepareStatement();
         bindRecords();
 
-        int totalUpdateCount = 0;
+        processBatch(records, "regular");
+        processBatch(tombstoneRecords, "tombstone");
+
+        final List<SinkRecord> flushedRecords = new ArrayList<>(records);
+        flushedRecords.addAll(tombstoneRecords);
+
+        records.clear();
+        tombstoneRecords.clear();
+
+        return flushedRecords;
+    }
+
+    private void processBatch(final List<SinkRecord> batchRecords, final String recordType) throws SQLException {
+        if (batchRecords.isEmpty()) {
+            log.debug("No {} records to process.", recordType);
+            return;
+        }
+
+        int totalSuccessfulExecutionCount = 0;
         boolean successNoInfo = false;
-        log.debug("Executing batch...");
-        for (final int updateCount : executeBatch()) {
+        log.debug("Executing {} record batch...", recordType);
+        final int[] updateCounts = recordType.equals("tombstone") ?
executeDeleteBatch() : executeBatch(); + for (final int updateCount : updateCounts) { if (updateCount == Statement.SUCCESS_NO_INFO) { successNoInfo = true; - continue; + } else { + totalSuccessfulExecutionCount += updateCount; } - totalUpdateCount += updateCount; } - log.debug("Done executing batch."); - if (totalUpdateCount != records.size() && !successNoInfo) { + + log.debug("Done executing {} record batch.", recordType); + verifySuccessfulExecutions(totalSuccessfulExecutionCount, batchRecords, successNoInfo, recordType); + } + + private void verifySuccessfulExecutions(final int totalSuccessfulExecutionCount, + final List batchRecords, + final boolean successNoInfo, + final String recordType) { + if (totalSuccessfulExecutionCount != batchRecords.size() && !successNoInfo) { switch (config.insertMode) { case INSERT: case MULTI: throw new ConnectException(String.format( - "Update count (%d) did not sum up to total number of records inserted (%d)", - totalUpdateCount, - records.size() + "Update count (%d) did not sum up to total number of %s records (%d)", + totalSuccessfulExecutionCount, + recordType, + batchRecords.size() )); case UPSERT: case UPDATE: - log.debug( - "{} records:{} resulting in in totalUpdateCount:{}", + log.debug("{} {} records:{} resulting in totalSuccessfulExecutionCount:{}", config.insertMode, - records.size(), - totalUpdateCount + recordType, + batchRecords.size(), + totalSuccessfulExecutionCount ); break; default: throw new ConnectException("Unknown insert mode: " + config.insertMode); } } + if (successNoInfo) { - log.info( - "{} records:{} , but no count of the number of rows it affected is available", + log.info("{} {} records:{} , but no count of the number of rows it affected is available", config.insertMode, - records.size() + recordType, + batchRecords.size() ); } - - final List flushedRecords = records; - records = new ArrayList<>(); - return flushedRecords; } private int[] executeBatch() throws SQLException { - if (config.insertMode == MULTI) { - preparedStatement.addBatch(); + if (preparedStatement != null) { + if (config.insertMode == MULTI) { + preparedStatement.addBatch(); + } + log.debug("Executing batch with insert mode {}", config.insertMode); + return preparedStatement.executeBatch(); } - log.debug("Executing batch with insert mode {}", config.insertMode); - return preparedStatement.executeBatch(); + return new int[0]; + } + + private int[] executeDeleteBatch() throws SQLException { + if (deletePreparedStatement != null) { + log.debug("Executing batch delete"); + return deletePreparedStatement.executeBatch(); + } + return new int[0]; } private void bindRecords() throws SQLException { @@ -228,15 +283,30 @@ private void bindRecords() throws SQLException { preparedStatementBinder.bindRecord(record); } } + + for (final SinkRecord tombstoneRecord : tombstoneRecords) { + deletePreparedStatementBinder.bindTombstoneRecord(tombstoneRecord); + } log.debug("Done binding records."); } + private boolean isTombstone(final SinkRecord record) { + // Tombstone records are events with a null value. 
+ return record.value() == null; + } + public void close() throws SQLException { log.info("Closing BufferedRecords with preparedStatement: {}", preparedStatement); if (preparedStatement != null) { preparedStatement.close(); preparedStatement = null; } + + log.info("Closing BufferedRecords with deletePreparedStatement: {}", deletePreparedStatement); + if (deletePreparedStatement != null) { + deletePreparedStatement.close(); + deletePreparedStatement = null; + } } private String getMultiInsertSql() { @@ -305,6 +375,12 @@ private String getInsertSql() { } } + private String getDeleteSql() { + return dbDialect.buildDeleteStatement(tableId, + tombstoneRecords.size(), + asColumns(fieldsMetadata.keyFieldNames)); + } + private Collection asColumns(final Collection names) { return names.stream() .map(name -> new ColumnId(tableId, name)) diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java index 04be94c1..df67e1ef 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2016 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -29,13 +29,19 @@ import java.util.TimeZone; import java.util.stream.Collectors; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigValue; import io.aiven.connect.jdbc.config.JdbcConfig; import io.aiven.connect.jdbc.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class JdbcSinkConfig extends JdbcConfig { + private static final Logger log = LoggerFactory.getLogger(JdbcSinkConfig.class); public enum InsertMode { INSERT, @@ -176,6 +182,12 @@ public enum PrimaryKeyMode { + " while this configuration is applicable for the other columns."; private static final String FIELDS_WHITELIST_DISPLAY = "Fields Whitelist"; + public static final String DELETE_ENABLED = "delete.enabled"; + private static final String DELETE_ENABLED_DEFAULT = "false"; + private static final String DELETE_ENABLED_DOC = + "Whether to enable the deletion of rows in the target table on tombstone messages"; + private static final String DELETE_ENABLED_DISPLAY = "Delete enabled"; + private static final ConfigDef.Range NON_NEGATIVE_INT_VALIDATOR = ConfigDef.Range.atLeast(0); private static final String WRITES_GROUP = "Writes"; @@ -218,7 +230,20 @@ public enum PrimaryKeyMode { BATCH_SIZE_DOC, WRITES_GROUP, 2, ConfigDef.Width.SHORT, - BATCH_SIZE_DISPLAY); + BATCH_SIZE_DISPLAY) + .define( + // Delete can only be enabled with delete.enabled=true, + // but only when the pk.mode is set to record_key. + // This is because deleting a row from the table + // requires the primary key be used as criteria. 
+ DELETE_ENABLED, + ConfigDef.Type.BOOLEAN, + DELETE_ENABLED_DEFAULT, + ConfigDef.Importance.MEDIUM, + DELETE_ENABLED_DOC, WRITES_GROUP, + 3, + ConfigDef.Width.SHORT, + DELETE_ENABLED_DISPLAY); // Data Mapping CONFIG_DEF @@ -370,6 +395,7 @@ public void ensureValid(final String name, final Object value) { public final List pkFields; public final Set fieldsWhitelist; public final TimeZone timeZone; + public final boolean deleteEnabled; public JdbcSinkConfig(final Map props) { super(CONFIG_DEF, props); @@ -387,6 +413,7 @@ public JdbcSinkConfig(final Map props) { fieldsWhitelist = new HashSet<>(getList(FIELDS_WHITELIST)); final String dbTimeZone = getString(DB_TIMEZONE_CONFIG); timeZone = TimeZone.getTimeZone(ZoneId.of(dbTimeZone)); + deleteEnabled = getBoolean(DELETE_ENABLED); } static Map topicToTableMapping(final List value) { @@ -437,4 +464,22 @@ public static void main(final String... args) { System.out.println(); System.out.println(CONFIG_DEF.toEnrichedRst()); } + + public static void validateDeleteEnabled(final Config config) { + // Collect all configuration values + final Map configValues = config.configValues().stream() + .collect(Collectors.toMap(ConfigValue::name, v -> v)); + + // Check if DELETE_ENABLED is true + final ConfigValue deleteEnabledConfigValue = configValues.get(JdbcSinkConfig.DELETE_ENABLED); + final boolean deleteEnabled = (boolean) deleteEnabledConfigValue.value(); + + // Check if PK_MODE is RECORD_KEY + final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE); + final String pkMode = (String) pkModeConfigValue.value(); + + if (deleteEnabled && !JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY.name().equalsIgnoreCase(pkMode)) { + deleteEnabledConfigValue.addErrorMessage("Delete support only works with pk.mode=record_key"); + } + } } diff --git a/src/main/java/io/aiven/connect/jdbc/sink/PreparedStatementBinder.java b/src/main/java/io/aiven/connect/jdbc/sink/PreparedStatementBinder.java index f49e30a4..ecbf6cec 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/PreparedStatementBinder.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/PreparedStatementBinder.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2016 Confluent Inc. 
* * Licensed under the Apache License, Version 2.0 (the "License"); @@ -97,6 +97,11 @@ public int bindRecord(int index, final SinkRecord record) throws SQLException { return nextIndex; } + public void bindTombstoneRecord(final SinkRecord record) throws SQLException { + bindKeyFields(record, 1); + statement.addBatch(); + } + protected int bindKeyFields(final SinkRecord record, int index) throws SQLException { switch (pkMode) { case NONE: diff --git a/src/test/java/io/aiven/connect/jdbc/sink/BufferedRecordsTest.java b/src/test/java/io/aiven/connect/jdbc/sink/BufferedRecordsTest.java index ef63bd26..41e50536 100644 --- a/src/test/java/io/aiven/connect/jdbc/sink/BufferedRecordsTest.java +++ b/src/test/java/io/aiven/connect/jdbc/sink/BufferedRecordsTest.java @@ -45,6 +45,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.contains; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -190,6 +191,98 @@ public void testInsertModeUpdate() throws SQLException { } + @Test + public void testInsertModeWithDeleteEnabled() throws SQLException { + final HashMap props = new HashMap<>(); + props.put("connection.url", ""); + props.put("auto.create", true); + props.put("auto.evolve", true); + props.put("batch.size", 1000); + props.put("pk.mode", "record_key"); + props.put("pk.fields", "id"); + props.put("delete.enabled", true); + final JdbcSinkConfig config = new JdbcSinkConfig(props); + + final DatabaseDialect dbDialect = DatabaseDialects.findBestFor(dbUrl, config); + final DbStructure dbStructureMock = mock(DbStructure.class); + when(dbStructureMock.createOrAmendIfNecessary(any(JdbcSinkConfig.class), + any(Connection.class), + any(TableId.class), + any(FieldsMetadata.class))) + .thenReturn(true); + + final Connection connectionMock = mock(Connection.class); + final PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(connectionMock.prepareStatement(contains("INSERT"))).thenReturn(preparedStatement); + when(preparedStatement.executeBatch()).thenReturn(new int[0]); + + final PreparedStatement deletePreparedStatement = mock(PreparedStatement.class); + when(connectionMock.prepareStatement(contains("DELETE"))).thenReturn(deletePreparedStatement); + when(deletePreparedStatement.executeBatch()).thenReturn(new int[]{1}); + + final TableId tableId = new TableId(null, null, "dummy"); + final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, dbStructureMock, connectionMock); + final Schema keySchema = SchemaBuilder.struct() + .field("id", SchemaBuilder.INT64_SCHEMA); + + final Struct keyStruct = new Struct(keySchema).put("id", 0L); + final Schema valueSchema = null; + final Struct valueStruct = null; + + final SinkRecord recordA = new SinkRecord("dummy-topic", 0, keySchema, keyStruct, valueSchema, valueStruct, 0); + buffer.add(recordA); + buffer.flush(); + + verify(connectionMock).prepareStatement(contains("DELETE FROM \"dummy\" WHERE \"id\" = ?")); + } + + @Test + public void testMultiInsertModeWithDeleteEnabled() throws SQLException { + final HashMap props = new HashMap<>(); + props.put("connection.url", ""); + props.put("auto.create", true); + props.put("auto.evolve", true); + props.put("batch.size", 1000); + props.put("pk.mode", "record_key"); + props.put("pk.fields", "id"); + props.put("delete.enabled", true); + props.put("insert.mode", "MULTI"); + final JdbcSinkConfig 
config = new JdbcSinkConfig(props); + + final DatabaseDialect dbDialect = DatabaseDialects.findBestFor(dbUrl, config); + final DbStructure dbStructureMock = mock(DbStructure.class); + when(dbStructureMock.createOrAmendIfNecessary(any(JdbcSinkConfig.class), + any(Connection.class), + any(TableId.class), + any(FieldsMetadata.class))) + .thenReturn(true); + + final Connection connectionMock = mock(Connection.class); + final PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(connectionMock.prepareStatement(contains("INSERT"))).thenReturn(preparedStatement); + when(preparedStatement.executeBatch()).thenReturn(new int[0]); + + final PreparedStatement deletePreparedStatement = mock(PreparedStatement.class); + when(connectionMock.prepareStatement(contains("DELETE"))).thenReturn(deletePreparedStatement); + when(deletePreparedStatement.executeBatch()).thenReturn(new int[]{1}); + + final TableId tableId = new TableId(null, null, "dummy"); + final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, dbStructureMock, connectionMock); + final Schema keySchema = SchemaBuilder.struct() + .field("id", SchemaBuilder.INT64_SCHEMA); + + final Struct keyStruct = new Struct(keySchema).put("id", 0L); + final Schema valueSchema = null; + final Struct valueStruct = null; + + final SinkRecord recordA = new SinkRecord("dummy-topic", 0, keySchema, keyStruct, valueSchema, valueStruct, 0); + buffer.add(recordA); + buffer.flush(); + + verify(connectionMock).prepareStatement(contains("DELETE FROM \"dummy\" WHERE \"id\" = ?")); + } + + @Test public void testInsertModeMultiAutomaticFlush() throws SQLException { final JdbcSinkConfig config = multiModeConfig(2); diff --git a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java index b4e7527f..68b4dade 100644 --- a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java +++ b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2016 Confluent Inc. 
* * Licensed under the Apache License, Version 2.0 (the "License"); @@ -27,6 +27,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.entry; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class JdbcSinkConfigTest { @@ -74,4 +76,17 @@ public void shouldThrowExceptionForEmptyMappingFormat() { .isInstanceOf(ConfigException.class); } + @Test + public void verifyDeleteEnabled() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); + props.put(JdbcSinkConfig.DELETE_ENABLED, "true"); + props.put(JdbcSinkConfig.PK_MODE, "record_key"); + JdbcSinkConfig config = new JdbcSinkConfig(props); + assertTrue(config.deleteEnabled); + + props.remove(JdbcSinkConfig.DELETE_ENABLED); + config = new JdbcSinkConfig(props); + assertFalse(config.deleteEnabled); + } } diff --git a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConnectorTest.java b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConnectorTest.java new file mode 100644 index 00000000..f857cffc --- /dev/null +++ b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConnectorTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.connect.jdbc.sink; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.common.config.Config; +import org.apache.kafka.common.config.ConfigValue; + +import io.aiven.connect.jdbc.JdbcSinkConnector; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JdbcSinkConnectorTest { + + private JdbcSinkConnector connector; + + @BeforeEach + public void setUp() { + connector = new JdbcSinkConnector(); + } + + @Test + public void testValidate_withDeleteEnabledAndPkModeNotRecordKey_shouldAddErrorMessage() { + final Map connectorConfigs = new HashMap<>(); + connectorConfigs.put(JdbcSinkConfig.DELETE_ENABLED, "true"); + connectorConfigs.put(JdbcSinkConfig.PK_MODE, "not_record_key"); + + final Config validatedConfig = connector.validate(connectorConfigs); + + final Optional deleteEnabledConfigValue = validatedConfig.configValues().stream() + .filter(cv -> cv.name().equals(JdbcSinkConfig.DELETE_ENABLED)) + .findFirst(); + + assertTrue(deleteEnabledConfigValue.isPresent()); + deleteEnabledConfigValue.ifPresent(value -> + assertTrue(value.errorMessages().contains("Delete support only works with pk.mode=record_key")) + ); + } + + @Test + public void testValidate_withDeleteEnabledAndPkModeRecordKey_shouldNotAddErrorMessage() { + final Map connectorConfigs = new HashMap<>(); + connectorConfigs.put(JdbcSinkConfig.DELETE_ENABLED, "true"); + connectorConfigs.put(JdbcSinkConfig.PK_MODE, "record_key"); + + final Config validatedConfig = connector.validate(connectorConfigs); + + final Optional deleteEnabledConfigValue = validatedConfig.configValues().stream() + .filter(cv -> cv.name().equals(JdbcSinkConfig.DELETE_ENABLED)) + .findFirst(); + + assertTrue(deleteEnabledConfigValue.isPresent()); + deleteEnabledConfigValue.ifPresent(value -> + assertFalse(value.errorMessages().contains("Delete support only works with pk.mode=record_key")) + ); + } + + @Test + public void testValidate_pkModeRecordKey_shouldNotAddErrorMessage() { + final Map connectorConfigs = new HashMap<>(); + connectorConfigs.put(JdbcSinkConfig.PK_MODE, "record_key"); + + final Config validatedConfig = connector.validate(connectorConfigs); + + final Optional deleteEnabledConfigValue = validatedConfig.configValues().stream() + .filter(cv -> cv.name().equals(JdbcSinkConfig.DELETE_ENABLED)) + .findFirst(); + + assertTrue(deleteEnabledConfigValue.isPresent()); + deleteEnabledConfigValue.ifPresent(value -> + assertFalse(value.errorMessages().contains("Delete support only works with pk.mode=record_key")) + ); + } + + @Test + public void testValidate_withDeleteDisabled_shouldNotAddErrorMessage() { + final Map connectorConfigs = new HashMap<>(); + connectorConfigs.put(JdbcSinkConfig.DELETE_ENABLED, "false"); + connectorConfigs.put(JdbcSinkConfig.PK_MODE, "anything"); + + final Config validatedConfig = connector.validate(connectorConfigs); + + final Optional deleteEnabledConfigValue = validatedConfig.configValues().stream() + .filter(cv -> cv.name().equals(JdbcSinkConfig.DELETE_ENABLED)) + .findFirst(); + + assertTrue(deleteEnabledConfigValue.isPresent()); + deleteEnabledConfigValue.ifPresent(value -> + assertTrue(value.errorMessages().isEmpty()) + ); + + } +}