From 4e901f2f49226067cf87b631b636815794d9fe4c Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 6 Apr 2022 17:48:58 -0700 Subject: [PATCH 01/12] Rebase bigquery changes to master --- .../record_buffer/BaseSerializedBuffer.java | 6 +- .../build.gradle | 1 + .../BigQueryDenormalizedDestination.java | 25 ++- .../destination-bigquery/build.gradle | 3 +- .../BigQueryAvroSerializedBuffer.java | 54 ++++++ .../bigquery/BigQueryDestination.java | 139 ++++++++----- .../bigquery/BigQueryGcsOperations.java | 182 ++++++++++++++++++ .../BigQueryStagingConsumerFactory.java | 166 ++++++++++++++++ .../bigquery/BigQueryStagingOperations.java | 38 ++++ .../destination/bigquery/BigQueryUtils.java | 59 +++--- .../bigquery/BigQueryWriteConfig.java | 57 ++++++ .../uploader/AbstractBigQueryUploader.java | 37 ++-- .../uploader/AbstractGscBigQueryUploader.java | 8 +- .../uploader/BigQueryDirectUploader.java | 16 +- .../uploader/BigQueryUploaderFactory.java | 41 ++-- .../bigquery/BigQueryDestinationTest.java | 6 +- .../bigquery/BigQueryGcsDestinationTest.java | 6 +- .../destination/gcs/GcsStorageOperations.java | 8 + .../destination/gcs/util/GcsUtils.java | 10 +- .../destination/s3/S3StorageOperations.java | 6 +- .../s3/avro/AvroSerializedBuffer.java | 4 +- .../s3/avro/S3AvroFormatConfig.java | 5 + 22 files changed, 721 insertions(+), 156 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BaseSerializedBuffer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BaseSerializedBuffer.java index 65f015f7bafd..2f30c34f0a0c 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BaseSerializedBuffer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BaseSerializedBuffer.java @@ -147,7 +147,11 @@ public long getByteCount() { @Override public void close() throws Exception { if (!isClosed) { - inputStream.close(); + // inputStream can be null if the accept method encounters + // an error before inputStream is initialized + if (inputStream != null) { + inputStream.close(); + } bufferStorage.deleteFile(); isClosed = true; } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/build.gradle b/airbyte-integrations/connectors/destination-bigquery-denormalized/build.gradle index 32c8f4d8af52..b75143d4763a 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/build.gradle @@ -19,6 +19,7 @@ dependencies { implementation project(':airbyte-protocol:models') 
implementation project(':airbyte-integrations:connectors:destination-s3') implementation project(':airbyte-integrations:connectors:destination-gcs') + implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0' integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery-denormalized') diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java index cdc862ee31ab..b3a0d5ba6ae0 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java @@ -11,25 +11,25 @@ import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter; import io.airbyte.integrations.destination.bigquery.formatter.GcsBigQueryDenormalizedRecordFormatter; import io.airbyte.integrations.destination.bigquery.uploader.UploaderType; +import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter; +import io.airbyte.protocol.models.AirbyteStream; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.function.Function; +import org.apache.avro.Schema; public class BigQueryDenormalizedDestination extends BigQueryDestination { - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestination.class); - @Override protected String getTargetTableName(final String streamName) { // This BigQuery destination does not write to a staging "raw" table but directly to a normalized // table - return getNamingResolver().getIdentifier(streamName); + return namingResolver.getIdentifier(streamName); } @Override protected Map getFormatterMap(final JsonNode jsonSchema) { - return Map.of(UploaderType.STANDARD, new DefaultBigQueryDenormalizedRecordFormatter(jsonSchema, getNamingResolver()), - UploaderType.AVRO, new GcsBigQueryDenormalizedRecordFormatter(jsonSchema, getNamingResolver())); + return Map.of(UploaderType.STANDARD, new DefaultBigQueryDenormalizedRecordFormatter(jsonSchema, namingResolver), + UploaderType.AVRO, new GcsBigQueryDenormalizedRecordFormatter(jsonSchema, namingResolver)); } /** @@ -45,6 +45,17 @@ protected boolean isDefaultAirbyteTmpTableSchema() { return false; } + @Override + protected Function getAvroSchemaCreator() { + return stream -> new JsonToAvroSchemaConverter().getAvroSchema(stream.getJsonSchema(), stream.getName(), + stream.getNamespace(), true, false, false, true); + } + + @Override + protected Function getRecordFormatterCreator(final BigQuerySQLNameTransformer namingResolver) { + return streamSchema -> new GcsBigQueryDenormalizedRecordFormatter(streamSchema, namingResolver); + } + public static void main(final String[] args) throws Exception { final Destination destination = new BigQueryDenormalizedDestination(); new IntegrationRunner(destination).run(args); diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle 
b/airbyte-integrations/connectors/destination-bigquery/build.gradle index 6ad87b1eafa3..e3f01c5553f2 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -12,9 +12,8 @@ application { dependencies { implementation 'com.google.cloud:google-cloud-bigquery:1.122.2' implementation 'org.apache.commons:commons-lang3:3.11' - - // csv implementation 'org.apache.commons:commons-csv:1.4' + implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0' implementation group: 'com.google.cloud', name: 'google-cloud-storage', version: '2.4.5' implementation group: 'com.codepoetics', name: 'protonpack', version: '1.13' diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java new file mode 100644 index 000000000000..27fe0d0747e0 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java @@ -0,0 +1,54 @@ +package io.airbyte.integrations.destination.bigquery; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.functional.CheckedBiFunction; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; +import io.airbyte.integrations.destination.record_buffer.BufferStorage; +import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; +import io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer; +import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.function.Function; +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.commons.lang3.StringUtils; + +public class BigQueryAvroSerializedBuffer extends AvroSerializedBuffer { + + private final BigQueryRecordFormatter recordFormatter; + + public BigQueryAvroSerializedBuffer(final BufferStorage bufferStorage, + final CodecFactory codecFactory, + final Schema schema, + final BigQueryRecordFormatter recordFormatter) throws Exception { + super(bufferStorage, codecFactory, schema); + this.recordFormatter = recordFormatter; + } + + @Override + protected void writeRecord(final AirbyteRecordMessage recordMessage) throws IOException { + dataFileWriter.append(avroRecordFactory.getAvroRecord(recordFormatter.formatRecord(recordMessage))); + } + + public static CheckedBiFunction createFunction(final S3AvroFormatConfig config, + final Function schemaCreator, + final Function recordFormatterCreator, + final Callable createStorageFunction) { + final CodecFactory codecFactory = config.getCodecFactory(); + return (pair, catalog) -> { + final AirbyteStream stream = catalog.getStreams() + .stream() + .filter(s -> s.getStream().getName().equals(pair.getName()) && StringUtils.equals(s.getStream().getNamespace(), pair.getNamespace())) + .findFirst() + .orElseThrow(() -> new RuntimeException(String.format("No such stream %s.%s", pair.getNamespace(), pair.getName()))) 
+ .getStream(); + return new BigQueryAvroSerializedBuffer(createStorageFunction.call(), codecFactory, schemaCreator.apply(stream), recordFormatterCreator.apply(stream.getJsonSchema())); + }; + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index f3a5109ac709..78f3b45c974f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -16,12 +16,14 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import com.google.common.base.Charsets; +import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter; import io.airbyte.integrations.destination.bigquery.formatter.GcsAvroBigQueryRecordFormatter; @@ -30,6 +32,14 @@ import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory; import io.airbyte.integrations.destination.bigquery.uploader.UploaderType; import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.GcsNameTransformer; +import io.airbyte.integrations.destination.gcs.GcsStorageOperations; +import io.airbyte.integrations.destination.gcs.util.GcsUtils; +import io.airbyte.integrations.destination.record_buffer.FileBuffer; +import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; +import io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer; +import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; @@ -41,9 +51,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.Consumer; -import java.util.stream.Collectors; +import java.util.function.Function; +import org.apache.avro.Schema; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +71,7 @@ public class BigQueryDestination extends BaseConnector implements Destination { "storage.objects.delete", "storage.objects.get", "storage.objects.list"); - private final BigQuerySQLNameTransformer namingResolver; + protected final BigQuerySQLNameTransformer namingResolver; public BigQueryDestination() { namingResolver = new BigQuerySQLNameTransformer(); @@ -71,7 +85,7 @@ public AirbyteConnectionStatus check(final 
JsonNode config) { final BigQuery bigquery = getBigQuery(config); final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config); - BigQueryUtils.createSchemaTable(bigquery, datasetId, datasetLocation); + BigQueryUtils.createDataset(bigquery, datasetId, datasetLocation); final QueryJobConfiguration queryConfig = QueryJobConfiguration .newBuilder(String.format("SELECT * FROM `%s.INFORMATION_SCHEMA.TABLES` LIMIT 1;", datasetId)) .setUseLegacySql(false) @@ -100,24 +114,24 @@ public AirbyteConnectionStatus check(final JsonNode config) { } } - public AirbyteConnectionStatus checkStorageIamPermissions(JsonNode config) { + public AirbyteConnectionStatus checkStorageIamPermissions(final JsonNode config) { final JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD); final String bucketName = loadingMethod.get(BigQueryConsts.GCS_BUCKET_NAME).asText(); try { - ServiceAccountCredentials credentials = getServiceAccountCredentials(config); + final ServiceAccountCredentials credentials = getServiceAccountCredentials(config); - Storage storage = StorageOptions.newBuilder() + final Storage storage = StorageOptions.newBuilder() .setProjectId(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText()) .setCredentials(!isNull(credentials) ? credentials : ServiceAccountCredentials.getApplicationDefault()) .build().getService(); - List permissionsCheckStatusList = storage.testIamPermissions(bucketName, REQUIRED_PERMISSIONS); + final List permissionsCheckStatusList = storage.testIamPermissions(bucketName, REQUIRED_PERMISSIONS); - List missingPermissions = StreamUtils + final List missingPermissions = StreamUtils .zipWithIndex(permissionsCheckStatusList.stream()) .filter(i -> !i.getValue()) .map(i -> REQUIRED_PERMISSIONS.get(Math.toIntExact(i.getIndex()))) - .collect(Collectors.toList()); + .toList(); if (!missingPermissions.isEmpty()) { LOGGER.error("Please make sure you account has all of these permissions:{}", REQUIRED_PERMISSIONS); @@ -139,10 +153,6 @@ public AirbyteConnectionStatus checkStorageIamPermissions(JsonNode config) { } } - protected BigQuerySQLNameTransformer getNamingResolver() { - return namingResolver; - } - protected BigQuery getBigQuery(final JsonNode config) { final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); @@ -164,44 +174,26 @@ protected BigQuery getBigQuery(final JsonNode config) { } } - private ServiceAccountCredentials getServiceAccountCredentials(JsonNode config) throws IOException { - ServiceAccountCredentials credentials; - final String credentialsString = - config.get(BigQueryConsts.CONFIG_CREDS).isObject() ? Jsons.serialize(config.get(BigQueryConsts.CONFIG_CREDS)) - : config.get(BigQueryConsts.CONFIG_CREDS).asText(); + private ServiceAccountCredentials getServiceAccountCredentials(final JsonNode config) throws IOException { + final ServiceAccountCredentials credentials; + final String credentialsString = config.get(BigQueryConsts.CONFIG_CREDS).isObject() + ? Jsons.serialize(config.get(BigQueryConsts.CONFIG_CREDS)) + : config.get(BigQueryConsts.CONFIG_CREDS).asText(); credentials = ServiceAccountCredentials .fromStream(new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8))); return credentials; } - /** - * Strategy: - *
- * 1. Create a temporary table for each stream
- *
- * 2. Write records to each stream directly (the bigquery client handles managing when to push the
- * records over the network)
- *
- * 4. Once all records have been written close the writers, so that any remaining records are
- * flushed.
- *
- * 5. Copy the temp tables to the final table name (overwriting if necessary).
- * - * @param config - integration-specific configuration object as json. e.g. { "username": "airbyte", - * "password": "super secure" } - * @param catalog - schema of the incoming messages. - * @return consumer that writes singer messages to the database. - */ @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) - throws IOException { - return getRecordConsumer(getUploaderMap(config, catalog), outputRecordCollector); + final Consumer outputRecordCollector) throws IOException { + final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config); + if (uploadingMethod == UploadingMethod.STANDARD) { + return getStandardRecordConsumer(config, catalog, outputRecordCollector); + } else { + return getGcsRecordConsumer(config, catalog, outputRecordCollector); + } } protected Map> getUploaderMap(final JsonNode config, @@ -243,20 +235,71 @@ protected boolean isDefaultAirbyteTmpTableSchema() { } protected Map getFormatterMap(final JsonNode jsonSchema) { - return Map.of(UploaderType.STANDARD, new DefaultBigQueryRecordFormatter(jsonSchema, getNamingResolver()), - UploaderType.CSV, new GcsCsvBigQueryRecordFormatter(jsonSchema, getNamingResolver()), - UploaderType.AVRO, new GcsAvroBigQueryRecordFormatter(jsonSchema, getNamingResolver())); + return Map.of(UploaderType.STANDARD, new DefaultBigQueryRecordFormatter(jsonSchema, namingResolver), + UploaderType.CSV, new GcsCsvBigQueryRecordFormatter(jsonSchema, namingResolver), + UploaderType.AVRO, new GcsAvroBigQueryRecordFormatter(jsonSchema, namingResolver)); } protected String getTargetTableName(final String streamName) { return namingResolver.getRawTableName(streamName); } - protected AirbyteMessageConsumer getRecordConsumer(final Map> writeConfigs, - final Consumer outputRecordCollector) { + private AirbyteMessageConsumer getStandardRecordConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) throws IOException { + final Map> writeConfigs = getUploaderMap(config, catalog); return new BigQueryRecordConsumer(writeConfigs, outputRecordCollector); } + + + public AirbyteMessageConsumer getGcsRecordConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { + final BigQuery bigQuery = getBigQuery(config); + final GcsDestinationConfig gcsConfig = BigQueryUtils.getGcsAvroDestinationConfig(config); + final StandardNameTransformer gcsNameTransformer = new GcsNameTransformer(); + final UUID stagingId = UUID.randomUUID(); + final DateTime syncDatetime = DateTime.now(DateTimeZone.UTC); + final boolean keepStagingFiles = BigQueryUtils.isKeepFilesInGcs(config); + final GcsStorageOperations gcsOperations = new GcsStorageOperations(gcsNameTransformer, gcsConfig.getS3Client(), gcsConfig); + final BigQueryStagingOperations bigQueryGcsOperations = new BigQueryGcsOperations( + bigQuery, + gcsNameTransformer, + gcsConfig, + gcsOperations, + stagingId, + syncDatetime, + keepStagingFiles); + final S3AvroFormatConfig avroFormatConfig = (S3AvroFormatConfig) gcsConfig.getFormatConfig(); + final Function recordFormatterCreator = getRecordFormatterCreator(namingResolver); + final CheckedBiFunction onCreateBuffer = + BigQueryAvroSerializedBuffer.createFunction( + avroFormatConfig, + getAvroSchemaCreator(), + recordFormatterCreator, + () -> new FileBuffer(AvroSerializedBuffer.DEFAULT_SUFFIX)); + + return new BigQueryStagingConsumerFactory().create( + config, 
+ catalog, + outputRecordCollector, + bigQueryGcsOperations, + onCreateBuffer, + stagingId, + syncDatetime, + recordFormatterCreator, + namingResolver); + } + + protected Function getAvroSchemaCreator() { + return stream -> GcsUtils.getDefaultAvroSchema(stream.getName(), stream.getNamespace(), true); + } + + protected Function getRecordFormatterCreator(final BigQuerySQLNameTransformer namingResolver) { + return streamSchema -> new GcsAvroBigQueryRecordFormatter(streamSchema, namingResolver); + } + public static void main(final String[] args) throws Exception { final Destination destination = new BigQueryDestination(); new IntegrationRunner(destination).run(args); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java new file mode 100644 index 000000000000..ccc27e62eb84 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java @@ -0,0 +1,182 @@ +package io.airbyte.integrations.destination.bigquery; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobInfo.WriteDisposition; +import com.google.cloud.bigquery.LoadJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.TableId; +import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.GcsStorageOperations; +import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQueryGcsOperations implements BigQueryStagingOperations { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsOperations.class); + + private final BigQuery bigQuery; + private final StandardNameTransformer gcsNameTransformer; + private final GcsDestinationConfig gcsConfig; + private final GcsStorageOperations gcsStorageOperations; + private final UUID randomStagingId; + private final DateTime syncDatetime; + private final boolean keepStagingFiles; + private final Set existingSchemas = new HashSet<>(); + + public BigQueryGcsOperations(final BigQuery bigQuery, + final StandardNameTransformer gcsNameTransformer, + final GcsDestinationConfig gcsConfig, + final GcsStorageOperations gcsStorageOperations, + final UUID randomStagingId, + final DateTime syncDatetime, + final boolean keepStagingFiles) { + this.bigQuery = bigQuery; + this.gcsNameTransformer = gcsNameTransformer; + this.gcsConfig = gcsConfig; + this.gcsStorageOperations = gcsStorageOperations; + this.randomStagingId = randomStagingId; + this.syncDatetime = syncDatetime; + this.keepStagingFiles = keepStagingFiles; + } + + /** + * @return /_ + */ + private String getStagingRootPath(final String datasetId, final String stream) { + return 
gcsNameTransformer.applyDefaultCase(String.format("%s/%s_%s", + gcsConfig.getBucketPath(), + gcsNameTransformer.convertStreamName(datasetId), + gcsNameTransformer.convertStreamName(stream))); + } + + /** + * @return /_////// + */ + @Override + public String getStagingFullPath(final String datasetId, final String stream) { + return gcsNameTransformer.applyDefaultCase(String.format("%s/%s/%02d/%02d/%02d/%s/", + getStagingRootPath(datasetId, stream), + syncDatetime.year().get(), + syncDatetime.monthOfYear().get(), + syncDatetime.dayOfMonth().get(), + syncDatetime.hourOfDay().get(), + randomStagingId)); + } + + @Override + public void createSchemaIfNotExists(final String datasetId, final String datasetLocation) { + if (!existingSchemas.contains(datasetId)) { + LOGGER.info("Creating dataset {}", datasetId); + BigQueryUtils.createDataset(bigQuery, datasetId, datasetLocation); + existingSchemas.add(datasetId); + } + } + + @Override + public void createTmpTableIfNotExists(final TableId tmpTableId, final Schema tableSchema) { + LOGGER.info("Creating tmp table {}", tmpTableId); + BigQueryUtils.createPartitionedTable(bigQuery, tmpTableId, tableSchema); + } + + @Override + public void createStageIfNotExists(final String datasetId, final String stream) { + final String objectPath = getStagingFullPath(datasetId, stream); + LOGGER.info("Creating staging path for stream {} (dataset {}): {}", stream, datasetId, objectPath); + gcsStorageOperations.createBucketObjectIfNotExists(objectPath); + } + + @Override + public String uploadRecordsToStage(final String datasetId, final String stream, final SerializableBuffer writer) { + final String objectPath = getStagingFullPath(datasetId, stream); + LOGGER.info("Uploading records to staging for stream {} (dataset {}): {}", stream, datasetId, objectPath); + return gcsStorageOperations.uploadRecordsToBucket(writer, datasetId, getStagingRootPath(datasetId, stream), objectPath); + } + + /** + * Reference https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html + */ + @Override + public void copyIntoTmpTableFromStage(final String datasetId, + final String stream, + final TableId tmpTableId, + final Schema tmpTableSchema, + final List stagedFiles) { + LOGGER.info("Uploading records from staging files to tmp table {} (dataset {}): {}", tmpTableId, datasetId, stagedFiles); + + stagedFiles.parallelStream().forEach(stagedFile -> { + final String fullFilePath = String.format("gs://%s/%s%s", gcsConfig.getBucketName(), getStagingFullPath(datasetId, stream), stagedFile); + LOGGER.info("Uploading staged file: {}", fullFilePath); + final LoadJobConfiguration configuration = LoadJobConfiguration.builder(tmpTableId, fullFilePath) + .setFormatOptions(FormatOptions.avro()) + .setSchema(tmpTableSchema) + .setWriteDisposition(WriteDisposition.WRITE_APPEND) + .setUseAvroLogicalTypes(true) + .build(); + + final Job loadJob = this.bigQuery.create(JobInfo.of(configuration)); + LOGGER.info("[{}] Created a new job to upload records to tmp table {} (dataset {}): {}", loadJob.getJobId(), tmpTableId, datasetId, loadJob); + + try { + BigQueryUtils.waitForJobFinish(loadJob); + LOGGER.info("[{}] Tmp table {} (dataset {}) is successfully appended with staging files", loadJob.getJobId(), tmpTableId, datasetId); + } catch (final BigQueryException | InterruptedException e) { + LOGGER.error(String.format("[%s] Failed to upload staging files to tmp table %s (%s)", loadJob.getJobId(), tmpTableId, datasetId), e); + } + }); + } + + @Override + public 
void cleanUpStage(final String datasetId, final String stream, final List stagedFiles) { + if (keepStagingFiles) { + return; + } + + LOGGER.info("Deleting staging files for stream {} (dataset {}): {}", stream, datasetId, stagedFiles); + gcsStorageOperations.cleanUpBucketObject(getStagingRootPath(datasetId, stream), stagedFiles); + } + + @Override + public void copyIntoTargetTable(final String datasetId, + final TableId tmpTableId, + final TableId targetTableId, + final Schema schema, + final DestinationSyncMode syncMode) { + LOGGER.info("Copying data from tmp table {} to target table {} (dataset {}, sync mode {})", tmpTableId, targetTableId, datasetId, syncMode); + final WriteDisposition bigQueryMode = BigQueryUtils.getWriteDisposition(syncMode); + if (bigQueryMode == JobInfo.WriteDisposition.WRITE_APPEND) { + AbstractBigQueryUploader.partitionIfUnpartitioned(bigQuery, schema, targetTableId); + } + AbstractBigQueryUploader.copyTable(bigQuery, tmpTableId, targetTableId, bigQueryMode); + } + + @Override + public void dropTableIfExists(final String datasetId, final TableId tmpTableId) { + LOGGER.info("Deleting tmp table {} (dataset {})", tmpTableId, datasetId); + bigQuery.delete(tmpTableId); + } + + @Override + public void dropStageIfExists(final String datasetId, final String stream) { + if (keepStagingFiles) { + return; + } + + final String stagingDatasetPath = getStagingRootPath(datasetId, stream); + LOGGER.info("Cleaning up staging path for stream {} (dataset {}): {}", stream, datasetId, stagingDatasetPath); + gcsStorageOperations.dropBucketObject(stagingDatasetPath); + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java new file mode 100644 index 000000000000..088491668c92 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -0,0 +1,166 @@ +package io.airbyte.integrations.destination.bigquery; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Functions; +import com.google.common.base.Preconditions; +import io.airbyte.commons.concurrency.VoidCallable; +import io.airbyte.commons.functional.CheckedBiConsumer; +import io.airbyte.commons.functional.CheckedBiFunction; +import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; +import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; +import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; +import io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.joda.time.DateTime; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQueryStagingConsumerFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryStagingConsumerFactory.class); + + public AirbyteMessageConsumer create(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector, + final BigQueryStagingOperations bigQueryGcsOperations, + final CheckedBiFunction onCreateBuffer, + final UUID stagingId, + final DateTime syncDatetime, + final Function recordFormatterCreator, + final StandardNameTransformer bigQueryNameTransformer) { + LOGGER.info("Creating BigQuery staging message consumer with staging ID {} at {}", stagingId, syncDatetime); + final Map writeConfigs = createWriteConfigs( + config, + catalog, + recordFormatterCreator, + bigQueryNameTransformer); + + return new BufferedStreamConsumer( + outputRecordCollector, + onStartFunction(bigQueryGcsOperations, writeConfigs), + new SerializedBufferingStrategy( + onCreateBuffer, + catalog, + flushBufferFunction(bigQueryGcsOperations, writeConfigs, catalog)), + onCloseFunction(bigQueryGcsOperations, writeConfigs), + catalog, + json -> true); + } + + private Map createWriteConfigs(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Function recordFormatterCreator, + final StandardNameTransformer bigQueryNameTransformer) { + return catalog.getStreams().stream() + .map(configuredStream -> { + Preconditions.checkNotNull(configuredStream.getDestinationSyncMode(), "Undefined destination sync mode"); + + final AirbyteStream stream = configuredStream.getStream(); + final String streamName = stream.getName(); + final BigQueryRecordFormatter recordFormatter = recordFormatterCreator.apply(stream.getJsonSchema()); + + final BigQueryWriteConfig writeConfig = new BigQueryWriteConfig( + streamName, + stream.getNamespace(), + BigQueryUtils.getSchema(config, configuredStream), + BigQueryUtils.getDatasetLocation(config), + bigQueryNameTransformer.getTmpTableName(streamName), + bigQueryNameTransformer.getRawTableName(streamName), + recordFormatter.getBigQuerySchema(), + configuredStream.getDestinationSyncMode()); + + LOGGER.info("BigQuery write config: {}", writeConfig); + + return writeConfig; + }) + .collect(Collectors.toMap( + c -> new AirbyteStreamNameNamespacePair(c.streamName(), c.namespace()), + Functions.identity())); + } + + private VoidCallable onStartFunction(final BigQueryStagingOperations bigQueryGcsOperations, + final Map writeConfigs) { + return () -> { + LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size()); + for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) { + final String datasetId = writeConfig.datasetId(); + bigQueryGcsOperations.createSchemaIfNotExists(datasetId, writeConfig.datasetLocation()); + // only the tmp table is explicitly created, because the target table will be automatically + // created when the data is copied from the tmp table + bigQueryGcsOperations.createTmpTableIfNotExists(writeConfig.tmpTableId(), writeConfig.tableSchema()); + bigQueryGcsOperations.createStageIfNotExists(datasetId, writeConfig.streamName()); + } + LOGGER.info("Preparing tmp tables in destination completed."); + }; + } + + private CheckedBiConsumer flushBufferFunction(final BigQueryStagingOperations bigQueryGcsOperations, + final Map writeConfigs, + final ConfiguredAirbyteCatalog catalog) { + return (pair, writer) -> { + LOGGER.info("Flushing buffer for stream {} ({}) to staging", pair.getName(), 
FileUtils.byteCountToDisplaySize(writer.getByteCount())); + if (!writeConfigs.containsKey(pair)) { + throw new IllegalArgumentException( + String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s", Jsons.serialize(catalog))); + } + + final BigQueryWriteConfig writeConfig = writeConfigs.get(pair); + try (writer) { + writer.flush(); + final String stagedFile = bigQueryGcsOperations.uploadRecordsToStage(writeConfig.datasetId(), writeConfig.streamName(), writer); + writeConfig.addStagedFile(stagedFile); + } catch (final Exception e) { + LOGGER.error("Failed to flush and upload buffer to stage:", e); + throw new RuntimeException("Failed to upload buffer to stage", e); + } + }; + } + + private CheckedConsumer onCloseFunction(final BigQueryStagingOperations bigQueryGcsOperations, + final Map writeConfigs) { + return (hasFailed) -> { + if (!hasFailed) { + LOGGER.info("Copying into tables in destination started for {} streams", writeConfigs.size()); + + for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) { + final String datasetId = writeConfig.datasetId(); + final String stream = writeConfig.streamName(); + + try { + bigQueryGcsOperations.copyIntoTmpTableFromStage(datasetId, stream, writeConfig.tmpTableId(), writeConfig.tableSchema(), writeConfig.stagedFiles()); + } catch (final Exception e) { + bigQueryGcsOperations.cleanUpStage(datasetId, stream, writeConfig.stagedFiles()); + final String stagingPath = bigQueryGcsOperations.getStagingFullPath(datasetId, stream); + throw new RuntimeException("Failed to upload data from stage " + stagingPath, e); + } + writeConfig.clearStagedFiles(); + bigQueryGcsOperations.copyIntoTargetTable( + datasetId, writeConfig.tmpTableId(), writeConfig.targetTableId(), writeConfig.tableSchema(), writeConfig.syncMode()); + } + + LOGGER.info("Finalizing tables in destination completed"); + } + + LOGGER.info("Cleaning up destination started for {} streams", writeConfigs.size()); + for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) { + bigQueryGcsOperations.dropTableIfExists(writeConfig.datasetId(), writeConfig.tmpTableId()); + bigQueryGcsOperations.dropStageIfExists(writeConfig.datasetId(), writeConfig.streamName()); + } + LOGGER.info("Cleaning up destination completed."); + }; + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java new file mode 100644 index 000000000000..3dc18b7b3500 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java @@ -0,0 +1,38 @@ +package io.airbyte.integrations.destination.bigquery; + +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.TableId; +import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.util.List; + +public interface BigQueryStagingOperations { + + String getStagingFullPath(final String datasetId, final String stream); + + void createSchemaIfNotExists(final String datasetId, final String datasetLocation); + + void createTmpTableIfNotExists(final TableId tmpTableId, final Schema tableSchema); + + void createStageIfNotExists(final String datasetId, final String stream); + + String 
uploadRecordsToStage(final String datasetId, final String stream, final SerializableBuffer writer) throws Exception; + + void copyIntoTmpTableFromStage(final String datasetId, + final String stream, + final TableId tmpTableId, + final Schema schema, + final List stagedFiles) throws Exception; + + void cleanUpStage(final String datasetId, final String stream, final List stagedFiles); + + void copyIntoTargetTable(final String datasetId, + final TableId tmpTableId, + final TableId targetTableId, + final Schema schema, + final DestinationSyncMode syncMode); + + void dropTableIfExists(final String datasetId, final TableId tmpTableId); + + void dropStageIfExists(final String datasetId, final String stream); +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 6af04e9ce283..7ecc370bfdb1 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; import java.time.Instant; @@ -89,17 +90,17 @@ static Job waitForQuery(final Job queryJob) { public static void createSchemaAndTableIfNeeded(final BigQuery bigquery, final Set existingSchemas, final String schemaName, - final String tmpTableName, + final TableId tmpTableId, final String datasetLocation, final Schema schema) { if (!existingSchemas.contains(schemaName)) { - createSchemaTable(bigquery, schemaName, datasetLocation); + createDataset(bigquery, schemaName, datasetLocation); existingSchemas.add(schemaName); } - BigQueryUtils.createPartitionedTable(bigquery, schemaName, tmpTableName, schema); + BigQueryUtils.createPartitionedTable(bigquery, tmpTableId, schema); } - static void createSchemaTable(final BigQuery bigquery, final String datasetId, final String datasetLocation) { + public static void createDataset(final BigQuery bigquery, final String datasetId, final String datasetLocation) { final Dataset dataset = bigquery.getDataset(datasetId); if (dataset == null || !dataset.exists()) { final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); @@ -108,11 +109,8 @@ static void createSchemaTable(final BigQuery bigquery, final String datasetId, f } // https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java - static void createPartitionedTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) { + static void createPartitionedTable(final BigQuery bigquery, final TableId tableId, final Schema schema) { try { - - final TableId tableId = TableId.of(datasetName, tableName); - final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY) .setField(JavaBaseConstants.COLUMN_NAME_EMITTED_AT) .build(); @@ -130,9 +128,9 @@ static void createPartitionedTable(final BigQuery bigquery, final String dataset final TableInfo tableInfo = 
TableInfo.newBuilder(tableId, tableDefinition).build(); bigquery.create(tableInfo); - LOGGER.info("Partitioned Table: {} created successfully", tableId); - } catch (BigQueryException e) { - LOGGER.info("Partitioned table was not created. \n" + e); + LOGGER.info("Partitioned table created successfully: {}", tableId); + } catch (final BigQueryException e) { + LOGGER.error("Partitioned table was not created: " + tableId, e); } } @@ -154,6 +152,10 @@ public static JsonNode getGcsJsonNodeConfig(final JsonNode config) { return gcsJsonNode; } + public static GcsDestinationConfig getGcsAvroDestinationConfig(final JsonNode config) { + return GcsDestinationConfig.getGcsDestinationConfig(getGcsAvroJsonNodeConfig(config)); + } + public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) { final JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD); final JsonNode gcsJsonNode = Jsons.jsonNode(ImmutableMap.builder() @@ -176,12 +178,12 @@ public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) { * @return a default schema name based on the config. */ public static String getDatasetId(final JsonNode config) { - String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText(); + final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText(); - int colonIndex = datasetId.indexOf(":"); + final int colonIndex = datasetId.indexOf(":"); if (colonIndex != -1) { - String projectIdPart = datasetId.substring(0, colonIndex); - String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); + final String projectIdPart = datasetId.substring(0, colonIndex); + final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); if (!(projectId.equals(projectIdPart))) { throw new IllegalArgumentException(String.format( "Project ID included in Dataset ID must match Project ID field's value: Project ID is `%s`, but you specified `%s` in Dataset ID", @@ -212,9 +214,9 @@ static TableDefinition getTableDefinition(final BigQuery bigquery, final String * @return The list of fields with datetime format. 
* */ - public static List getDateTimeFieldsFromSchema(FieldList fieldList) { - List dateTimeFields = new ArrayList<>(); - for (Field field : fieldList) { + public static List getDateTimeFieldsFromSchema(final FieldList fieldList) { + final List dateTimeFields = new ArrayList<>(); + for (final Field field : fieldList) { if (field.getType().getStandardType().equals(StandardSQLTypeName.DATETIME)) { dateTimeFields.add(field.getName()); } @@ -231,10 +233,10 @@ public static List getDateTimeFieldsFromSchema(FieldList fieldList) { * "https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json#details_of_loading_json_data">Supported * Google bigquery datatype This method is responsible to adapt JSON DATETIME to Bigquery */ - public static void transformJsonDateTimeToBigDataFormat(List dateTimeFields, ObjectNode data) { + public static void transformJsonDateTimeToBigDataFormat(final List dateTimeFields, final ObjectNode data) { dateTimeFields.forEach(e -> { if (data.findValue(e) != null && !data.get(e).isNull()) { - String googleBigQueryDateFormat = QueryParameterValue + final String googleBigQueryDateFormat = QueryParameterValue .dateTime(new DateTime(convertDateToInstantFormat(data .findValue(e) .asText())) @@ -245,6 +247,9 @@ public static void transformJsonDateTimeToBigDataFormat(List dateTimeFie }); } + /** + * @return BigQuery dataset ID + */ public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) { final String srcNamespace = stream.getStream().getNamespace(); final String schemaName = srcNamespace == null ? getDatasetId(config) : srcNamespace; @@ -308,33 +313,33 @@ public static boolean isKeepFilesInGcs(final JsonNode config) { } } - public static void waitForJobFinish(Job job) throws InterruptedException { + public static void waitForJobFinish(final Job job) throws InterruptedException { if (job != null) { try { LOGGER.info("Waiting for job finish {}. 
Status: {}", job, job.getStatus()); job.waitFor(); LOGGER.info("Job finish {} with status {}", job, job.getStatus()); } catch (final BigQueryException e) { - String errorMessage = getJobErrorMessage(e.getErrors(), job); + final String errorMessage = getJobErrorMessage(e.getErrors(), job); LOGGER.error(errorMessage); throw new BigQueryException(e.getCode(), errorMessage, e); } } } - private static String convertDateToInstantFormat(String data) { + private static String convertDateToInstantFormat(final String data) { Instant instant = null; try { - ZonedDateTime zdt = ZonedDateTime.parse(data, formatter); + final ZonedDateTime zdt = ZonedDateTime.parse(data, formatter); instant = zdt.toLocalDateTime().toInstant(ZoneOffset.UTC); return instant.toString(); - } catch (DateTimeParseException e) { + } catch (final DateTimeParseException e) { try { - LocalDateTime dt = LocalDateTime.parse(data, formatter); + final LocalDateTime dt = LocalDateTime.parse(data, formatter); instant = dt.toInstant(ZoneOffset.UTC); return instant.toString(); - } catch (DateTimeParseException ex) { + } catch (final DateTimeParseException ex) { // no logging since it may generate too much noise } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java new file mode 100644 index 000000000000..e29f2a7e5bef --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java @@ -0,0 +1,57 @@ +package io.airbyte.integrations.destination.bigquery; + +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.TableId; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @param datasetId the dataset ID is equivalent to output schema + */ +public record BigQueryWriteConfig( + String streamName, + String namespace, + String datasetId, + String datasetLocation, + TableId tmpTableId, + TableId targetTableId, + Schema tableSchema, + DestinationSyncMode syncMode, + List stagedFiles +) { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryWriteConfig.class); + + public BigQueryWriteConfig(final String streamName, + final String namespace, + final String datasetId, + final String datasetLocation, + final String tmpTableName, + final String targetTableName, + final Schema tableSchema, + final DestinationSyncMode syncMode) { + this( + streamName, + namespace, + datasetId, + datasetLocation, + TableId.of(datasetId, tmpTableName), + TableId.of(datasetId, targetTableName), + tableSchema, + syncMode, + new ArrayList<>()); + } + + public void addStagedFile(final String file) { + this.stagedFiles.add(file); + LOGGER.info("Added staged file: {}", file); + } + + public void clearStagedFiles() { + this.stagedFiles.clear(); + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java index ab4600b0825a..c105087bf2fc 100644 --- 
a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java @@ -53,13 +53,13 @@ public abstract class AbstractBigQueryUploader { this.recordFormatter = recordFormatter; } - protected void postProcessAction(boolean hasFailed) throws Exception { + protected void postProcessAction(final boolean hasFailed) throws Exception { // Do nothing by default } - public void upload(AirbyteMessage airbyteMessage) { + public void upload(final AirbyteMessage airbyteMessage) { try { - writer.write((recordFormatter.formatRecord(airbyteMessage.getRecord()))); + writer.write(recordFormatter.formatRecord(airbyteMessage.getRecord())); } catch (final IOException | RuntimeException e) { LOGGER.error("Got an error while writing message: {}", e.getMessage(), e); LOGGER.error(String.format( @@ -71,7 +71,7 @@ public void upload(AirbyteMessage airbyteMessage) { } } - public void close(boolean hasFailed, Consumer outputRecordCollector, AirbyteMessage lastStateMessage) { + public void close(final boolean hasFailed, final Consumer outputRecordCollector, final AirbyteMessage lastStateMessage) { try { LOGGER.info("Field fails during format : "); recordFormatter.printAndCleanFieldFails(); @@ -91,14 +91,14 @@ public void close(boolean hasFailed, Consumer outputRecordCollec } } - protected void uploadData(Consumer outputRecordCollector, AirbyteMessage lastStateMessage) throws Exception { + protected void uploadData(final Consumer outputRecordCollector, final AirbyteMessage lastStateMessage) throws Exception { try { LOGGER.info("Uploading data from the tmp table {} to the source table {}.", tmpTable.getTable(), table.getTable()); uploadDataToTableFromTmpTable(); LOGGER.info("Data is successfully loaded to the source table {}!", table.getTable()); outputRecordCollector.accept(lastStateMessage); LOGGER.info("Final state message is accepted."); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.error("Upload data is failed!"); throw e; } finally { @@ -112,21 +112,20 @@ protected void dropTmpTable() { LOGGER.info("Removing tmp tables..."); bigQuery.delete(tmpTable); LOGGER.info("Finishing destination process...completed"); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.error("Fail to tmp table drop table: " + e.getMessage()); } } - protected void uploadDataToTableFromTmpTable() throws Exception { + protected void uploadDataToTableFromTmpTable() { LOGGER.info("Replication finished with no explicit errors. 
Copying data from tmp tables to permanent"); if (syncMode.equals(JobInfo.WriteDisposition.WRITE_APPEND)) { - partitionIfUnpartitioned(table); + partitionIfUnpartitioned(bigQuery, recordFormatter.getBigQuerySchema(), table); } - copyTable(tmpTable, table, - syncMode); + copyTable(bigQuery, tmpTable, table, syncMode); } - private void partitionIfUnpartitioned(final TableId destinationTableId) { + public static void partitionIfUnpartitioned(final BigQuery bigQuery, final Schema schema, final TableId destinationTableId) { try { final QueryJobConfiguration queryConfig = QueryJobConfiguration .newBuilder( @@ -149,7 +148,7 @@ private void partitionIfUnpartitioned(final TableId destinationTableId) { // https://cloud.google.com/bigquery/docs/creating-partitioned-tables#create_a_partitioned_table_from_a_query_result final QueryJobConfiguration partitionQuery = QueryJobConfiguration .newBuilder( - getCreatePartitionedTableFromSelectQuery(recordFormatter.getBigQuerySchema(), bigQuery.getOptions().getProjectId(), + getCreatePartitionedTableFromSelectQuery(schema, bigQuery.getOptions().getProjectId(), destinationTableId, tmpPartitionTable)) .setUseLegacySql(false) @@ -159,7 +158,7 @@ private void partitionIfUnpartitioned(final TableId destinationTableId) { // partitioned... thus, we force re-create from scratch by completely deleting and creating new // table. bigQuery.delete(destinationTableId); - copyTable(tmpPartitionTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_EMPTY); + copyTable(bigQuery, tmpPartitionTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_EMPTY); bigQuery.delete(tmpPartitionTableId); } }); @@ -169,7 +168,7 @@ private void partitionIfUnpartitioned(final TableId destinationTableId) { } // https://cloud.google.com/bigquery/docs/managing-tables#copying_a_single_source_table - private void copyTable( + public static void copyTable(final BigQuery bigQuery, final TableId sourceTableId, final TableId destinationTableId, final JobInfo.WriteDisposition syncMode) { @@ -187,10 +186,10 @@ private void copyTable( LOGGER.info("successfully copied table: {} to table: {}", sourceTableId, destinationTableId); } - protected String getCreatePartitionedTableFromSelectQuery(final Schema schema, - final String projectId, - final TableId destinationTableId, - final String tmpPartitionTable) { + private static String getCreatePartitionedTableFromSelectQuery(final Schema schema, + final String projectId, + final TableId destinationTableId, + final String tmpPartitionTable) { return String.format("create table `%s.%s.%s` (", projectId, destinationTableId.getDataset(), tmpPartitionTable) + schema.getFields().stream() .map(field -> String.format("%s %s", field.getName(), field.getType())) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java index a5a1420f05d6..92ec6a8550b3 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java @@ -45,7 +45,7 @@ public abstract class AbstractGscBigQueryUploader outputRecordCollector, super.uploadData(outputRecordCollector, 
lastStateMessage); } - protected void uploadDataFromFileToTmpTable() throws Exception { + protected void uploadDataFromFileToTmpTable() { try { final String fileLocation = this.writer.getFileLocation(); @@ -79,9 +79,9 @@ protected void uploadDataFromFileToTmpTable() throws Exception { // Blocks until this load table job completes its execution, either failing or succeeding. BigQueryUtils.waitForJobFinish(loadJob); - LOGGER.info("Table is successfully overwritten by " + getFileTypeName() + " file loaded from GCS"); + LOGGER.info("Table is successfully overwritten by file loaded from GCS: {}", getFileTypeName()); } catch (final BigQueryException | InterruptedException e) { - LOGGER.error("Column not added during load append \n" + e.toString()); + LOGGER.error("Column not added during load append", e); throw new RuntimeException("Column not added during load append \n" + e.toString()); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java index 1b2a30c9fe52..e813b4424b7b 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java @@ -15,19 +15,17 @@ public class BigQueryDirectUploader extends AbstractBigQueryUploader { - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDirectUploader.class); - - public BigQueryDirectUploader(TableId table, - TableId tmpTable, - BigQueryTableWriter writer, - JobInfo.WriteDisposition syncMode, - BigQuery bigQuery, - BigQueryRecordFormatter recordFormatter) { + public BigQueryDirectUploader(final TableId table, + final TableId tmpTable, + final BigQueryTableWriter writer, + final JobInfo.WriteDisposition syncMode, + final BigQuery bigQuery, + final BigQueryRecordFormatter recordFormatter) { super(table, tmpTable, writer, syncMode, bigQuery, recordFormatter); } @Override - protected void uploadData(Consumer outputRecordCollector, AirbyteMessage lastStateMessage) throws Exception { + protected void uploadData(final Consumer outputRecordCollector, final AirbyteMessage lastStateMessage) throws Exception { BigQueryUtils.waitForJobFinish(writer.getWriteChannel().getJob()); super.uploadData(outputRecordCollector, lastStateMessage); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java index 2491048fa929..1679c5339508 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java @@ -33,33 +33,29 @@ public class BigQueryUploaderFactory { public static AbstractBigQueryUploader getUploader(final UploaderConfig uploaderConfig) throws IOException { - final String schemaName = BigQueryUtils.getSchema( - uploaderConfig.getConfig(), - 
uploaderConfig.getConfigStream()); + final String schemaName = BigQueryUtils.getSchema(uploaderConfig.getConfig(), uploaderConfig.getConfigStream()); final String datasetLocation = BigQueryUtils.getDatasetLocation(uploaderConfig.getConfig()); final Set existingSchemas = new HashSet<>(); - final boolean isGcsUploadingMode = - UploadingMethod.GCS.equals(BigQueryUtils.getLoadingMethod(uploaderConfig.getConfig())); - final BigQueryRecordFormatter recordFormatter = - (isGcsUploadingMode - ? uploaderConfig.getFormatterMap().get(UploaderType.AVRO) - : uploaderConfig.getFormatterMap().get(UploaderType.STANDARD)); + final boolean isGcsUploadingMode = BigQueryUtils.getLoadingMethod(uploaderConfig.getConfig()) == UploadingMethod.GCS; + final BigQueryRecordFormatter recordFormatter = isGcsUploadingMode + ? uploaderConfig.getFormatterMap().get(UploaderType.AVRO) + : uploaderConfig.getFormatterMap().get(UploaderType.STANDARD); final Schema bigQuerySchema = recordFormatter.getBigQuerySchema(); + final TableId targetTable = TableId.of(schemaName, uploaderConfig.getTargetTableName()); + final TableId tmpTable = TableId.of(schemaName, uploaderConfig.getTmpTableName()); + BigQueryUtils.createSchemaAndTableIfNeeded( uploaderConfig.getBigQuery(), existingSchemas, schemaName, - uploaderConfig.getTmpTableName(), + tmpTable, datasetLocation, bigQuerySchema); - final TableId targetTable = TableId.of(schemaName, uploaderConfig.getTargetTableName()); - final TableId tmpTable = TableId.of(schemaName, uploaderConfig.getTmpTableName()); - final JobInfo.WriteDisposition syncMode = - BigQueryUtils.getWriteDisposition( - uploaderConfig.getConfigStream().getDestinationSyncMode()); + final JobInfo.WriteDisposition syncMode = BigQueryUtils.getWriteDisposition( + uploaderConfig.getConfigStream().getDestinationSyncMode()); return (isGcsUploadingMode ? getGcsBigQueryUploader( @@ -92,9 +88,7 @@ private static AbstractGscBigQueryUploader getGcsBigQueryUploader( final boolean isDefaultAirbyteTmpSchema) throws IOException { - final GcsDestinationConfig gcsDestinationConfig = - GcsDestinationConfig.getGcsDestinationConfig( - BigQueryUtils.getGcsAvroJsonNodeConfig(config)); + final GcsDestinationConfig gcsDestinationConfig = BigQueryUtils.getGcsAvroDestinationConfig(config); final JsonNode tmpTableSchema = (isDefaultAirbyteTmpSchema ? null : formatter.getJsonSchema()); final GcsAvroWriter gcsCsvWriter = @@ -145,12 +139,11 @@ private static BigQueryDirectUploader getBigQueryDirectUploader( .setFormatOptions(FormatOptions.json()) .build(); // new-line delimited json. 
- final JobId job = - JobId.newBuilder() - .setRandomJob() - .setLocation(datasetLocation) - .setProject(bigQuery.getOptions().getProjectId()) - .build(); + final JobId job = JobId.newBuilder() + .setRandomJob() + .setLocation(datasetLocation) + .setProject(bigQuery.getOptions().getProjectId()) + .build(); final TableDataWriteChannel writer = bigQuery.writer(job, writeChannelConfiguration); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index 4d9fa9203348..021c246b8487 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.bigquery; -import static java.util.stream.Collectors.toList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -243,6 +242,7 @@ void testWriteSuccess(final DatasetIdResetter resetDatasetId) throws Exception { final BigQueryDestination destination = new BigQueryDestination(); final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + consumer.start(); consumer.accept(MESSAGE_USERS1); consumer.accept(MESSAGE_TASKS1); consumer.accept(MESSAGE_USERS2); @@ -277,6 +277,7 @@ void testWriteFailure(final DatasetIdResetter resetDatasetId) throws Exception { final AirbyteMessageConsumer consumer = spy(new BigQueryDestination().getConsumer(config, catalog, Destination::defaultOutputRecordCollector)); + consumer.start(); assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage)); consumer.accept(MESSAGE_USERS2); consumer.close(); @@ -285,7 +286,7 @@ void testWriteFailure(final DatasetIdResetter resetDatasetId) throws Exception { .stream() .map(ConfiguredAirbyteStream::getStream) .map(AirbyteStream::getName) - .collect(toList()); + .toList(); assertTmpTablesNotPresent(catalog.getStreams() .stream() .map(ConfiguredAirbyteStream::getStream) @@ -340,6 +341,7 @@ void testWritePartitionOverUnpartitioned(final DatasetIdResetter resetDatasetId) final BigQueryDestination destination = new BigQueryDestination(); final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + consumer.start(); consumer.accept(MESSAGE_USERS1); consumer.accept(MESSAGE_TASKS1); consumer.accept(MESSAGE_USERS2); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java index 93dca8843be0..1fc909c87bb4 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java +++ 
b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java @@ -143,9 +143,9 @@ protected void tearDownGcs() { if (keysToDelete.size() > 0) { LOGGER.info("Tearing down test bucket path: {}/{}", gcsBucketName, gcs_bucket_path); // Google Cloud Storage doesn't accept request to delete multiple objects - for (final KeyVersion keyToDelete : keysToDelete) { - s3Client.deleteObject(gcsBucketName, keyToDelete.getKey()); - } +// for (final KeyVersion keyToDelete : keysToDelete) { +// s3Client.deleteObject(gcsBucketName, keyToDelete.getKey()); +// } LOGGER.info("Deleted {} file(s).", keysToDelete.size()); } } diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java index af2ee383d31d..172428aa8b00 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java @@ -23,6 +23,14 @@ public GcsStorageOperations(final NamingConventionTransformer nameTransformer, super(nameTransformer, s3Client, s3Config); } + /** + * GCS only supports the legacy AmazonS3#doesBucketExist method. + */ + @Override + protected boolean doesBucketExist(final String bucket) { + return s3Client.doesBucketExist(bucket); + } + /** * This method is overridden because GCS doesn't accept request to delete multiple objects. The only * difference is that the AmazonS3#deleteObjects method is replaced with AmazonS3#deleteObject. 
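For reference, the storage-operations hunk above relies on a small template-method hook: the S3 base class checks bucket existence with the newer doesBucketExistV2 call, while the GCS subclass overrides the hook with the legacy doesBucketExist call that GCS's S3-compatible API still answers (the same subclass also replaces the batched deleteObjects call with per-object deleteObject, as its Javadoc notes). A minimal sketch of the existence-check override follows; class names, constructors, and the createBucket helper are simplified stand-ins for the real S3StorageOperations/GcsStorageOperations, which also carry naming-transformer and config state.

import com.amazonaws.services.s3.AmazonS3;

// Simplified stand-in for S3StorageOperations: bucket existence defaults to the V2 API.
class S3StorageOperationsSketch {

  protected final AmazonS3 s3Client;

  S3StorageOperationsSketch(final AmazonS3 s3Client) {
    this.s3Client = s3Client;
  }

  protected boolean doesBucketExist(final String bucket) {
    return s3Client.doesBucketExistV2(bucket);
  }

  public void createBucketIfNotExists(final String bucket) {
    // Uses the overridable hook, so subclasses control which existence API is called.
    if (!doesBucketExist(bucket)) {
      s3Client.createBucket(bucket);
    }
  }

}

// Simplified stand-in for GcsStorageOperations: GCS only answers the legacy existence call.
class GcsStorageOperationsSketch extends S3StorageOperationsSketch {

  GcsStorageOperationsSketch(final AmazonS3 s3Client) {
    super(s3Client);
  }

  @Override
  protected boolean doesBucketExist(final String bucket) {
    return s3Client.doesBucketExist(bucket);
  }

}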
diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsUtils.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsUtils.java index 7ef402f1e65f..b417a7044784 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsUtils.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsUtils.java @@ -16,6 +16,8 @@ public class GcsUtils { private static final Logger LOGGER = LoggerFactory.getLogger(GcsUtils.class); + private static final Schema UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)); + private static final Schema TIMESTAMP_MILLIS_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); public static Schema getDefaultAvroSchema(final String name, @Nullable final String namespace, @@ -31,15 +33,9 @@ public static Schema getDefaultAvroSchema(final String name, SchemaBuilder.FieldAssembler assembler = builder.fields(); - Schema TIMESTAMP_MILLIS_SCHEMA = LogicalTypes.timestampMillis() - .addToSchema(Schema.create(Schema.Type.LONG)); - Schema UUID_SCHEMA = LogicalTypes.uuid() - .addToSchema(Schema.create(Schema.Type.STRING)); - if (appendAirbyteFields) { assembler = assembler.name(JavaBaseConstants.COLUMN_NAME_AB_ID).type(UUID_SCHEMA).noDefault(); - assembler = assembler.name(JavaBaseConstants.COLUMN_NAME_EMITTED_AT) - .type(TIMESTAMP_MILLIS_SCHEMA).noDefault(); + assembler = assembler.name(JavaBaseConstants.COLUMN_NAME_EMITTED_AT).type(TIMESTAMP_MILLIS_SCHEMA).noDefault(); } assembler = assembler.name(JavaBaseConstants.COLUMN_NAME_DATA).type().stringType().noDefault(); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java index 3fa75feea258..e60610d59b4c 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java @@ -85,7 +85,7 @@ public String getBucketObjectPath(final String namespace, final String streamNam @Override public void createBucketObjectIfNotExists(final String objectPath) { final String bucket = s3Config.getBucketName(); - if (!s3Client.doesBucketExistV2(bucket)) { + if (!doesBucketExist(bucket)) { LOGGER.info("Bucket {} does not exist; creating...", bucket); s3Client.createBucket(bucket); LOGGER.info("Bucket {} has been created.", bucket); @@ -97,6 +97,10 @@ public void createBucketObjectIfNotExists(final String objectPath) { } } + protected boolean doesBucketExist(final String bucket) { + return s3Client.doesBucketExistV2(bucket); + } + @Override public String uploadRecordsToBucket(final SerializableBuffer recordsData, final String namespace, diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java index aeec54023e4e..9f983d1b8aa5 100644 --- 
a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java @@ -28,8 +28,8 @@ public class AvroSerializedBuffer extends BaseSerializedBuffer { private final CodecFactory codecFactory; private final Schema schema; - private final AvroRecordFactory avroRecordFactory; - private DataFileWriter dataFileWriter; + protected final AvroRecordFactory avroRecordFactory; + protected DataFileWriter dataFileWriter; public AvroSerializedBuffer(final BufferStorage bufferStorage, final CodecFactory codecFactory, final Schema schema) throws Exception { super(bufferStorage); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java index a4649b04bb04..25ad461cabba 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java @@ -17,6 +17,11 @@ public class S3AvroFormatConfig implements S3FormatConfig { private final CodecFactory codecFactory; private final Long partSize; + public S3AvroFormatConfig(final CodecFactory codecFactory, final long partSize) { + this.codecFactory = codecFactory; + this.partSize = partSize; + } + public S3AvroFormatConfig(final JsonNode formatConfig) { this.codecFactory = parseCodecConfig(formatConfig.get("compression_codec")); this.partSize = formatConfig.get(PART_SIZE_MB_ARG_NAME) != null From 7a586bdf601a4aae3d3f0f5579a9db916516ea45 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 6 Apr 2022 18:22:21 -0700 Subject: [PATCH 02/12] Add comments --- .../destination/bigquery/BigQueryAvroSerializedBuffer.java | 6 ++++++ .../destination/bigquery/BigQueryStagingOperations.java | 3 +++ 2 files changed, 9 insertions(+) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java index 27fe0d0747e0..120e71010e3f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java @@ -18,6 +18,12 @@ import org.apache.avro.file.CodecFactory; import org.apache.commons.lang3.StringUtils; +/** + * This class differs from {@link AvroSerializedBuffer} in that 1) the Avro schema can be + * customized by the caller, and 2) the message is formatted by {@link BigQueryRecordFormatter}. + * In this way, this buffer satisfies the needs of both the standard and the denormalized BigQuery + * destinations. 
+ */ public class BigQueryAvroSerializedBuffer extends AvroSerializedBuffer { private final BigQueryRecordFormatter recordFormatter; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java index 3dc18b7b3500..ae8957ee9b39 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java @@ -6,6 +6,9 @@ import io.airbyte.protocol.models.DestinationSyncMode; import java.util.List; +/** + * This interface is similar to {@link io.airbyte.integrations.destination.s3.BlobStorageOperations}. + */ public interface BigQueryStagingOperations { String getStagingFullPath(final String datasetId, final String stream); From dff488948b062761d4adf6e2337ecdfd42b85b74 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 6 Apr 2022 18:39:13 -0700 Subject: [PATCH 03/12] Uncomment test code --- .../bigquery/formatter/BigQueryRecordFormatter.java | 8 +++----- .../bigquery/uploader/AbstractBigQueryUploader.java | 5 ++--- .../destination/bigquery/BigQueryGcsDestinationTest.java | 6 +++--- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java index 54e07e3540ac..08db217fc927 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java @@ -34,12 +34,12 @@ public abstract class BigQueryRecordFormatter { protected final Set invalidKeys = new HashSet<>(); protected final Set fieldsContainRefDefinitionValue = new HashSet<>(); - public BigQueryRecordFormatter(JsonNode jsonSchema, StandardNameTransformer namingResolver) { + public BigQueryRecordFormatter(final JsonNode jsonSchema, final StandardNameTransformer namingResolver) { this.namingResolver = namingResolver; this.jsonSchema = formatJsonSchema(jsonSchema.deepCopy()); } - protected JsonNode formatJsonSchema(JsonNode jsonSchema) { + protected JsonNode formatJsonSchema(final JsonNode jsonSchema) { // Do nothing by default return jsonSchema; }; @@ -59,7 +59,7 @@ public JsonNode getJsonSchema() { protected abstract Schema getBigQuerySchema(JsonNode jsonSchema); - protected void logFieldFail(String error, String fieldName) { + protected void logFieldFail(final String error, final String fieldName) { mapOfFailedFields.putIfAbsent(error, new HashSet<>()); mapOfFailedFields.get(error).add(fieldName); } @@ -72,8 +72,6 @@ public void printAndCleanFieldFails() { error, String.join(", ", fieldNames))); mapOfFailedFields.clear(); - } else { - LOGGER.info("No field fails during record format."); } } diff --git 
a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java index c105087bf2fc..b7230fce0c82 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java @@ -73,17 +73,16 @@ public void upload(final AirbyteMessage airbyteMessage) { public void close(final boolean hasFailed, final Consumer outputRecordCollector, final AirbyteMessage lastStateMessage) { try { - LOGGER.info("Field fails during format : "); recordFormatter.printAndCleanFieldFails(); - LOGGER.info("Closing connector:" + this); + LOGGER.info("Closing connector: {}", this); this.writer.close(hasFailed); if (!hasFailed) { uploadData(outputRecordCollector, lastStateMessage); } this.postProcessAction(hasFailed); - LOGGER.info("Closed connector:" + this); + LOGGER.info("Closed connector: {}", this); } catch (final Exception e) { LOGGER.error(String.format("Failed to close %s writer, \n details: %s", this, e.getMessage())); printHeapMemoryConsumption(); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java index 1fc909c87bb4..93dca8843be0 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java @@ -143,9 +143,9 @@ protected void tearDownGcs() { if (keysToDelete.size() > 0) { LOGGER.info("Tearing down test bucket path: {}/{}", gcsBucketName, gcs_bucket_path); // Google Cloud Storage doesn't accept request to delete multiple objects -// for (final KeyVersion keyToDelete : keysToDelete) { -// s3Client.deleteObject(gcsBucketName, keyToDelete.getKey()); -// } + for (final KeyVersion keyToDelete : keysToDelete) { + s3Client.deleteObject(gcsBucketName, keyToDelete.getKey()); + } LOGGER.info("Deleted {} file(s).", keysToDelete.size()); } } From d731fe61df7d47ad47cc5837b7ab147c2fdb30a3 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 6 Apr 2022 20:11:50 -0700 Subject: [PATCH 04/12] Format code --- .../BigQueryAvroSerializedBuffer.java | 17 +++++++++----- .../bigquery/BigQueryDestination.java | 8 +++---- .../bigquery/BigQueryGcsOperations.java | 7 +++++- .../BigQueryStagingConsumerFactory.java | 7 +++++- .../bigquery/BigQueryStagingOperations.java | 11 +++++++-- .../bigquery/BigQueryWriteConfig.java | 23 +++++++++++-------- .../uploader/AbstractBigQueryUploader.java | 6 ++--- .../uploader/BigQueryDirectUploader.java | 2 -- .../destination/s3/S3StorageOperations.java | 1 + .../SnowflakeS3StagingSqlOperations.java | 4 ++-- 10 files changed, 55 insertions(+), 31 deletions(-) diff --git 
a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java index 120e71010e3f..fb60cc4150ab 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.bigquery; import com.fasterxml.jackson.databind.JsonNode; @@ -19,10 +23,9 @@ import org.apache.commons.lang3.StringUtils; /** - * This class differs from {@link AvroSerializedBuffer} in that 1) the Avro schema can be - * customized by the caller, and 2) the message is formatted by {@link BigQueryRecordFormatter}. - * In this way, this buffer satisfies the needs of both the standard and the denormalized BigQuery - * destinations. + * This class differs from {@link AvroSerializedBuffer} in that 1) the Avro schema can be customized + * by the caller, and 2) the message is formatted by {@link BigQueryRecordFormatter}. In this way, + * this buffer satisfies the needs of both the standard and the denormalized BigQuery destinations. */ public class BigQueryAvroSerializedBuffer extends AvroSerializedBuffer { @@ -31,7 +34,8 @@ public class BigQueryAvroSerializedBuffer extends AvroSerializedBuffer { public BigQueryAvroSerializedBuffer(final BufferStorage bufferStorage, final CodecFactory codecFactory, final Schema schema, - final BigQueryRecordFormatter recordFormatter) throws Exception { + final BigQueryRecordFormatter recordFormatter) + throws Exception { super(bufferStorage, codecFactory, schema); this.recordFormatter = recordFormatter; } @@ -53,7 +57,8 @@ public static CheckedBiFunction new RuntimeException(String.format("No such stream %s.%s", pair.getNamespace(), pair.getName()))) .getStream(); - return new BigQueryAvroSerializedBuffer(createStorageFunction.call(), codecFactory, schemaCreator.apply(stream), recordFormatterCreator.apply(stream.getJsonSchema())); + return new BigQueryAvroSerializedBuffer(createStorageFunction.call(), codecFactory, schemaCreator.apply(stream), + recordFormatterCreator.apply(stream.getJsonSchema())); }; } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 78f3b45c974f..2baadc9a216e 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -187,7 +187,8 @@ private ServiceAccountCredentials getServiceAccountCredentials(final JsonNode co @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) throws IOException { + final Consumer outputRecordCollector) + throws IOException { final UploadingMethod uploadingMethod = 
BigQueryUtils.getLoadingMethod(config); if (uploadingMethod == UploadingMethod.STANDARD) { return getStandardRecordConsumer(config, catalog, outputRecordCollector); @@ -246,13 +247,12 @@ protected String getTargetTableName(final String streamName) { private AirbyteMessageConsumer getStandardRecordConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) throws IOException { + final Consumer outputRecordCollector) + throws IOException { final Map> writeConfigs = getUploaderMap(config, catalog); return new BigQueryRecordConsumer(writeConfigs, outputRecordCollector); } - - public AirbyteMessageConsumer getGcsRecordConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java index ccc27e62eb84..d8a13d29c2a8 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.bigquery; import com.google.cloud.bigquery.BigQuery; @@ -106,7 +110,8 @@ public String uploadRecordsToStage(final String datasetId, final String stream, } /** - * Reference https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html + * Reference + * https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html */ @Override public void copyIntoTmpTableFromStage(final String datasetId, diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index 088491668c92..684d87ee17e2 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.destination.bigquery; import com.fasterxml.jackson.databind.JsonNode; @@ -140,7 +144,8 @@ private CheckedConsumer onCloseFunction(final BigQueryStagin final String stream = writeConfig.streamName(); try { - bigQueryGcsOperations.copyIntoTmpTableFromStage(datasetId, stream, writeConfig.tmpTableId(), writeConfig.tableSchema(), writeConfig.stagedFiles()); + bigQueryGcsOperations.copyIntoTmpTableFromStage(datasetId, stream, writeConfig.tmpTableId(), writeConfig.tableSchema(), + writeConfig.stagedFiles()); } catch (final Exception e) { bigQueryGcsOperations.cleanUpStage(datasetId, stream, writeConfig.stagedFiles()); final String stagingPath = bigQueryGcsOperations.getStagingFullPath(datasetId, stream); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java index ae8957ee9b39..6548c3b023a3 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingOperations.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.bigquery; import com.google.cloud.bigquery.Schema; @@ -7,7 +11,8 @@ import java.util.List; /** - * This interface is similar to {@link io.airbyte.integrations.destination.s3.BlobStorageOperations}. + * This interface is similar to + * {@link io.airbyte.integrations.destination.s3.BlobStorageOperations}. */ public interface BigQueryStagingOperations { @@ -25,7 +30,8 @@ void copyIntoTmpTableFromStage(final String datasetId, final String stream, final TableId tmpTableId, final Schema schema, - final List stagedFiles) throws Exception; + final List stagedFiles) + throws Exception; void cleanUpStage(final String datasetId, final String stream, final List stagedFiles); @@ -38,4 +44,5 @@ void copyIntoTargetTable(final String datasetId, void dropTableIfExists(final String datasetId, final TableId tmpTableId); void dropStageIfExists(final String datasetId, final String stream); + } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java index e29f2a7e5bef..0a6017c8b069 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.destination.bigquery; import com.google.cloud.bigquery.Schema; @@ -12,16 +16,15 @@ * @param datasetId the dataset ID is equivalent to output schema */ public record BigQueryWriteConfig( - String streamName, - String namespace, - String datasetId, - String datasetLocation, - TableId tmpTableId, - TableId targetTableId, - Schema tableSchema, - DestinationSyncMode syncMode, - List stagedFiles -) { + String streamName, + String namespace, + String datasetId, + String datasetLocation, + TableId tmpTableId, + TableId targetTableId, + Schema tableSchema, + DestinationSyncMode syncMode, + List stagedFiles) { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryWriteConfig.class); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java index b7230fce0c82..0ebbf354dc8c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java @@ -168,9 +168,9 @@ public static void partitionIfUnpartitioned(final BigQuery bigQuery, final Schem // https://cloud.google.com/bigquery/docs/managing-tables#copying_a_single_source_table public static void copyTable(final BigQuery bigQuery, - final TableId sourceTableId, - final TableId destinationTableId, - final JobInfo.WriteDisposition syncMode) { + final TableId sourceTableId, + final TableId destinationTableId, + final JobInfo.WriteDisposition syncMode) { final CopyJobConfiguration configuration = CopyJobConfiguration.newBuilder(destinationTableId, sourceTableId) .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED) .setWriteDisposition(syncMode) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java index e813b4424b7b..eba183c4b9a3 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java @@ -10,8 +10,6 @@ import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter; import io.airbyte.protocol.models.AirbyteMessage; import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class BigQueryDirectUploader extends AbstractBigQueryUploader { diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java index e60610d59b4c..d6da8b29cc96 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java +++ 
b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java @@ -126,6 +126,7 @@ public String uploadRecordsToBucket(final SerializableBuffer recordsData, /** * Upload the file from {@code recordsData} to S3 and simplify the filename as .. + * * @return the uploaded filename, which is different from the serialized buffer filename */ private String loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java index a659c8af36db..fd6c7201ce6c 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java @@ -65,8 +65,8 @@ public String uploadRecordsToStage(final JdbcDatabase database, final String schemaName, final String stageName, final String stagingPath) { - return AirbyteSentry.queryWithTracing("UploadRecordsToStage", () -> - s3StorageOperations.uploadRecordsToBucket(recordsData, schemaName, stageName, stagingPath), + return AirbyteSentry.queryWithTracing("UploadRecordsToStage", + () -> s3StorageOperations.uploadRecordsToBucket(recordsData, schemaName, stageName, stagingPath), Map.of("stage", stageName, "path", stagingPath)); } From 69a2ad492e7a9f1a529ad70e935e83ba08911063 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 6 Apr 2022 20:19:49 -0700 Subject: [PATCH 05/12] Bump versions --- airbyte-integrations/builds.md | 1 + .../connectors/destination-bigquery-denormalized/Dockerfile | 2 +- .../connectors/destination-bigquery/Dockerfile | 2 +- docs/integrations/destinations/bigquery.md | 3 ++- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index 536bed04b0fc..62bfda98ba17 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -118,6 +118,7 @@ |:---------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Azure Blob Storage | [![destination-azure-blob-storage](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-azure-blob-storage%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-azure-blob-storage) | | BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) | +| BigQuery Denormalized | [![destination-bigquery-denormalized](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery-denormalized%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery-denormalized) | | ClickHouse | 
[![destination-clickhouse](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-clickhouse%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-clickhouse) | | Cassandra | [![destination-cassandra](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-cassandra%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-cassandra) | | Databricks | [![destination-databricks](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-databricks%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-databricks) | diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index e42653cb75d8..6ea8d5e32feb 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.2.15 +LABEL io.airbyte.version=0.3.0 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 20434f895cca..2eff9daaae37 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.2 +LABEL io.airbyte.version=1.1.0 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index a7285da10db7..a1fc93e7ee1c 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -207,7 +207,7 @@ This uploads data directly from your source to BigQuery. While this is faster to | Version | Date | Pull Request | Subject | |:--------| :--- | :--- | :--- | -| 1.0.2 | 2022-03-30 | [11620](https://github.com/airbytehq/airbyte/pull/11620) | Updated spec | +| 1.1.0 | 2022-04-06 | [11776](https://github.com/airbytehq/airbyte/pull/11776) | Use serialized buffering strategy to reduce memory consumption. | | 1.0.1 | 2022-03-24 | [11350](https://github.com/airbytehq/airbyte/pull/11350) | Improve check performance | | 1.0.0 | 2022-03-18 | [11238](https://github.com/airbytehq/airbyte/pull/11238) | Updated spec and documentation | | 0.6.12 | 2022-03-18 | [10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters | @@ -236,6 +236,7 @@ This uploads data directly from your source to BigQuery. While this is faster to | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------| :--- | +| 0.3.0 | 2022-04-06 | [11776](https://github.com/airbytehq/airbyte/pull/11776) | Use serialized buffering strategy to reduce memory consumption. 
| | 0.2.15 | 2022-04-05 | [11166](https://github.com/airbytehq/airbyte/pull/11166) | Fixed handling of anyOf and allOf fields | | 0.2.14 | 2022-04-02 | [11620](https://github.com/airbytehq/airbyte/pull/11620) | Updated spec | | 0.2.13 | 2022-04-01 | [11636](https://github.com/airbytehq/airbyte/pull/11636) | Added new unit tests | From cc523a8e9c827ab149028c12a2761d2357d728c7 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 7 Apr 2022 01:39:52 -0700 Subject: [PATCH 06/12] Fix denormalized destination target table name --- .../bigquery/BigQueryDenormalizedDestination.java | 9 +++++++++ .../destination/bigquery/BigQueryDestination.java | 7 ++++++- .../bigquery/BigQueryGcsOperations.java | 4 ++-- .../bigquery/BigQueryStagingConsumerFactory.java | 14 ++++++++------ 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java index b3a0d5ba6ae0..452d9d765100 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java @@ -56,6 +56,15 @@ protected Function getRecordFormatterCreator( return streamSchema -> new GcsBigQueryDenormalizedRecordFormatter(streamSchema, namingResolver); } + /** + * This BigQuery destination does not write to a staging "raw" table but directly to a normalized + * table. 
+ */ + @Override + protected Function getTargetTableNameTransformer(final BigQuerySQLNameTransformer namingResolver) { + return namingResolver::getIdentifier; + } + public static void main(final String[] args) throws Exception { final Destination destination = new BigQueryDenormalizedDestination(); new IntegrationRunner(destination).run(args); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 2baadc9a216e..40559bcabdd5 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -289,7 +289,8 @@ public AirbyteMessageConsumer getGcsRecordConsumer(final JsonNode config, stagingId, syncDatetime, recordFormatterCreator, - namingResolver); + namingResolver::getTmpTableName, + getTargetTableNameTransformer(namingResolver)); } protected Function getAvroSchemaCreator() { @@ -300,6 +301,10 @@ protected Function getRecordFormatterCreator( return streamSchema -> new GcsAvroBigQueryRecordFormatter(streamSchema, namingResolver); } + protected Function getTargetTableNameTransformer(final BigQuerySQLNameTransformer namingResolver) { + return namingResolver::getRawTableName; + } + public static void main(final String[] args) throws Exception { final Destination destination = new BigQueryDestination(); new IntegrationRunner(destination).run(args); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java index d8a13d29c2a8..8d4e35098991 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java @@ -57,7 +57,7 @@ public BigQueryGcsOperations(final BigQuery bigQuery, } /** - * @return /_ + * @return {@code /_} */ private String getStagingRootPath(final String datasetId, final String stream) { return gcsNameTransformer.applyDefaultCase(String.format("%s/%s_%s", @@ -67,7 +67,7 @@ private String getStagingRootPath(final String datasetId, final String stream) { } /** - * @return /_////// + * @return {@code /_//////} */ @Override public String getStagingFullPath(final String datasetId, final String stream) { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index 684d87ee17e2..27e787524d05 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -14,7 +14,6 @@ import 
io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; -import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; @@ -44,13 +43,15 @@ public AirbyteMessageConsumer create(final JsonNode config, final UUID stagingId, final DateTime syncDatetime, final Function recordFormatterCreator, - final StandardNameTransformer bigQueryNameTransformer) { + final Function tmpTableNameTransformer, + final Function targetTableNameTransformer) { LOGGER.info("Creating BigQuery staging message consumer with staging ID {} at {}", stagingId, syncDatetime); final Map writeConfigs = createWriteConfigs( config, catalog, recordFormatterCreator, - bigQueryNameTransformer); + tmpTableNameTransformer, + targetTableNameTransformer); return new BufferedStreamConsumer( outputRecordCollector, @@ -67,7 +68,8 @@ public AirbyteMessageConsumer create(final JsonNode config, private Map createWriteConfigs(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Function recordFormatterCreator, - final StandardNameTransformer bigQueryNameTransformer) { + final Function tmpTableNameTransformer, + final Function targetTableNameTransformer) { return catalog.getStreams().stream() .map(configuredStream -> { Preconditions.checkNotNull(configuredStream.getDestinationSyncMode(), "Undefined destination sync mode"); @@ -81,8 +83,8 @@ private Map createWriteConf stream.getNamespace(), BigQueryUtils.getSchema(config, configuredStream), BigQueryUtils.getDatasetLocation(config), - bigQueryNameTransformer.getTmpTableName(streamName), - bigQueryNameTransformer.getRawTableName(streamName), + tmpTableNameTransformer.apply(streamName), + targetTableNameTransformer.apply(streamName), recordFormatter.getBigQuerySchema(), configuredStream.getDestinationSyncMode()); From d02b3add0d8ba783fbc5a5354df87fb01bc88e07 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 7 Apr 2022 02:22:50 -0700 Subject: [PATCH 07/12] Fix avro schema for denormalized destination --- .../bigquery/BigQueryDenormalizedDestination.java | 10 ++++++---- .../BigQueryDenormalizedGcsDestinationTest.java | 4 ++++ .../bigquery/BigQueryAvroSerializedBuffer.java | 8 +++++--- .../destination/bigquery/BigQueryDestination.java | 7 ++++--- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java index 452d9d765100..bed1d31255df 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.bigquery; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import 
io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; @@ -12,8 +13,8 @@ import io.airbyte.integrations.destination.bigquery.formatter.GcsBigQueryDenormalizedRecordFormatter; import io.airbyte.integrations.destination.bigquery.uploader.UploaderType; import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter; -import io.airbyte.protocol.models.AirbyteStream; import java.util.Map; +import java.util.function.BiFunction; import java.util.function.Function; import org.apache.avro.Schema; @@ -46,9 +47,10 @@ protected boolean isDefaultAirbyteTmpTableSchema() { } @Override - protected Function getAvroSchemaCreator() { - return stream -> new JsonToAvroSchemaConverter().getAvroSchema(stream.getJsonSchema(), stream.getName(), - stream.getNamespace(), true, false, false, true); + protected BiFunction getAvroSchemaCreator() { + // the json schema needs to be processed by the record former to denormalize + return (formatter, pair) -> new JsonToAvroSchemaConverter().getAvroSchema(formatter.getJsonSchema(), pair.getName(), + pair.getNamespace(), true, false, false, true); } @Override diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java index 7c67c7bc6bc7..4052a89d9b18 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java @@ -236,6 +236,7 @@ void testNestedWrite(final JsonNode schema, final AirbyteMessage message) throws final BigQueryDestination destination = new BigQueryDenormalizedDestination(); final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + consumer.start(); consumer.accept(message); consumer.close(); @@ -257,6 +258,7 @@ void testWriteWithFormat() throws Exception { final BigQueryDestination destination = new BigQueryDenormalizedDestination(); final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + consumer.start(); consumer.accept(MESSAGE_USERS3); consumer.close(); @@ -293,6 +295,7 @@ void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception { final BigQueryDestination destination = new BigQueryDenormalizedDestination(); final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + consumer.start(); consumer.accept(MESSAGE_USERS4); consumer.close(); @@ -317,6 +320,7 @@ void testJsonReferenceDefinition() throws Exception { final BigQueryDestination destination = new BigQueryDenormalizedDestination(); final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + consumer.start(); consumer.accept(MESSAGE_USERS5); consumer.accept(MESSAGE_USERS6); consumer.accept(EMPTY_MESSAGE); diff --git 
a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java index fb60cc4150ab..3c803815db84 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java @@ -17,6 +17,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.io.IOException; import java.util.concurrent.Callable; +import java.util.function.BiFunction; import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; @@ -46,8 +47,8 @@ protected void writeRecord(final AirbyteRecordMessage recordMessage) throws IOEx } public static CheckedBiFunction createFunction(final S3AvroFormatConfig config, - final Function schemaCreator, final Function recordFormatterCreator, + final BiFunction schemaCreator, final Callable createStorageFunction) { final CodecFactory codecFactory = config.getCodecFactory(); return (pair, catalog) -> { @@ -57,8 +58,9 @@ public static CheckedBiFunction new RuntimeException(String.format("No such stream %s.%s", pair.getNamespace(), pair.getName()))) .getStream(); - return new BigQueryAvroSerializedBuffer(createStorageFunction.call(), codecFactory, schemaCreator.apply(stream), - recordFormatterCreator.apply(stream.getJsonSchema())); + final BigQueryRecordFormatter recordFormatter = recordFormatterCreator.apply(stream.getJsonSchema()); + final Schema schema = schemaCreator.apply(recordFormatter, pair); + return new BigQueryAvroSerializedBuffer(createStorageFunction.call(), codecFactory, schema, recordFormatter); }; } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 40559bcabdd5..373e6091abf5 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import org.apache.avro.Schema; @@ -276,8 +277,8 @@ public AirbyteMessageConsumer getGcsRecordConsumer(final JsonNode config, final CheckedBiFunction onCreateBuffer = BigQueryAvroSerializedBuffer.createFunction( avroFormatConfig, - getAvroSchemaCreator(), recordFormatterCreator, + getAvroSchemaCreator(), () -> new FileBuffer(AvroSerializedBuffer.DEFAULT_SUFFIX)); return new BigQueryStagingConsumerFactory().create( @@ -293,8 +294,8 @@ public AirbyteMessageConsumer getGcsRecordConsumer(final JsonNode config, getTargetTableNameTransformer(namingResolver)); } - protected Function getAvroSchemaCreator() { - return stream -> GcsUtils.getDefaultAvroSchema(stream.getName(), stream.getNamespace(), true); + protected BiFunction getAvroSchemaCreator() { + return (formatter, 
pair) -> GcsUtils.getDefaultAvroSchema(pair.getName(), pair.getNamespace(), true); } protected Function getRecordFormatterCreator(final BigQuerySQLNameTransformer namingResolver) { From ff1485f9c016cd99056ad3d879eb328207f642e4 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 7 Apr 2022 02:31:48 -0700 Subject: [PATCH 08/12] Remove unnecessary params from consumer factory --- .../destination/bigquery/BigQueryDestination.java | 6 +++--- .../bigquery/BigQueryStagingConsumerFactory.java | 5 ----- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 373e6091abf5..9f97cabf9ef2 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -257,9 +257,10 @@ private AirbyteMessageConsumer getStandardRecordConsumer(final JsonNode config, public AirbyteMessageConsumer getGcsRecordConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) { + + final StandardNameTransformer gcsNameTransformer = new GcsNameTransformer(); final BigQuery bigQuery = getBigQuery(config); final GcsDestinationConfig gcsConfig = BigQueryUtils.getGcsAvroDestinationConfig(config); - final StandardNameTransformer gcsNameTransformer = new GcsNameTransformer(); final UUID stagingId = UUID.randomUUID(); final DateTime syncDatetime = DateTime.now(DateTimeZone.UTC); final boolean keepStagingFiles = BigQueryUtils.isKeepFilesInGcs(config); @@ -281,14 +282,13 @@ public AirbyteMessageConsumer getGcsRecordConsumer(final JsonNode config, getAvroSchemaCreator(), () -> new FileBuffer(AvroSerializedBuffer.DEFAULT_SUFFIX)); + LOGGER.info("Creating BigQuery staging message consumer with staging ID {} at {}", stagingId, syncDatetime); return new BigQueryStagingConsumerFactory().create( config, catalog, outputRecordCollector, bigQueryGcsOperations, onCreateBuffer, - stagingId, - syncDatetime, recordFormatterCreator, namingResolver::getTmpTableName, getTargetTableNameTransformer(namingResolver)); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index 27e787524d05..58f0e9888fa6 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -22,12 +22,10 @@ import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.Map; -import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; -import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,12 +38,9 @@ public 
AirbyteMessageConsumer create(final JsonNode config, final Consumer outputRecordCollector, final BigQueryStagingOperations bigQueryGcsOperations, final CheckedBiFunction onCreateBuffer, - final UUID stagingId, - final DateTime syncDatetime, final Function recordFormatterCreator, final Function tmpTableNameTransformer, final Function targetTableNameTransformer) { - LOGGER.info("Creating BigQuery staging message consumer with staging ID {} at {}", stagingId, syncDatetime); final Map writeConfigs = createWriteConfigs( config, catalog, From f9aa7a8832334a469de82e0595525e8d82a05206 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 7 Apr 2022 02:50:54 -0700 Subject: [PATCH 09/12] Add back previous version --- docs/integrations/destinations/bigquery.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index a1fc93e7ee1c..ad91416cc53f 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -208,6 +208,7 @@ This uploads data directly from your source to BigQuery. While this is faster to | Version | Date | Pull Request | Subject | |:--------| :--- | :--- | :--- | | 1.1.0 | 2022-04-06 | [11776](https://github.com/airbytehq/airbyte/pull/11776) | Use serialized buffering strategy to reduce memory consumption. | +| 1.0.2 | 2022-03-30 | [11620](https://github.com/airbytehq/airbyte/pull/11620) | Updated spec | | 1.0.1 | 2022-03-24 | [11350](https://github.com/airbytehq/airbyte/pull/11350) | Improve check performance | | 1.0.0 | 2022-03-18 | [11238](https://github.com/airbytehq/airbyte/pull/11238) | Updated spec and documentation | | 0.6.12 | 2022-03-18 | [10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters | From 149f1090eeddefb05e544de073c2f25659846129 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 7 Apr 2022 13:52:54 -0700 Subject: [PATCH 10/12] Add warning about standard mode --- .../integrations/destination/bigquery/BigQueryDestination.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 9f97cabf9ef2..f44fe4ea4bec 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -192,6 +192,8 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, throws IOException { final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config); if (uploadingMethod == UploadingMethod.STANDARD) { + LOGGER.warn("The \"standard\" upload mode is not performant, and is not recommended for production. 
" + + "Please use the GCS upload mode if you are syncing a large amount of data."); return getStandardRecordConsumer(config, catalog, outputRecordCollector); } else { return getGcsRecordConsumer(config, catalog, outputRecordCollector); From 0578f1af77cb1d8c6f78247c4e8b1ee1658b3fd1 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Thu, 7 Apr 2022 21:32:34 +0000 Subject: [PATCH 11/12] auto-bump connector version --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 66 +++++++++++-------- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index adf35aae9727..b5fbad18784f 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -36,7 +36,7 @@ - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 0.2.15 + dockerImageTag: 0.3.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index ddb3cdd99a6e..0d54f23bd3ec 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -495,7 +495,7 @@ - "overwrite" - "append" - "append_dedup" -- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.15" +- dockerImage: "airbyte/destination-bigquery-denormalized:0.3.0" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: @@ -508,14 +508,13 @@ additionalProperties: true properties: big_query_client_buffer_size_mb: - title: "Google BigQuery client chunk size" - description: "Google BigQuery client's chunk (buffer) size (MIN = 1, MAX\ - \ = 15) for each table. The size that will be written by a single RPC.\ - \ Written data will be buffered and only flushed upon reaching this size\ - \ or closing the channel. It defaults to 15MiB. Smaller chunk size means\ - \ less memory consumption, and is recommended for big data sets. For more\ - \ details refer to the documentation here" + title: "Google BigQuery Client Chunk Size (Optional)" + description: "Google BigQuery client's chunk (buffer) size (MIN=1, MAX =\ + \ 15) for each table. The size that will be written by a single RPC. Written\ + \ data will be buffered and only flushed upon reaching this size or closing\ + \ the channel. The default 15MB value is used if not set explicitly. Read\ + \ more here." type: "integer" minimum: 1 maximum: 15 @@ -525,18 +524,22 @@ project_id: type: "string" description: "The GCP project ID for the project containing the target BigQuery\ - \ dataset." + \ dataset. Read more here." title: "Project ID" dataset_id: type: "string" - description: "Default BigQuery Dataset ID tables are replicated to if the\ - \ source does not specify a namespace." + description: "The default BigQuery Dataset ID that tables are replicated\ + \ to if the source does not specify a namespace. Read more here." title: "Default Dataset ID" dataset_location: type: "string" description: "The location of the dataset. 
Warning: Changes made after creation\ - \ will not be applied." - title: "Dataset Location" + \ will not be applied. The default \"US\" value is used if not set explicitly.\ + \ Read more here." + title: "Dataset Location (Optional)" default: "US" enum: - "US" @@ -573,19 +576,26 @@ credentials_json: type: "string" description: "The contents of the JSON service account key. Check out the\ - \ docs if you need help generating this key. Default credentials will\ \ be used if this field is left empty." - title: "Credentials JSON" + title: "Service Account Key JSON (Optional)" airbyte_secret: true loading_method: type: "object" - title: "Loading Method" - description: "Select the way that data will be uploaded to BigQuery." + title: "Loading Method *" + description: "Loading method used to send select the way data will be uploaded\ + \ to BigQuery.
Standard Inserts - Direct uploading using SQL\ + \ INSERT statements. This method is extremely inefficient and provided\ + \ only for quick testing. In almost all cases, you should use staging.\ + \
GCS Staging - Writes large batches of records to a file,\ + \ uploads the file to GCS, then uses COPY INTO table to upload\ + \ the file. Recommended for most workloads for better speed and scalability.\ + \ Read more about GCS Staging here." oneOf: - title: "Standard Inserts" additionalProperties: false - description: "Direct uploading using streams." required: - "method" properties: @@ -594,9 +604,6 @@ const: "Standard" - title: "GCS Staging" additionalProperties: false - description: "Writes large batches of records to a file, uploads the file\ - \ to GCS, then uses
COPY INTO table
to upload the file. Recommended\ - \ for large production workloads for better speed and scalability." required: - "method" - "gcs_bucket_name" @@ -609,16 +616,18 @@ gcs_bucket_name: title: "GCS Bucket Name" type: "string" - description: "The name of the GCS bucket." + description: "The name of the GCS bucket. Read more here." examples: - "airbyte_sync" gcs_bucket_path: + title: "GCS Bucket Path" description: "Directory under the GCS bucket where data will be written." type: "string" examples: - "data_sync/test" part_size_mb: - title: "Block Size (MB) for GCS multipart upload" + title: "Block Size (MB) for GCS Multipart Upload (Optional)" description: "This is the size of a \"Part\" being buffered in memory.\ \ It limits the memory usage when writing. Larger values will allow\ \ to upload a bigger files and improve the speed, but consumes more\ @@ -633,14 +642,19 @@ type: "string" description: "This upload method is supposed to temporary store records\ \ in GCS bucket. What do you want to do with data in GCS bucket\ - \ when migration has finished?" - title: "GCS tmp files afterward processing" + \ when migration has finished? The default \"Delete all tmp files\ + \ from GCS\" value is used if not set explicitly." + title: "GCS Tmp Files Afterward Processing (Optional)" default: "Delete all tmp files from GCS" enum: - "Delete all tmp files from GCS" - "Keep all tmp files in GCS" credential: title: "Credential" + description: "An HMAC key is a type of credential and can be associated\ + \ with a service account or a user account in Cloud Storage. Read\ + \ more here." type: "object" oneOf: - title: "HMAC key" From cbb0c987466445356a036099041a70b1486fc8f5 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Thu, 7 Apr 2022 16:57:53 -0700 Subject: [PATCH 12/12] Bump version for bigquery in seed --- .../src/main/resources/seed/destination_definitions.yaml | 2 +- .../init/src/main/resources/seed/destination_specs.yaml | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index b5fbad18784f..e306e43fb306 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -24,7 +24,7 @@ - name: BigQuery destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 dockerRepository: airbyte/destination-bigquery - dockerImageTag: 1.0.2 + dockerImageTag: 1.1.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 0d54f23bd3ec..566b743a2875 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -285,7 +285,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-bigquery:1.0.2" +- dockerImage: "airbyte/destination-bigquery:1.1.0" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: @@ -314,7 +314,7 @@ project_id: type: "string" description: "The GCP project ID for the project containing the target BigQuery\ - \ dataset. Read more here." title: "Project ID" dataset_id: @@ -369,7 +369,7 @@ \ docs if you need help generating this key. 
Default credentials will\ \ be used if this field is left empty." - title: "Credentials JSON (Optional)" + title: "Service Account Key JSON (Optional)" airbyte_secret: true transformation_priority: type: "string"
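
For reference, below is a minimal sketch of a destination config that selects the GCS staging path wired up above (BigQueryStagingConsumerFactory plus the bigQueryGcsOperations it drives). Field names follow the spec diff; the concrete values, the "GCS Staging" method constant, and the SampleBigQueryGcsStagingConfig helper class are illustrative assumptions, not values taken from this patch.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    // Hypothetical helper, not part of the connector: builds a config JsonNode
    // shaped like the connectionSpecification in the spec diff above.
    public final class SampleBigQueryGcsStagingConfig {

      public static JsonNode build() {
        final ObjectMapper mapper = new ObjectMapper();
        final ObjectNode config = mapper.createObjectNode();
        config.put("project_id", "my-gcp-project");               // assumed sample project
        config.put("dataset_id", "airbyte_dataset");              // default dataset when the source has no namespace
        config.put("dataset_location", "US");                     // spec default
        config.put("credentials_json", "<service account key>");  // optional; default credentials apply if left empty
        final ObjectNode loadingMethod = config.putObject("loading_method");
        loadingMethod.put("method", "GCS Staging");                // assumed constant; "Standard" selects direct SQL inserts
        loadingMethod.put("gcs_bucket_name", "airbyte_sync");      // example value from the spec
        loadingMethod.put("gcs_bucket_path", "data_sync/test");    // example value from the spec
        return config;
      }

    }

With a config like this, records are buffered to serialized files, uploaded to the GCS bucket, and then loaded into the target BigQuery dataset, which the spec text above recommends over standard inserts for larger syncs.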