diff --git a/beams/behavior_tree/ActionNode.py b/beams/behavior_tree/ActionNode.py index 4ae5807..35761d6 100644 --- a/beams/behavior_tree/ActionNode.py +++ b/beams/behavior_tree/ActionNode.py @@ -16,18 +16,17 @@ def __init__( name: str, work_func: Callable[[Any], None], completion_condition: Callable[[Any], bool], - work_gate=Event(), - work_lock=Lock(), **kwargs, ): # TODO: can add failure condition argument... super().__init__(name) - # print(type(self.status)) - self.__volatile_status__ = VolatileStatus(self.status) - # TODO may want to instantiate these locally and then decorate the passed work function with them - self.work_gate = work_gate - self.lock = work_lock + + # Interprocess signalling mechanisms + self.__volatile_status__ = VolatileStatus(self.status) # Shares BT Status by wrapping Value + self.work_gate = Event() # Mechanism to avoid busy wait / signal node has been py_trees "initialized" + self.worker = ActionWorker(proc_name=name, volatile_status=self.__volatile_status__, + wait_for_tick=self.work_gate, work_func=work_func, comp_cond=completion_condition, stop_func=None diff --git a/beams/behavior_tree/ActionWorker.py b/beams/behavior_tree/ActionWorker.py index 5736576..e45c5bf 100644 --- a/beams/behavior_tree/ActionWorker.py +++ b/beams/behavior_tree/ActionWorker.py @@ -1,6 +1,7 @@ """ A worker specialized to execute ActionNode work functions """ +from multiprocessing import Event from typing import Callable, Any, Optional from epics.multiproc import CAProcess @@ -14,13 +15,18 @@ def __init__(self, proc_name: str, volatile_status: VolatileStatus, work_func: Callable[[Any], None], + wait_for_tick: Event, comp_cond: Callable[[Any], bool], stop_func: Optional[Callable[[None], None]] = None): super().__init__(proc_name=proc_name, stop_func=stop_func, work_func=work_func, proc_type=CAProcess, - add_args=(comp_cond, volatile_status)) + add_args=(comp_cond, volatile_status, wait_for_tick)) # Note: there may be a world where we define a common stop_func here in which case - # the class may have maintain a reference to voltaile_status and or comp_cond \ No newline at end of file + # the class may have maintain a reference to voltaile_status and or comp_cond + + # Note: the function passed to action_worker needs to have 4 arguments of + # (self, comp_condition, volatile_status, event). I am considering using functools.wraps and inspect.signature + # to ensure that the work function is well formatted... \ No newline at end of file diff --git a/beams/tests/test_check_and_do.py b/beams/tests/test_check_and_do.py index 6f2731b..7fe9b90 100644 --- a/beams/tests/test_check_and_do.py +++ b/beams/tests/test_check_and_do.py @@ -1,16 +1,20 @@ import time +from typing import Callable, Any from multiprocessing import Value import py_trees -from beams.behavior_tree import ActionNode, CheckAndDo, ConditionNode +from beams.behavior_tree import ActionNode, CheckAndDo, ConditionNode, VolatileStatus class TestTask: def test_check_and_do(self, capsys): percentage_complete = Value("i", 0) - def thisjob(myself, comp_condition, volatile_status) -> None: + def thisjob(myself, + comp_condition: Callable[[Any], None], + volatile_status: VolatileStatus, + *args) -> None: # TODO: grabbing intended keyword argument. Josh's less than pythonic mechanism for closures volatile_status.set_value(py_trees.common.Status.RUNNING) while not comp_condition(percentage_complete.value): diff --git a/beams/tests/test_leaf_node.py b/beams/tests/test_leaf_node.py index 953c139..5d84f87 100644 --- a/beams/tests/test_leaf_node.py +++ b/beams/tests/test_leaf_node.py @@ -1,10 +1,12 @@ import time +from typing import Callable, Any from multiprocessing import Value import py_trees from beams.behavior_tree.ActionNode import ActionNode from beams.behavior_tree.ConditionNode import ConditionNode +from beams.behavior_tree.VolatileStatus import VolatileStatus class TestTask: @@ -13,7 +15,10 @@ def test_action_node(self, capsys): # For test percentage_complete = Value("i", 0) - def thisjob(myself, comp_condition, volatile_status) -> None: + def thisjob(myself, + comp_condition: Callable[[Any], None], + volatile_status: VolatileStatus, + *args) -> None: volatile_status.set_value(py_trees.common.Status.RUNNING) while not comp_condition(percentage_complete.value): py_trees.console.logdebug(f"yuh {percentage_complete.value}, {volatile_status.get_value()}") diff --git a/beams/tree_config.py b/beams/tree_config.py index 11bbc74..c989fcc 100644 --- a/beams/tree_config.py +++ b/beams/tree_config.py @@ -167,41 +167,38 @@ class SetPVActionItem(ActionItem): termination_check: ConditionItem = field(default_factory=ConditionItem) def get_tree(self) -> ActionNode: - # TODO: can I put these two lines in a decorator which action node uses on the function? - wait_for_tick = Event() - wait_for_tick_lock = Lock() - - def work_func(myself, comp_condition, volatile_status): - py_trees.console.logdebug(f"WAITING FOR INIT {os.getpid()} " - f"from node: {self.name}") - wait_for_tick.wait() - - # Set to running - value = 0 - - # While termination_check is not True - while not comp_condition(): # TODO check work_gate.is_set() - py_trees.console.logdebug( - f"CALLING CAGET FROM {os.getpid()} from node: " f"{self.name}" - ) - value = caget(self.termination_check.pv) - + def work_func(myself, + comp_condition: Callable[[Any], bool], + volatile_status: VolatileStatus, + wait_for_tick: Event()): + while myself.do_work.value: + py_trees.console.logdebug(f"WAITING FOR INIT {os.getpid()} " + f"from node: {self.name}") + wait_for_tick.wait() + + # While termination_check is not True + while not comp_condition(): # TODO check work_gate.is_set() + py_trees.console.logdebug( + f"CALLING CAGET FROM {os.getpid()} from node: " f"{self.name}" + ) + value = caget(self.termination_check.pv) + + if comp_condition(): + volatile_status.set_value(py_trees.common.Status.SUCCESS) + py_trees.console.logdebug( + f"{self.name}: Value is {value}, BT Status: " + f"{volatile_status.get_value()}" + ) + + # specific caput logic to SetPVActionItem + caput(self.pv, self.value) + time.sleep(self.loop_period_sec) + + # one last check if comp_condition(): volatile_status.set_value(py_trees.common.Status.SUCCESS) - py_trees.console.logdebug( - f"{self.name}: Value is {value}, BT Status: " - f"{volatile_status.get_value()}" - ) - - # specific caput logic to SetPVActionItem - caput(self.pv, self.value) - time.sleep(self.loop_period_sec) - - # one last check - if comp_condition(): - volatile_status.set_value(py_trees.common.Status.SUCCESS) - else: - volatile_status.set_value(py_trees.common.Status.FAILURE) + else: + volatile_status.set_value(py_trees.common.Status.FAILURE) comp_cond = self.termination_check.get_condition_function() @@ -209,8 +206,6 @@ def work_func(myself, comp_condition, volatile_status): name=self.name, work_func=work_func, completion_condition=comp_cond, - work_gate=wait_for_tick, - work_lock=wait_for_tick_lock, ) return node @@ -225,40 +220,38 @@ class IncPVActionItem(ActionItem): # TODO: DRY this out a bit def get_tree(self) -> ActionNode: - wait_for_tick = Event() - wait_for_tick_lock = Lock() - - def work_func(myself, comp_condition, volatile_status): - py_trees.console.logdebug(f"WAITING FOR INIT {os.getpid()} " - f"from node: {self.name}") - wait_for_tick.wait() - - # Set to running - value = 0 - - # While termination_check is not True - while not comp_condition(): # TODO check work_gate.is_set() - py_trees.console.logdebug( - f"CALLING CAGET FROM {os.getpid()} from node: " f"{self.name}" - ) - value = caget(self.pv) - + def work_func(myself, + comp_condition: Callable[[Any], bool], + volatile_status: VolatileStatus, + wait_for_tick: Event()): + while myself.do_work.value: + py_trees.console.logdebug(f"WAITING FOR INIT {os.getpid()} " + f"from node: {self.name}") + wait_for_tick.wait() + + # While termination_check is not True + while not comp_condition(): # TODO check work_gate.is_set() + py_trees.console.logdebug( + f"CALLING CAGET FROM {os.getpid()} from node: " f"{self.name}" + ) + value = caget(self.pv) + + if comp_condition(): + volatile_status.set_value(py_trees.common.Status.SUCCESS) + py_trees.console.logdebug( + f"{self.name}: Value is {value}, BT Status: " + f"{volatile_status.get_value()}" + ) + + # specific caput logic to IncPVActionItem + caput(self.pv, value + self.increment) + time.sleep(self.loop_period_sec) + + # one last check if comp_condition(): volatile_status.set_value(py_trees.common.Status.SUCCESS) - py_trees.console.logdebug( - f"{self.name}: Value is {value}, BT Status: " - f"{volatile_status.get_value()}" - ) - - # specific caput logic to IncPVActionItem - caput(self.pv, value + self.increment) - time.sleep(self.loop_period_sec) - - # one last check - if comp_condition(): - volatile_status.set_value(py_trees.common.Status.SUCCESS) - else: - volatile_status.set_value(py_trees.common.Status.FAILURE) + else: + volatile_status.set_value(py_trees.common.Status.FAILURE) comp_cond = self.termination_check.get_condition_function() @@ -266,8 +259,6 @@ def work_func(myself, comp_condition, volatile_status): name=self.name, work_func=work_func, completion_condition=comp_cond, - work_gate=wait_for_tick, - work_lock=wait_for_tick_lock, ) return node