Skip to content

Commit

Permalink
BigQuery Destination fixed bug with contitional binding
Browse files Browse the repository at this point in the history
  • Loading branch information
VitaliiMaltsev committed Apr 15, 2022
1 parent faf66da commit 7ad0085
Showing 1 changed file with 10 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
import com.codepoetics.protonpack.StreamUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.Binding;
import com.google.cloud.Policy;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.storage.*;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.Charsets;
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.json.Jsons;
Expand All @@ -33,6 +32,7 @@
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
import io.airbyte.integrations.destination.gcs.GcsDestination;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.gcs.GcsNameTransformer;
import io.airbyte.integrations.destination.gcs.GcsStorageOperations;
Expand All @@ -49,13 +49,13 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -119,12 +119,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
public AirbyteConnectionStatus checkStorageIamPermissions(final JsonNode config) {
final JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD);
final String bucketName = loadingMethod.get(BigQueryConsts.GCS_BUCKET_NAME).asText();
final String bucketPath = loadingMethod.get(BigQueryConsts.GCS_BUCKET_PATH).asText();
final URI uri = URI.create("gs://" + bucketName + "/" + loadingMethod.get(BigQueryConsts.GCS_BUCKET_PATH).asText());

try {
final ServiceAccountCredentials credentials = getServiceAccountCredentials(config);
String clientEmail = credentials.getClientEmail();

final Storage storage = StorageOptions.newBuilder()
.setProjectId(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText())
Expand All @@ -141,7 +138,9 @@ public AirbyteConnectionStatus checkStorageIamPermissions(final JsonNode config)
if (!missingPermissions.isEmpty()) {
LOGGER.warn("Please make sure you account has all of these permissions:{}", REQUIRED_PERMISSIONS);

return checkIamPermissionsWithLimitedAccess(clientEmail, storage, bucketName, bucketPath, config, uri);
final GcsDestination gcsDestination = new GcsDestination();
final JsonNode gcsJsonNodeConfig = BigQueryUtils.getGcsJsonNodeConfig(config);
return gcsDestination.check(gcsJsonNodeConfig);

}
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
Expand All @@ -156,51 +155,6 @@ public AirbyteConnectionStatus checkStorageIamPermissions(final JsonNode config)
}
}

private AirbyteConnectionStatus checkIamPermissionsWithLimitedAccess(String clientEmail,
Storage storage,
String bucketName,
String bucketPath,
JsonNode config,
URI uri) {
Policy iamPolicy = storage.getIamPolicy(bucketName, Storage.BucketSourceOption.requestedPolicyVersion(3));
// Print binding information
List<Binding> serviceAccountConditionalBindings = iamPolicy.getBindingsList()
.stream()
.filter(binding -> binding.getCondition() != null)
.filter(binding -> !binding.getMembers().isEmpty() && binding.getMembers().stream().anyMatch(x -> x.contains(clientEmail)))
.collect(Collectors.toList());

List<Binding> objectAdminBindings = serviceAccountConditionalBindings.stream()
.filter(binding -> binding.getRole().equals("roles/storage.objectAdmin"))
.collect(Collectors.toList());

if (objectAdminBindings.isEmpty()) {
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Service account " + clientEmail + " must have role id roles/storage.objectAdmin");
}

boolean pathAllowed = objectAdminBindings.stream()
.anyMatch(binding -> Objects.requireNonNull(binding.getCondition()).getExpression() != null
&& allowedBucketPath(binding.getCondition().getExpression(), bucketPath));

if (!pathAllowed) {
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Service account " + clientEmail + " does not have access to " + uri);
}
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
}

private boolean allowedBucketPath(String expression, String bucketPath) {
return Stream.of(expression.split("&&"))
.anyMatch(cond -> {
String[] elements = cond.split("/");
return cond.contains("resource.name.startsWith(")
&& List.of(elements).get(elements.length - 1).equals(bucketPath + "\")");
});
}

