diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json index a903a2ea2c87..b39e46e12a21 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133", "name": "BigQuery", "dockerRepository": "airbyte/destination-bigquery", - "dockerImageTag": "0.3.12", + "dockerImageTag": "0.4.0", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 17b184dc32f9..6fdfdffe2f1a 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -22,7 +22,7 @@ - destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 name: BigQuery dockerRepository: airbyte/destination-bigquery - dockerImageTag: 0.3.12 + dockerImageTag: 0.4.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 name: BigQuery (denormalized typed struct) diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 868a819ce67f..5bb1b53cab83 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.12 +LABEL io.airbyte.version=0.4.0 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/README.md b/airbyte-integrations/connectors/destination-bigquery/README.md index 888d43581c8c..3ed753d3adae 100644 --- a/airbyte-integrations/connectors/destination-bigquery/README.md +++ b/airbyte-integrations/connectors/destination-bigquery/README.md @@ -1,3 +1,15 @@ +## Uploading options +There are two available options to upload data to BigQuery: `Standard` and `GCS Staging`. +- `Standard` uploads data directly from your source to BigQuery storage. This way is faster and requires fewer resources than the GCS one. +Be aware that it may fail for big datasets and slow sources, i.e. if reading from the source takes more than 10-12 hours. +This can happen if you have a slow connection to the source and/or migrate a very big dataset. If that's the case, select the `GCS Staging` type instead. +The limitation is caused by the Google BigQuery SDK client. For more details please check https://github.com/airbytehq/airbyte/issues/3549 +- `GCS Staging (CSV format)` was implemented to avoid the issue with big datasets mentioned above. +All data is first uploaded to a GCS bucket and then moved to BigQuery in one shot, stream by stream (see the sketch below). +The destination-gcs connector is partially reused under the hood, so you may check its documentation for more details.
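For orientation, here is a minimal, self-contained sketch of what the GCS staging load step boils down to with the Google BigQuery Java client; it mirrors the `loadCsvFromGcsTruncate` logic added later in this change. The project/dataset/table names, the `gs://` URI, and the use of application-default credentials are illustrative placeholders, not part of this PR:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableId;

public class GcsToBigQueryLoadSketch {

  public static void main(String[] args) throws InterruptedException {
    // Placeholder identifiers: replace with your own dataset/table and staged GCS object.
    TableId tmpTable = TableId.of("my_dataset", "_airbyte_tmp_users");
    String gcsCsvUri = "gs://my-staging-bucket/data_sync/users.csv";

    // Raw Airbyte table layout: record id, emitted_at, and the JSON payload as a string.
    Schema schema = Schema.of(
        Field.of("_airbyte_ab_id", StandardSQLTypeName.STRING),
        Field.of("_airbyte_emitted_at", StandardSQLTypeName.STRING),
        Field.of("_airbyte_data", StandardSQLTypeName.STRING));

    // Uses application-default credentials for brevity.
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // The staged file is CSV with a header row.
    CsvOptions csvOptions = CsvOptions.newBuilder().setSkipLeadingRows(1).build();

    LoadJobConfiguration configuration = LoadJobConfiguration.builder(tmpTable, gcsCsvUri)
        .setFormatOptions(csvOptions)
        .setSchema(schema)
        .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
        .build();

    // Create the load job and block until BigQuery finishes pulling the file from GCS.
    Job completedJob = bigquery.create(JobInfo.of(configuration)).waitFor();

    if (completedJob == null || completedJob.getStatus().getError() != null) {
      throw new RuntimeException("GCS -> BigQuery load failed: "
          + (completedJob == null ? "job no longer exists" : completedJob.getStatus().getError()));
    }
    System.out.println("Loaded " + gcsCsvUri + " into " + tmpTable);
  }
}
```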
+There is no need to use this uploading method if your migration takes less than 10 hours and you don't see an error like the following in the logs: +"PUT https://www.googleapis.com/upload/bigquery/v2/projects/some-project-name/jobs?uploadType=resumable&upload_id=some_randomly_generated_upload_id". + # BigQuery Test Configuration In order to test the BigQuery destination, you need a service account key file. @@ -10,9 +22,14 @@ As a community contributor, you will need access to a GCP project and BigQuery t 1. Click on `+ Create Service Account" button 1. Fill out a descriptive name/id/description 1. Click the edit icon next to the service account you created on the `IAM` page -1. Add the `BigQuery Data Editor` and `BigQuery User` role +1. Add the `BigQuery Data Editor`, `BigQuery User` and `GCS User` roles. For more details, check https://cloud.google.com/storage/docs/access-control/iam-roles 1. Go back to the `Service Accounts` page and use the actions modal to `Create Key` 1. Download this key as a JSON file +1. Create a GCS bucket for testing. +1. Generate an [HMAC key](https://cloud.google.com/storage/docs/authentication/hmackeys) for the bucket with read and write permissions. Please note that currently only the HMAC key credential is supported. More credential types will be added in the future. +1. Paste the bucket and key information into the config files under [`./sample_secrets`](./sample_secrets). +1. Rename the directory from `sample_secrets` to `secrets`. +1. Feel free to modify the config files used by the acceptance tests with different settings, as long as they follow the schema defined in [spec.json](src/main/resources/spec.json). 1. Move and rename this file to `secrets/credentials.json` ## Airbyte Employee diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index bc2151571fb2..9889647d59bc 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -15,6 +15,8 @@ dependencies { implementation project(':airbyte-config:models') implementation project(':airbyte-integrations:bases:base-java') implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:connectors:destination-s3') + implementation project(':airbyte-integrations:connectors:destination-gcs') integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs) diff --git a/airbyte-integrations/connectors/destination-bigquery/sample_secret/credentials.json b/airbyte-integrations/connectors/destination-bigquery/sample_secret/credentials.json new file mode 100644 index 000000000000..3a208d588a20 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/sample_secret/credentials.json @@ -0,0 +1,24 @@ +{ + "basic_bigquery_config": { + "type": "service_account", + "project_id": "", + "private_key_id": "", + "private_key": "", + "client_email": "", + "client_id": "", + "auth_uri": "", + "token_uri": "", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "" + }, + "gcs_config": { + "gcs_bucket_name": "", + "gcs_bucket_path": "test_path", + "gcs_bucket_region": "us-west1", + "credential": { + "credential_type": "HMAC_KEY", + "hmac_key_access_id": "", + "hmac_key_secret": "" + } + }
+} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java new file mode 100644 index 000000000000..3c020b3cae61 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.bigquery; + +public class BigQueryConsts { + + public static final int MiB = 1024 * 1024; + public static final String CONFIG_DATASET_ID = "dataset_id"; + public static final String CONFIG_PROJECT_ID = "project_id"; + public static final String CONFIG_DATASET_LOCATION = "dataset_location"; + public static final String CONFIG_CREDS = "credentials_json"; + public static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb"; + + public static final String LOADING_METHOD = "loading_method"; + public static final String METHOD = "method"; + public static final String GCS_STAGING = "GCS Staging"; + public static final String GCS_BUCKET_NAME = "gcs_bucket_name"; + public static final String GCS_BUCKET_PATH = "gcs_bucket_path"; + public static final String GCS_BUCKET_REGION = "gcs_bucket_region"; + public static final String CREDENTIAL = "credential"; + public static final String FORMAT = "format"; + public static final String KEEP_GCS_FILES = "keep_files_in_gcs-bucket"; + public static final String KEEP_GCS_FILES_VAL = "Keep all tmp files in GCS"; + + // tests + public static final String BIGQUERY_BASIC_CONFIG = "basic_bigquery_config"; + public static final String GCS_CONFIG = "gcs_config"; + + public static final String CREDENTIAL_TYPE = "credential_type"; + public static final String HMAC_KEY_ACCESS_ID = "hmac_key_access_id"; + public static final String HMAC_KEY_ACCESS_SECRET = "hmac_key_secret"; + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 48ad99189ab5..48518a07817f 100644 --- 
a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -26,12 +26,11 @@ import static java.util.Objects.isNull; +import com.amazonaws.services.s3.AmazonS3; import com.fasterxml.jackson.databind.JsonNode; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; -import com.google.cloud.bigquery.Dataset; -import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FormatOptions; import com.google.cloud.bigquery.Job; @@ -52,6 +51,10 @@ import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.gcs.GcsDestination; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.GcsS3Helper; +import io.airbyte.integrations.destination.gcs.csv.GcsCsvWriter; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; @@ -61,6 +64,7 @@ import io.airbyte.protocol.models.DestinationSyncMode; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.sql.Timestamp; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -73,17 +77,13 @@ public class BigQueryDestination extends BaseConnector implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestination.class); - private static final int MiB = 1024 * 1024; - static final String CONFIG_DATASET_ID = "dataset_id"; - static final String CONFIG_PROJECT_ID = "project_id"; - static final String CONFIG_DATASET_LOCATION = "dataset_location"; - static final String CONFIG_CREDS = "credentials_json"; - static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb"; private static final com.google.cloud.bigquery.Schema SCHEMA = com.google.cloud.bigquery.Schema.of( Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING), - Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING), - Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP)); + // GCS works with only date\datetime formats, so need to have it a string for a while + // https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#data_types + Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.STRING), + Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING)); private final BigQuerySQLNameTransformer namingResolver; @@ -94,15 +94,27 @@ public BigQueryDestination() { @Override public AirbyteConnectionStatus check(JsonNode config) { try { - final String datasetId = config.get(CONFIG_DATASET_ID).asText(); - final String datasetLocation = getDatasetLocation(config); + final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText(); + final String datasetLocation = BigQueryUtils.getDatasetLocation(config); final BigQuery bigquery = getBigQuery(config); - createSchemaTable(bigquery, datasetId, datasetLocation); + final UploadingMethod uploadingMethod = getLoadingMethod(config); + + 
BigQueryUtils.createSchemaTable(bigquery, datasetId, datasetLocation); QueryJobConfiguration queryConfig = QueryJobConfiguration .newBuilder(String.format("SELECT * FROM %s.INFORMATION_SCHEMA.TABLES LIMIT 1;", datasetId)) .setUseLegacySql(false) .build(); + // GCS upload time re-uses destination-GCS for check and other uploading (CSV format writer) + if (UploadingMethod.GCS.equals(uploadingMethod)) { + GcsDestination gcsDestination = new GcsDestination(); + JsonNode gcsJsonNodeConfig = BigQueryUtils.getGcsJsonNodeConfig(config); + AirbyteConnectionStatus airbyteConnectionStatus = gcsDestination.check(gcsJsonNodeConfig); + if (Status.FAILED == airbyteConnectionStatus.getStatus()) { + return new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage(airbyteConnectionStatus.getMessage()); + } + } + final ImmutablePair result = BigQueryUtils.executeQuery(bigquery, queryConfig); if (result.getLeft() != null) { return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); @@ -119,38 +131,22 @@ protected BigQuerySQLNameTransformer getNamingResolver() { return namingResolver; } - private static String getDatasetLocation(JsonNode config) { - if (config.has(CONFIG_DATASET_LOCATION)) { - return config.get(CONFIG_DATASET_LOCATION).asText(); - } else { - return "US"; - } - } - // https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html private Integer getBigQueryClientChunkSize(JsonNode config) { Integer chunkSizeFromConfig = null; - if (config.has(BIG_QUERY_CLIENT_CHUNK_SIZE)) { - chunkSizeFromConfig = config.get(BIG_QUERY_CLIENT_CHUNK_SIZE).asInt(); + if (config.has(BigQueryConsts.BIG_QUERY_CLIENT_CHUNK_SIZE)) { + chunkSizeFromConfig = config.get(BigQueryConsts.BIG_QUERY_CLIENT_CHUNK_SIZE).asInt(); if (chunkSizeFromConfig <= 0) { LOGGER.error("BigQuery client Chunk (buffer) size must be a positive number (MB), but was:" + chunkSizeFromConfig); throw new IllegalArgumentException("BigQuery client Chunk (buffer) size must be a positive number (MB)"); } - chunkSizeFromConfig = chunkSizeFromConfig * MiB; + chunkSizeFromConfig = chunkSizeFromConfig * BigQueryConsts.MiB; } return chunkSizeFromConfig; } - private void createSchemaTable(BigQuery bigquery, String datasetId, String datasetLocation) { - final Dataset dataset = bigquery.getDataset(datasetId); - if (dataset == null || !dataset.exists()) { - final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); - bigquery.create(datasetInfo); - } - } - private BigQuery getBigQuery(JsonNode config) { - final String projectId = config.get(CONFIG_PROJECT_ID).asText(); + final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); try { BigQueryOptions.Builder bigQueryBuilder = BigQueryOptions.newBuilder(); @@ -159,7 +155,8 @@ private BigQuery getBigQuery(JsonNode config) { // handle the credentials json being passed as a json object or a json object already serialized as // a string. final String credentialsString = - config.get(CONFIG_CREDS).isObject() ? Jsons.serialize(config.get(CONFIG_CREDS)) : config.get(CONFIG_CREDS).asText(); + config.get(BigQueryConsts.CONFIG_CREDS).isObject() ? 
Jsons.serialize(config.get(BigQueryConsts.CONFIG_CREDS)) + : config.get(BigQueryConsts.CONFIG_CREDS).asText(); credentials = ServiceAccountCredentials .fromStream(new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8))); } @@ -174,7 +171,7 @@ private BigQuery getBigQuery(JsonNode config) { } public static boolean isUsingJsonCredentials(JsonNode config) { - return config.has(CONFIG_CREDS) && !config.get(CONFIG_CREDS).asText().isEmpty(); + return config.has(BigQueryConsts.CONFIG_CREDS) && !config.get(BigQueryConsts.CONFIG_CREDS).asText().isEmpty(); } /** @@ -202,10 +199,13 @@ public static boolean isUsingJsonCredentials(JsonNode config) { @Override public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, - Consumer outputRecordCollector) { + Consumer outputRecordCollector) + throws IOException { final BigQuery bigquery = getBigQuery(config); Map writeConfigs = new HashMap<>(); Set existingSchemas = new HashSet<>(); + boolean isGcsUploadingMode = UploadingMethod.GCS.equals(getLoadingMethod(config)); + final boolean isKeepFilesInGcs = isKeepFilesInGcs(config); // create tmp tables if not exist for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { @@ -214,8 +214,9 @@ public AirbyteMessageConsumer getConsumer(JsonNode config, final String schemaName = getSchema(config, configStream); final String tableName = getTargetTableName(streamName); final String tmpTableName = namingResolver.getTmpTableName(streamName); - final String datasetLocation = getDatasetLocation(config); - createSchemaAndTableIfNeeded(bigquery, existingSchemas, schemaName, tmpTableName, datasetLocation, stream.getJsonSchema()); + final String datasetLocation = BigQueryUtils.getDatasetLocation(config); + BigQueryUtils.createSchemaAndTableIfNeeded(bigquery, existingSchemas, schemaName, tmpTableName, + datasetLocation, getBigQuerySchema(stream.getJsonSchema())); final Schema schema = getBigQuerySchema(stream.getJsonSchema()); // https://cloud.google.com/bigquery/docs/loading-data-local#loading_data_from_a_local_data_source final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration @@ -239,12 +240,45 @@ public AirbyteMessageConsumer getConsumer(JsonNode config, } final WriteDisposition syncMode = getWriteDisposition(configStream.getDestinationSyncMode()); - writeConfigs.put(AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream), - new BigQueryWriteConfig(TableId.of(schemaName, tableName), TableId.of(schemaName, tmpTableName), writer, syncMode, schema)); + if (isGcsUploadingMode) { + GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig + .getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(config)); + GcsCsvWriter gcsCsvWriter = initGcsWriter(gcsDestinationConfig, configStream); + gcsCsvWriter.initialize(); + + writeConfigs.put(AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream), + new BigQueryWriteConfig(TableId.of(schemaName, tableName), TableId.of(schemaName, tmpTableName), + writer, syncMode, schema, gcsCsvWriter, gcsDestinationConfig)); + } else { + writeConfigs.put(AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream), + new BigQueryWriteConfig(TableId.of(schemaName, tableName), TableId.of(schemaName, tmpTableName), + writer, syncMode, schema, null, null)); + } + } // write to tmp tables // if success copy delete main table if exists. rename tmp tables to real tables. 
- return getRecordConsumer(bigquery, writeConfigs, catalog, outputRecordCollector); + return getRecordConsumer(bigquery, writeConfigs, catalog, outputRecordCollector, isGcsUploadingMode, isKeepFilesInGcs); + } + + /** + * Despite the fact that uploading to going to be done to GCS, you may see the S3 client + * initialization. The S3 client appears to be compatible with GCS and widely used in + * destination-gcs connector. Since the destination-gcs connector is partially re-used here - we + * also need to init S3 client. + * + * @param gcsDestinationConfig + * @param configuredStream + * @return GcsCsvWriter + * @throws IOException + */ + private GcsCsvWriter initGcsWriter(GcsDestinationConfig gcsDestinationConfig, + ConfiguredAirbyteStream configuredStream) + throws IOException { + Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); + + AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig); + return new GcsCsvWriter(gcsDestinationConfig, s3Client, configuredStream, uploadTimestamp); } protected String getTargetTableName(String streamName) { @@ -254,8 +288,10 @@ protected String getTargetTableName(String streamName) { protected AirbyteMessageConsumer getRecordConsumer(BigQuery bigquery, Map writeConfigs, ConfiguredAirbyteCatalog catalog, - Consumer outputRecordCollector) { - return new BigQueryRecordConsumer(bigquery, writeConfigs, catalog, outputRecordCollector); + Consumer outputRecordCollector, + boolean isGcsUploadingMode, + boolean isKeepFilesInGcs) { + return new BigQueryRecordConsumer(bigquery, writeConfigs, catalog, outputRecordCollector, isGcsUploadingMode, isKeepFilesInGcs); } protected Schema getBigQuerySchema(JsonNode jsonSchema) { @@ -263,7 +299,7 @@ protected Schema getBigQuerySchema(JsonNode jsonSchema) { } private static String getSchema(JsonNode config, ConfiguredAirbyteStream stream) { - final String defaultSchema = config.get(CONFIG_DATASET_ID).asText(); + final String defaultSchema = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText(); final String srcNamespace = stream.getStream().getNamespace(); if (srcNamespace == null) { return defaultSchema; @@ -271,20 +307,6 @@ private static String getSchema(JsonNode config, ConfiguredAirbyteStream stream) return srcNamespace; } - private void createSchemaAndTableIfNeeded(BigQuery bigquery, - Set existingSchemas, - String schemaName, - String tmpTableName, - String datasetLocation, - JsonNode jsonSchema) { - if (!existingSchemas.contains(schemaName)) { - createSchemaTable(bigquery, schemaName, datasetLocation); - existingSchemas.add(schemaName); - } - final Schema schema = getBigQuerySchema(jsonSchema); - BigQueryUtils.createTable(bigquery, schemaName, tmpTableName, schema); - } - private static WriteDisposition getWriteDisposition(DestinationSyncMode syncMode) { if (syncMode == null) { throw new IllegalStateException("Undefined destination sync mode"); @@ -300,6 +322,35 @@ private static WriteDisposition getWriteDisposition(DestinationSyncMode syncMode } } + private UploadingMethod getLoadingMethod(JsonNode config) { + JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD); + if (loadingMethod != null && BigQueryConsts.GCS_STAGING.equals(loadingMethod.get(BigQueryConsts.METHOD).asText())) { + LOGGER.info("Selected loading method is set to: " + UploadingMethod.GCS); + return UploadingMethod.GCS; + } else { + LOGGER.info("Selected loading method is set to: " + UploadingMethod.STANDARD); + return UploadingMethod.STANDARD; + } + } + + private boolean isKeepFilesInGcs(JsonNode config) { 
+ JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD); + if (loadingMethod != null && loadingMethod.get(BigQueryConsts.KEEP_GCS_FILES) != null + && BigQueryConsts.KEEP_GCS_FILES_VAL + .equals(loadingMethod.get(BigQueryConsts.KEEP_GCS_FILES).asText())) { + LOGGER.info("All tmp files GCS will be kept in bucket when migration is finished"); + return true; + } else { + LOGGER.info("All tmp files will be removed from GCS when migration is finished"); + return false; + } + } + + public enum UploadingMethod { + STANDARD, + GCS + } + public static void main(String[] args) throws Exception { final Destination destination = new BigQueryDestination(); LOGGER.info("starting destination: {}", BigQueryDestination.class); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index ad4da80d2c4b..a349c332da9c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -24,13 +24,21 @@ package io.airbyte.integrations.destination.bigquery; +import static com.amazonaws.util.StringUtils.UTF8; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.JsonNode; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.CopyJobConfiguration; +import com.google.cloud.bigquery.CsvOptions; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.cloud.bigquery.JobInfo.WriteDisposition; +import com.google.cloud.bigquery.LoadJobConfiguration; import com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.TableDataWriteChannel; @@ -44,6 +52,9 @@ import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.GcsS3Helper; +import io.airbyte.integrations.destination.gcs.csv.GcsCsvWriter; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -52,10 +63,13 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,17 +82,23 @@ public class BigQueryRecordConsumer extends FailureTrackingAirbyteMessageConsume private final Map writeConfigs; private final ConfiguredAirbyteCatalog catalog; private final Consumer 
outputRecordCollector; + private final boolean isGcsUploadingMode; + private final boolean isKeepFilesInGcs; private AirbyteMessage lastStateMessage = null; public BigQueryRecordConsumer(BigQuery bigquery, Map writeConfigs, ConfiguredAirbyteCatalog catalog, - Consumer outputRecordCollector) { + Consumer outputRecordCollector, + boolean isGcsUploadingMode, + boolean isKeepFilesInGcs) { this.bigquery = bigquery; this.writeConfigs = writeConfigs; this.catalog = catalog; this.outputRecordCollector = outputRecordCollector; + this.isGcsUploadingMode = isGcsUploadingMode; + this.isKeepFilesInGcs = isKeepFilesInGcs; } @Override @@ -87,7 +107,7 @@ protected void startTracked() { } @Override - public void acceptTracked(AirbyteMessage message) { + public void acceptTracked(AirbyteMessage message) throws IOException { if (message.getType() == Type.STATE) { lastStateMessage = message; } else if (message.getType() == Type.RECORD) { @@ -101,15 +121,25 @@ public void acceptTracked(AirbyteMessage message) { Jsons.serialize(catalog), Jsons.serialize(recordMessage))); } final BigQueryWriteConfig writer = writeConfigs.get(pair); - try { - writer.getWriter().write(ByteBuffer.wrap((Jsons.serialize(formatRecord(writer.getSchema(), recordMessage)) + "\n").getBytes(Charsets.UTF_8))); - } catch (IOException | RuntimeException e) { - LOGGER.error("Got an error while writing message:" + e.getMessage()); - LOGGER.error(String.format( - "Failed to process a message for job: %s, \nStreams numbers: %s, \nSyncMode: %s, \nTableName: %s, \nTmpTableName: %s, \nAirbyteMessage: %s", - writer.getWriter().getJob(), catalog.getStreams().size(), writer.getSyncMode(), writer.getTable(), writer.getTmpTable(), message)); - printHeapMemoryConsumption(); - throw new RuntimeException(e); + + // select the way of uploading - normal or through the GCS + if (writer.getGcsCsvWriter() == null) { + // Normal uploading way + try { + writer.getWriter() + .write(ByteBuffer.wrap((Jsons.serialize(formatRecord(writer.getSchema(), recordMessage)) + "\n").getBytes(Charsets.UTF_8))); + } catch (IOException | RuntimeException e) { + LOGGER.error("Got an error while writing message:" + e.getMessage()); + LOGGER.error(String.format( + "Failed to process a message for job: %s, \nStreams numbers: %s, \nSyncMode: %s, \nTableName: %s, \nTmpTableName: %s, \nAirbyteMessage: %s", + writer.getWriter().getJob(), catalog.getStreams().size(), writer.getSyncMode(), writer.getTable(), writer.getTmpTable(), message)); + printHeapMemoryConsumption(); + throw new RuntimeException(e); + } + } else { + // GCS uploading way, this data will be moved to bigquery in close method + GcsCsvWriter gcsCsvWriter = writer.getGcsCsvWriter(); + gcsCsvWriter.write(UUID.randomUUID(), recordMessage); } } else { LOGGER.warn("Unexpected message: " + message.getType()); @@ -131,13 +161,114 @@ protected JsonNode formatRecord(Schema schema, AirbyteRecordMessage recordMessag @Override public void close(boolean hasFailed) { LOGGER.info("Started closing all connections"); + // process gcs streams + if (isGcsUploadingMode) { + closeGcsStreamsAndCopyDataToBigQuery(hasFailed); + } + + closeNormalBigqueryStreams(hasFailed); + + if (isGcsUploadingMode && !isKeepFilesInGcs) { + deleteDataFromGcsBucket(); + } + } + + private void closeGcsStreamsAndCopyDataToBigQuery(boolean hasFailed) { + final List gcsWritersList = writeConfigs.values().parallelStream() + .filter(el -> el.getGcsCsvWriter() != null) + .collect(Collectors.toList()); + + if (!gcsWritersList.isEmpty()) { + LOGGER.info("GCS connectors that 
need to be closed:" + gcsWritersList); + gcsWritersList.parallelStream().forEach(writer -> { + final GcsCsvWriter gcsCsvWriter = writer.getGcsCsvWriter(); + + try { + LOGGER.info("Closing connector:" + gcsCsvWriter); + gcsCsvWriter.close(hasFailed); + } catch (IOException | RuntimeException e) { + LOGGER.error(String.format("Failed to close %s gcsWriter, \n details: %s", gcsCsvWriter, e.getMessage())); + printHeapMemoryConsumption(); + throw new RuntimeException(e); + } + }); + } + + // copy data from tmp gcs storage to bigquery tables + writeConfigs.values().stream() + .filter(pair -> pair.getGcsCsvWriter() != null) + .forEach(pair -> { + try { + loadCsvFromGcsTruncate(pair); + } catch (Exception e) { + LOGGER.error("Failed to load data from GCS CSV file to BigQuery tmp table with reason: " + e.getMessage()); + throw new RuntimeException(e); + } + }); + } + + private void loadCsvFromGcsTruncate(BigQueryWriteConfig bigQueryWriteConfig) + throws Exception { + try { + + TableId tmpTable = bigQueryWriteConfig.getTmpTable(); + Schema schema = bigQueryWriteConfig.getSchema(); + + String csvFile = bigQueryWriteConfig.getGcsCsvWriter().getGcsCsvFileLocation(); + + // Copy the staged CSV file from GCS into the tmp BigQuery table via a load job. + // The existing BigQuery client is reused; no new client needs to be created for this request. + LOGGER.info(String.format("Started copying data from %s GCS csv file to %s tmp BigQuery table with schema: \n %s", + csvFile, tmpTable, schema)); + + CsvOptions csvOptions = CsvOptions.newBuilder().setEncoding(UTF8).setSkipLeadingRows(1).build(); + + LoadJobConfiguration configuration = + LoadJobConfiguration.builder(tmpTable, csvFile) + .setFormatOptions(csvOptions) + .setSchema(schema) + .setWriteDisposition(bigQueryWriteConfig.getSyncMode()) + .build(); + + // For more information on Job see: + // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html + // Load the table + Job loadJob = bigquery.create(JobInfo.of(configuration)); + + LOGGER.info("Created a new job to load the GCS csv file into the tmp BigQuery table: " + loadJob); + LOGGER.info("Waiting for job to complete..."); + + // Load data from the GCS CSV file into the table + // Blocks until this load table job completes its execution, either failing or succeeding. + Job completedJob = loadJob.waitFor(); + + // Check for errors + if (completedJob == null) { + LOGGER.error("Job not executed since it no longer exists."); + throw new Exception("Job not executed since it no longer exists."); + } else if (completedJob.getStatus().getError() != null) { + // You can also look at queryJob.getStatus().getExecutionErrors() for all + // errors, not just the latest one.
+ String msg = "BigQuery was unable to load into the table due to an error: \n" + + loadJob.getStatus().getError(); + LOGGER.error(msg); + throw new Exception(msg); + } + LOGGER.info("Table is successfully overwritten by CSV file loaded from GCS"); + } catch (BigQueryException | InterruptedException e) { + LOGGER.error("Column not added during load append \n" + e.toString()); + throw new RuntimeException("Column not added during load append \n" + e.toString()); + } + } + + private void closeNormalBigqueryStreams(boolean hasFailed) { try { writeConfigs.values().parallelStream().forEach(bigQueryWriteConfig -> Exceptions.toRuntime(() -> { TableDataWriteChannel writer = bigQueryWriteConfig.getWriter(); try { writer.close(); } catch (IOException | RuntimeException e) { - LOGGER.error(String.format("Failed to process a message for job: %s, \nStreams numbers: %s", + LOGGER.error(String.format("Failed to close writer: %s, \nStreams numbers: %s", writer.getJob(), catalog.getStreams().size())); printHeapMemoryConsumption(); throw new RuntimeException(e); @@ -179,6 +310,34 @@ public void close(boolean hasFailed) { } } + private void deleteDataFromGcsBucket() { + writeConfigs.values().forEach(writeConfig -> { + GcsDestinationConfig gcsDestinationConfig = writeConfig.getGcsDestinationConfig(); + AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig); + + String gcsBucketName = gcsDestinationConfig.getBucketName(); + String gcs_bucket_path = gcsDestinationConfig.getBucketPath(); + + List keysToDelete = new LinkedList<>(); + List objects = s3Client + .listObjects(gcsBucketName, gcs_bucket_path) + .getObjectSummaries(); + for (S3ObjectSummary object : objects) { + keysToDelete.add(new KeyVersion(object.getKey())); + } + + if (keysToDelete.size() > 0) { + LOGGER.info("Tearing down test bucket path: {}/{}", gcsBucketName, gcs_bucket_path); + // Google Cloud Storage doesn't accept request to delete multiple objects + for (KeyVersion keyToDelete : keysToDelete) { + s3Client.deleteObject(gcsBucketName, keyToDelete.getKey()); + } + LOGGER.info("Deleted {} file(s).", keysToDelete.size()); + } + s3Client.shutdown(); + }); + } + // https://cloud.google.com/bigquery/docs/managing-tables#copying_a_single_source_table private static void copyTable( BigQuery bigquery, diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 9402ef8b9d02..8d7ceb560e96 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -24,8 +24,11 @@ package io.airbyte.integrations.destination.bigquery; +import com.fasterxml.jackson.databind.JsonNode; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.JobInfo; @@ -35,6 +38,9 @@ import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; 
+import java.util.Set; import java.util.UUID; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; @@ -73,6 +79,27 @@ static Job waitForQuery(Job queryJob) { } } + static void createSchemaAndTableIfNeeded(BigQuery bigquery, + Set existingSchemas, + String schemaName, + String tmpTableName, + String datasetLocation, + Schema schema) { + if (!existingSchemas.contains(schemaName)) { + createSchemaTable(bigquery, schemaName, datasetLocation); + existingSchemas.add(schemaName); + } + BigQueryUtils.createTable(bigquery, schemaName, tmpTableName, schema); + } + + static void createSchemaTable(BigQuery bigquery, String datasetId, String datasetLocation) { + final Dataset dataset = bigquery.getDataset(datasetId); + if (dataset == null || !dataset.exists()) { + final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); + bigquery.create(datasetInfo); + } + } + // https://cloud.google.com/bigquery/docs/tables#create-table static void createTable(BigQuery bigquery, String datasetName, String tableName, Schema schema) { try { @@ -88,4 +115,29 @@ static void createTable(BigQuery bigquery, String datasetName, String tableName, } } + public static JsonNode getGcsJsonNodeConfig(JsonNode config) { + JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD); + JsonNode gcsJsonNode = Jsons.jsonNode(ImmutableMap.builder() + .put(BigQueryConsts.GCS_BUCKET_NAME, loadingMethod.get(BigQueryConsts.GCS_BUCKET_NAME)) + .put(BigQueryConsts.GCS_BUCKET_PATH, loadingMethod.get(BigQueryConsts.GCS_BUCKET_PATH)) + .put(BigQueryConsts.GCS_BUCKET_REGION, getDatasetLocation(config)) + .put(BigQueryConsts.CREDENTIAL, loadingMethod.get(BigQueryConsts.CREDENTIAL)) + .put(BigQueryConsts.FORMAT, Jsons.deserialize("{\n" + + " \"format_type\": \"CSV\",\n" + + " \"flattening\": \"No flattening\"\n" + + "}")) + .build()); + + LOGGER.debug("Composed GCS config is: \n" + gcsJsonNode.toPrettyString()); + return gcsJsonNode; + } + + public static String getDatasetLocation(JsonNode config) { + if (config.has(BigQueryConsts.CONFIG_DATASET_LOCATION)) { + return config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText(); + } else { + return "US"; + } + } + } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java index e5e6fbe6ac43..903d948f040f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java @@ -28,6 +28,8 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.TableDataWriteChannel; import com.google.cloud.bigquery.TableId; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.csv.GcsCsvWriter; class BigQueryWriteConfig { @@ -36,13 +38,23 @@ class BigQueryWriteConfig { private final TableDataWriteChannel writer; private final WriteDisposition syncMode; private final Schema schema; + private final GcsCsvWriter gcsCsvWriter; + private final GcsDestinationConfig gcsDestinationConfig; - BigQueryWriteConfig(TableId table, TableId tmpTable, TableDataWriteChannel writer, WriteDisposition syncMode, Schema schema) { + 
BigQueryWriteConfig(TableId table, + TableId tmpTable, + TableDataWriteChannel writer, + WriteDisposition syncMode, + Schema schema, + GcsCsvWriter gcsCsvWriter, + GcsDestinationConfig gcsDestinationConfig) { this.table = table; this.tmpTable = tmpTable; this.writer = writer; this.syncMode = syncMode; this.schema = schema; + this.gcsCsvWriter = gcsCsvWriter; + this.gcsDestinationConfig = gcsDestinationConfig; } public TableId getTable() { @@ -65,4 +77,12 @@ public Schema getSchema() { return schema; } + public GcsCsvWriter getGcsCsvWriter() { + return gcsCsvWriter; + } + + public GcsDestinationConfig getGcsDestinationConfig() { + return gcsDestinationConfig; + } + } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json index b07eb72d84d1..4dc5364f36e3 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json @@ -72,6 +72,97 @@ "description": "The contents of the JSON service account key. Check out the docs if you need help generating this key. Default credentials will be used if this field is left empty.", "title": "Credentials JSON", "airbyte_secret": true + }, + "loading_method": { + "type": "object", + "title": "Loading Method", + "description": "Loading method used to send select the way data will be uploaded to BigQuery.", + "oneOf": [ + { + "title": "Standard Inserts", + "additionalProperties": false, + "description": "Direct uploading using streams.", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "Standard" + } + } + }, + { + "title": "GCS Staging", + "additionalProperties": false, + "description": "Writes large batches of records to a file, uploads the file to GCS, then uses
COPY INTO table
to upload the file. Recommended for large production workloads for better speed and scalability.", + "required": [ + "method", + "gcs_bucket_name", + "gcs_bucket_path", + "credential" + ], + "properties": { + "method": { + "type": "string", + "const": "GCS Staging" + }, + "gcs_bucket_name": { + "title": "GCS Bucket Name", + "type": "string", + "description": "The name of the GCS bucket.", + "examples": ["airbyte_sync"] + }, + "gcs_bucket_path": { + "description": "Directory under the GCS bucket where data will be written.", + "type": "string", + "examples": ["data_sync/test"] + }, + "keep_files_in_gcs-bucket": { + "type": "string", + "description": "This upload method is supposed to temporary store records in GCS bucket. What do you want to do with data in GCS bucket when migration has finished?", + "title": "GCS tmp files afterward processing", + "default": "Delete all tmp files from GCS", + "enum": [ + "Delete all tmp files from GCS", + "Keep all tmp files in GCS" + ] + }, + "credential": { + "title": "Credential", + "type": "object", + "oneOf": [ + { + "title": "HMAC key", + "required": [ + "credential_type", + "hmac_key_access_id", + "hmac_key_secret" + ], + "properties": { + "credential_type": { + "type": "string", + "const": "HMAC_KEY" + }, + "hmac_key_access_id": { + "type": "string", + "description": "HMAC key access ID. When linked to a service account, this ID is 61 characters long; when linked to a user account, it is 24 characters long.", + "title": "HMAC Key Access ID", + "airbyte_secret": true, + "examples": ["1234567890abcdefghij1234"] + }, + "hmac_key_secret": { + "type": "string", + "description": "The corresponding secret for the access ID. It is a 40-character base-64 encoded string.", + "title": "HMAC Key Secret", + "airbyte_secret": true, + "examples": ["1234567890abcdefghij1234567890ABCDEFGHIJ"] + } + } + } + ] + } + } + } + ] } } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java index c7e976fc438d..cfb6b515c9ff 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java @@ -189,9 +189,8 @@ protected void setup(TestDestinationEnv testEnv) throws Exception { + ". 
Override by setting setting path with the CREDENTIALS_PATH constant."); } - final String credentialsJsonString = new String(Files.readAllBytes(CREDENTIALS_PATH)); - - final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString); + final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH)); + final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText(); final String datasetLocation = "US"; @@ -199,13 +198,14 @@ protected void setup(TestDestinationEnv testEnv) throws Exception { config = Jsons.jsonNode(ImmutableMap.builder() .put(CONFIG_PROJECT_ID, projectId) - .put(CONFIG_CREDS, credentialsJsonString) + .put(CONFIG_CREDS, credentialsJson.toString()) .put(CONFIG_DATASET_ID, datasetId) .put(CONFIG_DATASET_LOCATION, datasetLocation) .build()); - final ServiceAccountCredentials credentials = - ServiceAccountCredentials.fromStream(new ByteArrayInputStream(config.get(CONFIG_CREDS).asText().getBytes())); + final ServiceAccountCredentials credentials = ServiceAccountCredentials + .fromStream(new ByteArrayInputStream(credentialsJson.toString().getBytes())); + bigquery = BigQueryOptions.newBuilder() .setProjectId(config.get(CONFIG_PROJECT_ID).asText()) .setCredentials(credentials) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index f5fb88841e0a..5e0a541c1a02 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -126,11 +126,13 @@ void setup(TestInfo info) throws IOException { throw new IllegalStateException( "Must provide path to a big query credentials file. By default {module-root}/config/credentials.json. 
Override by setting setting path with the CREDENTIALS_PATH constant."); } - final String credentialsJsonString = new String(Files.readAllBytes(CREDENTIALS_PATH)); - final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString); + final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH)); + final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); - final String projectId = credentialsJson.get(BigQueryDestination.CONFIG_PROJECT_ID).asText(); - final ServiceAccountCredentials credentials = ServiceAccountCredentials.fromStream(new ByteArrayInputStream(credentialsJsonString.getBytes())); + final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); + + final ServiceAccountCredentials credentials = ServiceAccountCredentials + .fromStream(new ByteArrayInputStream(credentialsJson.toString().getBytes())); bigquery = BigQueryOptions.newBuilder() .setProjectId(projectId) .setCredentials(credentials) @@ -155,10 +157,10 @@ void setup(TestInfo info) throws IOException { dataset = bigquery.create(datasetInfo); config = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryDestination.CONFIG_PROJECT_ID, projectId) - .put(BigQueryDestination.CONFIG_CREDS, credentialsJsonString) - .put(BigQueryDestination.CONFIG_DATASET_ID, datasetId) - .put(BigQueryDestination.CONFIG_DATASET_LOCATION, datasetLocation) + .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) + .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString()) + .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) + .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation) .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10) .build()); @@ -215,7 +217,7 @@ void testCheckSuccess() { @Test void testCheckFailure() { - ((ObjectNode) config).put(BigQueryDestination.CONFIG_PROJECT_ID, "fake"); + ((ObjectNode) config).put(BigQueryConsts.CONFIG_PROJECT_ID, "fake"); final AirbyteConnectionStatus actual = new BigQueryDestination().check(config); final String actualMessage = actual.getMessage(); LOGGER.info("Checking expected failure message:" + actualMessage); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java new file mode 100644 index 000000000000..27633dfe1ff4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java @@ -0,0 +1,298 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.bigquery; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.FieldValue; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.io.ByteArrayInputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQueryGcsDestinationAcceptanceTest extends DestinationAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsDestinationAcceptanceTest.class); + + private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); + + private static final String CONFIG_DATASET_ID = "dataset_id"; + private static final String CONFIG_PROJECT_ID = "project_id"; + private static final String CONFIG_DATASET_LOCATION = "dataset_location"; + private static final String CONFIG_CREDS = "credentials_json"; + + private BigQuery bigquery; + private Dataset dataset; + private boolean tornDown; + private JsonNode config; + private StandardNameTransformer namingResolver = new StandardNameTransformer(); + + @Override + protected String getImageName() { + return "airbyte/destination-bigquery:dev"; + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected JsonNode getFailCheckConfig() throws Exception { + ((ObjectNode) config).put(CONFIG_PROJECT_ID, "fake"); + return config; + } + + @Override + protected boolean supportsNormalization() { + return true; + } + + @Override + protected boolean supportsDBT() { + return true; + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected String getDefaultSchema(JsonNode config) { + return config.get(CONFIG_DATASET_ID).asText(); + } + + @Override + protected List 
retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception { + String tableName = namingResolver.getIdentifier(streamName); + String schema = namingResolver.getIdentifier(namespace); + return retrieveRecordsFromTable(tableName, schema); + } + + @Override + protected List retrieveRecords(TestDestinationEnv env, + String streamName, + String namespace, + JsonNode streamSchema) + throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace)) + .stream() + .map(node -> node.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()) + .map(Jsons::deserialize) + .collect(Collectors.toList()); + } + + @Override + protected List resolveIdentifier(String identifier) { + final List result = new ArrayList<>(); + result.add(identifier); + result.add(namingResolver.getIdentifier(identifier)); + return result; + } + + private List retrieveRecordsFromTable(String tableName, String schema) throws InterruptedException { + final QueryJobConfiguration queryConfig = + QueryJobConfiguration + .newBuilder( + String.format("SELECT * FROM `%s`.`%s` order by %s asc;", schema, tableName, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .setUseLegacySql(false).build(); + + TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults(); + FieldList fields = queryResults.getSchema().getFields(); + + return StreamSupport + .stream(queryResults.iterateAll().spliterator(), false) + .map(row -> { + Map jsonMap = Maps.newHashMap(); + for (Field field : fields) { + Object value = getTypedFieldValue(row, field); + jsonMap.put(field.getName(), value); + } + return jsonMap; + }) + .map(Jsons::jsonNode) + .collect(Collectors.toList()); + } + + private Object getTypedFieldValue(FieldValueList row, Field field) { + FieldValue fieldValue = row.get(field.getName()); + if (fieldValue.getValue() != null) { + return switch (field.getType().getStandardType()) { + case FLOAT64, NUMERIC -> fieldValue.getDoubleValue(); + case INT64 -> fieldValue.getNumericValue().intValue(); + case STRING -> fieldValue.getStringValue(); + case BOOL -> fieldValue.getBooleanValue(); + default -> fieldValue.getValue(); + }; + } else { + return null; + } + } + + @Override + protected void setup(TestDestinationEnv testEnv) throws Exception { + if (!Files.exists(CREDENTIALS_PATH)) { + throw new IllegalStateException( + "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH + + ". 
Override by setting setting path with the CREDENTIALS_PATH constant."); + } + + final String fullConfigFromSecretFileAsString = new String(Files.readAllBytes(CREDENTIALS_PATH)); + + final JsonNode fullConfigFromSecretFileJson = Jsons.deserialize(fullConfigFromSecretFileAsString); + final JsonNode bigqueryConfigFromSecretFile = fullConfigFromSecretFileJson.get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); + final JsonNode gcsConfigFromSecretFile = fullConfigFromSecretFileJson.get(BigQueryConsts.GCS_CONFIG); + + final String projectId = bigqueryConfigFromSecretFile.get(CONFIG_PROJECT_ID).asText(); + final String datasetLocation = "US"; + + final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); + + JsonNode gcsCredentialFromSecretFile = gcsConfigFromSecretFile.get(BigQueryConsts.CREDENTIAL); + JsonNode credential = Jsons.jsonNode(ImmutableMap.builder() + .put(BigQueryConsts.CREDENTIAL_TYPE, gcsCredentialFromSecretFile.get(BigQueryConsts.CREDENTIAL_TYPE)) + .put(BigQueryConsts.HMAC_KEY_ACCESS_ID, gcsCredentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_ID)) + .put(BigQueryConsts.HMAC_KEY_ACCESS_SECRET, gcsCredentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_SECRET)) + .build()); + + JsonNode loadingMethod = Jsons.jsonNode(ImmutableMap.builder() + .put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING) + .put(BigQueryConsts.GCS_BUCKET_NAME, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_NAME)) + .put(BigQueryConsts.GCS_BUCKET_PATH, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis()) + .put(BigQueryConsts.CREDENTIAL, credential) + .build()); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) + .put(BigQueryConsts.CONFIG_CREDS, bigqueryConfigFromSecretFile.toString()) + .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) + .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation) + .put(BigQueryConsts.LOADING_METHOD, loadingMethod) + .build()); + + final ServiceAccountCredentials credentials = ServiceAccountCredentials + .fromStream(new ByteArrayInputStream(bigqueryConfigFromSecretFile.toString().getBytes())); + + bigquery = BigQueryOptions.newBuilder() + .setProjectId(config.get(CONFIG_PROJECT_ID).asText()) + .setCredentials(credentials) + .build() + .getService(); + + final DatasetInfo datasetInfo = + DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).setLocation(config.get(CONFIG_DATASET_LOCATION).asText()).build(); + dataset = bigquery.create(datasetInfo); + + tornDown = false; + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + if (!tornDown) { + tearDownBigQuery(); + } + })); + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + // gcs tmp files are supposed to be removed automatically by consumer + tearDownBigQuery(); + } + + private void tearDownBigQuery() { + // allows deletion of a dataset that has contents + final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents(); + + final boolean success = bigquery.delete(dataset.getDatasetId(), option); + if (success) { + LOGGER.info("BQ Dataset " + dataset + " deleted..."); + } else { + LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!"); + } + + tornDown = true; + } + + // todo (cgardens) - figure out how to share these helpers. they are currently copied from + // BigQueryDestination. 
+ private static ImmutablePair executeQuery(BigQuery bigquery, QueryJobConfiguration queryConfig) { + final JobId jobId = JobId.of(UUID.randomUUID().toString()); + final Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); + return executeQuery(queryJob); + } + + private static ImmutablePair executeQuery(Job queryJob) { + final Job completedJob = waitForQuery(queryJob); + if (completedJob == null) { + throw new RuntimeException("Job no longer exists"); + } else if (completedJob.getStatus().getError() != null) { + // You can also look at queryJob.getStatus().getExecutionErrors() for all + // errors, not just the latest one. + return ImmutablePair.of(null, (completedJob.getStatus().getError().toString())); + } + + return ImmutablePair.of(completedJob, null); + } + + private static Job waitForQuery(Job queryJob) { + try { + return queryJob.waitFor(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java new file mode 100644 index 000000000000..09c67849eb71 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java @@ -0,0 +1,373 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.bigquery; + +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.NamingConventionTransformer; +import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.GcsS3Helper; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaPrimitive; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class BigQueryGcsDestinationTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsDestinationTest.class); + private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); + + private static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb"; + private static final Instant NOW = Instant.now(); + private static final String USERS_STREAM_NAME = "users"; + private static final String TASKS_STREAM_NAME = "tasks"; + private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) + .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", 
"john").put("id", "10").build())) + .withEmittedAt(NOW.toEpochMilli())); + private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) + .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "susan").put("id", "30").build())) + .withEmittedAt(NOW.toEpochMilli())); + private static final AirbyteMessage MESSAGE_TASKS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME) + .withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "announce the game.").build())) + .withEmittedAt(NOW.toEpochMilli())); + private static final AirbyteMessage MESSAGE_TASKS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME) + .withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "ship some code.").build())) + .withEmittedAt(NOW.toEpochMilli())); + private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE) + .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); + + private static final NamingConventionTransformer NAMING_RESOLVER = new StandardNameTransformer(); + + private JsonNode config; + + private BigQuery bigquery; + private AmazonS3 s3Client; + private Dataset dataset; + private ConfiguredAirbyteCatalog catalog; + + private boolean tornDown = true; + + @BeforeEach + void setup(TestInfo info) throws IOException { + if (info.getDisplayName().equals("testSpec()")) { + return; + } + + if (!Files.exists(CREDENTIALS_PATH)) { + throw new IllegalStateException( + "Must provide path to a big query credentials file. By default {module-root}/config/credentials.json. 
Override by setting setting path with the CREDENTIALS_PATH constant."); + } + final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH)); + final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); + final JsonNode credentialsGcsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.GCS_CONFIG); + + final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); + + final ServiceAccountCredentials credentials = ServiceAccountCredentials + .fromStream(new ByteArrayInputStream(credentialsJson.toString().getBytes())); + bigquery = BigQueryOptions.newBuilder() + .setProjectId(projectId) + .setCredentials(credentials) + .build() + .getService(); + + final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); + final String datasetLocation = "EU"; + MESSAGE_USERS1.getRecord().setNamespace(datasetId); + MESSAGE_USERS2.getRecord().setNamespace(datasetId); + MESSAGE_TASKS1.getRecord().setNamespace(datasetId); + MESSAGE_TASKS2.getRecord().setNamespace(datasetId); + + catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId, + Field.of("name", JsonSchemaPrimitive.STRING), + Field + .of("id", JsonSchemaPrimitive.STRING)), + CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaPrimitive.STRING)))); + + final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); + dataset = bigquery.create(datasetInfo); + + JsonNode credentialFromSecretFile = credentialsGcsJson.get(BigQueryConsts.CREDENTIAL); + JsonNode credential = Jsons.jsonNode(ImmutableMap.builder() + .put(BigQueryConsts.CREDENTIAL_TYPE, credentialFromSecretFile.get(BigQueryConsts.CREDENTIAL_TYPE)) + .put(BigQueryConsts.HMAC_KEY_ACCESS_ID, credentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_ID)) + .put(BigQueryConsts.HMAC_KEY_ACCESS_SECRET, credentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_SECRET)) + .build()); + + JsonNode loadingMethod = Jsons.jsonNode(ImmutableMap.builder() + .put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING) + .put(BigQueryConsts.KEEP_GCS_FILES, BigQueryConsts.KEEP_GCS_FILES_VAL) + .put(BigQueryConsts.GCS_BUCKET_NAME, credentialsGcsJson.get(BigQueryConsts.GCS_BUCKET_NAME)) + .put(BigQueryConsts.GCS_BUCKET_PATH, credentialsGcsJson.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis()) + .put(BigQueryConsts.CREDENTIAL, credential) + .build()); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) + .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString()) + .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) + .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation) + .put(BigQueryConsts.LOADING_METHOD, loadingMethod) + .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10) + .build()); + + GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig + .getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(config)); + this.s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig); + + tornDown = false; + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + if (!tornDown) { + tearDownBigQuery(); + } + })); + + } + + @AfterEach + void tearDown(TestInfo info) { + if (info.getDisplayName().equals("testSpec()")) { + return; + } + + tearDownGcs(); + tearDownBigQuery(); + } + + /** + * Remove all the GCS output from the 
tests. + */ + protected void tearDownGcs() { + JsonNode properties = config.get(BigQueryConsts.LOADING_METHOD); + String gcsBucketName = properties.get(BigQueryConsts.GCS_BUCKET_NAME).asText(); + String gcs_bucket_path = properties.get(BigQueryConsts.GCS_BUCKET_PATH).asText(); + + List keysToDelete = new LinkedList<>(); + List objects = s3Client + .listObjects(gcsBucketName, gcs_bucket_path) + .getObjectSummaries(); + for (S3ObjectSummary object : objects) { + keysToDelete.add(new KeyVersion(object.getKey())); + } + + if (keysToDelete.size() > 0) { + LOGGER.info("Tearing down test bucket path: {}/{}", gcsBucketName, gcs_bucket_path); + // Google Cloud Storage doesn't accept request to delete multiple objects + for (KeyVersion keyToDelete : keysToDelete) { + s3Client.deleteObject(gcsBucketName, keyToDelete.getKey()); + } + LOGGER.info("Deleted {} file(s).", keysToDelete.size()); + } + } + + private void tearDownBigQuery() { + // allows deletion of a dataset that has contents + final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents(); + + final boolean success = bigquery.delete(dataset.getDatasetId(), option); + if (success) { + LOGGER.info("BQ Dataset " + dataset + " deleted..."); + } else { + LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!"); + } + + tornDown = true; + } + + @Test + void testSpec() throws Exception { + final ConnectorSpecification actual = new BigQueryDestination().spec(); + final String resourceString = MoreResources.readResource("spec.json"); + final ConnectorSpecification expected = Jsons.deserialize(resourceString, ConnectorSpecification.class); + + assertEquals(expected, actual); + } + + @Test + void testCheckSuccess() { + final AirbyteConnectionStatus actual = new BigQueryDestination().check(config); + final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + assertEquals(expected, actual); + } + + @Test + void testCheckFailure() { + ((ObjectNode) config).put(BigQueryConsts.CONFIG_PROJECT_ID, "fake"); + final AirbyteConnectionStatus actual = new BigQueryDestination().check(config); + final String actualMessage = actual.getMessage(); + LOGGER.info("Checking expected failure message:" + actualMessage); + assertTrue(actualMessage.contains("Access Denied:")); + final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage(""); + assertEquals(expected, actual.withMessage("")); + } + + @Test + void testWriteSuccess() throws Exception { + final BigQueryDestination destination = new BigQueryDestination(); + final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + + consumer.accept(MESSAGE_USERS1); + consumer.accept(MESSAGE_TASKS1); + consumer.accept(MESSAGE_USERS2); + consumer.accept(MESSAGE_TASKS2); + consumer.accept(MESSAGE_STATE); + consumer.close(); + + final List usersActual = retrieveRecords(NAMING_RESOLVER.getRawTableName(USERS_STREAM_NAME)); + final List expectedUsersJson = Lists.newArrayList(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData()); + assertEquals(expectedUsersJson.size(), usersActual.size()); + assertTrue(expectedUsersJson.containsAll(usersActual) && usersActual.containsAll(expectedUsersJson)); + + final List tasksActual = retrieveRecords(NAMING_RESOLVER.getRawTableName(TASKS_STREAM_NAME)); + final List expectedTasksJson = Lists.newArrayList(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); + 
assertEquals(expectedTasksJson.size(), tasksActual.size()); + assertTrue(expectedTasksJson.containsAll(tasksActual) && tasksActual.containsAll(expectedTasksJson)); + + assertTmpTablesNotPresent(catalog.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .map(AirbyteStream::getName) + .collect(Collectors.toList())); + } + + @Test + void testWriteFailure() throws Exception { + // hack to force an exception to be thrown from within the consumer. + final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1); + doThrow(new RuntimeException()).when(spiedMessage).getRecord(); + + final AirbyteMessageConsumer consumer = spy(new BigQueryDestination().getConsumer(config, catalog, Destination::defaultOutputRecordCollector)); + + assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage)); + consumer.accept(MESSAGE_USERS2); + assertThrows(RuntimeException.class, () -> consumer.close()); // check if fails when data was not loaded to GCS bucket by some reason + + final List tableNames = catalog.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .map(AirbyteStream::getName) + .collect(toList()); + assertTmpTablesNotPresent(catalog.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .map(AirbyteStream::getName) + .collect(Collectors.toList())); + // assert that no tables were created. + assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith))); + } + + private Set fetchNamesOfTablesInDb() throws InterruptedException { + final QueryJobConfiguration queryConfig = QueryJobConfiguration + .newBuilder(String.format("SELECT * FROM %s.INFORMATION_SCHEMA.TABLES;", dataset.getDatasetId().getDataset())) + .setUseLegacySql(false) + .build(); + + return StreamSupport + .stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false) + .map(v -> v.get("TABLE_NAME").getStringValue()).collect(Collectors.toSet()); + } + + private void assertTmpTablesNotPresent(List tableNames) throws InterruptedException { + final Set tmpTableNamePrefixes = tableNames.stream().map(name -> name + "_").collect(Collectors.toSet()); + final Set finalTableNames = tableNames.stream().map(name -> name + "_raw").collect(Collectors.toSet()); + // search for table names that have the tmp table prefix but are not raw tables. 
+    assertTrue(fetchNamesOfTablesInDb()
+        .stream()
+        .filter(tableName -> !finalTableNames.contains(tableName))
+        .noneMatch(tableName -> tmpTableNamePrefixes.stream().anyMatch(tableName::startsWith)));
+  }
+
+  private List<JsonNode> retrieveRecords(String tableName) throws Exception {
+    QueryJobConfiguration queryConfig =
+        QueryJobConfiguration.newBuilder(String.format("SELECT * FROM %s.%s;", dataset.getDatasetId().getDataset(), tableName.toLowerCase()))
+            .setUseLegacySql(false).build();
+
+    BigQueryUtils.executeQuery(bigquery, queryConfig);
+
+    return StreamSupport
+        .stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false)
+        .map(v -> v.get(JavaBaseConstants.COLUMN_NAME_DATA).getStringValue())
+        .map(Jsons::deserialize)
+        .collect(Collectors.toList());
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java
index ede22ebce82e..92570ba8d513 100644
--- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java
+++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java
@@ -55,6 +55,7 @@ public class GcsCsvWriter extends BaseGcsWriter implements S3Writer {
   private final StreamTransferManager uploadManager;
   private final MultiPartOutputStream outputStream;
   private final CSVPrinter csvPrinter;
+  private final String gcsCsvFileLocation; // this is used in destination-bigquery (GCS upload type)
 
   public GcsCsvWriter(GcsDestinationConfig config,
                       AmazonS3 s3Client,
@@ -68,6 +69,7 @@ public GcsCsvWriter(GcsDestinationConfig config,
     String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.CSV);
     String objectKey = String.join("/", outputPrefix, outputFilename);
+    gcsCsvFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);
 
     LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);
 
@@ -99,4 +101,8 @@ protected void closeWhenFail() throws IOException {
     uploadManager.abort();
   }
 
+  public String getGcsCsvFileLocation() {
+    return gcsCsvFileLocation;
+  }
+
 }
diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md
index 4bf8ac079673..57597f7143d2 100644
--- a/docs/integrations/destinations/bigquery.md
+++ b/docs/integrations/destinations/bigquery.md
@@ -6,6 +6,15 @@ description: >-
 
 # BigQuery
 
+## Uploading options
+There are two available options for uploading data to BigQuery: `Standard` and `GCS Staging`.
+- `Standard` uploads data directly from your source to BigQuery storage. This option is faster and requires fewer resources than the GCS one.
+Be aware that syncs may fail for large datasets and slow sources, e.g. if reading from the source takes more than 10-12 hours.
+This is caused by limitations of the Google BigQuery SDK client. For more details, see https://github.com/airbytehq/airbyte/issues/3549
+- `GCS Staging (CSV format)`. This approach was implemented to avoid the issue with large datasets mentioned above.
+All data is first uploaded to a GCS bucket and then moved to BigQuery in one shot, stream by stream.
+The destination-gcs connector is partially used under the hood here, so you may check its documentation for more details.
+
 ## Overview
 
 The Airbyte BigQuery destination allows you to sync data to BigQuery. BigQuery is a serverless, highly scalable, and cost-effective data warehouse offered by Google Cloud Provider.
@@ -21,7 +30,7 @@ There are two flavors of connectors for this destination:
 Each stream will be output into its own table in BigQuery. Each table will contain 3 columns:
 
 * `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in BigQuery is `String`.
-* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in BigQuery is `Timestamp`.
+* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in BigQuery is `String`. Due to a Google [limitation](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#data_types) on loading CSV data from GCS to BigQuery with its native load job, a numeric timestamp (seconds since 1970) cannot be used; only the date format is accepted, so this column is stored as a `String` in this case.
 * `_airbyte_data`: a json blob representing with the event data. The column type in BigQuery is `String`.
 
 #### Features
@@ -43,6 +52,10 @@ To use the BigQuery destination, you'll need:
 * A Google Cloud Service Account with the "BigQuery User" and "BigQuery Data Editor" roles in your GCP project
 * A Service Account Key to authenticate into your Service Account
 
+For the GCS Staging upload mode:
+* A GCS role enabled for the same user as used for BigQuery
+* An HMAC key obtained for that user. Currently, only the [HMAC key](https://cloud.google.com/storage/docs/authentication/hmackeys) is supported. More credential types will be added in the future.
+
 See the setup guide for more information about how to create the required resources.
 
 ### Setup guide
@@ -91,6 +104,27 @@ You should now have all the requirements needed to configure BigQuery as a desti
 
 Once you've configured BigQuery as a destination, delete the Service Account Key from your computer.
 
+For the GCS Staging upload type, additional parameters must be configured:
+
+  * **GCS Bucket Name**
+  * **GCS Bucket Path**
+  * **GCS Bucket Keep files after migration**
+    * See [this](https://cloud.google.com/storage/docs/creating-buckets) to create a GCS bucket.
+  * **HMAC Key Access ID**
+    * See [this](https://cloud.google.com/storage/docs/authentication/hmackeys) on how to generate an access key.
+    * We recommend creating an Airbyte-specific user or service account. This user or account will require read and write permissions to objects in the bucket.
+  * **Secret Access Key**
+    * Corresponding key to the above access ID.
+* Make sure your GCS bucket is accessible from the machine running Airbyte.
+  * This depends on your networking setup.
+  * The easiest way to verify if Airbyte is able to connect to your GCS bucket is via the check connection tool in the UI.
+
+
+Note:
+This mode partially re-uses the destination-gcs connector under the hood, so you may also refer to its guide for additional details.
+**GCS Region**: the region configured for BigQuery is also used for GCS.
+**Format**: the GCS format is set to CSV.
+
 ## Notes about BigQuery Naming Conventions
 
 From [BigQuery Datasets Naming](https://cloud.google.com/bigquery/docs/datasets#dataset-naming):
@@ -113,6 +147,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into
 
 | Version | Date | Pull Request | Subject |
 | :--- | :--- | :--- | :--- |
+| 0.4.0 | 2021-08-26 | [#5296](https://github.com/airbytehq/airbyte/issues/5296) | Added GCS Staging uploading option |
 | 0.3.12 | 2021-08-03 | [#3549](https://github.com/airbytehq/airbyte/issues/3549) | Add optional arg to make a possibility to change the BigQuery client's chunk\buffer size |
 | 0.3.11 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
 | 0.3.10 | 2021-07-28 | [#3549](https://github.com/airbytehq/airbyte/issues/3549) | Add extended logs and made JobId filled with region and projectId |
diff --git a/docs/integrations/destinations/gcs.md b/docs/integrations/destinations/gcs.md
index 0e1312e315b4..e667abf3bd31 100644
--- a/docs/integrations/destinations/gcs.md
+++ b/docs/integrations/destinations/gcs.md
@@ -372,4 +372,5 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A
 
 | Version | Date | Pull Request | Subject |
 | :--- | :--- | :--- | :--- |
+| 0.1.1 | 2021-08-26 | [#5296](https://github.com/airbytehq/airbyte/issues/5296) | Added storing the gcsCsvFileLocation property for the CSV format. This is used by destination-bigquery (GCS Staging upload type) |
 | 0.1.0 | 2021-07-16 | [#4329](https://github.com/airbytehq/airbyte/pull/4784) | Initial release. |
diff --git a/settings.gradle b/settings.gradle
index fe427876668c..1520ce2918c9 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -82,6 +82,10 @@ if(!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD")
     include ':airbyte-integrations:connectors:destination-redshift'
     include ':airbyte-integrations:connectors:destination-snowflake'
     include ':airbyte-integrations:connectors:destination-oracle'
+
+    // Needed by destination-bigquery
+    include ':airbyte-integrations:connectors:destination-s3'
+    include ':airbyte-integrations:connectors:destination-gcs'
 }
 
 // connectors
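
The documentation above describes the GCS Staging flow as uploading CSV files to a GCS bucket and then moving them to BigQuery in one shot, stream by stream, and the `GcsCsvWriter` change exposes `getGcsCsvFileLocation()` (a `gs://bucket/key` URI) for that hand-off. Below is a minimal sketch of how such a staged CSV file could be loaded into a table with a native BigQuery load job via the Google Cloud Java client. It is not the connector's actual implementation: the dataset and table names and the `gs://` URI are placeholders, and the header-skipping and write-disposition settings are assumptions.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;

public class GcsCsvToBigQueryLoadSketch {

  public static void main(String[] args) throws InterruptedException {
    // Placeholders: in the connector this URI would come from GcsCsvWriter#getGcsCsvFileLocation().
    final String gcsCsvFileLocation = "gs://some-bucket/some-path/users.csv";
    final TableId targetTable = TableId.of("airbyte_dataset", "_airbyte_raw_users");

    // Uses application-default credentials; the connector builds its client from credentials_json instead.
    final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // A native BigQuery load job copies the staged CSV from GCS into the target table.
    final LoadJobConfiguration loadConfig = LoadJobConfiguration.newBuilder(targetTable, gcsCsvFileLocation)
        .setFormatOptions(CsvOptions.newBuilder().setSkipLeadingRows(1).build()) // assumes the CSV has a header row
        .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
        .build();

    final Job completedJob = bigquery.create(JobInfo.of(loadConfig)).waitFor();
    if (completedJob == null) {
      throw new RuntimeException("Load job no longer exists");
    }
    if (completedJob.getStatus().getError() != null) {
      throw new RuntimeException("GCS -> BigQuery load failed: " + completedJob.getStatus().getError());
    }
  }

}
```

The raw-table schema (`_airbyte_ab_id`, `_airbyte_emitted_at`, `_airbyte_data`) and the exact CSV options are owned by the connector code in this diff; the sketch only illustrates the GCS-to-BigQuery hand-off that the `gcsCsvFileLocation` accessor enables.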