Skip to content

Commit

Permalink
Destination BigQuery: Accept Dataset ID field prefixed by Project ID (#…
Browse files Browse the repository at this point in the history
…8383)

* add Dataset ID parse method

* add BigQuery Destination unit test

* update change log

* fit to the latest code base

* update change log

* change var name to const name

* change public method to private

* add test cases for testGetDatasetIdFail

* add integration test for dataset-id prefixed with project-id

* fix getDatasetId

* add comment to parameterized test provider

* update docker image versions

* update docker image versions again
  • Loading branch information
koji-m authored Jan 18, 2022
1 parent c4b365c commit 3f9cbec
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496",
"name": "BigQuery (denormalized typed struct)",
"dockerRepository": "airbyte/destination-bigquery-denormalized",
"dockerImageTag": "0.2.3",
"dockerImageTag": "0.2.4",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery",
"icon": "bigquery.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.6.3",
"dockerImageTag": "0.6.4",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery",
"icon": "bigquery.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.6.3
dockerImageTag: 0.6.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
- name: Cassandra
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
supportsDBT: false
supported_destination_sync_modes:
- "append"
- dockerImage: "airbyte/destination-bigquery:0.6.3"
- dockerImage: "airbyte/destination-bigquery:0.6.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down Expand Up @@ -378,7 +378,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.3"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery-denormalized

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.3
LABEL io.airbyte.version=0.6.4
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public BigQueryDestination() {
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
final String datasetId = BigQueryUtils.getDatasetId(config);
final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
final BigQuery bigquery = getBigQuery(config);
final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.bigquery;

import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.getJobErrorMessage;
import static java.util.Objects.isNull;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -162,6 +163,25 @@ public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) {
return gcsJsonNode;
}

public static String getDatasetId(final JsonNode config) {
String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();

int colonIndex = datasetId.indexOf(":");
if (colonIndex != -1) {
String projectIdPart = datasetId.substring(0, colonIndex);
String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
if (!(projectId.equals(projectIdPart))) {
throw new IllegalArgumentException(String.format(
"Project ID included in Dataset ID must match Project ID field's value: Project ID is `%s`, but you specified `%s` in Dataset ID",
projectId,
projectIdPart));
}
}
// if colonIndex is -1, then this returns the entire string
// otherwise it returns everything after the colon
return datasetId.substring(colonIndex + 1);
}

