Skip to content

Commit

Permalink
feat: execute pipelines in new subprocess and send messages back to m…
Browse files Browse the repository at this point in the history
…ain process using a queue
  • Loading branch information
WinPlay02 committed Nov 20, 2023
1 parent c453627 commit 5eefe0f
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 112 deletions.
151 changes: 58 additions & 93 deletions src/safeds_runner/server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

import json
import logging
import typing
from typing import Any, Optional
from typing import Any

import stack_data
from flask import Flask, request
from flask import Flask
from flask_cors import CORS
from flask_sock import Sock

from safeds_runner.server.module_manager import execute_pipeline
from safeds_runner.server import messages
from safeds_runner.server.module_manager import execute_pipeline, get_placeholder, set_new_websocket_target, \
start_message_queue_handling, setup_multiprocessing

app = Flask(__name__)
# Websocket Configuration
Expand All @@ -19,38 +19,18 @@
# Allow access from VSCode extension
CORS(app, resources={r"/*": {"origins": "vscode-webview://*"}})

"""
Args should contain every source file that was generated
code: ["<package>" => ["<file>" => "<code>", ...], ...]
main: {"package": <package; Value of Package directive on Safe-DS module>, "module": <module; Name of Safe-DS source file>, "pipeline": <pipeline; Name of executable Pipeline>}
:return: Tuple: (Result String, HTTP Code)
"""

## HTTP Route
@app.route("/PostRunProgram", methods=["POST"])
def post_run_program():
"""
Args should contain every source file that was generated
code: ["<package>" => ["<file>" => "<code>", ...], ...]
main: {"package": <package; Value of Package directive on Safe-DS module>, "module": <module; Name of Safe-DS source file>, "pipeline": <pipeline; Name of executable Pipeline>}
:return: Tuple: (Result String, HTTP Code)
"""
logging.debug(f"{request.path}: {request.form}")
if not request.is_json:
return "Body is not JSON", 400
request_data = request.get_json()
# Validate
valid, invalid_message = validate_message(request_data)
if not valid:
return invalid_message, 400
code = request_data["code"]
main = request_data["main"]
# Execute
# Dynamically define Safe-DS Modules only in our runtime scope
# TODO forward memoization map here
context_globals = {}
# This should only be called from the extension as it is a security risk
result = execute_pipeline(code, main['package'], main['module'], main['pipeline'], context_globals)
return json.dumps(result), 200


@sock.route("/WSRunProgram")

@sock.route("/WSMain")
def ws_run_program(ws):
logging.debug(f"Request to WSRunProgram")
set_new_websocket_target(ws)
while True:

Check warning on line 34 in src/safeds_runner/server/main.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/main.py#L32-L34

Added lines #L32 - L34 were not covered by tests
# This would be a JSON message
received_message: str = ws.receive()
Expand All @@ -65,76 +45,62 @@ def ws_run_program(ws):
logging.warn(f"No message type specified in: {received_message}")
ws.close(None)
return
match received_object["type"]:
if "id" not in received_object:
logging.warn(f"No message id specified in: {received_message}")
ws.close(None)
return
if "data" not in received_object:
logging.warn(f"No message data specified in: {received_message}")
ws.close(None)
return
if not isinstance(received_object["type"], str):
logging.warn(f"Message type is not a string: {received_message}")
ws.close(None)
return
if not isinstance(received_object["id"], str):
logging.warn(f"Message id is not a string: {received_message}")
ws.close(None)
return
request_data = received_object["data"]
message_type = received_object["type"]
execution_id = received_object["id"]
match message_type:
case "program":
if "data" not in received_object:
logging.warn(f"No message data specified in: {received_message}")
ws.close(None)
return
request_data = received_object["data"]
valid, invalid_message = validate_message(request_data)
valid, invalid_message = messages.validate_program_message(request_data)
if not valid:
logging.warn(f"Invalid message data specified in: {received_message} ({invalid_message})")
ws.close(None)
return
code = request_data["code"]
main = request_data["main"]

Check warning on line 75 in src/safeds_runner/server/main.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/main.py#L36-L75

