Skip to content

Commit

Permalink
šŸ› Source CockroachDB: fix connector replication failure due to multipā€¦
Browse files Browse the repository at this point in the history
ā€¦le open portals error (#10235)

* fix cockroachdb connector replication failure due to multiple open portals error

* bump connector version

Co-authored-by: marcosmarxm <[email protected]>
  • Loading branch information
lshrinivas and marcosmarxm authored Feb 25, 2022
1 parent e21ec5a commit 658efc8
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
- name: Cockroachdb
sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003
dockerRepository: airbyte/source-cockroachdb
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/sources/cockroachdb
icon: cockroachdb.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-cockroachdb:0.1.9"
- dockerImage: "airbyte/source-cockroachdb:0.1.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/cockroachdb"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-cockroachdb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/source-cockroachdb-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-cockroachdb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-cockroachdb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshWrappedSource;
Expand Down Expand Up @@ -39,7 +39,7 @@ public class CockroachDbSource extends AbstractJdbcSource<JDBCType> {
public static final List<String> PORT_KEY = List.of("port");

public CockroachDbSource() {
super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration(), new CockroachJdbcSourceOperations());
super(DRIVER_CLASS, null, new CockroachJdbcSourceOperations());
}

public static Source sshWrappedSource() {
Expand Down Expand Up @@ -110,12 +110,29 @@ protected boolean isNotInternalSchema(JsonNode jsonNode, Set<String> internalSch
return false;
}

@Override
public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
final JsonNode jdbcConfig = toDatabaseConfig(config);

final JdbcDatabase database = Databases.createJdbcDatabase(
jdbcConfig.get("username").asText(),
jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null,
jdbcConfig.get("jdbc_url").asText(),
driverClass,
jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null,
sourceOperations);

quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);

return new CockroachJdbcDatabase(database, sourceOperations);
}

private CheckedFunction<Connection, PreparedStatement, SQLException> getPrivileges(JdbcDatabase database) {
return connection -> {
final PreparedStatement ps = connection.prepareStatement(
"SELECT DISTINCT table_catalog, table_schema, table_name, privilege_type\n"
+ "FROM information_schema.table_privileges\n"
+ "WHERE grantee = ? AND privilege_type in ('SELECT', 'ALL')");
+ "WHERE (grantee = ? AND privilege_type in ('SELECT', 'ALL')) OR (table_schema = 'public')");
ps.setString(1, database.getDatabaseConfig().get("username").asText());
return ps;
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.cockroachdb;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;

import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Stream;

/**
* This implementation uses non-streamed queries to CockroachDB. CockroachDB
* does not currently support multiple active pgwire portals on the same session,
* which makes it impossible to replicate tables that have over ~1000 rows
* using StreamingJdbcDatabase. See: https://go.crdb.dev/issue-v/40195/v21.2
* and in particular, the comment:
* https://github.com/cockroachdb/cockroach/issues/40195?version=v21.2#issuecomment-870570351
* The same situation as kafka-connect applies to StreamingJdbcDatabase
*/
public class CockroachJdbcDatabase
extends JdbcDatabase
{

private final JdbcDatabase database;

public CockroachJdbcDatabase(final JdbcDatabase database,
final JdbcCompatibleSourceOperations<?> sourceOperations) {
super(sourceOperations);
this.database = database;
}

@Override
public DatabaseMetaData getMetaData() throws SQLException {
return database.getMetaData();
}

@Override
public void execute(final CheckedConsumer<Connection, SQLException> query) throws SQLException {
database.execute(query);
}

@Override
public <T> List<T> bufferedResultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
return database.bufferedResultSetQuery(query, recordTransform);
}

@Override
public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
return database.resultSetQuery(query, recordTransform);
}

@Override
public <T> Stream<T> query(final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
return database.query(statementCreator, recordTransform);
}

@Override
public Stream<JsonNode> query(final String sql, final String... params) throws SQLException {
return bufferedResultSetQuery(connection -> {
final PreparedStatement statement = connection.prepareStatement(sql);
int i = 1;
for (final String param : params) {
statement.setString(i, param);
++i;
}
return statement.executeQuery();
}, sourceOperations::rowToJson).stream();

}

@Override
public void close() throws Exception {
database.close();
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/cockroachdb.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Your database user should now be ready for use with Airbyte.

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.10 | 2022-02-24 | [10235](https://github.com/airbytehq/airbyte/pull/10235) | Fix Replication Failure due Multiple portal opens |
| 0.1.9 | 2022-02-21 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Fixed cursor for old connectors that use non-microsecond format. Now connectors work with both formats |
| 0.1.8 | 2022-02-18 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Updated timestamp transformation with microseconds |
| 0.1.7 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
Expand Down

0 comments on commit 658efc8

Please sign in to comment.