🎉 Destination S3 & GCS: support additional properties (#7288)
* Log json schema

* Use patched json avro converter

* Rename schema

* Update unit test cases

* Fix ab ap field schema conversion

* Rename files

* Add unit test cases

* Fix dependency for databricks

* Bump versions

* Update documentations

* Update gcs doc

* Set additional properties field name

* Revert s3 and gcs version

* Specify extra props fields

* Refactor json avro conversion doc

* Update connector doc

* Fix databricks spec typo

* Bump connector versions in seed
tuliren authored Nov 3, 2021
1 parent 9a07136 commit c9c41dc
Showing 39 changed files with 812 additions and 474 deletions.
@@ -2,6 +2,6 @@
"destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
"name": "S3",
"dockerRepository": "airbyte/destination-s3",
"dockerImageTag": "0.1.12",
"dockerImageTag": "0.1.13",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3"
}
@@ -2,6 +2,6 @@
"destinationDefinitionId": "ca8f6566-e555-4b40-943a-545bf123117a",
"name": "Google Cloud Storage (GCS)",
"dockerRepository": "airbyte/destination-gcs",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/gcs"
}
@@ -31,7 +31,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
-  dockerImageTag: 0.1.2
+  dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
- name: Google PubSub
destinationDefinitionId: 356668e2-7e34-47f3-a3b0-67a8a481b692
@@ -93,7 +93,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
-  dockerImageTag: 0.1.12
+  dockerImageTag: 0.1.13
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

-LABEL io.airbyte.version=0.1.1
+LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/destination-databricks
@@ -24,7 +24,11 @@ dependencies {
implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0'
implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0'
implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'
-    implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10'
+    implementation('tech.allegro.schema.json2avro:converter') {
+        version {
+            branch = 'master'
+        }
+    }

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-databricks')
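A note on the dependency change above, which recurs in the GCS and S3 build files below: "version { branch = 'master' }" is Gradle's source-dependency syntax, and it resolves only if settings.gradle maps the module coordinates to a Git repository. A minimal sketch of such a mapping, assuming a patched fork of json2avro; the actual repository URL is not shown in this commit:

// settings.gradle -- hypothetical sourceControl mapping for the patched converter.
// The real repository URL used by the PR may differ.
sourceControl {
    gitRepository(java.net.URI.create('https://github.com/airbytehq/json-avro-converter.git')) {
        // Build 'tech.allegro.schema.json2avro:converter' from that repository's
        // master branch instead of resolving the published 0.2.10 Maven artifact.
        producesModule('tech.allegro.schema.json2avro:converter')
    }
}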
@@ -90,7 +90,7 @@ public DatabricksStreamCopier(final String stagingFolder,
s3Config.getBucketName(), s3Config.getBucketPath(), databricksConfig.getDatabaseSchema(), streamName);

LOGGER.info("[Stream {}] Database schema: {}", streamName, schemaName);
-    LOGGER.info("[Stream {}] Parquet schema: {}", streamName, parquetWriter.getParquetSchema());
+    LOGGER.info("[Stream {}] Parquet schema: {}", streamName, parquetWriter.getSchema());
LOGGER.info("[Stream {}] Tmp table {} location: {}", streamName, tmpTableName, tmpTableLocation);
LOGGER.info("[Stream {}] Data table {} location: {}", streamName, destTableName, destTableLocation);

@@ -84,7 +84,7 @@
"examples": ["airbyte.staging"]
},
"s3_bucket_path": {
"Title": "S3 Bucket Path",
"title": "S3 Bucket Path",
"type": "string",
"description": "The directory under the S3 bucket where data will be written.",
"examples": ["data_sync/test"]
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

-LABEL io.airbyte.version=0.1.2
+LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/destination-gcs
6 changes: 5 additions & 1 deletion airbyte-integrations/connectors/destination-gcs/build.gradle
@@ -30,7 +30,11 @@ dependencies {
implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0'
implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0'
implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'
-    implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10'
+    implementation('tech.allegro.schema.json2avro:converter') {
+        version {
+            branch = 'master'
+        }
+    }

testImplementation 'org.apache.commons:commons-lang3:3.11'

@@ -11,7 +11,6 @@
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
-import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.writer.S3Writer;
@@ -27,6 +26,7 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class GcsAvroWriter extends BaseGcsWriter implements S3Writer {

@@ -42,7 +42,7 @@ public GcsAvroWriter(final GcsDestinationConfig config,
final ConfiguredAirbyteStream configuredStream,
final Timestamp uploadTimestamp,
final Schema schema,
-                       final JsonFieldNameUpdater nameUpdater)
+                       final JsonAvroConverter converter)
throws IOException {
super(config, s3Client, configuredStream);

@@ -52,7 +52,7 @@ public GcsAvroWriter(final GcsDestinationConfig config,
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
objectKey);

-    this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater);
+    this.avroRecordFactory = new AvroRecordFactory(schema, converter);
this.uploadManager = S3StreamTransferManagerHelper.getDefault(
config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
// We only need one output stream as we only have one input stream. This is reasonably performant.
@@ -5,16 +5,13 @@
package io.airbyte.integrations.destination.gcs.parquet;

import com.amazonaws.services.s3.AmazonS3;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig;
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
import io.airbyte.integrations.destination.s3.S3Format;
-import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
+import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
import io.airbyte.integrations.destination.s3.writer.S3Writer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -42,21 +39,17 @@ public class GcsParquetWriter extends BaseGcsWriter implements S3Writer {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final ObjectWriter WRITER = MAPPER.writer();

-  private final Schema schema;
-  private final JsonFieldNameUpdater nameUpdater;
   private final ParquetWriter<Record> parquetWriter;
-  private final JsonAvroConverter converter = new JsonAvroConverter();
+  private final AvroRecordFactory avroRecordFactory;

public GcsParquetWriter(final GcsDestinationConfig config,
final AmazonS3 s3Client,
final ConfiguredAirbyteStream configuredStream,
final Timestamp uploadTimestamp,
final Schema schema,
-                          final JsonFieldNameUpdater nameUpdater)
+                          final JsonAvroConverter converter)
throws URISyntaxException, IOException {
super(config, s3Client, configuredStream);
-    this.schema = schema;
-    this.nameUpdater = nameUpdater;

final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.PARQUET);
final String objectKey = String.join("/", outputPrefix, outputFilename);
@@ -78,6 +71,7 @@ public GcsParquetWriter(final GcsDestinationConfig config,
.withDictionaryPageSize(formatConfig.getDictionaryPageSize())
.withDictionaryEncoding(formatConfig.isDictionaryEncoding())
.build();
+    this.avroRecordFactory = new AvroRecordFactory(schema, converter);
}

public static Configuration getHadoopConfig(final GcsDestinationConfig config) {
@@ -99,16 +93,7 @@ public static Configuration getHadoopConfig(final GcsDestinationConfig config) {

@Override
public void write(final UUID id, final AirbyteRecordMessage recordMessage) throws IOException {
-    JsonNode inputData = recordMessage.getData();
-    inputData = nameUpdater.getJsonWithStandardizedFieldNames(inputData);
-
-    final ObjectNode jsonRecord = MAPPER.createObjectNode();
-    jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString());
-    jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
-    jsonRecord.setAll((ObjectNode) inputData);
-
-    final GenericData.Record avroRecord = converter.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema);
-    parquetWriter.write(avroRecord);
+    parquetWriter.write(avroRecordFactory.getAvroRecord(id, recordMessage));
}

@Override
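The JSON massaging deleted above (attach the Airbyte id and emitted-at columns, then convert the bytes against the Avro schema) moves into AvroRecordFactory, which both GcsAvroWriter and GcsParquetWriter now construct with a shared JsonAvroConverter. A hypothetical reconstruction of that factory, pieced together from the deleted lines; the actual class in the commit may differ in details:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

// Hypothetical sketch of AvroRecordFactory, reconstructed from the inline
// logic removed from GcsParquetWriter#write above.
public class AvroRecordFactory {

  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final ObjectWriter WRITER = MAPPER.writer();

  private final Schema schema;
  private final JsonAvroConverter converter;

  public AvroRecordFactory(final Schema schema, final JsonAvroConverter converter) {
    this.schema = schema;
    this.converter = converter;
  }

  public GenericData.Record getAvroRecord(final UUID id, final AirbyteRecordMessage recordMessage)
      throws JsonProcessingException {
    // Attach the Airbyte bookkeeping columns before conversion, as the old
    // writer did inline (assumption: the passed-in id is used for the AB_ID column).
    final ObjectNode jsonRecord = MAPPER.createObjectNode();
    jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString());
    jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
    jsonRecord.setAll((ObjectNode) recordMessage.getData());
    return converter.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema);
  }

}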
@@ -11,7 +11,7 @@
import io.airbyte.integrations.destination.gcs.jsonl.GcsJsonlWriter;
import io.airbyte.integrations.destination.gcs.parquet.GcsParquetWriter;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import io.airbyte.integrations.destination.s3.writer.S3Writer;
import io.airbyte.protocol.models.AirbyteStream;
@@ -35,20 +35,17 @@ public S3Writer create(final GcsDestinationConfig config,

if (format == S3Format.AVRO || format == S3Format.PARQUET) {
final AirbyteStream stream = configuredStream.getStream();
+      LOGGER.info("Json schema for stream {}: {}", stream.getName(), stream.getJsonSchema());

final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true);
-      final JsonFieldNameUpdater nameUpdater = new JsonFieldNameUpdater(schemaConverter.getStandardizedNames());

-      LOGGER.info("Paquet schema for stream {}: {}", stream.getName(), avroSchema.toString(false));
-      if (nameUpdater.hasNameUpdate()) {
-        LOGGER.info("The following field names will be standardized: {}", nameUpdater);
-      }
+      LOGGER.info("Avro schema for stream {}: {}", stream.getName(), avroSchema.toString(false));

if (format == S3Format.AVRO) {
-        return new GcsAvroWriter(config, s3Client, configuredStream, uploadTimestamp, avroSchema, nameUpdater);
+        return new GcsAvroWriter(config, s3Client, configuredStream, uploadTimestamp, avroSchema, AvroConstants.JSON_CONVERTER);
} else {
-        return new GcsParquetWriter(config, s3Client, configuredStream, uploadTimestamp, avroSchema, nameUpdater);
+        return new GcsParquetWriter(config, s3Client, configuredStream, uploadTimestamp, avroSchema, AvroConstants.JSON_CONVERTER);
}
}

@@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.ObjectReader;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.s3.S3Format;
+import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import java.util.LinkedList;
@@ -19,12 +20,9 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
-import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class GcsAvroDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {

-  private final JsonAvroConverter converter = new JsonAvroConverter();
-
protected GcsAvroDestinationAcceptanceTest() {
super(S3Format.AVRO);
}
@@ -56,7 +54,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
final ObjectReader jsonReader = MAPPER.reader();
while (dataFileReader.hasNext()) {
final GenericData.Record record = dataFileReader.next();
-      final byte[] jsonBytes = converter.convertToJson(record);
+      final byte[] jsonBytes = AvroConstants.JSON_CONVERTER.convertToJson(record);
JsonNode jsonRecord = jsonReader.readTree(jsonBytes);
jsonRecord = nameUpdater.getJsonWithOriginalFieldNames(jsonRecord);
jsonRecords.add(AvroRecordHelper.pruneAirbyteJson(jsonRecord));
@@ -4,8 +4,6 @@

package io.airbyte.integrations.destination.gcs;

-import static io.airbyte.integrations.destination.s3.S3DestinationConstants.NAME_TRANSFORMER;
-
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.S3ObjectSummary;
@@ -15,6 +13,7 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
+import io.airbyte.integrations.destination.s3.S3DestinationConstants;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.S3FormatConfig;
import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper;
@@ -89,7 +88,7 @@ protected List<S3ObjectSummary> getAllSyncedObjects(final String streamName, fin
.listObjects(config.getBucketName(), outputPrefix)
.getObjectSummaries()
.stream()
-        .filter(o -> o.getKey().contains(NAME_TRANSFORMER.convertStreamName(streamName) + "/"))
+        .filter(o -> o.getKey().contains(S3DestinationConstants.NAME_TRANSFORMER.convertStreamName(streamName) + "/"))
.sorted(Comparator.comparingLong(o -> o.getLastModified().getTime()))
.collect(Collectors.toList());
LOGGER.info(
@@ -11,6 +11,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.gcs.parquet.GcsParquetWriter;
import io.airbyte.integrations.destination.s3.S3Format;
+import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import java.io.IOException;
@@ -22,12 +23,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
-import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class GcsParquetDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {

-  private final JsonAvroConverter converter = new JsonAvroConverter();
-
protected GcsParquetDestinationAcceptanceTest() {
super(S3Format.PARQUET);
}
@@ -63,7 +61,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
final ObjectReader jsonReader = MAPPER.reader();
GenericData.Record record;
while ((record = parquetReader.read()) != null) {
-      final byte[] jsonBytes = converter.convertToJson(record);
+      final byte[] jsonBytes = AvroConstants.JSON_CONVERTER.convertToJson(record);
JsonNode jsonRecord = jsonReader.readTree(jsonBytes);
jsonRecord = nameUpdater.getJsonWithOriginalFieldNames(jsonRecord);
jsonRecords.add(AvroRecordHelper.pruneAirbyteJson(jsonRecord));
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-s3/Dockerfile
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

-LABEL io.airbyte.version=0.1.12
+LABEL io.airbyte.version=0.1.13
LABEL io.airbyte.name=airbyte/destination-s3
6 changes: 5 additions & 1 deletion airbyte-integrations/connectors/destination-s3/build.gradle
@@ -26,7 +26,11 @@ dependencies {
implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0'
implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0'
implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'
-    implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10'
+    implementation('tech.allegro.schema.json2avro:converter') {
+        version {
+            branch = 'master'
+        }
+    }

testImplementation 'org.apache.commons:commons-lang3:3.11'

@@ -0,0 +1,26 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3.avro;

import java.util.Set;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class AvroConstants {

// Field name with special character
public static final String DOC_KEY_VALUE_DELIMITER = ":";
public static final String DOC_KEY_ORIGINAL_NAME = "_airbyte_original_name";

public static final String AVRO_EXTRA_PROPS_FIELD = "_airbyte_additional_properties";
// This set must include _ab_additional_col in source_s3/source_files_abstract/stream.py
public static final Set<String> JSON_EXTRA_PROPS_FIELDS = Set.of("_ab_additional_properties", AVRO_EXTRA_PROPS_FIELD);
public static final AvroNameTransformer NAME_TRANSFORMER = new AvroNameTransformer();
public static final JsonAvroConverter JSON_CONVERTER = JsonAvroConverter.builder()
.setNameTransformer(NAME_TRANSFORMER::getIdentifier)
.setJsonAdditionalPropsFieldNames(JSON_EXTRA_PROPS_FIELDS)
.setAvroAdditionalPropsFieldName(AVRO_EXTRA_PROPS_FIELD)
.build();

}
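To illustrate what this configuration buys: a JSON field that is absent from the Avro schema should be collected into the _airbyte_additional_properties map instead of being silently dropped. A minimal sketch, assuming the patched converter wired in through the build.gradle changes above; the schema and record here are invented for the example:

import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class AdditionalPropsExample {

  public static void main(String[] args) {
    // Invented schema: one declared column plus the catch-all map field.
    final Schema schema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"example\", \"fields\": ["
            + "{\"name\": \"id\", \"type\": [\"null\", \"int\"], \"default\": null},"
            + "{\"name\": \"_airbyte_additional_properties\","
            + " \"type\": [\"null\", {\"type\": \"map\", \"values\": \"string\"}], \"default\": null}]}");

    // "color" is not declared in the schema above...
    final byte[] json = "{\"id\": 1, \"color\": \"blue\"}".getBytes(StandardCharsets.UTF_8);
    final GenericData.Record record = AvroConstants.JSON_CONVERTER.convertToGenericDataRecord(json, schema);

    // ...so it should surface under _airbyte_additional_properties.
    System.out.println(record.get("_airbyte_additional_properties"));
  }

}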
@@ -2,11 +2,11 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

-package io.airbyte.integrations.destination.s3;
+package io.airbyte.integrations.destination.s3.avro;

import io.airbyte.integrations.destination.ExtendedNameTransformer;

-public class S3NameTransformer extends ExtendedNameTransformer {
+public class AvroNameTransformer extends ExtendedNameTransformer {

@Override
protected String applyDefaultCase(final String input) {
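For context on the renamed transformer (its body is truncated in this diff): AvroConstants wires it into the converter via setNameTransformer(NAME_TRANSFORMER::getIdentifier), so JSON field names are standardized into Avro-legal identifiers before conversion. A hedged illustration; the exact substitution rules live in the ExtendedNameTransformer base class, so the output shown is an assumption:

import io.airbyte.integrations.destination.s3.avro.AvroNameTransformer;

public class NameTransformerExample {

  public static void main(String[] args) {
    final AvroNameTransformer transformer = new AvroNameTransformer();
    // Avro identifiers cannot contain characters like '-', so a name such as
    // "user-name" is expected (assumption) to be rewritten, e.g. to "user_name".
    System.out.println(transformer.getIdentifier("user-name"));
  }

}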