Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New Source DB2 secure-only #7792

Merged
merged 19 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION source-db2-strict-encrypt

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-db2-strict-encrypt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-db2-strict-encrypt:dev
tests:
spec:
- spec_path: "src/test/resources/expected_spec.json"
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.source.db2_strict_encrypt.Db2StrictEncryptSource'
applicationDefaultJvmArgs = ['-XX:MaxRAMPercentage=75.0']
}

dependencies {
implementation project(':airbyte-db:lib')
implementation project(':airbyte-integrations:connectors:source-db2')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-integrations:connectors:source-relational-db')
implementation project(':airbyte-protocol:models')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

implementation group: 'com.ibm.db2', name: 'jcc', version: '11.5.5.0'

testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation project(':airbyte-test-utils')
testImplementation "org.testcontainers:db2:1.15.3"

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-db2')
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation 'org.apache.commons:commons-lang3:3.11'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.db2_strict_encrypt;

import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Db2JdbcStreamingQueryConfiguration implements
JdbcStreamingQueryConfiguration {

@Override
public void accept(final Connection connection, final PreparedStatement preparedStatement)
throws SQLException {
connection.setAutoCommit(false);
preparedStatement.setFetchSize(1000);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.db2_strict_encrypt;

import com.fasterxml.jackson.databind.node.ArrayNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.spec_modification.SpecModifyingSource;
import io.airbyte.integrations.source.db2.Db2Source;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Db2StrictEncryptSource extends SpecModifyingSource implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(Db2StrictEncryptSource.class);
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver";

public Db2StrictEncryptSource() {
super(new Db2Source());
}

@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
// We need to remove the first item from one Of, which is responsible for connecting to the source
// without encrypted.
((ArrayNode) spec.getConnectionSpecification().get("properties").get("encryption").get("oneOf")).remove(0);
return spec;
}

public static void main(final String[] args) throws Exception {
final Source source = new Db2StrictEncryptSource();
LOGGER.info("starting source: {}", Db2StrictEncryptSource.class);
new IntegrationRunner(source).run(args);
LOGGER.info("completed source: {}", Db2StrictEncryptSource.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.source.db2.Db2Source;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.testcontainers.containers.Db2Container;

public class Db2StrictEncryptSourceCertificateAcceptanceTest extends SourceAcceptanceTest {

private static final String SCHEMA_NAME = "SOURCE_INTEGRATION_TEST";
private static final String STREAM_NAME1 = "ID_AND_NAME1";
private static final String STREAM_NAME2 = "ID_AND_NAME2";

private static final String TEST_KEY_STORE_PASS = "Passw0rd";
private static final String KEY_STORE_FILE_PATH = "clientkeystore.jks";
private static final String SSL_CONFIG = ":sslConnection=true;sslTrustStoreLocation=" + KEY_STORE_FILE_PATH +
";sslTrustStorePassword=" + TEST_KEY_STORE_PASS + ";";

private Db2Container db;
private JsonNode config;
private JdbcDatabase database;

@Override
protected String getImageName() {
return "airbyte/source-db2-strict-encrypt:dev";
}

@Override
protected ConnectorSpecification getSpec() throws Exception {
return Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class);
}

@Override
protected JsonNode getConfig() {
return config;
}

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("ID"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", SCHEMA_NAME, STREAM_NAME1),
Field.of("ID", JsonSchemaPrimitive.NUMBER),
Field.of("NAME", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", SCHEMA_NAME, STREAM_NAME2),
Field.of("ID", JsonSchemaPrimitive.NUMBER),
Field.of("NAME", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}

@Override
protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected List<String> getRegexTests() {
return Collections.emptyList();
}

@Override
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
db = new Db2Container("ibmcom/db2:11.5.5.0").withCommand().acceptLicense()
.withExposedPorts(50000);
db.start();

var certificate = getCertificate();
try {
convertAndImportCertificate(certificate);
} catch (IOException | InterruptedException e) {
throw new RuntimeException("Failed to import certificate into Java Keystore");
}

config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getMappedPort(50000))
.put("db", db.getDatabaseName())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "encrypted_verify_certificate")
.put("ssl_certificate", certificate)
.put("key_store_password", TEST_KEY_STORE_PASS)
.build()))
.build());

String jdbcUrl = String.format("jdbc:db2://%s:%s/%s",
config.get("host").asText(),
db.getMappedPort(50000),
config.get("db").asText()) + SSL_CONFIG;

database = Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
jdbcUrl,
Db2Source.DRIVER_CLASS);

