Skip to content

Commit

Permalink
Update swf.executor #30: handle more then 1,000 open activity tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Leclercq committed Mar 23, 2015
1 parent 509324f commit 08b7c30
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 5 deletions.
1 change: 0 additions & 1 deletion simpleflow/constants.py

This file was deleted.

2 changes: 2 additions & 0 deletions simpleflow/swf/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
MAX_DECISIONS = 100
MAX_OPEN_ACTIVITY_COUNT = 1000
14 changes: 11 additions & 3 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
executor,
futures,
exceptions,
constants,
)
from simpleflow.activity import Activity
from simpleflow.workflow import Workflow
from simpleflow.history import History
from simpleflow.swf.task import ActivityTask, WorkflowTask
from simpleflow.swf import constants

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,6 +69,7 @@ def reset(self):
on each replay.
"""
self._open_activity_count = 0
self._decisions = []
self._tasks = TaskRegistry()

Expand All @@ -92,7 +93,7 @@ def _get_future_from_activity_event(self, event):
:type event: swf.event.Event.
"""
future = futures.Future()
future = futures.Future() # state is PENDING.
state = event['state']

if state == 'scheduled':
Expand All @@ -118,7 +119,8 @@ def _get_future_from_activity_event(self, event):
future._state = futures.RUNNING
elif state == 'completed':
future._state = futures.FINISHED
future._result = json.loads(event['result'])
result = event['result']
future._result = json.loads(result) if result else None
elif state == 'canceled':
future._state = futures.CANCELLED
elif state == 'failed':
Expand Down Expand Up @@ -202,6 +204,7 @@ def schedule_task(self, task):
decisions = task.schedule(self.domain)
# ``decisions`` contains a single decision.
self._decisions.extend(decisions)
self._open_activity_count += 1
if len(self._decisions) == constants.MAX_DECISIONS - 1:
# We add a timer to wake up the workflow immediately after
# completing these decisions.
Expand All @@ -226,13 +229,18 @@ def resume(self, task, *args, **kwargs):
if event:
if event['type'] == 'activity':
future = self.resume_activity(task, event)
if future and future._state in (futures.PENDING, futures.RUNNING):
self._open_activity_count += 1
elif event['type'] == 'child_workflow':
future = self.resume_child_workflow(task, event)

if not future:
self.schedule_task(task)
future = futures.Future() # return a pending future.

if self._open_activity_count == constants.MAX_OPEN_ACTIVITY_COUNT:
raise exceptions.ExecutionBlocked

return future

def submit(self, func, *args, **kwargs):
Expand Down
141 changes: 140 additions & 1 deletion tests/test_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ def register_activity_type(self, *args, **kwargs):
Workflow,
activity,
futures,
constants,
)
from simpleflow.swf import constants
from simpleflow.swf.executor import Executor


Expand Down Expand Up @@ -1007,3 +1007,142 @@ def test_activity_not_found_schedule_failed_already_exists():
decisions, _ = executor.replay(history)

check_task_scheduled_decision(decisions[0], increment)


class TestDefinitionMoreThanMaxOpenActivities(TestWorkflow):
"""
This workflow executes more tasks than the maximum number of decisions a
decider can take once.
"""
def run(self):
results = self.map(
increment,
xrange(constants.MAX_OPEN_ACTIVITY_COUNT + 20))
futures.wait(*results)


def test_more_than_1000_open_activities_scheduled():
workflow = TestDefinitionMoreThanMaxOpenActivities
executor = Executor(DOMAIN, workflow)
history = builder.History(workflow)

# The first time, the executor should schedule
# ``constants.MAX_OPEN_ACTIVITY_COUNT`` decisions.
# No timer because we wait for at least an activity to complete.
for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT / constants.MAX_DECISIONS):
decisions, _ = executor.replay(history)
assert len(decisions) == constants.MAX_DECISIONS

decision_id = history.last_id
for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT):
history.add_activity_task(
increment,
decision_id=decision_id,
activity_id='activity-tests.test_dataflow.increment-{}'.format(
i + 1),
last_state='scheduled',
result=i + 1)
(history
.add_decision_task_scheduled()
.add_decision_task_started())

decisions, _ = executor.replay(history)
assert executor._open_activity_count == constants.MAX_OPEN_ACTIVITY_COUNT
assert len(decisions) == 0


def test_more_than_1000_open_activities_scheduled_and_running():
def get_random_state():
import random
return random.choice(['scheduled', 'started'])

workflow = TestDefinitionMoreThanMaxOpenActivities
executor = Executor(DOMAIN, workflow)
history = builder.History(workflow)

# The first time, the executor should schedule
# ``constants.MAX_OPEN_ACTIVITY_COUNT`` decisions.
# No timer because we wait for at least an activity to complete.
for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT / constants.MAX_DECISIONS):
decisions, _ = executor.replay(history)
assert len(decisions) == constants.MAX_DECISIONS

decision_id = history.last_id
for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT):
history.add_activity_task(
increment,
decision_id=decision_id,
activity_id='activity-tests.test_dataflow.increment-{}'.format(
i + 1),
last_state=get_random_state(),
result=i + 1)
(history
.add_decision_task_scheduled()
.add_decision_task_started())

decisions, _ = executor.replay(history)
assert len(decisions) == 0


def test_more_than_1000_open_activities_partial_max():
workflow = TestDefinitionMoreThanMaxOpenActivities
executor = Executor(DOMAIN, workflow)
history = builder.History(workflow)
decisions, _ = executor.replay(history)

first_decision_id = history.last_id
for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT - 2):
history.add_activity_task(
increment,
decision_id=first_decision_id,
activity_id='activity-tests.test_dataflow.increment-{}'.format(
i + 1),
last_state='scheduled',
result=i + 1)
(history
.add_decision_task_scheduled()
.add_decision_task_started())

decisions, _ = executor.replay(history)
assert executor._open_activity_count == constants.MAX_OPEN_ACTIVITY_COUNT
assert len(decisions) == 2

history.add_decision_task_completed()
for i in xrange(2):
id_ = constants.MAX_OPEN_ACTIVITY_COUNT - 2 + i + 1
history.add_activity_task(
increment,
decision_id=history.last_id,
activity_id='activity-tests.test_dataflow.increment-{}'.format(
id_),
last_state='scheduled',
result=id_,
)

(history
.add_decision_task_scheduled()
.add_decision_task_started())

decisions, _ = executor.replay(history)
assert executor._open_activity_count == constants.MAX_OPEN_ACTIVITY_COUNT
assert len(decisions) == 0

history.add_decision_task_completed()

for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT - 2):
scheduled_id = first_decision_id + i + 1
history.add_activity_task_started(scheduled_id)
history.add_activity_task_completed(
scheduled_id,
started=history.last_id,
)

(history
.add_decision_task_scheduled()
.add_decision_task_started())

decisions, _ = executor.replay(history)
# 2 already scheduled + 20 to schedule now
assert executor._open_activity_count == 22
assert len(decisions) == 20

0 comments on commit 08b7c30

Please sign in to comment.