Skip to content

Commit

Permalink
feat: Implement the background mode in Windows. (#23)
Browse files Browse the repository at this point in the history
Signed-off-by: Hongli Chen <[email protected]>
  • Loading branch information
Honglichenn authored Nov 28, 2023
1 parent 7177744 commit ac80ce7
Show file tree
Hide file tree
Showing 17 changed files with 702 additions and 65 deletions.
2 changes: 1 addition & 1 deletion hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pre-install-commands = [
[envs.default.scripts]
sync = "pip install -r requirements-testing.txt"
test = "pytest --cov-config pyproject.toml {args:test}"
test-windows = "pytest --cov-config pyproject.toml {args:test} --cov-fail-under=34"
test-windows = "pytest --cov-config pyproject.toml {args:test/openjd/adaptor_runtime/integ/background} --cov-fail-under=44"
typing = "mypy {args:src test}"
style = [
"ruff {args:.}",
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ requires-python = ">=3.9"
dependencies = [
"pyyaml ~= 6.0",
"jsonschema >= 4.17.0, == 4.*",
"pywin32 == 306; platform_system == 'Windows'",
]

[tool.hatch.build]
Expand Down Expand Up @@ -125,4 +126,4 @@ source = [

[tool.coverage.report]
show_missing = true
fail_under = 94
fail_under = 80
197 changes: 197 additions & 0 deletions src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from __future__ import annotations
import logging

import threading
import time

from queue import Queue
from typing import List, Optional

from .named_pipe_request_handler import WinBackgroundResourceRequestHandler
from .server_config import NAMED_PIPE_BUFFER_SIZE, DEFAULT_NAMED_PIPE_TIMEOUT_MILLISECONDS
from .server_response import AsyncFutureRunner
from .._osname import OSName

import win32pipe
import win32file
import pywintypes
import winerror
import win32api
from pywintypes import HANDLE

from ..adaptors import AdaptorRunner
from .log_buffers import LogBuffer


_logger = logging.getLogger(__name__)


class MultipleErrors(Exception):
"""
Custom exception class to aggregate and handle multiple exceptions.
This class is used to collect a list of exceptions that occur during a process, allowing
them to be raised together as a single exception. This is particularly useful in scenarios
where multiple operations are performed in a loop, and each operation could potentially
raise an exception.
"""

def __init__(self, errors: List[Exception]):
"""
Initialize the MultipleErrors exception with a list of errors.
Args:
errors (List[Exception]): A list of exceptions that have been raised.
"""
self.errors = errors

def __str__(self) -> str:
"""
Return a string representation of all errors aggregated in this exception.
This method concatenates the string representations of each individual exception
in the `errors` list, separated by semicolons.
Returns:
str: A formatted string containing all the error messages.
"""
return "Multiple errors occurred: " + "; ".join(str(e) for e in self.errors)


class WinBackgroundNamedPipeServer:
"""
A class to manage a Windows Named Pipe Server in background mode for the adaptor runtime communication.
This class encapsulates stateful information of the adaptor backend and provides methods
for server initialization, operation, and shutdown.
"""

def __init__(
self,
pipe_name: str,
adaptor_runner: AdaptorRunner,
cancel_queue: Queue,
*,
log_buffer: LogBuffer | None = None,
): # pragma: no cover
"""
Args:
pipe_name (str): Name of the pipe for the NamedPipe Server.
adaptor_runner (AdaptorRunner): Adaptor runner instance for operation execution.
cancel_queue (Queue): Queue used for signaling server shutdown.
log_buffer (LogBuffer|None, optional): Buffer for logging activities, defaults to None.
"""
if not OSName.is_windows():
raise OSError(
"WinBackgroundNamedPipeServer can be only used on Windows Operating Systems. "
f"Current Operating System is {OSName._get_os_name()}"
)
self._adaptor_runner = adaptor_runner
self._cancel_queue = cancel_queue
self._future_runner = AsyncFutureRunner()
self._log_buffer = log_buffer
self._named_pipe_instances: List[HANDLE] = []
self._pipe_name = pipe_name
# TODO: Need to figure out how to set the itme out for NamedPipe.
# Unlike Linux Server, time out can only be set in the Server side instead of the client side.
self._time_out = DEFAULT_NAMED_PIPE_TIMEOUT_MILLISECONDS

def _create_pipe(self, pipe_name: str) -> Optional[HANDLE]:
"""
Creates a new instance of a named pipe or an additional instance if the pipe already exists.
Args:
pipe_name (str): Name of the pipe for which the instance is to be created.
Returns:
HANDLE: The handler for the created named pipe instance.
"""

pipe_handle = win32pipe.CreateNamedPipe(
pipe_name,
# A bi-directional pipe; both server and client processes can read from and write to the pipe.
# win32file.FILE_FLAG_OVERLAPPED is used for async communication.
win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
win32pipe.PIPE_UNLIMITED_INSTANCES,
NAMED_PIPE_BUFFER_SIZE, # nOutBufferSize
NAMED_PIPE_BUFFER_SIZE, # nInBufferSize
self._time_out,
None, # TODO: Add lpSecurityAttributes here to limit the access
)
if pipe_handle == win32file.INVALID_HANDLE_VALUE:
return None
return pipe_handle

def serve_forever(self) -> None:
"""
Runs the Named Pipe Server continuously until a shutdown signal is received.
This method listens to the NamedPipe Server and creates new instances of named pipes
and corresponding threads for handling client-server communication.
"""
_logger.info(f"Creating Named Pipe with name: {self._pipe_name}")
# During shutdown, a `True` will be pushed to the `_cancel_queue` for ending this loop
# TODO: Using threading.event instead of a queue to signal and termination
while self._cancel_queue.qsize() == 0:
pipe_handle = self._create_pipe(self._pipe_name)
if pipe_handle is None:
error_msg = (
f"Failed to create named pipe instance: "
f"{win32api.FormatMessage(win32api.GetLastError())}"
)
_logger.error(error_msg)
raise RuntimeError(error_msg)
self._named_pipe_instances.append(pipe_handle)
_logger.debug("Waiting for connection from the client...")

try:
win32pipe.ConnectNamedPipe(pipe_handle, None)
except pywintypes.error as e:
if e.winerror == winerror.ERROR_PIPE_NOT_CONNECTED:
_logger.info(
"NamedPipe Server is shutdown. Exit the main thread in the backend server."
)
break
else:
_logger.error(f"Error encountered while connecting to NamedPipe: {e} ")
request_handler = WinBackgroundResourceRequestHandler(self, pipe_handle)
threading.Thread(target=request_handler.instance_thread).start()

def shutdown(self) -> None:
"""
Shuts down the Named Pipe server and closes all named pipe handlers.
Signals the `serve_forever` method to stop listening to the NamedPipe Server by
pushing a `True` value into the `_cancel_queue`.
"""
self._cancel_queue.put(True)
# TODO: Need to find out a better way to wait for the communication finish
# After sending the shutdown command, we need to wait for the response
# from it before shutting down server or the client won't get the response.
time.sleep(1)
error_list: List[Exception] = []
while self._named_pipe_instances:
pipe_handle = self._named_pipe_instances.pop()
try:
win32pipe.DisconnectNamedPipe(pipe_handle)
win32file.CloseHandle(pipe_handle)
except pywintypes.error as e:
# If the communication is finished then handler may be closed
if e.args[0] == winerror.ERROR_INVALID_HANDLE:
pass
except Exception as e:
import traceback

_logger.error(
f"Encountered the following error "
f"while shutting down the WinBackgroundNamedPipeServer: {str(traceback.format_exc())}"
)
# Store any errors to raise after closing all pipe handles,
# allowing handling of multiple errors during shutdown.
error_list.append(e)
if error_list:
raise MultipleErrors(error_list)
90 changes: 63 additions & 27 deletions src/openjd/adaptor_runtime/_background/backend_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
from queue import Queue
from threading import Thread
from types import FrameType
from typing import Optional
from typing import Optional, Union

from .._osname import OSName
from ..adaptors import AdaptorRunner
from .._http import SocketDirectories
from .._utils import secure_open
from .http_server import BackgroundHTTPServer

if OSName.is_posix():
from .http_server import BackgroundHTTPServer
if OSName.is_windows():
from .backend_named_pipe_server import WinBackgroundNamedPipeServer
from .log_buffers import LogBuffer
from .model import ConnectionSettings
from .model import DataclassJSONEncoder
Expand All @@ -37,17 +42,26 @@ def __init__(
self._adaptor_runner = adaptor_runner
self._connection_file_path = connection_file_path
self._log_buffer = log_buffer
self._http_server: Optional[BackgroundHTTPServer] = None
signal.signal(signal.SIGINT, self._sigint_handler)
signal.signal(signal.SIGTERM, self._sigint_handler)
self._server: Optional[Union[BackgroundHTTPServer, WinBackgroundNamedPipeServer]] = None
# TODO: Signal handler needed to be checked in Windows
# The current plan is to use CTRL_BREAK.
if OSName.is_posix():
signal.signal(signal.SIGINT, self._sigint_handler)
signal.signal(signal.SIGTERM, self._sigint_handler)

def _sigint_handler(self, signum: int, frame: Optional[FrameType]) -> None:
"""Signal handler that is invoked when the process receives a SIGINT/SIGTERM"""
_logger.info("Interruption signal recieved.")
# OpenJD dictates that a SIGTERM/SIGINT results in a cancel workflow being
# kicked off.
if self._http_server is not None:
self._http_server.submit(self._adaptor_runner._cancel, force_immediate=True)
# TODO: Do a code refactoring to move the `submit` to the `server_response`
if OSName.is_posix():
if self._server is not None:
self._server.submit( # type: ignore
self._adaptor_runner._cancel, force_immediate=True
)
else:
raise NotImplementedError("Signal is not implemented in Windows.")

def run(self) -> None:
"""
Expand All @@ -57,32 +71,47 @@ def run(self) -> None:
that port to a connection file, and listens for HTTP requests until a shutdown is requested
"""
_logger.info("Running in background daemon mode.")

queue: Queue = Queue()

socket_path = SocketDirectories.for_os().get_process_socket_path("runtime", create_dir=True)
if OSName.is_posix():
server_path = SocketDirectories.for_os().get_process_socket_path(
"runtime", create_dir=True
)
else:
# TODO: Do a code refactoring to generate the namedpipe server path by using the SocketDirectories
# Need to check if the pipe name is used and the max length.
server_path = rf"\\.\pipe\AdaptorNamedPipe_{str(os.getpid())}"

try:
self._http_server = BackgroundHTTPServer(
socket_path,
self._adaptor_runner,
cancel_queue=queue,
log_buffer=self._log_buffer,
if OSName.is_windows():
self._server = WinBackgroundNamedPipeServer(
server_path,
self._adaptor_runner,
cancel_queue=queue,
log_buffer=self._log_buffer,
)
else:
self._server = BackgroundHTTPServer(
server_path,
self._adaptor_runner,
cancel_queue=queue,
log_buffer=self._log_buffer,
)
_logger.debug(f"Listening on {server_path}")
server_thread = Thread(
name="AdaptorRuntimeBackendServerThread",
target=self._server.serve_forever, # type: ignore
)
server_thread.start()

except Exception as e:
_logger.error(f"Error starting in background mode: {e}")
raise

_logger.debug(f"Listening on {socket_path}")
http_thread = Thread(
name="AdaptorRuntimeBackendHttpThread", target=self._http_server.serve_forever
)
http_thread.start()

try:
with secure_open(self._connection_file_path, open_mode="w") as conn_file:
json.dump(
ConnectionSettings(socket_path),
ConnectionSettings(server_path),
conn_file,
cls=DataclassJSONEncoder,
)
Expand All @@ -96,16 +125,23 @@ def run(self) -> None:
queue.get()

# Shutdown the server
self._http_server.shutdown()
http_thread.join()

# Cleanup the connection file and socket
for path in [self._connection_file_path, socket_path]:
self._server.shutdown() # type: ignore

server_thread.join()

# Cleanup the connection file and socket for Linux server.
# We don't need to call the `remove` for the NamedPipe server.
# NamedPipe servers are managed by Named Pipe File System it is not a regular file.
# Once all handles are closed, the system automatically cleans up the named pipe.
files_for_deletion = [self._connection_file_path]
if OSName.is_posix():
files_for_deletion.append(server_path)
for path in files_for_deletion:
try:
os.remove(path)
except FileNotFoundError: # pragma: no cover
pass # File is already cleaned up
except OSError as e: # pragma: no cover
_logger.warning(f"Failed to delete {path}: {e}")

_logger.info("HTTP server has shutdown.")
_logger.info("Background server has been shut down.")
Loading

0 comments on commit ac80ce7

Please sign in to comment.