Added lines #L36 - L75 were not covered by tests
# Execute
# Dynamically define Safe-DS Modules only in our runtime scope
# TODO forward memoization map here
context_globals = {"connection": ws, "send_value": send_value}
# This should only be called from the extension as it is a security risk
try:
execute_pipeline(code, main['package'], main['module'], main['pipeline'], context_globals)
send_message(ws, "progress", "done")
except BaseException as error:
send_message(ws, "runtime_error",
{"message": error.__str__(), "backtrace": get_backtrace_info(error)})


def get_backtrace_info(error: BaseException) -> list[dict[str, typing.Any]]:
backtrace_list = []
for frame in stack_data.core.FrameInfo.stack_data(error.__traceback__):
backtrace_list.append({"file": frame.filename, "line": str(frame.lineno)})
return backtrace_list
execute_pipeline(code, main['package'], main['module'], main['pipeline'], execution_id)
case "placeholder_query":
valid, invalid_message = messages.validate_placeholder_query_message(request_data)
if not valid:
logging.warn(f"Invalid message data specified in: {received_message} ({invalid_message})")
ws.close(None)
return
placeholder_type, placeholder_value = get_placeholder(execution_id, request_data)
if placeholder_type is not None:
send_websocket_value(ws, request_data, placeholder_type, placeholder_value)

Check warning on line 86 in src/safeds_runner/server/main.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/main.py#L77-L86

Added lines #L77 - L86 were not covered by tests
else:
# Send back empty type / value, to communicate that no placeholder exists (yet)
send_websocket_value(ws, request_data, "", "")
case _:
if message_type not in messages.message_types:
logging.warn(f"Invalid message type {message_type}")

Check warning on line 92 in src/safeds_runner/server/main.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/main.py#L89-L92

Added lines #L89 - L92 were not covered by tests


def send_value(connection, name: str, var_type: str, value: str):
send_message(connection, "value", {"name": name, "type": var_type, "value": value})
def send_websocket_value(connection, name: str, var_type: str, value: str):
send_websocket_message(connection, "value", {"name": name, "type": var_type, "value": value})

Check warning on line 96 in src/safeds_runner/server/main.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/main.py#L96

Added line #L96 was not covered by tests


def send_message(connection, msg_type: str, msg_data):
def send_websocket_message(connection, msg_type: str, msg_data):
message = {"type": msg_type, "data": msg_data}
connection.send(json.dumps(message))

Check warning on line 101 in src/safeds_runner/server/main.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/main.py#L100-L101

Added lines #L100 - L101 were not covered by tests


def validate_message(message: dict[str, Any]) -> (bool, Optional[str]):
if "code" not in message:
return False, "No 'code' parameter given"
if "main" not in message:
return False, "No 'main' parameter given"
if "package" not in message["main"] or "module" not in message["main"] or "pipeline" not in message["main"]:
return False, "Invalid 'main' parameter given"
if len(message["main"]) != 3:
return False, "Invalid 'main' parameter given"
main: dict[str, str] = message["main"]
if not isinstance(message["code"], dict):
return False, "Invalid 'code' parameter given"
code: dict = message["code"]
for key in code.keys():
if not isinstance(key, str):
return False, "Invalid 'code' parameter given"
if not isinstance(code[key], dict):
return False, "Invalid 'code' parameter given"
next_dict: dict = code[key]
for next_key in next_dict.keys():
if not isinstance(next_key, str):
return False, "Invalid 'code' parameter given"
if not isinstance(next_dict[next_key], str):
return False, "Invalid 'code' parameter given"
return True, None


if __name__ == "__main__":
# Allow prints to be unbuffered by default
import functools
Expand All @@ -143,14 +109,13 @@ def validate_message(message: dict[str, Any]) -> (bool, Optional[str]):
builtins.print = functools.partial(print, flush=True)

Check warning on line 109 in src/safeds_runner/server/main.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/main.py#L109

Added line #L109 was not covered by tests

logging.getLogger().setLevel(logging.DEBUG)
from gevent import monkey

monkey.patch_all()
from gevent.pywsgi import WSGIServer

Check warning on line 112 in src/safeds_runner/server/main.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/main.py#L111-L112

