Skip to content

Commit

Permalink
feat!: show-config is now a command, remove "run" as default, refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Samuel Anderson <[email protected]>
  • Loading branch information
AWS-Samuel committed Feb 16, 2024
1 parent 52c2684 commit a50b35f
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 296 deletions.
243 changes: 143 additions & 100 deletions src/openjd/adaptor_runtime/_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pathlib import Path
from argparse import ArgumentParser, Namespace
from types import FrameType as FrameType
from typing import TYPE_CHECKING, Any, Optional, Type, TypeVar
from typing import TYPE_CHECKING, Any, Optional, Type, TypeVar, NamedTuple, Tuple

import jsonschema
import yaml
Expand Down Expand Up @@ -49,9 +49,7 @@
"This can be a JSON string or the path to a file containing a JSON string in the format "
"file://path/to/file.json"
),
"show_config": (
"When specified, the adaptor runtime configuration is printed then the program exits."
),
"show_config": ("Prints the adaptor runtime configuration, then the program exits."),
"connection_file": "The file path to the connection file for use in background mode.",
}

Expand Down Expand Up @@ -82,6 +80,19 @@
_logger = logging.getLogger(__name__)


class _LogConfig(NamedTuple):
formatter: ConditionalFormatter
stream_handler: logging.StreamHandler
runtime_logger: logging.Logger
adaptor_logger: logging.Logger


class _IntegrationData(NamedTuple):
init_data: dict
run_data: dict
path_mapping_data: dict


