diff --git a/simpleflow/constants.py b/simpleflow/constants.py deleted file mode 100644 index 293501814..000000000 --- a/simpleflow/constants.py +++ /dev/null @@ -1 +0,0 @@ -MAX_DECISIONS = 100 diff --git a/simpleflow/swf/constants.py b/simpleflow/swf/constants.py new file mode 100644 index 000000000..6a5edd01d --- /dev/null +++ b/simpleflow/swf/constants.py @@ -0,0 +1,2 @@ +MAX_DECISIONS = 100 +MAX_OPEN_ACTIVITY_COUNT = 1000 diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index 422a1be13..5b47269b4 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -12,12 +12,12 @@ executor, futures, exceptions, - constants, ) from simpleflow.activity import Activity from simpleflow.workflow import Workflow from simpleflow.history import History from simpleflow.swf.task import ActivityTask, WorkflowTask +from simpleflow.swf import constants logger = logging.getLogger(__name__) @@ -69,6 +69,7 @@ def reset(self): on each replay. """ + self._open_activity_count = 0 self._decisions = [] self._tasks = TaskRegistry() @@ -92,7 +93,7 @@ def _get_future_from_activity_event(self, event): :type event: swf.event.Event. """ - future = futures.Future() + future = futures.Future() # state is PENDING. state = event['state'] if state == 'scheduled': @@ -118,7 +119,8 @@ def _get_future_from_activity_event(self, event): future._state = futures.RUNNING elif state == 'completed': future._state = futures.FINISHED - future._result = json.loads(event['result']) + result = event['result'] + future._result = json.loads(result) if result else None elif state == 'canceled': future._state = futures.CANCELLED elif state == 'failed': @@ -202,6 +204,7 @@ def schedule_task(self, task): decisions = task.schedule(self.domain) # ``decisions`` contains a single decision. self._decisions.extend(decisions) + self._open_activity_count += 1 if len(self._decisions) == constants.MAX_DECISIONS - 1: # We add a timer to wake up the workflow immediately after # completing these decisions. @@ -226,6 +229,8 @@ def resume(self, task, *args, **kwargs): if event: if event['type'] == 'activity': future = self.resume_activity(task, event) + if future and future._state in (futures.PENDING, futures.RUNNING): + self._open_activity_count += 1 elif event['type'] == 'child_workflow': future = self.resume_child_workflow(task, event) @@ -233,6 +238,9 @@ def resume(self, task, *args, **kwargs): self.schedule_task(task) future = futures.Future() # return a pending future. + if self._open_activity_count == constants.MAX_OPEN_ACTIVITY_COUNT: + raise exceptions.ExecutionBlocked + return future def submit(self, func, *args, **kwargs): diff --git a/tests/test_dataflow.py b/tests/test_dataflow.py index c5d69e6fa..f7e57f34f 100644 --- a/tests/test_dataflow.py +++ b/tests/test_dataflow.py @@ -26,8 +26,8 @@ def register_activity_type(self, *args, **kwargs): Workflow, activity, futures, - constants, ) +from simpleflow.swf import constants from simpleflow.swf.executor import Executor @@ -1007,3 +1007,142 @@ def test_activity_not_found_schedule_failed_already_exists(): decisions, _ = executor.replay(history) check_task_scheduled_decision(decisions[0], increment) + + +class TestDefinitionMoreThanMaxOpenActivities(TestWorkflow): + """ + This workflow executes more tasks than the maximum number of decisions a + decider can take once. + + """ + def run(self): + results = self.map( + increment, + xrange(constants.MAX_OPEN_ACTIVITY_COUNT + 20)) + futures.wait(*results) + + +def test_more_than_1000_open_activities_scheduled(): + workflow = TestDefinitionMoreThanMaxOpenActivities + executor = Executor(DOMAIN, workflow) + history = builder.History(workflow) + + # The first time, the executor should schedule + # ``constants.MAX_OPEN_ACTIVITY_COUNT`` decisions. + # No timer because we wait for at least an activity to complete. + for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT / constants.MAX_DECISIONS): + decisions, _ = executor.replay(history) + assert len(decisions) == constants.MAX_DECISIONS + + decision_id = history.last_id + for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT): + history.add_activity_task( + increment, + decision_id=decision_id, + activity_id='activity-tests.test_dataflow.increment-{}'.format( + i + 1), + last_state='scheduled', + result=i + 1) + (history + .add_decision_task_scheduled() + .add_decision_task_started()) + + decisions, _ = executor.replay(history) + assert executor._open_activity_count == constants.MAX_OPEN_ACTIVITY_COUNT + assert len(decisions) == 0 + + +def test_more_than_1000_open_activities_scheduled_and_running(): + def get_random_state(): + import random + return random.choice(['scheduled', 'started']) + + workflow = TestDefinitionMoreThanMaxOpenActivities + executor = Executor(DOMAIN, workflow) + history = builder.History(workflow) + + # The first time, the executor should schedule + # ``constants.MAX_OPEN_ACTIVITY_COUNT`` decisions. + # No timer because we wait for at least an activity to complete. + for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT / constants.MAX_DECISIONS): + decisions, _ = executor.replay(history) + assert len(decisions) == constants.MAX_DECISIONS + + decision_id = history.last_id + for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT): + history.add_activity_task( + increment, + decision_id=decision_id, + activity_id='activity-tests.test_dataflow.increment-{}'.format( + i + 1), + last_state=get_random_state(), + result=i + 1) + (history + .add_decision_task_scheduled() + .add_decision_task_started()) + + decisions, _ = executor.replay(history) + assert len(decisions) == 0 + + +def test_more_than_1000_open_activities_partial_max(): + workflow = TestDefinitionMoreThanMaxOpenActivities + executor = Executor(DOMAIN, workflow) + history = builder.History(workflow) + decisions, _ = executor.replay(history) + + first_decision_id = history.last_id + for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT - 2): + history.add_activity_task( + increment, + decision_id=first_decision_id, + activity_id='activity-tests.test_dataflow.increment-{}'.format( + i + 1), + last_state='scheduled', + result=i + 1) + (history + .add_decision_task_scheduled() + .add_decision_task_started()) + + decisions, _ = executor.replay(history) + assert executor._open_activity_count == constants.MAX_OPEN_ACTIVITY_COUNT + assert len(decisions) == 2 + + history.add_decision_task_completed() + for i in xrange(2): + id_ = constants.MAX_OPEN_ACTIVITY_COUNT - 2 + i + 1 + history.add_activity_task( + increment, + decision_id=history.last_id, + activity_id='activity-tests.test_dataflow.increment-{}'.format( + id_), + last_state='scheduled', + result=id_, + ) + + (history + .add_decision_task_scheduled() + .add_decision_task_started()) + + decisions, _ = executor.replay(history) + assert executor._open_activity_count == constants.MAX_OPEN_ACTIVITY_COUNT + assert len(decisions) == 0 + + history.add_decision_task_completed() + + for i in xrange(constants.MAX_OPEN_ACTIVITY_COUNT - 2): + scheduled_id = first_decision_id + i + 1 + history.add_activity_task_started(scheduled_id) + history.add_activity_task_completed( + scheduled_id, + started=history.last_id, + ) + + (history + .add_decision_task_scheduled() + .add_decision_task_started()) + + decisions, _ = executor.replay(history) + # 2 already scheduled + 20 to schedule now + assert executor._open_activity_count == 22 + assert len(decisions) == 20