diff --git a/src/safeds_runner/server/pipeline_manager.py b/src/safeds_runner/server/pipeline_manager.py index 53c0992..7db0d84 100644 --- a/src/safeds_runner/server/pipeline_manager.py +++ b/src/safeds_runner/server/pipeline_manager.py @@ -6,10 +6,8 @@ import queue import runpy import threading -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from multiprocessing.managers import SyncManager +from functools import cached_property +from multiprocessing.managers import SyncManager from typing import Any import simple_websocket @@ -37,23 +35,38 @@ class PipelineManager: """ def __init__(self) -> None: + """Create a new PipelineManager object, which needs to be started by calling startup().""" + self._placeholder_map: dict = {} + self._websocket_target: simple_websocket.Server | None = None + + @cached_property + def _multiprocessing_manager(self) -> SyncManager: + return multiprocessing.Manager() + + @cached_property + def _messages_queue(self) -> queue.Queue[Message]: + return self._multiprocessing_manager.Queue() + + @cached_property + def _messages_queue_thread(self) -> threading.Thread: + return threading.Thread( + target=self._handle_queue_messages, + daemon=True, + ) + + def _startup(self) -> None: """ Prepare the runner for running Safe-DS pipelines. - Firstly, structures shared between processes are created. + Firstly, structures shared between processes are lazily created. After that a message queue handling thread is started in the main process. This allows receiving messages directly from one of the pipeline processes and relaying information directly to the websocket connection (to the VS Code extension). + + This method should not be called during the bootstrap phase of the python interpreter, as it leads to a crash. """ - self._multiprocessing_manager: SyncManager = multiprocessing.Manager() - self._placeholder_map: dict = {} - self._messages_queue: queue.Queue[Message] = self._multiprocessing_manager.Queue() - self._websocket_target: simple_websocket.Server | None = None - self._messages_queue_thread: threading.Thread = threading.Thread( - target=self._handle_queue_messages, - daemon=True, - ) - self._messages_queue_thread.start() + if not self._messages_queue_thread.is_alive(): + self._messages_queue_thread.start() def _handle_queue_messages(self) -> None: """ @@ -95,6 +108,7 @@ def execute_pipeline( execution_id : str Unique ID to identify this execution. """ + self._startup() if execution_id not in self._placeholder_map: self._placeholder_map[execution_id] = self._multiprocessing_manager.dict() process = PipelineProcess( diff --git a/tests/safeds_runner/server/test_runner_main.py b/tests/safeds_runner/server/test_runner_main.py new file mode 100644 index 0000000..8e4cd79 --- /dev/null +++ b/tests/safeds_runner/server/test_runner_main.py @@ -0,0 +1,17 @@ +import subprocess +import typing +from pathlib import Path +from typing import IO + +_project_root: Path = Path(__file__).parent / ".." / ".." / ".." + + +def test_should_runner_start_successfully() -> None: + process = subprocess.Popen(["poetry", "run", "safe-ds-runner"], cwd=_project_root, stderr=subprocess.PIPE) + while process.poll() is None: + process_line = str(typing.cast(IO[bytes], process.stderr).readline(), "utf-8").strip() + # Wait for first line of log + if process_line.startswith("INFO:root:Starting Safe-DS Runner"): + process.kill() + return + assert process.poll() == 0