diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 4545b2a7ed6d..cda6a95e97d1 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -9,13 +9,12 @@ import com.codepoetics.protonpack.StreamUtils; import com.fasterxml.jackson.databind.JsonNode; import com.google.auth.oauth2.ServiceAccountCredentials; -import com.google.cloud.Binding; -import com.google.cloud.Policy; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.QueryJobConfiguration; -import com.google.cloud.storage.*; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import com.google.common.base.Charsets; import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.json.Jsons; @@ -33,6 +32,7 @@ import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory; import io.airbyte.integrations.destination.bigquery.uploader.UploaderType; import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig; +import io.airbyte.integrations.destination.gcs.GcsDestination; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.integrations.destination.gcs.GcsNameTransformer; import io.airbyte.integrations.destination.gcs.GcsStorageOperations; @@ -49,13 +49,13 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.net.URI; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.avro.Schema; import org.apache.commons.lang3.tuple.ImmutablePair; import org.joda.time.DateTime; @@ -119,12 +119,9 @@ public AirbyteConnectionStatus check(final JsonNode config) { public AirbyteConnectionStatus checkStorageIamPermissions(final JsonNode config) { final JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD); final String bucketName = loadingMethod.get(BigQueryConsts.GCS_BUCKET_NAME).asText(); - final String bucketPath = loadingMethod.get(BigQueryConsts.GCS_BUCKET_PATH).asText(); - final URI uri = URI.create("gs://" + bucketName + "/" + loadingMethod.get(BigQueryConsts.GCS_BUCKET_PATH).asText()); try { final ServiceAccountCredentials credentials = getServiceAccountCredentials(config); - String clientEmail = credentials.getClientEmail(); final Storage storage = StorageOptions.newBuilder() .setProjectId(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText()) @@ -141,7 +138,9 @@ public AirbyteConnectionStatus checkStorageIamPermissions(final JsonNode config) if (!missingPermissions.isEmpty()) { LOGGER.warn("Please make sure you account has all of these permissions:{}", REQUIRED_PERMISSIONS); - return checkIamPermissionsWithLimitedAccess(clientEmail, storage, bucketName, bucketPath, config, uri); + final GcsDestination gcsDestination = new GcsDestination(); + final JsonNode gcsJsonNodeConfig = BigQueryUtils.getGcsJsonNodeConfig(config); + return gcsDestination.check(gcsJsonNodeConfig); } return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); @@ -156,51 +155,6 @@ public AirbyteConnectionStatus checkStorageIamPermissions(final JsonNode config) } } - private AirbyteConnectionStatus checkIamPermissionsWithLimitedAccess(String clientEmail, - Storage storage, - String bucketName, - String bucketPath, - JsonNode config, - URI uri) { - Policy iamPolicy = storage.getIamPolicy(bucketName, Storage.BucketSourceOption.requestedPolicyVersion(3)); - // Print binding information - List serviceAccountConditionalBindings = iamPolicy.getBindingsList() - .stream() - .filter(binding -> binding.getCondition() != null) - .filter(binding -> !binding.getMembers().isEmpty() && binding.getMembers().stream().anyMatch(x -> x.contains(clientEmail))) - .collect(Collectors.toList()); - - List objectAdminBindings = serviceAccountConditionalBindings.stream() - .filter(binding -> binding.getRole().equals("roles/storage.objectAdmin")) - .collect(Collectors.toList()); - - if (objectAdminBindings.isEmpty()) { - return new AirbyteConnectionStatus() - .withStatus(AirbyteConnectionStatus.Status.FAILED) - .withMessage("Service account " + clientEmail + " must have role id roles/storage.objectAdmin"); - } - - boolean pathAllowed = objectAdminBindings.stream() - .anyMatch(binding -> Objects.requireNonNull(binding.getCondition()).getExpression() != null - && allowedBucketPath(binding.getCondition().getExpression(), bucketPath)); - - if (!pathAllowed) { - return new AirbyteConnectionStatus() - .withStatus(AirbyteConnectionStatus.Status.FAILED) - .withMessage("Service account " + clientEmail + " does not have access to " + uri); - } - return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - } - - private boolean allowedBucketPath(String expression, String bucketPath) { - return Stream.of(expression.split("&&")) - .anyMatch(cond -> { - String[] elements = cond.split("/"); - return cond.contains("resource.name.startsWith(") - && List.of(elements).get(elements.length - 1).equals(bucketPath + "\")"); - }); - } - protected BigQuery getBigQuery(final JsonNode config) { final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();