Added lines #L111 - L112 were not covered by tests

parser = argparse.ArgumentParser(description="Start Safe-DS Runner on a specific port.")
parser.add_argument('--port', type=int, default=5000, help='Port on which to run the python server.')
args = parser.parse_args()
setup_multiprocessing()
start_message_queue_handling()
logging.info(f"Starting Safe-DS Runner on port {args.port}")

Check warning on line 119 in src/safeds_runner/server/main.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/main.py#L114-L119

Added lines #L114 - L119 were not covered by tests
# TODO Maybe only bind to host=127.0.0.1? Connections from other devices would then not be accepted
WSGIServer(('0.0.0.0', args.port), app).serve_forever()
# Only bind to host=127.0.0.1. Connections from other devices should not be accepted
WSGIServer(('127.0.0.1', args.port), app).serve_forever()

Check warning on line 121 in src/safeds_runner/server/main.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/main.py#L121

Added line #L121 was not covered by tests
56 changes: 56 additions & 0 deletions src/safeds_runner/server/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import typing

message_types = ["program", "placeholder_query", "placeholder", "placeholder_value", "runtime_error",
"runtime_progress"]


def create_placeholder_description(name: str, placeholder_type: str) -> dict[str, typing.Any]:
return {"name": name, "type": placeholder_type}

Check warning on line 8 in src/safeds_runner/server/messages.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/messages.py#L8

Added line #L8 was not covered by tests


def create_placeholder_value(name: str, placeholder_type: str, value: str) -> dict[str, typing.Any]:
return {"name": name, "type": placeholder_type, "value": value}

Check warning on line 12 in src/safeds_runner/server/messages.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/messages.py#L12

Added line #L12 was not covered by tests


def create_runtime_error_description(message: str, backtrace: list[dict[str, typing.Any]]) -> dict[str, typing.Any]:
return {"message": message, "backtrace": backtrace}

Check warning on line 16 in src/safeds_runner/server/messages.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/messages.py#L16

Added line #L16 was not covered by tests


def create_runtime_progress_done() -> str:
return "done"

Check warning on line 20 in src/safeds_runner/server/messages.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/messages.py#L20

Added line #L20 was not covered by tests


def validate_program_message(message_data: dict[str, typing.Any] | str) -> (bool, typing.Optional[str]):
if not isinstance(message_data, dict):
return False, "Message data is not a JSON object"
if "code" not in message_data:
return False, "No 'code' parameter given"
if "main" not in message_data:
return False, "No 'main' parameter given"
if "package" not in message_data["main"] or "module" not in message_data["main"] or "pipeline" not in message_data[

Check warning on line 30 in src/safeds_runner/server/messages.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/messages.py#L24-L30

Added lines #L24 - L30 were not covered by tests
"main"]:
return False, "Invalid 'main' parameter given"
if len(message_data["main"]) != 3:
return False, "Invalid 'main' parameter given"
main: dict[str, str] = message_data["main"]
if not isinstance(message_data["code"], dict):
return False, "Invalid 'code' parameter given"
code: dict = message_data["code"]
for key in code.keys():
if not isinstance(key, str):
return False, "Invalid 'code' parameter given"
if not isinstance(code[key], dict):
return False, "Invalid 'code' parameter given"
next_dict: dict = code[key]
for next_key in next_dict.keys():
if not isinstance(next_key, str):
return False, "Invalid 'code' parameter given"
if not isinstance(next_dict[next_key], str):
return False, "Invalid 'code' parameter given"
return True, None

Check warning on line 50 in src/safeds_runner/server/messages.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/messages.py#L32-L50

Added lines #L32 - L50 were not covered by tests


def validate_placeholder_query_message(message_data: dict[str, typing.Any] | str) -> (bool, typing.Optional[str]):
if not isinstance(message_data, str):
return False, "Message data is not a string"
return True, None

Check warning on line 56 in src/safeds_runner/server/messages.py

View check run for this annotation

Codecov / codecov/patch

src/safeds_runner/server/messages.py#L54-L56

Added lines #L54 - L56 were not covered by tests
Loading

0 comments on commit 5eefe0f

Please sign in to comment.