Skip to content

Commit

Permalink
📝 Destination bigquery: mark service account as required for cloud (#…
Browse files Browse the repository at this point in the history
…12768)

* Update spec

* Update doc

* Bump version and update changelog

* Modify wording

* Add sample service account key json

* Add screenshots and common permission issues

* Refactor service account helper method

* Update log message

* Update version date in changelog

* auto-bump connector version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
2 people authored and suhomud committed May 23, 2022
1 parent f8f877b commit d9cdc99
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 1.1.5
dockerImageTag: 1.1.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
Expand All @@ -40,7 +40,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.3.5
dockerImageTag: 1.1.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-bigquery:1.1.5"
- dockerImage: "airbyte/destination-bigquery:1.1.6"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down Expand Up @@ -368,7 +368,7 @@
\ <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#service-account-key\"\
>docs</a> if you need help generating this key. Default credentials will\
\ be used if this field is left empty."
title: "Service Account Key JSON (Optional)"
title: "Service Account Key JSON (Required for cloud, optional for open-source)"
airbyte_secret: true
transformation_priority:
type: "string"
Expand Down Expand Up @@ -494,7 +494,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.3.5"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.1.6"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down Expand Up @@ -578,7 +578,7 @@
\ <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#service-account-key\"\
>docs</a> if you need help generating this key. Default credentials will\
\ be used if this field is left empty."
title: "Service Account Key JSON (Optional)"
title: "Service Account Key JSON (Required for cloud, optional for open-source)"
airbyte_secret: true
loading_method:
type: "object"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.5
LABEL io.airbyte.version=1.1.6
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"credentials_json": {
"type": "string",
"description": "The contents of the JSON service account key. Check out the <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#service-account-key\">docs</a> if you need help generating this key. Default credentials will be used if this field is left empty.",
"title": "Service Account Key JSON (Optional)",
"title": "Service Account Key JSON (Required for cloud, optional for open-source)",
"airbyte_secret": true
},
"loading_method": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.1.5
LABEL io.airbyte.version=1.1.6
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

package io.airbyte.integrations.destination.bigquery;

import static java.util.Objects.isNull;

import com.codepoetics.protonpack.StreamUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
Expand Down Expand Up @@ -110,7 +109,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
return new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage(result.getRight());
}
} catch (final Exception e) {
LOGGER.info("Check failed.", e);
LOGGER.error("Check failed.", e);
return new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage(e.getMessage() != null ? e.getMessage() : e.toString());
}
}
Expand All @@ -120,11 +119,10 @@ public AirbyteConnectionStatus checkStorageIamPermissions(final JsonNode config)
final String bucketName = loadingMethod.get(BigQueryConsts.GCS_BUCKET_NAME).asText();

