Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/performance improvements #398

Merged
merged 8 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions SpiffWorkflow/bpmn/specs/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# 02110-1301 USA

from SpiffWorkflow.exceptions import WorkflowException
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.util.task import TaskState, TaskFilter, TaskIterator
from SpiffWorkflow.specs.StartTask import StartTask
from SpiffWorkflow.specs.Join import Join

Expand Down Expand Up @@ -72,7 +72,7 @@ class BoundaryEventJoin(Join, BpmnTaskSpec):
def __init__(self, wf_spec, name, **kwargs):
super().__init__(wf_spec, name, **kwargs)

def _check_threshold_structured(self, my_task, force=False):
def _check_threshold_structured(self, my_task):
split_task = my_task.find_ancestor(self.split_task)
if split_task is None:
raise WorkflowException(f'Split at {self.split_task} was not reached', task_spec=self)
Expand All @@ -97,15 +97,15 @@ def _check_threshold_structured(self, my_task, force=False):
cancel += [main]
else:
cancel = []
return force or finished, cancel
return finished, cancel


class StartEventJoin(Join, BpmnTaskSpec):

def __init__(self, wf_spec, name, **kwargs):
super().__init__(wf_spec, name, **kwargs)

def _check_threshold_structured(self, my_task, force=False):
def _check_threshold_structured(self, my_task):

split_task = my_task.find_ancestor(self.split_task)
if split_task is None:
Expand All @@ -118,23 +118,21 @@ def _check_threshold_structured(self, my_task, force=False):
else:
waiting.append(task)

return force or may_fire, waiting
return may_fire, waiting


class _EndJoin(UnstructuredJoin, BpmnTaskSpec):

def _check_threshold_unstructured(self, my_task, force=False):
# Look at the tree to find all ready and waiting tasks (excluding
# ourself). The EndJoin waits for everyone!
waiting_tasks = []
for task in my_task.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING):
if task.thread_id != my_task.thread_id:
def _check_threshold_unstructured(self, my_task):
# Look at the tree to find all ready and waiting tasks (excluding ourself). The EndJoin waits for everyone!
for task in TaskIterator(my_task.workflow.task_tree, state=TaskState.NOT_FINISHED_MASK, end_at_spec=self.name):
if task == my_task:
continue
if task.task_spec == my_task.task_spec:
continue
waiting_tasks.append(task)

return force or len(waiting_tasks) == 0, waiting_tasks
may_fire = False
break
else:
may_fire = True
return may_fire

def _run_hook(self, my_task):
result = super(_EndJoin, self)._run_hook(my_task)
Expand Down
37 changes: 19 additions & 18 deletions SpiffWorkflow/bpmn/specs/mixins/inclusive_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# 02110-1301 USA

from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.specs.MultiChoice import MultiChoice
from .unstructured_join import UnstructuredJoin

Expand Down Expand Up @@ -68,39 +68,40 @@ def test(self):
MultiChoice.test(self)
UnstructuredJoin.test(self)

def _check_threshold_unstructured(self, my_task, force=False):
# Look at the tree to find all places where this task is used.
tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name))
def _check_threshold_unstructured(self, my_task):
# Look at the tree to find all places where this task is used and unfinished tasks that may be ancestors
# If there are any, we may have to check whether this gateway is reachable from any of them.
tasks, sources = [], []
for task in my_task.workflow.get_tasks(end_at_spec=self.name):
if task.task_spec == self:
tasks.append(task)
elif task.has_state(TaskState.READY|TaskState.WAITING):
sources.append(task.task_spec)

# Look up which tasks have parents completed.
completed_inputs = set([ task.parent.task_spec for task in tasks if task.parent.state == TaskState.COMPLETED ])

# Find waiting tasks
# Exclude tasks whose specs have already been completed
# A spec only has to complete once, even if on multiple paths
waiting_tasks = []
# If any parents of this join have not been finished, this task must wait.
# A parent spec only has to be completed once, even if it is on multiple paths
tasks_waiting = False
for task in tasks:
if task.parent.has_state(TaskState.DEFINITE_MASK) and task.parent.task_spec not in completed_inputs:
waiting_tasks.append(task.parent)
tasks_waiting = True
break