protected BigQuery getBigQuery(final JsonNode config) {
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();

Expand Down

1 comment on commit 7ad0085

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SonarQube Report

SonarQube report for Airbyte Connectors Destination Bigquery(#12068)

Measures

Name Value Name Value Name Value
Security Rating A Lines to Cover 838 Vulnerabilities 0
Quality Gate Status OK Bugs 3 Coverage 0.0
Duplicated Lines (%) 0.0 Code Smells 44 Reliability Rating C
Duplicated Blocks 0 Lines of Code 1767 Blocker Issues 0
Critical Issues 3 Major Issues 34 Minor Issues 8

Detected Issues

Rule File Description Message
java:S112 (MAJOR) bigquery/BigQueryAvroSerializedBuffer.java:39 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S2142 (MAJOR) bigquery/BigQueryGcsOperations.java:140 "InterruptedException" should not be ignored Either re-interrupt this method or rethrow the "InterruptedException" that can be caught here.
java:S107 (MAJOR) bigquery/BigQueryStagingConsumerFactory.java:36 Methods should not have too many parameters Method has 8 parameters, which is greater than 7 authorized.
java:S4276 (MINOR) bigquery/BigQueryStagingConsumerFactory.java:42 Functional Interfaces should be as specialised as possible Refactor this code to use the more specialised Functional Interface 'UnaryOperator'
java:S4276 (MINOR) bigquery/BigQueryStagingConsumerFactory.java:43 Functional Interfaces should be as specialised as possible Refactor this code to use the more specialised Functional Interface 'UnaryOperator'
java:S4276 (MINOR) bigquery/BigQueryStagingConsumerFactory.java:66 Functional Interfaces should be as specialised as possible Refactor this code to use the more specialised Functional Interface 'UnaryOperator'
java:S4276 (MINOR) bigquery/BigQueryStagingConsumerFactory.java:67 Functional Interfaces should be as specialised as possible Refactor this code to use the more specialised Functional Interface 'UnaryOperator'
java:S1611 (MINOR) bigquery/BigQueryStagingConsumerFactory.java:135 Parentheses should be removed from a single lambda input parameter when its type is inferred Remove the parentheses around the "hasFailed" parameter (sonar.java.source not set. Assuming 8 or greater.)
java:S112 (MAJOR) bigquery/BigQueryStagingOperations.java:27 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) bigquery/BigQueryStagingOperations.java:34 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S5993 (MAJOR) formatter/BigQueryRecordFormatter.java:37 Constructors of an "abstract" class should not be declared "public" Change the visibility of this constructor to "protected".
java:S112 (MAJOR) uploader/AbstractBigQueryUploader.java:56 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) uploader/AbstractBigQueryUploader.java:93 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1135 (INFO) bigquery/BigQueryDestination.java:97 Track uses of "TODO" tags Complete the task associated to this TODO comment.
java:S112 (MAJOR) bigquery/BigQueryRecordConsumer.java:69 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) writer/BigQueryTableWriter.java:34 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1452 (CRITICAL) uploader/BigQueryUploaderFactory.java:34 Generic wildcard types should not be used in return types Remove usage of generic wildcard type.
java:S1116 (MINOR) formatter/BigQueryRecordFormatter.java:45 Empty statements should be removed Remove this empty statement.
java:S112 (MAJOR) uploader/AbstractBigQueryUploader.java:70 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S3457 (MAJOR) uploader/AbstractBigQueryUploader.java:87 Printf-style format strings should be used correctly %n should be used in place of \n to produce the platform-specific line separator.
java:S112 (MAJOR) uploader/AbstractBigQueryUploader.java:89 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S125 (MAJOR) uploader/AbstractBigQueryUploader.java:110 Sections of code should not be commented out This block of commented-out lines of code should be removed.
java:S2142 (MAJOR) uploader/AbstractBigQueryUploader.java:164 "InterruptedException" should not be ignored Either re-interrupt this method or rethrow the "InterruptedException" that can be caught here.
java:S112 (MAJOR) uploader/AbstractBigQueryUploader.java:183 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S2142 (MAJOR) uploader/AbstractGscBigQueryUploader.java:83 "InterruptedException" should not be ignored Either re-interrupt this method or rethrow the "InterruptedException" that can be caught here.
java:S112 (MAJOR) uploader/AbstractGscBigQueryUploader.java:85 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1124 (MINOR) uploader/AbstractGscBigQueryUploader.java:89 Modifiers should be declared in the correct order Reorder the modifiers to comply with the Java Language Specification.
java:S1117 (MAJOR) uploader/AbstractGscBigQueryUploader.java:97 Local variables should not shadow class fields Rename "gcsDestinationConfig" which hides the field declared at line 32.
java:S1118 (MAJOR) uploader/BigQueryUploaderFactory.java:32 Utility classes should not have public constructors Add a private constructor to hide the implicit public one.
java:S107 (MAJOR) uploader/BigQueryUploaderFactory.java:80 Methods should not have too many parameters Method has 8 parameters, which is greater than 7 authorized.
java:S1068 (MAJOR) config/UploaderConfig.java:20 Unused "private" fields should be removed Remove this unused "config" private field.
java:S1068 (MAJOR) config/UploaderConfig.java:21 Unused "private" fields should be removed Remove this unused "configStream" private field.
java:S1068 (MAJOR) config/UploaderConfig.java:22 Unused "private" fields should be removed Remove this unused "targetTableName" private field.
java:S1068 (MAJOR) config/UploaderConfig.java:23 Unused "private" fields should be removed Remove this unused "tmpTableName" private field.
java:S1068 (MAJOR) config/UploaderConfig.java:24 Unused "private" fields should be removed Remove this unused "bigQuery" private field.
java:S1068 (MAJOR) config/UploaderConfig.java:25 Unused "private" fields should be removed Remove this unused "formatterMap" private field.
java:S1068 (MAJOR) config/UploaderConfig.java:26 Unused "private" fields should be removed Remove this unused "isDefaultAirbyteTmpSchema" private field.
java:S1068 (MAJOR) writer/BigQueryTableWriter.java:21 Unused "private" fields should be removed Remove this unused "LOGGER" private field.
java:S1186 (CRITICAL) writer/BigQueryTableWriter.java:30 Methods should not be empty Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.
java:S1118 (MAJOR) bigquery/BigQueryConsts.java:7 Utility classes should not have public constructors Add a private constructor to hide the implicit public one.
java:S115 (CRITICAL) bigquery/BigQueryConsts.java:9 Constant names should comply with a naming convention Rename this constant name to match the regular expression '^[A-Z][A-Z0-9](_[A-Z0-9]+)$'.
java:S1450 (MINOR) bigquery/BigQueryRecordConsumer.java:27 Private fields only used as local variables in methods should become local variables Remove the "lastStateMessage" field and declare it as a local variable in the relevant methods.
java:S1135 (INFO) bigquery/BigQueryRecordConsumer.java:37 Track uses of "TODO" tags Complete the task associated to this TODO comment.
java:S1118 (MAJOR) bigquery/BigQueryUtils.java:52 Utility classes should not have public constructors Add a private constructor to hide the implicit public one.
java:S112 (MAJOR) bigquery/BigQueryUtils.java:71 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) bigquery/BigQueryUtils.java:86 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) bigquery/BigQueryDestination.java:175 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.

Coverage (0.0%)

File Coverage File Coverage
src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java 0.0 src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java 0.0
src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java 0.0 src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java 0.0
src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java 0.0 src/main/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformer.java 0.0
src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java 0.0 src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java 0.0
src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryWriteConfig.java 0.0 src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java 0.0
src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryRecordFormatter.java 0.0 src/main/java/io/airbyte/integrations/destination/bigquery/formatter/GcsAvroBigQueryRecordFormatter.java 0.0
src/main/java/io/airbyte/integrations/destination/bigquery/formatter/GcsCsvBigQueryRecordFormatter.java 0.0 src/main/java/io/airbyte/integrations/destination/bigquery/helpers/LoggerHelper.java 0.0
src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java 0.0 src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java 0.0
src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java 0.0 src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java 0.0
src/main/java/io/airbyte/integrations/destination/bigquery/uploader/GcsAvroBigQueryUploader.java 0.0 src/main/java/io/airbyte/integrations/destination/bigquery/uploader/GcsCsvBigQueryUploader.java 0.0
src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java 0.0

Please sign in to comment.