diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 65875836..5b8dc05d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -22,13 +22,13 @@
+ files="(BufferedRecords|DataConverter|DatabaseDialect|FieldsMetadata|HanaDialect|JdbcSourceTask|JdbcSourceConnector|MySqlDatabaseDialect|OracleDatabaseDialect|PostgreSqlDatabaseDialect|PreparedStatementBinder|SqlServerDatabaseDialect|SqliteDatabaseDialect|TimestampIncrementingTableQuerier|VerticaDatabaseDialect|SapHanaDatabaseDialect|TableId|ColumnDefinition|TableMonitorThread).java"/>
+ files="(DataConverter|FieldsMetadata|JdbcSourceTask|JdbcSourceConnector|GenericDatabaseDialect).java"/>
diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/AbstractIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/AbstractIT.java
index cc98c105..15f9c9b1 100644
--- a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/AbstractIT.java
+++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/AbstractIT.java
@@ -19,6 +19,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,6 +28,8 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -51,6 +54,7 @@ public abstract class AbstractIT {
DockerImageName.parse("confluentinc/cp-kafka")
.withTag(DEFAULT_KAFKA_TAG);
protected static KafkaProducer<String, GenericRecord> producer;
+ protected static KafkaConsumer<String, GenericRecord> consumer;
@Container
protected KafkaContainer kafkaContainer = new KafkaContainer(DEFAULT_IMAGE_NAME)
.withNetwork(Network.newNetwork())
@@ -69,6 +73,7 @@ void startKafka() throws Exception {
final Path pluginDir = setupPluginDir();
setupKafkaConnect(pluginDir);
producer = createProducer();
+ consumer = createConsumer();
}
private static Path setupPluginDir() throws Exception {
@@ -85,15 +90,19 @@ private static Path setupPluginDir() throws Exception {
return pluginDir;
}
- private void setupKafka() throws Exception {
- LOGGER.info("Setup Kafka");
+
+ protected void createTopic(final String topic, final int numPartitions) throws Exception {
try (final AdminClient adminClient = createAdminClient()) {
- LOGGER.info("Create topic {}", TEST_TOPIC_NAME);
- final NewTopic newTopic = new NewTopic(TEST_TOPIC_NAME, 4, (short) 1);
+ LOGGER.info("Create topic {}", topic);
+ final NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
adminClient.createTopics(List.of(newTopic)).all().get();
}
}
+ private void setupKafka() throws Exception {
+ createTopic(TEST_TOPIC_NAME, 4);
+ }
+
protected AdminClient createAdminClient() {
final Properties adminClientConfig = new Properties();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
@@ -118,10 +127,24 @@ protected KafkaProducer<String, GenericRecord> createProducer() {
return new KafkaProducer<>(producerProps);
}
+ protected KafkaConsumer<String, GenericRecord> createConsumer() {
+ LOGGER.info("Create kafka consumer");
+ final Map<String, Object> consumerProps = new HashMap<>();
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "io.confluent.kafka.serializers.KafkaAvroDeserializer");
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "io.confluent.kafka.serializers.KafkaAvroDeserializer");
+ consumerProps.put("schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+ return new KafkaConsumer<>(consumerProps);
+ }
+
@AfterEach
final void tearDown() {
connectRunner.stop();
producer.close();
+ consumer.close();
connectRunner.awaitStop();
}
diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/ConnectRunner.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/ConnectRunner.java
index 126e1ebb..86c0e20d 100644
--- a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/ConnectRunner.java
+++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/ConnectRunner.java
@@ -31,6 +31,7 @@
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
@@ -102,6 +103,22 @@ public void createConnector(final Map<String, String> config) throws ExecutionEx
assert connectorInfoCreated.created();
}
+ public void restartTask(final String connector, final int task) throws ExecutionException, InterruptedException {
+ assert herder != null;
+
+ final FutureCallback<Void> cb = new FutureCallback<>(
+ (error, ignored) -> {
+ if (error != null) {
+ LOGGER.error("Failed to restart task {}-{}", connector, task, error);
+ } else {
+ LOGGER.info("Restarted task {}-{}", connector, task);
+ }
+ });
+
+ herder.restartTask(new ConnectorTaskId(connector, task), cb);
+ cb.get();
+ }
+
void stop() {
connect.stop();
}
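The new restartTask helper blocks on its FutureCallback, so integration tests can treat a task restart as a synchronous operation. A minimal usage sketch (the connector name matches the source test added below; the task index is illustrative):

```java
// Bounce task 0 of a running connector and wait until the herder has finished the restart.
connectRunner.restartTask("test-source-connector", 0);
```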
diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java
new file mode 100644
index 00000000..31b83b9f
--- /dev/null
+++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2022 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.jdbc.postgres;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.aiven.kafka.connect.jdbc.AbstractIT;
+
+import org.assertj.core.util.Arrays;
+import org.postgresql.ds.PGSimpleDataSource;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.utility.DockerImageName;
+
+public class AbstractPostgresIT extends AbstractIT {
+
+ public static final String DEFAULT_POSTGRES_TAG = "10.20";
+ private static final DockerImageName DEFAULT_POSTGRES_IMAGE_NAME =
+ DockerImageName.parse("postgres")
+ .withTag(DEFAULT_POSTGRES_TAG);
+
+ @Container
+ protected final PostgreSQLContainer<?> postgreSqlContainer = new PostgreSQLContainer<>(DEFAULT_POSTGRES_IMAGE_NAME);
+
+ protected void executeUpdate(final String updateStatement) throws SQLException {
+ try (final Connection connection = getDatasource().getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.executeUpdate(updateStatement);
+ }
+ }
+
+ protected DataSource getDatasource() {
+ final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
+ pgSimpleDataSource.setServerNames(Arrays.array(postgreSqlContainer.getHost()));
+ pgSimpleDataSource.setPortNumbers(new int[] {postgreSqlContainer.getMappedPort(5432)});
+ pgSimpleDataSource.setDatabaseName(postgreSqlContainer.getDatabaseName());
+ pgSimpleDataSource.setUser(postgreSqlContainer.getUsername());
+ pgSimpleDataSource.setPassword(postgreSqlContainer.getPassword());
+ return pgSimpleDataSource;
+ }
+
+ protected Map<String, String> basicConnectorConfig() {
+ final HashMap<String, String> config = new HashMap<>();
+ config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
+ config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+ config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
+ config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+ config.put("tasks.max", "1");
+ config.put("connection.url", postgreSqlContainer.getJdbcUrl());
+ config.put("connection.user", postgreSqlContainer.getUsername());
+ config.put("connection.password", postgreSqlContainer.getPassword());
+ config.put("dialect.name", "PostgreSqlDatabaseDialect");
+ return config;
+ }
+
+}
diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java
index c003e6f6..67a33173 100644
--- a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java
+++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java
@@ -16,14 +16,9 @@
package io.aiven.kafka.connect.jdbc.postgres;
-import javax.sql.DataSource;
-
-import java.sql.Connection;
import java.sql.SQLException;
-import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -33,27 +28,18 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import io.aiven.connect.jdbc.JdbcSinkConnector;
-import io.aiven.kafka.connect.jdbc.AbstractIT;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.assertj.core.util.Arrays;
import org.assertj.db.type.Table;
import org.junit.jupiter.api.Test;
-import org.postgresql.ds.PGSimpleDataSource;
-import org.testcontainers.containers.PostgreSQLContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
import static org.apache.avro.generic.GenericData.Record;
import static org.assertj.db.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
-@Testcontainers
-public class PartitionedTableIntegrationTest extends AbstractIT {
+public class PartitionedTableIntegrationTest extends AbstractPostgresIT {
- public static final String DEFAULT_POSTGRES_TAG = "10.20";
private static final String CONNECTOR_NAME = "test-sink-connector";
private static final int TEST_TOPIC_PARTITIONS = 1;
private static final Schema VALUE_RECORD_SCHEMA =
@@ -82,17 +68,11 @@ public class PartitionedTableIntegrationTest extends AbstractIT {
private static final String CREATE_PARTITION =
"create table partition partition of \"" + TEST_TOPIC_NAME
+ "\" for values from ('2022-03-03') to ('2122-03-03');";
- private static final DockerImageName DEFAULT_POSTGRES_IMAGE_NAME =
- DockerImageName.parse("postgres")
- .withTag(DEFAULT_POSTGRES_TAG);
-
- @Container
- private final PostgreSQLContainer<?> postgreSqlContainer = new PostgreSQLContainer<>(DEFAULT_POSTGRES_IMAGE_NAME);
@Test
final void testBasicDelivery() throws ExecutionException, InterruptedException, SQLException {
executeUpdate(CREATE_TABLE);
- connectRunner.createConnector(basicConnectorConfig());
+ connectRunner.createConnector(basicSinkConnectorConfig());
sendTestData(1000);
@@ -104,7 +84,7 @@ final void testBasicDelivery() throws ExecutionException, InterruptedException,
final void testBasicDeliveryForPartitionedTable() throws ExecutionException, InterruptedException, SQLException {
executeUpdate(CREATE_TABLE_WITH_PARTITION);
executeUpdate(CREATE_PARTITION);
- connectRunner.createConnector(basicConnectorConfig());
+ connectRunner.createConnector(basicSinkConnectorConfig());
sendTestData(1000);
@@ -112,23 +92,6 @@ final void testBasicDeliveryForPartitionedTable() throws ExecutionException, Int
.untilAsserted(() -> assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1000));
}
- private void executeUpdate(final String updateStatement) throws SQLException {
- try (final Connection connection = getDatasource().getConnection();
- final Statement statement = connection.createStatement()) {
- statement.executeUpdate(updateStatement);
- }
- }
-
- public DataSource getDatasource() {
- final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
- pgSimpleDataSource.setServerNames(Arrays.array(postgreSqlContainer.getHost()));
- pgSimpleDataSource.setPortNumbers(new int[] {postgreSqlContainer.getMappedPort(5432)});
- pgSimpleDataSource.setDatabaseName(postgreSqlContainer.getDatabaseName());
- pgSimpleDataSource.setUser(postgreSqlContainer.getUsername());
- pgSimpleDataSource.setPassword(postgreSqlContainer.getPassword());
- return pgSimpleDataSource;
- }
-
private void sendTestData(final int numberOfRecords) throws InterruptedException, ExecutionException {
final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
for (int i = 0; i < numberOfRecords; i++) {
@@ -155,21 +118,13 @@ private Record createRecord(final String name, final String value) {
return valueRecord;
}
- private Map<String, String> basicConnectorConfig() {
- final HashMap<String, String> config = new HashMap<>();
+ private Map<String, String> basicSinkConnectorConfig() {
+ final Map<String, String> config = basicConnectorConfig();
config.put("name", CONNECTOR_NAME);
config.put("connector.class", JdbcSinkConnector.class.getName());
config.put("topics", TEST_TOPIC_NAME);
- config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
- config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
- config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
- config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
- config.put("tasks.max", "1");
- config.put("connection.url", postgreSqlContainer.getJdbcUrl());
- config.put("connection.user", postgreSqlContainer.getUsername());
- config.put("connection.password", postgreSqlContainer.getPassword());
config.put("insert.mode", "insert");
- config.put("dialect.name", "PostgreSqlDatabaseDialect");
return config;
}
+
}
diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java
new file mode 100644
index 00000000..7beb2213
--- /dev/null
+++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2022 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.jdbc.postgres;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+
+import io.aiven.connect.jdbc.JdbcSourceConnector;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class UnqualifiedTableNamesIntegrationTest extends AbstractPostgresIT {
+ private static final String CONNECTOR_NAME = "test-source-connector";
+
+ private static final String TABLE = "dup";
+ private static final String PREFERRED_SCHEMA = "preferred";
+ private static final String OTHER_SCHEMA = "other";
+
+ private static final String CREATE_PREFERRED_TABLE =
+ "create table " + PREFERRED_SCHEMA + "." + TABLE + "\n"
+ + "(\n"
+ + " id int generated always as identity primary key,\n"
+ + " name text not null,\n"
+ + " value text not null,\n"
+ + " date timestamp not null default current_timestamp\n"
+ + ")";
+ private static final String POPULATE_PREFERRED_TABLE =
+ "insert into " + PREFERRED_SCHEMA + "." + TABLE + " (name, value) values\n"
+ + "('clef', 'bass')";
+ private static final String CREATE_OTHER_TABLE =
+ "create table " + OTHER_SCHEMA + "." + TABLE + "\n"
+ + "(\n"
+ + " name text not null"
+ + ")";
+ private static final String POPULATE_OTHER_TABLE =
+ "insert into " + OTHER_SCHEMA + "." + TABLE + " (name) values\n"
+ + "('Rapu')"; // 🦀
+
+ @Test
+ public void testSingleTable() throws Exception {
+ createTopic(TABLE, 1);
+ consumer.assign(Collections.singleton(new TopicPartition(TABLE, 0)));
+ // Make sure that the topic starts empty
+ assertEmptyPoll(Duration.ofSeconds(1));
+
+ executeUpdate(createSchema(PREFERRED_SCHEMA));
+ executeUpdate(setSearchPath(postgreSqlContainer.getDatabaseName()));
+ executeUpdate(CREATE_PREFERRED_TABLE);
+ executeUpdate(POPULATE_PREFERRED_TABLE);
+ connectRunner.createConnector(basicTimestampModeSourceConnectorConfig());
+
+ await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(100))
+ .until(this::assertSingleNewRecordProduced);
+ }
+
+ @Test
+ public void testMultipleTablesTimestampMode() throws Exception {
+ testMultipleTables(basicTimestampModeSourceConnectorConfig());
+ }
+
+ @Test
+ public void testMultipleTablesIncrementingMode() throws Exception {
+ final Map<String, String> connectorConfig = basicSourceConnectorConfig();
+ connectorConfig.put("mode", "incrementing");
+ connectorConfig.put("incrementing.column.name", "id");
+ testMultipleTables(connectorConfig);
+ }
+
+ @Test
+ public void testMultipleTablesTimestampIncrementingMode() throws Exception {
+ final Map<String, String> connectorConfig = basicSourceConnectorConfig();
+ connectorConfig.put("mode", "timestamp+incrementing");
+ connectorConfig.put("incrementing.column.name", "id");
+ connectorConfig.put("timestamp.column.name", "date");
+ testMultipleTables(connectorConfig);
+ }
+
+ private void testMultipleTables(final Map<String, String> connectorConfig) throws Exception {
+ createTopic(TABLE, 1);
+ consumer.assign(Collections.singleton(new TopicPartition(TABLE, 0)));
+ // Make sure that the topic starts empty
+ assertEmptyPoll(Duration.ofSeconds(1));
+
+ executeUpdate(createSchema(PREFERRED_SCHEMA));
+ executeUpdate(createSchema(OTHER_SCHEMA));
+ executeUpdate(setSearchPath(postgreSqlContainer.getDatabaseName()));
+ executeUpdate(CREATE_PREFERRED_TABLE);
+ executeUpdate(POPULATE_PREFERRED_TABLE);
+ executeUpdate(CREATE_OTHER_TABLE);
+ executeUpdate(POPULATE_OTHER_TABLE);
+ connectRunner.createConnector(connectorConfig);
+
+ await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(100))
+ .until(this::assertSingleNewRecordProduced);
+
+ executeUpdate(POPULATE_OTHER_TABLE);
+
+ // Make sure that, even after adding another row to the other table, the connector
+ // doesn't publish any new records
+ assertEmptyPoll(Duration.ofSeconds(5));
+
+ // Add one more row to the preferred table, and verify that the connector
+ // is able to read it
+ executeUpdate(POPULATE_PREFERRED_TABLE);
+ await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(100))
+ .until(this::assertSingleNewRecordProduced);
+
+ // Restart the connector, to ensure that offsets are tracked correctly
+ connectRunner.restartTask(CONNECTOR_NAME, 0);
+
+ // Add one more row to the preferred table, and verify that the connector
+ // is able to read it
+ executeUpdate(POPULATE_PREFERRED_TABLE);
+ await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(100))
+ .until(this::assertSingleNewRecordProduced);
+
+ // Make sure that the connector doesn't publish any more records
+ assertEmptyPoll(Duration.ofSeconds(5));
+ }
+
+ private void assertEmptyPoll(final Duration duration) {
+ final ConsumerRecords<?, ?> records = consumer.poll(duration);
+ assertEquals(ConsumerRecords.empty(), records);
+ }
+
+ private boolean assertSingleNewRecordProduced() {
+ final ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
+ if (records.isEmpty()) {
+ return false;
+ }
+ assertEquals(1, records.count(), "Connector should only have produced one new record to Kafka");
+ for (final ConsumerRecord<String, GenericRecord> record : records) {
+ final Schema valueSchema = record.value().getSchema();
+ final Set<String> actualFieldNames = valueSchema.getFields().stream()
+ .map(Schema.Field::name)
+ .collect(Collectors.toSet());
+ final Set<String> expectedFieldNames = Set.of("id", "name", "value", "date");
+ assertEquals(
+ expectedFieldNames,
+ actualFieldNames,
+ "Records produced by the connector do not have a schema that matches "
+ + " the schema of the table it should have read from"
+ );
+ }
+ return true;
+ }
+
+ private Map<String, String> basicTimestampModeSourceConnectorConfig() {
+ final Map<String, String> config = basicSourceConnectorConfig();
+ config.put("mode", "timestamp");
+ config.put("timestamp.column.name", "date");
+ return config;
+ }
+
+ private Map<String, String> basicSourceConnectorConfig() {
+ final Map<String, String> config = super.basicConnectorConfig();
+ config.put("name", CONNECTOR_NAME);
+ config.put("topic.prefix", "");
+ config.put("qualify.table.names", "false");
+ config.put("poll.interval.ms", "1000"); // Poll quickly for shorter tests
+ config.put("whitelist", TABLE);
+ config.put("connector.class", JdbcSourceConnector.class.getName());
+ config.put("dialect.name", "PostgreSqlDatabaseDialect");
+ return config;
+ }
+
+ private static String createSchema(final String schema) {
+ return "CREATE SCHEMA " + schema;
+ }
+
+ private static String setSearchPath(final String database) {
+ return "ALTER DATABASE " + database + " SET search_path TO "
+ + PREFERRED_SCHEMA + "," + OTHER_SCHEMA;
+ }
+
+}
diff --git a/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java b/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java
index 86279289..cfd1f44f 100644
--- a/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java
+++ b/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java
@@ -103,6 +103,7 @@ public void start(final Map<String, String> properties) throws ConnectException
Set<String> whitelistSet = whitelist.isEmpty() ? null : new HashSet<>(whitelist);
final List<String> blacklist = config.getList(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG);
final Set<String> blacklistSet = blacklist.isEmpty() ? null : new HashSet<>(blacklist);
+ final boolean qualifyTableNames = config.getBoolean(JdbcSourceConnectorConfig.QUALIFY_TABLE_NAMES_CONFIG);
if (whitelistSet != null && blacklistSet != null) {
throw new ConnectException(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG + " and "
@@ -120,13 +121,35 @@ public void start(final Map<String, String> properties) throws ConnectException
whitelistSet = Collections.emptySet();
}
+ final String mode = config.getMode();
+ if (JdbcSourceConnectorConfig.MODE_INCREMENTING.equals(mode)
+ || JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING.equals(mode)
+ ) {
+ final String incrementingColumn =
+ config.getString(JdbcSourceConnectorConfig.INCREMENTING_COLUMN_NAME_CONFIG);
+ if (!qualifyTableNames && (incrementingColumn == null || incrementingColumn.isEmpty())) {
+ // Otherwise, we may infer the wrong incrementing key for the table
+ // TODO: This restriction is not necessary in all cases, but additional logic will be required to
+ // distinguish when it is and is not, and without that logic, the connector will fail to query
+ // tables by trying to read a non-existent column, which is likely to be very confusing to users.
+ // This is still technically possible even with explicitly-specified column names, but that
+ // can happen regardless of whether unqualified table names are used
+ throw new ConnectException(
+ "When using unqualified table names and either " + JdbcSourceConnectorConfig.MODE_INCREMENTING
+ + " or " + JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING + " mode, an "
+ + "incrementing column name must be explicitly provided via the '"
+ + JdbcSourceConnectorConfig.INCREMENTING_COLUMN_NAME_CONFIG + "' property."
+ );
+ }
+ }
tableMonitorThread = new TableMonitorThread(
dialect,
cachedConnectionProvider,
context,
tablePollMs,
whitelistSet,
- blacklistSet
+ blacklistSet,
+ qualifyTableNames
);
tableMonitorThread.start();
}
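In practice the new start-up check means that a source connector running with unqualified table names and an incrementing-based mode must name its incrementing column explicitly. A hedged sketch of a config map that passes the check (the connection URL and column name are placeholders, not part of this change):

```java
final Map<String, String> props = new HashMap<>();
props.put("connector.class", JdbcSourceConnector.class.getName());
props.put("connection.url", "jdbc:postgresql://localhost:5432/example"); // placeholder
props.put("mode", "incrementing");
props.put("qualify.table.names", "false");
// Omitting this property would now make start() throw a ConnectException, since the
// connector might otherwise infer the wrong incrementing key for an unqualified table.
props.put("incrementing.column.name", "id");
```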
diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceConnectorConfig.java
index e54f02bc..57606862 100644
--- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceConnectorConfig.java
+++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceConnectorConfig.java
@@ -262,6 +262,17 @@ public class JdbcSourceConnectorConfig extends JdbcConfig {
+ "In most cases it only makes sense to have either TABLE or VIEW.";
private static final String TABLE_TYPE_DISPLAY = "Table Types";
+
+ public static final String QUALIFY_TABLE_NAMES_CONFIG = "qualify.table.names";
+ private static final String QUALIFY_TABLE_NAMES_DOC =
+ "Whether to use fully-qualified table names when querying the database. If disabled, "
+ + "queries will be performed with unqualified table names. This may be useful if the "
+ + "database has been configured with a search path to automatically direct unqualified "
+ + "queries to the correct table when there are multiple tables available with the same "
+ + "unqualified name";
+ public static final boolean QUALIFY_TABLE_NAMES_DEFAULT = true;
+ private static final String QUALIFY_TABLE_NAMES_DISPLAY = "Qualify table names";
+
public static ConfigDef baseConfigDef() {
final ConfigDef config = new ConfigDef();
addDatabaseOptions(config);
@@ -361,6 +372,16 @@ private static final void addDatabaseOptions(final ConfigDef config) {
Width.SHORT,
NUMERIC_MAPPING_DISPLAY,
NUMERIC_MAPPING_RECOMMENDER
+ ).define(
+ QUALIFY_TABLE_NAMES_CONFIG,
+ Type.BOOLEAN,
+ QUALIFY_TABLE_NAMES_DEFAULT,
+ Importance.LOW,
+ QUALIFY_TABLE_NAMES_DOC,
+ DATABASE_GROUP,
+ ++orderInGroup,
+ Width.SHORT,
+ QUALIFY_TABLE_NAMES_DISPLAY
);
defineDbTimezone(config, ++orderInGroup);
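The doc string above refers to a database-side search path; sketched with plain JDBC (all names and credentials are placeholders), the setup it has in mind looks like this:

```java
try (final Connection connection = DriverManager.getConnection(
             "jdbc:postgresql://localhost:5432/example", "user", "password"); // placeholders
     final Statement statement = connection.createStatement()) {
    statement.executeUpdate("CREATE SCHEMA preferred");
    statement.executeUpdate("CREATE SCHEMA other");
    // With this search path an unqualified reference to "dup" resolves to preferred.dup,
    // even though other.dup also exists, which is what qualify.table.names=false relies on.
    statement.executeUpdate("ALTER DATABASE example SET search_path TO preferred, other");
}
```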
diff --git a/src/main/java/io/aiven/connect/jdbc/source/TableMonitorThread.java b/src/main/java/io/aiven/connect/jdbc/source/TableMonitorThread.java
index 22a51cc0..cd56e933 100644
--- a/src/main/java/io/aiven/connect/jdbc/source/TableMonitorThread.java
+++ b/src/main/java/io/aiven/connect/jdbc/source/TableMonitorThread.java
@@ -20,6 +20,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,6 +38,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static io.aiven.connect.jdbc.source.JdbcSourceConnectorConfig.QUALIFY_TABLE_NAMES_CONFIG;
+
/**
* Thread that monitors the database for changes to the set of tables in the database that this
* connector should load data from.
@@ -49,8 +52,9 @@ public class TableMonitorThread extends Thread {
private final ConnectorContext context;
private final CountDownLatch shutdownLatch;
private final long pollMs;
- private Set<String> whitelist;
- private Set<String> blacklist;
+ private final Set<String> whitelist;
+ private final Set<String> blacklist;
+ private final boolean qualifyTableNames;
private List<TableId> tables;
private Map<String, List<TableId>> duplicates;
@@ -59,7 +63,8 @@ public TableMonitorThread(final DatabaseDialect dialect,
final ConnectorContext context,
final long pollMs,
final Set<String> whitelist,
- final Set<String> blacklist
+ final Set<String> blacklist,
+ final boolean qualifyTableNames
) {
this.dialect = dialect;
this.connectionProvider = connectionProvider;
@@ -68,8 +73,8 @@ public TableMonitorThread(final DatabaseDialect dialect,
this.pollMs = pollMs;
this.whitelist = whitelist;
this.blacklist = blacklist;
+ this.qualifyTableNames = qualifyTableNames;
this.tables = null;
-
}
@Override
@@ -113,7 +118,7 @@ public synchronized List<TableId> tables() {
if (tables == null) {
throw new ConnectException("Tables could not be updated quickly enough.");
}
- if (!duplicates.isEmpty()) {
+ if (qualifyTableNames && !duplicates.isEmpty()) {
final String configText;
if (whitelist != null) {
configText = "'" + JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG + "'";
@@ -128,7 +133,8 @@ public synchronized List<TableId> tables() {
+ "the topic and downstream processing errors. To prevent such processing errors, the "
+ "JDBC Source connector fails to start when it detects duplicate table name "
+ "configurations. Update the connector's " + configText + " config to include exactly "
- + "one table in each of the tables listed below.\n\t";
+ + "one table in each of the tables listed below or, to use unqualified table names, consider "
+ + "setting " + QUALIFY_TABLE_NAMES_CONFIG + " to 'false'.\n\t";
throw new ConnectException(msg + duplicates.values());
}
return tables;
@@ -177,22 +183,35 @@ private synchronized boolean updateTables() {
filteredTables.addAll(tables);
}
- if (!filteredTables.equals(this.tables)) {
+ final List<TableId> newTables;
+ if (!qualifyTableNames) {
+ newTables = filteredTables.stream()
+ .map(TableId::unqualified)
+ .distinct()
+ .collect(Collectors.toList());
+ } else {
+ newTables = filteredTables;
+ }
+
+ if (!newTables.equals(this.tables)) {
log.info(
"After filtering the tables are: {}",
dialect.expressionBuilder()
.appendList()
.delimitedBy(",")
- .of(filteredTables)
+ .of(newTables)
);
- final Map<String, List<TableId>> duplicates = filteredTables.stream()
- .collect(Collectors.groupingBy(TableId::tableName))
- .entrySet().stream()
- .filter(entry -> entry.getValue().size() > 1)
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- this.duplicates = duplicates;
+ if (qualifyTableNames) {
+ this.duplicates = newTables.stream()
+ .collect(Collectors.groupingBy(TableId::tableName))
+ .entrySet().stream()
+ .filter(entry -> entry.getValue().size() > 1)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ } else {
+ this.duplicates = Collections.emptyMap();
+ }
final List<TableId> previousTables = this.tables;
- this.tables = filteredTables;
+ this.tables = newTables;
notifyAll();
// Only return true if the table list wasn't previously null, i.e. if this was not the
// first table lookup
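When qualification is disabled, updateTables() collapses same-named tables from different schemas into one unqualified entry, which is also why the duplicate check is skipped in that mode. A small sketch of the mapping (the TableId values are illustrative):

```java
final List<TableId> filteredTables = List.of(
        new TableId(null, "preferred", "dup"),
        new TableId(null, "other", "dup"));
final List<TableId> newTables = filteredTables.stream()
        .map(TableId::unqualified) // drop catalog and schema
        .distinct()                // the two entries collapse via TableId equality
        .collect(Collectors.toList());
// newTables now holds a single TableId for "dup" with no catalog or schema.
```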
diff --git a/src/main/java/io/aiven/connect/jdbc/util/TableId.java b/src/main/java/io/aiven/connect/jdbc/util/TableId.java
index 5f5af3a3..ad691fd4 100644
--- a/src/main/java/io/aiven/connect/jdbc/util/TableId.java
+++ b/src/main/java/io/aiven/connect/jdbc/util/TableId.java
@@ -49,6 +49,10 @@ public String tableName() {
return tableName;
}
+ public TableId unqualified() {
+ return new TableId(null, null, this.tableName);
+ }
+
@Override
public void appendTo(final ExpressionBuilder builder, final boolean useQuotes) {
if (catalogName != null) {
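For clarity, unqualified() keeps only the table name, so when the result is rendered by appendTo it produces the bare identifier. A tiny sketch (values are illustrative):

```java
final TableId qualified = new TableId("example", "preferred", "dup");
final TableId bare = qualified.unqualified();
// When appended to an expression, bare renders as "dup" rather than "example"."preferred"."dup".
```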
diff --git a/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java b/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java
index 990ddd58..1fed786d 100644
--- a/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java
+++ b/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java
@@ -50,6 +50,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
@RunWith(PowerMockRunner.class)
@PrepareForTest({JdbcSourceConnector.class, DatabaseDialect.class})
@@ -171,6 +172,29 @@ public void testPartitioningManyTables() throws Exception {
connector.stop();
}
+ @Test
+ public void testPartitioningUnqualifiedTables() throws Exception {
+ connProps.put(JdbcSourceConnectorConfig.QUALIFY_TABLE_NAMES_CONFIG, "false");
+ // Tests distributing tables across multiple tasks, in this case unevenly
+ db.createTable("test1", "id", "INT NOT NULL");
+ db.createTable("test2", "id", "INT NOT NULL");
+ db.createTable("test3", "id", "INT NOT NULL");
+ db.createTable("test4", "id", "INT NOT NULL");
+ connector.start(connProps);
+ final List