if force:
# If force is true, complete the task
complete = True
elif len(waiting_tasks) > 0:
# If we have waiting tasks, we're obviously not done
if tasks_waiting:
complete = False
else:
# Handle the case where there are paths from active tasks that must go through waiting inputs
waiting_inputs = [i for i in self.inputs if i not in completed_inputs]
task_filter = TaskFilter(state=TaskState.READY|TaskState.WAITING)
sources = [t.task_spec for t in my_task.workflow.get_tasks(task_filter=task_filter)]

# This will go back through a task spec's ancestors and return the source, if applicable
def check(spec):
for parent in spec.inputs:
return parent if parent in sources else check(parent)

# If we can get to a completed input from this task, we don't have to wait for it
# Start with the completed inputs and recurse back through its ancestors, removing any waiting tasks that
# could reach one of them.
for spec in completed_inputs:
source = check(spec)
if source is not None:
Expand All @@ -115,7 +116,7 @@ def check(spec):

complete = len(unfinished_paths) == 0

return complete, waiting_tasks
return complete

def _run_hook(self, my_task):
outputs = self._get_matching_outputs(my_task)
Expand Down
26 changes: 13 additions & 13 deletions SpiffWorkflow/bpmn/specs/mixins/parallel_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.util.task import TaskState
from .unstructured_join import UnstructuredJoin


Expand All @@ -41,11 +41,9 @@ class ParallelGateway(UnstructuredJoin):
Essentially, this means that we must wait until we have a completed parent
task on each incoming sequence.
"""
def _check_threshold_unstructured(self, my_task, force=False):
def _check_threshold_unstructured(self, my_task):

tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name))
# Look up which tasks have parents completed.
waiting_tasks = []
tasks = my_task.workflow.get_tasks(spec_name=self.name)
waiting_inputs = set(self.inputs)

def remove_ancestor(task):
Expand All @@ -56,13 +54,15 @@ def remove_ancestor(task):
remove_ancestor(task.parent)

for task in tasks:
if task.parent.state == TaskState.COMPLETED and task.parent.task_spec in waiting_inputs:
waiting_inputs.remove(task.parent.task_spec)
# Do not wait for descendants of this task
elif task.is_descendant_of(my_task):
# Handle the case where the parallel gateway is part of a loop.
if task.is_descendant_of(my_task):
# This is the first iteration; we should not wait on this task, because it will not be reached
# until after this join completes
remove_ancestor(task)
# Ignore predicted tasks; we don't care about anything not definite
elif task.parent.has_state(TaskState.DEFINITE_MASK):
waiting_tasks.append(task.parent)
elif my_task.is_descendant_of(task):
# This is a subsequent iteration; we need to ignore the parents of previous iterations
continue
elif task.parent.state == TaskState.COMPLETED and task.parent.task_spec in waiting_inputs:
waiting_inputs.remove(task.parent.task_spec)

return force or len(waiting_inputs) == 0, waiting_tasks
return len(waiting_inputs) == 0
10 changes: 9 additions & 1 deletion SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,22 @@ def __init__(self, wf_spec, bpmn_id, subworkflow_spec, transaction=False, **kwar

def _on_subworkflow_completed(self, subworkflow, my_task):
self.update_data(my_task, subworkflow)
# I don't like manually moving back to ready, but don't want to run it
# Ideally, update hook would create the subprocess and return True, _run would start the subprocess and
# return None (so that the state would transition to started), and the completed event for this task
# could be used to run post-completed actions automatically.
# However, until I align the events with state transitions, I don't want to encourage external use of
# callback methods (though completed event is not going to change).
if my_task.state is TaskState.COMPLETED:
my_task._set_state(TaskState.READY)

def _update_hook(self, my_task):
subprocess = my_task.workflow.top_workflow.subprocesses.get(my_task.id)
if subprocess is None:
super()._update_hook(my_task)
self.create_workflow(my_task)
self.start_workflow(my_task)
my_task._set_state(TaskState.WAITING)
my_task._set_state(TaskState.STARTED)
else:
return subprocess.is_completed()

Expand Down
66 changes: 28 additions & 38 deletions SpiffWorkflow/bpmn/specs/mixins/unstructured_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,43 +26,33 @@ class UnstructuredJoin(Join):
A helper subclass of Join that makes it work in a slightly friendlier way
for the BPMN style threading
"""
def _do_join(self, my_task):
split_task = self._get_split_task(my_task)
def _update_hook(self, my_task):

