Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: show-config is now a command, slight refactor #74

Merged
merged 1 commit into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 143 additions & 100 deletions src/openjd/adaptor_runtime/_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pathlib import Path
from argparse import ArgumentParser, Namespace
from types import FrameType as FrameType
from typing import TYPE_CHECKING, Any, Optional, Type, TypeVar
from typing import TYPE_CHECKING, Any, Optional, Type, TypeVar, NamedTuple, Tuple

import jsonschema
import yaml
Expand Down Expand Up @@ -49,9 +49,7 @@
"This can be a JSON string or the path to a file containing a JSON string in the format "
"file://path/to/file.json"
),
"show_config": (
"When specified, the adaptor runtime configuration is printed then the program exits."
),
"show_config": ("Prints the adaptor runtime configuration, then the program exits."),
"connection_file": "The file path to the connection file for use in background mode.",
}

Expand Down Expand Up @@ -82,6 +80,19 @@
_logger = logging.getLogger(__name__)


class _LogConfig(NamedTuple):
formatter: ConditionalFormatter
stream_handler: logging.StreamHandler
runtime_logger: logging.Logger
adaptor_logger: logging.Logger


class _IntegrationData(NamedTuple):
init_data: dict
run_data: dict
path_mapping_data: dict


class EntryPoint:
"""
The main entry point of the adaptor runtime.
Expand All @@ -93,13 +104,8 @@ def __init__(self, adaptor_class: Type[_U]) -> None:
# 'background' command
self._adaptor_runner: Optional[AdaptorRunner] = None

def start(self, reentry_exe: Optional[Path] = None) -> None:
"""
Starts the run of the adaptor.

Args:
reentry_exe (Path): The path to the binary executable that for adaptor reentry.
"""
def _init_loggers(self) -> _LogConfig:
"Creates runtime/adaptor loggers"
formatter = ConditionalFormatter(
"%(levelname)s: %(message)s", ignore_patterns=[_OPENJD_LOG_REGEX]
)
Expand All @@ -113,16 +119,10 @@ def start(self, reentry_exe: Optional[Path] = None) -> None:
adaptor_logger = logging.getLogger(self.adaptor_class.__module__.split(".")[0])
adaptor_logger.addHandler(stream_handler)

parsed_args = self._parse_args()

path_mapping_data = (
parsed_args.path_mapping_rules
if hasattr(parsed_args, "path_mapping_rules")
# TODO: Eliminate the use of the environment variable once all users of this library have
# been updated to use the command-line option. Default to an empty dictionary.
else _load_data(os.environ.get("PATH_MAPPING_RULES", "{}"))
)
return _LogConfig(formatter, stream_handler, runtime_logger, adaptor_logger)

def _init_config(self) -> None:
"""Initializes self.config_manager"""
additional_config_path = os.environ.get(_ENV_CONFIG_PATH_PREFIX)
self.config_manager = ConfigurationManager(
config_cls=RuntimeConfiguration,
Expand All @@ -144,103 +144,142 @@ def start(self, reentry_exe: Optional[Path] = None) -> None:
# is valid here.
self.config = self.config_manager.get_default_config()

if hasattr(parsed_args, "show_config") and parsed_args.show_config:
print(yaml.dump(self.config.config, indent=2))
return # pragma: no cover

init_data = parsed_args.init_data if hasattr(parsed_args, "init_data") else {}
run_data = parsed_args.run_data if hasattr(parsed_args, "run_data") else {}
command = (
parsed_args.command
if hasattr(parsed_args, "command") and parsed_args.command is not None
else "run"
def _get_integration_data(self, parsed_args: Namespace) -> _IntegrationData:
return _IntegrationData(
init_data=parsed_args.init_data if hasattr(parsed_args, "init_data") else {},
run_data=parsed_args.run_data if hasattr(parsed_args, "run_data") else {},
path_mapping_data=parsed_args.path_mapping_rules
if hasattr(parsed_args, "path_mapping_rules")
else {},
)

def start(self, reentry_exe: Optional[Path] = None) -> None:
"""
Starts the run of the adaptor.

