Skip to content

Commit

Permalink
Fix swf.executor #15: keep the state of an activity when it is schedu…
Browse files Browse the repository at this point in the history
…led again
  • Loading branch information
Greg Leclercq committed Aug 5, 2014
1 parent a3098bb commit a18fdbb
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
10 changes: 9 additions & 1 deletion simpleflow/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@ def get_activity(event):
'state': event.state,
'scheduled_id': event.id,
}
self._activities[event.activity_id] = activity
if event.activity_id not in self._activities:
self._activities[event.activity_id] = activity
else:
# When the executor retries a task, it schedules it again.
# We have to take care of not overriding some values set by the
# previous execution of the task such as the number of retries
# in ``retry``. As the state of the event mutates, it
# corresponds to the last execution.
self._activities[event.activity_id].update(activity)
elif event.state == 'started':
activity = get_activity(event)
activity['state'] = event.state
Expand Down
48 changes: 48 additions & 0 deletions tests/test_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,54 @@ def test_workflow_retry_activity():
assert decisions[0] == workflow_completed


def test_workflow_retry_activity_failed_again():
workflow = TestDefinitionRetryActivity
executor = Executor(DOMAIN, workflow)

history = builder.History(workflow)

# There is a single task, hence the executor should schedule it first.
decisions, _ = executor.replay(history)
check_task_scheduled_decision(decisions[0], increment_retry)

# Let's add the task in ``failed`` state.
decision_id = history.last_id
(history
.add_activity_task(
increment_retry,
decision_id=decision_id,
last_state='failed',
activity_id='activity-tests.test_dataflow.increment_retry-1')
.add_decision_task_scheduled()
.add_decision_task_started())

# As the retry value is one, the executor should retry i.e. schedule the
# task again.
decisions, _ = executor.replay(history)
check_task_scheduled_decision(decisions[0], increment_retry)

# Let's add the task in ``failed`` state again.
decision_id = history.last_id
(history
.add_activity_task(
increment_retry,
decision_id=decision_id,
last_state='failed',
activity_id='activity-tests.test_dataflow.increment_retry-1')
.add_decision_task_scheduled()
.add_decision_task_started())

# There is no more retry. The executor should set `Future.exception` and
# complete the workflow as there is no further task.
decisions, _ = executor.replay(history)

workflow_completed = swf.models.decision.WorkflowExecutionDecision()
# ``a.result`` is ``None`` because it was not set.
workflow_completed.complete(result=json.dumps(None))

assert decisions[0] == workflow_completed


class TestDefinitionChildWorkflow(TestWorkflow):
"""
This workflow executes a child workflow.
Expand Down

0 comments on commit a18fdbb

Please sign in to comment.