Skip to content

Commit

Permalink
fix: runner startup crash (#15)
Browse files Browse the repository at this point in the history
### Summary of Changes

- fixes a crash on startup as the multiprocessing manager causes a new
process to spawn during the bootstrap phase
- this error only occurred during execution of the main.py with a new
interpreter

---------

Co-authored-by: megalinter-bot <[email protected]>
Co-authored-by: Lars Reimann <[email protected]>
  • Loading branch information
3 people authored Nov 30, 2023
1 parent 035f64c commit 01df889
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 14 deletions.
42 changes: 28 additions & 14 deletions src/safeds_runner/server/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
import queue
import runpy
import threading
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from multiprocessing.managers import SyncManager
from functools import cached_property
from multiprocessing.managers import SyncManager
from typing import Any

import simple_websocket
Expand Down Expand Up @@ -37,23 +35,38 @@ class PipelineManager:
"""

def __init__(self) -> None:
"""Create a new PipelineManager object, which needs to be started by calling startup()."""
self._placeholder_map: dict = {}
self._websocket_target: simple_websocket.Server | None = None

@cached_property
def _multiprocessing_manager(self) -> SyncManager:
return multiprocessing.Manager()

@cached_property
def _messages_queue(self) -> queue.Queue[Message]:
return self._multiprocessing_manager.Queue()

@cached_property
def _messages_queue_thread(self) -> threading.Thread:
return threading.Thread(
target=self._handle_queue_messages,
daemon=True,
)

def _startup(self) -> None:
"""
Prepare the runner for running Safe-DS pipelines.
Firstly, structures shared between processes are created.
Firstly, structures shared between processes are lazily created.
After that a message queue handling thread is started in the main process.
This allows receiving messages directly from one of the pipeline processes and relaying information
directly to the websocket connection (to the VS Code extension).
This method should not be called during the bootstrap phase of the python interpreter, as it leads to a crash.
"""
self._multiprocessing_manager: SyncManager = multiprocessing.Manager()
self._placeholder_map: dict = {}
self._messages_queue: queue.Queue[Message] = self._multiprocessing_manager.Queue()
self._websocket_target: simple_websocket.Server | None = None
self._messages_queue_thread: threading.Thread = threading.Thread(
target=self._handle_queue_messages,
daemon=True,
)
self._messages_queue_thread.start()
if not self._messages_queue_thread.is_alive():
self._messages_queue_thread.start()

def _handle_queue_messages(self) -> None:
"""
Expand Down Expand Up @@ -95,6 +108,7 @@ def execute_pipeline(
execution_id : str
Unique ID to identify this execution.
"""
self._startup()
if execution_id not in self._placeholder_map:
self._placeholder_map[execution_id] = self._multiprocessing_manager.dict()
process = PipelineProcess(
Expand Down
17 changes: 17 additions & 0 deletions tests/safeds_runner/server/test_runner_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import subprocess
import typing
from pathlib import Path
from typing import IO

_project_root: Path = Path(__file__).parent / ".." / ".." / ".."


def test_should_runner_start_successfully() -> None:
process = subprocess.Popen(["poetry", "run", "safe-ds-runner"], cwd=_project_root, stderr=subprocess.PIPE)
while process.poll() is None:
process_line = str(typing.cast(IO[bytes], process.stderr).readline(), "utf-8").strip()
# Wait for first line of log
if process_line.startswith("INFO:root:Starting Safe-DS Runner"):
process.kill()
return
assert process.poll() == 0

0 comments on commit 01df889

Please sign in to comment.