Skip to content

Commit

Permalink
Update swf.Executor: call .on_failure() when the workflow fails
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Leclercq committed Jul 16, 2014
1 parent b70d3c2 commit a6dd1fe
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
21 changes: 19 additions & 2 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,22 @@ def replay(self, history):
reason = 'Workflow execution error: "{}"'.format(
err.exception.reason)
logger.exception(reason)

details = err.exception.details
self.on_failure(reason, details)

decision = swf.models.decision.WorkflowExecutionDecision()
decision.fail(
reason=reason,
details=err.exception.details)
details=details)
return [decision], {}

except Exception, err:
reason = 'Cannot replay the workflow "{}"'.format(err)
logger.exception(reason)

self.on_failure(reason)

decision = swf.models.decision.WorkflowExecutionDecision()
decision.fail(reason=reason)

Expand All @@ -166,9 +173,19 @@ def replay(self, history):

return [decision], {}

def on_failure(self, reason, details=None):
try:
self._workflow.on_failure(self._history, reason, details)
except NotImplementedError:
pass

def fail(self, reason, details=None):
self.on_failure(reason, details)

decision = swf.models.decision.WorkflowExecutionDecision()
decision.fail(reason=reason, details=details)
decision.fail(
reason='Workflow execution failed: {}'.format(reason),
details=details)

self._decisions.append(decision)
raise exceptions.ExecutionBlocked('workflow execution failed')
41 changes: 40 additions & 1 deletion tests/test_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ def test_workflow_failed_from_definition():
# fail the whole workflow.
decisions, _ = executor.replay(history)
workflow_failed = swf.models.decision.WorkflowExecutionDecision()
workflow_failed.fail(reason='error')
workflow_failed.fail(reason='Workflow execution failed: error')

assert decisions[0] == workflow_failed

Expand Down Expand Up @@ -725,3 +725,42 @@ def test_workflow_activity_raises_on_failure():
reason='Workflow execution error: "error"')

assert decisions[0] == workflow_failed


class TestOnFailureDefinition(TestWorkflow):
failed = False

def on_failure(self, history, reason, details=None):
self.failed = True

def run(self):
if self.submit(raise_error).exception:
self.fail('FAIL')


def test_on_failure_callback():
workflow = TestOnFailureDefinition
executor = Executor(DOMAIN, workflow)
history = builder.History(workflow)

history.add_activity_task(
raise_error,
decision_id=history.last_id,
activity_id='activity-tests.test_dataflow.raise_error-1',
last_state='failed',
reason='error')

(history
.add_decision_task_scheduled()
.add_decision_task_started())

# The executor should fail the workflow and extract the reason from the
# exception raised in the workflow definition.
decisions, _ = executor.replay(history)
assert executor._workflow.failed is True

workflow_failed = swf.models.decision.WorkflowExecutionDecision()
workflow_failed.fail(
reason='Workflow execution failed: FAIL')

assert decisions[0] == workflow_failed

0 comments on commit a6dd1fe

Please sign in to comment.