Destination BigQuery: Accept Dataset ID field prefixed by Project ID #8383

Merged: 17 commits, Jan 18, 2022
Changes from 6 commits
@@ -56,7 +56,7 @@ public BigQueryDestination() {
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
final String datasetId = BigQueryUtils.getDatasetId(config);
Contributor:

BigQueryUtils.getDatasetId(config) is used in two places:

  1. BigQueryDestination.check()
  2. BigQueryDestination.getConsumer(), which ends up calling BigQueryUploaderFactory.getUploader, then BigQueryUtils.getSchema, then BigQueryUtils.getDatasetId(config)

Could you please consider adding an integration test to BigQueryDestinationTest to check that:

  1. we can create a connection with the provided dataset
  2. we can get a consumer and write some data
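A minimal sketch of such a test, for illustration only: the class name, the placeholder credentials, and the assumption that check() succeeds are not part of this PR, and a real run needs valid service-account JSON and a reachable BigQuery project. It reuses the config keys that BigQueryUtilsTest builds further below.

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import org.junit.jupiter.api.Test;

public class BigQueryDestinationCheckSketchTest {

  @Test
  public void testCheckAcceptsProjectPrefixedDatasetId() throws Exception {
    // Same config keys as BigQueryUtilsTest; CONFIG_CREDS must hold real
    // service-account JSON for check() to actually reach BigQuery.
    final JsonNode config = Jsons.jsonNode(ImmutableMap.builder()
        .put(BigQueryConsts.CONFIG_PROJECT_ID, "my-project")
        .put(BigQueryConsts.CONFIG_DATASET_ID, "my-project:my_dataset")
        .put(BigQueryConsts.CONFIG_DATASET_LOCATION, "US")
        .put(BigQueryConsts.CONFIG_CREDS, "<service-account-json>")
        .build());

    final AirbyteConnectionStatus status = new BigQueryDestination().check(config);

    // check() resolves the dataset through BigQueryUtils.getDatasetId(config), so a
    // dataset_id prefixed with the matching project id should still connect.
    assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, status.getStatus());
  }
}

A companion test could then call getConsumer() with a small ConfiguredAirbyteCatalog and write a record to cover the second call path listed above.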

final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
final BigQuery bigquery = getBigQuery(config);
final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config);
@@ -176,9 +176,10 @@ protected Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> getUp
}

/**
* BigQuery might have different structure of the Temporary table.
* If this method returns TRUE, temporary table will have only three common Airbyte attributes.
* In case of FALSE, temporary table structure will be in line with Airbyte message JsonSchema.
* BigQuery might have different structure of the Temporary table. If this method returns TRUE,
* temporary table will have only three common Airbyte attributes. In case of FALSE, temporary table
* structure will be in line with Airbyte message JsonSchema.
*
* @return use default AirbyteSchema or build using JsonSchema
*/
protected boolean isDefaultAirbyteTmpTableSchema() {
@@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.bigquery;

import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.getJobErrorMessage;
import static java.util.Objects.isNull;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -37,6 +38,8 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -46,6 +49,7 @@ public class BigQueryUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUtils.class);
private static final String BIG_QUERY_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";
private static final Pattern datasetIdPattern = Pattern.compile("^(([a-z]([a-z0-9\\-]*[a-z0-9])?):)?([a-zA-Z0-9_]+)$");
Contributor:

This seems to be a constant; please rename it to DATASET_ID_PATTERN.
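For reference, a small standalone sketch of how that pattern splits a prefixed ID into its capture groups (group 2 is the optional project ID, group 4 is the dataset ID). It uses the DATASET_ID_PATTERN name suggested above; the demo class itself is hypothetical.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DatasetIdPatternDemo {

  private static final Pattern DATASET_ID_PATTERN =
      Pattern.compile("^(([a-z]([a-z0-9\\-]*[a-z0-9])?):)?([a-zA-Z0-9_]+)$");

  public static void main(final String[] args) {
    final Matcher prefixed = DATASET_ID_PATTERN.matcher("my-project:my_dataset");
    if (prefixed.matches()) {
      // group(2) -> "my-project", group(4) -> "my_dataset"
      System.out.println(prefixed.group(2) + " / " + prefixed.group(4));
    }

    final Matcher plain = DATASET_ID_PATTERN.matcher("my_dataset");
    if (plain.matches()) {
      // Without a prefix, group(2) is null and group(4) is the whole input.
      System.out.println(plain.group(2) + " / " + plain.group(4));
    }
  }
}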


public static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
final JobId jobId = JobId.of(UUID.randomUUID().toString());
@@ -160,6 +164,27 @@ public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) {
return gcsJsonNode;
}

public static String getDatasetId(final JsonNode config) {
String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
Matcher matcher = datasetIdPattern.matcher(datasetId);

if (matcher.matches()) {
if (!isNull(matcher.group(1))) {
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
if (!(projectId.equals(matcher.group(2)))) {
throw new IllegalArgumentException(String.format(
"Project ID included in Dataset ID must match Project ID field's value: Project ID is %s, but you specified %s in Dataset ID",
projectId,
matcher.group(2)));
}
}
return matcher.group(4);
}
throw new IllegalArgumentException(String.format(
"BigQuery Dataset ID format must match '[project-id:]dataset_id': %s",
datasetId));
}

public static String getDatasetLocation(final JsonNode config) {
if (config.has(BigQueryConsts.CONFIG_DATASET_LOCATION)) {
return config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText();
@@ -212,7 +237,7 @@ public static void transformJsonDateTimeToBigDataFormat(List<String> dateTimeFie
}

public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) {
final String defaultSchema = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
final String defaultSchema = getDatasetId(config);
final String srcNamespace = stream.getStream().getNamespace();
if (srcNamespace == null) {
return defaultSchema;
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery;

import static org.junit.jupiter.api.Assertions.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.*;

public class BigQueryUtilsTest {

private ImmutableMap.Builder<Object, Object> configMapBuilder;

@BeforeEach
public void init() {
configMapBuilder = ImmutableMap.builder()
.put(BigQueryConsts.CONFIG_CREDS, "test_secret")
.put(BigQueryConsts.CONFIG_DATASET_LOCATION, "US");
}

public static Stream<Arguments> validBigQueryIdProvider() {
Contributor:

Change the method definition to private and move it below the public methods.

return Stream.of(
Arguments.arguments("my-project", "my_dataset", "my_dataset"),
Arguments.arguments("my-project", "my-project:my_dataset", "my_dataset"));
}

@ParameterizedTest
@MethodSource("validBigQueryIdProvider")
public void testGetDatasetIdSuccess(String projectId, String datasetId, String expected) throws Exception {
JsonNode config = Jsons.jsonNode(configMapBuilder
.put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
.put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
.build());

String actual = BigQueryUtils.getDatasetId(config);

assertEquals(expected, actual);
}

public static Stream<Arguments> invalidBigQueryIdProvider() {
Contributor:

Change the method definition to private and move it below the public methods.

return Stream.of(
Arguments.arguments("my-project", ":my_dataset", "BigQuery Dataset ID format must match '[project-id:]dataset_id': :my_dataset"),
Contributor:

To exercise the regexp pattern for the dataset ID, we could also add some other incorrect dataset IDs here (a sketch follows after this provider method):

  1. "my-project:my-project:my_dataset" (to check that ':' is not allowed inside the project name)
  2. "my-project-:my_dataset" (a project name cannot end with a hyphen)
  3. "my-project:"
  4. "my-project: "

Arguments.arguments("my-project", "your-project:my_dataset",
"Project ID included in Dataset ID must match Project ID field's value: Project ID is my-project, but you specified your-project in Dataset ID"));
}
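Following the suggestion above, a sketch of those extra negative cases as a separate, hypothetical provider. All four inputs fail the dataset-ID pattern, so the expected messages assume each hits the generic format error rather than the project-mismatch error; it could be wired to testGetDatasetIdFail with an additional @MethodSource.

  public static Stream<Arguments> malformedBigQueryIdProvider() {
    // Each dataset_id below violates the '[project-id:]dataset_id' pattern.
    return Stream.of(
        Arguments.arguments("my-project", "my-project:my-project:my_dataset",
            "BigQuery Dataset ID format must match '[project-id:]dataset_id': my-project:my-project:my_dataset"),
        Arguments.arguments("my-project", "my-project-:my_dataset",
            "BigQuery Dataset ID format must match '[project-id:]dataset_id': my-project-:my_dataset"),
        Arguments.arguments("my-project", "my-project:",
            "BigQuery Dataset ID format must match '[project-id:]dataset_id': my-project:"),
        Arguments.arguments("my-project", "my-project: ",
            "BigQuery Dataset ID format must match '[project-id:]dataset_id': my-project: "));
  }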

@ParameterizedTest
@MethodSource("invalidBigQueryIdProvider")
public void testGetDatasetIdFail(String projectId, String datasetId, String expected) throws Exception {
JsonNode config = Jsons.jsonNode(configMapBuilder
.put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
.put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
.build());

Exception exception = assertThrows(IllegalArgumentException.class, () -> BigQueryUtils.getDatasetId(config));

assertEquals(expected, exception.getMessage());
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
@@ -155,6 +155,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.6.1 | 2021-12-22 | [\#8383](https://github.com/airbytehq/airbyte/issues/8383) | Support dataset-id prefixed by project-id |
| 0.6.0 | 2021-12-17 | [\#8788](https://github.com/airbytehq/airbyte/issues/8788) | BigQuery/BiqQuery denorm Destinations : Add possibility to use different types of GCS files |
| 0.5.1 | 2021-12-16 | [\#8816](https://github.com/airbytehq/airbyte/issues/8816) | Update dataset locations |
| 0.5.0 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables |