Skip to content

Commit

Permalink
Initial GDS plugin implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
LeStarch committed Mar 7, 2024
1 parent 7f3bb47 commit 7951908
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 15 deletions.
8 changes: 8 additions & 0 deletions src/fprime_gds/common/communication/framing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import sys

from .checksum import calculate_checksum
from fprime_gds.plugin.definitions import gds_plugin_implementation


class FramerDeframer(abc.ABC):
Expand Down Expand Up @@ -192,6 +193,13 @@ def deframe(self, data, no_copy=False):
return None, data, discarded
return None, data, discarded

@classmethod
@gds_plugin_implementation
def register_framing_plugin(cls):
""" Register a bad plugin """
return cls



class TcpServerFramerDeframer(FramerDeframer):
"""
Expand Down
82 changes: 73 additions & 9 deletions src/fprime_gds/executables/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from fprime_gds.common.transport import ThreadedTCPSocketClient
from fprime_gds.common.utils.config_manager import ConfigManager
from fprime_gds.executables.utils import find_app, find_dict, get_artifacts_root
from fprime_gds.plugin.system import Plugins

# Optional import: ZeroMQ. Requires package: pyzmq
try:
Expand All @@ -45,7 +46,6 @@
except ImportError:
SerialAdapter = None


GUIS = ["none", "html"]


Expand Down Expand Up @@ -85,7 +85,10 @@ def get_parser(self) -> argparse.ArgumentParser:
Return:
argparse parser for supplied arguments
"""
parser = argparse.ArgumentParser(description=self.description, add_help=True)
parser = argparse.ArgumentParser(
description=self.description, add_help=True,
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
for flags, keywords in self.get_arguments().items():
parser.add_argument(*flags, **keywords)
return parser
Expand All @@ -96,7 +99,7 @@ def reproduce_cli_args(self, args_ns):
def flag_member(flags, argparse_inputs) -> Tuple[str, str]:
"""Get the best CLI flag and namespace member"""
best_flag = (
[flag for flag in flags if flag.startswith("--")] + list(flags)
[flag for flag in flags if flag.startswith("--")] + list(flags)
)[0]
member = argparse_inputs.get(
"dest", re.sub(r"^-+", "", best_flag).replace("-", "_")
Expand All @@ -117,7 +120,7 @@ def cli_arguments(flags, argparse_inputs) -> List[str]:

# Handle arguments
if (action == "store_true" and value) or (
action == "store_false" and not value
action == "store_false" and not value
):
return [best_flag]
elif action != "store" or value is None:
Expand Down Expand Up @@ -148,10 +151,10 @@ def handle_arguments(self, args, **kwargs):

@staticmethod
def parse_args(
parser_classes,
description="No tool description provided",
arguments=None,
**kwargs,
parser_classes,
description="No tool description provided",
arguments=None,
**kwargs,
):
"""Parse and post-process arguments
Expand Down Expand Up @@ -234,7 +237,7 @@ def handle_arguments(self, args, **kwargs):
raise Exception(msg)
# Works for the old structure where the bin, lib, and dict directories live immediately under the platform
elif len(child_directories) == 3 and set(
[path.name for path in child_directories]
[path.name for path in child_directories]
) == {"bin", "lib", "dict"}:
args.deployment = detected_toolchain
return args
Expand Down Expand Up @@ -805,3 +808,64 @@ def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:

def handle_arguments(self, args, **kwargs):
return args


class PluginArgumentParser(ParserBase):
""" Parser for arguments coming from plugins """
DESCRIPTION = "Parse plugin CLI arguments and selections"

def __init__(self):
""" Initialize the plugin information for this parser """
self._plugin_map = {
category: {
self.get_selection_name(selection): selection for selection in Plugins.system().get_selections(category)
} for category in Plugins.system().get_categories()
}

def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:
""" Return arguments to retrieve channels/events/commands in specific ways """

arguments: Dict[Tuple[str, ...], Dict[str, Any]] = {}
for category, selections in self._plugin_map.items():
arguments.update({
(f"--{category}-selection",): {
"choices": [choice for choice in selections.keys()],
"help": f"Select {category} implementer.",
"default": list(selections.keys())[0]
}
})
for selection_name, selection in selections.items():
arguments.update(self.get_selection_arguments(selection))
return arguments

def handle_arguments(self, args, **kwargs):
""" Handles the arguments """
for category, selections in self._plugin_map.items():
selection_string = getattr(args, f"{category}_selection")
selection_class = selections[selection_string]
filled_arguments = self.map_selection_arguments(args, selection_class)
selection_instance = selection_class(**filled_arguments)
setattr(args, f"{category}_selection_instance", selection_instance)
return args

@staticmethod
def get_selection_name(selection):
""" Get the name of a selection """
return selection.get_name() if hasattr(selection, "get_name") else selection.__name__

@staticmethod
def get_selection_arguments(selection) -> Dict[Tuple[str, ...], Dict[str, Any]]:
""" Get the name of a selection """
return selection.get_arguments() if hasattr(selection, "get_arguments") else {}

@staticmethod
def map_selection_arguments(args, selection) -> Dict[str, Any]:
""" Get the name of a selection """
expected_args = PluginArgumentParser.get_selection_arguments(selection)
argument_destinations = [
value["dest"] if "dest" in value else key[0].replace("--", "").replace("-", "_")
for key, value in expected_args.items()
]
return {
destination: getattr(args, destination) for destination in argument_destinations
}
10 changes: 5 additions & 5 deletions src/fprime_gds/executables/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import fprime_gds.common.communication.ground
import fprime_gds.common.logger
import fprime_gds.executables.cli
from fprime_gds.common.communication.framing import FpFramerDeframer
from fprime_gds.common.communication.updown import Downlinker, Uplinker

# Uses non-standard PIP package pyserial, so test the waters before getting a hard-import crash
Expand All @@ -58,6 +57,7 @@ def main():
fprime_gds.executables.cli.LogDeployParser,
fprime_gds.executables.cli.MiddleWareParser,
fprime_gds.executables.cli.CommParser,
fprime_gds.executables.cli.PluginArgumentParser,
],
description="F prime communications layer.",
client=True,
Expand Down Expand Up @@ -85,11 +85,11 @@ def main():

# Set the framing class used and pass it to the uplink and downlink component constructions giving each a separate
# instantiation
framer_class = FpFramerDeframer
framer_instance = args.framing_selection_instance
LOGGER.info(
"Starting uplinker/downlinker connecting to FSW using %s with %s",
adapter,
framer_class.__name__,
args.framing_selection
)
discarded_file_handle = None
try:
Expand All @@ -108,9 +108,9 @@ def main():
discarded_file_handle_path,
)
downlinker = Downlinker(
adapter, ground, framer_class(), discarded=discarded_file_handle
adapter, ground, framer_instance, discarded=discarded_file_handle
)
uplinker = Uplinker(adapter, ground, framer_class(), downlinker)
uplinker = Uplinker(adapter, ground, framer_instance, downlinker)

# Open resources for the handlers on either side, this prepares the resources needed for reading/writing data
ground.open()
Expand Down
3 changes: 2 additions & 1 deletion src/fprime_gds/executables/run_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
GdsParser,
ParserBase,
StandardPipelineParser,
PluginArgumentParser
)
from fprime_gds.executables.utils import AppWrapperException, run_wrapped_application

Expand All @@ -27,7 +28,7 @@ def parse_args():
:return: parsed argument namespace
"""
# Get custom handlers for all executables we are running
arg_handlers = [StandardPipelineParser, GdsParser, BinaryDeployment, CommParser]
arg_handlers = [StandardPipelineParser, GdsParser, BinaryDeployment, CommParser, PluginArgumentParser]
# Parse the arguments, and refine through all handlers
args, parser = ParserBase.parse_args(arg_handlers, "Run F prime deployment and GDS")
return args
Expand Down
Empty file.
25 changes: 25 additions & 0 deletions src/fprime_gds/plugin/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

import pluggy
from typing import Type

PROJECT_NAME = "fprime_gds"

gds_plugin_specification = pluggy.HookspecMarker(PROJECT_NAME)

Check failure on line 7 in src/fprime_gds/plugin/definitions.py

View workflow job for this annotation

GitHub Actions / Spell checking

`Hookspec` is not a recognized word. (unrecognized-spelling)
gds_plugin_implementation = pluggy.HookimplMarker(PROJECT_NAME)

Check failure on line 8 in src/fprime_gds/plugin/definitions.py

View workflow job for this annotation

GitHub Actions / Spell checking

`Hookimpl` is not a recognized word. (unrecognized-spelling)


@gds_plugin_specification
def register_framing_plugin() -> Type["FramerDeframer"]:
""" Register a plugin to provide framing capabilities
Plugin hook for registering a plugin that supplies a FramerDeframer implementation. Implementors of this hook must
return a non-abstract subclass of FramerDeframer. This class will be provided as a framing implementation option
that users may select via command line arguments.
Note: users should return the class, not an instance of the class. Needed arguments for instantiation are
determined from class methods, solicited via the command line, and provided at construction time to the chosen
instantiation.
Returns:
FramerDeframer subclass
"""
119 changes: 119 additions & 0 deletions src/fprime_gds/plugin/system.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import inspect
import logging
import re
from typing import Iterable, ForwardRef

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'ForwardRef' is not used.

import pluggy


import fprime_gds.plugin.definitions as definitions
# For automatic validation of plugins, each plugin class type must be imported here
import fprime_gds.common.communication.framing as framing


PROJECT_NAME = definitions.PROJECT_NAME
LOGGER = logging.getLogger(__name__)

_NAME_REGEX = re.compile(r"^register_(\w+)_plugin")
_TYPE_MAPPINGS = {
definitions.register_framing_plugin: framing.FramerDeframer
}
_SUPPLIED_PLUGIN_MODULES_OR_CLASSES = [
framing.FpFramerDeframer
]


class PluginException(Exception):
pass


class InvalidCategoryException(PluginException):
pass


class Plugins(object):
""" GDS plugin system providing a plugin Singleton for use across the GDS """
_singleton = None

def __init__(self):
""" Initialize the plugin system """
self.manager = pluggy.PluginManager(PROJECT_NAME)
self.manager.add_hookspecs(definitions)

Check failure on line 41 in src/fprime_gds/plugin/system.py

View workflow job for this annotation

GitHub Actions / Spell checking

`hookspecs` is not a recognized word. (unrecognized-spelling)
self.manager.load_setuptools_entrypoints(PROJECT_NAME)
for module in _SUPPLIED_PLUGIN_MODULES_OR_CLASSES:
self.manager.register(module)

def get_selections(self, category) -> Iterable:
""" Get available plugin selections
Gets all plugin implementors of "category" by looking for register_<category>_plugin implementors. If such a
function does not exist then this results in an exception.
Args:
category: category of the plugin requested
"""
plugin_function_name = f"register_{category}_plugin"
if not hasattr(definitions, plugin_function_name) or not hasattr(self.manager.hook, plugin_function_name):
raise InvalidCategoryException(f"Invalid plugin category: {category}")
selections = getattr(self.manager.hook, plugin_function_name)()
return [selection for selection in selections if self.validate_selection(category, selection)]

def get_categories(self):
""" Get all plugin categories """
specifications = _TYPE_MAPPINGS.keys()
matches = [_NAME_REGEX.match(specification.__name__) for specification in specifications]
return [match.group(1) for match in matches if match]

def register_plugin(self, module_or_class):
""" Register a plugin directly
Allows local registration of plugin implementations that are shipped as part of the GDS package.
Args:
module_or_class: module or class that has plugin implementations
"""
self.manager.register(module_or_class)

@staticmethod
def validate_selection(category, result):
""" Validate the result of plugin hook
Validates the result of a plugin hook call to ensure the result meets the expected properties for plugins of the
given category. Primarily this ensures that this plugin returns a concrete subclass of the expected type.
Args:
category: category of plugin used
result: result from the plugin hook call
Return:
True when the plugin passes validation, False otherwise
"""
plugin_function_name = f"register_{category}_plugin"
assert hasattr(definitions, plugin_function_name), "Plugin category failed pre-validation"
# Typing library not intended for introspection at runtime, thus we maintain a map of plugin specification
# functions to the types expected as a return value. When this is not found, plugins may continue without
# automatic validation.
try:
plugin_specification_function = getattr(definitions, plugin_function_name)
expected = _TYPE_MAPPINGS[plugin_specification_function]

# Validate the result
if not issubclass(result, expected):
LOGGER.warning(f"{result.__name__} is not a subclass of {expected.__name__}. Not registering.")
return False
elif inspect.isabstract(result):

Check failure on line 103 in src/fprime_gds/plugin/system.py

View workflow job for this annotation

GitHub Actions / Spell checking

`isabstract` is not a recognized word. (unrecognized-spelling)
LOGGER.warning(f"{result.__name__} is an abstract class. Not registering.")
return False
except KeyError:
LOGGER.warning(f"Plugin not registered for validation. Continuing without validation.")
return True

@classmethod
def system(cls) -> "PluginSystem":
""" Construct singleton if needed then return it """
return cls._build_singleton()

@classmethod
def _build_singleton(cls) -> "PluginSystem":
""" Build a singleton for this class """
cls._singleton = cls._singleton if cls._singleton is not None else cls()
return cls._singleton
Loading

0 comments on commit 7951908

Please sign in to comment.