Skip to content

Commit

Permalink
Add sanitized column name in some destinations' raw table outputs (#5026)
Browse files Browse the repository at this point in the history

* Add sanitized column name in some destinations' raw table outputs
  • Loading branch information
ChristopheDuong authored Jul 28, 2021
1 parent f538a63 commit 76c1f34
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 32 deletions.
25 changes: 2 additions & 23 deletions airbyte-commons/src/main/java/io/airbyte/commons/text/Names.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public class Names {
public static final String NON_ALPHANUMERIC_AND_UNDERSCORE_PATTERN = "[^\\p{Alnum}_]";

/**
* Converts any UTF8 string to a string with only alphanumeric and _ characters.
* Converts any UTF8 string to a string with only alphanumeric and _ characters without preserving
* accent characters.
*
* @param s string to convert
* @return cleaned string
Expand All @@ -44,28 +45,6 @@ public static String toAlphanumericAndUnderscore(String s) {
.replaceAll(NON_ALPHANUMERIC_AND_UNDERSCORE_PATTERN, "_");
}

/**
 * Concatenates two names, handling the case where either of them is already wrapped in double
 * quotes.
 *
 * <p>
 * A name is considered quoted when it starts with a double quote. Quoted names are unwrapped before
 * concatenation, and if at least one input was quoted the concatenated result is wrapped in a
 * single pair of double quotes. If neither input was quoted the plain concatenation is returned.
 *
 * <p>
 * The trailing character of a quoted name is only removed when it really is the closing quote, so a
 * malformed input such as a lone {@code "} or an unterminated {@code "abc} neither throws nor
 * loses its last character.
 *
 * @param inputStr1 first name, optionally double-quoted; must not be null
 * @param inputStr2 second name, optionally double-quoted; must not be null
 * @return the concatenation of both unwrapped names, double-quoted if any input was quoted
 */
public static String concatQuotedNames(final String inputStr1, final String inputStr2) {
  final boolean firstQuoted = inputStr1.startsWith("\"");
  final boolean secondQuoted = inputStr2.startsWith("\"");
  final String unquotedStr1 = firstQuoted ? stripSurroundingDoubleQuotes(inputStr1) : inputStr1;
  final String unquotedStr2 = secondQuoted ? stripSurroundingDoubleQuotes(inputStr2) : inputStr2;
  if (firstQuoted || secondQuoted) {
    return "\"" + unquotedStr1 + unquotedStr2 + "\"";
  }
  return unquotedStr1 + unquotedStr2;
}

/**
 * Removes the leading double quote of {@code quoted} and, only if present, its closing double
 * quote. The caller guarantees that {@code quoted} starts with a double quote.
 */
private static String stripSurroundingDoubleQuotes(final String quoted) {
  // A closing quote can only exist as a second, distinct character.
  final boolean hasClosingQuote = quoted.length() >= 2 && quoted.endsWith("\"");
  final int end = hasClosingQuote ? quoted.length() - 1 : quoted.length();
  return quoted.substring(1, end);
}

/**
 * Quotes {@code value} using the double quote character.
 *
 * <p>
 * Delegates to {@code internalQuote}, passing {@code '"'} as the quote character; the exact
 * wrapping rules are whatever that helper implements.
 *
 * @param value string to quote
 * @return the result of {@code internalQuote(value, '"')}
 */
public static String doubleQuote(String value) {
  final char quoteChar = '"';
  return internalQuote(value, quoteChar);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.3.8",
"dockerImageTag": "0.3.9",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"name": "MySQL",
"dockerRepository": "airbyte/destination-mysql",
"dockerImageTag": "0.1.9",
"dockerImageTag": "0.1.10",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.3.8
dockerImageTag: 0.3.9
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
name: BigQuery (denormalized typed struct)
Expand Down Expand Up @@ -58,7 +58,7 @@
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
name: MS SQL Server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,19 @@

package io.airbyte.integrations.destination;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.text.Names;
import io.airbyte.commons.util.MoreIterators;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class StandardNameTransformer implements NamingConventionTransformer {

private static final String NON_JSON_PATH_CHARACTERS_PATTERN = "['\"`]";

@Override
public String getIdentifier(String name) {
return convertStreamName(name);
Expand All @@ -48,4 +56,37 @@ protected String convertStreamName(String input) {
return Names.toAlphanumericAndUnderscore(input);
}

/**
 * Rebuilds a JsonNode, adding sanitized property names while keeping the original property names
 * too. A sanitized name is the original name with every character matched by
 * {@code NON_JSON_PATH_CHARACTERS_PATTERN} replaced by an underscore. This is needed by some
 * destinations because their JSON extract functions have limitations on how such special
 * characters are parsed. These naming rules may differ from the schema/table/column naming
 * conventions.
 *
 * <p>
 * Objects are processed recursively, arrays are rebuilt element by element, and any other node is
 * returned as is. An original property always wins over a sanitized one with the same name, and
 * when several properties sanitize to the same name only the first one encountered is kept.
 *
 * @param root node to rebuild; must not be null
 * @return a node holding the original properties plus their sanitized duplicates
 */
public static JsonNode formatJsonPath(JsonNode root) {
  if (root.isObject()) {
    final Map<String, JsonNode> properties = new HashMap<>();
    final var keys = Jsons.keys(root);
    // First pass: register every original key so that originals always take precedence over
    // sanitized duplicates added below.
    for (final var key : keys) {
      properties.put(key, formatJsonPath(root.get(key)));
    }
    // Second pass: duplicate each property under its sanitized key so it can be extracted in
    // normalization. The value formatted in the first pass is reused rather than recomputed;
    // recomputing it would re-run the recursion on the whole subtree and double the work at
    // every nesting level that contains a special character.
    for (final var key : keys) {
      final String formattedKey = key.replaceAll(NON_JSON_PATH_CHARACTERS_PATTERN, "_");
      properties.putIfAbsent(formattedKey, properties.get(key));
    }
    return Jsons.jsonNode(properties);
  } else if (root.isArray()) {
    return Jsons.jsonNode(MoreIterators.toList(root.elements()).stream()
        .map(StandardNameTransformer::formatJsonPath)
        .collect(Collectors.toList()));
  } else {
    return root;
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.8
LABEL io.airbyte.version=0.3.9
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand Down Expand Up @@ -112,10 +113,10 @@ protected JsonNode formatRecord(Schema schema, AirbyteRecordMessage recordMessag
// use BQ helpers to string-format correctly.
long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();

final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData());
return Jsons.jsonNode(ImmutableMap.of(
JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(),
JavaBaseConstants.COLUMN_NAME_DATA, Jsons.serialize(recordMessage.getData()),
JavaBaseConstants.COLUMN_NAME_DATA, Jsons.serialize(formattedData),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.integrations.destination.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand Down Expand Up @@ -115,7 +116,7 @@ protected void writeBatchToFile(File tmpFile, List<AirbyteRecordMessage> records

for (AirbyteRecordMessage record : records) {
var uuid = UUID.randomUUID().toString();
var jsonData = Jsons.serialize(record.getData());
var jsonData = Jsons.serialize(formatData(record.getData()));
var emittedAt = Timestamp.from(Instant.ofEpochMilli(record.getEmittedAt()));
csvPrinter.printRecord(uuid, jsonData, emittedAt);
}
Expand All @@ -126,6 +127,10 @@ protected void writeBatchToFile(File tmpFile, List<AirbyteRecordMessage> records
}
}

/**
 * Hook applied to a record's data before it is serialized in {@code writeBatchToFile}. This
 * default implementation returns the data unchanged; subclasses may override it to reshape the
 * JSON that gets written.
 *
 * @param data record data about to be serialized
 * @return the same {@code data} instance, untouched
 */
protected JsonNode formatData(JsonNode data) {
return data;
}

@Override
public String truncateTableQuery(JdbcDatabase database, String schemaName, String tableName) {
return String.format("TRUNCATE TABLE %s.%s;\n", schemaName, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

package io.airbyte.integrations.destination.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.destination.jdbc.DefaultSqlOperations;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.File;
Expand Down Expand Up @@ -92,6 +94,11 @@ private void loadDataIntoTable(JdbcDatabase database,
});
}

/**
 * Formats record data by passing it through {@link StandardNameTransformer#formatJsonPath}, so the
 * returned JSON carries sanitized property names alongside the original ones.
 *
 * @param data record data to format
 * @return the node produced by {@code StandardNameTransformer.formatJsonPath(data)}
 */
@Override
protected JsonNode formatData(JsonNode data) {
  final JsonNode sanitized = StandardNameTransformer.formatJsonPath(data);
  return sanitized;
}

void verifyLocalFileEnabled(JdbcDatabase database) throws SQLException {
boolean localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database);
if (!localFileEnabled) {
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.3.9 | 2021-07-28 | [#5026](https://github.com/airbytehq/airbyte/pull/5026) | Add sanitized json fields in raw tables to handle quotes in column names |
| 0.3.6 | 2021-06-18 | [#3947](https://github.com/airbytehq/airbyte/issues/3947) | Service account credentials are now optional. |
| 0.3.4 | 2021-06-07 | [#3277](https://github.com/airbytehq/airbyte/issues/3277) | Add dataset location option |

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ As a result, Airbyte MySQL destination forces all identifier (table, schema and

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.10 | 2021-07-28 | [#5026](https://github.com/airbytehq/airbyte/pull/5026) | Add sanitized json fields in raw tables to handle quotes in column names |
| 0.1.7 | 2021-07-09 | [#4651](https://github.com/airbytehq/airbyte/pull/4651) | Switch normalization flag on so users can use normalization. |
| 0.1.6 | 2021-07-03 | [#4531](https://github.com/airbytehq/airbyte/pull/4531) | Added normalization for MySQL. |
| 0.1.5 | 2021-07-03 | [#3973](https://github.com/airbytehq/airbyte/pull/3973) | Added `AIRBYTE_ENTRYPOINT` for kubernetes support. |
Expand Down

0 comments on commit 76c1f34

Please sign in to comment.