Skip to content

Commit

Permalink
feat(ActionWorker, ActionNode, Timeout): moved wrapped_action_work to…
Browse files Browse the repository at this point in the history
… ActioNodeWorker, added functional timeout ability, tested
  • Loading branch information
joshc-slac committed Oct 19, 2024
1 parent 561ee97 commit c275289
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 61 deletions.
60 changes: 3 additions & 57 deletions beams/behavior_tree/ActionNode.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,17 @@
import atexit
import logging
import os
import time
from multiprocessing import Event, Queue, Value
from typing import Callable
from multiprocessing import Event

import py_trees

from beams.behavior_tree.ActionWorker import ActionWorker
from beams.behavior_tree.ActionWorker import ActionWorker, wrapped_action_work # the latter is grabbed as a pas through
from beams.behavior_tree.VolatileStatus import VolatileStatus
from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop,
Evaluatable)
from beams.typing_helper import ActionNodeWorkLoop, Evaluatable

logger = logging.getLogger(__name__)


def wrapped_action_work(loop_period_sec: float = 0.1):
def action_worker_work_function_generator(func: ActionNodeWorkFunction) -> ActionNodeWorkLoop:
def work_wrapper(
do_work: Value,
name: str,
work_gate: Event,
volatile_status: VolatileStatus,
completion_condition: Evaluatable,
log_queue: Queue,
log_configurer: Callable) -> None:
"""
Wrap self.work_func, and set up logging / status communication
InterProcess Communication performed by shared memory objects:
- volatile status
- logging queue
Runs a persistent while loop, in which the work func is called repeatedly
"""
log_configurer(log_queue)
while (do_work.value):
logger.debug(f"WAITING FOR INIT from node: {name}")
work_gate.wait()
work_gate.clear()

# Set to running
volatile_status.set_value(py_trees.common.Status.RUNNING)
while not completion_condition():
logger.debug(f"CALLING CAGET FROM from node ({name})")
try:
status = func(completion_condition)
except Exception as ex:
volatile_status.set_value(py_trees.common.Status.FAILURE)
logger.error(f"Work function failed, setting node ({name}) "
f"as FAILED. ({ex})")
break

volatile_status.set_value(status)
logger.debug(f"Setting node ({name}): {volatile_status.get_value()}")
time.sleep(loop_period_sec)

# one last check
if completion_condition():
volatile_status.set_value(py_trees.common.Status.SUCCESS)
else:
volatile_status.set_value(py_trees.common.Status.FAILURE)

logger.debug(f"Worker for node ({name}) completed.")
return work_wrapper
return action_worker_work_function_generator


class ActionNode(py_trees.behaviour.Behaviour):
def __init__(
self,
Expand Down
75 changes: 71 additions & 4 deletions beams/behavior_tree/ActionWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,81 @@
* LOGGER_QUEUE: instance of the logging queue
* worker_logging_configurer: utility functuon to register log queue with handler
"""
from multiprocessing import Event
from typing import Any, Callable, Optional
import logging
import time
from multiprocessing import Event, Value, Queue
from typing import Callable, Optional

from epics.multiproc import CAProcess

import py_trees

from beams.behavior_tree.VolatileStatus import VolatileStatus
from beams.logging import LOGGER_QUEUE, worker_logging_configurer
from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop,
Evaluatable)
from beams.sequencer.helpers.Worker import Worker
from beams.sequencer.helpers.Timer import Timer

logger = logging.getLogger(__name__)


def wrapped_action_work(loop_period_sec: float = 0.1, work_function_timeout_period_sec: float = 2):
def action_worker_work_function_generator(func: ActionNodeWorkFunction) -> ActionNodeWorkLoop:
def work_wrapper(
do_work: Value,
name: str,
work_gate: Event,
volatile_status: VolatileStatus,
completion_condition: Evaluatable,
log_queue: Queue,
log_configurer: Callable) -> None:
"""
Wrap self.work_func, and set up logging / status communication
InterProcess Communication performed by shared memory objects:
- volatile status
- logging queue
Runs a persistent while loop, in which the work func is called repeatedly
"""
log_configurer(log_queue)
work_loop_timeout_timer = Timer(name=name,
timer_period_seconds=work_function_timeout_period_sec,
auto_start=False,
is_periodic=True)
while (do_work.value):
logger.debug(f"WAITING FOR INIT from node: {name}")
work_gate.wait()
work_gate.clear()

# Set to running
volatile_status.set_value(py_trees.common.Status.RUNNING)
# Start timer
work_loop_timeout_timer.start_timer()
while not completion_condition() and not work_loop_timeout_timer.is_elapsed():
logger.debug(f"CALLING CAGET FROM from node ({name})")
try:
status = func(completion_condition)
except Exception as ex:
volatile_status.set_value(py_trees.common.Status.FAILURE)
logger.error(f"Work function failed, setting node ({name}) "
f"as FAILED. ({ex})")
break

volatile_status.set_value(status)
logger.debug(f"Setting node ({name}): {volatile_status.get_value()}")
time.sleep(loop_period_sec)

# check if we exited loop because we timed out or we succeeded at task
if completion_condition():
logger.debug(f"Worker for node ({name}) completed.")
volatile_status.set_value(py_trees.common.Status.SUCCESS)
else:
logger.debug(f"Worker for node ({name}) failed.")
volatile_status.set_value(py_trees.common.Status.FAILURE)

return work_wrapper
return action_worker_work_function_generator


class ActionWorker(Worker):
Expand All @@ -27,8 +94,8 @@ def __init__(
proc_name: str,
volatile_status: VolatileStatus,
work_gate: Event,
work_func: Callable[[Any], None],
comp_cond: Callable[[Any], bool],
work_func: Callable[..., None],
comp_cond: Callable[..., bool],
stop_func: Optional[Callable[[None], None]] = None
):
super().__init__(
Expand Down
29 changes: 29 additions & 0 deletions beams/tests/test_leaf_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,35 @@ def comp_cond():
assert percentage_complete.value == 100


def test_action_node_timeout():
# For test
percentage_complete = Value("i", 0)

@wrapped_action_work(loop_period_sec=0.001, work_function_timeout_period_sec=.002)
def work_func(comp_condition: Callable) -> Status:
percentage_complete.value += 10
if comp_condition():
return Status.SUCCESS
logger.debug(f"pct complete -> {percentage_complete.value}")
return Status.RUNNING

def comp_cond():
return percentage_complete.value >= 100

action = ActionNode(name="action", work_func=work_func,
completion_condition=comp_cond)
action.setup()

while action.status not in (
Status.SUCCESS,
Status.FAILURE,
):
time.sleep(0.01)
action.tick_once()
assert action.status == Status.FAILURE
assert percentage_complete.value != 100


def test_condition_node():
def condition_fn():
return True
Expand Down

0 comments on commit c275289

Please sign in to comment.