Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish new mssql source #13176

Merged
merged 7 commits into from
May 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.3.22
dockerImageTag: 0.4.0
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down
66 changes: 59 additions & 7 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4736,7 +4736,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.3.22"
- dockerImage: "airbyte/source-mssql:0.4.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mssql"
connectionSpecification:
Expand All @@ -4748,7 +4748,7 @@
- "port"
- "database"
- "username"
additionalProperties: false
additionalProperties: true
properties:
host:
description: "The hostname of the database."
Expand Down Expand Up @@ -4841,19 +4841,71 @@
description: "Specifies the host name of the server. The value of\
\ this property must match the subject property of the certificate."
order: 7
replication_method:
type: "string"
replication:
type: "object"
title: "Replication Method"
description: "The replication method used for extracting data from the database.\
\ STANDARD replication requires no setup on the DB side but will not be\
\ able to represent deletions incrementally. CDC uses SQL Server's change data capture feature to detect\
\ inserts, updates, and deletes. This needs to be configured on the source\
\ database itself."
default: "STANDARD"
enum:
- "STANDARD"
- "CDC"
additionalProperties: true
order: 8
oneOf:
- title: "Standard"
additionalProperties: false
description: "Standard replication requires no setup on the DB side but\
\ will not be able to represent deletions incrementally."
required:
- "replication_type"
properties:
replication_type:
type: "string"
const: "Standard"
enum:
- "Standard"
default: "Standard"
order: 0
- title: "Logical Replication (CDC)"
additionalProperties: false
description: "CDC uses SQL Server's change data capture feature to detect inserts,\
\ updates, and deletes. This needs to be configured on the source database itself."
required:
- "replication_type"
properties:
replication_type:
type: "string"
const: "CDC"
enum:
- "CDC"
default: "CDC"
order: 0
data_to_sync:
title: "Data to Sync"
type: "string"
default: "Existing and New"
enum:
- "Existing and New"
- "New Changes Only"
description: "What data should be synced under the CDC. \"Existing\
\ and New\" will read existing data as a snapshot, and sync new\
\ changes through CDC. \"New Changes Only\" will skip the initial\
\ snapshot, and only sync new changes through CDC."
order: 1
snapshot_isolation:
title: "Initial Snapshot Isolation Level"
type: "string"
default: "Snapshot"
enum:
- "Snapshot"
- "Read Committed"
description: "Existing data in the database are synced through an\
\ initial snapshot. This parameter controls the isolation level\
\ that will be used during the initial snapshotting. If you choose\
\ the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\"\
>snapshot isolation mode</a> on the database."
order: 2
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"title": "MSSQL Source Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"host": {
"description": "The hostname of the database.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.annotation.VisibleForTesting;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MssqlCdcHelper {

Expand Down Expand Up @@ -90,28 +88,22 @@ public static DataToSync from(final String value) {

}

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcHelper.class);

/**
 * Reports whether the given connector config selects CDC replication.
 *
 * <p>The nested replication object (the config shape used since version 0.4.0) is consulted
 * first, so a config that carries both the new object and the legacy string field, e.g. while
 * migrating, resolves to the new setting. The legacy field is only read when the new one is
 * absent or null.
 *
 * <p>NOTE(review): the value is resolved with {@code ReplicationMethod.valueOf}; presumably this
 * is a case-sensitive enum lookup that rejects unknown names. Confirm that every value the spec
 * allows for the replication type maps to a constant.
 *
 * @param config the connector configuration
 * @return {@code true} when the resolved replication method is CDC; {@code false} when it is
 *         another method or when neither replication field is present
 */
@VisibleForTesting
static boolean isCdc(final JsonNode config) {
  // new replication method config since version 0.4.0; takes precedence over the legacy field
  if (config.hasNonNull(REPLICATION_FIELD)) {
    final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
    return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
  }
  // legacy replication method config before version 0.4.0
  if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
    return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
  }
  return false;
}

@VisibleForTesting
static SnapshotIsolation getSnapshotIsolationConfig(final JsonNode config) {
// legacy replication method config before version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
return SnapshotIsolation.SNAPSHOT;
}
// new replication method config since version 0.4.0
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
Expand All @@ -123,10 +115,6 @@ static SnapshotIsolation getSnapshotIsolationConfig(final JsonNode config) {

@VisibleForTesting
static DataToSync getDataToSyncConfig(final JsonNode config) {
// legacy replication method config before version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
return DataToSync.EXISTING_AND_NEW;
}
// new replication method config since version 0.4.0
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"title": "MSSQL Source Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"host": {
"description": "The hostname of the database.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ public void testIsCdc() {
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertTrue(MssqlCdcHelper.isCdc(newCdc));

// migration from legacy to new config
final JsonNode mixNonCdc = Jsons.jsonNode(Map.of(
"replication_method", "CDC",
"replication", Jsons.jsonNode(Map.of("replication_type", "STANDARD"))));
assertFalse(MssqlCdcHelper.isCdc(mixNonCdc));

final JsonNode mixCdc = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication", Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertTrue(MssqlCdcHelper.isCdc(mixCdc));
}

@Test
Expand All @@ -58,6 +72,23 @@ public void testGetSnapshotIsolation() {
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertEquals(SnapshotIsolation.SNAPSHOT, MssqlCdcHelper.getSnapshotIsolationConfig(newCdcSnapshot));

// migration from legacy to new config
final JsonNode mixCdcNonSnapshot = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication", Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Read Committed"))));
assertEquals(SnapshotIsolation.READ_COMMITTED, MssqlCdcHelper.getSnapshotIsolationConfig(mixCdcNonSnapshot));

final JsonNode mixCdcSnapshot = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication", Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertEquals(SnapshotIsolation.SNAPSHOT, MssqlCdcHelper.getSnapshotIsolationConfig(mixCdcSnapshot));
}

@Test
Expand All @@ -79,6 +110,23 @@ public void testGetDataToSyncConfig() {
"data_to_sync", "New Changes Only",
"snapshot_isolation", "Snapshot"))));
assertEquals(DataToSync.NEW_CHANGES_ONLY, MssqlCdcHelper.getDataToSyncConfig(newCdcNewOnly));

final JsonNode mixCdcExistingAndNew = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication", Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Read Committed"))));
assertEquals(DataToSync.EXISTING_AND_NEW, MssqlCdcHelper.getDataToSyncConfig(mixCdcExistingAndNew));

final JsonNode mixCdcNewOnly = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication",
Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"data_to_sync", "New Changes Only",
"snapshot_isolation", "Snapshot"))));
assertEquals(DataToSync.NEW_CHANGES_ONLY, MssqlCdcHelper.getDataToSyncConfig(mixCdcNewOnly));
}

}