# Identify all corresponding task instances within the thread.
# Also remember which of those instances was most recently changed,
# because we are making this one the instance that will
# continue the thread of control. In other words, we will continue
# to build the task tree underneath the most recently changed task.
last_changed = None
thread_tasks = []
for task in TaskIterator(split_task, spec_name=self.name):
if task.thread_id != my_task.thread_id:
# Ignore tasks from other threads. (Do we need this condition?)
continue
if not task.parent.has_state(TaskState.FINISHED_MASK):
# For an inclusive join, this can happen - it's a future join
continue
if my_task.is_descendant_of(task):
# Skip ancestors (otherwise the branch this task is on will get dropped)
continue
# We have found a matching instance.
thread_tasks.append(task)
may_fire = self._check_threshold_unstructured(my_task)
other_tasks = [t for t in my_task.workflow.tasks.values() if t.task_spec == self and t != my_task and t.state is TaskState.WAITING]
for task in other_tasks:
# By cancelling other waiting tasks immediately, we can prevent them from being updated repeatedly and pointlessly
task.cancel()
if not may_fire:
# Only the most recent instance of the spec needs to wait.
my_task._set_state(TaskState.WAITING)
else:
# Only copy the data to the task that will proceed
my_task._inherit_data()
return may_fire

# Check whether the state of the instance was recently changed.
changed = task.parent.last_state_change
if last_changed is None or changed > last_changed.parent.last_state_change:
last_changed = task

# Update data from all the same thread tasks.
thread_tasks.sort(key=lambda t: t.parent.last_state_change)
collected_data = {}
for task in thread_tasks:
collected_data.update(task.data)

for task in thread_tasks:
if task != last_changed:
task._set_state(TaskState.CANCELLED)
task._drop_children()
else:
task.data.update(collected_data)
def _run_hook(self, my_task):
other_tasks = filter(
lambda t: t.task_spec == self and t != my_task and t.has_state(TaskState.FINISHED_MASK) and not my_task.is_descendant_of(t),
my_task.workflow.tasks.values()
)
pass
for task in sorted(other_tasks, key=lambda t: t.last_state_change):
# By inheriting directly from parent tasks, we can avoid copying previously merged data
my_task.data.update(task.parent.data)
# This condition only applies when a workflow is reset inside a parallel branch.
# If reset to a branch that was originally cancelled, all the descendants of the previously completed branch will still
# appear in the tree, potentially corrupting the structure and data.
if task.has_state(TaskState.COMPLETED):
task._drop_children(force=True)
return True
21 changes: 11 additions & 10 deletions SpiffWorkflow/bpmn/util/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.util.task import TaskFilter, TaskIterator
from SpiffWorkflow.util.task import TaskFilter, TaskIterator, TaskState
from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent

class BpmnTaskFilter(TaskFilter):
Expand Down Expand Up @@ -54,27 +54,28 @@ def _next(self):
task = self.task_list.pop(0)
subprocess = task.workflow.top_workflow.subprocesses.get(task.id)

if task.task_spec.name == self.end_at_spec:
self.task_list = []
elif all([
if all([
len(task._children) > 0 or subprocess is not None,
task.state >= self.min_state or subprocess is not None,
self.depth < self.max_depth,
task.task_spec.name != self.end_at_spec,
]):
if subprocess is None:
next_tasks = task.children
elif self.depth_first:
next_tasks = [subprocess.task_tree] + task.children
# Do not descend into a completed subprocess to look for unfinished tasks.
if subprocess is None or (task.state >= TaskState.FINISHED_MASK and self.task_filter.state <= TaskState.FINISHED_MASK):
subprocess_tasks = []
else:
next_tasks = task.children + [subprocess.task_tree]
subprocess_tasks = [subprocess.task_tree]

if self.depth_first:
next_tasks = subprocess_tasks + task.children
self.task_list = next_tasks + self.task_list
else:
next_tasks = task.children + subprocess_tasks
self.task_list.extend(next_tasks)

self._update_depth(task)

elif self.depth_first and len(self.task_list) > 0:
self._handle_leaf_depth(task)

return task
return task
Loading
Loading