diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 60394855bdbf8..21b3e3d8658ce 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -21,7 +21,7 @@ import json from typing import TYPE_CHECKING, Any, Sequence -from google.api_core.exceptions import Conflict +from google.api_core.exceptions import BadRequest, Conflict from google.api_core.retry import Retry from google.cloud.bigquery import ( DEFAULT_RETRY, @@ -247,13 +247,13 @@ def __init__( # BQ config self.destination_project_dataset_table = destination_project_dataset_table self.schema_fields = schema_fields - if source_format not in ALLOWED_FORMATS: + if source_format.upper() not in ALLOWED_FORMATS: raise ValueError( f"{source_format} is not a valid source format. " f"Please use one of the following types: {ALLOWED_FORMATS}." ) else: - self.source_format = source_format + self.source_format = source_format.upper() self.compression = compression self.create_disposition = create_disposition self.skip_leading_rows = skip_leading_rows @@ -336,20 +336,23 @@ def execute(self, context: Context): ) self.source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects] - if not self.schema_fields and self.schema_object and self.source_format != "DATASTORE_BACKUP": - gcs_hook = GCSHook( - gcp_conn_id=self.gcp_conn_id, - delegate_to=self.delegate_to, - impersonation_chain=self.impersonation_chain, - ) - self.schema_fields = json.loads( - gcs_hook.download(self.schema_object_bucket, self.schema_object).decode("utf-8") - ) - self.log.info("Autodetected fields from schema object: %s", self.schema_fields) + if not self.schema_fields: if not self.schema_object and not self.autodetect: raise AirflowException( "Table schema was not found. Neither schema object nor schema fields were specified" ) + if self.schema_object and self.source_format != "DATASTORE_BACKUP": + gcs_hook = GCSHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + impersonation_chain=self.impersonation_chain, + ) + self.schema_fields = json.loads( + gcs_hook.download(self.schema_object_bucket, self.schema_object).decode("utf-8") + ) + self.log.info("Loaded fields from schema object: %s", self.schema_fields) + else: + self.schema_fields = None if self.external_table: self.log.info("Creating a new BigQuery table for storing data...") @@ -469,8 +472,17 @@ def _find_max_value_in_column(self): "schemaUpdateOptions": [], } } - job_id = hook.insert_job(configuration=self.configuration, project_id=hook.project_id) - rows = list(hook.get_job(job_id=job_id, location=self.location).result()) + try: + job_id = hook.insert_job(configuration=self.configuration, project_id=hook.project_id) + rows = list(hook.get_job(job_id=job_id, location=self.location).result()) + except BadRequest as e: + if "Unrecognized name:" in e.message: + raise AirflowException( + f"Could not determine MAX value in column {self.max_id_key} " + f"since the default value of 'string_field_n' was set by BQ" + ) + else: + raise AirflowException(e.message) if rows: for row in rows: max_id = row[0] if row[0] else 0 @@ -484,53 +496,6 @@ def _find_max_value_in_column(self): else: raise RuntimeError(f"The {select_command} returned no rows!") - def _check_schema_fields(self, table_resource): - """ - Helper method to detect schema fields if they were not specified by user and autodetect=True. 
- If source_objects were passed, method reads the second row in CSV file. If there is at least one digit - table_resurce is returned without changes so that BigQuery can determine schema_fields in the - next step. - If there are only characters, the first row with fields is used to construct schema_fields argument - with type 'STRING'. Table_resource is updated with new schema_fileds key and returned back to operator - :param table_resource: Configuration or table_resource dictionary - :return: table_resource: Updated table_resource dict with schema_fields - """ - if not self.schema_fields: - for source_object in self.source_objects: - if self.source_format == "CSV": - gcs_hook = GCSHook( - gcp_conn_id=self.gcp_conn_id, - delegate_to=self.delegate_to, - impersonation_chain=self.impersonation_chain, - ) - blob = gcs_hook.download( - bucket_name=self.schema_object_bucket, - object_name=source_object, - ) - fields, values = [item.split(",") for item in blob.decode("utf-8").splitlines()][:2] - self.log.info("fields: %s", fields) - import re - - if any(re.match(r"[\d\-\\.]+$", value) for value in values): - self.log.info("table_resource: %s", table_resource) - return table_resource - else: - schema_fields = [] - for field in fields: - schema_fields.append({"name": field, "type": "STRING", "mode": "NULLABLE"}) - self.schema_fields = schema_fields - if self.external_table: - table_resource["externalDataConfiguration"]["csvOptions"]["skipLeadingRows"] = 1 - elif not self.external_table: - table_resource["load"]["skipLeadingRows"] = 1 - else: - return table_resource - if self.external_table: - table_resource["schema"] = {"fields": self.schema_fields} - elif not self.external_table: - table_resource["load"]["schema"] = {"fields": self.schema_fields} - return table_resource - def _create_empty_table(self): project_id, dataset_id, table_id = self.hook.split_tablename( table_input=self.destination_project_dataset_table, @@ -595,8 +560,6 @@ def _create_empty_table(self): self.encryption_configuration ) table_obj_api_repr = table.to_api_repr() - if not self.schema_fields and self.source_format == "CSV": - table_obj_api_repr = self._check_schema_fields(table_obj_api_repr) self.log.info("Creating external table: %s", self.destination_project_dataset_table) self.hook.create_empty_table( @@ -649,8 +612,6 @@ def _use_existing_table(self): if self.schema_fields: self.configuration["load"]["schema"] = {"fields": self.schema_fields} - elif self.source_format == "CSV": - self.configuration = self._check_schema_fields(self.configuration) if self.schema_update_options: if self.write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]: diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py index 4e28eae226872..5340448083bb5 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py @@ -54,8 +54,8 @@ ] SCHEMA_BUCKET = "test-schema-bucket" SCHEMA_OBJECT = "test/schema/schema.json" -TEST_SOURCE_OBJECTS = ["test/objects/test.csv"] -TEST_SOURCE_OBJECTS_AS_STRING = "test/objects/test.csv" +TEST_SOURCE_OBJECTS_LIST = ["test/objects/test.csv"] +TEST_SOURCE_OBJECTS = "test/objects/test.csv" TEST_SOURCE_OBJECTS_JSON = "test/objects/test.json" LABELS = {"k1": "v1"} DESCRIPTION = "Test Description" @@ -101,7 +101,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook): "externalDataConfiguration": { "autodetect": True, "sourceFormat": 
"CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -159,7 +159,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook "createDisposition": "CREATE_IF_NEEDED", "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": WRITE_DISPOSITION, "ignoreUnknownValues": False, "schema": {"fields": SCHEMA_FIELDS}, @@ -220,7 +220,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook): "createDisposition": "CREATE_IF_NEEDED", "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": "WRITE_TRUNCATE", "ignoreUnknownValues": False, "schema": {"fields": SCHEMA_FIELDS}, @@ -283,7 +283,7 @@ def test_labels_external_table_should_execute_successfully(self, hook): "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -328,7 +328,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook): "createDisposition": "CREATE_IF_NEEDED", "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": "WRITE_TRUNCATE", "ignoreUnknownValues": False, "schema": {"fields": SCHEMA_FIELDS}, @@ -382,7 +382,7 @@ def test_description_external_table_should_execute_successfully(self, hook): "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -432,7 +432,7 @@ def test_description_without_external_table_should_execute_successfully(self, ho }, sourceFormat="CSV", skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], writeDisposition=WRITE_DISPOSITION, ignoreUnknownValues=False, allowQuotedNewlines=False, @@ -483,7 +483,7 @@ def test_source_objs_as_list_external_table_should_execute_successfully(self, ho "autodetect": True, "sourceFormat": "CSV", "sourceUris": [ - f"gs://{TEST_BUCKET}/{source_object}" for source_object in TEST_SOURCE_OBJECTS + f"gs://{TEST_BUCKET}/{source_object}" for source_object in TEST_SOURCE_OBJECTS_LIST ], "compression": "NONE", "ignoreUnknownValues": False, @@ -584,7 +584,7 @@ def test_source_objs_as_string_external_table_should_execute_successfully(self, "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, 
"csvOptions": { @@ -627,12 +627,12 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull "autodetect": True, "createDisposition": "CREATE_IF_NEEDED", "destinationTable": { - "projectId": "test-project", - "datasetId": "dataset", + "projectId": PROJECT_ID, + "datasetId": DATASET, "tableId": "table", }, "sourceFormat": "CSV", - "sourceUris": ["gs://test-bucket/test/objects/test.csv"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": "WRITE_TRUNCATE", "ignoreUnknownValues": False, "schema": {"fields": SCHEMA_FIELDS}, @@ -654,8 +654,8 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_obj_external_table_should_execute_successfully(self, bq_hook, gcs_hook): bq_hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -678,33 +678,34 @@ def test_schema_obj_external_table_should_execute_successfully(self, bq_hook, gc operator.execute(context=MagicMock()) bq_hook.return_value.create_empty_table.assert_called_once_with( + exists_ok=True, + location=None, + project_id=PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - "labels": None, - "description": None, + "labels": {}, "externalDataConfiguration": { - "source_uris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - "source_format": "CSV", - "maxBadRecords": 0, "autodetect": True, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", + "ignoreUnknownValues": False, "csvOptions": { - "fieldDelimeter": ",", "skipLeadingRows": None, + "fieldDelimiter": ",", "quote": None, "allowQuotedNewlines": False, "allowJaggedRows": False, + "encoding": "UTF-8", }, + "schema": {"fields": SCHEMA_FIELDS}, }, - "location": None, - "encryptionConfiguration": None, - "schema": {"fields": SCHEMA_FIELDS}, - } + }, ) gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET, SCHEMA_OBJECT) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_obj_without_external_table_should_execute_successfully(self, bq_hook, gcs_hook): bq_hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -730,28 +731,21 @@ def test_schema_obj_without_external_table_should_execute_successfully(self, bq_ calls = [ call( configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - 
quote=None, - schemaUpdateOptions=(), - ), + "load": { + "autodetect": True, + "createDisposition": "CREATE_IF_NEEDED", + "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], + "writeDisposition": "WRITE_TRUNCATE", + "ignoreUnknownValues": False, + "schema": {"fields": SCHEMA_FIELDS}, + "skipLeadingRows": None, + "fieldDelimiter": ",", + "quote": None, + "allowQuotedNewlines": False, + "encoding": "UTF-8", + } }, project_id=bq_hook.return_value.project_id, location=None, @@ -759,195 +753,12 @@ def test_schema_obj_without_external_table_should_execute_successfully(self, bq_ timeout=None, retry=DEFAULT_RETRY, nowait=True, - ), + ) ] bq_hook.return_value.insert_job.assert_has_calls(calls) gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET, SCHEMA_OBJECT) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") - def test_all_fields_should_be_present(self, hook): - hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - hook.return_value.generate_job_id.return_value = pytest.real_job_id - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - schema_fields=SCHEMA_FIELDS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - field_delimiter=";", - max_bad_records=13, - quote_character="|", - schema_update_options={"foo": "bar"}, - allow_jagged_rows=True, - encryption_configuration={"bar": "baz"}, - cluster_fields=["field_1", "field_2"], - ) - - operator.execute(context=MagicMock()) - - calls = [ - call( - configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=True, - fieldDelimiter=";", - maxBadRecords=13, - quote="|", - schemaUpdateOptions={"foo": "bar"}, - destinationEncryptionConfiguration={"bar": "baz"}, - clustering={"fields": ["field_1", "field_2"]}, - ), - }, - project_id=hook.return_value.project_id, - location=None, - job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, - nowait=True, - ), - ] - - hook.return_value.insert_job.assert_has_calls(calls) - - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") - def test_date_partitioned_explicit_setting_should_be_found(self, hook): - hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - hook.return_value.generate_job_id.return_value = pytest.real_job_id - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - schema_fields=SCHEMA_FIELDS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - 
time_partitioning={"type": "DAY"}, - ) - - operator.execute(context=MagicMock()) - - calls = [ - call( - configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - timePartitioning={"type": "DAY"}, - ), - }, - project_id=hook.return_value.project_id, - location=None, - job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, - nowait=True, - ), - ] - - hook.return_value.insert_job.assert_has_calls(calls) - - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") - def test_date_partitioned_implied_in_table_name_should_be_found(self, hook): - hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - hook.return_value.generate_job_id.return_value = pytest.real_job_id - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - schema_fields=SCHEMA_FIELDS, - destination_project_dataset_table=TEST_EXPLICIT_DEST + "$20221123", - write_disposition=WRITE_DISPOSITION, - external_table=False, - ) - - operator.execute(context=MagicMock()) - - calls = [ - call( - configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - timePartitioning={"type": "DAY"}, - ), - }, - project_id=hook.return_value.project_id, - location=None, - job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, - nowait=True, - ), - ] - - hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_execute_should_throw_ex_when_no_bucket_specified(self, hook): hook.return_value.insert_job.side_effect = [ @@ -1010,173 +821,8 @@ def test_execute_should_throw_ex_when_no_destination_project_dataset_table_speci ) operator.execute(context=MagicMock()) - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_external_table_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check detection of schema fields if schema_fields parameter is not - specified and fields are read from source objects correctly by the operator - if all fields are characters. 
In this case operator searches for fields in source object - and update configuration with constructed schema_fields. - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=True, - autodetect=True, - ) - - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_called_once() - - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_should_not_called_on_json_ex_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check not calling check_schema_fields method in case if input file is of JSON type on external table - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS_JSON, - source_format="NEWLINE_DELIMITED_JSON", - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=True, - autodetect=True, - ) - - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_not_called() - - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_should_not_called_on_json_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check not calling check_schema_fields method in case if input file is of JSON type - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS_JSON, - source_format="NEWLINE_DELIMITED_JSON", - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - autodetect=True, - ) - - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_not_called() - - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - 
@mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_without_external_table_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check detection of schema fields if schema_fields parameter is not - specified and fields are read from source objects correctly by the operator - if all fields are characters. In this case operator searches for fields in source object - and update configuration with constructed schema_fields. - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - autodetect=True, - ) - - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_called_once() - - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_external_table_should_throw_ex_when_autodetect_not_specified( - self, - hook, - ): - hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - hook.return_value.generate_job_id.return_value = pytest.real_job_id - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - hook.return_value.get_job.return_value.result.return_value = ("1",) - - with pytest.raises( - AirflowException, - match=r"Table schema was not found. Neither schema object nor schema fields were specified", - ): - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - max_id_key=MAX_ID_KEY, - write_disposition=WRITE_DISPOSITION, - external_table=True, - autodetect=False, - ) - operator.execute(context=MagicMock()) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_without_external_table_should_throw_ex_when_autodetect_not_specified( + def test_source_format_check_should_throw_ex_when_incorrect_source_type( self, hook, ): @@ -1189,8 +835,8 @@ def test_schema_fields_scanner_without_external_table_should_throw_ex_when_autod hook.return_value.get_job.return_value.result.return_value = ("1",) with pytest.raises( - AirflowException, - match=r"Table schema was not found. 
Neither schema object nor schema fields were specified", + ValueError, + match=r"is not a valid source format.", ): operator = GCSToBigQueryOperator( task_id=TASK_ID, @@ -1201,6 +847,7 @@ def test_schema_fields_scanner_without_external_table_should_throw_ex_when_autod write_disposition=WRITE_DISPOSITION, external_table=False, autodetect=False, + source_format="incorrect", ) operator.execute(context=MagicMock()) @@ -1247,7 +894,7 @@ def test_schema_fields_integer_scanner_external_table_should_execute_successfull "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -1313,7 +960,7 @@ def test_schema_fields_integer_scanner_without_external_table_should_execute_suc "createDisposition": "CREATE_IF_NEEDED", "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": WRITE_DISPOSITION, "ignoreUnknownValues": False, "skipLeadingRows": None, @@ -1374,7 +1021,7 @@ def test_schema_fields_without_external_table_should_execute_successfully(self, "createDisposition": "CREATE_IF_NEEDED", "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": WRITE_DISPOSITION, "ignoreUnknownValues": False, "schema": {"fields": SCHEMA_FIELDS_INT}, @@ -1428,7 +1075,7 @@ def test_schema_fields_external_table_should_execute_successfully(self, hook): "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -1688,7 +1335,7 @@ def test_schema_fields_without_external_table_async_should_execute_successfully( }, sourceFormat="CSV", skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], writeDisposition=WRITE_DISPOSITION, ignoreUnknownValues=False, allowQuotedNewlines=False, @@ -1764,7 +1411,7 @@ def test_schema_fields_int_without_external_table_async_should_execute_successfu }, sourceFormat="CSV", skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], writeDisposition=WRITE_DISPOSITION, ignoreUnknownValues=False, allowQuotedNewlines=False, @@ -1792,109 +1439,6 @@ def test_schema_fields_int_without_external_table_async_should_execute_successfu bq_hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_async_schema_fields_scanner_external_table_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check detection of schema fields if schema_fields parameter is not - specified and fields are read from source objects correctly by the operator - if all fields are 
characters. In this case operator searches for fields in source object - and update configuration with constructed schema_fields. - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - autodetect=True, - deferrable=True, - ) - with pytest.raises(TaskDeferred): - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_called_once() - - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_should_not_called_on_json_ex_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check not calling check_schema_fields method in case if input file is of JSON type on external table - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS_JSON, - source_format="NEWLINE_DELIMITED_JSON", - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=True, - autodetect=True, - deferrable=True, - ) - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_not_called() - - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_should_not_called_on_json_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check not calling check_schema_fields method in case if input file is of JSON type - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS_JSON, - source_format="NEWLINE_DELIMITED_JSON", - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - autodetect=True, - deferrable=True, - ) - with pytest.raises(TaskDeferred): - operator.execute(context=MagicMock()) - 
mocked_check_fileds.assert_not_called() - def create_context(self, task): dag = DAG(dag_id="dag") logical_date = datetime(2022, 1, 1, 0, 0, 0) diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py index 2f66057747e97..5888056456bc3 100644 --- a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py +++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py @@ -37,8 +37,12 @@ DATASET_NAME_STR = f"dataset_{DAG_ID}_{ENV_ID}_STR" DATASET_NAME_DATE = f"dataset_{DAG_ID}_{ENV_ID}_DATE" +DATASET_NAME_JSON = f"dataset_{DAG_ID}_{ENV_ID}_JSON" +DATASET_NAME_DELIMITER = f"dataset_{DAG_ID}_{ENV_ID}_DELIMITER" TABLE_NAME_STR = "test_str" TABLE_NAME_DATE = "test_date" +TABLE_NAME_JSON = "test_json" +TABLE_NAME_DELIMITER = "test_delimiter" MAX_ID_STR = "name" MAX_ID_DATE = "date" @@ -49,14 +53,24 @@ catchup=False, tags=["example", "gcs"], ) as dag: - create_test_dataset_for_string_fileds = BigQueryCreateEmptyDatasetOperator( + create_test_dataset_for_string_fields = BigQueryCreateEmptyDatasetOperator( task_id="create_airflow_test_dataset_str", dataset_id=DATASET_NAME_STR, project_id=PROJECT_ID ) - create_test_dataset_for_date_fileds = BigQueryCreateEmptyDatasetOperator( + create_test_dataset_for_date_fields = BigQueryCreateEmptyDatasetOperator( task_id="create_airflow_test_dataset_date", dataset_id=DATASET_NAME_DATE, project_id=PROJECT_ID ) + create_test_dataset_for_json_fields = BigQueryCreateEmptyDatasetOperator( + task_id="create_airflow_test_dataset_json", dataset_id=DATASET_NAME_JSON, project_id=PROJECT_ID + ) + + create_test_dataset_for_delimiter_fields = BigQueryCreateEmptyDatasetOperator( + task_id="create_airflow_test_dataset_delimiter", + dataset_id=DATASET_NAME_DELIMITER, + project_id=PROJECT_ID, + ) + # [START howto_operator_gcs_to_bigquery_async] load_string_based_csv = GCSToBigQueryOperator( task_id="gcs_to_bigquery_example_str_csv_async", @@ -87,14 +101,28 @@ bucket="cloud-samples-data", source_objects=["bigquery/us-states/us-states.json"], source_format="NEWLINE_DELIMITED_JSON", - destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}", + destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}", write_disposition="WRITE_TRUNCATE", external_table=False, autodetect=True, - max_id_key=MAX_ID_DATE, + max_id_key=MAX_ID_STR, deferrable=True, ) + load_csv_delimiter = GCSToBigQueryOperator( + task_id="gcs_to_bigquery_example_delimiter_async", + bucket="big-query-samples", + source_objects=["employees-tabular.csv"], + source_format="csv", + destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}", + write_disposition="WRITE_TRUNCATE", + external_table=False, + autodetect=True, + field_delimiter="\t", + quote_character="", + max_id_key=MAX_ID_STR, + deferrable=True, + ) # [END howto_operator_gcs_to_bigquery_async] delete_test_dataset_str = BigQueryDeleteDatasetOperator( @@ -111,17 +139,36 @@ trigger_rule=TriggerRule.ALL_DONE, ) + delete_test_dataset_json = BigQueryDeleteDatasetOperator( + task_id="delete_airflow_test_json_dataset", + dataset_id=DATASET_NAME_JSON, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, + ) + + delete_test_dataset_delimiter = BigQueryDeleteDatasetOperator( + task_id="delete_airflow_test_delimiter", + dataset_id=DATASET_NAME_DELIMITER, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, + ) + ( # TEST SETUP - create_test_dataset_for_string_fileds - >> 
create_test_dataset_for_date_fileds + create_test_dataset_for_string_fields + >> create_test_dataset_for_date_fields + >> create_test_dataset_for_json_fields + >> create_test_dataset_for_delimiter_fields # TEST BODY >> load_string_based_csv >> load_date_based_csv >> load_json + >> load_csv_delimiter # TEST TEARDOWN >> delete_test_dataset_str >> delete_test_dataset_date + >> delete_test_dataset_json + >> delete_test_dataset_delimiter ) from tests.system.utils.watcher import watcher
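
A minimal usage sketch of the operator after this change, assuming hypothetical bucket, object, dataset and table names (none of them come from the diff). It illustrates the new behaviour: source_format is upper-cased before validation (so "csv" is accepted), a schema object is downloaded only when schema_fields is not supplied for a non-DATASTORE_BACKUP source, and a BadRequest raised while resolving max_id_key is surfaced as an AirflowException.

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with DAG(dag_id="example_gcs_to_bq_sketch", start_date=datetime(2023, 1, 1), schedule=None, catchup=False) as dag:
    load_csv = GCSToBigQueryOperator(
        task_id="load_csv",
        bucket="my-bucket",  # hypothetical bucket
        source_objects=["data/input.csv"],  # hypothetical object
        source_format="csv",  # lower case is normalised to "CSV" by the new __init__ check
        schema_object="schemas/input.json",  # downloaded only because schema_fields is not set
        destination_project_dataset_table="my_dataset.my_table",  # hypothetical destination
        write_disposition="WRITE_TRUNCATE",
        max_id_key="id",  # MAX(id) is queried after the load; a BadRequest becomes an AirflowException
    )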