Args:
reentry_exe (Path): The path to the binary executable that for adaptor reentry.
"""
log_config = self._init_loggers()
parser, parsed_args = self._parse_args()
self._init_config()
if not parsed_args.command:
parser.print_help()
parser.error("No command was provided.")
elif parsed_args.command == "show-config":
return print(yaml.dump(self.config.config, indent=2))

integration_data = self._get_integration_data(parsed_args)

adaptor: BaseAdaptor[AdaptorConfiguration] = self.adaptor_class(
init_data, path_mapping_data=path_mapping_data
integration_data.init_data, path_mapping_data=integration_data.path_mapping_data
)

adaptor_logger.setLevel(adaptor.config.log_level)
runtime_logger.setLevel(self.config.log_level)

if command == "run":
self._adaptor_runner = AdaptorRunner(adaptor=adaptor)
# To be able to handle cancelation via signals
signal.signal(signal.SIGINT, self._sigint_handler)
if OSName.is_posix(): # pragma: is-windows
signal.signal(signal.SIGTERM, self._sigint_handler)
else: # pragma: is-posix
signal.signal(signal.SIGBREAK, self._sigint_handler) # type: ignore[attr-defined]
log_config.adaptor_logger.setLevel(adaptor.config.log_level)
log_config.runtime_logger.setLevel(self.config.log_level)

if parsed_args.command == "run":
return self._handle_run(adaptor, integration_data)
elif parsed_args.command == "daemon": # pragma: no branch
return self._handle_daemon(
adaptor, parsed_args, log_config, integration_data, reentry_exe
)

def _handle_run(
self, adaptor: BaseAdaptor[AdaptorConfiguration], integration_data: _IntegrationData
):
self._adaptor_runner = AdaptorRunner(adaptor=adaptor)
# To be able to handle cancelation via signals
signal.signal(signal.SIGINT, self._sigint_handler)
if OSName.is_posix(): # pragma: is-windows
signal.signal(signal.SIGTERM, self._sigint_handler)
else: # pragma: is-posix
signal.signal(signal.SIGBREAK, self._sigint_handler) # type: ignore[attr-defined]
try:
self._adaptor_runner._start()
self._adaptor_runner._run(integration_data.run_data)
self._adaptor_runner._stop()
self._adaptor_runner._cleanup()
except Exception as e:
_logger.error(f"Error running the adaptor: {e}")
try:
self._adaptor_runner._start()
self._adaptor_runner._run(run_data)
self._adaptor_runner._stop()
self._adaptor_runner._cleanup()
except Exception as e:
_logger.error(f"Error running the adaptor: {e}")
try:
self._adaptor_runner._cleanup()
except Exception as e:
_logger.error(f"Error cleaning up the adaptor: {e}")
raise
_logger.error(f"Error cleaning up the adaptor: {e}")
raise
elif command == "daemon": # pragma: no branch
connection_file = parsed_args.connection_file
if not os.path.isabs(connection_file):
connection_file = os.path.abspath(connection_file)
subcommand = parsed_args.subcommand if hasattr(parsed_args, "subcommand") else None

if subcommand == "_serve":
# Replace stream handler with log buffer handler since output will be buffered in
# background mode
log_buffer = InMemoryLogBuffer(formatter=formatter)
buffer_handler = LogBufferHandler(log_buffer)
for logger in [runtime_logger, adaptor_logger]:
logger.removeHandler(stream_handler)
logger.addHandler(buffer_handler)

# This process is running in background mode. Create the backend server and serve
# forever until a shutdown is requested
backend = BackendRunner(
AdaptorRunner(adaptor=adaptor),
connection_file,
log_buffer=log_buffer,
raise

def _handle_daemon(
self,
adaptor: BaseAdaptor[AdaptorConfiguration],
parsed_args: Namespace,
log_config: _LogConfig,
integration_data: _IntegrationData,
reentry_exe: Optional[Path] = None,
):
connection_file = parsed_args.connection_file
if not os.path.isabs(connection_file):
connection_file = os.path.abspath(connection_file)
subcommand = parsed_args.subcommand if hasattr(parsed_args, "subcommand") else None

if subcommand == "_serve":
# Replace stream handler with log buffer handler since output will be buffered in
# background mode
log_buffer = InMemoryLogBuffer(formatter=log_config.formatter)
buffer_handler = LogBufferHandler(log_buffer)
for logger in [log_config.runtime_logger, log_config.adaptor_logger]:
logger.removeHandler(log_config.stream_handler)
logger.addHandler(buffer_handler)

# This process is running in background mode. Create the backend server and serve
# forever until a shutdown is requested
backend = BackendRunner(
AdaptorRunner(adaptor=adaptor),
connection_file,
log_buffer=log_buffer,
)
backend.run()
else:
# This process is running in frontend mode. Create the frontend runner and send
# the appropriate request to the backend.
frontend = FrontendRunner(connection_file)
if subcommand == "start":
adaptor_module = sys.modules.get(self.adaptor_class.__module__)
if adaptor_module is None:
raise ModuleNotFoundError(
f"Adaptor module is not loaded: {self.adaptor_class.__module__}"
)

frontend.init(
adaptor_module,
integration_data.init_data,
integration_data.path_mapping_data,
reentry_exe,
)
backend.run()
else:
# This process is running in frontend mode. Create the frontend runner and send
# the appropriate request to the backend.
frontend = FrontendRunner(connection_file)
if subcommand == "start":
adaptor_module = sys.modules.get(self.adaptor_class.__module__)
if adaptor_module is None:
raise ModuleNotFoundError(
f"Adaptor module is not loaded: {self.adaptor_class.__module__}"
)

frontend.init(adaptor_module, init_data, path_mapping_data, reentry_exe)
frontend.start()
elif subcommand == "run":
frontend.run(run_data)
elif subcommand == "stop":
frontend.stop()
frontend.shutdown()

def _parse_args(self) -> Namespace:
frontend.start()
elif subcommand == "run":
frontend.run(integration_data.run_data)
elif subcommand == "stop":
frontend.stop()
frontend.shutdown()

def _parse_args(self) -> Tuple[ArgumentParser, Namespace]:
parser = self._build_argparser()
try:
return parser.parse_args(sys.argv[1:])
return parser, parser.parse_args(sys.argv[1:])
except Exception as e:
_logger.error(f"Error parsing command line arguments: {e}")
raise

def _build_argparser(self) -> ArgumentParser:
parser = ArgumentParser(prog="adaptor_runtime", add_help=True)
parser.add_argument(
"--show-config", action="store_true", help=_CLI_HELP_TEXT["show_config"]
parser = ArgumentParser(
prog="adaptor_runtime",
add_help=True,
usage=f"{self.adaptor_class.__name__} <command> [arguments]",
)
subparser = parser.add_subparsers(dest="command", title="commands")

subparser = parser.add_subparsers(dest="command", title="subcommands")
subparser.add_parser("show-config", help=_CLI_HELP_TEXT["show_config"])

init_data = ArgumentParser(add_help=False)
init_data.add_argument(
Expand All @@ -260,7 +299,11 @@ def _build_argparser(self) -> ArgumentParser:
help=_CLI_HELP_TEXT["path_mapping_rules"],
)

subparser.add_parser("run", parents=[init_data, path_mapping_rules, run_data])
subparser.add_parser(
"run",
parents=[init_data, path_mapping_rules, run_data],
help="Run through the start, run, stop, cleanup adaptor states.",
)

connection_file = ArgumentParser(add_help=False)
connection_file.add_argument(
Expand All @@ -270,7 +313,7 @@ def _build_argparser(self) -> ArgumentParser:
required=True,
)

bg_parser = subparser.add_parser("daemon")
bg_parser = subparser.add_parser("daemon", help="Runs the adaptor in a daemon mode.")
bg_subparser = bg_parser.add_subparsers(
dest="subcommand",
title="subcommands",
Expand Down
2 changes: 1 addition & 1 deletion src/openjd/adaptor_runtime/adaptors/_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _start(self): # pragma: no cover

def _run(self, run_data: dict):
"""
:param run_data: This is the data that changes between the different SubTasks. Eg. frame
:param run_data: This is the data that changes between the different Tasks. Eg. frame
number.
"""
self.on_run(run_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@


class IntegManagedProcess(ManagedProcess):
def __init__(self, run_data: dict) -> None:
super().__init__(run_data)

def get_executable(self) -> str:
if OSName.is_windows():
# In Windows, we cannot directly execute the powershell script.
Expand All @@ -24,24 +21,21 @@ def get_executable(self) -> str:
return os.path.abspath(os.path.join(os.path.sep, "bin", "echo"))

def get_arguments(self) -> List[str]:
return self.run_data.get("args", "")
return self.run_data.get("args", [""])


class IntegCommandAdaptor(CommandAdaptor):
def __init__(self, init_data: dict, path_mapping_data: dict):
super().__init__(init_data, path_mapping_data=path_mapping_data)

def get_managed_process(self, run_data: dict) -> ManagedProcess:
return IntegManagedProcess(run_data)

def on_prerun(self):
# Print only goes to stdout and is not captured in daemon mode.
print("prerun-print")
# Logging is captured in daemon mode.
logger.info(self.init_data.get("on_prerun", ""))
logger.info(str(self.init_data.get("on_prerun", "")))

def on_postrun(self):
# Print only goes to stdout and is not captured in daemon mode.
print("postrun-print")
# Logging is captured in daemon mode.
logger.info(self.init_data.get("on_postrun", ""))
logger.info(str(self.init_data.get("on_postrun", "")))
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ class PrintAdaptor(Adaptor):
Test implementation of an Adaptor.
"""

def __init__(self, init_data: dict):
super().__init__(init_data)

def on_run(self, run_data: dict):
# This run funciton will simply print the run_data.
self.update_status(progress=first_progress, status_message=first_status_message)
Expand Down Expand Up @@ -61,9 +58,6 @@ def test_start_end_cleanup(self, tmpdir, capsys) -> None:
"""

class FileAdaptor(Adaptor):
def __init__(self, init_data: dict):
super().__init__(init_data)

def on_start(self):
# Open a temp file
self.f = tmpdir.mkdir("test").join("hello.txt")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ def test_run(self, caplog):
"""Testing a success case for the managed process."""

class FakeManagedProcess(ManagedProcess):
def __init__(self, run_data: dict):
super(FakeManagedProcess, self).__init__(run_data)

def get_executable(self) -> str:
if OSName.is_windows():
return "powershell.exe"
Expand Down
Loading