feat: Implement the background mode in Windows. #23

Merged
merged 2 commits into from
Nov 28, 2023
2 changes: 1 addition & 1 deletion hatch.toml
@@ -6,7 +6,7 @@ pre-install-commands = [
[envs.default.scripts]
sync = "pip install -r requirements-testing.txt"
test = "pytest --cov-config pyproject.toml {args:test}"
test-windows = "pytest --cov-config pyproject.toml {args:test} --cov-fail-under=34"
test-windows = "pytest --cov-config pyproject.toml {args:test/openjd/adaptor_runtime/integ/background} --cov-fail-under=44"
typing = "mypy {args:src test}"
style = [
"ruff {args:.}",
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -12,6 +12,7 @@ requires-python = ">=3.9"
dependencies = [
"pyyaml ~= 6.0",
"jsonschema >= 4.17.0, == 4.*",
"pywin32 == 306; platform_system == 'Windows'",
]

[tool.hatch.build]
@@ -125,4 +126,4 @@ source = [

[tool.coverage.report]
show_missing = true
fail_under = 94
fail_under = 80
197 changes: 197 additions & 0 deletions src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py
@@ -0,0 +1,197 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from __future__ import annotations
import logging

import threading
import time

from queue import Queue
from typing import List, Optional

from .named_pipe_request_handler import WinBackgroundResourceRequestHandler
from .server_config import NAMED_PIPE_BUFFER_SIZE, DEFAULT_NAMED_PIPE_TIMEOUT_MILLISECONDS
from .server_response import AsyncFutureRunner
from .._osname import OSName

import win32pipe
import win32file
import pywintypes
import winerror
import win32api
from pywintypes import HANDLE

from ..adaptors import AdaptorRunner
from .log_buffers import LogBuffer


_logger = logging.getLogger(__name__)


class MultipleErrors(Exception):
    """
    Custom exception class to aggregate and handle multiple exceptions.

    This class is used to collect a list of exceptions that occur during a process, allowing
    them to be raised together as a single exception. This is particularly useful in scenarios
    where multiple operations are performed in a loop, and each operation could potentially
    raise an exception.
    """

    def __init__(self, errors: List[Exception]):
        """
        Initialize the MultipleErrors exception with a list of errors.

        Args:
            errors (List[Exception]): A list of exceptions that have been raised.
        """
        self.errors = errors

    def __str__(self) -> str:
        """
        Return a string representation of all errors aggregated in this exception.

        This method concatenates the string representations of each individual exception
        in the `errors` list, separated by semicolons.

        Returns:
            str: A formatted string containing all the error messages.
        """
        return "Multiple errors occurred: " + "; ".join(str(e) for e in self.errors)


class WinBackgroundNamedPipeServer:
    """
    A class to manage a Windows Named Pipe Server in background mode for the adaptor runtime communication.

    This class encapsulates stateful information of the adaptor backend and provides methods
    for server initialization, operation, and shutdown.
    """

    def __init__(
        self,
        pipe_name: str,
        adaptor_runner: AdaptorRunner,
        cancel_queue: Queue,
        *,
        log_buffer: LogBuffer | None = None,
    ):  # pragma: no cover
        """
        Args:
            pipe_name (str): Name of the pipe for the NamedPipe Server.
            adaptor_runner (AdaptorRunner): Adaptor runner instance for operation execution.
            cancel_queue (Queue): Queue used for signaling server shutdown.
            log_buffer (LogBuffer|None, optional): Buffer for logging activities, defaults to None.
        """
        if not OSName.is_windows():
            raise OSError(
                "WinBackgroundNamedPipeServer can be only used on Windows Operating Systems. "
                f"Current Operating System is {OSName._get_os_name()}"
            )
        self._adaptor_runner = adaptor_runner
        self._cancel_queue = cancel_queue
        self._future_runner = AsyncFutureRunner()
        self._log_buffer = log_buffer
        self._named_pipe_instances: List[HANDLE] = []
        self._pipe_name = pipe_name
        # TODO: Need to figure out how to set the timeout for NamedPipe.
        # Unlike the Linux server, the timeout can only be set on the server side, not the client side.
        self._time_out = DEFAULT_NAMED_PIPE_TIMEOUT_MILLISECONDS

    def _create_pipe(self, pipe_name: str) -> Optional[HANDLE]:
        """
        Creates a new instance of a named pipe or an additional instance if the pipe already exists.

        Args:
            pipe_name (str): Name of the pipe for which the instance is to be created.

        Returns:
            Optional[HANDLE]: The handle for the created named pipe instance, or None on failure.

        """

        pipe_handle = win32pipe.CreateNamedPipe(
            pipe_name,
            # A bi-directional pipe; both server and client processes can read from and write to the pipe.
            # win32file.FILE_FLAG_OVERLAPPED is used for async communication.
            win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED,
            win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
            win32pipe.PIPE_UNLIMITED_INSTANCES,
            NAMED_PIPE_BUFFER_SIZE,  # nOutBufferSize
            NAMED_PIPE_BUFFER_SIZE,  # nInBufferSize
            self._time_out,
            None,  # TODO: Add lpSecurityAttributes here to limit the access
        )
        if pipe_handle == win32file.INVALID_HANDLE_VALUE:
            return None
        return pipe_handle

    def serve_forever(self) -> None:
        """
        Runs the Named Pipe Server continuously until a shutdown signal is received.

        This method listens to the NamedPipe Server and creates new instances of named pipes
        and corresponding threads for handling client-server communication.
        """
        _logger.info(f"Creating Named Pipe with name: {self._pipe_name}")
        # During shutdown, a `True` will be pushed to the `_cancel_queue` for ending this loop
        # TODO: Use threading.Event instead of a queue to signal termination
        while self._cancel_queue.qsize() == 0:
            pipe_handle = self._create_pipe(self._pipe_name)
Contributor

Do we need to be creating a new pipe file for every request, or is there a way to reuse the same one? It seems weird to me to only be able to send a single request per pipe.

Also, you're not handling the pipe_handle == None case here.

Contributor Author

  1. On reusing the NamedPipe instance: the short answer is that there is no simple way to do it. If we reuse a named pipe instance, the following can happen: a run command is sent to the server, followed by a stop command, and the response to the run command may be read by the caller that sent the stop command. Here we create a new NamedPipe instance under the same NamedPipe server. The relationship between a NamedPipe server and a NamedPipe instance is similar to the relationship between an HTTP server and an HTTP connection. An HTTP/2 connection supports multiplexing (multiple requests and responses can be sent concurrently over a single connection). NamedPipe doesn't support this (like HTTP/1). We can implement it ourselves if we observe performance issues.

  2. Good catch, I will handle it in the next version.
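
To illustrate the multiplexing idea mentioned in point 1, here is a minimal sketch of matching responses to requests by id over one shared connection. It is not code from this PR: `ResponseRouter`, the JSON message shape, and the `id`/`body` fields are hypothetical, and in-memory queues stand in for the named pipe.

```python
from __future__ import annotations

import itertools
import json
import queue
import threading


class ResponseRouter:
    """Hypothetical helper: routes each response to the request that caused it."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._waiters: dict[int, queue.Queue] = {}
        self._lock = threading.Lock()

    def register(self) -> int:
        """Reserve an id for a new request and a slot for its response."""
        with self._lock:
            request_id = next(self._ids)
            self._waiters[request_id] = queue.Queue(maxsize=1)
        return request_id

    def deliver(self, raw_response: str) -> None:
        """Called by the reader of the shared connection for every response."""
        message = json.loads(raw_response)
        with self._lock:
            waiter = self._waiters[message["id"]]
        waiter.put(message["body"])

    def wait(self, request_id: int, timeout: float = 1.0) -> str:
        """Block until the response for this specific request arrives."""
        with self._lock:
            waiter = self._waiters[request_id]
        try:
            return waiter.get(timeout=timeout)
        finally:
            with self._lock:
                self._waiters.pop(request_id, None)


router = ResponseRouter()
run_id = router.register()
stop_id = router.register()
# Responses arrive out of order on the shared connection.
router.deliver(json.dumps({"id": stop_id, "body": "stopped"}))
router.deliver(json.dumps({"id": run_id, "body": "run finished"}))
run_reply = router.wait(run_id)
stop_reply = router.wait(stop_id)
```

Without the id, the caller waiting on the run command could pick up the stop response, which is the mix-up described above. Creating one pipe instance per request avoids the need for this bookkeeping.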

            if pipe_handle is None:
                error_msg = (
                    f"Failed to create named pipe instance: "
                    f"{win32api.FormatMessage(win32api.GetLastError())}"
                )
                _logger.error(error_msg)
                raise RuntimeError(error_msg)
            self._named_pipe_instances.append(pipe_handle)
            _logger.debug("Waiting for connection from the client...")

            try:
                win32pipe.ConnectNamedPipe(pipe_handle, None)
            except pywintypes.error as e:
                if e.winerror == winerror.ERROR_PIPE_NOT_CONNECTED:
                    _logger.info(
                        "NamedPipe Server is shutdown. Exit the main thread in the backend server."
                    )
                    break
                else:
                    _logger.error(f"Error encountered while connecting to NamedPipe: {e} ")
            request_handler = WinBackgroundResourceRequestHandler(self, pipe_handle)
            threading.Thread(target=request_handler.instance_thread).start()

    def shutdown(self) -> None:
        """
        Shuts down the Named Pipe server and closes all named pipe handlers.

        Signals the `serve_forever` method to stop listening to the NamedPipe Server by
        pushing a `True` value into the `_cancel_queue`.
        """
        self._cancel_queue.put(True)
        # TODO: Need to find out a better way to wait for the communication finish
        # After sending the shutdown command, we need to wait for the response
        # from it before shutting down server or the client won't get the response.
        time.sleep(1)
        error_list: List[Exception] = []
        while self._named_pipe_instances:
            pipe_handle = self._named_pipe_instances.pop()
            try:
                win32pipe.DisconnectNamedPipe(pipe_handle)
                win32file.CloseHandle(pipe_handle)
            except pywintypes.error as e:
                # If the communication is finished then handler may be closed
                if e.args[0] == winerror.ERROR_INVALID_HANDLE:
                    pass
            except Exception as e:
                import traceback

                _logger.error(
                    f"Encountered the following error "
                    f"while shutting down the WinBackgroundNamedPipeServer: {str(traceback.format_exc())}"
                )
                # Store any errors to raise after closing all pipe handles,
                # allowing handling of multiple errors during shutdown.
                error_list.append(e)
        if error_list:
            raise MultipleErrors(error_list)
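
The TODO in `serve_forever` proposes signaling termination with a `threading.Event` rather than a queue. A minimal sketch of that pattern follows; it is not part of this PR. `StoppableServer` is a hypothetical stand-in that creates no real pipe, and a blocking `ConnectNamedPipe` call would still need to be unblocked separately (for example by closing the handle, as `shutdown` already does).

```python
import threading


class StoppableServer:
    """Hypothetical stand-in for the accept loop; no real pipe is created."""

    def __init__(self) -> None:
        self._stop_event = threading.Event()
        self.iterations = 0

    def serve_forever(self) -> None:
        # Loop until shutdown() sets the event.
        while not self._stop_event.is_set():
            self.iterations += 1
            # Placeholder for accepting one connection; wait() returns early once the event is set.
            self._stop_event.wait(timeout=0.01)

    def shutdown(self) -> None:
        self._stop_event.set()


server = StoppableServer()
worker = threading.Thread(target=server.serve_forever)
worker.start()
server.shutdown()
worker.join(timeout=5)
```

Compared with checking `qsize()`, the event states the intent directly and its `wait` method gives the loop an interruptible pause.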
90 changes: 63 additions & 27 deletions src/openjd/adaptor_runtime/_background/backend_runner.py
@@ -9,12 +9,17 @@
from queue import Queue
from threading import Thread
from types import FrameType
from typing import Optional
from typing import Optional, Union

from .._osname import OSName
from ..adaptors import AdaptorRunner
from .._http import SocketDirectories
from .._utils import secure_open
from .http_server import BackgroundHTTPServer

if OSName.is_posix():
from .http_server import BackgroundHTTPServer
if OSName.is_windows():
from .backend_named_pipe_server import WinBackgroundNamedPipeServer
from .log_buffers import LogBuffer
from .model import ConnectionSettings
from .model import DataclassJSONEncoder
@@ -37,17 +42,26 @@ def __init__(
self._adaptor_runner = adaptor_runner
self._connection_file_path = connection_file_path
self._log_buffer = log_buffer
self._http_server: Optional[BackgroundHTTPServer] = None
signal.signal(signal.SIGINT, self._sigint_handler)
signal.signal(signal.SIGTERM, self._sigint_handler)
self._server: Optional[Union[BackgroundHTTPServer, WinBackgroundNamedPipeServer]] = None
# TODO: Signal handler needed to be checked in Windows
# The current plan is to use CTRL_BREAK.
if OSName.is_posix():
signal.signal(signal.SIGINT, self._sigint_handler)
signal.signal(signal.SIGTERM, self._sigint_handler)

def _sigint_handler(self, signum: int, frame: Optional[FrameType]) -> None:
"""Signal handler that is invoked when the process receives a SIGINT/SIGTERM"""
_logger.info("Interruption signal received.")
# OpenJD dictates that a SIGTERM/SIGINT results in a cancel workflow being
# kicked off.
if self._http_server is not None:
self._http_server.submit(self._adaptor_runner._cancel, force_immediate=True)
# TODO: Do a code refactoring to move the `submit` to the `server_response`
if OSName.is_posix():
if self._server is not None:
self._server.submit( # type: ignore
self._adaptor_runner._cancel, force_immediate=True
)
else:
raise NotImplementedError("Signal is not implemented in Windows.")

def run(self) -> None:
"""
@@ -57,32 +71,47 @@ def run(self) -> None:
that port to a connection file, and listens for HTTP requests until a shutdown is requested
"""
_logger.info("Running in background daemon mode.")

queue: Queue = Queue()

socket_path = SocketDirectories.for_os().get_process_socket_path("runtime", create_dir=True)
if OSName.is_posix():
server_path = SocketDirectories.for_os().get_process_socket_path(
"runtime", create_dir=True
)
else:
# TODO: Do a code refactoring to generate the namedpipe server path by using the SocketDirectories
# Need to check if the pipe name is used and the max length.
server_path = rf"\\.\pipe\AdaptorNamedPipe_{str(os.getpid())}"

try:
self._http_server = BackgroundHTTPServer(
socket_path,
self._adaptor_runner,
cancel_queue=queue,
log_buffer=self._log_buffer,
if OSName.is_windows():
self._server = WinBackgroundNamedPipeServer(
server_path,
self._adaptor_runner,
cancel_queue=queue,
log_buffer=self._log_buffer,
)
else:
self._server = BackgroundHTTPServer(
server_path,
self._adaptor_runner,
cancel_queue=queue,
log_buffer=self._log_buffer,
)
Comment on lines +86 to +99
Contributor

Similar to my comment in FrontendRunner, what do you think about applying the same thing here and using specialized classes for Windows vs Posix variants of the BackendRunner?

I think there's enough differences here to justify separate classes for each (e.g. Windows doesn't use signals, Windows uses Named Pipe Server vs HTTP server on Linux)

Contributor Author

It sounds like a good idea, but it is an optimization task, so I prefer doing it outside of this PR: it will touch the code used for Linux, and we will need to modify some tests as well. We have more than one engineer working on the adaptor runtime, so we can parallelize these tasks.
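
For reference, the split suggested in this thread could look roughly like the sketch below. None of these classes exist in the PR: `BaseBackendRunner`, `PosixBackendRunner`, `WindowsBackendRunner`, and `create_backend_runner` are hypothetical names, and the POSIX socket path is a placeholder for whatever `SocketDirectories` would return.

```python
from __future__ import annotations

import abc
import os
import platform
from typing import Optional


class BaseBackendRunner(abc.ABC):
    """Hypothetical base class for the platform-independent parts of the runner."""

    def describe(self) -> str:
        return f"{self.transport()} at {self.server_path()}"

    @abc.abstractmethod
    def transport(self) -> str: ...

    @abc.abstractmethod
    def server_path(self) -> str: ...


class PosixBackendRunner(BaseBackendRunner):
    def transport(self) -> str:
        return "unix-socket HTTP server"

    def server_path(self) -> str:
        # Placeholder path; the real code asks SocketDirectories for it.
        return f"/tmp/runtime-{os.getpid()}.sock"


class WindowsBackendRunner(BaseBackendRunner):
    def transport(self) -> str:
        return "named pipe server"

    def server_path(self) -> str:
        # Same pipe-name pattern the PR uses in run().
        return rf"\\.\pipe\AdaptorNamedPipe_{os.getpid()}"


def create_backend_runner(system: Optional[str] = None) -> BaseBackendRunner:
    """Pick the runner for the given (or current) platform in one place."""
    system = system or platform.system()
    if system == "Windows":
        return WindowsBackendRunner()
    return PosixBackendRunner()


windows_runner = create_backend_runner("Windows")
posix_runner = create_backend_runner("Linux")
```

With this shape, the `OSName` checks scattered through `__init__`, `_sigint_handler`, and `run` would collapse into a single decision in the factory.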

_logger.debug(f"Listening on {server_path}")
server_thread = Thread(
name="AdaptorRuntimeBackendServerThread",
target=self._server.serve_forever, # type: ignore
)
server_thread.start()

except Exception as e:
_logger.error(f"Error starting in background mode: {e}")
raise

_logger.debug(f"Listening on {socket_path}")
http_thread = Thread(
name="AdaptorRuntimeBackendHttpThread", target=self._http_server.serve_forever
)
http_thread.start()

try:
with secure_open(self._connection_file_path, open_mode="w") as conn_file:
json.dump(
ConnectionSettings(socket_path),
ConnectionSettings(server_path),
conn_file,
cls=DataclassJSONEncoder,
)
@@ -96,16 +125,23 @@ def run(self) -> None:
queue.get()

# Shutdown the server
self._http_server.shutdown()
http_thread.join()

# Cleanup the connection file and socket
for path in [self._connection_file_path, socket_path]:
self._server.shutdown() # type: ignore
Contributor

Do we still need this type: ignore even though both servers implement shutdown()?

Contributor Author

Yes, because the definition is self._server: Optional[Union[BackgroundHTTPServer, WinBackgroundNamedPipeServer]] = None, so it is possible that this is None.

Contributor

I see. We could add an if self._server is not None: check here. I think we generally want to avoid using type: ignore whenever possible so that our type checking is stronger.

Contributor Author

I tried and got this error:

src\openjd\adaptor_runtime\_background\backend_runner.py:128: error: "object" has no attribute "shutdown"
                    self._server.shutdown()

I think mypy cannot understand the inheritance. On Linux, shutdown is implemented in BaseServer, and the relationship is BaseServer -> TCPServer -> UnixStreamServer -> BackgroundHTTPServer. On Windows, shutdown is implemented in WinBackgroundNamedPipeServer directly.

Contributor

Yeah seems like it, thanks for investigating!

Contributor

I think we can fix this by extracting the common functions of our server implementations to an interface. Right now, the WinBackgroundNamedPipeServer class implicitly implements a similar interface as our BackgroundHTTPServer class so that it's usable from here.

I suggest we make an explicit common interface between the two and use that here instead. For now, it'll only need two methods:

import abc

class BackgroundServer(abc.ABC):
    @abc.abstractmethod
    def serve_forever(self): pass

    @abc.abstractmethod
    def shutdown(self): pass

Contributor Author

I think this won't work either. BaseServer is not defined by us; it is a built-in class defined by Python. If we insist on avoiding the # type: ignore here, I prefer doing that outside of this PR, and I have already created a ticket for it.
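
One option not raised in this thread is a structural interface via `typing.Protocol`, which does not require the standard-library `BaseServer` to inherit from anything. The sketch below is an assumption, not code from the PR: `BackgroundServer` is redefined here as a protocol, `FakeNamedPipeServer` and `stop_server` are hypothetical, and whether this removes the mypy error reported above has not been verified against the project.

```python
from __future__ import annotations

import socketserver
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class BackgroundServer(Protocol):
    """Hypothetical structural interface: any class with these two methods matches."""

    def serve_forever(self) -> None: ...

    def shutdown(self) -> None: ...


class FakeNamedPipeServer:
    """Stand-in for WinBackgroundNamedPipeServer; it inherits from nothing."""

    def __init__(self) -> None:
        self.stopped = False

    def serve_forever(self) -> None:
        pass

    def shutdown(self) -> None:
        self.stopped = True


def stop_server(server: Optional[BackgroundServer]) -> bool:
    """Shut the server down if one was created; the None check narrows the type."""
    if server is None:
        return False
    server.shutdown()
    return True


pipe_server = FakeNamedPipeServer()
stopped = stop_server(pipe_server)
# The standard-library base class also has both methods, so it matches structurally.
stdlib_matches = issubclass(socketserver.BaseServer, BackgroundServer)
```

Because matching is by method names rather than inheritance, both the HTTP server and the named pipe server could be annotated as `Optional[BackgroundServer]` without changing either class.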


server_thread.join()

# Cleanup the connection file and socket for Linux server.
# We don't need to call the `remove` for the NamedPipe server.
# A NamedPipe server is managed by the Named Pipe File System; it is not a regular file.
# Once all handles are closed, the system automatically cleans up the named pipe.
files_for_deletion = [self._connection_file_path]
if OSName.is_posix():
files_for_deletion.append(server_path)
for path in files_for_deletion:
try:
os.remove(path)
except FileNotFoundError: # pragma: no cover
pass # File is already cleaned up
except OSError as e: # pragma: no cover
_logger.warning(f"Failed to delete {path}: {e}")

_logger.info("HTTP server has shutdown.")
_logger.info("Background server has been shut down.")