From ef65719bab6e6c5e693e6a5aa21ce2afca3b823a Mon Sep 17 00:00:00 2001
From: Maksim Moiseenkov
Date: Tue, 22 Oct 2024 10:34:14 +0000
Subject: [PATCH] Unify reattach_states parameter logic across BigQuery
 operators

---
 .../google/cloud/transfers/bigquery_to_gcs.py | 16 ++++++++++------
 .../google/cloud/transfers/gcs_to_bigquery.py | 16 ++++++++++------
 .../cloud/transfers/test_gcs_to_bigquery.py   |  2 --
 3 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 93ef12dcbd49..e2588b8976e3 100644
--- a/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -231,17 +231,21 @@ def execute(self, context: Context):
                 location=self.location,
                 job_id=self.job_id,
             )
-            if job.state in self.reattach_states:
-                # We are reattaching to a job
-                job.result(timeout=self.result_timeout, retry=self.result_retry)
-                self._handle_job_error(job)
-            else:
-                # Same job configuration so we need force_rerun
+            if job.state not in self.reattach_states:
+                # Same job configuration, so we need force_rerun
                 raise AirflowException(
                     f"Job with id: {self.job_id} already exists and is in {job.state} state. If you "
                     f"want to force rerun it consider setting `force_rerun=True`."
                     f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                 )
+            else:
+                # Job already reached state DONE
+                if job.state == "DONE":
+                    raise AirflowException("Job is already in state DONE. Can not reattach to this job.")
+
+                # We are reattaching to a job
+                self.log.info("Reattaching to existing Job in state %s", job.state)
+                self._handle_job_error(job)
 
         self.job_id = job.job_id
         conf = job.to_api_repr()["configuration"]["extract"]["sourceTable"]
diff --git a/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 451de3fa4c07..3fe4ccf23107 100644
--- a/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -391,17 +391,21 @@ def execute(self, context: Context):
                 location=self.location,
                 job_id=job_id,
             )
-            if job.state in self.reattach_states:
-                # We are reattaching to a job
-                job._begin()
-                self._handle_job_error(job)
-            else:
-                # Same job configuration so we need force_rerun
+            if job.state not in self.reattach_states:
+                # Same job configuration, so we need force_rerun
                 raise AirflowException(
                     f"Job with id: {job_id} already exists and is in {job.state} state. If you "
                     f"want to force rerun it consider setting `force_rerun=True`."
                     f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                 )
+            else:
+                # Job already reached state DONE
+                if job.state == "DONE":
+                    raise AirflowException("Job is already in state DONE. Can not reattach to this job.")
+
+                # We are reattaching to a job
+                self.log.info("Reattaching to existing Job in state %s", job.state)
+                self._handle_job_error(job)
 
         job_types = {
             LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
diff --git a/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
index 05ef254cb43d..1831f9c8cb93 100644
--- a/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -1730,8 +1730,6 @@ def test_execute_without_external_table_reattach_async_should_execute_successful
                 project_id=JOB_PROJECT_ID,
             )
 
-        job._begin.assert_called_once_with()
-
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_execute_without_external_table_force_rerun_async_should_execute_successfully(self, hook):
         hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"