Add support for unqualified table names in source connector
C0urante committed Mar 9, 2023
1 parent 096c8c5 commit 6dc7f4a
Showing 12 changed files with 543 additions and 106 deletions.
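This commit lets the JDBC source connector be configured with table names that carry no schema or catalog qualifier (for example "orders" instead of "public.orders"). As a minimal sketch, assuming the connector keeps the standard kafka-connect-jdbc table-filtering key table.whitelist and using an illustrative table name, either form should now select the same Postgres table in a source-connector config map like the ones built in the tests below:

    // "config" is a Map<String, String> source-connector config; "orders" is an illustrative table name.
    config.put("table.whitelist", "public.orders"); // schema-qualified form
    config.put("table.whitelist", "orders");        // unqualified form, supported as of this commit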
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
@@ -22,13 +22,13 @@
<!-- Legacy suppressions -->
<!-- TODO: must be fixed -->
<suppress checks="CyclomaticComplexity"
files="(BufferedRecords|DataConverter|DatabaseDialect|FieldsMetadata|HanaDialect|JdbcSourceTask|MySqlDatabaseDialect|OracleDatabaseDialect|PostgreSqlDatabaseDialect|PreparedStatementBinder|SqlServerDatabaseDialect|SqliteDatabaseDialect|TimestampIncrementingTableQuerier|VerticaDatabaseDialect|SapHanaDatabaseDialect|TableId|ColumnDefinition|TableMonitorThread).java"/>
files="(BufferedRecords|DataConverter|DatabaseDialect|FieldsMetadata|HanaDialect|JdbcSourceTask|JdbcSourceConnector|MySqlDatabaseDialect|OracleDatabaseDialect|PostgreSqlDatabaseDialect|PreparedStatementBinder|SqlServerDatabaseDialect|SqliteDatabaseDialect|TimestampIncrementingTableQuerier|VerticaDatabaseDialect|SapHanaDatabaseDialect|TableId|ColumnDefinition|TableMonitorThread).java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(DbDialect|JdbcSourceTask|GenericDatabaseDialect).java"/>

<suppress checks="NPathComplexity"
files="(DataConverter|FieldsMetadata|JdbcSourceTask|GenericDatabaseDialect).java"/>
files="(DataConverter|FieldsMetadata|JdbcSourceTask|JdbcSourceConnector|GenericDatabaseDialect).java"/>

<suppress checks="JavaNCSS"
files="(DataConverter|FieldsMetadata|JdbcSourceTask|GenericDatabaseDialect).java"/>
AbstractIT.java
@@ -27,6 +27,8 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

@@ -51,6 +53,7 @@ public abstract class AbstractIT {
DockerImageName.parse("confluentinc/cp-kafka")
.withTag(DEFAULT_KAFKA_TAG);
protected static KafkaProducer<String, GenericRecord> producer;
protected static KafkaConsumer<String, GenericRecord> consumer;
@Container
protected KafkaContainer kafkaContainer = new KafkaContainer(DEFAULT_IMAGE_NAME)
.withNetwork(Network.newNetwork())
@@ -69,6 +72,7 @@ void startKafka() throws Exception {
final Path pluginDir = setupPluginDir();
setupKafkaConnect(pluginDir);
producer = createProducer();
consumer = createConsumer();
}

private static Path setupPluginDir() throws Exception {
@@ -85,15 +89,19 @@ private static Path setupPluginDir() throws Exception {
return pluginDir;
}

private void setupKafka() throws Exception {
LOGGER.info("Setup Kafka");

protected void createTopic(final String topic, final int numPartitions) throws Exception {
try (final AdminClient adminClient = createAdminClient()) {
LOGGER.info("Create topic {}", TEST_TOPIC_NAME);
final NewTopic newTopic = new NewTopic(TEST_TOPIC_NAME, 4, (short) 1);
LOGGER.info("Create topic {}", topic);
final NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
adminClient.createTopics(List.of(newTopic)).all().get();
}
}

private void setupKafka() throws Exception {
createTopic(TEST_TOPIC_NAME, 4);
}

protected AdminClient createAdminClient() {
final Properties adminClientConfig = new Properties();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
@@ -118,10 +126,24 @@ protected KafkaProducer<String, GenericRecord> createProducer() {
return new KafkaProducer<>(producerProps);
}

protected KafkaConsumer<String, GenericRecord> createConsumer() {
LOGGER.info("Create kafka consumer");
final Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProps.put("schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
return new KafkaConsumer<>(consumerProps);
}

@AfterEach
final void tearDown() {
connectRunner.stop();
producer.close();
consumer.close();

connectRunner.awaitStop();
}
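The createTopic(...) helper and the shared Avro consumer added above are available to every integration test extending AbstractIT, and the consumer is closed in tearDown() together with the producer. A rough sketch of how a test could use them to read back records produced by a connector, assuming an illustrative topic name, an expected count of 1000 records, and imports for ConsumerRecord, Duration, and ArrayList:

    // Sketch only: create a topic, then drain it with the shared Avro consumer.
    createTopic("test-output-topic", 1);              // hypothetical topic name
    consumer.subscribe(List.of("test-output-topic"));
    final List<ConsumerRecord<String, GenericRecord>> received = new ArrayList<>();
    final long deadline = System.currentTimeMillis() + Duration.ofSeconds(15).toMillis();
    while (received.size() < 1000 && System.currentTimeMillis() < deadline) {
        for (final ConsumerRecord<String, GenericRecord> record : consumer.poll(Duration.ofMillis(100))) {
            received.add(record);
        }
    }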
ConnectRunner.java
@@ -31,6 +31,7 @@
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;

import org.slf4j.Logger;
@@ -102,6 +103,22 @@ public void createConnector(final Map<String, String> config) throws ExecutionEx
assert connectorInfoCreated.created();
}

public void restartTask(final String connector, final int task) throws ExecutionException, InterruptedException {
assert herder != null;

final FutureCallback<Void> cb = new FutureCallback<>(
(error, ignored) -> {
if (error != null) {
LOGGER.error("Failed to restart task {}-{}", connector, task, error);
} else {
LOGGER.info("Restarted task {}-{}", connector, task);
}
});

herder.restartTask(new ConnectorTaskId(connector, task), cb);
cb.get();
}

void stop() {
connect.stop();
}
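The new restartTask(...) hook lets a test bounce a single task through the embedded herder, for example after changing database state that only a freshly started task would pick up. A usage sketch with a placeholder connector name and task id:

    // Restart task 0 of a connector previously registered via createConnector(...).
    connectRunner.restartTask("test-source-connector", 0);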
AbstractPostgresIT.java
@@ -0,0 +1,76 @@
/*
* Copyright 2022 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.jdbc.postgres;

import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import io.aiven.kafka.connect.jdbc.AbstractIT;

import org.assertj.core.util.Arrays;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

public class AbstractPostgresIT extends AbstractIT {

public static final String DEFAULT_POSTGRES_TAG = "10.20";
private static final DockerImageName DEFAULT_POSTGRES_IMAGE_NAME =
DockerImageName.parse("postgres")
.withTag(DEFAULT_POSTGRES_TAG);

@Container
protected final PostgreSQLContainer<?> postgreSqlContainer = new PostgreSQLContainer<>(DEFAULT_POSTGRES_IMAGE_NAME);

protected void executeUpdate(final String updateStatement) throws SQLException {
try (final Connection connection = getDatasource().getConnection();
final Statement statement = connection.createStatement()) {
statement.executeUpdate(updateStatement);
}
}

protected DataSource getDatasource() {
final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
pgSimpleDataSource.setServerNames(Arrays.array(postgreSqlContainer.getHost()));
pgSimpleDataSource.setPortNumbers(new int[] {postgreSqlContainer.getMappedPort(5432)});
pgSimpleDataSource.setDatabaseName(postgreSqlContainer.getDatabaseName());
pgSimpleDataSource.setUser(postgreSqlContainer.getUsername());
pgSimpleDataSource.setPassword(postgreSqlContainer.getPassword());
return pgSimpleDataSource;
}

protected Map<String, String> basicConnectorConfig() {
final HashMap<String, String> config = new HashMap<>();
config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
config.put("tasks.max", "1");
config.put("connection.url", postgreSqlContainer.getJdbcUrl());
config.put("connection.user", postgreSqlContainer.getUsername());
config.put("connection.password", postgreSqlContainer.getPassword());
config.put("dialect.name", "PostgreSqlDatabaseDialect");
return config;
}

}
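Postgres-backed integration tests can extend this new base class, prepare tables with executeUpdate(), and layer connector-specific keys on top of basicConnectorConfig(). The following is a rough sketch of a source-connector test exercising unqualified table names; the class name, table name, and the source settings (connector.class, table.whitelist, mode, topic.prefix) are assumptions based on the standard kafka-connect-jdbc source configuration, not taken from this commit:

    package io.aiven.kafka.connect.jdbc.postgres;

    import java.util.Map;

    import org.junit.jupiter.api.Test;
    import org.testcontainers.junit.jupiter.Testcontainers;

    @Testcontainers
    public class UnqualifiedTableNameIT extends AbstractPostgresIT { // hypothetical test class

        @Test
        final void streamsRowsFromUnqualifiedTable() throws Exception {
            // The table lives in the default "public" schema but is referenced without a qualifier.
            executeUpdate("create table source_table (id int primary key, name text)");

            final Map<String, String> config = basicConnectorConfig();
            config.put("name", "test-source-connector");
            config.put("connector.class", "io.aiven.connect.jdbc.JdbcSourceConnector"); // assumed class name
            config.put("table.whitelist", "source_table"); // unqualified table name
            config.put("mode", "bulk");
            config.put("topic.prefix", "jdbc-");
            connectRunner.createConnector(config);
        }
    }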
PartitionedTableIntegrationTest.java
@@ -16,14 +16,9 @@

package io.aiven.kafka.connect.jdbc.postgres;

import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -33,27 +28,18 @@
import org.apache.kafka.clients.producer.RecordMetadata;

import io.aiven.connect.jdbc.JdbcSinkConnector;
import io.aiven.kafka.connect.jdbc.AbstractIT;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.assertj.core.util.Arrays;
import org.assertj.db.type.Table;
import org.junit.jupiter.api.Test;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import static org.apache.avro.generic.GenericData.Record;
import static org.assertj.db.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@Testcontainers
public class PartitionedTableIntegrationTest extends AbstractIT {
public class PartitionedTableIntegrationTest extends AbstractPostgresIT {

public static final String DEFAULT_POSTGRES_TAG = "10.20";
private static final String CONNECTOR_NAME = "test-sink-connector";
private static final int TEST_TOPIC_PARTITIONS = 1;
private static final Schema VALUE_RECORD_SCHEMA =
@@ -82,17 +68,11 @@ public class PartitionedTableIntegrationTest extends AbstractIT {
private static final String CREATE_PARTITION =
"create table partition partition of \"" + TEST_TOPIC_NAME
+ "\" for values from ('2022-03-03') to ('2122-03-03');";
private static final DockerImageName DEFAULT_POSTGRES_IMAGE_NAME =
DockerImageName.parse("postgres")
.withTag(DEFAULT_POSTGRES_TAG);

@Container
private final PostgreSQLContainer<?> postgreSqlContainer = new PostgreSQLContainer<>(DEFAULT_POSTGRES_IMAGE_NAME);

@Test
final void testBasicDelivery() throws ExecutionException, InterruptedException, SQLException {
executeUpdate(CREATE_TABLE);
connectRunner.createConnector(basicConnectorConfig());
connectRunner.createConnector(basicSinkConnectorConfig());

sendTestData(1000);

@@ -104,31 +84,14 @@ final void testBasicDeliveryForPartitionedTable() throws ExecutionException, InterruptedException,
final void testBasicDeliveryForPartitionedTable() throws ExecutionException, InterruptedException, SQLException {
executeUpdate(CREATE_TABLE_WITH_PARTITION);
executeUpdate(CREATE_PARTITION);
connectRunner.createConnector(basicConnectorConfig());
connectRunner.createConnector(basicSinkConnectorConfig());

sendTestData(1000);

await().atMost(Duration.ofSeconds(15)).pollInterval(Duration.ofMillis(100))
.untilAsserted(() -> assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1000));
}

private void executeUpdate(final String updateStatement) throws SQLException {
try (final Connection connection = getDatasource().getConnection();
final Statement statement = connection.createStatement()) {
statement.executeUpdate(updateStatement);
}
}

public DataSource getDatasource() {
final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
pgSimpleDataSource.setServerNames(Arrays.array(postgreSqlContainer.getHost()));
pgSimpleDataSource.setPortNumbers(new int[] {postgreSqlContainer.getMappedPort(5432)});
pgSimpleDataSource.setDatabaseName(postgreSqlContainer.getDatabaseName());
pgSimpleDataSource.setUser(postgreSqlContainer.getUsername());
pgSimpleDataSource.setPassword(postgreSqlContainer.getPassword());
return pgSimpleDataSource;
}

private void sendTestData(final int numberOfRecords) throws InterruptedException, ExecutionException {
final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
for (int i = 0; i < numberOfRecords; i++) {
Expand All @@ -155,21 +118,13 @@ private Record createRecord(final String name, final String value) {
return valueRecord;
}

private Map<String, String> basicConnectorConfig() {
final HashMap<String, String> config = new HashMap<>();
private Map<String, String> basicSinkConnectorConfig() {
final Map<String, String> config = basicConnectorConfig();
config.put("name", CONNECTOR_NAME);
config.put("connector.class", JdbcSinkConnector.class.getName());
config.put("topics", TEST_TOPIC_NAME);
config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
config.put("tasks.max", "1");
config.put("connection.url", postgreSqlContainer.getJdbcUrl());
config.put("connection.user", postgreSqlContainer.getUsername());
config.put("connection.password", postgreSqlContainer.getPassword());
config.put("insert.mode", "insert");
config.put("dialect.name", "PostgreSqlDatabaseDialect");
return config;
}

}
