feat: Address review comments
Contributes to: event-integration/eventstreams-planning#0

Signed-off-by: Joel Hanson <[email protected]>
Joel-hanson authored and Joel Hanson committed Jun 4, 2024
1 parent 5e2a3c8 commit 5ef719d
Showing 16 changed files with 833 additions and 143 deletions.
4 changes: 2 additions & 2 deletions build.gradle.kts
@@ -2,7 +2,7 @@ import java.io.FileOutputStream
import java.net.URL

/*
* Copyright 2021 Aiven Oy and jdbc-connector-for-apache-kafka contributors
* Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -164,7 +164,7 @@ dependencies {

runtimeOnly("org.xerial:sqlite-jdbc:3.45.2.0")
runtimeOnly("org.postgresql:postgresql:42.7.3")
runtimeOnly("com.oracle.database.jdbc:ojdbc8:19.3.0.0")
runtimeOnly("com.oracle.database.jdbc:ojdbc8:23.4.0.24.05")
runtimeOnly("net.sourceforge.jtds:jtds:1.3.1")
runtimeOnly("net.snowflake:snowflake-jdbc:3.14.2")
runtimeOnly("com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11")
@@ -51,9 +51,9 @@ protected DataSource getDatasource() throws SQLException {
final OracleDataSource dataSource = new OracleDataSource();
dataSource.setServerName(ORACLE_CONTAINER.getHost());
// Assuming the default Oracle port is 1521
dataSource.setPortNumber(ORACLE_CONTAINER.getMappedPort(1521));
dataSource.setPortNumber(ORACLE_CONTAINER.getMappedPort(1521));
// Or use setDatabaseName() if that's how your Oracle is configured
dataSource.setServiceName(ORACLE_CONTAINER.getDatabaseName());
dataSource.setServiceName(ORACLE_CONTAINER.getDatabaseName());
dataSource.setUser(ORACLE_CONTAINER.getUsername());
dataSource.setPassword(ORACLE_CONTAINER.getPassword());
dataSource.setDriverType("thin");
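For orientation, the hunk above configures the Oracle sink tests' DataSource straight from the Testcontainers instance. A minimal sketch of the resulting helper, assuming the ORACLE_CONTAINER field from AbstractOracleIT and the oracle.jdbc.pool.OracleDataSource / javax.sql.DataSource types (the exact import paths are an assumption):

    // Sketch: builds a thin-driver DataSource against the mapped port of the running Oracle container.
    protected DataSource getDatasource() throws SQLException {
        final OracleDataSource dataSource = new OracleDataSource();
        dataSource.setServerName(ORACLE_CONTAINER.getHost());
        dataSource.setPortNumber(ORACLE_CONTAINER.getMappedPort(1521)); // host port mapped to Oracle's default 1521
        dataSource.setServiceName(ORACLE_CONTAINER.getDatabaseName());
        dataSource.setUser(ORACLE_CONTAINER.getUsername());
        dataSource.setPassword(ORACLE_CONTAINER.getPassword());
        dataSource.setDriverType("thin"); // pure-Java thin driver, no native client needed
        return dataSource;
    }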
@@ -19,16 +19,13 @@
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

import io.aiven.connect.jdbc.JdbcSinkConnector;

@@ -41,7 +38,6 @@

import static org.assertj.db.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertNotSame;

public class VerifyDeleteIT extends AbstractOracleIT {

@@ -83,11 +79,6 @@ private Map<String, String> sinkConnectorConfigForDelete() {
return config;
}

private void assertNotEmptyPoll(final Duration duration) {
final ConsumerRecords<?, ?> records = consumer.poll(duration);
assertNotSame(ConsumerRecords.empty(), records);
}

private ProducerRecord<String, GenericRecord> createRecord(
final int id, final int partition, final String name, final String value) {
final GenericRecord record = new GenericData.Record(VALUE_RECORD_SCHEMA);
@@ -163,14 +154,13 @@ public void afterEach() throws SQLException {
@Test
public void testDeleteTombstoneRecord() throws Exception {
createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));

// Start the sink connector
connectRunner.createConnector(sinkConnectorConfigForDelete());

sendTestData(3);

await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3);
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
@@ -182,9 +172,7 @@ public void testDeleteTombstoneRecord() throws Exception {
// Send test data to Kafka topic (including a tombstone record)
sendTestDataWithTombstone(1);

assertNotEmptyPoll(Duration.ofSeconds(50));

await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(2);
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
@@ -197,50 +185,23 @@
public void testWithJustTombstoneRecordInInsertMode() throws Exception {
// Test logic is similar to previous tests, but with tombstone records.
createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));

// Start the sink connector
final Map<String, String> config = sinkConnectorConfigForDelete();
connectRunner.createConnector(config);

sendTestDataWithTombstone(2);

await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0);
});
}

@Test
public void testMultiInsertMode() throws Exception {
// Test logic is similar to previous tests, but with multi-insert mode enabled
createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));

// Start the sink connector
final Map<String, String> config = sinkConnectorConfigForDelete();
config.put("insert.mode", "MULTI");
connectRunner.createConnector(config);

sendTestData(5);

await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(5);
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
.value().isEqualTo("0")
.value().isEqualTo("1")
.value().isEqualTo("2")
.value().isEqualTo("3")
.value().isEqualTo("4");
});
}

@Test
public void testDeleteTombstoneRecordWithMultiMode() throws Exception {
// Test logic is similar to previous tests, but with multi-insert mode enabled and tombstone records included
createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));

// Start the sink connector
final Map<String, String> config = sinkConnectorConfigForDelete();
@@ -249,7 +210,7 @@ public void testDeleteTombstoneRecordWithMultiMode() throws Exception {

sendTestData(5);

await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofSeconds(20))
await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(5);
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
@@ -262,7 +223,7 @@ public void testDeleteTombstoneRecordWithMultiMode() throws Exception {

sendTestDataWithTombstone(1);

await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofSeconds(18))
await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(4);
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
@@ -277,7 +238,6 @@ public void testWithJustTombstoneRecordWithMultiMode() throws Exception {
public void testWithJustTombstoneRecordWithMultiMode() throws Exception {
// Test logic with multi-insert mode enabled and has only tombstone records
createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));

// Start the sink connector
final Map<String, String> config = sinkConnectorConfigForDelete();
@@ -286,7 +246,7 @@ public void testWithJustTombstoneRecordWithMultiMode() throws Exception {

sendTestDataWithTombstone(2);

await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0);
});
@@ -296,7 +256,6 @@ public void testWithJustTombstoneRecordWithMultiMode() throws Exception {
public void testMixTombstoneRecordsWithMultiMode() throws Exception {
// Test logic is similar to previous tests, but with mixed tombstone records and multi-insert mode
createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));

// Start the sink connector
final Map<String, String> config = sinkConnectorConfigForDelete();
@@ -305,7 +264,7 @@ public void testMixTombstoneRecordsWithMultiMode() throws Exception {

sendMixedTestDataWithTombstone(5, 2);

await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(19))
await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3);
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
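Every assertion in this test class uses the same Awaitility pattern: poll the sink table until the expected row count shows up or the timeout expires. A minimal sketch of that pattern, assuming the getDatasource() helper and the assertj-db Table type used above:

    // Poll the table every 2 seconds, for at most 6 seconds, until the connector has flushed the rows.
    await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
            .untilAsserted(() ->
                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3));

A poll interval close to the timeout (as in the earlier 18-to-20-second values) effectively gives the assertion only one or two attempts; the 2-second interval lets it retry several times within the 6-second window.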
@@ -19,15 +19,13 @@
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

import io.aiven.connect.jdbc.JdbcSinkConnector;

@@ -41,7 +39,6 @@
import static org.assertj.db.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;


public class VerifyInsertIT extends AbstractOracleIT {

private static final String TEST_TOPIC_NAME = "SINK_TOPIC";
@@ -87,13 +84,25 @@ private Map<String, String> basicSinkConnectorConfig() {
return config;
}

private Map<String, String> sinkConnectorConfigWithPKModeRecordKey() {
private Map<String, String> sinkConnectorConfigWith_PKModeRecordKey() {
final Map<String, String> config = basicSinkConnectorConfig();
config.put("pk.mode", "record_key");
config.put("pk.fields", "id"); // assigned name for the primitive key
return config;
}

private Map<String, String> sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled() {
final Map<String, String> config = sinkConnectorConfigWith_PKModeRecordKey();
config.put("delete.enabled", String.valueOf(true));
return config;
}

private Map<String, String> sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled_InsertModeMulti() {
final Map<String, String> config = sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled();
config.put("insert.mode", "MULTI");
return config;
}

private ProducerRecord<String, GenericRecord> createRecord(
final int id, final int partition, final String name, final String value) {
final GenericRecord record = new GenericData.Record(VALUE_RECORD_SCHEMA);
@@ -128,14 +137,14 @@ public void afterEach() throws SQLException {
public void testSinkConnector() throws Exception {
createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
executeSqlStatement(CREATE_TABLE);
consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));

// Start the sink connector
connectRunner.createConnector(basicSinkConnectorConfig());

// Send test data to Kafka topic
sendTestData(1);

await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(19))
await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1);
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
@@ -147,18 +156,61 @@ public void testSinkConnector() throws Exception {
public void testSinkWithPKModeRecordKeyConnector() throws Exception {
createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
executeSqlStatement(CREATE_TABLE_WITH_PK);
consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));

// Start the sink connector
connectRunner.createConnector(sinkConnectorConfigWithPKModeRecordKey());
connectRunner.createConnector(sinkConnectorConfigWith_PKModeRecordKey());

// Send test data to Kafka topic
sendTestData(1);

await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(19))
await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1);
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
.value().isEqualTo("0");
});
}

@Test
public void testSinkConnectorInDeleteMode() throws Exception {
createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
executeSqlStatement(CREATE_TABLE);

// Start the sink connector
connectRunner.createConnector(sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled());

// Send test data to Kafka topic
sendTestData(1);

await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1);
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
.value().isEqualTo("0");
});
}

@Test
public void testMultiInsertMode() throws Exception {
// Test logic is similar to previous tests, but with multi-insert mode enabled
createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
executeSqlStatement(CREATE_TABLE);

// Start the sink connector
final Map<String, String> config = sinkConnectorConfigWith_PKModeRecordKey_DeleteEnabled_InsertModeMulti();
connectRunner.createConnector(config);

sendTestData(5);

await().atMost(Duration.ofSeconds(6)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> {
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(5);
assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
.value().isEqualTo("0")
.value().isEqualTo("1")
.value().isEqualTo("2")
.value().isEqualTo("3")
.value().isEqualTo("4");
});
}
}
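The sinkConnectorConfigWith_* helpers above layer the connector options one at a time. For reference, a sketch of the map that the multi-insert test ends up handing to connectRunner.createConnector(), using only keys that already appear in these tests:

    final Map<String, String> config = basicSinkConnectorConfig();
    config.put("pk.mode", "record_key");   // take the primary key from the Kafka record key
    config.put("pk.fields", "id");         // name assigned to the primitive key column
    config.put("delete.enabled", "true");  // tombstone records delete the matching row
    config.put("insert.mode", "MULTI");    // write batches as multi-row INSERT statements
    connectRunner.createConnector(config);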
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
* Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,22 +40,24 @@ public class AbstractPostgresIT extends AbstractIT {
.withTag(DEFAULT_POSTGRES_TAG);

@Container
protected final PostgreSQLContainer<?> postgreSqlContainer = new PostgreSQLContainer<>(DEFAULT_POSTGRES_IMAGE_NAME);
public static final PostgreSQLContainer<?> POSTGRES_CONTAINER = new PostgreSQLContainer<>(
DEFAULT_POSTGRES_IMAGE_NAME
);

protected void executeUpdate(final String updateStatement) throws SQLException {
protected void executeSqlStatement(final String sqlStatement) throws SQLException {
try (final Connection connection = getDatasource().getConnection();
final Statement statement = connection.createStatement()) {
statement.executeUpdate(updateStatement);
statement.executeUpdate(sqlStatement);
}
}

protected DataSource getDatasource() {
final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
pgSimpleDataSource.setServerNames(Arrays.array(postgreSqlContainer.getHost()));
pgSimpleDataSource.setPortNumbers(new int[] {postgreSqlContainer.getMappedPort(5432)});
pgSimpleDataSource.setDatabaseName(postgreSqlContainer.getDatabaseName());
pgSimpleDataSource.setUser(postgreSqlContainer.getUsername());
pgSimpleDataSource.setPassword(postgreSqlContainer.getPassword());
pgSimpleDataSource.setServerNames(Arrays.array(POSTGRES_CONTAINER.getHost()));
pgSimpleDataSource.setPortNumbers(new int[] {POSTGRES_CONTAINER.getMappedPort(5432)});
pgSimpleDataSource.setDatabaseName(POSTGRES_CONTAINER.getDatabaseName());
pgSimpleDataSource.setUser(POSTGRES_CONTAINER.getUsername());
pgSimpleDataSource.setPassword(POSTGRES_CONTAINER.getPassword());
return pgSimpleDataSource;
}

@@ -66,9 +68,9 @@ protected Map<String, String> basicConnectorConfig() {
config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
config.put("tasks.max", "1");
config.put("connection.url", postgreSqlContainer.getJdbcUrl());
config.put("connection.user", postgreSqlContainer.getUsername());
config.put("connection.password", postgreSqlContainer.getPassword());
config.put("connection.url", POSTGRES_CONTAINER.getJdbcUrl());
config.put("connection.user", POSTGRES_CONTAINER.getUsername());
config.put("connection.password", POSTGRES_CONTAINER.getPassword());
config.put("dialect.name", "PostgreSqlDatabaseDialect");
return config;
}
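The PostgreSQL helper mirrors the Oracle one: the shared static POSTGRES_CONTAINER feeds a plain, non-pooling PGSimpleDataSource. A minimal sketch of the resulting method, assuming org.postgresql.ds.PGSimpleDataSource and the container field declared above:

    protected DataSource getDatasource() {
        final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
        pgSimpleDataSource.setServerNames(new String[] {POSTGRES_CONTAINER.getHost()});
        pgSimpleDataSource.setPortNumbers(new int[] {POSTGRES_CONTAINER.getMappedPort(5432)}); // mapped host port for 5432
        pgSimpleDataSource.setDatabaseName(POSTGRES_CONTAINER.getDatabaseName());
        pgSimpleDataSource.setUser(POSTGRES_CONTAINER.getUsername());
        pgSimpleDataSource.setPassword(POSTGRES_CONTAINER.getPassword());
        return pgSimpleDataSource;
    }

With Testcontainers, a static @Container field is started once for the whole test class rather than once per test method.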