diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index 7b5882af96a2..f3db3ce267c7 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -28,5 +28,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.70 +LABEL io.airbyte.version=0.1.71 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py index 06657251a50f..82ad24c96436 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py @@ -128,10 +128,24 @@ def get_ssh_altered_config(config: Dict[str, Any], port_key: str = "port", host_ def transform_bigquery(config: Dict[str, Any]): print("transform_bigquery") # https://docs.getdbt.com/reference/warehouse-profiles/bigquery-profile + + project_id = config["project_id"] + dataset_id = config["dataset_id"] + + if ":" in config["dataset_id"]: + splits = config["dataset_id"].split(":") + if len(splits) > 2: + raise ValueError("Invalid format for dataset ID (expected at most one colon)") + project_id, dataset_id = splits + if project_id != config["project_id"]: + raise ValueError( + f"Project ID in dataset ID did not match explicitly-provided project ID: {project_id} and {config['project_id']}" + ) + dbt_config = { "type": "bigquery", - "project": config["project_id"], - "dataset": config["dataset_id"], + "project": project_id, + "dataset": dataset_id, "priority": config.get("transformation_priority", "interactive"), "threads": 8, "retries": 3, diff --git a/airbyte-integrations/bases/base-normalization/snowflake.Dockerfile b/airbyte-integrations/bases/base-normalization/snowflake.Dockerfile index 8fd374047988..b1777276e320 100644 --- a/airbyte-integrations/bases/base-normalization/snowflake.Dockerfile +++ b/airbyte-integrations/bases/base-normalization/snowflake.Dockerfile @@ -29,5 +29,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.70 +LABEL io.airbyte.version=0.1.71 LABEL io.airbyte.name=airbyte/normalization-snowflake diff --git a/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py b/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py index 3eb3b47120cb..334cb314d387 100644 --- a/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py +++ b/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py @@ -174,6 +174,41 @@ def test_transform_bigquery_no_credentials(self): assert expected_output == actual_output assert extract_schema(actual_output) == "my_dataset_id" + def test_transform_bigquery_with_embedded_project_id(self): + input = {"project_id": "my_project_id", "dataset_id": "my_project_id:my_dataset_id"} + + actual_output = TransformConfig().transform_bigquery(input) + expected_output = { + "type": "bigquery", + "method": "oauth", + "project": "my_project_id", + "dataset": "my_dataset_id", + "priority": "interactive", + "retries": 3, + "threads": 8, + } + + assert expected_output == actual_output + assert extract_schema(actual_output) == "my_dataset_id" + + def test_transform_bigquery_with_embedded_mismatched_project_id(self): + input = {"project_id": "my_project_id", "dataset_id": "bad_project_id:my_dataset_id"} + + try: + TransformConfig().transform_bigquery(input) + assert False, "transform_bigquery should have raised an exception" + except ValueError: + pass + + def test_transform_bigquery_with_invalid_format(self): + input = {"project_id": "my_project_id", "dataset_id": "foo:bar:baz"} + + try: + TransformConfig().transform_bigquery(input) + assert False, "transform_bigquery should have raised an exception" + except ValueError: + pass + def test_transform_postgres(self): input = { "host": "airbyte.io", diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java index 1ab4e0998190..bfec69ba6cf8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java @@ -14,7 +14,7 @@ public class NormalizationRunnerFactory { public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization"; - public static final String NORMALIZATION_VERSION = "0.1.70"; + public static final String NORMALIZATION_VERSION = "0.1.71"; static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder() diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md index 6dd81a13cade..4a3fc774a5c1 100644 --- a/docs/understanding-airbyte/basic-normalization.md +++ b/docs/understanding-airbyte/basic-normalization.md @@ -350,6 +350,7 @@ Therefore, in order to "upgrade" to the desired normalization version, you need | Airbyte Version | Normalization Version | Date | Pull Request | Subject | |:----------------| :--- | :--- | :--- | :--- | +| 0.35.53-alpha | 0.1.71 | 2022-03-14 | [\#11077](https://github.com/airbytehq/airbyte/pull/11077) | Enable BigQuery to handle project ID embedded inside dataset ID | | 0.35.49-alpha | 0.1.70 | 2022-03-11 | [\#11051](https://github.com/airbytehq/airbyte/pull/11051) | Upgrade dbt to 1.0.0 (except for MySQL and Oracle) | | 0.35.45-alpha | 0.1.69 | 2022-03-04 | [\#10754](https://github.com/airbytehq/airbyte/pull/10754) | Enable Clickhouse normalization over SSL | | 0.35.32-alpha | 0.1.68 | 2022-02-20 | [\#10485](https://github.com/airbytehq/airbyte/pull/10485) | Fix row size too large for table with numerous `string` fields |