Skip to content

Commit

Permalink
Early delete a Dataproc cluster if started in the ERROR state. (#33668)
Browse files Browse the repository at this point in the history
* Early delete a Dataproc cluster if started in the ERROR state.

Update airflow/providers/google/cloud/operators/dataproc.py

Co-authored-by: Alex Cazacu <[email protected]>

* fixing up logging of exceptions

---------

Co-authored-by: Alex Cazacu <[email protected]>
  • Loading branch information
kristopherkane and acazacu authored Aug 30, 2023
1 parent 075afe5 commit d361761
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 16 deletions.
22 changes: 21 additions & 1 deletion airflow/providers/google/cloud/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,13 +595,17 @@ def _handle_error_state(self, hook: DataprocHook, cluster: Cluster) -> None:
if cluster.status.state != cluster.status.State.ERROR:
return
self.log.info("Cluster is in ERROR state")
self.log.info("Gathering diagnostic information.")
gcs_uri = hook.diagnose_cluster(
region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
)
self.log.info("Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri)
if self.delete_on_error:
self._delete_cluster(hook)
raise AirflowException("Cluster was created but was in ERROR state.")
# The delete op is asynchronous and can cause further failure if the cluster finishes
# deleting between catching AlreadyExists and checking state
self._wait_for_cluster_in_deleting_state(hook)
raise AirflowException("Cluster was created in an ERROR state then deleted.")
raise AirflowException("Cluster was created but is in ERROR state")

def _wait_for_cluster_in_deleting_state(self, hook: DataprocHook) -> None:
Expand Down Expand Up @@ -668,6 +672,22 @@ def execute(self, context: Context) -> dict:
raise
self.log.info("Cluster already exists.")
cluster = self._get_cluster(hook)
except AirflowException as ae:
# A cluster could still have been created here in an ERROR state; it should be
# deleted immediately rather than consuming another retry attempt
# (assuming delete_on_error is true, which is the default).
# This reduces the overall number of task attempts needed for a successful cluster creation
# from 3 to 2, assuming the underlying GCE issues have resolved within that window. Users can
# configure a higher number of retry attempts in powers of two with a 30s-60s wait interval.
try:
cluster = self._get_cluster(hook)
self._handle_error_state(hook, cluster)
except AirflowException as ae_inner:
# We could get any number of failures here, including cluster not found; we
# can just ignore them to ensure we surface the original cluster create failure.
self.log.error(ae_inner, exc_info=True)
finally:
raise ae

# Check if cluster is not in ERROR state
self._handle_error_state(hook, cluster)
Expand Down
37 changes: 22 additions & 15 deletions tests/providers/google/cloud/operators/test_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,13 +619,16 @@ def test_execute_if_cluster_exists_do_not_use(self, mock_hook):
with pytest.raises(AlreadyExists):
op.execute(context=self.mock_context)

@mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._wait_for_cluster_in_deleting_state"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_if_cluster_exists_in_error_state(self, mock_hook):
def test_execute_if_cluster_exists_in_error_state(self, mock_hook, mock_wait_for_deleting):
mock_hook.return_value.create_cluster.side_effect = [AlreadyExists("test")]
cluster_status = mock_hook.return_value.get_cluster.return_value.status
cluster_status.state = 0
cluster_status.State.ERROR = 0

mock_wait_for_deleting.return_value.get_cluster.side_effect = [NotFound]

op = DataprocCreateClusterOperator(
task_id=TASK_ID,
region=GCP_REGION,
Expand All @@ -650,24 +653,30 @@ def test_execute_if_cluster_exists_in_error_state(self, mock_hook):
region=GCP_REGION, project_id=GCP_PROJECT, cluster_name=CLUSTER_NAME
)

@mock.patch(DATAPROC_PATH.format("Cluster.to_dict"))
@mock.patch(DATAPROC_PATH.format("exponential_sleep_generator"))
@mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._create_cluster"))
@mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._get_cluster"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_if_cluster_exists_in_deleting_state(
self, mock_hook, mock_get_cluster, mock_create_cluster, mock_generator
self,
mock_hook,
mock_get_cluster,
mock_create_cluster,
mock_generator,
to_dict_mock,
):
cluster = mock.MagicMock()
cluster.status.state = 0
cluster.status.State.DELETING = 0
cluster_deleting = mock.MagicMock()
cluster_deleting.status.state = 0
cluster_deleting.status.State.DELETING = 0

cluster2 = mock.MagicMock()
cluster2.status.state = 0
cluster2.status.State.ERROR = 0
cluster_running = mock.MagicMock()
cluster_running.status.state = 0
cluster_running.status.State.RUNNING = 0

mock_create_cluster.side_effect = [AlreadyExists("test"), cluster2]
mock_create_cluster.side_effect = [AlreadyExists("test"), cluster_running]
mock_generator.return_value = [0]
mock_get_cluster.side_effect = [cluster, NotFound("test")]
mock_get_cluster.side_effect = [cluster_deleting, NotFound("test")]

op = DataprocCreateClusterOperator(
task_id=TASK_ID,
Expand All @@ -679,15 +688,13 @@ def test_execute_if_cluster_exists_in_deleting_state(
delete_on_error=True,
gcp_conn_id=GCP_CONN_ID,
)
with pytest.raises(AirflowException):
op.execute(context=self.mock_context)

op.execute(context=self.mock_context)
calls = [mock.call(mock_hook.return_value), mock.call(mock_hook.return_value)]
mock_get_cluster.assert_has_calls(calls)
mock_create_cluster.assert_has_calls(calls)
mock_hook.return_value.diagnose_cluster.assert_called_once_with(
region=GCP_REGION, project_id=GCP_PROJECT, cluster_name=CLUSTER_NAME
)

to_dict_mock.assert_called_once_with(cluster_running)

@mock.patch(DATAPROC_PATH.format("DataprocHook"))
@mock.patch(DATAPROC_TRIGGERS_PATH.format("DataprocAsyncHook"))
Expand Down

0 comments on commit d361761

Please sign in to comment.