try {
final ServiceAccountCredentials credentials = getServiceAccountCredentials(config);

final GoogleCredentials credentials = getServiceAccountCredentials(config);
final Storage storage = StorageOptions.newBuilder()
.setProjectId(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText())
.setCredentials(!isNull(credentials) ? credentials : ServiceAccountCredentials.getApplicationDefault())
.setCredentials(credentials)
.build().getService();
final List<Boolean> permissionsCheckStatusList = storage.testIamPermissions(bucketName, REQUIRED_PERMISSIONS);

Expand All @@ -146,11 +144,11 @@ public AirbyteConnectionStatus checkStorageIamPermissions(final JsonNode config)
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);

} catch (final Exception e) {
LOGGER.error("Exception attempting to access the Gcs bucket: {}", e.getMessage());
LOGGER.error("Cannot access the GCS bucket", e);

return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e
.withMessage("Could access the GCS bucket with the provided configuration.\n" + e
.getMessage());
}
}
Expand All @@ -160,30 +158,30 @@ protected BigQuery getBigQuery(final JsonNode config) {

try {
final BigQueryOptions.Builder bigQueryBuilder = BigQueryOptions.newBuilder();
ServiceAccountCredentials credentials = null;
if (BigQueryUtils.isUsingJsonCredentials(config)) {
// handle the credentials json being passed as a json object or a json object already serialized as
// a string.
credentials = getServiceAccountCredentials(config);
}
final GoogleCredentials credentials = getServiceAccountCredentials(config);
return bigQueryBuilder
.setProjectId(projectId)
.setCredentials(!isNull(credentials) ? credentials : ServiceAccountCredentials.getApplicationDefault())
.setCredentials(credentials)
.build()
.getService();
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private ServiceAccountCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
final ServiceAccountCredentials credentials;
private static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
if (!BigQueryUtils.isUsingJsonCredentials(config)) {
LOGGER.info("No service account key json is provided. It is required if you are using Airbyte cloud.");
LOGGER.info("Using the default service account credential from environment.");
return ServiceAccountCredentials.getApplicationDefault();
}

// The JSON credential can either be a raw JSON object, or a serialized JSON object.
final String credentialsString = config.get(BigQueryConsts.CONFIG_CREDS).isObject()
? Jsons.serialize(config.get(BigQueryConsts.CONFIG_CREDS))
: config.get(BigQueryConsts.CONFIG_CREDS).asText();
credentials = ServiceAccountCredentials
.fromStream(new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8)));
return credentials;
return ServiceAccountCredentials.fromStream(
new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,15 @@ public static JobInfo.WriteDisposition getWriteDisposition(final DestinationSync
}

public static boolean isUsingJsonCredentials(final JsonNode config) {
return config.has(BigQueryConsts.CONFIG_CREDS) && !config.get(BigQueryConsts.CONFIG_CREDS).asText().isEmpty();
if (!config.has(BigQueryConsts.CONFIG_CREDS)) {
return false;
}
final JsonNode json = config.get(BigQueryConsts.CONFIG_CREDS);
if (json.isTextual()) {
return !json.asText().isEmpty();
} else {
return !Jsons.serialize(json).isEmpty();
}
}

// https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"credentials_json": {
"type": "string",
"description": "The contents of the JSON service account key. Check out the <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#service-account-key\">docs</a> if you need help generating this key. Default credentials will be used if this field is left empty.",
"title": "Service Account Key JSON (Optional)",
"title": "Service Account Key JSON (Required for cloud, optional for open-source)",
"airbyte_secret": true
},
"transformation_priority": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@
package io.airbyte.integrations.destination.bigquery;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -29,30 +34,51 @@ public void init() {

@ParameterizedTest
@MethodSource("validBigQueryIdProvider")
public void testGetDatasetIdSuccess(String projectId, String datasetId, String expected) throws Exception {
JsonNode config = Jsons.jsonNode(configMapBuilder
public void testGetDatasetIdSuccess(final String projectId, final String datasetId, final String expected) {
final JsonNode config = Jsons.jsonNode(configMapBuilder
.put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
.put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
.build());

String actual = BigQueryUtils.getDatasetId(config);
final String actual = BigQueryUtils.getDatasetId(config);

assertEquals(expected, actual);
}

@ParameterizedTest
@MethodSource("invalidBigQueryIdProvider")
public void testGetDatasetIdFail(String projectId, String datasetId, String expected) throws Exception {
JsonNode config = Jsons.jsonNode(configMapBuilder
public void testGetDatasetIdFail(final String projectId, final String datasetId, final String expected) {
final JsonNode config = Jsons.jsonNode(configMapBuilder
.put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
.put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
.build());

Exception exception = assertThrows(IllegalArgumentException.class, () -> BigQueryUtils.getDatasetId(config));
final Exception exception = assertThrows(IllegalArgumentException.class, () -> BigQueryUtils.getDatasetId(config));

assertEquals(expected, exception.getMessage());
}

@Test
public void testIsUsingJsonCredentials() {
// empty
final JsonNode emptyConfig = Jsons.jsonNode(Collections.emptyMap());
assertFalse(BigQueryUtils.isUsingJsonCredentials(emptyConfig));

// empty text
final JsonNode emptyTextConfig = Jsons.jsonNode(Map.of(BigQueryConsts.CONFIG_CREDS, ""));
assertFalse(BigQueryUtils.isUsingJsonCredentials(emptyTextConfig));

// non-empty text
final JsonNode nonEmptyTextConfig = Jsons.jsonNode(
Map.of(BigQueryConsts.CONFIG_CREDS, "{ \"service_account\": \"[email protected]\" }"));
assertTrue(BigQueryUtils.isUsingJsonCredentials(nonEmptyTextConfig));

// object
final JsonNode objectConfig = Jsons.jsonNode(Map.of(
BigQueryConsts.CONFIG_CREDS, Jsons.jsonNode(Map.of("service_account", "[email protected]"))));
assertTrue(BigQueryUtils.isUsingJsonCredentials(objectConfig));
}

private static Stream<Arguments> validBigQueryIdProvider() {
return Stream.of(
Arguments.arguments("my-project", "my_dataset", "my_dataset"),
Expand Down
Loading

0 comments on commit d9cdc99

Please sign in to comment.