From a6dd1feed9486de17cfd4194077218be7155c4bb Mon Sep 17 00:00:00 2001 From: Greg Leclercq Date: Tue, 8 Jul 2014 23:20:15 +0200 Subject: [PATCH] Update swf.Executor: call .on_failure() when the workflow fails --- simpleflow/swf/executor.py | 21 +++++++++++++++++-- tests/test_dataflow.py | 41 +++++++++++++++++++++++++++++++++++++- 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index 596bd1350..cebc0cf70 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -147,15 +147,22 @@ def replay(self, history): reason = 'Workflow execution error: "{}"'.format( err.exception.reason) logger.exception(reason) + + details = err.exception.details + self.on_failure(reason, details) + decision = swf.models.decision.WorkflowExecutionDecision() decision.fail( reason=reason, - details=err.exception.details) + details=details) return [decision], {} except Exception, err: reason = 'Cannot replay the workflow "{}"'.format(err) logger.exception(reason) + + self.on_failure(reason) + decision = swf.models.decision.WorkflowExecutionDecision() decision.fail(reason=reason) @@ -166,9 +173,19 @@ def replay(self, history): return [decision], {} + def on_failure(self, reason, details=None): + try: + self._workflow.on_failure(self._history, reason, details) + except NotImplementedError: + pass + def fail(self, reason, details=None): + self.on_failure(reason, details) + decision = swf.models.decision.WorkflowExecutionDecision() - decision.fail(reason=reason, details=details) + decision.fail( + reason='Workflow execution failed: {}'.format(reason), + details=details) self._decisions.append(decision) raise exceptions.ExecutionBlocked('workflow execution failed') diff --git a/tests/test_dataflow.py b/tests/test_dataflow.py index 1b403dd87..efb214fce 100644 --- a/tests/test_dataflow.py +++ b/tests/test_dataflow.py @@ -680,7 +680,7 @@ def test_workflow_failed_from_definition(): # fail the whole workflow. decisions, _ = executor.replay(history) workflow_failed = swf.models.decision.WorkflowExecutionDecision() - workflow_failed.fail(reason='error') + workflow_failed.fail(reason='Workflow execution failed: error') assert decisions[0] == workflow_failed @@ -725,3 +725,42 @@ def test_workflow_activity_raises_on_failure(): reason='Workflow execution error: "error"') assert decisions[0] == workflow_failed + + +class TestOnFailureDefinition(TestWorkflow): + failed = False + + def on_failure(self, history, reason, details=None): + self.failed = True + + def run(self): + if self.submit(raise_error).exception: + self.fail('FAIL') + + +def test_on_failure_callback(): + workflow = TestOnFailureDefinition + executor = Executor(DOMAIN, workflow) + history = builder.History(workflow) + + history.add_activity_task( + raise_error, + decision_id=history.last_id, + activity_id='activity-tests.test_dataflow.raise_error-1', + last_state='failed', + reason='error') + + (history + .add_decision_task_scheduled() + .add_decision_task_started()) + + # The executor should fail the workflow and extract the reason from the + # exception raised in the workflow definition. + decisions, _ = executor.replay(history) + assert executor._workflow.failed is True + + workflow_failed = swf.models.decision.WorkflowExecutionDecision() + workflow_failed.fail( + reason='Workflow execution failed: FAIL') + + assert decisions[0] == workflow_failed