final String createSchemaQuery = String.format("CREATE SCHEMA %s", SCHEMA_NAME);
final String createTableQuery1 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME1);
final String createTableQuery2 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME2);
final String insertIntoTableQuery1 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME1);
final String insertIntoTableQuery2 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME2);

database.execute(createSchemaQuery);
database.execute(createTableQuery1);
database.execute(createTableQuery2);
database.execute(insertIntoTableQuery1);
database.execute(insertIntoTableQuery2);

database.close();
}

@Override
protected void tearDown(TestDestinationEnv testEnv) {
new File("certificate.pem").delete();
new File("certificate.der").delete();
new File(KEY_STORE_FILE_PATH).delete();
db.close();
}

/* Helpers */

private String getCertificate() throws IOException, InterruptedException {
// To enable SSL connection on the server, we need to generate self-signed certificates for the server and add them to the configuration.
// Then you need to enable SSL connection and specify on which port it will work. These changes will take effect after restart.
// The certificate for generating a user certificate has the extension *.arm.
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -keydb -create -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS + "\" -stash");
andriikorotkov marked this conversation as resolved.
Show resolved Hide resolved
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -cert -create -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS
+ "\" -label \"mylabel\" -dn \"CN=testcompany\" -size 2048 -sigalg SHA256_WITH_RSA");
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -cert -extract -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS
+ "\" -label \"mylabel\" -target \"server.arm\" -format ascii -fips");

db.execInContainer("su", "-", "db2inst1", "-c", "db2 update dbm cfg using SSL_SVR_KEYDB /database/config/db2inst1/server.kdb");
db.execInContainer("su", "-", "db2inst1", "-c", "db2 update dbm cfg using SSL_SVR_STASH /database/config/db2inst1/server.sth");
db.execInContainer("su", "-", "db2inst1", "-c", "db2 update dbm cfg using SSL_SVR_LABEL mylabel");
db.execInContainer("su", "-", "db2inst1", "-c", "db2 update dbm cfg using SSL_SVCENAME 50000");
db.execInContainer("su", "-", "db2inst1", "-c", "db2set -i db2inst1 DB2COMM=SSL");
db.execInContainer("su", "-", "db2inst1", "-c", "db2stop force");
db.execInContainer("su", "-", "db2inst1", "-c", "db2start");
return db.execInContainer("su", "-", "db2inst1", "-c", "cat server.arm").getStdout();
}

private static void convertAndImportCertificate(String certificate) throws IOException, InterruptedException {
Runtime run = Runtime.getRuntime();
try (PrintWriter out = new PrintWriter("certificate.pem")) {
out.print(certificate);
}
runProcess("openssl x509 -outform der -in certificate.pem -out certificate.der", run);
runProcess(
"keytool -import -alias rds-root -keystore " + KEY_STORE_FILE_PATH + " -file certificate.der -storepass " + TEST_KEY_STORE_PASS
+ " -noprompt",
run);
}

private static void runProcess(String cmd, Runtime run) throws IOException, InterruptedException {
Process pr = run.exec(cmd);
if (!pr.waitFor(30, TimeUnit.SECONDS)) {
pr.destroy();
throw new RuntimeException("Timeout while executing: " + cmd);
}
}

}
Loading