Skip to content

Commit

Permalink
feat: Implement the background mode in Windows.
Browse files Browse the repository at this point in the history
Signed-off-by: Hongli Chen <[email protected]>
  • Loading branch information
Honglichenn committed Nov 15, 2023
1 parent dd77eaa commit 262e134
Show file tree
Hide file tree
Showing 16 changed files with 560 additions and 52 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ __pycache__/
/dist
_version.py
.vscode
.coverage
.idea/
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ requires-python = ">=3.9"
dependencies = [
"pyyaml ~= 6.0",
"jsonschema >= 4.17.0, == 4.*",
"pywin32 == 306; platform_system == 'Windows'",
]

[tool.hatch.build]
Expand Down
115 changes: 115 additions & 0 deletions src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from __future__ import annotations
import logging

import threading
import time

from queue import Queue
from typing import List

from .named_pipe_request_handler import WinBackgroundResourceRequestHandler
from .server_config import NAMED_PIPE_BUFFER_SIZE, DEFAULT_NAMED_PIPE_TIMEOUT
from .server_response import AsyncFutureRunner
from .._osname import OSName

if OSName.is_windows():
import win32pipe
import win32file
import pywintypes
import winerror
from pywintypes import HANDLE

from ..adaptors import AdaptorRunner
from .log_buffers import LogBuffer


_logger = logging.getLogger(__name__)


class WinBackgroundNamedPipeServer:
"""
HTTP server for the background mode of the adaptor runtime communicating via Unix socket.
This UnixStreamServer subclass stores the stateful information of the adaptor backend.
"""

def __init__(
self,
pipe_name: str,
adaptor_runner: AdaptorRunner,
cancel_queue: Queue,
*,
log_buffer: LogBuffer | None = None,
) -> None: # pragma: no cover
self._adaptor_runner = adaptor_runner
self._cancel_queue = cancel_queue
self._future_runner = AsyncFutureRunner()
self._log_buffer = log_buffer
self._named_pipe_instances: List[HANDLE] = []
self._pipe_name = pipe_name
self._time_out = DEFAULT_NAMED_PIPE_TIMEOUT

def _create_pipe(self, pipe_name: str):
# Create the first instance of a specific named pipe
# OR Create a new instance of an existing named pipe.
pipe_handle = win32pipe.CreateNamedPipe(
pipe_name,
# A bi-directional pipe; both server and client processes can read from and write to the pipe.
# win32file.FILE_FLAG_OVERLAPPED is used for async communication.
win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
win32pipe.PIPE_UNLIMITED_INSTANCES,
NAMED_PIPE_BUFFER_SIZE, # nOutBufferSize
NAMED_PIPE_BUFFER_SIZE, # nInBufferSize
self._time_out,
None, # TODO: Add lpSecurityAttributes here to limit the access
)
if pipe_handle == win32file.INVALID_HANDLE_VALUE:
_logger.error("Failed to create named pipe instance.")
return None
return pipe_handle

def serve_forever(self):
# During shutdown, a `True` will be pushed to the `_cancel_queue` for ending this loop
_logger.info(f"Creating Named Pipe with name: {self._pipe_name}")

while self._cancel_queue.qsize() == 0:
pipe_handle = self._create_pipe(self._pipe_name)
self._named_pipe_instances.append(pipe_handle)
_logger.debug("Waiting for connection from the client...")

try:
win32pipe.ConnectNamedPipe(pipe_handle, None)
except pywintypes.error as e:
if e.winerror == winerror.ERROR_PIPE_NOT_CONNECTED:
_logger.info(
"NamedPipe Server is shutdown. Exit the main thread in the backend server."
)
break
request_handler = WinBackgroundResourceRequestHandler(self, pipe_handle)
threading.Thread(target=request_handler.instance_thread).start()

def shutdown(self):
self._cancel_queue.put(True)
# TODO: Need to find out a better way to wait for the communication finish
# After sending the shutdown command, we need to wait for the response
# from it before shutting down server or the client won't get the response.
time.sleep(1)
for pipe_handle in self._named_pipe_instances:
try:
win32pipe.DisconnectNamedPipe(pipe_handle)
win32file.CloseHandle(pipe_handle)
except pywintypes.error as e:
# If the communication is finished then handler may be closed
if e.args[0] == winerror.ERROR_INVALID_HANDLE:
pass
except Exception as e:
import traceback

_logger.error(
"Meeting an error during shutdown the NamedPipe Server:",
str(traceback.format_exc()),
)
raise e
99 changes: 74 additions & 25 deletions src/openjd/adaptor_runtime/_background/backend_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
from queue import Queue
from threading import Thread
from types import FrameType
from typing import Optional
from typing import Optional, Union

from .._osname import OSName
from ..adaptors import AdaptorRunner
from .._http import SocketDirectories
from .._utils import secure_open
from .http_server import BackgroundHTTPServer

