From d2540a9ffbf65c38fae26e5f931f8f3e03d21abf Mon Sep 17 00:00:00 2001
From: Jarek Potiuk
Date: Wed, 8 Jun 2022 17:16:43 +0200
Subject: [PATCH] Work around job race bug on BigQuery to GCS transfer

Fixes: #24277
---
 airflow/providers/google/cloud/hooks/bigquery.py          | 6 +++++-
 .../providers/google/cloud/transfers/bigquery_to_gcs.py   | 7 +++----
 .../google/cloud/transfers/test_bigquery_to_gcs.py        | 1 +
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 70795efd6530d..9edde76f3c6fc 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1894,7 +1894,8 @@ def run_extract(
         field_delimiter: str = ',',
         print_header: bool = True,
         labels: Optional[Dict] = None,
-    ) -> str:
+        return_full_job: bool = False,
+    ) -> Union[str, BigQueryJob]:
         """
         Executes a BigQuery extract command to copy data from BigQuery to
         Google Cloud Storage. See here:
@@ -1915,6 +1916,7 @@ def run_extract(
         :param print_header: Whether to print a header for a CSV file extract.
         :param labels: a dictionary containing labels for the job/query,
             passed to BigQuery
+        :param return_full_job: return full job instead of job id only
         """
         warnings.warn(
             "This method is deprecated. 
Please use `BigQueryHook.insert_job` method.", DeprecationWarning
@@ -1953,6 +1955,8 @@
 
         job = self.insert_job(configuration=configuration, project_id=self.project_id)
         self.running_job_id = job.job_id
+        if return_full_job:
+            return job
         return job.job_id
 
     def run_query(
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 09ac190e0f269..fbae5a5b77933 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -19,7 +19,7 @@
 from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
 
 from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
 from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
 
 if TYPE_CHECKING:
@@ -115,7 +115,7 @@ def execute(self, context: 'Context'):
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
-        job_id = hook.run_extract(
+        job: BigQueryJob = hook.run_extract(
             source_project_dataset_table=self.source_project_dataset_table,
             destination_cloud_storage_uris=self.destination_cloud_storage_uris,
             compression=self.compression,
@@ -123,9 +123,8 @@ def execute(self, context: 'Context'):
             field_delimiter=self.field_delimiter,
             print_header=self.print_header,
             labels=self.labels,
+            return_full_job=True,
         )
-
-        job = hook.get_job(job_id=job_id).to_api_repr()
-        conf = job["configuration"]["extract"]["sourceTable"]
+        conf = job.to_api_repr()["configuration"]["extract"]["sourceTable"]
         dataset_id, project_id, table_id = conf["datasetId"], conf["projectId"], conf["tableId"]
         BigQueryTableLink.persist(
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
index 5ed9b660310ab..b627d3672c6a6 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
+++ 
b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py @@ -59,4 +59,5 @@ def test_execute(self, mock_hook): field_delimiter=field_delimiter, print_header=print_header, labels=labels, + return_full_job=True, )