Skip to content

Commit

Permalink
Job cleaner, use own thread pool for graph traversal
Browse files Browse the repository at this point in the history
  • Loading branch information
albertz committed Nov 27, 2024
1 parent 8d4e9fd commit 3d93e7e
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
6 changes: 4 additions & 2 deletions sisyphus/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ def get_unfinished_jobs(job):
self.for_all_nodes(get_unfinished_jobs, nodes=nodes)
return states

def for_all_nodes(self, f, nodes=None, bottom_up=False):
def for_all_nodes(self, f, nodes=None, bottom_up=False, *, pool: Optional[ThreadPool] = None):
"""
Run function f for each node and ancestor for `nodes` from top down,
stop expanding tree branch if functions returns False. Does not stop on None to allow functions with no
Expand All @@ -497,6 +497,7 @@ def for_all_nodes(self, f, nodes=None, bottom_up=False):
:param (Job)->bool f: function will be executed for all nodes
:param nodes: all nodes that will be checked, defaults to all output nodes in graph
:param bool bottom_up: start with deepest nodes first, ignore return value of f
:param pool: use custom thread pool
:return: set with all visited nodes
"""

Expand Down Expand Up @@ -544,7 +545,8 @@ def for_all_nodes(self, f, nodes=None, bottom_up=False):

pool_lock = threading.Lock()
finished_lock = threading.Lock()
pool = self.pool
if not pool:
pool = self.pool

# recursive function to run through tree
def runner(job):
Expand Down
2 changes: 1 addition & 1 deletion sisyphus/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def f(job):
return True

while not self.stopped:
self.sis_graph.for_all_nodes(f)
self.sis_graph.for_all_nodes(f, pool=self.thread_pool)
time.sleep(gs.JOB_CLEANER_INTERVAL)

def close(self):
Expand Down

0 comments on commit 3d93e7e

Please sign in to comment.