if OSName.is_posix():
from .http_server import BackgroundHTTPServer
if OSName.is_windows():
from .backend_named_pipe_server import WinBackgroundNamedPipeServer
from .log_buffers import LogBuffer
from .model import ConnectionSettings
from .model import DataclassJSONEncoder
Expand All @@ -37,7 +42,9 @@ def __init__(
self._adaptor_runner = adaptor_runner
self._connection_file_path = connection_file_path
self._log_buffer = log_buffer
self._http_server: Optional[BackgroundHTTPServer] = None
self._server: Optional[Union[BackgroundHTTPServer, WinBackgroundNamedPipeServer]] = None
# TODO: Signal handler needed to be checked in Windows
# The current plan is to use CTRL_BREAK.
signal.signal(signal.SIGINT, self._sigint_handler)
signal.signal(signal.SIGTERM, self._sigint_handler)

Expand All @@ -46,8 +53,11 @@ def _sigint_handler(self, signum: int, frame: Optional[FrameType]) -> None:
_logger.info("Interruption signal recieved.")
# OpenJD dictates that a SIGTERM/SIGINT results in a cancel workflow being
# kicked off.
if self._http_server is not None:
self._http_server.submit(self._adaptor_runner._cancel, force_immediate=True)
if isinstance(self._server, BackgroundHTTPServer):
if self._server is not None:
self._server.submit(self._adaptor_runner._cancel, force_immediate=True)
else:
raise NotImplementedError("Signal is not implemented in Windows.")

def run(self) -> None:
"""
Expand All @@ -57,32 +67,55 @@ def run(self) -> None:
that port to a connection file, and listens for HTTP requests until a shutdown is requested
"""
_logger.info("Running in background daemon mode.")

queue: Queue = Queue()

socket_path = SocketDirectories.for_os().get_process_socket_path("runtime", create_dir=True)
if OSName.is_posix():
server_path = SocketDirectories.for_os().get_process_socket_path(
"runtime", create_dir=True
)
else:
# TODO: Do a code refactoring to generate the namedpipe server path by using the SocketDirectories
# Need to check if the pipe name is used and the max length.
server_path = rf"\\.\pipe\AdaptorNamedPipe_{str(os.getpid())}"

try:
self._http_server = BackgroundHTTPServer(
socket_path,
self._adaptor_runner,
cancel_queue=queue,
log_buffer=self._log_buffer,
)
if OSName.is_windows():
self._server = WinBackgroundNamedPipeServer(
server_path,
self._adaptor_runner,
cancel_queue=queue,
log_buffer=self._log_buffer,
)
else:
self._server = BackgroundHTTPServer(
server_path,
self._adaptor_runner,
cancel_queue=queue,
log_buffer=self._log_buffer,
)
_logger.debug(f"Listening on {server_path}")
# This if condition is just for passing the typing checking.
if isinstance(self._server, BackgroundHTTPServer) or isinstance(
self._server, WinBackgroundNamedPipeServer
):
server_thread = Thread(
name="AdaptorRuntimeBackendServerThread", target=self._server.serve_forever
)
server_thread.start()
else:
raise RuntimeError(
f"Unknown Server Type {type(self._server)}. "
"Only support WinBackgroundNamedPipeServer or BackgroundHTTPServer"
)

except Exception as e:
_logger.error(f"Error starting in background mode: {e}")
raise

_logger.debug(f"Listening on {socket_path}")
http_thread = Thread(
name="AdaptorRuntimeBackendHttpThread", target=self._http_server.serve_forever
)
http_thread.start()

try:
with secure_open(self._connection_file_path, open_mode="w") as conn_file:
json.dump(
ConnectionSettings(socket_path),
ConnectionSettings(server_path),
conn_file,
cls=DataclassJSONEncoder,
)
Expand All @@ -96,16 +129,32 @@ def run(self) -> None:
queue.get()

# Shutdown the server
self._http_server.shutdown()
http_thread.join()
# This if condition is just for passing the typing checking.
if isinstance(self._server, BackgroundHTTPServer) or isinstance(
self._server, WinBackgroundNamedPipeServer
):
self._server.shutdown()
else:
raise RuntimeError(
f"Unknown Server Type {type(self._server)}. "
"Only support WinBackgroundNamedPipeServer or BackgroundHTTPServer"
)

server_thread.join()

# Cleanup the connection file and socket
for path in [self._connection_file_path, socket_path]:
# Cleanup the connection file and socket for Linux server.
# We don't need to call the `remove` for the NamedPipe server.
# NamedPipe servers are managed by Named Pipe File System it is not a regular file.
# Once all handles are closed, the system automatically cleans up the named pipe.
files_for_deletion = [self._connection_file_path]
if OSName.is_posix():
files_for_deletion.append(server_path)
for path in files_for_deletion:
try:
os.remove(path)
except FileNotFoundError: # pragma: no cover
pass # File is already cleaned up
except OSError as e: # pragma: no cover
_logger.warning(f"Failed to delete {path}: {e}")

_logger.info("HTTP server has shutdown.")
_logger.info("Background server has been shut down.")
Loading

0 comments on commit 262e134

Please sign in to comment.