-
Notifications
You must be signed in to change notification settings - Fork 2
/
ActionNode.py
96 lines (80 loc) · 3.58 KB
/
ActionNode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import atexit
import os
from multiprocessing import Event, Lock
from typing import Callable, Any, Optional, Union
import py_trees
from epics.multiproc import CAProcess
from beams.behavior_tree.ActionWorker import ActionWorker
from beams.behavior_tree.VolatileStatus import VolatileStatus
class ActionNode(py_trees.behaviour.Behaviour):
def __init__(
self,
name: str,
work_func: Callable[[Any], None],
completion_condition: Callable[[Any], bool],
**kwargs,
): # TODO: can add failure condition argument...
super().__init__(name)
# Interprocess signalling mechanisms
self.__volatile_status__ = VolatileStatus(self.status) # Shares BT Status by wrapping Value
self.work_gate = Event() # Mechanism to avoid busy wait / signal node has been py_trees "initialized"
self.worker = ActionWorker(proc_name=name,
volatile_status=self.__volatile_status__,
wait_for_tick=self.work_gate,
work_func=work_func,
comp_cond=completion_condition,
stop_func=None
) # TODO: some standard notion of stop function could be valuable
self.logger.debug("%s.__init__()" % (self.__class__.__name__))
def setup(self, **kwargs: int) -> None:
"""Kickstart the separate process this behaviour will work with.
Ordinarily this process will be already running. In this case,
setup is usually just responsible for verifying it exists.
"""
self.logger.debug(
"%s.setup()->connections to an external process" % (self.__class__.__name__)
)
# Having this in setup means the workthread should always be running.
self.worker.start_work()
atexit.register(
self.worker.stop_work
) # TODO(josh): make sure this cleans up resources when it dies
def initialise(self) -> None:
"""
Initialise configures and resets the behaviour ready for (repeated) execution
"""
self.logger.debug(f"Initliazing {self.name}...")
self.__volatile_status__.set_value(py_trees.common.Status.RUNNING)
self.work_gate.set()
def update(self) -> py_trees.common.Status:
"""Increment the counter, monitor and decide on a new status."""
self.logger.debug(
f"Getting tick on {self.name}. "
f"Status: {self.__volatile_status__.get_value()}"
)
# This does the interprocess communcication between this thread which is
# getting ticked and the work_proc thread which is doing work
new_status = self.__volatile_status__.get_value()
if new_status == py_trees.common.Status.SUCCESS:
self.feedback_message = "Processing finished"
self.logger.debug(
"%s.update()[%s->%s][%s]"
% (
self.__class__.__name__,
self.status,
new_status,
self.feedback_message,
)
)
# TODO: should clear even here so work thread can go back to wait state
# self.work_gate.clear()
return new_status
def terminate(self, new_status: py_trees.common.Status) -> None:
"""Nothing to clean up."""
self.logger.debug(
py_trees.console.red
+ "%s.terminate()[%s->%s]"
% (self.__class__.__name__, self.status, new_status)
)
if __name__ == "__main__":
pass