Skip to content

Commit

Permalink
🎉Source MySQL - added option to connect using SSL (airbytehq#6510)
Browse files Browse the repository at this point in the history
* Source MySQL - added option to connect using SSL
  • Loading branch information
etsybaev authored and schlattk committed Jan 4, 2022
1 parent 6e63281 commit e211ab8
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.4.5",
"dockerImageTag": "0.4.6",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql",
"icon": "mysql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.4.5
dockerImageTag: 0.4.6
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.4.5
LABEL io.airbyte.version=0.4.6

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public class MySqlSource extends AbstractJdbcSource implements Source {
public static final String MYSQL_DB_HISTORY = "mysql_db_history";
public static final String CDC_LOG_FILE = "_ab_cdc_log_file";
public static final String CDC_LOG_POS = "_ab_cdc_log_pos";
public static final List<String> SSL_PARAMETERS = List.of(
"useSSL=true",
"requireSSL=true",
"verifyServerCertificate=false");

public MySqlSource() {
super(DRIVER_CLASS, new MySqlJdbcStreamingQueryConfiguration());
Expand Down Expand Up @@ -163,18 +167,24 @@ public AirbyteCatalog discover(JsonNode config) throws Exception {

@Override
public JsonNode toDatabaseConfig(JsonNode config) {
final StringBuilder jdbc_url = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s",
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()));

// see MySqlJdbcStreamingQueryConfiguration for more context on why useCursorFetch=true is needed.
jdbc_url.append("?useCursorFetch=true");
jdbcUrl.append("?useCursorFetch=true");
if (config.get("jdbc_url_params") != null && !config.get("jdbc_url_params").asText().isEmpty()) {
jdbc_url.append("&").append(config.get("jdbc_url_params").asText());
jdbcUrl.append("&").append(config.get("jdbc_url_params").asText());
}

if (config.has("ssl") && config.get("ssl").asBoolean()) {
jdbcUrl.append("&").append(String.join("&", SSL_PARAMETERS));
}

ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", jdbc_url.toString());
.put("jdbc_url", jdbcUrl.toString());

if (config.has("password")) {
configBuilder.put("password", config.get("password").asText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
"order": 6,
"default": "STANDARD",
"enum": ["STANDARD", "CDC"]
},
"ssl": {
"title": "SSL Connection",
"description": "Encrypt data using SSL.",
"type": "boolean",
"default": true,
"order": 7
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest {
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "public.starships";

private MySQLContainer<?> container;
private JsonNode config;
protected MySQLContainer<?> container;
protected JsonNode config;

@Override
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.mysql;

import static io.airbyte.integrations.source.mysql.MySqlSource.SSL_PARAMETERS;

import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import org.jooq.SQLDialect;
import org.testcontainers.containers.MySQLContainer;

public class MySqlSslSourceAcceptanceTest extends MySqlSourceAcceptanceTest {

@Override
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();

config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("port", container.getFirstMappedPort())
.put("database", container.getDatabaseName())
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("ssl", true)
.put("replication_method", ReplicationMethod.STANDARD)
.build());

final Database database = Databases.createDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:mysql://%s:%s/%s?%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText(),
String.join("&", SSL_PARAMETERS)),
"com.mysql.cj.jdbc.Driver",
SQLDialect.MYSQL);

database.query(ctx -> {
ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
ctx.fetch(
"INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));");
ctx.fetch(
"INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
return null;
});

database.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@

class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {

private static final String TEST_USER = "test";
private static final String TEST_PASSWORD = "test";
private static MySQLContainer<?> container;
protected static final String TEST_USER = "test";
protected static final String TEST_PASSWORD = "test";
protected static MySQLContainer<?> container;

private JsonNode config;
private Database database;
protected JsonNode config;
protected Database database;

@BeforeAll
static void init() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.mysql;

import static io.airbyte.integrations.source.mysql.MySqlSource.SSL_PARAMETERS;

import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Databases;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.BeforeEach;

class MySqlSslJdbcSourceAcceptanceTest extends MySqlJdbcSourceAcceptanceTest {

@BeforeEach
public void setup() throws Exception {
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("port", container.getFirstMappedPort())
.put("database", Strings.addRandomSuffix("db", "_", 10))
.put("username", TEST_USER)
.put("password", TEST_PASSWORD)
.put("ssl", true)
.build());

database = Databases.createDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:mysql://%s:%s?%s",
config.get("host").asText(),
config.get("port").asText(),
String.join("&", SSL_PARAMETERS)),
MySqlSource.DRIVER_CLASS,

SQLDialect.MYSQL);

database.query(ctx -> {
ctx.fetch("CREATE DATABASE " + config.get("database").asText());
ctx.fetch("SHOW STATUS LIKE 'Ssl_cipher'");
return null;
});
database.close();

super.setup();
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ Using this feature requires additional configuration, when creating the source.

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.4.6 | 2021-09-29 | [6510](https://github.com/airbytehq/airbyte/pull/6510) | Support SSL connection |
| 0.4.5 | 2021-09-17 | [6146](https://github.com/airbytehq/airbyte/pull/6146) | Added option to connect to DB via SSH|
| 0.4.1 | 2021-07-23 | [4956](https://github.com/airbytehq/airbyte/pull/4956) | Fix log link |
| 0.3.7 | 2021-06-09 | [3179](https://github.com/airbytehq/airbyte/pull/3973) | Add AIRBYTE_ENTRYPOINT for Kubernetes support |
Expand Down

0 comments on commit e211ab8

Please sign in to comment.