Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: runner startup crash #15

Merged
merged 12 commits into from
Nov 30, 2023
42 changes: 28 additions & 14 deletions src/safeds_runner/server/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
import queue
import runpy
import threading
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from multiprocessing.managers import SyncManager
from functools import cached_property
from multiprocessing.managers import SyncManager
from typing import Any

import simple_websocket
Expand Down Expand Up @@ -37,23 +35,38 @@ class PipelineManager:
"""

def __init__(self) -> None:
"""Create a new PipelineManager object, which needs to be started by calling startup()."""
self._placeholder_map: dict = {}
self._websocket_target: simple_websocket.Server | None = None

@cached_property
def _multiprocessing_manager(self) -> SyncManager:
return multiprocessing.Manager()

@cached_property
def _messages_queue(self) -> queue.Queue[Message]:
return self._multiprocessing_manager.Queue()

@cached_property
def _messages_queue_thread(self) -> threading.Thread:
return threading.Thread(
target=self._handle_queue_messages,
daemon=True,
)

def _startup(self) -> None:
"""
Prepare the runner for running Safe-DS pipelines.

Firstly, structures shared between processes are created.
Firstly, structures shared between processes are lazily created.
After that a message queue handling thread is started in the main process.
This allows receiving messages directly from one of the pipeline processes and relaying information
directly to the websocket connection (to the VS Code extension).

This method should not be called during the bootstrap phase of the python interpreter, as it leads to a crash.
"""
self._multiprocessing_manager: SyncManager = multiprocessing.Manager()
self._placeholder_map: dict = {}
self._messages_queue: queue.Queue[Message] = self._multiprocessing_manager.Queue()
self._websocket_target: simple_websocket.Server | None = None
self._messages_queue_thread: threading.Thread = threading.Thread(
target=self._handle_queue_messages,
daemon=True,
)
self._messages_queue_thread.start()
if not self._messages_queue_thread.is_alive():
self._messages_queue_thread.start()

def _handle_queue_messages(self) -> None:
"""
Expand Down Expand Up @@ -95,6 +108,7 @@ def execute_pipeline(
execution_id : str
Unique ID to identify this execution.
"""
self._startup()
if execution_id not in self._placeholder_map:
self._placeholder_map[execution_id] = self._multiprocessing_manager.dict()
process = PipelineProcess(
Expand Down
17 changes: 17 additions & 0 deletions tests/safeds_runner/server/test_runner_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import subprocess
import typing
from pathlib import Path
from typing import IO

_project_root: Path = Path(__file__).parent / ".." / ".." / ".."


def test_should_runner_start_successfully() -> None:
process = subprocess.Popen(["poetry", "run", "safe-ds-runner"], cwd=_project_root, stderr=subprocess.PIPE)
while process.poll() is None:
process_line = str(typing.cast(IO[bytes], process.stderr).readline(), "utf-8").strip()
# Wait for first line of log
if process_line.startswith("INFO:root:Starting Safe-DS Runner"):
process.kill()
return
assert process.poll() == 0