class EntryPoint:
"""
The main entry point of the adaptor runtime.
Expand All @@ -93,13 +104,8 @@ def __init__(self, adaptor_class: Type[_U]) -> None:
# 'background' command
self._adaptor_runner: Optional[AdaptorRunner] = None

def start(self, reentry_exe: Optional[Path] = None) -> None:
"""
Starts the run of the adaptor.
Args:
reentry_exe (Path): The path to the binary executable that for adaptor reentry.
"""
def _init_loggers(self) -> _LogConfig:
"Creates runtime/adaptor loggers"
formatter = ConditionalFormatter(
"%(levelname)s: %(message)s", ignore_patterns=[_OPENJD_LOG_REGEX]
)
Expand All @@ -113,16 +119,10 @@ def start(self, reentry_exe: Optional[Path] = None) -> None:
adaptor_logger = logging.getLogger(self.adaptor_class.__module__.split(".")[0])
adaptor_logger.addHandler(stream_handler)

parsed_args = self._parse_args()

path_mapping_data = (
parsed_args.path_mapping_rules
if hasattr(parsed_args, "path_mapping_rules")
# TODO: Eliminate the use of the environment variable once all users of this library have
# been updated to use the command-line option. Default to an empty dictionary.
else _load_data(os.environ.get("PATH_MAPPING_RULES", "{}"))
)
return _LogConfig(formatter, stream_handler, runtime_logger, adaptor_logger)

def _init_config(self) -> None:
"""Initializes self.config_manager"""
additional_config_path = os.environ.get(_ENV_CONFIG_PATH_PREFIX)
self.config_manager = ConfigurationManager(
config_cls=RuntimeConfiguration,
Expand All @@ -144,103 +144,142 @@ def start(self, reentry_exe: Optional[Path] = None) -> None:
# is valid here.
self.config = self.config_manager.get_default_config()

if hasattr(parsed_args, "show_config") and parsed_args.show_config:
print(yaml.dump(self.config.config, indent=2))
return # pragma: no cover

init_data = parsed_args.init_data if hasattr(parsed_args, "init_data") else {}
run_data = parsed_args.run_data if hasattr(parsed_args, "run_data") else {}
command = (
parsed_args.command
if hasattr(parsed_args, "command") and parsed_args.command is not None
else "run"
def _get_integration_data(self, parsed_args: Namespace) -> _IntegrationData:
return _IntegrationData(
init_data=parsed_args.init_data if hasattr(parsed_args, "init_data") else {},
run_data=parsed_args.run_data if hasattr(parsed_args, "run_data") else {},
path_mapping_data=parsed_args.path_mapping_rules
if hasattr(parsed_args, "path_mapping_rules")
else {},
)

def start(self, reentry_exe: Optional[Path] = None) -> None:
"""
Starts the run of the adaptor.
Args:
reentry_exe (Path): The path to the binary executable that for adaptor reentry.
"""
log_config = self._init_loggers()
parser, parsed_args = self._parse_args()
self._init_config()
if not parsed_args.command:
parser.print_help()
parser.error("No command was provided.")
elif parsed_args.command == "show-config":
return print(yaml.dump(self.config.config, indent=2))

integration_data = self._get_integration_data(parsed_args)

adaptor: BaseAdaptor[AdaptorConfiguration] = self.adaptor_class(
init_data, path_mapping_data=path_mapping_data
integration_data.init_data, path_mapping_data=integration_data.path_mapping_data
)

adaptor_logger.setLevel(adaptor.config.log_level)
runtime_logger.setLevel(self.config.log_level)

if command == "run":
self._adaptor_runner = AdaptorRunner(adaptor=adaptor)
# To be able to handle cancelation via signals
signal.signal(signal.SIGINT, self._sigint_handler)
if OSName.is_posix(): # pragma: is-windows
signal.signal(signal.SIGTERM, self._sigint_handler)
else: # pragma: is-posix
signal.signal(signal.SIGBREAK, self._sigint_handler) # type: ignore[attr-defined]
log_config.adaptor_logger.setLevel(adaptor.config.log_level)
log_config.runtime_logger.setLevel(self.config.log_level)

if parsed_args.command == "run":
return self._handle_run(adaptor, integration_data)
elif parsed_args.command == "daemon": # pragma: no branch
return self._handle_daemon(
adaptor, parsed_args, log_config, integration_data, reentry_exe
)

def _handle_run(
self, adaptor: BaseAdaptor[AdaptorConfiguration], integration_data: _IntegrationData
):
self._adaptor_runner = AdaptorRunner(adaptor=adaptor)
# To be able to handle cancelation via signals
signal.signal(signal.SIGINT, self._sigint_handler)
if OSName.is_posix(): # pragma: is-windows
signal.signal(signal.SIGTERM, self._sigint_handler)
else: # pragma: is-posix
signal.signal(signal.SIGBREAK, self._sigint_handler) # type: ignore[attr-defined]
try:
self._adaptor_runner._start()
self._adaptor_runner._run(integration_data.run_data)
self._adaptor_runner._stop()
self._adaptor_runner._cleanup()
except Exception as e:
_logger.error(f"Error running the adaptor: {e}")
try:
self._adaptor_runner._start()
self._adaptor_runner._run(run_data)
self._adaptor_runner._stop()
self._adaptor_runner._cleanup()
except Exception as e:
_logger.error(f"Error running the adaptor: {e}")
try:
self._adaptor_runner._cleanup()
except Exception as e:
_logger.error(f"Error cleaning up the adaptor: {e}")
raise
_logger.error(f"Error cleaning up the adaptor: {e}")
raise
elif command == "daemon": # pragma: no branch
connection_file = parsed_args.connection_file
if not os.path.isabs(connection_file):
connection_file = os.path.abspath(connection_file)
subcommand = parsed_args.subcommand if hasattr(parsed_args, "subcommand") else None

if subcommand == "_serve":
# Replace stream handler with log buffer handler since output will be buffered in
# background mode
log_buffer = InMemoryLogBuffer(formatter=formatter)
buffer_handler = LogBufferHandler(log_buffer)
for logger in [runtime_logger, adaptor_logger]:
logger.removeHandler(stream_handler)
logger.addHandler(buffer_handler)

# This process is running in background mode. Create the backend server and serve
# forever until a shutdown is requested
backend = BackendRunner(
AdaptorRunner(adaptor=adaptor),
connection_file,
log_buffer=log_buffer,
raise

def _handle_daemon(
self,
adaptor: BaseAdaptor[AdaptorConfiguration],
parsed_args: Namespace,
log_config: _LogConfig,
integration_data: _IntegrationData,
reentry_exe: Optional[Path] = None,
):
connection_file = parsed_args.connection_file
if not os.path.isabs(connection_file):
connection_file = os.path.abspath(connection_file)
subcommand = parsed_args.subcommand if hasattr(parsed_args, "subcommand") else None

if subcommand == "_serve":
# Replace stream handler with log buffer handler since output will be buffered in
# background mode
log_buffer = InMemoryLogBuffer(formatter=log_config.formatter)
buffer_handler = LogBufferHandler(log_buffer)
for logger in [log_config.runtime_logger, log_config.adaptor_logger]:
logger.removeHandler(log_config.stream_handler)
logger.addHandler(buffer_handler)

# This process is running in background mode. Create the backend server and serve
# forever until a shutdown is requested
backend = BackendRunner(
AdaptorRunner(adaptor=adaptor),
connection_file,
log_buffer=log_buffer,
)
backend.run()
else:
# This process is running in frontend mode. Create the frontend runner and send
# the appropriate request to the backend.
frontend = FrontendRunner(connection_file)
if subcommand == "start":
adaptor_module = sys.modules.get(self.adaptor_class.__module__)
if adaptor_module is None:
raise ModuleNotFoundError(
f"Adaptor module is not loaded: {self.adaptor_class.__module__}"
)

frontend.init(
adaptor_module,
integration_data.init_data,
integration_data.path_mapping_data,
reentry_exe,
)
backend.run()
else:
# This process is running in frontend mode. Create the frontend runner and send
# the appropriate request to the backend.
frontend = FrontendRunner(connection_file)
if subcommand == "start":
adaptor_module = sys.modules.get(self.adaptor_class.__module__)
if adaptor_module is None:
raise ModuleNotFoundError(
f"Adaptor module is not loaded: {self.adaptor_class.__module__}"
)

frontend.init(adaptor_module, init_data, path_mapping_data, reentry_exe)
frontend.start()
elif subcommand == "run":
frontend.run(run_data)
elif subcommand == "stop":
frontend.stop()
frontend.shutdown()

def _parse_args(self) -> Namespace:
frontend.start()
elif subcommand == "run":
frontend.run(integration_data.run_data)
elif subcommand == "stop":
frontend.stop()
frontend.shutdown()

def _parse_args(self) -> Tuple[ArgumentParser, Namespace]:
parser = self._build_argparser()
try:
return parser.parse_args(sys.argv[1:])
return parser, parser.parse_args(sys.argv[1:])
except Exception as e:
_logger.error(f"Error parsing command line arguments: {e}")
raise

def _build_argparser(self) -> ArgumentParser:
parser = ArgumentParser(prog="adaptor_runtime", add_help=True)
parser.add_argument(
"--show-config", action="store_true", help=_CLI_HELP_TEXT["show_config"]
parser = ArgumentParser(
prog="adaptor_runtime",
add_help=True,
usage=f"{self.adaptor_class.__name__} <command> [arguments]",
)
subparser = parser.add_subparsers(dest="command", title="commands")

subparser = parser.add_subparsers(dest="command", title="subcommands")
subparser.add_parser("show-config", help=_CLI_HELP_TEXT["show_config"])

init_data = ArgumentParser(add_help=False)
init_data.add_argument(
Expand All @@ -260,7 +299,11 @@ def _build_argparser(self) -> ArgumentParser:
help=_CLI_HELP_TEXT["path_mapping_rules"],
)

subparser.add_parser("run", parents=[init_data, path_mapping_rules, run_data])
subparser.add_parser(
"run",
parents=[init_data, path_mapping_rules, run_data],
help="Run through the start, run, stop, cleanup adaptor states.",
)

connection_file = ArgumentParser(add_help=False)
connection_file.add_argument(
Expand All @@ -270,7 +313,7 @@ def _build_argparser(self) -> ArgumentParser:
required=True,
)

bg_parser = subparser.add_parser("daemon")
bg_parser = subparser.add_parser("daemon", help="Runs the adaptor in a daemon mode.")
bg_subparser = bg_parser.add_subparsers(
dest="subcommand",
title="subcommands",
Expand Down
2 changes: 1 addition & 1 deletion src/openjd/adaptor_runtime/adaptors/_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _start(self): # pragma: no cover

def _run(self, run_data: dict):
"""
:param run_data: This is the data that changes between the different SubTasks. Eg. frame
:param run_data: This is the data that changes between the different Tasks. Eg. frame
number.
"""
self.on_run(run_data)
Expand Down
12 changes: 3 additions & 9 deletions test/openjd/adaptor_runtime/integ/IntegCommandAdaptor/adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@


class IntegManagedProcess(ManagedProcess):
def __init__(self, run_data: dict) -> None:
super().__init__(run_data)

def get_executable(self) -> str:
if OSName.is_windows():
# In Windows, we cannot directly execute the powershell script.
Expand All @@ -24,24 +21,21 @@ def get_executable(self) -> str:
return os.path.abspath(os.path.join(os.path.sep, "bin", "echo"))

def get_arguments(self) -> List[str]:
return self.run_data.get("args", "")
return self.run_data.get("args", [""])


class IntegCommandAdaptor(CommandAdaptor):
def __init__(self, init_data: dict, path_mapping_data: dict):
super().__init__(init_data, path_mapping_data=path_mapping_data)

def get_managed_process(self, run_data: dict) -> ManagedProcess:
return IntegManagedProcess(run_data)

def on_prerun(self):
# Print only goes to stdout and is not captured in daemon mode.
print("prerun-print")
# Logging is captured in daemon mode.
logger.info(self.init_data.get("on_prerun", ""))
logger.info(str(self.init_data.get("on_prerun", "")))

def on_postrun(self):
# Print only goes to stdout and is not captured in daemon mode.
print("postrun-print")
# Logging is captured in daemon mode.
logger.info(self.init_data.get("on_postrun", ""))
logger.info(str(self.init_data.get("on_postrun", "")))
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ class PrintAdaptor(Adaptor):
Test implementation of an Adaptor.
"""

def __init__(self, init_data: dict):
super().__init__(init_data)

def on_run(self, run_data: dict):
# This run funciton will simply print the run_data.
self.update_status(progress=first_progress, status_message=first_status_message)
Expand Down Expand Up @@ -61,9 +58,6 @@ def test_start_end_cleanup(self, tmpdir, capsys) -> None:
"""

class FileAdaptor(Adaptor):
def __init__(self, init_data: dict):
super().__init__(init_data)

def on_start(self):
# Open a temp file
self.f = tmpdir.mkdir("test").join("hello.txt")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ def test_run(self, caplog):
"""Testing a success case for the managed process."""

class FakeManagedProcess(ManagedProcess):
def __init__(self, run_data: dict):
super(FakeManagedProcess, self).__init__(run_data)

def get_executable(self) -> str:
if OSName.is_windows():
return "powershell.exe"
Expand Down
Loading

0 comments on commit a50b35f

Please sign in to comment.