From 6dc7f4a6ea78e61c73f26978fac6b1eac050d599 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 8 Mar 2023 11:32:48 -0500 Subject: [PATCH] Add support for unqualified table names in source connector --- checkstyle/suppressions.xml | 4 +- .../aiven/kafka/connect/jdbc/AbstractIT.java | 30 ++- .../kafka/connect/jdbc/ConnectRunner.java | 17 ++ .../jdbc/postgres/AbstractPostgresIT.java | 76 +++++++ .../PartitionedTableIntegrationTest.java | 57 +---- .../UnqualifiedTableNamesIntegrationTest.java | 202 ++++++++++++++++++ .../connect/jdbc/JdbcSourceConnector.java | 25 ++- .../source/JdbcSourceConnectorConfig.java | 21 ++ .../jdbc/source/TableMonitorThread.java | 49 +++-- .../io/aiven/connect/jdbc/util/TableId.java | 4 + .../connect/jdbc/JdbcSourceConnectorTest.java | 49 ++++- .../jdbc/source/TableMonitorThreadTest.java | 115 +++++++--- 12 files changed, 543 insertions(+), 106 deletions(-) create mode 100644 src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java create mode 100644 src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 65875836..5b8dc05d 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -22,13 +22,13 @@ + files="(BufferedRecords|DataConverter|DatabaseDialect|FieldsMetadata|HanaDialect|JdbcSourceTask|JdbcSourceConnector|MySqlDatabaseDialect|OracleDatabaseDialect|PostgreSqlDatabaseDialect|PreparedStatementBinder|SqlServerDatabaseDialect|SqliteDatabaseDialect|TimestampIncrementingTableQuerier|VerticaDatabaseDialect|SapHanaDatabaseDialect|TableId|ColumnDefinition|TableMonitorThread).java"/> + files="(DataConverter|FieldsMetadata|JdbcSourceTask|JdbcSourceConnector|GenericDatabaseDialect).java"/> diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/AbstractIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/AbstractIT.java index cc98c105..c1ababb5 100644 --- a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/AbstractIT.java +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/AbstractIT.java @@ -27,6 +27,8 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -51,6 +53,7 @@ public abstract class AbstractIT { DockerImageName.parse("confluentinc/cp-kafka") .withTag(DEFAULT_KAFKA_TAG); protected static KafkaProducer producer; + protected static KafkaConsumer consumer; @Container protected KafkaContainer kafkaContainer = new KafkaContainer(DEFAULT_IMAGE_NAME) .withNetwork(Network.newNetwork()) @@ -69,6 +72,7 @@ void startKafka() throws Exception { final Path pluginDir = setupPluginDir(); setupKafkaConnect(pluginDir); producer = createProducer(); + consumer = createConsumer(); } private static Path setupPluginDir() throws Exception { @@ -85,15 +89,19 @@ private static Path setupPluginDir() throws Exception { return pluginDir; } - private void setupKafka() throws Exception { - LOGGER.info("Setup Kafka"); + + protected void createTopic(final String topic, final int numPartitions) throws Exception { try (final AdminClient adminClient = createAdminClient()) { - LOGGER.info("Create topic {}", TEST_TOPIC_NAME); - final NewTopic newTopic = new 
NewTopic(TEST_TOPIC_NAME, 4, (short) 1); + LOGGER.info("Create topic {}", topic); + final NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1); adminClient.createTopics(List.of(newTopic)).all().get(); } } + private void setupKafka() throws Exception { + createTopic(TEST_TOPIC_NAME, 4); + } + protected AdminClient createAdminClient() { final Properties adminClientConfig = new Properties(); adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); @@ -118,10 +126,24 @@ protected KafkaProducer createProducer() { return new KafkaProducer<>(producerProps); } + protected KafkaConsumer createConsumer() { + LOGGER.info("Create kafka consumer"); + final Map consumerProps = new HashMap<>(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "io.confluent.kafka.serializers.KafkaAvroDeserializer"); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "io.confluent.kafka.serializers.KafkaAvroDeserializer"); + consumerProps.put("schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl()); + return new KafkaConsumer<>(consumerProps); + } + @AfterEach final void tearDown() { connectRunner.stop(); producer.close(); + consumer.close(); connectRunner.awaitStop(); } diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/ConnectRunner.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/ConnectRunner.java index 126e1ebb..86c0e20d 100644 --- a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/ConnectRunner.java +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/ConnectRunner.java @@ -31,6 +31,7 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; +import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; import org.slf4j.Logger; @@ -102,6 +103,22 @@ public void createConnector(final Map config) throws ExecutionEx assert connectorInfoCreated.created(); } + public void restartTask(final String connector, final int task) throws ExecutionException, InterruptedException { + assert herder != null; + + final FutureCallback cb = new FutureCallback<>( + (error, ignored) -> { + if (error != null) { + LOGGER.error("Failed to restart task {}-{}", connector, task, error); + } else { + LOGGER.info("Restarted task {}-{}", connector, task); + } + }); + + herder.restartTask(new ConnectorTaskId(connector, task), cb); + cb.get(); + } + void stop() { connect.stop(); } diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java new file mode 100644 index 00000000..31b83b9f --- /dev/null +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java @@ -0,0 +1,76 @@ +/* + * Copyright 2022 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.jdbc.postgres; + +import javax.sql.DataSource; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import io.aiven.kafka.connect.jdbc.AbstractIT; + +import org.assertj.core.util.Arrays; +import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.DockerImageName; + +public class AbstractPostgresIT extends AbstractIT { + + public static final String DEFAULT_POSTGRES_TAG = "10.20"; + private static final DockerImageName DEFAULT_POSTGRES_IMAGE_NAME = + DockerImageName.parse("postgres") + .withTag(DEFAULT_POSTGRES_TAG); + + @Container + protected final PostgreSQLContainer postgreSqlContainer = new PostgreSQLContainer<>(DEFAULT_POSTGRES_IMAGE_NAME); + + protected void executeUpdate(final String updateStatement) throws SQLException { + try (final Connection connection = getDatasource().getConnection(); + final Statement statement = connection.createStatement()) { + statement.executeUpdate(updateStatement); + } + } + + protected DataSource getDatasource() { + final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource(); + pgSimpleDataSource.setServerNames(Arrays.array(postgreSqlContainer.getHost())); + pgSimpleDataSource.setPortNumbers(new int[] {postgreSqlContainer.getMappedPort(5432)}); + pgSimpleDataSource.setDatabaseName(postgreSqlContainer.getDatabaseName()); + pgSimpleDataSource.setUser(postgreSqlContainer.getUsername()); + pgSimpleDataSource.setPassword(postgreSqlContainer.getPassword()); + return pgSimpleDataSource; + } + + protected Map basicConnectorConfig() { + final HashMap config = new HashMap<>(); + config.put("key.converter", "io.confluent.connect.avro.AvroConverter"); + config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl()); + config.put("value.converter", "io.confluent.connect.avro.AvroConverter"); + config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl()); + config.put("tasks.max", "1"); + config.put("connection.url", postgreSqlContainer.getJdbcUrl()); + config.put("connection.user", postgreSqlContainer.getUsername()); + config.put("connection.password", postgreSqlContainer.getPassword()); + config.put("dialect.name", "PostgreSqlDatabaseDialect"); + return config; + } + +} diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java index c003e6f6..67a33173 100644 --- a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java @@ -16,14 +16,9 @@ package io.aiven.kafka.connect.jdbc.postgres; -import javax.sql.DataSource; - -import java.sql.Connection; import java.sql.SQLException; -import java.sql.Statement; import 
java.time.Duration; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -33,27 +28,18 @@ import org.apache.kafka.clients.producer.RecordMetadata; import io.aiven.connect.jdbc.JdbcSinkConnector; -import io.aiven.kafka.connect.jdbc.AbstractIT; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.assertj.core.util.Arrays; import org.assertj.db.type.Table; import org.junit.jupiter.api.Test; -import org.postgresql.ds.PGSimpleDataSource; -import org.testcontainers.containers.PostgreSQLContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import static org.apache.avro.generic.GenericData.Record; import static org.assertj.db.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -@Testcontainers -public class PartitionedTableIntegrationTest extends AbstractIT { +public class PartitionedTableIntegrationTest extends AbstractPostgresIT { - public static final String DEFAULT_POSTGRES_TAG = "10.20"; private static final String CONNECTOR_NAME = "test-sink-connector"; private static final int TEST_TOPIC_PARTITIONS = 1; private static final Schema VALUE_RECORD_SCHEMA = @@ -82,17 +68,11 @@ public class PartitionedTableIntegrationTest extends AbstractIT { private static final String CREATE_PARTITION = "create table partition partition of \"" + TEST_TOPIC_NAME + "\" for values from ('2022-03-03') to ('2122-03-03');"; - private static final DockerImageName DEFAULT_POSTGRES_IMAGE_NAME = - DockerImageName.parse("postgres") - .withTag(DEFAULT_POSTGRES_TAG); - - @Container - private final PostgreSQLContainer postgreSqlContainer = new PostgreSQLContainer<>(DEFAULT_POSTGRES_IMAGE_NAME); @Test final void testBasicDelivery() throws ExecutionException, InterruptedException, SQLException { executeUpdate(CREATE_TABLE); - connectRunner.createConnector(basicConnectorConfig()); + connectRunner.createConnector(basicSinkConnectorConfig()); sendTestData(1000); @@ -104,7 +84,7 @@ final void testBasicDelivery() throws ExecutionException, InterruptedException, final void testBasicDeliveryForPartitionedTable() throws ExecutionException, InterruptedException, SQLException { executeUpdate(CREATE_TABLE_WITH_PARTITION); executeUpdate(CREATE_PARTITION); - connectRunner.createConnector(basicConnectorConfig()); + connectRunner.createConnector(basicSinkConnectorConfig()); sendTestData(1000); @@ -112,23 +92,6 @@ final void testBasicDeliveryForPartitionedTable() throws ExecutionException, Int .untilAsserted(() -> assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1000)); } - private void executeUpdate(final String updateStatement) throws SQLException { - try (final Connection connection = getDatasource().getConnection(); - final Statement statement = connection.createStatement()) { - statement.executeUpdate(updateStatement); - } - } - - public DataSource getDatasource() { - final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource(); - pgSimpleDataSource.setServerNames(Arrays.array(postgreSqlContainer.getHost())); - pgSimpleDataSource.setPortNumbers(new int[] {postgreSqlContainer.getMappedPort(5432)}); - pgSimpleDataSource.setDatabaseName(postgreSqlContainer.getDatabaseName()); - pgSimpleDataSource.setUser(postgreSqlContainer.getUsername()); - pgSimpleDataSource.setPassword(postgreSqlContainer.getPassword()); - return pgSimpleDataSource; - } 
- private void sendTestData(final int numberOfRecords) throws InterruptedException, ExecutionException { final List> sendFutures = new ArrayList<>(); for (int i = 0; i < numberOfRecords; i++) { @@ -155,21 +118,13 @@ private Record createRecord(final String name, final String value) { return valueRecord; } - private Map basicConnectorConfig() { - final HashMap config = new HashMap<>(); + private Map basicSinkConnectorConfig() { + final Map config = basicConnectorConfig(); config.put("name", CONNECTOR_NAME); config.put("connector.class", JdbcSinkConnector.class.getName()); config.put("topics", TEST_TOPIC_NAME); - config.put("key.converter", "io.confluent.connect.avro.AvroConverter"); - config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl()); - config.put("value.converter", "io.confluent.connect.avro.AvroConverter"); - config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl()); - config.put("tasks.max", "1"); - config.put("connection.url", postgreSqlContainer.getJdbcUrl()); - config.put("connection.user", postgreSqlContainer.getUsername()); - config.put("connection.password", postgreSqlContainer.getPassword()); config.put("insert.mode", "insert"); - config.put("dialect.name", "PostgreSqlDatabaseDialect"); return config; } + } diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java new file mode 100644 index 00000000..7beb2213 --- /dev/null +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/UnqualifiedTableNamesIntegrationTest.java @@ -0,0 +1,202 @@ +/* + * Copyright 2022 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.jdbc.postgres; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; + +import io.aiven.connect.jdbc.JdbcSourceConnector; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Test; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class UnqualifiedTableNamesIntegrationTest extends AbstractPostgresIT { + private static final String CONNECTOR_NAME = "test-source-connector"; + + private static final String TABLE = "dup"; + private static final String PREFERRED_SCHEMA = "preferred"; + private static final String OTHER_SCHEMA = "other"; + + private static final String CREATE_PREFERRED_TABLE = + "create table " + PREFERRED_SCHEMA + "." 
+ TABLE + "\n" + + "(\n" + + " id int generated always as identity primary key,\n" + + " name text not null,\n" + + " value text not null,\n" + + " date timestamp not null default current_timestamp\n" + + ")"; + private static final String POPULATE_PREFERRED_TABLE = + "insert into " + PREFERRED_SCHEMA + "." + TABLE + " (name, value) values\n" + + "('clef', 'bass')"; + private static final String CREATE_OTHER_TABLE = + "create table " + OTHER_SCHEMA + "." + TABLE + "\n" + + "(\n" + + " name text not null" + + ")"; + private static final String POPULATE_OTHER_TABLE = + "insert into " + OTHER_SCHEMA + "." + TABLE + " (name) values\n" + + "('Rapu')"; // 🦀 + + @Test + public void testSingleTable() throws Exception { + createTopic(TABLE, 1); + consumer.assign(Collections.singleton(new TopicPartition(TABLE, 0))); + // Make sure that the topic starts empty + assertEmptyPoll(Duration.ofSeconds(1)); + + executeUpdate(createSchema(PREFERRED_SCHEMA)); + executeUpdate(setSearchPath(postgreSqlContainer.getDatabaseName())); + executeUpdate(CREATE_PREFERRED_TABLE); + executeUpdate(POPULATE_PREFERRED_TABLE); + connectRunner.createConnector(basicTimestampModeSourceConnectorConfig()); + + await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(100)) + .until(this::assertSingleNewRecordProduced); + } + + @Test + public void testMultipleTablesTimestampMode() throws Exception { + testMultipleTables(basicTimestampModeSourceConnectorConfig()); + } + + @Test + public void testMultipleTablesIncrementingMode() throws Exception { + final Map connectorConfig = basicSourceConnectorConfig(); + connectorConfig.put("mode", "incrementing"); + connectorConfig.put("incrementing.column.name", "id"); + testMultipleTables(connectorConfig); + } + + @Test + public void testMultipleTablesTimestampIncrementingMode() throws Exception { + final Map connectorConfig = basicSourceConnectorConfig(); + connectorConfig.put("mode", "timestamp+incrementing"); + connectorConfig.put("incrementing.column.name", "id"); + connectorConfig.put("timestamp.column.name", "date"); + testMultipleTables(connectorConfig); + } + + private void testMultipleTables(final Map connectorConfig) throws Exception { + createTopic(TABLE, 1); + consumer.assign(Collections.singleton(new TopicPartition(TABLE, 0))); + // Make sure that the topic starts empty + assertEmptyPoll(Duration.ofSeconds(1)); + + executeUpdate(createSchema(PREFERRED_SCHEMA)); + executeUpdate(createSchema(OTHER_SCHEMA)); + executeUpdate(setSearchPath(postgreSqlContainer.getDatabaseName())); + executeUpdate(CREATE_PREFERRED_TABLE); + executeUpdate(POPULATE_PREFERRED_TABLE); + executeUpdate(CREATE_OTHER_TABLE); + executeUpdate(POPULATE_OTHER_TABLE); + connectRunner.createConnector(connectorConfig); + + await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(100)) + .until(this::assertSingleNewRecordProduced); + + executeUpdate(POPULATE_OTHER_TABLE); + + // Make sure that, even after adding another row to the other table, the connector + // doesn't publish any new records + assertEmptyPoll(Duration.ofSeconds(5)); + + // Add one more row to the preferred table, and verify that the connector + // is able to read it + executeUpdate(POPULATE_PREFERRED_TABLE); + await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(100)) + .until(this::assertSingleNewRecordProduced); + + // Restart the connector, to ensure that offsets are tracked correctly + connectRunner.restartTask(CONNECTOR_NAME, 0); + + // Add one more row to the preferred table, and verify that the connector 
+ // is able to read it + executeUpdate(POPULATE_PREFERRED_TABLE); + await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(100)) + .until(this::assertSingleNewRecordProduced); + + // Make sure that the connector doesn't publish any more records + assertEmptyPoll(Duration.ofSeconds(5)); + } + + private void assertEmptyPoll(final Duration duration) { + final ConsumerRecords records = consumer.poll(duration); + assertEquals(ConsumerRecords.empty(), records); + } + + private boolean assertSingleNewRecordProduced() { + final ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + if (records.isEmpty()) { + return false; + } + assertEquals(1, records.count(), "Connector should only have produced one new record to Kafka"); + for (final ConsumerRecord record : records) { + final Schema valueSchema = record.value().getSchema(); + final Set actualFieldNames = valueSchema.getFields().stream() + .map(Schema.Field::name) + .collect(Collectors.toSet()); + final Set expectedFieldNames = Set.of("id", "name", "value", "date"); + assertEquals( + expectedFieldNames, + actualFieldNames, + "Records produced by the connector do not have a schema that matches " + + " the schema of the table it should have read from" + ); + } + return true; + } + + private Map basicTimestampModeSourceConnectorConfig() { + final Map config = basicSourceConnectorConfig(); + config.put("mode", "timestamp"); + config.put("timestamp.column.name", "date"); + return config; + } + + private Map basicSourceConnectorConfig() { + final Map config = super.basicConnectorConfig(); + config.put("name", CONNECTOR_NAME); + config.put("topic.prefix", ""); + config.put("qualify.table.names", "false"); + config.put("poll.interval.ms", "1000"); // Poll quickly for shorter tests + config.put("whitelist", TABLE); + config.put("connector.class", JdbcSourceConnector.class.getName()); + config.put("dialect.name", "PostgreSqlDatabaseDialect"); + return config; + } + + private static String createSchema(final String schema) { + return "CREATE SCHEMA " + schema; + } + + private static String setSearchPath(final String database) { + return "ALTER DATABASE " + database + " SET search_path TO " + + PREFERRED_SCHEMA + "," + OTHER_SCHEMA; + } + +} diff --git a/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java b/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java index 86279289..cfd1f44f 100644 --- a/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java +++ b/src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java @@ -103,6 +103,7 @@ public void start(final Map properties) throws ConnectException Set whitelistSet = whitelist.isEmpty() ? null : new HashSet<>(whitelist); final List blacklist = config.getList(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG); final Set blacklistSet = blacklist.isEmpty() ? 
null : new HashSet<>(blacklist); + final boolean qualifyTableNames = config.getBoolean(JdbcSourceConnectorConfig.QUALIFY_TABLE_NAMES_CONFIG); if (whitelistSet != null && blacklistSet != null) { throw new ConnectException(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG + " and " @@ -120,13 +121,35 @@ public void start(final Map properties) throws ConnectException whitelistSet = Collections.emptySet(); } + final String mode = config.getMode(); + if (JdbcSourceConnectorConfig.MODE_INCREMENTING.equals(mode) + || JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING.equals(mode) + ) { + final String incrementingColumn = + config.getString(JdbcSourceConnectorConfig.INCREMENTING_COLUMN_NAME_CONFIG); + if (!qualifyTableNames && (incrementingColumn == null || incrementingColumn.isEmpty())) { + // Otherwise, we may infer the wrong incrementing key for the table + // TODO: This restraint is not necessary in all cases, but additional logic will be required to + // distinguish when it is and is not, and without that logic, the connector will fail to query + // tables by trying to read a non-existent column, which is likely to be very confusing to users. + // This is still technically possible even with explicitly-specified column names, but that + // can happen regardless of whether unqualified table names are used + throw new ConnectException( + "When using unqualified table names and either " + JdbcSourceConnectorConfig.MODE_INCREMENTING + + " or " + JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING + " mode, an " + + "incrementing column name must be explicitly provided via the '" + + JdbcSourceConnectorConfig.INCREMENTING_COLUMN_NAME_CONFIG + "' property." + ); + } + } tableMonitorThread = new TableMonitorThread( dialect, cachedConnectionProvider, context, tablePollMs, whitelistSet, - blacklistSet + blacklistSet, + qualifyTableNames ); tableMonitorThread.start(); } diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceConnectorConfig.java index e54f02bc..57606862 100644 --- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -262,6 +262,17 @@ public class JdbcSourceConnectorConfig extends JdbcConfig { + "In most cases it only makes sense to have either TABLE or VIEW."; private static final String TABLE_TYPE_DISPLAY = "Table Types"; + + public static final String QUALIFY_TABLE_NAMES_CONFIG = "qualify.table.names"; + private static final String QUALIFY_TABLE_NAMES_DOC = + "Whether to use fully-qualified table names when querying the database. If disabled, " + + "queries will be performed with unqualified table names. 
This may be useful if the " + + "database has been configured with a search path to automatically direct unqualified " + + "queries to the correct table when there are multiple tables available with the same " + + "unqualified name"; + public static final boolean QUALIFY_TABLE_NAMES_DEFAULT = true; + private static final String QUALIFY_TABLE_NAMES_DISPLAY = "Qualify table names"; + public static ConfigDef baseConfigDef() { final ConfigDef config = new ConfigDef(); addDatabaseOptions(config); @@ -361,6 +372,16 @@ private static final void addDatabaseOptions(final ConfigDef config) { Width.SHORT, NUMERIC_MAPPING_DISPLAY, NUMERIC_MAPPING_RECOMMENDER + ).define( + QUALIFY_TABLE_NAMES_CONFIG, + Type.BOOLEAN, + QUALIFY_TABLE_NAMES_DEFAULT, + Importance.LOW, + QUALIFY_TABLE_NAMES_DOC, + DATABASE_GROUP, + ++orderInGroup, + Width.SHORT, + QUALIFY_TABLE_NAMES_DISPLAY ); defineDbTimezone(config, ++orderInGroup); diff --git a/src/main/java/io/aiven/connect/jdbc/source/TableMonitorThread.java b/src/main/java/io/aiven/connect/jdbc/source/TableMonitorThread.java index 22a51cc0..cd56e933 100644 --- a/src/main/java/io/aiven/connect/jdbc/source/TableMonitorThread.java +++ b/src/main/java/io/aiven/connect/jdbc/source/TableMonitorThread.java @@ -20,6 +20,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -37,6 +38,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.aiven.connect.jdbc.source.JdbcSourceConnectorConfig.QUALIFY_TABLE_NAMES_CONFIG; + /** * Thread that monitors the database for changes to the set of tables in the database that this * connector should load data from. @@ -49,8 +52,9 @@ public class TableMonitorThread extends Thread { private final ConnectorContext context; private final CountDownLatch shutdownLatch; private final long pollMs; - private Set whitelist; - private Set blacklist; + private final Set whitelist; + private final Set blacklist; + private final boolean qualifyTableNames; private List tables; private Map> duplicates; @@ -59,7 +63,8 @@ public TableMonitorThread(final DatabaseDialect dialect, final ConnectorContext context, final long pollMs, final Set whitelist, - final Set blacklist + final Set blacklist, + final boolean qualifyTableNames ) { this.dialect = dialect; this.connectionProvider = connectionProvider; @@ -68,8 +73,8 @@ public TableMonitorThread(final DatabaseDialect dialect, this.pollMs = pollMs; this.whitelist = whitelist; this.blacklist = blacklist; + this.qualifyTableNames = qualifyTableNames; this.tables = null; - } @Override @@ -113,7 +118,7 @@ public synchronized List tables() { if (tables == null) { throw new ConnectException("Tables could not be updated quickly enough."); } - if (!duplicates.isEmpty()) { + if (qualifyTableNames && !duplicates.isEmpty()) { final String configText; if (whitelist != null) { configText = "'" + JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG + "'"; @@ -128,7 +133,8 @@ public synchronized List tables() { + "the topic and downstream processing errors. To prevent such processing errors, the " + "JDBC Source connector fails to start when it detects duplicate table name " + "configurations. 
Update the connector's " + configText + " config to include exactly " - + "one table in each of the tables listed below.\n\t"; + + "one table in each of the tables listed below or, to use unqualified table names, consider " + + "setting " + QUALIFY_TABLE_NAMES_CONFIG + " to 'false'.\n\t"; throw new ConnectException(msg + duplicates.values()); } return tables; @@ -177,22 +183,35 @@ private synchronized boolean updateTables() { filteredTables.addAll(tables); } - if (!filteredTables.equals(this.tables)) { + final List newTables; + if (!qualifyTableNames) { + newTables = filteredTables.stream() + .map(TableId::unqualified) + .distinct() + .collect(Collectors.toList()); + } else { + newTables = filteredTables; + } + + if (!newTables.equals(this.tables)) { log.info( "After filtering the tables are: {}", dialect.expressionBuilder() .appendList() .delimitedBy(",") - .of(filteredTables) + .of(newTables) ); - final Map> duplicates = filteredTables.stream() - .collect(Collectors.groupingBy(TableId::tableName)) - .entrySet().stream() - .filter(entry -> entry.getValue().size() > 1) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - this.duplicates = duplicates; + if (qualifyTableNames) { + this.duplicates = newTables.stream() + .collect(Collectors.groupingBy(TableId::tableName)) + .entrySet().stream() + .filter(entry -> entry.getValue().size() > 1) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } else { + this.duplicates = Collections.emptyMap(); + } final List previousTables = this.tables; - this.tables = filteredTables; + this.tables = newTables; notifyAll(); // Only return true if the table list wasn't previously null, i.e. if this was not the // first table lookup diff --git a/src/main/java/io/aiven/connect/jdbc/util/TableId.java b/src/main/java/io/aiven/connect/jdbc/util/TableId.java index 5f5af3a3..ad691fd4 100644 --- a/src/main/java/io/aiven/connect/jdbc/util/TableId.java +++ b/src/main/java/io/aiven/connect/jdbc/util/TableId.java @@ -49,6 +49,10 @@ public String tableName() { return tableName; } + public TableId unqualified() { + return new TableId(null, null, this.tableName); + } + @Override public void appendTo(final ExpressionBuilder builder, final boolean useQuotes) { if (catalogName != null) { diff --git a/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java b/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java index 990ddd58..1fed786d 100644 --- a/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java +++ b/src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java @@ -50,6 +50,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; @RunWith(PowerMockRunner.class) @PrepareForTest({JdbcSourceConnector.class, DatabaseDialect.class}) @@ -171,6 +172,29 @@ public void testPartitioningManyTables() throws Exception { connector.stop(); } + @Test + public void testPartitioningUnqualifiedTables() throws Exception { + connProps.put(JdbcSourceConnectorConfig.QUALIFY_TABLE_NAMES_CONFIG, "false"); + // Tests distributing tables across multiple tasks, in this case unevenly + db.createTable("test1", "id", "INT NOT NULL"); + db.createTable("test2", "id", "INT NOT NULL"); + db.createTable("test3", "id", "INT NOT NULL"); + db.createTable("test4", "id", "INT NOT NULL"); + connector.start(connProps); + final List> configs = connector.taskConfigs(3); + assertEquals(3, configs.size()); + assertTaskConfigsHaveParentConfigs(configs); + + 
assertEquals(unqualifiedTables("test1", "test2"), configs.get(0).get(JdbcSourceTaskConfig.TABLES_CONFIG)); + assertNull(configs.get(0).get(JdbcSourceTaskConfig.QUERY_CONFIG)); + assertEquals(unqualifiedTables("test3"), configs.get(1).get(JdbcSourceTaskConfig.TABLES_CONFIG)); + assertNull(configs.get(1).get(JdbcSourceTaskConfig.QUERY_CONFIG)); + assertEquals(unqualifiedTables("test4"), configs.get(2).get(JdbcSourceTaskConfig.TABLES_CONFIG)); + assertNull(configs.get(2).get(JdbcSourceTaskConfig.QUERY_CONFIG)); + + connector.stop(); + } + @Test public void testPartitioningQuery() throws Exception { // Tests "partitioning" when config specifies running a custom query @@ -189,12 +213,22 @@ public void testPartitioningQuery() throws Exception { connector.stop(); } - @Test(expected = ConnectException.class) + @Test public void testConflictingQueryTableSettings() { final String sampleQuery = "SELECT foo, bar FROM sample_table"; connProps.put(JdbcSourceConnectorConfig.QUERY_CONFIG, sampleQuery); connProps.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "foo,bar"); - connector.start(connProps); + assertThrows(ConnectException.class, () -> connector.start(connProps)); + + connector = new JdbcSourceConnector(); + connProps.remove(JdbcSourceConnectorConfig.QUERY_CONFIG); + connProps.put(JdbcSourceConnectorConfig.QUALIFY_TABLE_NAMES_CONFIG, "false"); + connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_INCREMENTING); + assertThrows(ConnectException.class, () -> connector.start(connProps)); + + connector = new JdbcSourceConnector(); + connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING); + assertThrows(ConnectException.class, () -> connector.start(connProps)); } private void assertTaskConfigsHaveParentConfigs(final List> configs) { @@ -205,9 +239,18 @@ private void assertTaskConfigsHaveParentConfigs(final List> } private String tables(final String... names) { + return tables(true, names); + } + + private String unqualifiedTables(final String... names) { + return tables(false, names); + } + + private String tables(final boolean qualified, final String... names) { + final String schema = qualified ? 
"APP" : null; final List tableIds = new ArrayList<>(); for (final String name : names) { - tableIds.add(new TableId(null, "APP", name)); + tableIds.add(new TableId(null, schema, name)); } final ExpressionBuilder builder = ExpressionBuilder.create(); builder.appendList().delimitedBy(",").of(tableIds); diff --git a/src/test/java/io/aiven/connect/jdbc/source/TableMonitorThreadTest.java b/src/test/java/io/aiven/connect/jdbc/source/TableMonitorThreadTest.java index 1554d405..20c3c18e 100644 --- a/src/test/java/io/aiven/connect/jdbc/source/TableMonitorThreadTest.java +++ b/src/test/java/io/aiven/connect/jdbc/source/TableMonitorThreadTest.java @@ -38,14 +38,18 @@ import org.easymock.IAnswer; import org.junit.Test; import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.powermock.api.easymock.annotation.Mock; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; @RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(Parameterized.class) @PrepareForTest({JdbcSourceTask.class}) @PowerMockIgnore("javax.management.*") public class TableMonitorThreadTest { @@ -54,6 +58,7 @@ public class TableMonitorThreadTest { private static final TableId FOO = new TableId(null, null, "foo"); private static final TableId BAR = new TableId(null, null, "bar"); private static final TableId BAZ = new TableId(null, null, "baz"); + private static final TableId QUAL = new TableId("fully", "qualified", "name"); private static final TableId DUP1 = new TableId(null, "dup1", "dup"); private static final TableId DUP2 = new TableId(null, "dup2", "dup"); @@ -75,6 +80,9 @@ public class TableMonitorThreadTest { public static final Set VIEW_TABLE_TYPES = Collections.unmodifiableSet( new HashSet<>(Arrays.asList("VIEW")) ); + + private final boolean qualifiedTableNames; + private TableMonitorThread tableMonitorThread; @Mock @@ -86,17 +94,32 @@ public class TableMonitorThreadTest { @Mock private ConnectorContext context; + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(false, true); + } + + public TableMonitorThreadTest(final boolean qualifiedTableNames) { + this.qualifiedTableNames = qualifiedTableNames; + } + @Test public void testSingleLookup() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); - tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, null); - expectTableNames(LIST_FOO, shutdownThread()); + tableMonitorThread = newTableMonitorThread(null, null); + final String expectedTableName; + if (qualifiedTableNames) { + expectTableNames(LIST_FOO, shutdownThread()); + expectedTableName = "foo"; + } else { + expectTableNames(Collections.singletonList(QUAL), shutdownThread()); + expectedTableName = "name"; + } EasyMock.replay(connectionProvider, dialect); tableMonitorThread.start(); tableMonitorThread.join(); - checkTableNames("foo").execute(); + checkTableNames(expectedTableName).execute(); EasyMock.verify(connectionProvider, dialect); } @@ -105,8 +128,7 @@ public void testSingleLookup() throws Exception { public void testWhitelist() throws Exception { final Set whitelist = new HashSet<>(Arrays.asList("foo", "bar")); 
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); - tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, whitelist, null); + tableMonitorThread = newTableMonitorThread(whitelist, null); expectTableNames(LIST_FOO_BAR, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -121,8 +143,7 @@ public void testWhitelist() throws Exception { public void testBlacklist() throws Exception { final Set blacklist = new HashSet<>(Arrays.asList("bar", "baz")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); - tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, blacklist); + tableMonitorThread = newTableMonitorThread(null, blacklist); expectTableNames(LIST_FOO_BAR_BAZ, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -136,8 +157,7 @@ public void testBlacklist() throws Exception { @Test public void testReconfigOnUpdate() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); - tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, null); + tableMonitorThread = newTableMonitorThread(null, null); expectTableNames(LIST_FOO); expectTableNames(LIST_FOO, checkTableNames("foo")); @@ -162,8 +182,7 @@ public void testReconfigOnUpdate() throws Exception { @Test public void testInvalidConnection() throws Exception { - tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, null); + tableMonitorThread = newTableMonitorThread(null, null); EasyMock.expect(connectionProvider.getConnection()).andAnswer(new IAnswer() { @Override public Connection answer() throws Throwable { @@ -182,46 +201,61 @@ public Connection answer() throws Throwable { EasyMock.verify(connectionProvider); } - @Test(expected = ConnectException.class) + @Test public void testDuplicates() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); - tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, null); + tableMonitorThread = newTableMonitorThread(null, null); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); EasyMock.replay(connectionProvider, dialect); tableMonitorThread.start(); tableMonitorThread.join(); - tableMonitorThread.tables(); + + if (qualifiedTableNames) { + assertThrows(ConnectException.class, tableMonitorThread::tables); + } else { + checkTableNames("foo", "bar", "baz", "dup"); + } + EasyMock.verify(connectionProvider, dialect); } - @Test(expected = ConnectException.class) + @Test public void testDuplicateWithUnqualifiedWhitelist() throws Exception { final Set whitelist = new HashSet<>(Arrays.asList("dup")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); - tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, whitelist, null); + tableMonitorThread = newTableMonitorThread(whitelist, null); expectTableNames(LIST_DUP_ONLY, shutdownThread()); EasyMock.replay(connectionProvider, dialect); tableMonitorThread.start(); tableMonitorThread.join(); - tableMonitorThread.tables(); + + if (qualifiedTableNames) { + assertThrows(ConnectException.class, tableMonitorThread::tables); + } else { + checkTableNames("dup"); + } + EasyMock.verify(connectionProvider, dialect); } - 
@Test(expected = ConnectException.class) + @Test public void testDuplicateWithUnqualifiedBlacklist() throws Exception { final Set blacklist = new HashSet<>(Arrays.asList("foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); - tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, blacklist); + tableMonitorThread = newTableMonitorThread(null, blacklist); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); EasyMock.replay(connectionProvider, dialect); tableMonitorThread.start(); tableMonitorThread.join(); - tableMonitorThread.tables(); + + if (qualifiedTableNames) { + assertThrows(ConnectException.class, tableMonitorThread::tables); + } else { + checkTableNames("bar", "baz", "dup"); + } + EasyMock.verify(connectionProvider, dialect); } @@ -229,14 +263,19 @@ public void testDuplicateWithUnqualifiedBlacklist() throws Exception { public void testDuplicateWithQualifiedWhitelist() throws Exception { final Set whitelist = new HashSet<>(Arrays.asList("dup1.dup", "foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); - tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, whitelist, null); + tableMonitorThread = newTableMonitorThread(whitelist, null); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); EasyMock.replay(connectionProvider, dialect); tableMonitorThread.start(); tableMonitorThread.join(); - checkTableIds(DUP1, FOO); + + if (qualifiedTableNames) { + checkTableIds(DUP1, FOO); + } else { + checkTableNames("dup", "foo"); + } + EasyMock.verify(connectionProvider, dialect); } @@ -244,17 +283,33 @@ public void testDuplicateWithQualifiedWhitelist() throws Exception { public void testDuplicateWithQualifiedBlacklist() throws Exception { final Set blacklist = new HashSet<>(Arrays.asList("dup1.dup", "foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); - tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, blacklist); + tableMonitorThread = newTableMonitorThread(null, blacklist); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); EasyMock.replay(connectionProvider, dialect); tableMonitorThread.start(); tableMonitorThread.join(); - checkTableIds(DUP2, BAR, BAZ); + + if (qualifiedTableNames) { + checkTableIds(DUP2, BAR, BAZ); + } else { + checkTableNames("dup", "bar", "baz"); + } EasyMock.verify(connectionProvider, dialect); } + private TableMonitorThread newTableMonitorThread(final Set whitelist, final Set blacklist) { + return new TableMonitorThread( + dialect, + connectionProvider, + context, + POLL_INTERVAL, + whitelist, + blacklist, + qualifiedTableNames + ); + } + private interface Op { void execute(); }
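
For reference, a minimal sketch (not part of the patch) of how the new qualify.table.names option might be used once this change is applied, mirroring basicSourceConnectorConfig() from UnqualifiedTableNamesIntegrationTest above. The connector name, connection settings, and the key/value converter properties (omitted here) are placeholders; the table, schema, and database names ("dup", "preferred", "other", "test") are the ones used by the integration test and are illustrative only.

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: a source connector configuration that reads the unqualified table "dup",
 * relying on the database search_path to resolve which schema's "dup" is queried.
 */
public class UnqualifiedSourceConfigExample {
    public static Map<String, String> config() {
        final Map<String, String> config = new HashMap<>();
        config.put("name", "test-source-connector");
        config.put("connector.class", "io.aiven.connect.jdbc.JdbcSourceConnector");
        config.put("dialect.name", "PostgreSqlDatabaseDialect");
        // Placeholder connection settings; point these at a real PostgreSQL instance.
        config.put("connection.url", "jdbc:postgresql://localhost:5432/test");
        config.put("connection.user", "test");
        config.put("connection.password", "test");
        config.put("mode", "timestamp");
        config.put("timestamp.column.name", "date");
        config.put("whitelist", "dup");              // unqualified table name, as in the test
        config.put("qualify.table.names", "false");  // option introduced by this patch
        config.put("topic.prefix", "");
        config.put("poll.interval.ms", "1000");
        // Database side (see setSearchPath in the integration test): the search_path
        // decides which "dup" the connector reads, e.g.
        //   ALTER DATABASE test SET search_path TO preferred, other;
        return config;
    }
}

Note that, per the check added to JdbcSourceConnector.start in this patch, a configuration with qualify.table.names=false and mode=incrementing or mode=timestamp+incrementing must also set incrementing.column.name explicitly; otherwise the connector fails on startup rather than risk inferring the wrong incrementing column.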