Skip to content

Commit

Permalink
Merge pull request #19 from botify-labs/fix/schedule-failed
Browse files Browse the repository at this point in the history
Fix/schedule failed
  • Loading branch information
ggreg committed Aug 7, 2014
2 parents 39e7594 + 50a30c9 commit 70b72cf
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 5 deletions.
9 changes: 7 additions & 2 deletions simpleflow/history.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import collections


class History(object):
def __init__(self, history):
self._history = history
self._activities = {}
self._child_workflows = {}
self._activities = collections.defaultdict(
lambda: {'type': 'activity'})
self._child_workflows = collections.defaultdict(
lambda: {'type': 'child_workflow'})

@property
def events(self):
Expand Down
1 change: 1 addition & 0 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import swf.models
import swf.models.decision
import swf.exceptions

from simpleflow import (
executor,
Expand Down
44 changes: 41 additions & 3 deletions tests/test_dataflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import functools
import mock

import boto.swf


Expand Down Expand Up @@ -955,11 +958,9 @@ def test_activity_not_found_schedule_failed():
history = builder.History(workflow)
decision_id = history.last_id
(history
.add_activity_task(
raise_on_failure,
.add_activity_task_schedule_failed(
activity_id='activity-tests.test_dataflow.increment-1',
decision_id=decision_id,
last_state='schedule_failed',
activity_type={
'name': increment.name,
'version': increment.version
Expand All @@ -968,3 +969,40 @@ def test_activity_not_found_schedule_failed():

decisions, _ = executor.replay(history)
check_task_scheduled_decision(decisions[0], increment)


def raise_already_exists(activity):
@functools.wraps(raise_already_exists)
def wrapped(*args):
raise swf.exceptions.AlreadyExistsError(
'<ActivityType domain={} name={} version={} status=REGISTERED> '
'already exists'.format(
DOMAIN.name,
activity.name,
activity.version))

return wrapped


def test_activity_not_found_schedule_failed_already_exists():
workflow = TestDefinition
executor = Executor(DOMAIN, workflow)

history = builder.History(workflow)
decision_id = history.last_id
(history
.add_activity_task_schedule_failed(
activity_id='activity-tests.test_dataflow.increment-1',
decision_id=decision_id,
activity_type={
'name': increment.name,
'version': increment.version
},
cause='ACTIVITY_TYPE_DOES_NOT_EXIST'))

with mock.patch(
'swf.models.ActivityType.save',
raise_already_exists(increment)):
decisions, _ = executor.replay(history)

check_task_scheduled_decision(decisions[0], increment)

0 comments on commit 70b72cf

Please sign in to comment.