feat: Address review comments
Contributes to: event-integration/eventstreams-planning#0

Signed-off-by: Joel Hanson <[email protected]>
Joel-hanson authored and Joel Hanson committed Jun 7, 2024
1 parent 3d94433 commit 43b2959
Showing 16 changed files with 811 additions and 175 deletions.

build.gradle.kts (2 additions, 2 deletions)

@@ -2,7 +2,7 @@ import java.io.FileOutputStream
 import java.net.URL
 
 /*
- * Copyright 2021 Aiven Oy and jdbc-connector-for-apache-kafka contributors
+ * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka contributors
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -163,7 +163,7 @@ dependencies {
 
     runtimeOnly("org.xerial:sqlite-jdbc:3.46.0.0")
     runtimeOnly("org.postgresql:postgresql:42.7.3")
-    runtimeOnly("com.oracle.database.jdbc:ojdbc8:19.3.0.0")
+    runtimeOnly("com.oracle.database.jdbc:ojdbc8:23.4.0.24.05")
     runtimeOnly("net.sourceforge.jtds:jtds:1.3.1")
     runtimeOnly("net.snowflake:snowflake-jdbc:3.16.0")
     runtimeOnly("com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11")
AbstractOracleIT.java

@@ -38,7 +38,7 @@ public class AbstractOracleIT extends AbstractIT {
             DockerImageName.parse("gvenzl/oracle-free")
                     .withTag(DEFAULT_ORACLE_TAG);
     @Container
-    public static final OracleContainer ORACLE_CONTAINER = new OracleContainer(DEFAULT_ORACLE_IMAGE_NAME);
+    protected final OracleContainer oracleContainer = new OracleContainer(DEFAULT_ORACLE_IMAGE_NAME);
 
     protected void executeSqlStatement(final String sqlStatement) throws SQLException {
         try (final Connection connection = getDatasource().getConnection();
@@ -49,13 +49,13 @@ protected void executeSqlStatement(final String sqlStatement) throws SQLException {
 
     protected DataSource getDatasource() throws SQLException {
         final OracleDataSource dataSource = new OracleDataSource();
-        dataSource.setServerName(ORACLE_CONTAINER.getHost());
+        dataSource.setServerName(oracleContainer.getHost());
         // Assuming the default Oracle port is 1521
-        dataSource.setPortNumber(ORACLE_CONTAINER.getMappedPort(1521));
+        dataSource.setPortNumber(oracleContainer.getMappedPort(1521));
         // Or use setDatabaseName() if that's how your Oracle is configured
-        dataSource.setServiceName(ORACLE_CONTAINER.getDatabaseName());
-        dataSource.setUser(ORACLE_CONTAINER.getUsername());
-        dataSource.setPassword(ORACLE_CONTAINER.getPassword());
+        dataSource.setServiceName(oracleContainer.getDatabaseName());
+        dataSource.setUser(oracleContainer.getUsername());
+        dataSource.setPassword(oracleContainer.getPassword());
         dataSource.setDriverType("thin");
         return dataSource;
     }
@@ -64,9 +64,9 @@ protected DataSource getDatasource() throws SQLException {
     protected Map<String, String> basicConnectorConfig() {
         final HashMap<String, String> config = new HashMap<>();
         config.put("tasks.max", "1");
-        config.put("connection.url", ORACLE_CONTAINER.getJdbcUrl());
-        config.put("connection.user", ORACLE_CONTAINER.getUsername());
-        config.put("connection.password", ORACLE_CONTAINER.getPassword());
+        config.put("connection.url", oracleContainer.getJdbcUrl());
+        config.put("connection.user", oracleContainer.getUsername());
+        config.put("connection.password", oracleContainer.getPassword());
         config.put("dialect.name", "OracleDatabaseDialect");
         config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
         config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
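
Note on the @Container change above: with Testcontainers' JUnit 5 extension, a static @Container field is started once and shared by every test method in the class, whereas an instance field is started before and stopped after each test method. Replacing the static ORACLE_CONTAINER with the instance field oracleContainer therefore appears intended to give each test a fresh Oracle database, at the cost of one container startup per test. A minimal sketch of the two lifecycles (the class name is hypothetical; the image tag used by the suite is omitted):

import org.testcontainers.containers.OracleContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

// Sketch only: contrasts the two @Container lifecycles.
@Testcontainers
class ContainerLifecycleSketch {

    private static final DockerImageName ORACLE_IMAGE =
            DockerImageName.parse("gvenzl/oracle-free");

    // Static field: started once before the first test in the class and
    // stopped after the last one; all tests share its state.
    @Container
    static final OracleContainer shared = new OracleContainer(ORACLE_IMAGE);

    // Instance field: started before and stopped after every test method,
    // so each test sees a clean database.
    @Container
    final OracleContainer perTest = new OracleContainer(ORACLE_IMAGE);
}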
VerifyDeleteIT.java

@@ -16,19 +16,15 @@
 
 package io.aiven.kafka.connect.jdbc.oracle;
 
-import java.sql.SQLException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
 
 import io.aiven.connect.jdbc.JdbcSinkConnector;
 
@@ -41,7 +37,6 @@
 
 import static org.assertj.db.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertNotSame;
 
 public class VerifyDeleteIT extends AbstractOracleIT {
 
@@ -64,7 +59,6 @@ public class VerifyDeleteIT extends AbstractOracleIT {
             + "  ]\n"
             + "}");
 
-    private static final String DROP_TABLE = String.format("DROP TABLE IF EXISTS %s", TEST_TOPIC_NAME);
     private static final String CREATE_TABLE = String.format("CREATE TABLE \"%s\" (\n"
             + "    \"id\" NUMBER NOT NULL,\n"
             + "    \"name\" VARCHAR2(255) NOT NULL,\n"
@@ -83,11 +77,6 @@ private Map<String, String> sinkConnectorConfigForDelete() {
         return config;
     }
 
-    private void assertNotEmptyPoll(final Duration duration) {
-        final ConsumerRecords<?, ?> records = consumer.poll(duration);
-        assertNotSame(ConsumerRecords.empty(), records);
-    }
-
     private ProducerRecord<String, GenericRecord> createRecord(
             final int id, final int partition, final String name, final String value) {
         final GenericRecord record = new GenericData.Record(VALUE_RECORD_SCHEMA);
@@ -155,22 +144,18 @@ private void sendMixedTestDataWithTombstone(final int numberOfRecords, final int
     }
 
     @BeforeEach
-    public void afterEach() throws SQLException {
-        executeSqlStatement(DROP_TABLE);
+    public void beforeEach() throws Exception {
         executeSqlStatement(CREATE_TABLE);
+        createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
     }
 
     @Test
     public void testDeleteTombstoneRecord() throws Exception {
-        createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
-        consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));
-
-        // Start the sink connector
+        // Test deleting records using tombstone records
         connectRunner.createConnector(sinkConnectorConfigForDelete());
-
         sendTestData(3);
 
-        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
+        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5))
                 .untilAsserted(() -> {
                     assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3);
                     assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
@@ -182,9 +167,7 @@ public void testDeleteTombstoneRecord() throws Exception {
         // Send test data to Kafka topic (including a tombstone record)
         sendTestDataWithTombstone(1);
 
-        assertNotEmptyPoll(Duration.ofSeconds(50));
-
-        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
+        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5))
                 .untilAsserted(() -> {
                     assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(2);
                     assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
@@ -195,61 +178,30 @@ public void testDeleteTombstoneRecord() throws Exception {
 
     @Test
     public void testWithJustTombstoneRecordInInsertMode() throws Exception {
-        // Test logic is similar to previous tests, but with tombstone records.
-        createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
-        consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));
+        // Test behavior with only tombstone records in insert mode
 
-        // Start the sink connector
         final Map<String, String> config = sinkConnectorConfigForDelete();
         connectRunner.createConnector(config);
 
         sendTestDataWithTombstone(2);
 
-        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
-                .untilAsserted(() -> {
-                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0);
-                });
-    }
-
-    @Test
-    public void testMultiInsertMode() throws Exception {
-        // Test logic is similar to previous tests, but with multi-insert mode enabled
-        createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
-        consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));
-
-        // Start the sink connector
-        final Map<String, String> config = sinkConnectorConfigForDelete();
-        config.put("insert.mode", "MULTI");
-        connectRunner.createConnector(config);
-
-        sendTestData(5);
-
-        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
-                .untilAsserted(() -> {
-                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(5);
-                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
-                            .value().isEqualTo("0")
-                            .value().isEqualTo("1")
-                            .value().isEqualTo("2")
-                            .value().isEqualTo("3")
-                            .value().isEqualTo("4");
-                });
+        // TODO: Instead of sleeping for a fixed interval,
+        //  wait for the connector to commit offsets for the records we sent
+        Thread.sleep(5_000); // Give the connector at least five seconds to read our tombstone messages from Kafka
+        assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0);
     }
 
     @Test
     public void testDeleteTombstoneRecordWithMultiMode() throws Exception {
-        // Test logic is similar to previous tests, but with multi-insert mode enabled and tombstone records included
-        createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
-        consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));
+        // Test deleting records using tombstone records with multi-insert mode enabled
 
-        // Start the sink connector
         final Map<String, String> config = sinkConnectorConfigForDelete();
         config.put("insert.mode", "MULTI");
         connectRunner.createConnector(config);
 
         sendTestData(5);
 
-        await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofSeconds(20))
+        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5))
                 .untilAsserted(() -> {
                     assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(5);
                     assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
@@ -262,7 +214,7 @@ public void testDeleteTombstoneRecordWithMultiMode() throws Exception {
 
         sendTestDataWithTombstone(1);
 
-        await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofSeconds(18))
+        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5))
                 .untilAsserted(() -> {
                     assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(4);
                     assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
@@ -275,37 +227,31 @@ public void testDeleteTombstoneRecordWithMultiMode() throws Exception {
 
     @Test
     public void testWithJustTombstoneRecordWithMultiMode() throws Exception {
-        // Test logic with multi-insert mode enabled and has only tombstone records
-        createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
-        consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));
+        // Test behavior with only tombstone records in multi-insert mode
 
-        // Start the sink connector
         final Map<String, String> config = sinkConnectorConfigForDelete();
         config.put("insert.mode", "MULTI");
         connectRunner.createConnector(config);
 
         sendTestDataWithTombstone(2);
 
-        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
-                .untilAsserted(() -> {
-                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0);
-                });
+        // TODO: Instead of sleeping for a fixed interval,
+        //  wait for the connector to commit offsets for the records we sent
+        Thread.sleep(5_000); // Give the connector at least five seconds to read our tombstone messages from Kafka
+        assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0);
     }
 
     @Test
     public void testMixTombstoneRecordsWithMultiMode() throws Exception {
-        // Test logic is similar to previous tests, but with mixed tombstone records and multi-insert mode
-        createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
-        consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));
+        // Test behavior with mixed tombstone and insert records in multi-insert mode
 
-        // Start the sink connector
         final Map<String, String> config = sinkConnectorConfigForDelete();
         config.put("insert.mode", "MULTI");
         connectRunner.createConnector(config);
 
         sendMixedTestDataWithTombstone(5, 2);
 
-        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(19))
+        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(5))
                 .untilAsserted(() -> {
                     assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3);
                     assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
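
On the TODO above (replacing Thread.sleep(5_000) with a wait on the connector's committed offsets): Kafka Connect sink connectors consume under the consumer group id "connect-<connector name>", so a test can poll that group's committed offset for the topic partition until it reaches the number of records produced. A minimal sketch, assuming the test has an AdminClient and knows the connector name and expected offset (the helper name and parameters are hypothetical, and it would live inside the test class):

import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

// Sketch only: wait until the sink connector has committed offsets up to
// expectedOffset instead of sleeping for a fixed interval.
static void awaitConnectorCommit(final AdminClient adminClient,
                                 final String connectorName,
                                 final TopicPartition partition,
                                 final long expectedOffset) {
    await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(1))
            .untilAsserted(() -> {
                final Map<TopicPartition, OffsetAndMetadata> committed =
                        adminClient.listConsumerGroupOffsets("connect-" + connectorName)
                                .partitionsToOffsetAndMetadata()
                                .get();
                // The committed offset is the next offset to consume, so it
                // equals the produced record count once every record
                // (including tombstones) has been processed.
                assertThat(committed.get(partition)).isNotNull();
                assertThat(committed.get(partition).offset()).isEqualTo(expectedOffset);
            });
}

Because sink tasks commit offsets only after a successful flush, the final hasNumberOfRows(0) assertion should no longer race the connector once this helper returns.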
(The remaining 13 changed files are not shown here.)
