From 920c23b39755969185712403b152d663246df6aa Mon Sep 17 00:00:00 2001 From: VladaZakharova Date: Sat, 10 Dec 2022 13:39:02 +0100 Subject: [PATCH 1/4] Fix for issue with reading schema fields for JSON files --- .../google/cloud/transfers/gcs_to_bigquery.py | 427 +++-- .../google/cloud/triggers/bigquery.py | 2 + .../cloud/transfers/test_gcs_to_bigquery.py | 1495 +++++++++-------- .../gcs/example_gcs_to_bigquery_async.py | 15 + 4 files changed, 1100 insertions(+), 839 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 63c625be87a25..4a8d1d2c43ac1 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -23,15 +23,20 @@ from google.api_core.exceptions import Conflict from google.api_core.retry import Retry -from google.cloud.bigquery import DEFAULT_RETRY, CopyJob, ExtractJob, LoadJob, QueryJob +from google.cloud.bigquery import ( + DEFAULT_RETRY, + CopyJob, + ExternalConfig, + ExtractJob, + LoadJob, + QueryJob, + SchemaField, +) +from google.cloud.bigquery.table import EncryptionConfiguration, Table, TableReference from airflow import AirflowException from airflow.models import BaseOperator -from airflow.providers.google.cloud.hooks.bigquery import ( - BigQueryHook, - BigQueryJob, - _cleanse_time_partitioning, -) +from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink from airflow.providers.google.cloud.triggers.bigquery import BigQueryInsertJobTrigger @@ -300,6 +305,16 @@ def execute(self, context: Context): impersonation_chain=self.impersonation_chain, ) self.hook = hook + self.source_format = self.source_format.upper() + allowed_formats = [ + "CSV", + "NEWLINE_DELIMITED_JSON", + "AVRO", + "GOOGLE_SHEETS", + "DATASTORE_BACKUP", + "PARQUET", + ] + job_id = self.hook.generate_job_id( job_id=self.job_id, dag_id=self.dag_id, @@ -312,7 +327,7 @@ def execute(self, context: Context): self.source_objects = ( self.source_objects if isinstance(self.source_objects, list) else [self.source_objects] ) - source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects] + self.source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects] if not self.schema_fields and self.schema_object and self.source_format != "DATASTORE_BACKUP": gcs_hook = GCSHook( @@ -324,101 +339,35 @@ def execute(self, context: Context): gcs_hook.download(self.schema_object_bucket, self.schema_object).decode("utf-8") ) self.log.info("Autodetected fields from schema object: %s", self.schema_fields) + if not self.schema_object and not self.autodetect: + raise AirflowException( + "Table schema was not found. Neither schema object nor schema fields were specified" + ) + + if self.source_format not in allowed_formats: + raise ValueError( + f"{self.source_format} is not a valid source format. " + f"Please use one of the following types: {allowed_formats}." 
+ ) if self.external_table: self.log.info("Creating a new BigQuery table for storing data...") - project_id, dataset_id, table_id = self.hook.split_tablename( - table_input=self.destination_project_dataset_table, - default_project_id=self.hook.project_id or "", - ) - table_resource = { - "tableReference": { - "projectId": project_id, - "datasetId": dataset_id, - "tableId": table_id, - }, - "labels": self.labels, - "description": self.description, - "externalDataConfiguration": { - "source_uris": source_uris, - "source_format": self.source_format, - "maxBadRecords": self.max_bad_records, - "autodetect": self.autodetect, - "compression": self.compression, - "csvOptions": { - "fieldDelimeter": self.field_delimiter, - "skipLeadingRows": self.skip_leading_rows, - "quote": self.quote_character, - "allowQuotedNewlines": self.allow_quoted_newlines, - "allowJaggedRows": self.allow_jagged_rows, - }, - }, - "location": self.location, - "encryptionConfiguration": self.encryption_configuration, - } - table_resource_checked_schema = self._check_schema_fields(table_resource) - table = self.hook.create_empty_table( - table_resource=table_resource_checked_schema, - ) - max_id = self._find_max_value_in_column() + table_obj_api_repr = self._create_empty_table() + BigQueryTableLink.persist( context=context, task_instance=self, - dataset_id=table.to_api_repr()["tableReference"]["datasetId"], - project_id=table.to_api_repr()["tableReference"]["projectId"], - table_id=table.to_api_repr()["tableReference"]["tableId"], + dataset_id=table_obj_api_repr["tableReference"]["datasetId"], + project_id=table_obj_api_repr["tableReference"]["projectId"], + table_id=table_obj_api_repr["tableReference"]["tableId"], ) - return max_id + if self.max_id_key: + max_id = self._find_max_value_in_column() + return max_id else: self.log.info("Using existing BigQuery table for storing data...") - destination_project, destination_dataset, destination_table = self.hook.split_tablename( - table_input=self.destination_project_dataset_table, - default_project_id=self.hook.project_id or "", - var_name="destination_project_dataset_table", - ) - self.configuration = { - "load": { - "autodetect": self.autodetect, - "createDisposition": self.create_disposition, - "destinationTable": { - "projectId": destination_project, - "datasetId": destination_dataset, - "tableId": destination_table, - }, - "destinationTableProperties": { - "description": self.description, - "labels": self.labels, - }, - "sourceFormat": self.source_format, - "skipLeadingRows": self.skip_leading_rows, - "sourceUris": source_uris, - "writeDisposition": self.write_disposition, - "ignoreUnknownValues": self.ignore_unknown_values, - "allowQuotedNewlines": self.allow_quoted_newlines, - "encoding": self.encoding, - "allowJaggedRows": self.allow_jagged_rows, - "fieldDelimiter": self.field_delimiter, - "maxBadRecords": self.max_bad_records, - "quote": self.quote_character, - "schemaUpdateOptions": self.schema_update_options, - }, - } - if self.cluster_fields: - self.configuration["load"].update({"clustering": {"fields": self.cluster_fields}}) - time_partitioning = _cleanse_time_partitioning( - self.destination_project_dataset_table, self.time_partitioning - ) - if time_partitioning: - self.configuration["load"].update({"timePartitioning": time_partitioning}) - # fields that should only be set if defined - set_if_def = { - "quote": self.quote_character, - "destinationEncryptionConfiguration": self.encryption_configuration, - } - for k, v in set_if_def.items(): - if v: - 
self.configuration["load"][k] = v - self.configuration = self._check_schema_fields(self.configuration) + self.configuration = self._use_existing_table() + try: self.log.info("Executing: %s", self.configuration) job = self._submit_job(self.hook, job_id) @@ -480,9 +429,9 @@ def execute(self, context: Context): ) else: job.result(timeout=self.result_timeout, retry=self.result_retry) - max_id = self._find_max_value_in_column() self._handle_job_error(job) - return max_id + if self.max_id_key: + return self._find_max_value_in_column() def execute_complete(self, context: Context, event: dict[str, Any]): """ @@ -512,7 +461,6 @@ def _find_max_value_in_column(self): f"SELECT MAX({self.max_id_key}) AS max_value " f"FROM {self.destination_project_dataset_table}" ) - self.configuration = { "query": { "query": select_command, @@ -546,43 +494,272 @@ def _check_schema_fields(self, table_resource): :param table_resource: Configuration or table_resource dictionary :return: table_resource: Updated table_resource dict with schema_fields """ - if not self.autodetect and not self.schema_fields: - raise RuntimeError( - "Table schema was not found. Set autodetect=True to " - "automatically set schema fields from source objects or pass " - "schema_fields explicitly" - ) - elif not self.schema_fields: + if not self.schema_fields: for source_object in self.source_objects: - gcs_hook = GCSHook( - gcp_conn_id=self.gcp_conn_id, - delegate_to=self.delegate_to, - impersonation_chain=self.impersonation_chain, - ) - blob = gcs_hook.download( - bucket_name=self.schema_object_bucket, - object_name=source_object, - ) - fields, values = [item.split(",") for item in blob.decode("utf-8").splitlines()][:2] - import re - - if any(re.match(r"[\d\-\\.]+$", value) for value in values): - return table_resource + if self.source_format == "CSV": + gcs_hook = GCSHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + impersonation_chain=self.impersonation_chain, + ) + blob = gcs_hook.download( + bucket_name=self.schema_object_bucket, + object_name=source_object, + ) + fields, values = [item.split(",") for item in blob.decode("utf-8").splitlines()][:2] + self.log.info("fields: %s", fields) + import re + + if any(re.match(r"[\d\-\\.]+$", value) for value in values): + self.log.info("table_resource: %s", table_resource) + return table_resource + else: + schema_fields = [] + for field in fields: + schema_fields.append({"name": field, "type": "STRING", "mode": "NULLABLE"}) + self.schema_fields = schema_fields + if self.external_table: + table_resource["externalDataConfiguration"]["csvOptions"]["skipLeadingRows"] = 1 + elif not self.external_table: + table_resource["load"]["skipLeadingRows"] = 1 else: - schema_fields = [] - for field in fields: - schema_fields.append({"name": field, "type": "STRING", "mode": "NULLABLE"}) - self.schema_fields = schema_fields - if self.external_table: - table_resource["externalDataConfiguration"]["csvOptions"]["skipLeadingRows"] = 1 - elif not self.external_table: - table_resource["load"]["skipLeadingRows"] = 1 + return table_resource if self.external_table: table_resource["schema"] = {"fields": self.schema_fields} elif not self.external_table: table_resource["load"]["schema"] = {"fields": self.schema_fields} return table_resource + def _create_empty_table(self): + project_id, dataset_id, table_id = self.hook.split_tablename( + table_input=self.destination_project_dataset_table, + default_project_id=self.hook.project_id or "", + ) + + external_config_api_repr = { + "autodetect": self.autodetect, 
+ "sourceFormat": self.source_format, + "sourceUris": self.source_uris, + "compression": self.compression.upper(), + "ignoreUnknownValues": self.ignore_unknown_values, + } + # if following fields are not specified in src_fmt_configs, + # honor the top-level params for backward-compatibility + backward_compatibility_configs = { + "skipLeadingRows": self.skip_leading_rows, + "fieldDelimiter": self.field_delimiter, + "quote": self.quote_character, + "allowQuotedNewlines": self.allow_quoted_newlines, + "allowJaggedRows": self.allow_jagged_rows, + "encoding": self.encoding, + } + src_fmt_to_param_mapping = {"CSV": "csvOptions", "GOOGLE_SHEETS": "googleSheetsOptions"} + src_fmt_to_configs_mapping = { + "csvOptions": [ + "allowJaggedRows", + "allowQuotedNewlines", + "fieldDelimiter", + "skipLeadingRows", + "quote", + "encoding", + ], + "googleSheetsOptions": ["skipLeadingRows"], + } + if self.source_format in src_fmt_to_param_mapping.keys(): + valid_configs = src_fmt_to_configs_mapping[src_fmt_to_param_mapping[self.source_format]] + self.src_fmt_configs = self._validate_src_fmt_configs( + self.source_format, self.src_fmt_configs, valid_configs, backward_compatibility_configs + ) + external_config_api_repr[src_fmt_to_param_mapping[self.source_format]] = self.src_fmt_configs + + external_config = ExternalConfig.from_api_repr(external_config_api_repr) + if self.schema_fields: + external_config.schema = [SchemaField.from_api_repr(f) for f in self.schema_fields] + if self.max_bad_records: + external_config.max_bad_records = self.max_bad_records + + # build table definition + table = Table( + table_ref=TableReference.from_string(self.destination_project_dataset_table, project_id) + ) + table.external_data_configuration = external_config + if self.labels: + table.labels = self.labels + + if self.description: + table.description = self.description + + if self.encryption_configuration: + table.encryption_configuration = EncryptionConfiguration.from_api_repr( + self.encryption_configuration + ) + table_obj_api_repr = table.to_api_repr() + if not self.schema_fields and self.source_format == "CSV": + table_obj_api_repr = self._check_schema_fields(table_obj_api_repr) + + self.log.info("Creating external table: %s", self.destination_project_dataset_table) + self.hook.create_empty_table( + table_resource=table_obj_api_repr, project_id=project_id, location=self.location, exists_ok=True + ) + self.log.info("External table created successfully: %s", self.destination_project_dataset_table) + return table_obj_api_repr + + def _use_existing_table(self): + destination_project, destination_dataset, destination_table = self.hook.split_tablename( + table_input=self.destination_project_dataset_table, + default_project_id=self.hook.project_id or "", + var_name="destination_project_dataset_table", + ) + + # bigquery also allows you to define how you want a table's schema to change + # as a side effect of a load + # for more details: + # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions + allowed_schema_update_options = ["ALLOW_FIELD_ADDITION", "ALLOW_FIELD_RELAXATION"] + if not set(allowed_schema_update_options).issuperset(set(self.schema_update_options)): + raise ValueError( + f"{self.schema_update_options} contains invalid schema update options. 
" + f"Please only use one or more of the following options: {allowed_schema_update_options}" + ) + + self.configuration = { + "load": { + "autodetect": self.autodetect, + "createDisposition": self.create_disposition, + "destinationTable": { + "projectId": destination_project, + "datasetId": destination_dataset, + "tableId": destination_table, + }, + "sourceFormat": self.source_format, + "sourceUris": self.source_uris, + "writeDisposition": self.write_disposition, + "ignoreUnknownValues": self.ignore_unknown_values, + }, + } + self.time_partitioning = self._cleanse_time_partitioning( + self.destination_project_dataset_table, self.time_partitioning + ) + if self.time_partitioning: + self.configuration["load"].update({"timePartitioning": self.time_partitioning}) + + if self.cluster_fields: + self.configuration["load"].update({"clustering": {"fields": self.cluster_fields}}) + + if self.schema_fields: + self.configuration["load"]["schema"] = {"fields": self.schema_fields} + elif self.source_format == "CSV": + self.configuration = self._check_schema_fields(self.configuration) + + if self.schema_update_options: + if self.write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]: + raise ValueError( + "schema_update_options is only " + "allowed if write_disposition is " + "'WRITE_APPEND' or 'WRITE_TRUNCATE'." + ) + else: + # To provide backward compatibility + self.schema_update_options = list(self.schema_update_options or []) + self.log.info("Adding experimental 'schemaUpdateOptions': %s", self.schema_update_options) + self.configuration["load"]["schemaUpdateOptions"] = self.schema_update_options + + if self.max_bad_records: + self.configuration["load"]["maxBadRecords"] = self.max_bad_records + + if self.encryption_configuration: + self.configuration["load"]["destinationEncryptionConfiguration"] = self.encryption_configuration + + if self.labels or self.description: + self.configuration["load"].update({"destinationTableProperties": {}}) + if self.labels: + self.configuration["load"]["destinationTableProperties"]["labels"] = self.labels + if self.description: + self.configuration["load"]["destinationTableProperties"]["description"] = self.description + + src_fmt_to_configs_mapping = { + "CSV": [ + "allowJaggedRows", + "allowQuotedNewlines", + "autodetect", + "fieldDelimiter", + "skipLeadingRows", + "ignoreUnknownValues", + "nullMarker", + "quote", + "encoding", + ], + "DATASTORE_BACKUP": ["projectionFields"], + "NEWLINE_DELIMITED_JSON": ["autodetect", "ignoreUnknownValues"], + "PARQUET": ["autodetect", "ignoreUnknownValues"], + "AVRO": ["useAvroLogicalTypes"], + } + + valid_configs = src_fmt_to_configs_mapping[self.source_format] + + # if following fields are not specified in src_fmt_configs, + # honor the top-level params for backward-compatibility + backward_compatibility_configs = { + "skipLeadingRows": self.skip_leading_rows, + "fieldDelimiter": self.field_delimiter, + "ignoreUnknownValues": self.ignore_unknown_values, + "quote": self.quote_character, + "allowQuotedNewlines": self.allow_quoted_newlines, + "encoding": self.encoding, + } + + self.src_fmt_configs = self._validate_src_fmt_configs( + self.source_format, self.src_fmt_configs, valid_configs, backward_compatibility_configs + ) + + self.configuration["load"].update(self.src_fmt_configs) + + if self.allow_jagged_rows: + self.configuration["load"]["allowJaggedRows"] = self.allow_jagged_rows + return self.configuration + + def _validate_src_fmt_configs( + self, + source_format: str, + src_fmt_configs: dict, + valid_configs: list[str], + 
backward_compatibility_configs: dict | None = None, + ) -> dict: + """ + Validates the given src_fmt_configs against a valid configuration for the source format. + Adds the backward compatibility config to the src_fmt_configs. + + :param source_format: File format to export. + :param src_fmt_configs: Configure optional fields specific to the source format. + :param valid_configs: Valid configuration specific to the source format + :param backward_compatibility_configs: The top-level params for backward-compatibility + """ + if backward_compatibility_configs is None: + backward_compatibility_configs = {} + + for k, v in backward_compatibility_configs.items(): + if k not in src_fmt_configs and k in valid_configs: + src_fmt_configs[k] = v + + for k, v in src_fmt_configs.items(): + if k not in valid_configs: + raise ValueError(f"{k} is not a valid src_fmt_configs for type {source_format}.") + + return src_fmt_configs + + def _cleanse_time_partitioning( + self, destination_dataset_table: str | None, time_partitioning_in: dict | None + ) -> dict: # if it is a partitioned table ($ is in the table name) add partition load option + + if time_partitioning_in is None: + time_partitioning_in = {} + + time_partitioning_out = {} + if destination_dataset_table and "$" in destination_dataset_table: + time_partitioning_out["type"] = "DAY" + time_partitioning_out.update(time_partitioning_in) + return time_partitioning_out + def on_kill(self) -> None: if self.job_id and self.cancel_on_kill: self.hook.cancel_job(job_id=self.job_id, location=self.location) # type: ignore[union-attr] diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py index 271b02e3fcb02..9dd780d24afad 100644 --- a/airflow/providers/google/cloud/triggers/bigquery.py +++ b/airflow/providers/google/cloud/triggers/bigquery.py @@ -88,12 +88,14 @@ async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override] "message": "Job completed", } ) + return elif response_from_hook == "pending": self.log.info("Query is still running...") self.log.info("Sleeping for %s seconds.", self.poll_interval) await asyncio.sleep(self.poll_interval) else: yield TriggerEvent({"status": "error", "message": response_from_hook}) + return except Exception as e: self.log.exception("Exception occurred while checking for query completion") diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py index 1a9356134c239..4e28eae226872 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py @@ -56,6 +56,7 @@ SCHEMA_OBJECT = "test/schema/schema.json" TEST_SOURCE_OBJECTS = ["test/objects/test.csv"] TEST_SOURCE_OBJECTS_AS_STRING = "test/objects/test.csv" +TEST_SOURCE_OBJECTS_JSON = "test/objects/test.json" LABELS = {"k1": "v1"} DESCRIPTION = "Test Description" @@ -63,9 +64,11 @@ hash_ = "hash" pytest.real_job_id = f"{job_id}_{hash_}" +GCS_TO_BQ_PATH = "airflow.providers.google.cloud.transfers.gcs_to_bigquery.{}" + class TestGCSToBigQueryOperator(unittest.TestCase): - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_max_value_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -89,28 +92,29 @@ def 
test_max_value_external_table_should_execute_successfully(self, hook): assert result == "1" hook.return_value.create_empty_table.assert_called_once_with( + exists_ok=True, + location=None, + project_id=PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - "labels": None, - "description": None, + "labels": {}, "externalDataConfiguration": { - "source_uris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - "source_format": "CSV", - "maxBadRecords": 0, "autodetect": True, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], "compression": "NONE", + "ignoreUnknownValues": False, "csvOptions": { - "fieldDelimeter": ",", "skipLeadingRows": None, + "fieldDelimiter": ",", "quote": None, "allowQuotedNewlines": False, "allowJaggedRows": False, + "encoding": "UTF-8", }, + "schema": {"fields": SCHEMA_FIELDS}, }, - "location": None, - "encryptionConfiguration": None, - "schema": {"fields": SCHEMA_FIELDS}, - } + }, ) hook.return_value.insert_job.assert_called_once_with( configuration={ @@ -123,7 +127,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook): project_id=hook.return_value.project_id, ) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_max_value_without_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -150,35 +154,28 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook calls = [ call( configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - ), + "load": { + "autodetect": True, + "createDisposition": "CREATE_IF_NEEDED", + "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "writeDisposition": WRITE_DISPOSITION, + "ignoreUnknownValues": False, + "schema": {"fields": SCHEMA_FIELDS}, + "skipLeadingRows": None, + "fieldDelimiter": ",", + "quote": None, + "allowQuotedNewlines": False, + "encoding": "UTF-8", + } }, - project_id=hook.return_value.project_id, - location=None, job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, + location=None, nowait=True, + project_id=hook.return_value.project_id, + retry=DEFAULT_RETRY, + timeout=None, ), call( configuration={ @@ -194,7 +191,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, 
error_result=False), @@ -218,35 +215,28 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook): calls = [ call( configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - ), + "load": { + "autodetect": True, + "createDisposition": "CREATE_IF_NEEDED", + "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "writeDisposition": "WRITE_TRUNCATE", + "ignoreUnknownValues": False, + "schema": {"fields": SCHEMA_FIELDS}, + "skipLeadingRows": None, + "fieldDelimiter": ",", + "quote": None, + "allowQuotedNewlines": False, + "encoding": "UTF-8", + } }, - project_id=hook.return_value.project_id, - location=None, job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, + location=None, nowait=True, + project_id=hook.return_value.project_id, + retry=DEFAULT_RETRY, + timeout=None, ), call( configuration={ @@ -262,7 +252,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook): hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_labels_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -284,31 +274,32 @@ def test_labels_external_table_should_execute_successfully(self, hook): operator.execute(context=MagicMock()) hook.return_value.create_empty_table.assert_called_once_with( + exists_ok=True, + location=None, + project_id=PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": LABELS, - "description": None, "externalDataConfiguration": { - "source_uris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - "source_format": "CSV", - "maxBadRecords": 0, "autodetect": True, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], "compression": "NONE", + "ignoreUnknownValues": False, "csvOptions": { - "fieldDelimeter": ",", "skipLeadingRows": None, + "fieldDelimiter": ",", "quote": None, "allowQuotedNewlines": False, "allowJaggedRows": False, + "encoding": "UTF-8", }, + "schema": {"fields": SCHEMA_FIELDS}, }, - "location": None, - "encryptionConfiguration": None, - "schema": {"fields": SCHEMA_FIELDS}, - } + }, ) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_labels_without_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -332,41 +323,35 @@ def test_labels_without_external_table_should_execute_successfully(self, hook): calls = [ call( configuration={ - "load": dict( - autodetect=True, - 
createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": LABELS, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - ), + "load": { + "autodetect": True, + "createDisposition": "CREATE_IF_NEEDED", + "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "writeDisposition": "WRITE_TRUNCATE", + "ignoreUnknownValues": False, + "schema": {"fields": SCHEMA_FIELDS}, + "destinationTableProperties": {"labels": LABELS}, + "skipLeadingRows": None, + "fieldDelimiter": ",", + "quote": None, + "allowQuotedNewlines": False, + "encoding": "UTF-8", + } }, - project_id=hook.return_value.project_id, - location=None, job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, + location=None, nowait=True, - ), + project_id=hook.return_value.project_id, + retry=DEFAULT_RETRY, + timeout=None, + ) ] hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_description_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -388,31 +373,33 @@ def test_description_external_table_should_execute_successfully(self, hook): operator.execute(context=MagicMock()) hook.return_value.create_empty_table.assert_called_once_with( + exists_ok=True, + location=None, + project_id=PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - "labels": None, - "description": DESCRIPTION, + "labels": {}, "externalDataConfiguration": { - "source_uris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - "source_format": "CSV", - "maxBadRecords": 0, "autodetect": True, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], "compression": "NONE", + "ignoreUnknownValues": False, "csvOptions": { - "fieldDelimeter": ",", "skipLeadingRows": None, + "fieldDelimiter": ",", "quote": None, "allowQuotedNewlines": False, "allowJaggedRows": False, + "encoding": "UTF-8", }, + "schema": {"fields": SCHEMA_FIELDS}, }, - "location": None, - "encryptionConfiguration": None, - "schema": {"fields": SCHEMA_FIELDS}, - } + "description": DESCRIPTION, + }, ) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_description_without_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -442,7 +429,6 @@ def test_description_without_external_table_should_execute_successfully(self, ho destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, destinationTableProperties={ "description": DESCRIPTION, - "labels": None, }, sourceFormat="CSV", skipLeadingRows=None, @@ -452,11 +438,8 @@ def 
test_description_without_external_table_should_execute_successfully(self, ho allowQuotedNewlines=False, encoding="UTF-8", schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, quote=None, - schemaUpdateOptions=(), + fieldDelimiter=",", ), }, project_id=hook.return_value.project_id, @@ -467,10 +450,9 @@ def test_description_without_external_table_should_execute_successfully(self, ho nowait=True, ), ] - hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_source_objs_as_list_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -491,33 +473,34 @@ def test_source_objs_as_list_external_table_should_execute_successfully(self, ho operator.execute(context=MagicMock()) hook.return_value.create_empty_table.assert_called_once_with( + exists_ok=True, + location=None, + project_id=PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - "labels": None, - "description": None, + "labels": {}, "externalDataConfiguration": { - "source_uris": [ + "autodetect": True, + "sourceFormat": "CSV", + "sourceUris": [ f"gs://{TEST_BUCKET}/{source_object}" for source_object in TEST_SOURCE_OBJECTS ], - "source_format": "CSV", - "maxBadRecords": 0, - "autodetect": True, "compression": "NONE", + "ignoreUnknownValues": False, "csvOptions": { - "fieldDelimeter": ",", "skipLeadingRows": None, + "fieldDelimiter": ",", "quote": None, "allowQuotedNewlines": False, "allowJaggedRows": False, + "encoding": "UTF-8", }, + "schema": {"fields": SCHEMA_FIELDS}, }, - "location": None, - "encryptionConfiguration": None, - "schema": {"fields": SCHEMA_FIELDS}, - } + }, ) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_source_objs_as_list_without_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -540,43 +523,38 @@ def test_source_objs_as_list_without_external_table_should_execute_successfully( calls = [ call( configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, + "load": { + "autodetect": True, + "createDisposition": "CREATE_IF_NEEDED", + "destinationTable": { + "projectId": "test-project", + "datasetId": "dataset", + "tableId": "table", }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[ - f"gs://{TEST_BUCKET}/{source_object}" for source_object in TEST_SOURCE_OBJECTS - ], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - ), + "sourceFormat": "CSV", + "sourceUris": ["gs://test-bucket/test/objects/test.csv"], + "writeDisposition": "WRITE_TRUNCATE", + "ignoreUnknownValues": False, + "schema": {"fields": SCHEMA_FIELDS}, + "skipLeadingRows": None, + "fieldDelimiter": ",", + "quote": None, + "allowQuotedNewlines": False, + "encoding": "UTF-8", + } }, - 
project_id=hook.return_value.project_id, - location=None, job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, + location=None, nowait=True, - ), + project_id=hook.return_value.project_id, + retry=DEFAULT_RETRY, + timeout=None, + ) ] hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_source_objs_as_string_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -597,31 +575,32 @@ def test_source_objs_as_string_external_table_should_execute_successfully(self, operator.execute(context=MagicMock()) hook.return_value.create_empty_table.assert_called_once_with( + exists_ok=True, + location=None, + project_id=PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - "labels": None, - "description": None, + "labels": {}, "externalDataConfiguration": { - "source_uris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - "source_format": "CSV", - "maxBadRecords": 0, "autodetect": True, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], "compression": "NONE", + "ignoreUnknownValues": False, "csvOptions": { - "fieldDelimeter": ",", "skipLeadingRows": None, + "fieldDelimiter": ",", "quote": None, "allowQuotedNewlines": False, "allowJaggedRows": False, + "encoding": "UTF-8", }, + "schema": {"fields": SCHEMA_FIELDS}, }, - "location": None, - "encryptionConfiguration": None, - "schema": {"fields": SCHEMA_FIELDS}, - } + }, ) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_source_objs_as_string_without_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -644,36 +623,33 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull calls = [ call( configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, + "load": { + "autodetect": True, + "createDisposition": "CREATE_IF_NEEDED", + "destinationTable": { + "projectId": "test-project", + "datasetId": "dataset", + "tableId": "table", }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - ), + "sourceFormat": "CSV", + "sourceUris": ["gs://test-bucket/test/objects/test.csv"], + "writeDisposition": "WRITE_TRUNCATE", + "ignoreUnknownValues": False, + "schema": {"fields": SCHEMA_FIELDS}, + "skipLeadingRows": None, + "fieldDelimiter": ",", + "quote": None, + "allowQuotedNewlines": False, + "encoding": "UTF-8", + } }, - project_id=hook.return_value.project_id, - location=None, job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, + location=None, nowait=True, - ), + project_id=hook.return_value.project_id, + retry=DEFAULT_RETRY, + timeout=None, + ) 
] hook.return_value.insert_job.assert_has_calls(calls) @@ -972,7 +948,7 @@ def test_date_partitioned_implied_in_table_name_should_be_found(self, hook): hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_execute_should_throw_ex_when_no_bucket_specified(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -992,9 +968,8 @@ def test_execute_should_throw_ex_when_no_bucket_specified(self, hook): ) operator.execute(context=MagicMock()) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_execute_should_throw_ex_when_no_source_objects_specified(self, hook): - hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), pytest.real_job_id, @@ -1013,7 +988,7 @@ def test_execute_should_throw_ex_when_no_source_objects_specified(self, hook): ) operator.execute(context=MagicMock()) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_execute_should_throw_ex_when_no_destination_project_dataset_table_specified(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -1035,9 +1010,12 @@ def test_execute_should_throw_ex_when_no_destination_project_dataset_table_speci ) operator.execute(context=MagicMock()) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") - def test_schema_fields_scanner_external_table_should_execute_successfully(self, bq_hook, gcs_hook): + @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_schema_fields_scanner_external_table_should_execute_successfully( + self, bq_hook, gcs_hook, mocked_check_fileds + ): """ Check detection of schema fields if schema_fields parameter is not specified and fields are read from source objects correctly by the operator @@ -1059,54 +1037,85 @@ def test_schema_fields_scanner_external_table_should_execute_successfully(self, bucket=TEST_BUCKET, source_objects=TEST_SOURCE_OBJECTS, destination_project_dataset_table=TEST_EXPLICIT_DEST, - max_id_key=MAX_ID_KEY, write_disposition=WRITE_DISPOSITION, external_table=True, autodetect=True, ) - result = operator.execute(context=MagicMock()) + operator.execute(context=MagicMock()) + mocked_check_fileds.assert_called_once() - assert result == "1" - bq_hook.return_value.create_empty_table.assert_called_once_with( - table_resource={ - "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - "labels": None, - "description": None, - "externalDataConfiguration": { - "source_uris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - "source_format": "CSV", - "maxBadRecords": 0, - "autodetect": True, - "compression": "NONE", - "csvOptions": { - "fieldDelimeter": ",", - "skipLeadingRows": 1, - "quote": None, - "allowQuotedNewlines": False, - "allowJaggedRows": False, - }, - }, - "location": None, - "encryptionConfiguration": None, - "schema": {"fields": SCHEMA_FIELDS}, - } + 
@mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_schema_fields_scanner_should_not_called_on_json_ex_should_execute_successfully( + self, bq_hook, gcs_hook, mocked_check_fileds + ): + """ + Check not calling check_schema_fields method in case if input file is of JSON type on external table + """ + bq_hook.return_value.insert_job.side_effect = [ + MagicMock(job_id=pytest.real_job_id, error_result=False), + pytest.real_job_id, + ] + bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id + bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + bq_hook.return_value.get_job.return_value.result.return_value = ("1",) + + gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS_JSON, + source_format="NEWLINE_DELIMITED_JSON", + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + external_table=True, + autodetect=True, ) - bq_hook.return_value.insert_job.assert_called_once_with( - configuration={ - "query": { - "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM {TEST_EXPLICIT_DEST}", - "useLegacySql": False, - "schemaUpdateOptions": [], - } - }, - project_id=bq_hook.return_value.project_id, + + operator.execute(context=MagicMock()) + mocked_check_fileds.assert_not_called() + + @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_schema_fields_scanner_should_not_called_on_json_should_execute_successfully( + self, bq_hook, gcs_hook, mocked_check_fileds + ): + """ + Check not calling check_schema_fields method in case if input file is of JSON type + """ + bq_hook.return_value.insert_job.side_effect = [ + MagicMock(job_id=pytest.real_job_id, error_result=False), + pytest.real_job_id, + ] + bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id + bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + bq_hook.return_value.get_job.return_value.result.return_value = ("1",) + + gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS_JSON, + source_format="NEWLINE_DELIMITED_JSON", + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + external_table=False, + autodetect=True, ) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + operator.execute(context=MagicMock()) + mocked_check_fileds.assert_not_called() + + @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_fields_scanner_without_external_table_should_execute_successfully( - self, bq_hook, gcs_hook + self, bq_hook, gcs_hook, mocked_check_fileds ): """ Check detection of schema fields if schema_fields parameter is not @@ -1130,62 +1139,14 @@ def test_schema_fields_scanner_without_external_table_should_execute_successfull source_objects=TEST_SOURCE_OBJECTS, 
destination_project_dataset_table=TEST_EXPLICIT_DEST, write_disposition=WRITE_DISPOSITION, - max_id_key=MAX_ID_KEY, external_table=False, autodetect=True, ) - result = operator.execute(context=MagicMock()) - - assert result == "1" - calls = [ - call( - configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=1, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - ), - }, - project_id=bq_hook.return_value.project_id, - location=None, - job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, - nowait=True, - ), - call( - configuration={ - "query": { - "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM {TEST_EXPLICIT_DEST}", - "useLegacySql": False, - "schemaUpdateOptions": [], - } - }, - project_id=bq_hook.return_value.project_id, - ), - ] - - bq_hook.return_value.insert_job.assert_has_calls(calls) + operator.execute(context=MagicMock()) + mocked_check_fileds.assert_called_once() - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_fields_scanner_external_table_should_throw_ex_when_autodetect_not_specified( self, hook, @@ -1198,7 +1159,10 @@ def test_schema_fields_scanner_external_table_should_throw_ex_when_autodetect_no hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) hook.return_value.get_job.return_value.result.return_value = ("1",) - with pytest.raises(RuntimeError, match=r"Table schema was not found."): + with pytest.raises( + AirflowException, + match=r"Table schema was not found. Neither schema object nor schema fields were specified", + ): operator = GCSToBigQueryOperator( task_id=TASK_ID, bucket=TEST_BUCKET, @@ -1211,7 +1175,7 @@ def test_schema_fields_scanner_external_table_should_throw_ex_when_autodetect_no ) operator.execute(context=MagicMock()) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_fields_scanner_without_external_table_should_throw_ex_when_autodetect_not_specified( self, hook, @@ -1224,7 +1188,10 @@ def test_schema_fields_scanner_without_external_table_should_throw_ex_when_autod hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) hook.return_value.get_job.return_value.result.return_value = ("1",) - with pytest.raises(RuntimeError, match=r"Table schema was not found."): + with pytest.raises( + AirflowException, + match=r"Table schema was not found. 
Neither schema object nor schema fields were specified", + ): operator = GCSToBigQueryOperator( task_id=TASK_ID, bucket=TEST_BUCKET, @@ -1237,8 +1204,8 @@ def test_schema_fields_scanner_without_external_table_should_throw_ex_when_autod ) operator.execute(context=MagicMock()) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_fields_integer_scanner_external_table_should_execute_successfully( self, bq_hook, gcs_hook ): @@ -1271,27 +1238,28 @@ def test_schema_fields_integer_scanner_external_table_should_execute_successfull assert result == "1" bq_hook.return_value.create_empty_table.assert_called_once_with( + exists_ok=True, + location=None, + project_id=PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - "labels": None, - "description": None, + "labels": {}, "externalDataConfiguration": { - "source_uris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - "source_format": "CSV", - "maxBadRecords": 0, "autodetect": True, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], "compression": "NONE", + "ignoreUnknownValues": False, "csvOptions": { - "fieldDelimeter": ",", "skipLeadingRows": None, + "fieldDelimiter": ",", "quote": None, "allowQuotedNewlines": False, "allowJaggedRows": False, + "encoding": "UTF-8", }, }, - "location": None, - "encryptionConfiguration": None, - } + }, ) bq_hook.return_value.insert_job.assert_called_once_with( configuration={ @@ -1304,8 +1272,8 @@ def test_schema_fields_integer_scanner_external_table_should_execute_successfull project_id=bq_hook.return_value.project_id, ) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_fields_integer_scanner_without_external_table_should_execute_successfully( self, bq_hook, gcs_hook ): @@ -1340,34 +1308,27 @@ def test_schema_fields_integer_scanner_without_external_table_should_execute_suc calls = [ call( configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - ), + "load": { + "autodetect": True, + "createDisposition": "CREATE_IF_NEEDED", + "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "writeDisposition": WRITE_DISPOSITION, + "ignoreUnknownValues": False, + "skipLeadingRows": None, + "fieldDelimiter": ",", + "quote": None, + "allowQuotedNewlines": False, + "encoding": "UTF-8", + } }, - project_id=bq_hook.return_value.project_id, - location=None, job_id=pytest.real_job_id, - timeout=None, - 
retry=DEFAULT_RETRY, + location=None, nowait=True, + project_id=bq_hook.return_value.project_id, + retry=DEFAULT_RETRY, + timeout=None, ), call( configuration={ @@ -1383,7 +1344,7 @@ def test_schema_fields_integer_scanner_without_external_table_should_execute_suc bq_hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_fields_without_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -1408,41 +1369,34 @@ def test_schema_fields_without_external_table_should_execute_successfully(self, calls = [ call( configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS_INT}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - ), + "load": { + "autodetect": True, + "createDisposition": "CREATE_IF_NEEDED", + "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "writeDisposition": WRITE_DISPOSITION, + "ignoreUnknownValues": False, + "schema": {"fields": SCHEMA_FIELDS_INT}, + "skipLeadingRows": None, + "fieldDelimiter": ",", + "quote": None, + "allowQuotedNewlines": False, + "encoding": "UTF-8", + } }, - project_id=hook.return_value.project_id, - location=None, job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, + location=None, nowait=True, - ), + project_id=hook.return_value.project_id, + retry=DEFAULT_RETRY, + timeout=None, + ) ] hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_fields_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -1465,68 +1419,44 @@ def test_schema_fields_external_table_should_execute_successfully(self, hook): operator.execute(context=MagicMock()) hook.return_value.create_empty_table.assert_called_once_with( + exists_ok=True, + location=None, + project_id=PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - "labels": None, - "description": None, + "labels": {}, "externalDataConfiguration": { - "source_uris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - "source_format": "CSV", - "maxBadRecords": 0, "autodetect": True, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], "compression": "NONE", + "ignoreUnknownValues": False, "csvOptions": { - "fieldDelimeter": ",", "skipLeadingRows": None, + "fieldDelimiter": ",", "quote": None, "allowQuotedNewlines": False, "allowJaggedRows": False, + "encoding": "UTF-8", }, + "schema": {"fields": SCHEMA_FIELDS_INT}, }, - "location": None, - 
"encryptionConfiguration": None, - "schema": {"fields": SCHEMA_FIELDS_INT}, - } + }, ) -@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") -def test_execute_without_external_table_async_should_execute_successfully(hook): - """ - Asserts that a task is deferred and a BigQueryInsertJobTrigger will be fired - when Operator is executed in deferrable. - """ - hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False) - hook.return_value.generate_job_id.return_value = pytest.real_job_id - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - hook.return_value.get_job.return_value.result.return_value = ("1",) - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - schema_fields=SCHEMA_FIELDS, - external_table=False, - autodetect=True, - deferrable=True, - ) - - with pytest.raises(TaskDeferred) as exc: - operator.execute(create_context(operator)) - - assert isinstance( - exc.value.trigger, BigQueryInsertJobTrigger - ), "Trigger is not a BigQueryInsertJobTrigger" - - -def test_execute_without_external_table_async_should_throw_ex_when_event_status_error(): - """ - Tests that an AirflowException is raised in case of error event. - """ - - with pytest.raises(AirflowException): +class TestAsyncGCSToBigQueryOperator(unittest.TestCase): + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_execute_without_external_table_async_should_execute_successfully(self, hook): + """ + Asserts that a task is deferred and a BigQueryInsertJobTrigger will be fired + when Operator is executed in deferrable. + """ + hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False) + hook.return_value.generate_job_id.return_value = pytest.real_job_id + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + hook.return_value.get_job.return_value.result.return_value = ("1",) + operator = GCSToBigQueryOperator( task_id=TASK_ID, bucket=TEST_BUCKET, @@ -1538,313 +1468,450 @@ def test_execute_without_external_table_async_should_throw_ex_when_event_status_ autodetect=True, deferrable=True, ) - operator.execute_complete(context=None, event={"status": "error", "message": "test failure message"}) - - -def test_execute_logging_without_external_table_async_should_execute_successfully(): - """ - Asserts that logging occurs as expected. 
- """ - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - schema_fields=SCHEMA_FIELDS, - external_table=False, - autodetect=True, - deferrable=True, - ) - with mock.patch.object(operator.log, "info") as mock_log_info: - operator.execute_complete( - context=create_context(operator), - event={"status": "success", "message": "Job completed", "job_id": job_id}, - ) - mock_log_info.assert_called_with( - "%s completed with response %s ", "test-gcs-to-bq-operator", "Job completed" - ) - - -@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") -def test_execute_without_external_table_generate_job_id_async_should_execute_successfully(hook): - hook.return_value.insert_job.side_effect = Conflict("any") - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - job = MagicMock( - job_id=pytest.real_job_id, - error_result=False, - state="PENDING", - done=lambda: False, - ) - hook.return_value.get_job.return_value = job - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - schema_fields=SCHEMA_FIELDS, - reattach_states={"PENDING"}, - external_table=False, - autodetect=True, - deferrable=True, - ) - - with pytest.raises(TaskDeferred): - operator.execute(create_context(operator)) - - hook.return_value.generate_job_id.assert_called_once_with( - job_id=None, - dag_id="adhoc_airflow", - task_id=TASK_ID, - logical_date=datetime(2022, 1, 1, 0, 0), - configuration={}, - force_rerun=True, - ) - - -@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") -def test_execute_without_external_table_reattach_async_should_execute_successfully(hook): - hook.return_value.generate_job_id.return_value = pytest.real_job_id - - hook.return_value.insert_job.side_effect = Conflict("any") - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - job = MagicMock( - job_id=pytest.real_job_id, - error_result=False, - state="PENDING", - done=lambda: False, - ) - hook.return_value.get_job.return_value = job - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - schema_fields=SCHEMA_FIELDS, - location=TEST_DATASET_LOCATION, - reattach_states={"PENDING"}, - external_table=False, - autodetect=True, - deferrable=True, - ) - - with pytest.raises(TaskDeferred): - operator.execute(create_context(operator)) - - hook.return_value.get_job.assert_called_once_with( - location=TEST_DATASET_LOCATION, - job_id=pytest.real_job_id, - project_id=hook.return_value.project_id, - ) - - job._begin.assert_called_once_with() - - -@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") -def test_execute_without_external_table_force_rerun_async_should_execute_successfully(hook): - hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}" - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - - hook.return_value.insert_job.side_effect = Conflict("any") - job = MagicMock( - job_id=pytest.real_job_id, - error_result=False, - state="DONE", - done=lambda: False, - ) - hook.return_value.get_job.return_value = job - - 
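# Editor's note: the reattach / force-rerun tests above and below all use the same
# mocking recipe: insert_job raises Conflict to simulate "job id already exists", and
# get_job returns a MagicMock standing in for the pre-existing job. A minimal, hedged
# sketch of that setup (it mirrors the test code here; no new operator API is assumed):
from unittest import mock
from google.api_core.exceptions import Conflict

bq_hook = mock.MagicMock()  # stands in for the patched BigQueryHook class
bq_hook.return_value.insert_job.side_effect = Conflict("any")  # duplicate job id
existing_job = mock.MagicMock(state="PENDING", done=lambda: False)
bq_hook.return_value.get_job.return_value = existing_job
# If the job state is listed in reattach_states, the operator reattaches and calls
# existing_job._begin(); otherwise (e.g. state="DONE") it raises AirflowException.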
operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - schema_fields=SCHEMA_FIELDS, - location=TEST_DATASET_LOCATION, - reattach_states={"PENDING"}, - external_table=False, - autodetect=True, - deferrable=True, - ) - - with pytest.raises(AirflowException) as exc: - operator.execute(create_context(operator)) - - expected_exception_msg = ( - f"Job with id: {pytest.real_job_id} already exists and is in {job.state} state. " - f"If you want to force rerun it consider setting `force_rerun=True`." - f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`" - ) - - assert str(exc.value) == expected_exception_msg - - hook.return_value.get_job.assert_called_once_with( - location=TEST_DATASET_LOCATION, - job_id=pytest.real_job_id, - project_id=hook.return_value.project_id, - ) - - -@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") -@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") -def test_schema_fields_without_external_table_async_should_execute_successfully(bq_hook, gcs_hook): - bq_hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False) - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - schema_fields=SCHEMA_FIELDS, - max_id_key=MAX_ID_KEY, - external_table=False, - autodetect=True, - deferrable=True, - ) - - with pytest.raises(TaskDeferred): - result = operator.execute(create_context(operator)) - assert result == "1" - calls = [ - call( - configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - ), - }, - project_id=bq_hook.return_value.project_id, - location=None, - job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, - nowait=True, - ), - call( - configuration={ - "query": { - "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM {TEST_EXPLICIT_DEST}", - "useLegacySql": False, - "schemaUpdateOptions": [], - } - }, - project_id=bq_hook.return_value.project_id, - ), - ] + with pytest.raises(TaskDeferred) as exc: + operator.execute(self.create_context(operator)) - bq_hook.return_value.insert_job.assert_has_calls(calls) + assert isinstance( + exc.value.trigger, BigQueryInsertJobTrigger + ), "Trigger is not a BigQueryInsertJobTrigger" + def test_execute_without_external_table_async_should_throw_ex_when_event_status_error(self): + """ + Tests that an AirflowException is raised in case of error event. 
+ """ -@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") -@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") -def test_schema_fields_int_without_external_table_async_should_execute_successfully(bq_hook, gcs_hook): - bq_hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False) - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - schema_fields=SCHEMA_FIELDS, - max_id_key=MAX_ID_KEY, - external_table=False, - autodetect=True, - deferrable=True, - ) - - with pytest.raises(TaskDeferred): - result = operator.execute(create_context(operator)) - assert result == "1" + with pytest.raises(AirflowException): + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS, + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + schema_fields=SCHEMA_FIELDS, + external_table=False, + autodetect=True, + deferrable=True, + ) + operator.execute_complete( + context=None, event={"status": "error", "message": "test failure message"} + ) - calls = [ - call( - configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - ), - }, - project_id=bq_hook.return_value.project_id, - location=None, - job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, - nowait=True, - ), - call( - configuration={ - "query": { - "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM {TEST_EXPLICIT_DEST}", - "useLegacySql": False, - "schemaUpdateOptions": [], - } - }, - project_id=bq_hook.return_value.project_id, - ), + def test_execute_logging_without_external_table_async_should_execute_successfully(self): + """ + Asserts that logging occurs as expected. 
+ """ + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS, + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + schema_fields=SCHEMA_FIELDS, + external_table=False, + autodetect=True, + deferrable=True, + ) + with mock.patch.object(operator.log, "info") as mock_log_info: + operator.execute_complete( + context=self.create_context(operator), + event={"status": "success", "message": "Job completed", "job_id": job_id}, + ) + mock_log_info.assert_called_with( + "%s completed with response %s ", "test-gcs-to-bq-operator", "Job completed" + ) + + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_execute_without_external_table_generate_job_id_async_should_execute_successfully(self, hook): + hook.return_value.insert_job.side_effect = Conflict("any") + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + job = MagicMock( + job_id=pytest.real_job_id, + error_result=False, + state="PENDING", + done=lambda: False, + ) + hook.return_value.get_job.return_value = job + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS, + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + schema_fields=SCHEMA_FIELDS, + reattach_states={"PENDING"}, + external_table=False, + autodetect=True, + deferrable=True, + ) + + with pytest.raises(TaskDeferred): + operator.execute(self.create_context(operator)) + + hook.return_value.generate_job_id.assert_called_once_with( + job_id=None, + dag_id="adhoc_airflow", + task_id=TASK_ID, + logical_date=datetime(2022, 1, 1, 0, 0), + configuration={}, + force_rerun=True, + ) + + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_execute_without_external_table_reattach_async_should_execute_successfully(self, hook): + hook.return_value.generate_job_id.return_value = pytest.real_job_id + + hook.return_value.insert_job.side_effect = Conflict("any") + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + job = MagicMock( + job_id=pytest.real_job_id, + error_result=False, + state="PENDING", + done=lambda: False, + ) + hook.return_value.get_job.return_value = job + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS, + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + schema_fields=SCHEMA_FIELDS, + location=TEST_DATASET_LOCATION, + reattach_states={"PENDING"}, + external_table=False, + autodetect=True, + deferrable=True, + ) + + with pytest.raises(TaskDeferred): + operator.execute(self.create_context(operator)) + + hook.return_value.get_job.assert_called_once_with( + location=TEST_DATASET_LOCATION, + job_id=pytest.real_job_id, + project_id=hook.return_value.project_id, + ) + + job._begin.assert_called_once_with() + + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_execute_without_external_table_force_rerun_async_should_execute_successfully(self, hook): + hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}" + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + + hook.return_value.insert_job.side_effect = Conflict("any") + job = MagicMock( + job_id=pytest.real_job_id, + error_result=False, + state="DONE", + done=lambda: False, + ) + hook.return_value.get_job.return_value = job + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, 
+ source_objects=TEST_SOURCE_OBJECTS, + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + schema_fields=SCHEMA_FIELDS, + location=TEST_DATASET_LOCATION, + reattach_states={"PENDING"}, + external_table=False, + autodetect=True, + deferrable=True, + ) + + with pytest.raises(AirflowException) as exc: + operator.execute(self.create_context(operator)) + + expected_exception_msg = ( + f"Job with id: {pytest.real_job_id} already exists and is in {job.state} state. " + f"If you want to force rerun it consider setting `force_rerun=True`." + f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`" + ) + + assert str(exc.value) == expected_exception_msg + + hook.return_value.get_job.assert_called_once_with( + location=TEST_DATASET_LOCATION, + job_id=pytest.real_job_id, + project_id=hook.return_value.project_id, + ) + + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_schema_fields_without_external_table_async_should_execute_successfully(self, bq_hook, gcs_hook): + bq_hook.return_value.insert_job.return_value = MagicMock( + job_id=pytest.real_job_id, error_result=False + ) + bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id + bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + bq_hook.return_value.get_job.return_value.result.return_value = ("1",) + gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS, + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + schema_fields=SCHEMA_FIELDS, + max_id_key=MAX_ID_KEY, + external_table=False, + autodetect=True, + deferrable=True, + ) + + with pytest.raises(TaskDeferred): + result = operator.execute(self.create_context(operator)) + assert result == "1" + + calls = [ + call( + configuration={ + "load": dict( + autodetect=True, + createDisposition="CREATE_IF_NEEDED", + destinationTable={ + "projectId": PROJECT_ID, + "datasetId": DATASET, + "tableId": TABLE, + }, + destinationTableProperties={ + "description": None, + "labels": None, + }, + sourceFormat="CSV", + skipLeadingRows=None, + sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + writeDisposition=WRITE_DISPOSITION, + ignoreUnknownValues=False, + allowQuotedNewlines=False, + encoding="UTF-8", + schema={"fields": SCHEMA_FIELDS}, + ), + }, + project_id=bq_hook.return_value.project_id, + location=None, + job_id=pytest.real_job_id, + timeout=None, + retry=DEFAULT_RETRY, + nowait=True, + ), + call( + configuration={ + "query": { + "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM {TEST_EXPLICIT_DEST}", + "useLegacySql": False, + "schemaUpdateOptions": [], + } + }, + project_id=bq_hook.return_value.project_id, + ), + ] + + bq_hook.return_value.insert_job.assert_has_calls(calls) + + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_schema_fields_int_without_external_table_async_should_execute_successfully( + self, bq_hook, gcs_hook + ): + bq_hook.return_value.insert_job.return_value = MagicMock( + job_id=pytest.real_job_id, error_result=False + ) + bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id + bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + bq_hook.return_value.get_job.return_value.result.return_value = ("1",) + 
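# Editor's note: when max_id_key is set, the operator follows the load job with the
# query job asserted in the calls list above. A hedged, illustrative helper (not part
# of the operator) that rebuilds that exact configuration payload:
def build_max_id_configuration(max_id_key: str, destination: str) -> dict:
    """Return the query-job configuration used to fetch MAX(<max_id_key>)."""
    return {
        "query": {
            "query": f"SELECT MAX({max_id_key}) AS max_value FROM {destination}",
            "useLegacySql": False,
            "schemaUpdateOptions": [],
        }
    }

# Example: build_max_id_configuration("id", "test-project.dataset.table")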
gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna" + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS, + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + schema_fields=SCHEMA_FIELDS, + max_id_key=MAX_ID_KEY, + external_table=False, + autodetect=True, + deferrable=True, + ) + + with pytest.raises(TaskDeferred): + result = operator.execute(self.create_context(operator)) + assert result == "1" + + calls = [ + call( + configuration={ + "load": dict( + autodetect=True, + createDisposition="CREATE_IF_NEEDED", + destinationTable={ + "projectId": PROJECT_ID, + "datasetId": DATASET, + "tableId": TABLE, + }, + destinationTableProperties={ + "description": None, + "labels": None, + }, + sourceFormat="CSV", + skipLeadingRows=None, + sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + writeDisposition=WRITE_DISPOSITION, + ignoreUnknownValues=False, + allowQuotedNewlines=False, + encoding="UTF-8", + ), + }, + project_id=bq_hook.return_value.project_id, + location=None, + job_id=pytest.real_job_id, + timeout=None, + retry=DEFAULT_RETRY, + nowait=True, + ), + call( + configuration={ + "query": { + "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM {TEST_EXPLICIT_DEST}", + "useLegacySql": False, + "schemaUpdateOptions": [], + } + }, + project_id=bq_hook.return_value.project_id, + ), + ] + + bq_hook.return_value.insert_job.assert_has_calls(calls) + + @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_async_schema_fields_scanner_external_table_should_execute_successfully( + self, bq_hook, gcs_hook, mocked_check_fileds + ): + """ + Check detection of schema fields if schema_fields parameter is not + specified and fields are read from source objects correctly by the operator + if all fields are characters. In this case operator searches for fields in source object + and update configuration with constructed schema_fields. 
+ """ + bq_hook.return_value.insert_job.side_effect = [ + MagicMock(job_id=pytest.real_job_id, error_result=False), + pytest.real_job_id, ] + bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id + bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - bq_hook.return_value.insert_job.assert_has_calls(calls) + gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS, + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + external_table=False, + autodetect=True, + deferrable=True, + ) + with pytest.raises(TaskDeferred): + operator.execute(context=MagicMock()) + mocked_check_fileds.assert_called_once() -def create_context(task): - dag = DAG(dag_id="dag") - logical_date = datetime(2022, 1, 1, 0, 0, 0) - dag_run = DagRun( - dag_id=dag.dag_id, - execution_date=logical_date, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), - ) - task_instance = TaskInstance(task=task) - task_instance.dag_run = dag_run - task_instance.dag_id = dag.dag_id - task_instance.xcom_push = mock.Mock() - return { - "dag": dag, - "run_id": dag_run.run_id, - "task": task, - "ti": task_instance, - "task_instance": task_instance, - "logical_date": logical_date, - } + @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_schema_fields_scanner_should_not_called_on_json_ex_should_execute_successfully( + self, bq_hook, gcs_hook, mocked_check_fileds + ): + """ + Check not calling check_schema_fields method in case if input file is of JSON type on external table + """ + bq_hook.return_value.insert_job.side_effect = [ + MagicMock(job_id=pytest.real_job_id, error_result=False), + pytest.real_job_id, + ] + bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id + bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + bq_hook.return_value.get_job.return_value.result.return_value = ("1",) + + gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS_JSON, + source_format="NEWLINE_DELIMITED_JSON", + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + external_table=True, + autodetect=True, + deferrable=True, + ) + operator.execute(context=MagicMock()) + mocked_check_fileds.assert_not_called() + + @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_schema_fields_scanner_should_not_called_on_json_should_execute_successfully( + self, bq_hook, gcs_hook, mocked_check_fileds + ): + """ + Check not calling check_schema_fields method in case if input file is of JSON type + """ + bq_hook.return_value.insert_job.side_effect = [ + MagicMock(job_id=pytest.real_job_id, error_result=False), + pytest.real_job_id, + ] + bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id + bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + bq_hook.return_value.get_job.return_value.result.return_value = ("1",) + + 
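# Editor's note: the JSON-source tests in this class mirror the deferrable system-test
# task added in example_gcs_to_bigquery_async.py. A hedged usage sketch (the
# destination table below is illustrative; the other arguments come from that example):
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_json = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_json_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.json"],
    source_format="NEWLINE_DELIMITED_JSON",
    destination_project_dataset_table="my_dataset.my_table",  # illustrative
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    deferrable=True,
)
# With a JSON source there is no CSV header row to scan, so these tests assert that
# _check_schema_fields is never invoked and schema detection is left to autodetect.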
gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS_JSON, + source_format="NEWLINE_DELIMITED_JSON", + destination_project_dataset_table=TEST_EXPLICIT_DEST, + write_disposition=WRITE_DISPOSITION, + external_table=False, + autodetect=True, + deferrable=True, + ) + with pytest.raises(TaskDeferred): + operator.execute(context=MagicMock()) + mocked_check_fileds.assert_not_called() + + def create_context(self, task): + dag = DAG(dag_id="dag") + logical_date = datetime(2022, 1, 1, 0, 0, 0) + dag_run = DagRun( + dag_id=dag.dag_id, + execution_date=logical_date, + run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), + ) + task_instance = TaskInstance(task=task) + task_instance.dag_run = dag_run + task_instance.dag_id = dag.dag_id + task_instance.xcom_push = mock.Mock() + return { + "dag": dag, + "run_id": dag_run.run_id, + "task": task, + "ti": task_instance, + "task_instance": task_instance, + "logical_date": logical_date, + } diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py index ebfebab58573c..d13812df337c1 100644 --- a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py +++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py @@ -81,6 +81,20 @@ max_id_key=MAX_ID_DATE, deferrable=True, ) + + load_json = GCSToBigQueryOperator( + task_id="gcs_to_bigquery_example_date_csv_async", + bucket="cloud-samples-data", + source_objects=["bigquery/us-states/us-states.json"], + source_format="NEWLINE_DELIMITED_JSON", + destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}", + write_disposition="WRITE_TRUNCATE", + external_table=False, + autodetect=True, + max_id_key=MAX_ID_DATE, + deferrable=True, + ) + # [END howto_operator_gcs_to_bigquery_async] delete_test_dataset_str = BigQueryDeleteDatasetOperator( @@ -104,6 +118,7 @@ # TEST BODY >> load_string_based_csv >> load_date_based_csv + >> load_json # TEST TEARDOWN >> delete_test_dataset_str >> delete_test_dataset_date From 21b6ad99ec04745a7880b50991781bad7f095365 Mon Sep 17 00:00:00 2001 From: VladaZakharova Date: Sun, 11 Dec 2022 00:22:36 +0100 Subject: [PATCH 2/4] Update example_gcs_to_bigquery_async.py --- .../providers/google/cloud/gcs/example_gcs_to_bigquery_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py index d13812df337c1..2f66057747e97 100644 --- a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py +++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py @@ -83,7 +83,7 @@ ) load_json = GCSToBigQueryOperator( - task_id="gcs_to_bigquery_example_date_csv_async", + task_id="gcs_to_bigquery_example_date_json_async", bucket="cloud-samples-data", source_objects=["bigquery/us-states/us-states.json"], source_format="NEWLINE_DELIMITED_JSON", From 7e5bb81269a864e41a0d62ae6fbeda950aff49f1 Mon Sep 17 00:00:00 2001 From: VladaZakharova Date: Sun, 11 Dec 2022 00:58:39 +0100 Subject: [PATCH 3/4] move checking source_format to __init__ --- .../google/cloud/transfers/gcs_to_bigquery.py | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py 
b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 4a8d1d2c43ac1..60394855bdbf8 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -44,6 +44,15 @@ if TYPE_CHECKING: from airflow.utils.context import Context +ALLOWED_FORMATS = [ + "CSV", + "NEWLINE_DELIMITED_JSON", + "AVRO", + "GOOGLE_SHEETS", + "DATASTORE_BACKUP", + "PARQUET", +] + class GCSToBigQueryOperator(BaseOperator): """ @@ -238,7 +247,13 @@ def __init__( # BQ config self.destination_project_dataset_table = destination_project_dataset_table self.schema_fields = schema_fields - self.source_format = source_format + if source_format not in ALLOWED_FORMATS: + raise ValueError( + f"{source_format} is not a valid source format. " + f"Please use one of the following types: {ALLOWED_FORMATS}." + ) + else: + self.source_format = source_format self.compression = compression self.create_disposition = create_disposition self.skip_leading_rows = skip_leading_rows @@ -306,14 +321,6 @@ def execute(self, context: Context): ) self.hook = hook self.source_format = self.source_format.upper() - allowed_formats = [ - "CSV", - "NEWLINE_DELIMITED_JSON", - "AVRO", - "GOOGLE_SHEETS", - "DATASTORE_BACKUP", - "PARQUET", - ] job_id = self.hook.generate_job_id( job_id=self.job_id, @@ -344,12 +351,6 @@ def execute(self, context: Context): "Table schema was not found. Neither schema object nor schema fields were specified" ) - if self.source_format not in allowed_formats: - raise ValueError( - f"{self.source_format} is not a valid source format. " - f"Please use one of the following types: {allowed_formats}." - ) - if self.external_table: self.log.info("Creating a new BigQuery table for storing data...") table_obj_api_repr = self._create_empty_table() From 9ffa97fd10374108e705390cbd19243da0c28303 Mon Sep 17 00:00:00 2001 From: VladaZakharova Date: Mon, 19 Dec 2022 12:21:26 +0100 Subject: [PATCH 4/4] Revert changes for determining schema fields in _check_schema_fields() --- .../google/cloud/transfers/gcs_to_bigquery.py | 93 +-- .../cloud/transfers/test_gcs_to_bigquery.py | 566 ++---------------- .../gcs/example_gcs_to_bigquery_async.py | 59 +- 3 files changed, 135 insertions(+), 583 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 60394855bdbf8..21b3e3d8658ce 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -21,7 +21,7 @@ import json from typing import TYPE_CHECKING, Any, Sequence -from google.api_core.exceptions import Conflict +from google.api_core.exceptions import BadRequest, Conflict from google.api_core.retry import Retry from google.cloud.bigquery import ( DEFAULT_RETRY, @@ -247,13 +247,13 @@ def __init__( # BQ config self.destination_project_dataset_table = destination_project_dataset_table self.schema_fields = schema_fields - if source_format not in ALLOWED_FORMATS: + if source_format.upper() not in ALLOWED_FORMATS: raise ValueError( f"{source_format} is not a valid source format. " f"Please use one of the following types: {ALLOWED_FORMATS}." 
) else: - self.source_format = source_format + self.source_format = source_format.upper() self.compression = compression self.create_disposition = create_disposition self.skip_leading_rows = skip_leading_rows @@ -336,20 +336,23 @@ def execute(self, context: Context): ) self.source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects] - if not self.schema_fields and self.schema_object and self.source_format != "DATASTORE_BACKUP": - gcs_hook = GCSHook( - gcp_conn_id=self.gcp_conn_id, - delegate_to=self.delegate_to, - impersonation_chain=self.impersonation_chain, - ) - self.schema_fields = json.loads( - gcs_hook.download(self.schema_object_bucket, self.schema_object).decode("utf-8") - ) - self.log.info("Autodetected fields from schema object: %s", self.schema_fields) + if not self.schema_fields: if not self.schema_object and not self.autodetect: raise AirflowException( "Table schema was not found. Neither schema object nor schema fields were specified" ) + if self.schema_object and self.source_format != "DATASTORE_BACKUP": + gcs_hook = GCSHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + impersonation_chain=self.impersonation_chain, + ) + self.schema_fields = json.loads( + gcs_hook.download(self.schema_object_bucket, self.schema_object).decode("utf-8") + ) + self.log.info("Loaded fields from schema object: %s", self.schema_fields) + else: + self.schema_fields = None if self.external_table: self.log.info("Creating a new BigQuery table for storing data...") @@ -469,8 +472,17 @@ def _find_max_value_in_column(self): "schemaUpdateOptions": [], } } - job_id = hook.insert_job(configuration=self.configuration, project_id=hook.project_id) - rows = list(hook.get_job(job_id=job_id, location=self.location).result()) + try: + job_id = hook.insert_job(configuration=self.configuration, project_id=hook.project_id) + rows = list(hook.get_job(job_id=job_id, location=self.location).result()) + except BadRequest as e: + if "Unrecognized name:" in e.message: + raise AirflowException( + f"Could not determine MAX value in column {self.max_id_key} " + f"since the default value of 'string_field_n' was set by BQ" + ) + else: + raise AirflowException(e.message) if rows: for row in rows: max_id = row[0] if row[0] else 0 @@ -484,53 +496,6 @@ def _find_max_value_in_column(self): else: raise RuntimeError(f"The {select_command} returned no rows!") - def _check_schema_fields(self, table_resource): - """ - Helper method to detect schema fields if they were not specified by user and autodetect=True. - If source_objects were passed, method reads the second row in CSV file. If there is at least one digit - table_resurce is returned without changes so that BigQuery can determine schema_fields in the - next step. - If there are only characters, the first row with fields is used to construct schema_fields argument - with type 'STRING'. 
Table_resource is updated with new schema_fileds key and returned back to operator - :param table_resource: Configuration or table_resource dictionary - :return: table_resource: Updated table_resource dict with schema_fields - """ - if not self.schema_fields: - for source_object in self.source_objects: - if self.source_format == "CSV": - gcs_hook = GCSHook( - gcp_conn_id=self.gcp_conn_id, - delegate_to=self.delegate_to, - impersonation_chain=self.impersonation_chain, - ) - blob = gcs_hook.download( - bucket_name=self.schema_object_bucket, - object_name=source_object, - ) - fields, values = [item.split(",") for item in blob.decode("utf-8").splitlines()][:2] - self.log.info("fields: %s", fields) - import re - - if any(re.match(r"[\d\-\\.]+$", value) for value in values): - self.log.info("table_resource: %s", table_resource) - return table_resource - else: - schema_fields = [] - for field in fields: - schema_fields.append({"name": field, "type": "STRING", "mode": "NULLABLE"}) - self.schema_fields = schema_fields - if self.external_table: - table_resource["externalDataConfiguration"]["csvOptions"]["skipLeadingRows"] = 1 - elif not self.external_table: - table_resource["load"]["skipLeadingRows"] = 1 - else: - return table_resource - if self.external_table: - table_resource["schema"] = {"fields": self.schema_fields} - elif not self.external_table: - table_resource["load"]["schema"] = {"fields": self.schema_fields} - return table_resource - def _create_empty_table(self): project_id, dataset_id, table_id = self.hook.split_tablename( table_input=self.destination_project_dataset_table, @@ -595,8 +560,6 @@ def _create_empty_table(self): self.encryption_configuration ) table_obj_api_repr = table.to_api_repr() - if not self.schema_fields and self.source_format == "CSV": - table_obj_api_repr = self._check_schema_fields(table_obj_api_repr) self.log.info("Creating external table: %s", self.destination_project_dataset_table) self.hook.create_empty_table( @@ -649,8 +612,6 @@ def _use_existing_table(self): if self.schema_fields: self.configuration["load"]["schema"] = {"fields": self.schema_fields} - elif self.source_format == "CSV": - self.configuration = self._check_schema_fields(self.configuration) if self.schema_update_options: if self.write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]: diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py index 4e28eae226872..5340448083bb5 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py @@ -54,8 +54,8 @@ ] SCHEMA_BUCKET = "test-schema-bucket" SCHEMA_OBJECT = "test/schema/schema.json" -TEST_SOURCE_OBJECTS = ["test/objects/test.csv"] -TEST_SOURCE_OBJECTS_AS_STRING = "test/objects/test.csv" +TEST_SOURCE_OBJECTS_LIST = ["test/objects/test.csv"] +TEST_SOURCE_OBJECTS = "test/objects/test.csv" TEST_SOURCE_OBJECTS_JSON = "test/objects/test.json" LABELS = {"k1": "v1"} DESCRIPTION = "Test Description" @@ -101,7 +101,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook): "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -159,7 +159,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook "createDisposition": 
"CREATE_IF_NEEDED", "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": WRITE_DISPOSITION, "ignoreUnknownValues": False, "schema": {"fields": SCHEMA_FIELDS}, @@ -220,7 +220,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook): "createDisposition": "CREATE_IF_NEEDED", "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": "WRITE_TRUNCATE", "ignoreUnknownValues": False, "schema": {"fields": SCHEMA_FIELDS}, @@ -283,7 +283,7 @@ def test_labels_external_table_should_execute_successfully(self, hook): "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -328,7 +328,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook): "createDisposition": "CREATE_IF_NEEDED", "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": "WRITE_TRUNCATE", "ignoreUnknownValues": False, "schema": {"fields": SCHEMA_FIELDS}, @@ -382,7 +382,7 @@ def test_description_external_table_should_execute_successfully(self, hook): "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -432,7 +432,7 @@ def test_description_without_external_table_should_execute_successfully(self, ho }, sourceFormat="CSV", skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], writeDisposition=WRITE_DISPOSITION, ignoreUnknownValues=False, allowQuotedNewlines=False, @@ -483,7 +483,7 @@ def test_source_objs_as_list_external_table_should_execute_successfully(self, ho "autodetect": True, "sourceFormat": "CSV", "sourceUris": [ - f"gs://{TEST_BUCKET}/{source_object}" for source_object in TEST_SOURCE_OBJECTS + f"gs://{TEST_BUCKET}/{source_object}" for source_object in TEST_SOURCE_OBJECTS_LIST ], "compression": "NONE", "ignoreUnknownValues": False, @@ -584,7 +584,7 @@ def test_source_objs_as_string_external_table_should_execute_successfully(self, "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -627,12 +627,12 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull "autodetect": True, "createDisposition": "CREATE_IF_NEEDED", "destinationTable": { - "projectId": "test-project", - "datasetId": "dataset", + "projectId": PROJECT_ID, + "datasetId": DATASET, "tableId": "table", }, 
"sourceFormat": "CSV", - "sourceUris": ["gs://test-bucket/test/objects/test.csv"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": "WRITE_TRUNCATE", "ignoreUnknownValues": False, "schema": {"fields": SCHEMA_FIELDS}, @@ -654,8 +654,8 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_obj_external_table_should_execute_successfully(self, bq_hook, gcs_hook): bq_hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -678,33 +678,34 @@ def test_schema_obj_external_table_should_execute_successfully(self, bq_hook, gc operator.execute(context=MagicMock()) bq_hook.return_value.create_empty_table.assert_called_once_with( + exists_ok=True, + location=None, + project_id=PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - "labels": None, - "description": None, + "labels": {}, "externalDataConfiguration": { - "source_uris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - "source_format": "CSV", - "maxBadRecords": 0, "autodetect": True, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", + "ignoreUnknownValues": False, "csvOptions": { - "fieldDelimeter": ",", "skipLeadingRows": None, + "fieldDelimiter": ",", "quote": None, "allowQuotedNewlines": False, "allowJaggedRows": False, + "encoding": "UTF-8", }, + "schema": {"fields": SCHEMA_FIELDS}, }, - "location": None, - "encryptionConfiguration": None, - "schema": {"fields": SCHEMA_FIELDS}, - } + }, ) gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET, SCHEMA_OBJECT) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") + @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_schema_obj_without_external_table_should_execute_successfully(self, bq_hook, gcs_hook): bq_hook.return_value.insert_job.side_effect = [ MagicMock(job_id=pytest.real_job_id, error_result=False), @@ -730,28 +731,21 @@ def test_schema_obj_without_external_table_should_execute_successfully(self, bq_ calls = [ call( configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - ), + "load": { + "autodetect": True, + "createDisposition": "CREATE_IF_NEEDED", + "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, + "sourceFormat": "CSV", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], + "writeDisposition": "WRITE_TRUNCATE", + 
"ignoreUnknownValues": False, + "schema": {"fields": SCHEMA_FIELDS}, + "skipLeadingRows": None, + "fieldDelimiter": ",", + "quote": None, + "allowQuotedNewlines": False, + "encoding": "UTF-8", + } }, project_id=bq_hook.return_value.project_id, location=None, @@ -759,195 +753,12 @@ def test_schema_obj_without_external_table_should_execute_successfully(self, bq_ timeout=None, retry=DEFAULT_RETRY, nowait=True, - ), + ) ] bq_hook.return_value.insert_job.assert_has_calls(calls) gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET, SCHEMA_OBJECT) - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") - def test_all_fields_should_be_present(self, hook): - hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - hook.return_value.generate_job_id.return_value = pytest.real_job_id - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - schema_fields=SCHEMA_FIELDS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - field_delimiter=";", - max_bad_records=13, - quote_character="|", - schema_update_options={"foo": "bar"}, - allow_jagged_rows=True, - encryption_configuration={"bar": "baz"}, - cluster_fields=["field_1", "field_2"], - ) - - operator.execute(context=MagicMock()) - - calls = [ - call( - configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=True, - fieldDelimiter=";", - maxBadRecords=13, - quote="|", - schemaUpdateOptions={"foo": "bar"}, - destinationEncryptionConfiguration={"bar": "baz"}, - clustering={"fields": ["field_1", "field_2"]}, - ), - }, - project_id=hook.return_value.project_id, - location=None, - job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, - nowait=True, - ), - ] - - hook.return_value.insert_job.assert_has_calls(calls) - - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") - def test_date_partitioned_explicit_setting_should_be_found(self, hook): - hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - hook.return_value.generate_job_id.return_value = pytest.real_job_id - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - schema_fields=SCHEMA_FIELDS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - time_partitioning={"type": "DAY"}, - ) - - operator.execute(context=MagicMock()) - - calls = [ - call( - configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - 
"labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - timePartitioning={"type": "DAY"}, - ), - }, - project_id=hook.return_value.project_id, - location=None, - job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, - nowait=True, - ), - ] - - hook.return_value.insert_job.assert_has_calls(calls) - - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") - def test_date_partitioned_implied_in_table_name_should_be_found(self, hook): - hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - hook.return_value.generate_job_id.return_value = pytest.real_job_id - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - schema_fields=SCHEMA_FIELDS, - destination_project_dataset_table=TEST_EXPLICIT_DEST + "$20221123", - write_disposition=WRITE_DISPOSITION, - external_table=False, - ) - - operator.execute(context=MagicMock()) - - calls = [ - call( - configuration={ - "load": dict( - autodetect=True, - createDisposition="CREATE_IF_NEEDED", - destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, - destinationTableProperties={ - "description": None, - "labels": None, - }, - sourceFormat="CSV", - skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], - writeDisposition=WRITE_DISPOSITION, - ignoreUnknownValues=False, - allowQuotedNewlines=False, - encoding="UTF-8", - schema={"fields": SCHEMA_FIELDS}, - allowJaggedRows=False, - fieldDelimiter=",", - maxBadRecords=0, - quote=None, - schemaUpdateOptions=(), - timePartitioning={"type": "DAY"}, - ), - }, - project_id=hook.return_value.project_id, - location=None, - job_id=pytest.real_job_id, - timeout=None, - retry=DEFAULT_RETRY, - nowait=True, - ), - ] - - hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) def test_execute_should_throw_ex_when_no_bucket_specified(self, hook): hook.return_value.insert_job.side_effect = [ @@ -1010,173 +821,8 @@ def test_execute_should_throw_ex_when_no_destination_project_dataset_table_speci ) operator.execute(context=MagicMock()) - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_external_table_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check detection of schema fields if schema_fields parameter is not - specified and fields are read from source objects correctly by the operator - if all fields are characters. In this case operator searches for fields in source object - and update configuration with constructed schema_fields. 
- """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=True, - autodetect=True, - ) - - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_called_once() - - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_should_not_called_on_json_ex_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check not calling check_schema_fields method in case if input file is of JSON type on external table - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS_JSON, - source_format="NEWLINE_DELIMITED_JSON", - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=True, - autodetect=True, - ) - - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_not_called() - - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_should_not_called_on_json_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check not calling check_schema_fields method in case if input file is of JSON type - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS_JSON, - source_format="NEWLINE_DELIMITED_JSON", - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - autodetect=True, - ) - - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_not_called() - - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def 
test_schema_fields_scanner_without_external_table_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check detection of schema fields if schema_fields parameter is not - specified and fields are read from source objects correctly by the operator - if all fields are characters. In this case operator searches for fields in source object - and update configuration with constructed schema_fields. - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - autodetect=True, - ) - - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_called_once() - - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_external_table_should_throw_ex_when_autodetect_not_specified( - self, - hook, - ): - hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - hook.return_value.generate_job_id.return_value = pytest.real_job_id - hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - hook.return_value.get_job.return_value.result.return_value = ("1",) - - with pytest.raises( - AirflowException, - match=r"Table schema was not found. Neither schema object nor schema fields were specified", - ): - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - max_id_key=MAX_ID_KEY, - write_disposition=WRITE_DISPOSITION, - external_table=True, - autodetect=False, - ) - operator.execute(context=MagicMock()) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_without_external_table_should_throw_ex_when_autodetect_not_specified( + def test_source_format_check_should_throw_ex_when_incorrect_source_type( self, hook, ): @@ -1189,8 +835,8 @@ def test_schema_fields_scanner_without_external_table_should_throw_ex_when_autod hook.return_value.get_job.return_value.result.return_value = ("1",) with pytest.raises( - AirflowException, - match=r"Table schema was not found. 
Neither schema object nor schema fields were specified", + ValueError, + match=r"is not a valid source format.", ): operator = GCSToBigQueryOperator( task_id=TASK_ID, @@ -1201,6 +847,7 @@ def test_schema_fields_scanner_without_external_table_should_throw_ex_when_autod write_disposition=WRITE_DISPOSITION, external_table=False, autodetect=False, + source_format="incorrect", ) operator.execute(context=MagicMock()) @@ -1247,7 +894,7 @@ def test_schema_fields_integer_scanner_external_table_should_execute_successfull "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -1313,7 +960,7 @@ def test_schema_fields_integer_scanner_without_external_table_should_execute_suc "createDisposition": "CREATE_IF_NEEDED", "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": WRITE_DISPOSITION, "ignoreUnknownValues": False, "skipLeadingRows": None, @@ -1374,7 +1021,7 @@ def test_schema_fields_without_external_table_should_execute_successfully(self, "createDisposition": "CREATE_IF_NEEDED", "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "writeDisposition": WRITE_DISPOSITION, "ignoreUnknownValues": False, "schema": {"fields": SCHEMA_FIELDS_INT}, @@ -1428,7 +1075,7 @@ def test_schema_fields_external_table_should_execute_successfully(self, hook): "externalDataConfiguration": { "autodetect": True, "sourceFormat": "CSV", - "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], "compression": "NONE", "ignoreUnknownValues": False, "csvOptions": { @@ -1688,7 +1335,7 @@ def test_schema_fields_without_external_table_async_should_execute_successfully( }, sourceFormat="CSV", skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], writeDisposition=WRITE_DISPOSITION, ignoreUnknownValues=False, allowQuotedNewlines=False, @@ -1764,7 +1411,7 @@ def test_schema_fields_int_without_external_table_async_should_execute_successfu }, sourceFormat="CSV", skipLeadingRows=None, - sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], + sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], writeDisposition=WRITE_DISPOSITION, ignoreUnknownValues=False, allowQuotedNewlines=False, @@ -1792,109 +1439,6 @@ def test_schema_fields_int_without_external_table_async_should_execute_successfu bq_hook.return_value.insert_job.assert_has_calls(calls) - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_async_schema_fields_scanner_external_table_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check detection of schema fields if schema_fields parameter is not - specified and fields are read from source objects correctly by the operator - if all fields are 
characters. In this case operator searches for fields in source object - and update configuration with constructed schema_fields. - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS, - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - autodetect=True, - deferrable=True, - ) - with pytest.raises(TaskDeferred): - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_called_once() - - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_should_not_called_on_json_ex_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check not calling check_schema_fields method in case if input file is of JSON type on external table - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS_JSON, - source_format="NEWLINE_DELIMITED_JSON", - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=True, - autodetect=True, - deferrable=True, - ) - operator.execute(context=MagicMock()) - mocked_check_fileds.assert_not_called() - - @mock.patch(GCS_TO_BQ_PATH.format("GCSToBigQueryOperator._check_schema_fields")) - @mock.patch(GCS_TO_BQ_PATH.format("GCSHook")) - @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) - def test_schema_fields_scanner_should_not_called_on_json_should_execute_successfully( - self, bq_hook, gcs_hook, mocked_check_fileds - ): - """ - Check not calling check_schema_fields method in case if input file is of JSON type - """ - bq_hook.return_value.insert_job.side_effect = [ - MagicMock(job_id=pytest.real_job_id, error_result=False), - pytest.real_job_id, - ] - bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id - bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) - bq_hook.return_value.get_job.return_value.result.return_value = ("1",) - - gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna" - - operator = GCSToBigQueryOperator( - task_id=TASK_ID, - bucket=TEST_BUCKET, - source_objects=TEST_SOURCE_OBJECTS_JSON, - source_format="NEWLINE_DELIMITED_JSON", - destination_project_dataset_table=TEST_EXPLICIT_DEST, - write_disposition=WRITE_DISPOSITION, - external_table=False, - autodetect=True, - deferrable=True, - ) - with pytest.raises(TaskDeferred): - operator.execute(context=MagicMock()) - 
mocked_check_fileds.assert_not_called() -
     def create_context(self, task):
         dag = DAG(dag_id="dag")
         logical_date = datetime(2022, 1, 1, 0, 0, 0)
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py
index 2f66057747e97..5888056456bc3 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py
@@ -37,8 +37,12 @@
 DATASET_NAME_STR = f"dataset_{DAG_ID}_{ENV_ID}_STR"
 DATASET_NAME_DATE = f"dataset_{DAG_ID}_{ENV_ID}_DATE"
+DATASET_NAME_JSON = f"dataset_{DAG_ID}_{ENV_ID}_JSON"
+DATASET_NAME_DELIMITER = f"dataset_{DAG_ID}_{ENV_ID}_DELIMITER"
 TABLE_NAME_STR = "test_str"
 TABLE_NAME_DATE = "test_date"
+TABLE_NAME_JSON = "test_json"
+TABLE_NAME_DELIMITER = "test_delimiter"
 MAX_ID_STR = "name"
 MAX_ID_DATE = "date"
@@ -49,14 +53,24 @@
     catchup=False,
     tags=["example", "gcs"],
 ) as dag:
-    create_test_dataset_for_string_fileds = BigQueryCreateEmptyDatasetOperator(
+    create_test_dataset_for_string_fields = BigQueryCreateEmptyDatasetOperator(
         task_id="create_airflow_test_dataset_str", dataset_id=DATASET_NAME_STR, project_id=PROJECT_ID
     )
-    create_test_dataset_for_date_fileds = BigQueryCreateEmptyDatasetOperator(
+    create_test_dataset_for_date_fields = BigQueryCreateEmptyDatasetOperator(
         task_id="create_airflow_test_dataset_date", dataset_id=DATASET_NAME_DATE, project_id=PROJECT_ID
     )
+    create_test_dataset_for_json_fields = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_airflow_test_dataset_json", dataset_id=DATASET_NAME_JSON, project_id=PROJECT_ID
+    )
+
+    create_test_dataset_for_delimiter_fields = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_airflow_test_dataset_delimiter",
+        dataset_id=DATASET_NAME_DELIMITER,
+        project_id=PROJECT_ID,
+    )
+
     # [START howto_operator_gcs_to_bigquery_async]
     load_string_based_csv = GCSToBigQueryOperator(
         task_id="gcs_to_bigquery_example_str_csv_async",
@@ -87,14 +101,28 @@
         bucket="cloud-samples-data",
         source_objects=["bigquery/us-states/us-states.json"],
         source_format="NEWLINE_DELIMITED_JSON",
-        destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}",
+        destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
         write_disposition="WRITE_TRUNCATE",
         external_table=False,
         autodetect=True,
-        max_id_key=MAX_ID_DATE,
+        max_id_key=MAX_ID_STR,
         deferrable=True,
     )
+    load_csv_delimiter = GCSToBigQueryOperator(
+        task_id="gcs_to_bigquery_example_delimiter_async",
+        bucket="big-query-samples",
+        source_objects=["employees-tabular.csv"],
+        source_format="csv",
+        destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
+        write_disposition="WRITE_TRUNCATE",
+        external_table=False,
+        autodetect=True,
+        field_delimiter="\t",
+        quote_character="",
+        max_id_key=MAX_ID_STR,
+        deferrable=True,
+    )
     # [END howto_operator_gcs_to_bigquery_async]
     delete_test_dataset_str = BigQueryDeleteDatasetOperator(
@@ -111,17 +139,36 @@
         trigger_rule=TriggerRule.ALL_DONE,
     )
+    delete_test_dataset_json = BigQueryDeleteDatasetOperator(
+        task_id="delete_airflow_test_json_dataset",
+        dataset_id=DATASET_NAME_JSON,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_test_dataset_delimiter = BigQueryDeleteDatasetOperator(
+        task_id="delete_airflow_test_delimiter",
+        dataset_id=DATASET_NAME_DELIMITER,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     (
         # TEST SETUP
-        create_test_dataset_for_string_fileds
-        >> 
create_test_dataset_for_date_fileds + create_test_dataset_for_string_fields + >> create_test_dataset_for_date_fields + >> create_test_dataset_for_json_fields + >> create_test_dataset_for_delimiter_fields # TEST BODY >> load_string_based_csv >> load_date_based_csv >> load_json + >> load_csv_delimiter # TEST TEARDOWN >> delete_test_dataset_str >> delete_test_dataset_date + >> delete_test_dataset_json + >> delete_test_dataset_delimiter ) from tests.system.utils.watcher import watcher