public static String getDatasetLocation(final JsonNode config) {
if (config.has(BigQueryConsts.CONFIG_DATASET_LOCATION)) {
return config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText();
Expand Down Expand Up @@ -214,7 +234,7 @@ public static void transformJsonDateTimeToBigDataFormat(List<String> dateTimeFie
}

public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) {
final String defaultSchema = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
final String defaultSchema = getDatasetId(config);
final String srcNamespace = stream.getStream().getNamespace();
if (srcNamespace == null) {
return defaultSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,18 @@
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -197,16 +202,20 @@ void testSpec() throws Exception {
assertEquals(expected, actual);
}

@Test
void testCheckSuccess() {
@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testCheckSuccess(DatasetIdResetter resetDatasetId) {
resetDatasetId.accept(config);
final AirbyteConnectionStatus actual = new BigQueryDestination().check(config);
final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
assertEquals(expected, actual);
}

@Test
void testCheckFailure() {
@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testCheckFailure(DatasetIdResetter resetDatasetId) {
((ObjectNode) config).put(BigQueryConsts.CONFIG_PROJECT_ID, "fake");
resetDatasetId.accept(config);
final AirbyteConnectionStatus actual = new BigQueryDestination().check(config);
final String actualMessage = actual.getMessage();
LOGGER.info("Checking expected failure message:" + actualMessage);
Expand All @@ -215,8 +224,10 @@ void testCheckFailure() {
assertEquals(expected, actual.withMessage(""));
}

@Test
void testWriteSuccess() throws Exception {
@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testWriteSuccess(DatasetIdResetter resetDatasetId) throws Exception {
resetDatasetId.accept(config);
final BigQueryDestination destination = new BigQueryDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

Expand Down Expand Up @@ -244,8 +255,10 @@ void testWriteSuccess() throws Exception {
.collect(Collectors.toList()));
}

@Test
void testWriteFailure() throws Exception {
@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testWriteFailure(DatasetIdResetter resetDatasetId) throws Exception {
resetDatasetId.accept(config);
// hack to force an exception to be thrown from within the consumer.
final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1);
doThrow(new RuntimeException()).when(spiedMessage).getRecord();
Expand Down Expand Up @@ -305,8 +318,10 @@ private List<JsonNode> retrieveRecords(final String tableName) throws Exception
.collect(Collectors.toList());
}

@Test
void testWritePartitionOverUnpartitioned() throws Exception {
@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testWritePartitionOverUnpartitioned(DatasetIdResetter resetDatasetId) throws Exception {
resetDatasetId.accept(config);
final String raw_table_name = String.format("_airbyte_raw_%s", USERS_STREAM_NAME);
createUnpartitionedTable(bigquery, dataset, raw_table_name);
assertFalse(isTablePartitioned(bigquery, dataset, raw_table_name));
Expand Down Expand Up @@ -369,4 +384,30 @@ private boolean isTablePartitioned(final BigQuery bigquery, final Dataset datase
return false;
}

private static class DatasetIdResetter {
private Consumer<JsonNode> consumer;

DatasetIdResetter(Consumer<JsonNode> consumer) {
this.consumer = consumer;
}

public void accept(JsonNode config) {
consumer.accept(config);
}
}

private static Stream<Arguments> datasetIdResetterProvider() {
// parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id`
return Stream.of(
Arguments.arguments(new DatasetIdResetter(config -> {})),
Arguments.arguments(new DatasetIdResetter(
config -> {
String projectId = ((ObjectNode) config).get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
String datasetId = ((ObjectNode) config).get(BigQueryConsts.CONFIG_DATASET_ID).asText();
((ObjectNode) config).put(BigQueryConsts.CONFIG_DATASET_ID,
String.format("%s:%s", projectId, datasetId));
}
))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class BigQueryUtilsTest {

private ImmutableMap.Builder<Object, Object> configMapBuilder;

@BeforeEach
public void init() {
configMapBuilder = ImmutableMap.builder()
.put(BigQueryConsts.CONFIG_CREDS, "test_secret")
.put(BigQueryConsts.CONFIG_DATASET_LOCATION, "US");
}

@ParameterizedTest
@MethodSource("validBigQueryIdProvider")
public void testGetDatasetIdSuccess(String projectId, String datasetId, String expected) throws Exception {
JsonNode config = Jsons.jsonNode(configMapBuilder
.put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
.put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
.build());

String actual = BigQueryUtils.getDatasetId(config);

assertEquals(expected, actual);
}

@ParameterizedTest
@MethodSource("invalidBigQueryIdProvider")
public void testGetDatasetIdFail(String projectId, String datasetId, String expected) throws Exception {
JsonNode config = Jsons.jsonNode(configMapBuilder
.put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
.put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
.build());

Exception exception = assertThrows(IllegalArgumentException.class, () -> BigQueryUtils.getDatasetId(config));

assertEquals(expected, exception.getMessage());
}

private static Stream<Arguments> validBigQueryIdProvider() {
return Stream.of(
Arguments.arguments("my-project", "my_dataset", "my_dataset"),
Arguments.arguments("my-project", "my-project:my_dataset", "my_dataset"));
}

private static Stream<Arguments> invalidBigQueryIdProvider() {
return Stream.of(
Arguments.arguments("my-project", ":my_dataset",
"Project ID included in Dataset ID must match Project ID field's value: Project ID is `my-project`, but you specified `` in Dataset ID"),
Arguments.arguments("my-project", "your-project:my_dataset",
"Project ID included in Dataset ID must match Project ID field's value: Project ID is `my-project`, but you specified `your-project` in Dataset ID"));
}
}
2 changes: 2 additions & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into
| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.6.4 | 2022-01-17 | [\#8383](https://github.com/airbytehq/airbyte/issues/8383) | Support dataset-id prefixed by project-id |
| 0.6.3 | 2022-01-12 | [\#9415](https://github.com/airbytehq/airbyte/pull/9415) | BigQuery Destination : Fix GCS processing of Facebook data |
| 0.6.2 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed check method for GCS mode to verify if all roles assigned to user |
| 0.6.1 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
Expand All @@ -172,6 +173,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------| :--- |
| 0.2.4 | 2022-01-17 | [\#8383](https://github.com/airbytehq/airbyte/issues/8383) | BigQuery/BiqQuery denorm Destinations : Support dataset-id prefixed by project-id |
| 0.2.3 | 2022-01-12 | [\#9415](https://github.com/airbytehq/airbyte/pull/9415) | BigQuery Destination : Fix GCS processing of Facebook data |
| 0.2.2 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.2.1 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
Expand Down

0 comments on commit 3f9cbec

Please sign in to comment.