Skip to content

Commit

Permalink
Use thin/passthrough hook instead of one-liner hook method (#29252)
Browse files Browse the repository at this point in the history
Use the hook to get a boto connection and make the api call from
operator method. Defining one-liner hook methods that are just calling
boto are not ideal.
  • Loading branch information
o-nikolas authored Jan 31, 2023
1 parent 6282567 commit aacf4da
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 11 deletions.
9 changes: 0 additions & 9 deletions airflow/providers/amazon/aws/hooks/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,6 @@ def add_job_flow_steps(
)
return response["StepIds"]

def terminate_job_flow(self, job_flow_id: str) -> None:
"""
Terminate a given EMR cluster (job flow) by id. If TerminationProtected=True on the cluster,
termination will be unsuccessful.
:param job_flow_id: id of the job flow to terminate
"""
self.get_conn().terminate_job_flows(JobFlowIds=[job_flow_id])

def test_connection(self):
"""
Return failed state for test Amazon Elastic MapReduce Connection (untestable).
Expand Down
7 changes: 5 additions & 2 deletions airflow/providers/amazon/aws/operators/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,10 +611,13 @@ def execute(self, context: Context) -> str | None:
return self._job_flow_id

def on_kill(self) -> None:
"""Terminate job flow."""
"""
Terminate the EMR cluster (job flow). If TerminationProtected=True on the cluster,
termination will be unsuccessful.
"""
if self._job_flow_id:
self.log.info("Terminating job flow %s", self._job_flow_id)
self._emr_hook.terminate_job_flow(self._job_flow_id)
self._emr_hook.conn.terminate_job_flows(JobFlowIds=[self._job_flow_id])


class EmrModifyClusterOperator(BaseOperator):
Expand Down

0 comments on commit aacf4da

Please sign in to comment.