Skip to content

Commit

Permalink
Unify reattach_states parameter logic across BigQuery operators
Browse files Browse the repository at this point in the history
  • Loading branch information
moiseenkov committed Oct 22, 2024
1 parent ae90a8f commit ef65719
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,21 @@ def execute(self, context: Context):
location=self.location,
job_id=self.job_id,
)
if job.state in self.reattach_states:
# We are reattaching to a job
job.result(timeout=self.result_timeout, retry=self.result_retry)
self._handle_job_error(job)
else:
# Same job configuration so we need force_rerun
if job.state not in self.reattach_states:
# Same job configuration, so we need force_rerun
raise AirflowException(
f"Job with id: {self.job_id} already exists and is in {job.state} state. If you "
f"want to force rerun it consider setting `force_rerun=True`."
f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
)
else:
# Job already reached state DONE
if job.state == "DONE":
raise AirflowException("Job is already in state DONE. Can not reattach to this job.")

# We are reattaching to a job
self.log.info("Reattaching to existing Job in state %s", job.state)
self._handle_job_error(job)

self.job_id = job.job_id
conf = job.to_api_repr()["configuration"]["extract"]["sourceTable"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,17 +391,21 @@ def execute(self, context: Context):
location=self.location,
job_id=job_id,
)
if job.state in self.reattach_states:
# We are reattaching to a job
job._begin()
self._handle_job_error(job)
else:
# Same job configuration so we need force_rerun
if job.state not in self.reattach_states:
# Same job configuration, so we need force_rerun
raise AirflowException(
f"Job with id: {job_id} already exists and is in {job.state} state. If you "
f"want to force rerun it consider setting `force_rerun=True`."
f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
)
else:
# Job already reached state DONE
if job.state == "DONE":
raise AirflowException("Job is already in state DONE. Can not reattach to this job.")

# We are reattaching to a job
self.log.info("Reattaching to existing Job in state %s", job.state)
self._handle_job_error(job)

job_types = {
LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1730,8 +1730,6 @@ def test_execute_without_external_table_reattach_async_should_execute_successful
project_id=JOB_PROJECT_ID,
)

job._begin.assert_called_once_with()

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_execute_without_external_table_force_rerun_async_should_execute_successfully(self, hook):
hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"
Expand Down

0 comments on commit ef65719

Please sign in to comment.