diff --git a/.gitignore b/.gitignore index 77e7941..146f023 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,5 @@ __pycache__/ /dist _version.py .vscode +.coverage +.idea/ \ No newline at end of file diff --git a/hatch.toml b/hatch.toml index 5741ed6..433ff5c 100644 --- a/hatch.toml +++ b/hatch.toml @@ -5,8 +5,8 @@ pre-install-commands = [ [envs.default.scripts] sync = "pip install -r requirements-testing.txt" -test = "pytest --cov-config pyproject.toml {args:test}" -test-windows = "pytest --cov-config pyproject.toml {args:test} --cov-fail-under=34" +test = "pytest --cov-config pyproject.toml {args:test} -vv" +test-windows = "pytest --cov-config pyproject.toml {args:test} --cov-fail-under=34 -vv" typing = "mypy {args:src test}" style = [ "ruff {args:.}", diff --git a/pyproject.toml b/pyproject.toml index df66f29..99bec80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ requires-python = ">=3.9" dependencies = [ "pyyaml ~= 6.0", "jsonschema >= 4.17.0, == 4.*", + "pywin32 == 306; platform_system == 'Windows'", ] [tool.hatch.build] diff --git a/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py b/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py new file mode 100644 index 0000000..6b12f19 --- /dev/null +++ b/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py @@ -0,0 +1,115 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +from __future__ import annotations +import logging + +import threading +import time + +from queue import Queue +from typing import List + +from .named_pipe_request_handler import WinBackgroundResourceRequestHandler +from .server_config import NAMED_PIPE_BUFFER_SIZE, DEFAULT_NAMED_PIPE_TIMEOUT +from .server_response import AsyncFutureRunner +from .._osname import OSName + +if OSName.is_windows(): + import win32pipe + import win32file + import pywintypes + import winerror + from pywintypes import HANDLE + +from ..adaptors import AdaptorRunner +from .log_buffers import LogBuffer + + +_logger = logging.getLogger(__name__) + + +class WinBackgroundNamedPipeServer: + """ + HTTP server for the background mode of the adaptor runtime communicating via Unix socket. + + This UnixStreamServer subclass stores the stateful information of the adaptor backend. + """ + + def __init__( + self, + pipe_name: str, + adaptor_runner: AdaptorRunner, + cancel_queue: Queue, + *, + log_buffer: LogBuffer | None = None, + ) -> None: # pragma: no cover + self._adaptor_runner = adaptor_runner + self._cancel_queue = cancel_queue + self._future_runner = AsyncFutureRunner() + self._log_buffer = log_buffer + self._named_pipe_instances: List[HANDLE] = [] + self._pipe_name = pipe_name + self._time_out = DEFAULT_NAMED_PIPE_TIMEOUT + + def _create_pipe(self, pipe_name: str): + # Create the first instance of a specific named pipe + # OR Create a new instance of an existing named pipe. + pipe_handle = win32pipe.CreateNamedPipe( + pipe_name, + # A bi-directional pipe; both server and client processes can read from and write to the pipe. + # win32file.FILE_FLAG_OVERLAPPED is used for async communication. + win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED, + win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT, + win32pipe.PIPE_UNLIMITED_INSTANCES, + NAMED_PIPE_BUFFER_SIZE, # nOutBufferSize + NAMED_PIPE_BUFFER_SIZE, # nInBufferSize + self._time_out, + None, # TODO: Add lpSecurityAttributes here to limit the access + ) + if pipe_handle == win32file.INVALID_HANDLE_VALUE: + _logger.error("Failed to create named pipe instance.") + return None + return pipe_handle + + def serve_forever(self): + # During shutdown, a `True` will be pushed to the `_cancel_queue` for ending this loop + _logger.info(f"Creating Named Pipe with name: {self._pipe_name}") + + while self._cancel_queue.qsize() == 0: + pipe_handle = self._create_pipe(self._pipe_name) + self._named_pipe_instances.append(pipe_handle) + _logger.debug("Waiting for connection from the client...") + + try: + win32pipe.ConnectNamedPipe(pipe_handle, None) + except pywintypes.error as e: + if e.winerror == winerror.ERROR_PIPE_NOT_CONNECTED: + _logger.info( + "NamedPipe Server is shutdown. Exit the main thread in the backend server." + ) + break + request_handler = WinBackgroundResourceRequestHandler(self, pipe_handle) + threading.Thread(target=request_handler.instance_thread).start() + + def shutdown(self): + self._cancel_queue.put(True) + # TODO: Need to find out a better way to wait for the communication finish + # After sending the shutdown command, we need to wait for the response + # from it before shutting down server or the client won't get the response. + time.sleep(1) + for pipe_handle in self._named_pipe_instances: + try: + win32pipe.DisconnectNamedPipe(pipe_handle) + win32file.CloseHandle(pipe_handle) + except pywintypes.error as e: + # If the communication is finished then handler may be closed + if e.args[0] == winerror.ERROR_INVALID_HANDLE: + pass + except Exception as e: + import traceback + + _logger.error( + "Meeting an error during shutdown the NamedPipe Server:", + str(traceback.format_exc()), + ) + raise e diff --git a/src/openjd/adaptor_runtime/_background/backend_runner.py b/src/openjd/adaptor_runtime/_background/backend_runner.py index 9fd837a..f8eb45e 100644 --- a/src/openjd/adaptor_runtime/_background/backend_runner.py +++ b/src/openjd/adaptor_runtime/_background/backend_runner.py @@ -9,12 +9,17 @@ from queue import Queue from threading import Thread from types import FrameType -from typing import Optional +from typing import Optional, Union +from .._osname import OSName from ..adaptors import AdaptorRunner from .._http import SocketDirectories from .._utils import secure_open -from .http_server import BackgroundHTTPServer + +if OSName.is_posix(): + from .http_server import BackgroundHTTPServer +if OSName.is_windows(): + from .backend_named_pipe_server import WinBackgroundNamedPipeServer from .log_buffers import LogBuffer from .model import ConnectionSettings from .model import DataclassJSONEncoder @@ -37,7 +42,9 @@ def __init__( self._adaptor_runner = adaptor_runner self._connection_file_path = connection_file_path self._log_buffer = log_buffer - self._http_server: Optional[BackgroundHTTPServer] = None + self._server: Optional[Union[BackgroundHTTPServer, WinBackgroundNamedPipeServer]] = None + # TODO: Signal handler needed to be checked in Windows + # The current plan is to use CTRL_BREAK. signal.signal(signal.SIGINT, self._sigint_handler) signal.signal(signal.SIGTERM, self._sigint_handler) @@ -46,8 +53,13 @@ def _sigint_handler(self, signum: int, frame: Optional[FrameType]) -> None: _logger.info("Interruption signal recieved.") # OpenJD dictates that a SIGTERM/SIGINT results in a cancel workflow being # kicked off. - if self._http_server is not None: - self._http_server.submit(self._adaptor_runner._cancel, force_immediate=True) + if OSName.is_posix(): + if self._server is not None: + self._server.submit( + self._adaptor_runner._cancel, force_immediate=True + ) # type : ignore + else: + raise NotImplementedError("Signal is not implemented in Windows.") def run(self) -> None: """ @@ -57,32 +69,47 @@ def run(self) -> None: that port to a connection file, and listens for HTTP requests until a shutdown is requested """ _logger.info("Running in background daemon mode.") - queue: Queue = Queue() - socket_path = SocketDirectories.for_os().get_process_socket_path("runtime", create_dir=True) + if OSName.is_posix(): + server_path = SocketDirectories.for_os().get_process_socket_path( + "runtime", create_dir=True + ) + else: + # TODO: Do a code refactoring to generate the namedpipe server path by using the SocketDirectories + # Need to check if the pipe name is used and the max length. + server_path = rf"\\.\pipe\AdaptorNamedPipe_{str(os.getpid())}" try: - self._http_server = BackgroundHTTPServer( - socket_path, - self._adaptor_runner, - cancel_queue=queue, - log_buffer=self._log_buffer, + if OSName.is_windows(): + self._server = WinBackgroundNamedPipeServer( + server_path, + self._adaptor_runner, + cancel_queue=queue, + log_buffer=self._log_buffer, + ) + else: + self._server = BackgroundHTTPServer( + server_path, + self._adaptor_runner, + cancel_queue=queue, + log_buffer=self._log_buffer, + ) + _logger.debug(f"Listening on {server_path}") + server_thread = Thread( + name="AdaptorRuntimeBackendServerThread", + target=self._server.serve_forever, # type : ignore ) + server_thread.start() + except Exception as e: _logger.error(f"Error starting in background mode: {e}") raise - _logger.debug(f"Listening on {socket_path}") - http_thread = Thread( - name="AdaptorRuntimeBackendHttpThread", target=self._http_server.serve_forever - ) - http_thread.start() - try: with secure_open(self._connection_file_path, open_mode="w") as conn_file: json.dump( - ConnectionSettings(socket_path), + ConnectionSettings(server_path), conn_file, cls=DataclassJSONEncoder, ) @@ -96,11 +123,18 @@ def run(self) -> None: queue.get() # Shutdown the server - self._http_server.shutdown() - http_thread.join() - - # Cleanup the connection file and socket - for path in [self._connection_file_path, socket_path]: + self._server.shutdown() # type : ignore + + server_thread.join() + + # Cleanup the connection file and socket for Linux server. + # We don't need to call the `remove` for the NamedPipe server. + # NamedPipe servers are managed by Named Pipe File System it is not a regular file. + # Once all handles are closed, the system automatically cleans up the named pipe. + files_for_deletion = [self._connection_file_path] + if OSName.is_posix(): + files_for_deletion.append(server_path) + for path in files_for_deletion: try: os.remove(path) except FileNotFoundError: # pragma: no cover @@ -108,4 +142,4 @@ def run(self) -> None: except OSError as e: # pragma: no cover _logger.warning(f"Failed to delete {path}: {e}") - _logger.info("HTTP server has shutdown.") + _logger.info("Background server has been shut down.") diff --git a/src/openjd/adaptor_runtime/_background/frontend_runner.py b/src/openjd/adaptor_runtime/_background/frontend_runner.py index 29e273e..3acd52f 100644 --- a/src/openjd/adaptor_runtime/_background/frontend_runner.py +++ b/src/openjd/adaptor_runtime/_background/frontend_runner.py @@ -15,8 +15,9 @@ from threading import Event from types import FrameType from types import ModuleType -from typing import Optional +from typing import Optional, Dict +from .._osname import OSName from ..process._logging import _ADAPTOR_OUTPUT_LEVEL from .model import ( AdaptorState, @@ -28,6 +29,13 @@ HeartbeatResponse, ) +if OSName.is_windows(): + import win32file + import win32pipe + import pywintypes + import winerror + from openjd.adaptor_runtime._background.named_pipe_helper import NamedPipeHelper + _logger = logging.getLogger(__name__) @@ -50,6 +58,8 @@ def __init__( heartbeat_interval (float, optional): Interval between heartbeats, in seconds. Defaults to 1. """ + # TODO: Need to figure out how to set up the timeout for the Windows NamedPipe Server + # For Namedpipe, we can only set a timeout on the server side not on the client side. self._timeout_s = timeout_s self._heartbeat_interval = heartbeat_interval self._connection_file_path = connection_file_path @@ -166,8 +176,10 @@ def _heartbeat(self, ack_id: str | None = None) -> HeartbeatResponse: """ params: dict[str, str] | None = {"ack_id": ack_id} if ack_id else None response = self._send_request("GET", "/heartbeat", params=params) - - return DataclassMapper(HeartbeatResponse).map(json.load(response.fp)) + if OSName.is_posix(): + return DataclassMapper(HeartbeatResponse).map(json.load(response.fp)) # type: ignore + else: + return DataclassMapper(HeartbeatResponse).map(json.loads(response["body"])) # type: ignore def _heartbeat_until_state_complete(self, state: AdaptorState) -> None: """ @@ -220,6 +232,29 @@ def _send_request( *, params: dict | None = None, json_body: dict | None = None, + ) -> http_client.HTTPResponse | Dict: + if OSName.is_windows(): + return self._send_windows_request( + method, + path, + params=params if params else None, + json_body=json_body if json_body else None, + ) + else: + return self._send_linux_request( + method, + path, + params=params if params else None, + json_body=json_body if json_body else None, + ) + + def _send_linux_request( + self, + method: str, + path: str, + *, + params: dict | None = None, + json_body: dict | None = None, ) -> http_client.HTTPResponse: conn = UnixHTTPConnection(self.connection_settings.socket, timeout=self._timeout_s) @@ -245,6 +280,64 @@ def _send_request( return response + def _send_windows_request( + self, + method: str, + path: str, + *, + params: dict | None = None, + json_body: dict | None = None, + ) -> Dict: + start_time = time.time() + # Wait for the server pipe to become available. + handle = None + while handle is None: + try: + handle = win32file.CreateFile( + self.connection_settings.socket, # pipe name + win32file.GENERIC_READ + | win32file.GENERIC_WRITE, # Give the read / write permission + 0, # Disable the sharing Mode + None, # TODO: Need to set the security descriptor. Right now, None means default security + win32file.OPEN_EXISTING, # Open existing pipe + 0, # No Additional flags + None, # A valid handle to a template file, This parameter is ignored when opening an existing pipe. + ) + except pywintypes.error as e: + # NamedPipe server may be not ready, + # or no additional resource to create new instance and need to wait for previous connection release + if e.args[0] in [winerror.ERROR_FILE_NOT_FOUND, winerror.ERROR_PIPE_BUSY]: + duration = time.time() - start_time + time.sleep(0.1) + # Check timeout limit + if duration > self._timeout_s: + _logger.error( + f"NamedPipe Server readiness timeout. Duration: {duration} seconds, " + f"Timeout limit: {self._timeout_s} seconds." + ) + raise e + continue + _logger.error(f"Could not open pipe: {e}") + raise e + + # The pipe connected; change to message-read mode. It preserves the boundaries of the messages. + win32pipe.SetNamedPipeHandleState(handle, win32pipe.PIPE_READMODE_MESSAGE, None, None) + + # Send a message to the server. + message_dict = { + "method": method, + "body": json.dumps(json_body), + "path": path, + } + if params: + message_dict["params"] = json.dumps(params) + message = json.dumps(message_dict) + NamedPipeHelper.write_to_pipe(handle, message) + _logger.debug(f"Message sent from frontend process: {message}") + result = NamedPipeHelper.read_from_pipe(handle) + handle.close() + return json.loads(result) + @property def connection_settings(self) -> ConnectionSettings: """ diff --git a/src/openjd/adaptor_runtime/_background/named_pipe_helper.py b/src/openjd/adaptor_runtime/_background/named_pipe_helper.py new file mode 100644 index 0000000..5cf784a --- /dev/null +++ b/src/openjd/adaptor_runtime/_background/named_pipe_helper.py @@ -0,0 +1,102 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +import logging + +from openjd.adaptor_runtime._osname import OSName + +if OSName.is_windows(): + import win32file + import pywintypes + import winerror + from pywintypes import HANDLE + +from openjd.adaptor_runtime._background.server_config import NAMED_PIPE_BUFFER_SIZE + +_logger = logging.getLogger(__name__) + + +class PipeDisconnectedException(Exception): + """ + Exception raised when a Named Pipe is either broken or not connected. + + Attributes: + message (str): Explanation of the error. + error_code (int): The specific Windows error code associated with the pipe issue. + """ + + def __init__(self, message: str, error_code: int): + self.message = message + self.error_code = error_code + super().__init__(f"{message} (Error code: {error_code})") + + def __str__(self): + return f"{self.message} (Error code: {self.error_code})" + + +class NamedPipeHelper: + """ + Helper class for reading from and writing to Named Pipes in Windows. + + This class provides static methods to interact with Named Pipes, + facilitating data transmission between the server and the client. + """ + + @staticmethod + def read_from_pipe(handle: HANDLE) -> str: # type: ignore + """ + Reads data from a Named Pipe. + + Args: + handle (HANDLE): The handle to the Named Pipe. + + Returns: + str: The data read from the Named Pipe. + """ + data_parts = [] + while True: + try: + return_code, data = win32file.ReadFile(handle, NAMED_PIPE_BUFFER_SIZE) + data_parts.append(data.decode("utf-8")) + if return_code == winerror.ERROR_MORE_DATA: + continue + elif return_code == winerror.NO_ERROR: + return "".join(data_parts) + else: + raise IOError( + f"Got error when reading from the Named Pipe with error code: {return_code}" + ) + # Server maybe shutdown during reading. + except pywintypes.error as e: + if e.winerror in [ + winerror.ERROR_BROKEN_PIPE, + winerror.ERROR_PIPE_NOT_CONNECTED, + winerror.ERROR_INVALID_HANDLE, + ]: + raise PipeDisconnectedException( + "Client disconnected or pipe is not available", e.winerror + ) + raise + + @staticmethod + def write_to_pipe(handle: HANDLE, message: str) -> None: # type: ignore + """ + Writes data to a Named Pipe. + + Args: + handle (HANDLE): The handle to the Named Pipe. + message (str): The message to write to the Named Pipe. + + """ + try: + win32file.WriteFile(handle, message.encode("utf-8")) + # Server maybe shutdown during writing. + except pywintypes.error as e: + if e.winerror in [ + winerror.ERROR_BROKEN_PIPE, + winerror.ERROR_PIPE_NOT_CONNECTED, + winerror.ERROR_INVALID_HANDLE, + ]: + raise PipeDisconnectedException( + "Client disconnected or pipe is not available", e.winerror + ) + raise diff --git a/src/openjd/adaptor_runtime/_background/named_pipe_request_handler.py b/src/openjd/adaptor_runtime/_background/named_pipe_request_handler.py new file mode 100644 index 0000000..765bb9f --- /dev/null +++ b/src/openjd/adaptor_runtime/_background/named_pipe_request_handler.py @@ -0,0 +1,100 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +import json + +from openjd.adaptor_runtime._background.named_pipe_helper import ( + NamedPipeHelper, + PipeDisconnectedException, +) +from openjd.adaptor_runtime._background.server_response import ServerResponseGenerator +import win32pipe +import win32file +from http import HTTPStatus +import logging +import traceback + +_logger = logging.getLogger(__name__) + + +class WinBackgroundResourceRequestHandler: + def __init__(self, server, pipe_handle): + self.server = server + self.pipe_handle = pipe_handle + + def instance_thread(self): + _logger.debug("An instance thread is created to handle communication.") + while True: + try: + request_data = NamedPipeHelper.read_from_pipe(self.pipe_handle) + _logger.debug(f"Got following request from client: {request_data}") + self.handle_request(request_data) + except PipeDisconnectedException as e: + # Server is closed + _logger.debug(str(e)) + break + except Exception: + error_message = traceback.format_exc() + _logger.error( + f"Having an error during reading from the namedpipe: {error_message}." + ) + # Try to send back the error message + try: + self.send_response(HTTPStatus.INTERNAL_SERVER_ERROR, error_message) + except Exception: + _logger.error( + f"Having an error during sending back from the error message: {traceback.format_exc()}." + ) + try: + win32pipe.DisconnectNamedPipe(self.pipe_handle) + win32file.CloseHandle(self.pipe_handle) + except Exception: + _logger.error( + f"Having an error during close the namedpipe handler: {traceback.format_exc()}" + ) + _logger.debug("The instance thread existed.") + + def send_response(self, status: HTTPStatus, body: str = ""): + response = {"status": status, "body": body} + _logger.debug(f"NamedPipe Server: Send Response: {response}") + NamedPipeHelper.write_to_pipe(self.pipe_handle, json.dumps(response)) + + def handle_request(self, data: str): + request_dict = json.loads(data) + path = request_dict["path"] + body = json.loads(request_dict["body"]) + method = request_dict["method"] + + if "params" in request_dict and request_dict["params"] != "null": + query_string_params = json.loads(request_dict["params"]) + else: + query_string_params = {} + + server_operation = ServerResponseGenerator( + self.server, self.send_response, body, query_string_params + ) + try: + if path == "/run" and method == "PUT": + server_operation.generate_run_put_response() + + elif path == "/shutdown" and method == "PUT": + server_operation.generate_shutdown_put_response() + + elif path == "/heartbeat" and method == "GET": + _ACK_ID_KEY = ServerResponseGenerator.ACK_ID_KEY + + def _parse_ack_id(): + if _ACK_ID_KEY in query_string_params: + return query_string_params[_ACK_ID_KEY] + + server_operation.generate_heartbeat_get_response(_parse_ack_id) + + elif path == "/start" and method == "PUT": + server_operation.generate_start_put_response() + + elif path == "/stop" and method == "PUT": + server_operation.generate_stop_put_response() + + elif path == "/cancel" and method == "PUT": + server_operation.generate_cancel_put_response() + except Exception: + raise diff --git a/src/openjd/adaptor_runtime/_background/server_config.py b/src/openjd/adaptor_runtime/_background/server_config.py new file mode 100644 index 0000000..351eb37 --- /dev/null +++ b/src/openjd/adaptor_runtime/_background/server_config.py @@ -0,0 +1,4 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +NAMED_PIPE_BUFFER_SIZE = 8192 +DEFAULT_NAMED_PIPE_TIMEOUT = 5000 diff --git a/src/openjd/adaptor_runtime/_entrypoint.py b/src/openjd/adaptor_runtime/_entrypoint.py index 17ca5d3..51c6bf1 100644 --- a/src/openjd/adaptor_runtime/_entrypoint.py +++ b/src/openjd/adaptor_runtime/_entrypoint.py @@ -71,7 +71,9 @@ "runtime", "configuration.json", ) - ) + ), + # TODO: Replace this with a proper path + "Windows": r"C:/tmp/configuration.json", }, "user_config_rel_path": os.path.join( ".openjd", "worker", "adaptors", "runtime", "configuration.json" diff --git a/src/openjd/adaptor_runtime/adaptors/configuration/_configuration_manager.py b/src/openjd/adaptor_runtime/adaptors/configuration/_configuration_manager.py index 3a95ed7..a7c4d30 100644 --- a/src/openjd/adaptor_runtime/adaptors/configuration/_configuration_manager.py +++ b/src/openjd/adaptor_runtime/adaptors/configuration/_configuration_manager.py @@ -61,7 +61,9 @@ def create_adaptor_configuration_manager( adaptor_name, f"{adaptor_name}.json", ) - ) + ), + # TODO: Confirm the windows path format + "Windows": f"C:tmp/{adaptor_name}/{adaptor_name}.json", } user_config_rel_path = os.path.join(".openjd", "adaptors", adaptor_name, f"{adaptor_name}.json") diff --git a/src/openjd/adaptor_runtime/application_ipc/__init__.py b/src/openjd/adaptor_runtime/application_ipc/__init__.py index 55ad594..56d928d 100644 --- a/src/openjd/adaptor_runtime/application_ipc/__init__.py +++ b/src/openjd/adaptor_runtime/application_ipc/__init__.py @@ -1,6 +1,11 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. from ._actions_queue import ActionsQueue -from ._adaptor_server import AdaptorServer +from .._osname import OSName -__all__ = ["ActionsQueue", "AdaptorServer", "ServerAddress"] +if OSName.is_windows(): + from ._win_adaptor_server import WinAdaptorServer +else: + from ._adaptor_server import AdaptorServer + +__all__ = ["ActionsQueue", "AdaptorServer", "ServerAddress", "WinAdaptorServer"] diff --git a/test/openjd/adaptor_runtime/conftest.py b/test/openjd/adaptor_runtime/conftest.py index 31600b4..1537e70 100644 --- a/test/openjd/adaptor_runtime/conftest.py +++ b/test/openjd/adaptor_runtime/conftest.py @@ -1,5 +1,5 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - +import os import platform from openjd.adaptor_runtime._osname import OSName @@ -11,7 +11,14 @@ def pytest_collection_modifyitems(items): if OSName.is_windows(): # Add the tests' paths that we want to enable in Windows - do_not_skip_paths = [] + do_not_skip_paths = [ + os.path.join(os.path.abspath(os.path.dirname(__file__)), "integ", "background"), + os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "integ", + "test_integration_entrypoint.py", + ), + ] skip_marker = pytest.mark.skip(reason="Skipping tests on Windows") for item in items: if not any(not_skip_path in item.fspath.strpath for not_skip_path in do_not_skip_paths): diff --git a/test/openjd/adaptor_runtime/integ/IntegCommandAdaptor/adaptor.py b/test/openjd/adaptor_runtime/integ/IntegCommandAdaptor/adaptor.py index 9710cff..a48dff5 100644 --- a/test/openjd/adaptor_runtime/integ/IntegCommandAdaptor/adaptor.py +++ b/test/openjd/adaptor_runtime/integ/IntegCommandAdaptor/adaptor.py @@ -3,6 +3,8 @@ import os from typing import List from logging import getLogger + +from openjd.adaptor_runtime._osname import OSName from openjd.adaptor_runtime.adaptors import CommandAdaptor from openjd.adaptor_runtime.process import ManagedProcess @@ -14,7 +16,12 @@ def __init__(self, run_data: dict) -> None: super().__init__(run_data) def get_executable(self) -> str: - return os.path.abspath(os.path.join(os.path.sep, "bin", "echo")) + if OSName.is_windows(): + # In Windows, we cannot directly execute the powershell script. + # Need to use PowerShell.exe to run the command. + return "powershell.exe" + else: + return os.path.abspath(os.path.join(os.path.sep, "bin", "echo")) def get_arguments(self) -> List[str]: return self.run_data.get("args", "") diff --git a/test/openjd/adaptor_runtime/integ/background/test_background_mode.py b/test/openjd/adaptor_runtime/integ/background/test_background_mode.py index 874de73..08812e5 100644 --- a/test/openjd/adaptor_runtime/integ/background/test_background_mode.py +++ b/test/openjd/adaptor_runtime/integ/background/test_background_mode.py @@ -22,6 +22,7 @@ HTTPError, _load_connection_settings, ) +from openjd.adaptor_runtime._osname import OSName mod_path = (Path(__file__).parent).resolve() sys.path.append(str(mod_path)) @@ -75,10 +76,15 @@ def initialized_setup( backend_proc.kill() except psutil.NoSuchProcess: pass # Already stopped - try: - os.remove(conn_settings.socket) - except FileNotFoundError: - pass # Already deleted + + # We don't need to call the `remove` for the NamedPipe server. + # NamedPipe servers are managed by Named Pipe File System it is not a regular file. + # Once all handles are closed, the system automatically cleans up the named pipe. + if OSName.is_posix(): + try: + os.remove(conn_settings.socket) + except FileNotFoundError: + pass # Already deleted def test_init( self, @@ -92,12 +98,17 @@ def test_init( assert os.path.exists(connection_file_path) connection_settings = _load_connection_settings(connection_file_path) - assert any( - [ - conn.laddr == connection_settings.socket - for conn in backend_proc.connections(kind="unix") - ] - ) + + if OSName.is_windows(): + # TODO: Need to figure out what we need to validate here + pass + else: + assert any( + [ + conn.laddr == connection_settings.socket + for conn in backend_proc.connections(kind="unix") + ] + ) def test_shutdown( self, @@ -114,7 +125,7 @@ def test_shutdown( # THEN assert all( [ - _wait_for_file_deletion(p, timeout_s=1) + _wait_for_file_deletion(p, timeout_s=(1 if OSName.is_posix() else 2)) for p in [connection_file_path, conn_settings.socket] ] ) @@ -185,7 +196,10 @@ def test_heartbeat_acks( # WHEN new_response = frontend._heartbeat(response.output.id) - + # In Windows, we need to shut down the namedpipe client, + # or the connection of the NamedPipe server remains open + if OSName.is_windows(): + frontend.shutdown() # THEN assert f"Received ACK for chunk: {response.output.id}" in new_response.output.output diff --git a/test/openjd/adaptor_runtime/integ/test_integration_entrypoint.py b/test/openjd/adaptor_runtime/integ/test_integration_entrypoint.py index 2939946..800fc26 100644 --- a/test/openjd/adaptor_runtime/integ/test_integration_entrypoint.py +++ b/test/openjd/adaptor_runtime/integ/test_integration_entrypoint.py @@ -15,6 +15,7 @@ import openjd.adaptor_runtime._entrypoint as runtime_entrypoint from openjd.adaptor_runtime import EntryPoint +from openjd.adaptor_runtime._osname import OSName mod_path = (Path(__file__).parent).resolve() sys.path.append(str(mod_path)) @@ -46,7 +47,9 @@ def test_runs_command_adaptor( } ), "--run-data", - json.dumps({"args": ["hello world"]}), + json.dumps( + {"args": ["echo", "hello world"] if OSName.is_windows() else ["hello world"]} + ), ] entrypoint = EntryPoint(IntegCommandAdaptor) @@ -144,7 +147,9 @@ def test_run(self, caplog: pytest.LogCaptureFixture, tmp_path: Path): "--connection-file", str(connection_file), "--run-data", - json.dumps({"args": ["hello world"]}), + json.dumps( + {"args": ["echo", "hello world"] if OSName.is_windows() else ["hello world"]} + ), ] test_stop_argv = [ "program_filename.py", diff --git a/test/openjd/adaptor_runtime/unit/background/test_backend_runner.py b/test/openjd/adaptor_runtime/unit/background/test_backend_runner.py index ed8c380..02dcad2 100644 --- a/test/openjd/adaptor_runtime/unit/background/test_backend_runner.py +++ b/test/openjd/adaptor_runtime/unit/background/test_backend_runner.py @@ -69,7 +69,7 @@ def test_run( assert caplog.messages == [ "Running in background daemon mode.", f"Listening on {socket_path}", - "HTTP server has shutdown.", + "Background server has been shut down.", ] mock_server_cls.assert_called_once_with( socket_path, @@ -143,7 +143,7 @@ def test_run_raises_when_writing_connection_file_fails( f"Listening on {socket_path}", "Error writing to connection file: ", "Shutting down server...", - "HTTP server has shutdown.", + "Background server has been shut down.", ] mock_thread.assert_called_once() mock_thread.return_value.start.assert_called_once() @@ -163,7 +163,7 @@ def test_signal_hook(self, signal_mock: MagicMock) -> None: server_mock = MagicMock() submit_mock = MagicMock() server_mock.submit = submit_mock - runner._http_server = server_mock + runner._server = server_mock # WHEN runner._sigint_handler(MagicMock(), MagicMock())