diff --git a/golem-cluster-dev.yaml b/golem-cluster-dev.yaml new file mode 100644 index 00000000..4076fda0 --- /dev/null +++ b/golem-cluster-dev.yaml @@ -0,0 +1,103 @@ +# Ray on Golem cluster name +cluster_name: golem-cluster + +# The maximum number of workers the cluster will have at any given time +max_workers: 10 + +# The number of minutes that need to pass before an idle worker node is removed by the Autoscaler +idle_timeout_minutes: 5 + +# The cloud provider-specific configuration properties. +provider: + type: "external" + use_internal_ips: true + module: "ray_on_golem.provider.node_provider.GolemNodeProvider" + parameters: + # Port of golem webserver that has connection with golem network + webserver_port: 4578 + + enable_registry_stats: false + + # Blockchain used for payments. + # Goerli means running free nodes on testnet, + # Polygon is for mainnet operations. + network: "goerli" + + # Maximum amount of GLM that's going to be spent (not supported yet) + budget: 1 + + # Params for creating golem demands (same for head and workers) + node_config: + # if not provided, image_tag will be autodetected based on currently used python and ray versions + # check available versions at https://registry.golem.network/explore/golem/ray-on-golem + image_tag: "golem/ray-on-golem:0.1.0-alpha.1" + + # you can also provide the image hash directly (although it is not recommended :) + # image_hash: "c6fc9b12da19d56a998eace84b6df961852360b37479e93ef4ce3913" # approxit/ray-test:py3.10.12-ray2.3.1 + + capabilities: ["vpn", "inet", "manifest-support"] + min_mem_gib: 0 + min_cpu_threads: 0 + min_storage_gib: 0 + +# The files or directories to copy to the head and worker nodes +file_mounts: + # remote_path: local_path + { + "/app/ray_on_golem": "./ray_on_golem", + } + +# Tells the autoscaler the allowed node types and the resources they provide +available_node_types: + ray.head.default: + # The minimum number of worker nodes of this type to launch + min_workers: 0 + + # The 
maximum number of worker nodes of this type to launch + max_workers: 0 + + # The node type's CPU and GPU resources + resources: {"CPU": 1} + + node_config: {} # TODO: Demand description here + ray.worker.default: + min_workers: 1 + max_workers: 10 + resources: {"CPU": 1} + node_config: {} + +# List of commands that will be run to initialize the nodes (before `setup_commands`) +#initialization_commands: [ +# "pip install endplay", +#] +initialization_commands: [] + +# List of shell commands to run to set up nodes +setup_commands: [] + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: [] + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +head_start_ray_commands: [ + "ray start --head --node-ip-address 192.168.0.3 --include-dashboard=True --dashboard-host 0.0.0.0 --disable-usage-stats --autoscaling-config=~/ray_bootstrap_config.yaml", +] + +# Command to start ray on worker nodes. You don't need to change this. 
+worker_start_ray_commands: [ + "ray start --address 192.168.0.3:6379", +] + +# Authentication credentials that Ray will use to launch nodes +# auth: + # Custom username for ssh + # ssh_user: "root" + + # If ssh_private_key is not provided, a temporary key will be created and used + # ssh_private_key: "~/.ssh/id_rsa" + +# A list of paths to the files or directories to copy from the head node to the worker nodes +cluster_synced_files: [] diff --git a/golem-cluster.yaml b/golem-cluster.yaml index d3628106..aafde487 100644 --- a/golem-cluster.yaml +++ b/golem-cluster.yaml @@ -33,16 +33,14 @@ provider: # you can also provide the image hash directly (although it is not recommended :) # image_hash: "c6fc9b12da19d56a998eace84b6df961852360b37479e93ef4ce3913" # approxit/ray-test:py3.10.12-ray2.3.1 - capabilities: ['vpn', 'inet', 'manifest-support'] + capabilities: ["vpn", "inet", "manifest-support"] min_mem_gib: 0 min_cpu_threads: 0 min_storage_gib: 0 # The files or directories to copy to the head and worker nodes -file_mounts: - { - # "/app/ray_on_golem": "/tmp/ray-on-golem", - } +# file_mounts: + # <remote_path>: <local_path> # Tells the autoscaler the allowed node types and the resources they provide available_node_types: @@ -56,15 +54,12 @@ available_node_types: # The node type's CPU and GPU resources resources: {"CPU": 1} - node_config: - # TODO: Demand description here - kind: Node + node_config: {} # TODO: Demand description here ray.worker.default: min_workers: 1 max_workers: 10 resources: {"CPU": 1} - node_config: - kind: Node + node_config: {} # List of commands that will be run to initialize the nodes (before `setup_commands`) #initialization_commands: [ @@ -92,12 +87,12 @@ worker_start_ray_commands: [ ] # Authentication credentials that Ray will use to launch nodes -auth: - # TODO: try to remove explicit ssh_user field - ssh_user: 'root' +# auth: + # Custom username for ssh + # ssh_user: "root" # If ssh_private_key will be not provided, temporary key will be created and used - # 
ssh_private_key: '~/.ssh/id_rsa' + # ssh_private_key: "~/.ssh/id_rsa" # A list of paths to the files or directories to copy from the head node to the worker nodes cluster_synced_files: [] diff --git a/ray_on_golem/provider/node_provider.py b/ray_on_golem/provider/node_provider.py index 68faa9a4..d35d05b0 100644 --- a/ray_on_golem/provider/node_provider.py +++ b/ray_on_golem/provider/node_provider.py @@ -40,7 +40,7 @@ def __init__(self, provider_config: dict, cluster_name: str): ) if not is_running_on_golem_network(): - self._start_webserver() + self._start_webserver(provider_parameters["enable_registry_stats"]) self._ray_on_golem_client = RayOnGolemClient.get_instance(self._webserver_url) self._ray_on_golem_client.get_running_or_create_cluster( @@ -56,12 +56,15 @@ def bootstrap_config(cluster_config: Dict[str, Any]) -> Dict[str, Any]: provider_parameters: Dict = config["provider"]["parameters"] provider_parameters.setdefault("webserver_port", RAY_ON_GOLEM_PORT) + provider_parameters.setdefault("enable_registry_stats", True) provider_parameters.setdefault("network", "goerli") provider_parameters.setdefault("budget", 1) ray_on_golem_client = RayOnGolemClient.get_instance(provider_parameters["webserver_port"]) auth: Dict = config["auth"] + auth.setdefault("ssh_user", "root") + if "ssh_private_key" not in auth: ssh_key_path = TMP_PATH / get_default_ssh_key_name(config["cluster_name"]) auth["ssh_private_key"] = provider_parameters["ssh_private_key"] = str(ssh_key_path) @@ -150,7 +153,7 @@ def external_ip(self, node_id: NodeId) -> str: def set_node_tags(self, node_id: NodeId, tags: Dict) -> None: self._ray_on_golem_client.set_node_tags(node_id, tags) - def _start_webserver(self) -> None: + def _start_webserver(self, registry_stats: bool) -> None: with cli_logger.group(WEBSERVER_LOG_GROUP): if self._is_webserver_running(): cli_logger.print("Webserver is already running") @@ -159,7 +162,13 @@ def _start_webserver(self) -> None: cli_logger.print("Starting webserver...") 
subprocess.Popen( - [RAY_ON_GOLEM_PATH, "-p", str(self._webserver_url.port), "--self-shutdown"], + [ + RAY_ON_GOLEM_PATH, + "-p", + str(self._webserver_url.port), + "--registry-stats" if registry_stats else "--no-registry-stats", + "--self-shutdown", + ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, diff --git a/ray_on_golem/server/run.py b/ray_on_golem/server/run.py index 6acb995c..232dde68 100644 --- a/ray_on_golem/server/run.py +++ b/ray_on_golem/server/run.py @@ -30,9 +30,20 @@ def parse_sys_args() -> argparse.Namespace: parser.add_argument( "--self-shutdown", action="store_true", - default=False, help="flag to enable self-shutdown after last node termination, default: %(default)s", ) + parser.add_argument("--no-self-shutdown", action="store_false", dest="self_shutdown") + parser.add_argument( + "--registry-stats", + action="store_true", + help="flag to enable collection of Golem Registry stats about resolved images, default: %(default)s", + ) + parser.add_argument( + "--no-registry-stats", + action="store_false", + dest="registry_stats", + ) + parser.set_defaults(self_shutdown=False, registry_stats=True) return parser.parse_args() @@ -43,11 +54,12 @@ def prepare_tmp_dir(): pass -def create_application(port: int, self_shutdown: bool) -> web.Application: +def create_application(port: int, self_shutdown: bool, registry_stats: bool) -> web.Application: app = web.Application(middlewares=[error_middleware]) app["port"] = port app["self_shutdown"] = self_shutdown + app["registry_stats"] = registry_stats app["yagna_service"] = YagnaService( yagna_path=YAGNA_PATH, @@ -56,6 +68,7 @@ def create_application(port: int, self_shutdown: bool) -> web.Application: app["golem_service"] = GolemService( ray_on_golem_port=RAY_ON_GOLEM_PORT, websocat_path=WEBSOCAT_PATH, + registry_stats=app["registry_stats"], ) app["ray_service"] = RayService( @@ -116,13 +129,15 @@ def main(): args = parse_sys_args() prepare_tmp_dir() - app = 
create_application(args.port, args.self_shutdown) + app = create_application(args.port, args.self_shutdown, args.registry_stats) - logger.info("Starting server...") + logger.info( + "Starting server... {}".format(", ".join(f"{k}={v}" for k, v in args.__dict__.items())) + ) web.run_app(app, port=app["port"], print=None) - logger.info("Server stopped, bye!") + logger.info("Stopping server done, bye!") if __name__ == "__main__": diff --git a/ray_on_golem/server/services/golem/golem.py b/ray_on_golem/server/services/golem/golem.py index 600edf5e..5bef40b9 100644 --- a/ray_on_golem/server/services/golem/golem.py +++ b/ray_on_golem/server/services/golem/golem.py @@ -38,9 +38,10 @@ class GolemService: - def __init__(self, ray_on_golem_port: int, websocat_path: Path): + def __init__(self, ray_on_golem_port: int, websocat_path: Path, registry_stats: bool): self._ray_on_golem_port = ray_on_golem_port self._websocat_path = websocat_path + self._registry_stats = registry_stats self._golem: Optional[GolemNode] = None self._demand: Optional[Demand] = None @@ -50,15 +51,9 @@ def __init__(self, ray_on_golem_port: int, websocat_path: Path): self._yagna_appkey: Optional[str] = None self._lock = asyncio.Lock() - @property - def golem(self): - return self._golem - - @property - def payment_manager(self) -> DefaultPaymentManager: - return self._payment_manager - async def init(self, yagna_appkey: str) -> None: + logger.info("Starting GolemService...") + self._golem = GolemNode(app_key=yagna_appkey) self._yagna_appkey = yagna_appkey await self._golem.start() @@ -76,6 +71,8 @@ async def on_event(event) -> None: ) self._payment_manager = DefaultPaymentManager(self._golem, self._allocation) + logger.info("Starting GolemService done") + async def shutdown(self) -> None: """ Terminates all activities and ray on head node. 
@@ -83,15 +80,19 @@ async def shutdown(self) -> None: :return: """ - await self.payment_manager.terminate_agreements() + logger.info("Stopping GolemService...") + + await self._payment_manager.terminate_agreements() logger.info(f"Waiting for all invoices...") - await self.payment_manager.wait_for_invoices() + await self._payment_manager.wait_for_invoices() logger.info(f"Waiting for all invoices done") await self._golem.aclose() self._golem = None + logger.info("Stopping GolemService done") + async def create_cluster(self, provider_config: CreateClusterRequestData): """ Manages creating cluster, creates payload from given data and creates demand basing on payload @@ -158,12 +159,11 @@ async def _get_image_url_and_hash(self, node_config: NodeConfigData) -> Tuple[UR return await self._get_image_url_and_hash_from_tag(image_tag) - @staticmethod - async def _get_image_url_from_hash(image_hash: str) -> URL: + async def _get_image_url_from_hash(self, image_hash: str) -> URL: async with aiohttp.ClientSession() as session: async with session.get( f"https://registry.golem.network/v1/image/info", - params={"hash": image_hash, "count": "true"}, + params={"hash": image_hash, "count": str(self._registry_stats).lower()}, ) as response: response_data = await response.json() @@ -174,12 +174,11 @@ async def _get_image_url_from_hash(image_hash: str) -> URL: else: raise RegistryRequestError("Can't access Golem Registry for image lookup!") - @staticmethod - async def _get_image_url_and_hash_from_tag(image_tag: str) -> Tuple[URL, str]: + async def _get_image_url_and_hash_from_tag(self, image_tag: str) -> Tuple[URL, str]: async with aiohttp.ClientSession() as session: async with session.get( f"https://registry.golem.network/v1/image/info", - params={"tag": image_tag, "count": "true"}, + params={"tag": image_tag, "count": str(self._registry_stats).lower()}, ) as response: response_data = await response.json() diff --git a/ray_on_golem/server/services/ray.py 
b/ray_on_golem/server/services/ray.py index ce5d83e0..6f8d2919 100644 --- a/ray_on_golem/server/services/ray.py +++ b/ray_on_golem/server/services/ray.py @@ -35,11 +35,15 @@ def __init__(self, golem_service: GolemService, tmp_path: Path): self._ssh_public_key_path: Optional[Path] = None async def shutdown(self) -> None: + logger.info("Stopping RayService...") + await self._stop_head_node_to_webserver_tunel() async with self._nodes_lock: if not self._nodes: - logger.info(f"No need to destroy activities, as no activities are running") + logger.info( + "Stopping RayService done, no need to destroy activities, as no activities are running" + ) return logger.info(f"Destroying {len(self._nodes)} activities...") @@ -51,6 +55,8 @@ async def shutdown(self) -> None: self._nodes.clear() + logger.info("Stopping RayService done") + async def create_cluster_on_golem(self, provider_config: CreateClusterRequestData) -> None: self._ssh_private_key_path = Path(provider_config.ssh_private_key) self._ssh_public_key_path = self._ssh_private_key_path.with_suffix(".pub") diff --git a/ray_on_golem/server/services/yagna.py b/ray_on_golem/server/services/yagna.py index 173f292c..53fa2177 100644 --- a/ray_on_golem/server/services/yagna.py +++ b/ray_on_golem/server/services/yagna.py @@ -1,6 +1,7 @@ import asyncio import json import logging +import os from asyncio.subprocess import Process from pathlib import Path from typing import Optional @@ -65,6 +66,7 @@ async def _run_yagna_service(self) -> None: "run", stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, + preexec_fn=os.setpgrp, # https://stackoverflow.com/a/5446982/1993670 ) is_running = await self._wait_for_yagna_api() diff --git a/ray_on_golem/server/settings.py b/ray_on_golem/server/settings.py index 1b3ea232..4e7144ff 100644 --- a/ray_on_golem/server/settings.py +++ b/ray_on_golem/server/settings.py @@ -11,7 +11,7 @@ LOGGING_CONFIG = { "version": 1, - "disable_existing_loggers": True, + "disable_existing_loggers": 
False, "handlers": { "console": { "class": "logging.StreamHandler", diff --git a/ray_on_golem/server/views.py b/ray_on_golem/server/views.py index eb84e719..3cf22495 100644 --- a/ray_on_golem/server/views.py +++ b/ray_on_golem/server/views.py @@ -1,12 +1,12 @@ import asyncio import logging -import sys from aiohttp import web from ray_on_golem.server import models, settings from ray_on_golem.server.models import ShutdownState from ray_on_golem.server.services import RayService +from ray_on_golem.utils import raise_graceful_exit logger = logging.getLogger(__name__) @@ -187,7 +187,7 @@ async def self_shutdown(request): shutdown_seconds = int(settings.RAY_ON_GOLEM_SHUTDOWN_DELAY.total_seconds()) logger.info(f"Received a self-shutdown request, exiting in {shutdown_seconds} seconds...") loop = asyncio.get_event_loop() - loop.call_later(shutdown_seconds, sys.exit) + loop.call_later(shutdown_seconds, raise_graceful_exit) response_data = models.SelfShutdownResponseData(shutdown_state=shutdown_state) diff --git a/ray_on_golem/utils.py b/ray_on_golem/utils.py index 84da2e19..6c4d5b55 100644 --- a/ray_on_golem/utils.py +++ b/ray_on_golem/utils.py @@ -5,6 +5,8 @@ from pathlib import Path from typing import Dict, Optional +from aiohttp.web_runner import GracefulExit + from ray_on_golem.exceptions import RayOnGolemError @@ -68,3 +70,7 @@ async def start_ssh_reverse_tunel_process( ) return process + + +def raise_graceful_exit() -> None: + raise GracefulExit()