From 25b8bba3e2fceaa518e847dda9ff499ced665618 Mon Sep 17 00:00:00 2001 From: Lucjan Dudek Date: Thu, 2 Nov 2023 09:43:37 +0100 Subject: [PATCH 01/11] Refactor GolemService to support network stats --- ray_on_golem/server/services/golem/golem.py | 206 +----------------- .../services/golem/helpers/demand_config.py | 93 ++++++++ .../services/golem/helpers/manager_stack.py | 63 ++++++ .../server/services/golem/manager_stack.py | 59 +++++ .../server/services/golem/network_stats.py | 105 +++++++++ 5 files changed, 332 insertions(+), 194 deletions(-) create mode 100644 ray_on_golem/server/services/golem/helpers/demand_config.py create mode 100644 ray_on_golem/server/services/golem/helpers/manager_stack.py create mode 100644 ray_on_golem/server/services/golem/manager_stack.py create mode 100644 ray_on_golem/server/services/golem/network_stats.py diff --git a/ray_on_golem/server/services/golem/golem.py b/ray_on_golem/server/services/golem/golem.py index 59993c75..543f6361 100644 --- a/ray_on_golem/server/services/golem/golem.py +++ b/ray_on_golem/server/services/golem/golem.py @@ -1,105 +1,44 @@ import asyncio -import base64 import hashlib -import json import logging -import platform from collections import defaultdict -from dataclasses import dataclass from datetime import timedelta from functools import partial from pathlib import Path -from typing import AsyncIterator, Dict, List, Optional, Tuple +from typing import AsyncIterator, Dict, Optional, Tuple -import aiohttp -import ray from golem.managers import ( - ActivityManager, AddChosenPaymentPlatform, - AgreementManager, BlacklistProviderIdPlugin, Buffer, DefaultAgreementManager, DefaultProposalManager, - DemandManager, - LinearAverageCostPricing, MapScore, NegotiatingPlugin, PayAllPaymentManager, - PaymentManager, - ProposalManager, - ProposalManagerPlugin, - ProposalScorer, RefreshingDemandManager, - RejectIfCostsExceeds, ScoringBuffer, WorkContext, ) -from golem.managers.proposal.plugins.linear_coeffs import LinearCoeffsCost -from golem.node import SUBNET, GolemNode -from golem.payload import ManifestVmPayload, Payload, constraint, prop +from golem.node import GolemNode from golem.resources import Activity, Network, ProposalData -from pydantic import BaseModel, Field from yarl import URL -from ray_on_golem.server.exceptions import RayOnGolemServerError, RegistryRequestError -from ray_on_golem.server.models import DemandConfigData, NodeConfigData -from ray_on_golem.server.services.golem.manifest import get_manifest +from ray_on_golem.server.models import NodeConfigData +from ray_on_golem.server.services.golem.helpers.demand_config import DemandConfigHelper +from ray_on_golem.server.services.golem.helpers.manager_stack import ManagerStackNodeConfigHelper +from ray_on_golem.server.services.golem.manager_stack import ManagerStack from ray_on_golem.server.services.golem.provider_data import PROVIDERS_BLACKLIST, PROVIDERS_SCORED from ray_on_golem.server.services.utils import get_ssh_command logger = logging.getLogger(__name__) -class ManagerStack(BaseModel): - payment_manager: Optional[PaymentManager] - demand_manager: Optional[DemandManager] - proposal_manager: Optional[ProposalManager] - agreement_manager: Optional[AgreementManager] - activity_manager: Optional[ActivityManager] - extra_proposal_plugins: List[ProposalManagerPlugin] = Field(default_factory=list) - extra_proposal_scorers: List[ProposalScorer] = Field(default_factory=list) - - class Config: - arbitrary_types_allowed = True - - @property - def _managers(self): - return [ - self.payment_manager, - self.demand_manager, - self.proposal_manager, - self.agreement_manager, - self.activity_manager, - ] - - async def start(self) -> None: - logger.info("Starting stack managers...") - - for manager in self._managers: - if manager is not None: - await manager.start() - - logger.info("Starting stack managers done") - - async def stop(self) -> None: - logger.info("Stopping stack managers...") - - for manager in reversed(self._managers): - if manager is not None: - try: - await manager.stop() - except Exception: - logger.exception(f"{manager} stop failed!") - - logger.info("Stopping stack managers done") - - class GolemService: def __init__(self, websocat_path: Path, registry_stats: bool): self._websocat_path = websocat_path - self._registry_stats = registry_stats + self._demand_config_helper: DemandConfigHelper = DemandConfigHelper(registry_stats) self._golem: Optional[GolemNode] = None self._network: Optional[Network] = None self._yagna_appkey: Optional[str] = None @@ -169,10 +108,12 @@ async def _create_stack( ) -> ManagerStack: stack = ManagerStack() - payload = await self._get_payload_from_demand_config(node_config.demand) + payload = await self._demand_config_helper.get_payload_from_demand_config( + node_config.demand + ) - self._apply_cost_management_avg_usage(stack, node_config) - self._apply_cost_management_hard_limits(stack, node_config) + ManagerStackNodeConfigHelper.apply_cost_management_avg_usage(stack, node_config) + ManagerStackNodeConfigHelper.apply_cost_management_hard_limits(stack, node_config) stack.payment_manager = PayAllPaymentManager(self._golem, budget=budget, network=network) stack.demand_manager = RefreshingDemandManager( @@ -209,129 +150,6 @@ async def _create_stack( return stack - async def _get_payload_from_demand_config(self, demand_config: DemandConfigData) -> Payload: - @dataclass - class CustomManifestVmPayload(ManifestVmPayload): - subnet_constraint: str = constraint("golem.node.debug.subnet", "=", default=SUBNET) - debit_notes_accept_timeout: int = prop( - "golem.com.payment.debit-notes.accept-timeout?", default=240 - ) - - image_url, image_hash = await self._get_image_url_and_hash(demand_config) - - manifest = get_manifest(image_url, image_hash) - manifest = base64.b64encode(json.dumps(manifest).encode("utf-8")).decode("utf-8") - - params = demand_config.dict(exclude={"image_hash", "image_tag"}) - params["manifest"] = manifest - - payload = CustomManifestVmPayload(**params) - - return payload - - async def _get_image_url_and_hash(self, demand_config: DemandConfigData) -> Tuple[URL, str]: - image_tag = demand_config.image_tag - image_hash = demand_config.image_hash - - if image_tag is not None and image_hash is not None: - raise RayOnGolemServerError( - "Only one of `image_tag` and `image_hash` parameter should be defined!" - ) - - if image_hash is not None: - image_url = await self._get_image_url_from_hash(image_hash) - return image_url, image_hash - - if image_tag is None: - python_version = platform.python_version() - ray_version = ray.__version__ - image_tag = f"golem/ray-on-golem:py{python_version}-ray{ray_version}" - - return await self._get_image_url_and_hash_from_tag(image_tag) - - async def _get_image_url_from_hash(self, image_hash: str) -> URL: - async with aiohttp.ClientSession() as session: - async with session.get( - f"https://registry.golem.network/v1/image/info", - params={"hash": image_hash, "count": str(self._registry_stats).lower()}, - ) as response: - response_data = await response.json() - - if response.status == 200: - return URL(response_data["http"]) - elif response.status == 404: - raise RegistryRequestError(f"Image hash `{image_hash}` does not exist") - else: - raise RegistryRequestError("Can't access Golem Registry for image lookup!") - - async def _get_image_url_and_hash_from_tag(self, image_tag: str) -> Tuple[URL, str]: - async with aiohttp.ClientSession() as session: - async with session.get( - f"https://registry.golem.network/v1/image/info", - params={"tag": image_tag, "count": str(self._registry_stats).lower()}, - ) as response: - response_data = await response.json() - - if response.status == 200: - return response_data["http"], response_data["sha3"] - elif response.status == 404: - raise RegistryRequestError(f"Image tag `{image_tag}` does not exist") - else: - raise RegistryRequestError("Can't access Golem Registry for image lookup!") - - def _apply_cost_management_avg_usage( - self, stack: ManagerStack, node_config: NodeConfigData - ) -> None: - cost_management = node_config.cost_management - - if cost_management is None or not cost_management.is_average_usage_cost_enabled(): - logger.debug("Cost management based on average usage is not enabled") - return - - linear_average_cost = LinearAverageCostPricing( - average_cpu_load=node_config.cost_management.average_cpu_load, - average_duration=timedelta( - minutes=node_config.cost_management.average_duration_minutes - ), - ) - - stack.extra_proposal_scorers.append( - MapScore(linear_average_cost, normalize=True, normalize_flip=True), - ) - - max_average_usage_cost = node_config.cost_management.max_average_usage_cost - if max_average_usage_cost is not None: - stack.extra_proposal_plugins.append( - RejectIfCostsExceeds(max_average_usage_cost, linear_average_cost), - ) - logger.debug("Cost management based on average usage applied with max limits") - else: - logger.debug("Cost management based on average usage applied without max limits") - - def _apply_cost_management_hard_limits( - self, stack: ManagerStack, node_config: NodeConfigData - ) -> None: - # TODO: Consider creating RejectIfCostsExceeds variant for multiple values - proposal_plugins = [] - field_names = { - "max_initial_price": "price_initial", - "max_cpu_sec_price": "price_cpu_sec", - "max_duration_sec_price": "price_duration_sec", - } - - for cost_field_name, coef_field_name in field_names.items(): - cost_max_value = getattr(node_config.cost_management, cost_field_name, None) - if cost_max_value is not None: - proposal_plugins.append( - RejectIfCostsExceeds(cost_max_value, LinearCoeffsCost(coef_field_name)), - ) - - if proposal_plugins: - stack.extra_proposal_plugins.extend(proposal_plugins) - logger.debug("Cost management based on max limits applied") - else: - logger.debug("Cost management based on max limits is not enabled") - def _score_with_provider_data( self, proposal_data: ProposalData, network: str ) -> Optional[float]: diff --git a/ray_on_golem/server/services/golem/helpers/demand_config.py b/ray_on_golem/server/services/golem/helpers/demand_config.py new file mode 100644 index 00000000..b4e1e048 --- /dev/null +++ b/ray_on_golem/server/services/golem/helpers/demand_config.py @@ -0,0 +1,93 @@ +import base64 +import json +import logging +import platform +from dataclasses import dataclass +from typing import Tuple + +import aiohttp +import ray +from golem.node import SUBNET +from golem.payload import ManifestVmPayload, Payload, constraint, prop +from yarl import URL + +from ray_on_golem.server.exceptions import RayOnGolemServerError, RegistryRequestError +from ray_on_golem.server.models import DemandConfigData +from ray_on_golem.server.services.golem.manifest import get_manifest + +logger = logging.getLogger(__name__) + + +class DemandConfigHelper: + def __init__(self, registry_stats: bool): + self._registry_stats = registry_stats + + async def get_payload_from_demand_config(self, demand_config: DemandConfigData) -> Payload: + @dataclass + class CustomManifestVmPayload(ManifestVmPayload): + subnet_constraint: str = constraint("golem.node.debug.subnet", "=", default=SUBNET) + debit_notes_accept_timeout: int = prop( + "golem.com.payment.debit-notes.accept-timeout?", default=240 + ) + + image_url, image_hash = await self._get_image_url_and_hash(demand_config) + + manifest = get_manifest(image_url, image_hash) + manifest = base64.b64encode(json.dumps(manifest).encode("utf-8")).decode("utf-8") + + params = demand_config.dict(exclude={"image_hash", "image_tag"}) + params["manifest"] = manifest + + payload = CustomManifestVmPayload(**params) + + return payload + + async def _get_image_url_and_hash(self, demand_config: DemandConfigData) -> Tuple[URL, str]: + image_tag = demand_config.image_tag + image_hash = demand_config.image_hash + + if image_tag is not None and image_hash is not None: + raise RayOnGolemServerError( + "Only one of `image_tag` and `image_hash` parameter should be defined!" + ) + + if image_hash is not None: + image_url = await self._get_image_url_from_hash(image_hash) + return image_url, image_hash + + if image_tag is None: + python_version = platform.python_version() + ray_version = ray.__version__ + image_tag = f"golem/ray-on-golem:py{python_version}-ray{ray_version}" + + return await self._get_image_url_and_hash_from_tag(image_tag) + + async def _get_image_url_from_hash(self, image_hash: str) -> URL: + async with aiohttp.ClientSession() as session: + async with session.get( + f"https://registry.golem.network/v1/image/info", + params={"hash": image_hash, "count": str(self._registry_stats).lower()}, + ) as response: + response_data = await response.json() + + if response.status == 200: + return URL(response_data["http"]) + elif response.status == 404: + raise RegistryRequestError(f"Image hash `{image_hash}` does not exist") + else: + raise RegistryRequestError("Can't access Golem Registry for image lookup!") + + async def _get_image_url_and_hash_from_tag(self, image_tag: str) -> Tuple[URL, str]: + async with aiohttp.ClientSession() as session: + async with session.get( + f"https://registry.golem.network/v1/image/info", + params={"tag": image_tag, "count": str(self._registry_stats).lower()}, + ) as response: + response_data = await response.json() + + if response.status == 200: + return response_data["http"], response_data["sha3"] + elif response.status == 404: + raise RegistryRequestError(f"Image tag `{image_tag}` does not exist") + else: + raise RegistryRequestError("Can't access Golem Registry for image lookup!") diff --git a/ray_on_golem/server/services/golem/helpers/manager_stack.py b/ray_on_golem/server/services/golem/helpers/manager_stack.py new file mode 100644 index 00000000..d967694b --- /dev/null +++ b/ray_on_golem/server/services/golem/helpers/manager_stack.py @@ -0,0 +1,63 @@ +import logging +from datetime import timedelta + +from golem.managers import LinearAverageCostPricing, MapScore, RejectIfCostsExceeds +from golem.managers.proposal.plugins.linear_coeffs import LinearCoeffsCost + +from ray_on_golem.server.models import NodeConfigData +from ray_on_golem.server.services.golem.manager_stack import ManagerStack + +logger = logging.getLogger(__name__) + + +class ManagerStackNodeConfigHelper: + @staticmethod + def apply_cost_management_avg_usage(stack: ManagerStack, node_config: NodeConfigData) -> None: + cost_management = node_config.cost_management + + if cost_management is None or not cost_management.is_average_usage_cost_enabled(): + logger.debug("Cost management based on average usage is not enabled") + return + + linear_average_cost = LinearAverageCostPricing( + average_cpu_load=node_config.cost_management.average_cpu_load, + average_duration=timedelta( + minutes=node_config.cost_management.average_duration_minutes + ), + ) + + stack.extra_proposal_scorers.append( + MapScore(linear_average_cost, normalize=True, normalize_flip=True), + ) + + max_average_usage_cost = node_config.cost_management.max_average_usage_cost + if max_average_usage_cost is not None: + stack.extra_proposal_plugins.append( + RejectIfCostsExceeds(max_average_usage_cost, linear_average_cost), + ) + logger.debug("Cost management based on average usage applied with max limits") + else: + logger.debug("Cost management based on average usage applied without max limits") + + @staticmethod + def apply_cost_management_hard_limits(stack: ManagerStack, node_config: NodeConfigData) -> None: + # TODO: Consider creating RejectIfCostsExceeds variant for multiple values + proposal_plugins = [] + field_names = { + "max_initial_price": "price_initial", + "max_cpu_sec_price": "price_cpu_sec", + "max_duration_sec_price": "price_duration_sec", + } + + for cost_field_name, coef_field_name in field_names.items(): + cost_max_value = getattr(node_config.cost_management, cost_field_name, None) + if cost_max_value is not None: + proposal_plugins.append( + RejectIfCostsExceeds(cost_max_value, LinearCoeffsCost(coef_field_name)), + ) + + if proposal_plugins: + stack.extra_proposal_plugins.extend(proposal_plugins) + logger.debug("Cost management based on max limits applied") + else: + logger.debug("Cost management based on max limits is not enabled") diff --git a/ray_on_golem/server/services/golem/manager_stack.py b/ray_on_golem/server/services/golem/manager_stack.py new file mode 100644 index 00000000..2ebe4160 --- /dev/null +++ b/ray_on_golem/server/services/golem/manager_stack.py @@ -0,0 +1,59 @@ +import logging +from typing import List, Optional + +from golem.managers import ( + ActivityManager, + AgreementManager, + DemandManager, + PaymentManager, + ProposalManager, + ProposalManagerPlugin, + ProposalScorer, +) +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +class ManagerStack(BaseModel): + payment_manager: Optional[PaymentManager] + demand_manager: Optional[DemandManager] + proposal_manager: Optional[ProposalManager] + agreement_manager: Optional[AgreementManager] + activity_manager: Optional[ActivityManager] + extra_proposal_plugins: List[ProposalManagerPlugin] = Field(default_factory=list) + extra_proposal_scorers: List[ProposalScorer] = Field(default_factory=list) + + class Config: + arbitrary_types_allowed = True + + @property + def _managers(self): + return [ + self.payment_manager, + self.demand_manager, + self.proposal_manager, + self.agreement_manager, + self.activity_manager, + ] + + async def start(self) -> None: + logger.info("Starting stack managers...") + + for manager in self._managers: + if manager is not None: + await manager.start() + + logger.info("Starting stack managers done") + + async def stop(self) -> None: + logger.info("Stopping stack managers...") + + for manager in reversed(self._managers): + if manager is not None: + try: + await manager.stop() + except Exception: + logger.exception(f"{manager} stop failed!") + + logger.info("Stopping stack managers done") diff --git a/ray_on_golem/server/services/golem/network_stats.py b/ray_on_golem/server/services/golem/network_stats.py new file mode 100644 index 00000000..e445345c --- /dev/null +++ b/ray_on_golem/server/services/golem/network_stats.py @@ -0,0 +1,105 @@ +import logging +from datetime import timedelta +from functools import partial +from typing import Optional + +from golem.managers import ( + AddChosenPaymentPlatform, + BlacklistProviderIdPlugin, + Buffer, + DefaultAgreementManager, + DefaultProposalManager, + MapScore, + NegotiatingPlugin, + PayAllPaymentManager, + RefreshingDemandManager, + ScoringBuffer, +) +from golem.node import GolemNode + +from ray_on_golem.server.models import NodeConfigData +from ray_on_golem.server.services.golem.helpers.demand_config import DemandConfigHelper +from ray_on_golem.server.services.golem.helpers.manager_stack import ManagerStackNodeConfigHelper +from ray_on_golem.server.services.golem.manager_stack import ManagerStack +from ray_on_golem.server.services.golem.provider_data import PROVIDERS_BLACKLIST + +logger = logging.getLogger(__name__) + + +class GolemNetworkStatsService: + def __init__(self, registry_stats: bool): + self._registry_stats = registry_stats + + self._demand_config_helper: DemandConfigHelper = DemandConfigHelper(registry_stats) + + self._golem: Optional[GolemNode] = None + self._yagna_appkey: Optional[str] = None + self._stack: Optional[ManagerStack] = None + + async def init(self, yagna_appkey: str) -> None: + logger.info("Starting GolemNetworkStatsService...") + + self._golem = GolemNode(app_key=yagna_appkey) + self._yagna_appkey = yagna_appkey + await self._golem.start() + + logger.info("Starting GolemNetworkStatsService done") + + async def shutdown(self) -> None: + logger.info("Stopping GolemNetworkStatsService...") + + if self._stack is not None: + self._stack.stop() + self._stack = None + + await self._golem.aclose() + self._golem = None + + logger.info("Stopping GolemNetworkStatsService done") + + async def _create_stack( + self, node_config: NodeConfigData, budget: float, network: str + ) -> ManagerStack: + stack = ManagerStack() + + payload = await self._demand_config_helper.get_payload_from_demand_config( + node_config.demand + ) + + ManagerStackNodeConfigHelper.apply_cost_management_avg_usage(stack, node_config) + ManagerStackNodeConfigHelper.apply_cost_management_hard_limits(stack, node_config) + + stack.payment_manager = PayAllPaymentManager(self._golem, budget=budget, network=network) + stack.demand_manager = RefreshingDemandManager( + self._golem, + stack.payment_manager.get_allocation, + payload, + demand_expiration_timeout=timedelta(hours=8), + ) + stack.proposal_manager = DefaultProposalManager( + self._golem, + stack.demand_manager.get_initial_proposal, + plugins=( + BlacklistProviderIdPlugin(PROVIDERS_BLACKLIST.get(network, set())), + *stack.extra_proposal_plugins, + ScoringBuffer( + min_size=50, + max_size=1000, + fill_at_start=True, + proposal_scorers=( + *stack.extra_proposal_scorers, + MapScore(partial(self._score_with_provider_data, network=network)), + ), + update_interval=timedelta(seconds=10), + ), + NegotiatingPlugin( + proposal_negotiators=(AddChosenPaymentPlatform(),), + ), + Buffer(min_size=0, max_size=4, fill_concurrency_size=4), + ), + ) + stack.agreement_manager = DefaultAgreementManager( + self._golem, stack.proposal_manager.get_draft_proposal + ) + + return stack From f96e810f06fb57a65705a3a32566fa0696e2a519 Mon Sep 17 00:00:00 2001 From: Lucjan Dudek Date: Tue, 7 Nov 2023 11:07:29 +0100 Subject: [PATCH 02/11] Added stat service counter and stat negotiatior --- ray_on_golem/server/run.py | 101 ++++++--- .../server/services/golem/network_stats.py | 199 +++++++++++++++--- 2 files changed, 244 insertions(+), 56 deletions(-) diff --git a/ray_on_golem/server/run.py b/ray_on_golem/server/run.py index d1d34e19..4a5ade51 100644 --- a/ray_on_golem/server/run.py +++ b/ray_on_golem/server/run.py @@ -1,11 +1,14 @@ import argparse +import asyncio import logging import logging.config +from contextlib import asynccontextmanager from aiohttp import web from ray_on_golem.server.middlewares import error_middleware, trace_id_middleware from ray_on_golem.server.services import GolemService, RayService, YagnaService +from ray_on_golem.server.services.golem.network_stats import GolemNetworkStatsService from ray_on_golem.server.settings import ( LOGGING_CONFIG, RAY_ON_GOLEM_SHUTDOWN_DEADLINE, @@ -21,29 +24,45 @@ def parse_sys_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Ray on Golem's webserver.") parser.add_argument( + "--registry-stats", + action="store_true", + help="flag to enable collection of Golem Registry stats about resolved images, default: %(default)s", + ) + parser.add_argument( + "--no-registry-stats", + action="store_false", + dest="registry_stats", + ) + subparsers = parser.add_subparsers(dest="command") + + webserver_parser = subparsers.add_parser("webserver") + webserver_parser.add_argument( "-p", "--port", type=int, default=4578, help="port for Ray on Golem's webserver to listen on, default: %(default)s", ) - parser.add_argument( + webserver_parser.add_argument( "--self-shutdown", action="store_true", help="flag to enable self-shutdown after last node termination, default: %(default)s", ) - parser.add_argument("--no-self-shutdown", action="store_false", dest="self_shutdown") - parser.add_argument( - "--registry-stats", + webserver_parser.add_argument("--no-self-shutdown", action="store_false", dest="self_shutdown") + webserver_parser.set_defaults(self_shutdown=False, registry_stats=True) + + stats_parser = subparsers.add_parser("stats") + stats_parser.add_argument( + "--enable-logging", action="store_true", - help="flag to enable collection of Golem Registry stats about resolved images, default: %(default)s", + dest="enable_logging", + help="flag to enable logging, default: %(default)s", ) - parser.add_argument( - "--no-registry-stats", - action="store_false", - dest="registry_stats", - ) - parser.set_defaults(self_shutdown=False, registry_stats=True) + # TODO add config arg + # stats_parser.add_argument( + # "CLUSTER_CONFIG_FILE", + # ) + return parser.parse_args() @@ -128,29 +147,55 @@ async def ray_service_ctx(app: web.Application) -> None: await ray_service.shutdown() +@asynccontextmanager +async def golem_network_stats_service(registry_stats: bool) -> GolemNetworkStatsService: + golem_network_stats_service: GolemNetworkStatsService = GolemNetworkStatsService(registry_stats) + yagna_service = YagnaService( + yagna_path=YAGNA_PATH, + ) + + await yagna_service.init() + await golem_network_stats_service.init(yagna_appkey=yagna_service.yagna_appkey) + + yield golem_network_stats_service + + await golem_network_stats_service.shutdown() + await yagna_service.shutdown() + + +async def stats_main(args: argparse.Namespace): + async with golem_network_stats_service(args.registry_stats) as stats_service: + await stats_service.run() + + def main(): prepare_tmp_dir() args = parse_sys_args() - logging.config.dictConfig(LOGGING_CONFIG) - - app = create_application(args.port, args.self_shutdown, args.registry_stats) + if args.command == "webserver": + logging.config.dictConfig(LOGGING_CONFIG) + app = create_application(args.port, args.self_shutdown, args.registry_stats) - logger.info( - "Starting server... {}".format(", ".join(f"{k}={v}" for k, v in args.__dict__.items())) - ) - - try: - web.run_app( - app, - port=app["port"], - print=None, - shutdown_timeout=RAY_ON_GOLEM_SHUTDOWN_DEADLINE.total_seconds(), + logger.info( + "Starting server... {}".format(", ".join(f"{k}={v}" for k, v in args.__dict__.items())) ) - except Exception: - logger.info("Server unexpectedly died, bye!") - else: - logger.info("Stopping server done, bye!") + + try: + web.run_app( + app, + port=app["port"], + print=None, + shutdown_timeout=RAY_ON_GOLEM_SHUTDOWN_DEADLINE.total_seconds(), + ) + except Exception: + logger.info("Server unexpectedly died, bye!") + else: + logger.info("Stopping server done, bye!") + + elif args.command == "stats": + if args.enable_logging: + logging.config.dictConfig(LOGGING_CONFIG) + asyncio.run(stats_main(args)) if __name__ == "__main__": diff --git a/ray_on_golem/server/services/golem/network_stats.py b/ray_on_golem/server/services/golem/network_stats.py index e445345c..c71e4c0f 100644 --- a/ray_on_golem/server/services/golem/network_stats.py +++ b/ray_on_golem/server/services/golem/network_stats.py @@ -1,23 +1,29 @@ +import asyncio import logging +import re +from collections import defaultdict from datetime import timedelta -from functools import partial -from typing import Optional +from typing import Optional, Sequence from golem.managers import ( AddChosenPaymentPlatform, BlacklistProviderIdPlugin, Buffer, - DefaultAgreementManager, DefaultProposalManager, - MapScore, NegotiatingPlugin, PayAllPaymentManager, + ProposalManagerPlugin, RefreshingDemandManager, ScoringBuffer, ) +from golem.managers.base import ProposalNegotiator from golem.node import GolemNode +from golem.payload import PayloadSyntaxParser +from golem.resources import DemandData, Proposal +from golem.resources.proposal.exceptions import ProposalRejected +from ya_market import ApiException -from ray_on_golem.server.models import NodeConfigData +from ray_on_golem.server.models import CostManagementData, DemandConfigData, NodeConfigData from ray_on_golem.server.services.golem.helpers.demand_config import DemandConfigHelper from ray_on_golem.server.services.golem.helpers.manager_stack import ManagerStackNodeConfigHelper from ray_on_golem.server.services.golem.manager_stack import ManagerStack @@ -26,15 +32,96 @@ logger = logging.getLogger(__name__) +class ProposalCounterPlugin(ProposalManagerPlugin): + def __init__(self) -> None: + self._count = 0 + + async def get_proposal(self) -> Proposal: + while True: + proposal: Proposal = await self._get_proposal() + self._count += 1 + return proposal + + +class StatsNegotiatingPlugin(NegotiatingPlugin): + def __init__( + self, + demand_offer_parser: Optional[PayloadSyntaxParser] = None, + proposal_negotiators: Optional[Sequence[ProposalNegotiator]] = None, + *args, + **kwargs, + ) -> None: + super().__init__(demand_offer_parser, proposal_negotiators, *args, **kwargs) + self.fails = defaultdict(int) + + async def get_proposal(self) -> Proposal: + while True: + proposal = await self._get_proposal() + + demand_data = await self._get_demand_data_from_proposal(proposal) + + try: + negotiated_proposal = await self._negotiate_proposal(demand_data, proposal) + return negotiated_proposal + except ProposalRejected as err: + self.fails[err.reason] += 1 + except Exception as err: + self.fails[str(err)] += 1 + + async def _send_demand_proposal( + self, offer_proposal: Proposal, demand_data: DemandData + ) -> Proposal: + try: + return await offer_proposal.respond( + demand_data.properties, + demand_data.constraints, + ) + except ApiException as e: + error_msg = re.sub(r"\[.*?\]", "[***]", str(e.body)) + raise RuntimeError(f"Failed to send proposal response! {e.status}: {error_msg}") from e + except asyncio.TimeoutError as e: + raise RuntimeError(f"Failed to send proposal response! Request timed out") from e + + +class StatsPluginFactory: + _stats_negotiating_plugin: StatsNegotiatingPlugin + + def __init__(self) -> None: + self._counters = {} + + def print_gathered_stats(self) -> None: + print("\nProposals count:") + [print(f"{tag}: {counter._count}") for tag, counter in self._counters.items()] + print("\nNegotiation errors:") + [ + print(f"{err}: {count}") + for err, count in sorted( + self._stats_negotiating_plugin.fails.items(), key=lambda item: item[1], reverse=True + ) + ] + + def create_negotiating_plugin(self) -> StatsNegotiatingPlugin: + self._stats_negotiating_plugin = StatsNegotiatingPlugin( + proposal_negotiators=(AddChosenPaymentPlatform(),), + ) + return self._stats_negotiating_plugin + + def create_counter_plugin(self, tag: str) -> ProposalCounterPlugin: + self._counters[tag] = ProposalCounterPlugin() + return self._counters[tag] + + class GolemNetworkStatsService: - def __init__(self, registry_stats: bool): + def __init__(self, registry_stats: bool, run_time_minutes=5) -> None: self._registry_stats = registry_stats + self._run_time = timedelta(minutes=run_time_minutes) self._demand_config_helper: DemandConfigHelper = DemandConfigHelper(registry_stats) self._golem: Optional[GolemNode] = None self._yagna_appkey: Optional[str] = None - self._stack: Optional[ManagerStack] = None + + self._stats_plugin_factory = StatsPluginFactory() async def init(self, yagna_appkey: str) -> None: logger.info("Starting GolemNetworkStatsService...") @@ -48,15 +135,61 @@ async def init(self, yagna_appkey: str) -> None: async def shutdown(self) -> None: logger.info("Stopping GolemNetworkStatsService...") - if self._stack is not None: - self._stack.stop() - self._stack = None - await self._golem.aclose() self._golem = None logger.info("Stopping GolemNetworkStatsService done") + async def run(self) -> None: + # FIXME pass args from yaml + network: str = "goerli" + # network: str = "polygon" + budget: int = 1.0 + node_config: NodeConfigData = NodeConfigData( + demand=DemandConfigData( + image_tag="blueshade/ray-on-golem:0.2.0-py3.10.13-ray2.7.1", + capabilities=["vpn", "inet", "manifest-support"], + min_mem_gib=0.0, + min_cpu_threads=0, + min_storage_gib=0.0, + ), + cost_management=CostManagementData( + average_cpu_load=0.8, + average_duration_minutes=20, + max_average_usage_cost=1.5, + max_initial_price=0.5, + max_cpu_sec_price=0.0005, + max_duration_sec_price=0.0005, + ), + ) + + stack = await self._create_stack(node_config, budget, network) + await stack.start() + + consume_proposals_task = asyncio.create_task(self._consume_draft_proposals(stack)) + await asyncio.wait([consume_proposals_task], timeout=self._run_time.total_seconds()) + + consume_proposals_task.cancel() + await consume_proposals_task + + await stack.stop() + self._stats_plugin_factory.print_gathered_stats() + + async def _consume_draft_proposals(self, stack: ManagerStack) -> None: + drafts = [] + try: + while True: + draft = await stack.proposal_manager.get_draft_proposal() + drafts.append(draft) + except asyncio.CancelledError: + return + finally: + await asyncio.gather( + # FIXME better reason message + *[draft.reject(reason="No more needed") for draft in drafts], + return_exceptions=True, + ) + async def _create_stack( self, node_config: NodeConfigData, budget: float, network: str ) -> ManagerStack: @@ -76,30 +209,40 @@ async def _create_stack( payload, demand_expiration_timeout=timedelta(hours=8), ) - stack.proposal_manager = DefaultProposalManager( - self._golem, - stack.demand_manager.get_initial_proposal, - plugins=( - BlacklistProviderIdPlugin(PROVIDERS_BLACKLIST.get(network, set())), - *stack.extra_proposal_plugins, + + plugins = [ + self._stats_plugin_factory.create_counter_plugin("Initial"), + BlacklistProviderIdPlugin(PROVIDERS_BLACKLIST.get(network, set())), + self._stats_plugin_factory.create_counter_plugin("Not blacklisted"), + ] + + for idx, plugin in enumerate(stack.extra_proposal_plugins): + plugins.append(plugin) + plugins.append( + self._stats_plugin_factory.create_counter_plugin( + f"Passed {plugin.__class__.__name__} {idx + 1}" + ) + ) + + plugins.extend( + [ ScoringBuffer( min_size=50, max_size=1000, fill_at_start=True, - proposal_scorers=( - *stack.extra_proposal_scorers, - MapScore(partial(self._score_with_provider_data, network=network)), - ), + proposal_scorers=(*stack.extra_proposal_scorers,), update_interval=timedelta(seconds=10), ), - NegotiatingPlugin( - proposal_negotiators=(AddChosenPaymentPlatform(),), - ), - Buffer(min_size=0, max_size=4, fill_concurrency_size=4), - ), + self._stats_plugin_factory.create_counter_plugin("Scored"), + self._stats_plugin_factory.create_negotiating_plugin(), + self._stats_plugin_factory.create_counter_plugin("Negotiated"), + Buffer(min_size=1, max_size=50, fill_concurrency_size=10), + ] ) - stack.agreement_manager = DefaultAgreementManager( - self._golem, stack.proposal_manager.get_draft_proposal + stack.proposal_manager = DefaultProposalManager( + self._golem, + stack.demand_manager.get_initial_proposal, + plugins=plugins, ) return stack From b583a77c1b810964ca060a329d38202c37fa271d Mon Sep 17 00:00:00 2001 From: Lucjan Dudek Date: Tue, 7 Nov 2023 12:34:16 +0100 Subject: [PATCH 03/11] Load config from yaml file --- ray_on_golem/server/run.py | 47 ++++++++++------ .../server/services/golem/network_stats.py | 53 +++++++------------ 2 files changed, 51 insertions(+), 49 deletions(-) diff --git a/ray_on_golem/server/run.py b/ray_on_golem/server/run.py index 4a5ade51..4825680f 100644 --- a/ray_on_golem/server/run.py +++ b/ray_on_golem/server/run.py @@ -4,6 +4,7 @@ import logging.config from contextlib import asynccontextmanager +import yaml from aiohttp import web from ray_on_golem.server.middlewares import error_middleware, trace_id_middleware @@ -23,16 +24,6 @@ def parse_sys_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Ray on Golem's webserver.") - parser.add_argument( - "--registry-stats", - action="store_true", - help="flag to enable collection of Golem Registry stats about resolved images, default: %(default)s", - ) - parser.add_argument( - "--no-registry-stats", - action="store_false", - dest="registry_stats", - ) subparsers = parser.add_subparsers(dest="command") webserver_parser = subparsers.add_parser("webserver") @@ -48,20 +39,39 @@ def parse_sys_args() -> argparse.Namespace: action="store_true", help="flag to enable self-shutdown after last node termination, default: %(default)s", ) + webserver_parser.add_argument( + "--registry-stats", + action="store_true", + help="flag to enable collection of Golem Registry stats about resolved images, default: %(default)s", + ) + webserver_parser.add_argument( + "--no-registry-stats", + action="store_false", + dest="registry_stats", + ) webserver_parser.add_argument("--no-self-shutdown", action="store_false", dest="self_shutdown") webserver_parser.set_defaults(self_shutdown=False, registry_stats=True) stats_parser = subparsers.add_parser("stats") + stats_parser.add_argument( + "CLUSTER_CONFIG_FILE", + type=argparse.FileType("r"), + help="Cluster config yaml", + ) + stats_parser.add_argument( + "-t", + "--run-time", + type=int, + dest="run_time", + default=5, + help="For how long in minutes to gather stats, default: %(default)s", + ) stats_parser.add_argument( "--enable-logging", action="store_true", dest="enable_logging", help="flag to enable logging, default: %(default)s", ) - # TODO add config arg - # stats_parser.add_argument( - # "CLUSTER_CONFIG_FILE", - # ) return parser.parse_args() @@ -164,8 +174,13 @@ async def golem_network_stats_service(registry_stats: bool) -> GolemNetworkStats async def stats_main(args: argparse.Namespace): - async with golem_network_stats_service(args.registry_stats) as stats_service: - await stats_service.run() + with args.CLUSTER_CONFIG_FILE as file: + config = yaml.safe_load(file.read()) + provider_config = config["provider"] + async with golem_network_stats_service( + provider_config["enable_registry_stats"] + ) as stats_service: + await stats_service.run(provider_config["parameters"], args.run_time) def main(): diff --git a/ray_on_golem/server/services/golem/network_stats.py b/ray_on_golem/server/services/golem/network_stats.py index c71e4c0f..28ee2ce0 100644 --- a/ray_on_golem/server/services/golem/network_stats.py +++ b/ray_on_golem/server/services/golem/network_stats.py @@ -3,7 +3,7 @@ import re from collections import defaultdict from datetime import timedelta -from typing import Optional, Sequence +from typing import Dict, Optional, Sequence from golem.managers import ( AddChosenPaymentPlatform, @@ -23,7 +23,7 @@ from golem.resources.proposal.exceptions import ProposalRejected from ya_market import ApiException -from ray_on_golem.server.models import CostManagementData, DemandConfigData, NodeConfigData +from ray_on_golem.server.models import NodeConfigData from ray_on_golem.server.services.golem.helpers.demand_config import DemandConfigHelper from ray_on_golem.server.services.golem.helpers.manager_stack import ManagerStackNodeConfigHelper from ray_on_golem.server.services.golem.manager_stack import ManagerStack @@ -112,9 +112,8 @@ def create_counter_plugin(self, tag: str) -> ProposalCounterPlugin: class GolemNetworkStatsService: - def __init__(self, registry_stats: bool, run_time_minutes=5) -> None: + def __init__(self, registry_stats: bool) -> None: self._registry_stats = registry_stats - self._run_time = timedelta(minutes=run_time_minutes) self._demand_config_helper: DemandConfigHelper = DemandConfigHelper(registry_stats) @@ -140,40 +139,28 @@ async def shutdown(self) -> None: logger.info("Stopping GolemNetworkStatsService done") - async def run(self) -> None: - # FIXME pass args from yaml - network: str = "goerli" - # network: str = "polygon" - budget: int = 1.0 - node_config: NodeConfigData = NodeConfigData( - demand=DemandConfigData( - image_tag="blueshade/ray-on-golem:0.2.0-py3.10.13-ray2.7.1", - capabilities=["vpn", "inet", "manifest-support"], - min_mem_gib=0.0, - min_cpu_threads=0, - min_storage_gib=0.0, - ), - cost_management=CostManagementData( - average_cpu_load=0.8, - average_duration_minutes=20, - max_average_usage_cost=1.5, - max_initial_price=0.5, - max_cpu_sec_price=0.0005, - max_duration_sec_price=0.0005, - ), - ) + async def run(self, provider_parameters: Dict, run_time_minutes: int) -> None: + network: str = provider_parameters["network"] + budget: int = provider_parameters["budget"] + node_config: NodeConfigData = NodeConfigData(**provider_parameters["node_config"]) stack = await self._create_stack(node_config, budget, network) await stack.start() + print("Gathering stats data...") consume_proposals_task = asyncio.create_task(self._consume_draft_proposals(stack)) - await asyncio.wait([consume_proposals_task], timeout=self._run_time.total_seconds()) - - consume_proposals_task.cancel() - await consume_proposals_task + try: + await asyncio.wait( + [consume_proposals_task], + timeout=timedelta(minutes=run_time_minutes).total_seconds(), + ) + finally: + consume_proposals_task.cancel() + await consume_proposals_task - await stack.stop() - self._stats_plugin_factory.print_gathered_stats() + await stack.stop() + print("Gathering stats data done") + self._stats_plugin_factory.print_gathered_stats() async def _consume_draft_proposals(self, stack: ManagerStack) -> None: drafts = [] @@ -220,7 +207,7 @@ async def _create_stack( plugins.append(plugin) plugins.append( self._stats_plugin_factory.create_counter_plugin( - f"Passed {plugin.__class__.__name__} {idx + 1}" + f"Passed {plugin.__class__.__name__} #{idx + 1}" ) ) From 6ae49c4e534d195b72cc3ee619dda1ed5805ed27 Mon Sep 17 00:00:00 2001 From: Lucjan Dudek Date: Tue, 7 Nov 2023 12:41:57 +0100 Subject: [PATCH 04/11] Fix ray-on-golem command on ray up --- ray_on_golem/provider/node_provider.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ray_on_golem/provider/node_provider.py b/ray_on_golem/provider/node_provider.py index 89f6a194..23683929 100644 --- a/ray_on_golem/provider/node_provider.py +++ b/ray_on_golem/provider/node_provider.py @@ -189,6 +189,7 @@ def _start_webserver( ) args = [ RAY_ON_GOLEM_PATH, + "webserver", "-p", str(port), "--registry-stats" if registry_stats else "--no-registry-stats", From 5c9339a65e3c6a9055eed8134a466ebacf6eaecc Mon Sep 17 00:00:00 2001 From: Lucjan Dudek Date: Tue, 7 Nov 2023 14:46:56 +0100 Subject: [PATCH 05/11] CR improvements --- ray_on_golem/server/run.py | 17 ++++++++-------- ray_on_golem/server/services/golem/golem.py | 4 ++-- .../services/golem/helpers/manager_stack.py | 20 +++++++++---------- .../server/services/golem/manager_stack.py | 6 +++--- .../server/services/golem/network_stats.py | 14 +++++-------- 5 files changed, 29 insertions(+), 32 deletions(-) diff --git a/ray_on_golem/server/run.py b/ray_on_golem/server/run.py index 4825680f..af3c771a 100644 --- a/ray_on_golem/server/run.py +++ b/ray_on_golem/server/run.py @@ -52,21 +52,22 @@ def parse_sys_args() -> argparse.Namespace: webserver_parser.add_argument("--no-self-shutdown", action="store_false", dest="self_shutdown") webserver_parser.set_defaults(self_shutdown=False, registry_stats=True) - stats_parser = subparsers.add_parser("stats") + stats_parser = subparsers.add_parser("network-stats") stats_parser.add_argument( "CLUSTER_CONFIG_FILE", type=argparse.FileType("r"), help="Cluster config yaml", ) stats_parser.add_argument( - "-t", - "--run-time", + "-d", + "--duration", type=int, - dest="run_time", + dest="duration", default=5, help="For how long in minutes to gather stats, default: %(default)s", ) stats_parser.add_argument( + "-log", "--enable-logging", action="store_true", dest="enable_logging", @@ -173,14 +174,14 @@ async def golem_network_stats_service(registry_stats: bool) -> GolemNetworkStats await yagna_service.shutdown() -async def stats_main(args: argparse.Namespace): +async def network_stats_main(args: argparse.Namespace): with args.CLUSTER_CONFIG_FILE as file: config = yaml.safe_load(file.read()) provider_config = config["provider"] async with golem_network_stats_service( provider_config["enable_registry_stats"] ) as stats_service: - await stats_service.run(provider_config["parameters"], args.run_time) + await stats_service.run(provider_config["parameters"], args.duration) def main(): @@ -207,10 +208,10 @@ def main(): else: logger.info("Stopping server done, bye!") - elif args.command == "stats": + elif args.command == "network-stats": if args.enable_logging: logging.config.dictConfig(LOGGING_CONFIG) - asyncio.run(stats_main(args)) + asyncio.run(network_stats_main(args)) if __name__ == "__main__": diff --git a/ray_on_golem/server/services/golem/golem.py b/ray_on_golem/server/services/golem/golem.py index 543f6361..fbbda120 100644 --- a/ray_on_golem/server/services/golem/golem.py +++ b/ray_on_golem/server/services/golem/golem.py @@ -127,13 +127,13 @@ async def _create_stack( stack.demand_manager.get_initial_proposal, plugins=( BlacklistProviderIdPlugin(PROVIDERS_BLACKLIST.get(network, set())), - *stack.extra_proposal_plugins, + *stack.extra_proposal_plugins.values(), ScoringBuffer( min_size=50, max_size=1000, fill_at_start=True, proposal_scorers=( - *stack.extra_proposal_scorers, + *stack.extra_proposal_scorers.values(), MapScore(partial(self._score_with_provider_data, network=network)), ), update_interval=timedelta(seconds=10), diff --git a/ray_on_golem/server/services/golem/helpers/manager_stack.py b/ray_on_golem/server/services/golem/helpers/manager_stack.py index d967694b..48c52c13 100644 --- a/ray_on_golem/server/services/golem/helpers/manager_stack.py +++ b/ray_on_golem/server/services/golem/helpers/manager_stack.py @@ -26,15 +26,15 @@ def apply_cost_management_avg_usage(stack: ManagerStack, node_config: NodeConfig ), ) - stack.extra_proposal_scorers.append( - MapScore(linear_average_cost, normalize=True, normalize_flip=True), + stack.extra_proposal_scorers["Sort by linear average cost"] = MapScore( + linear_average_cost, normalize=True, normalize_flip=True ) max_average_usage_cost = node_config.cost_management.max_average_usage_cost if max_average_usage_cost is not None: - stack.extra_proposal_plugins.append( - RejectIfCostsExceeds(max_average_usage_cost, linear_average_cost), - ) + stack.extra_proposal_plugins[ + f"Reject if max_average_usage_cost exceeds {node_config.cost_management.max_average_usage_cost}" + ] = RejectIfCostsExceeds(max_average_usage_cost, linear_average_cost) logger.debug("Cost management based on average usage applied with max limits") else: logger.debug("Cost management based on average usage applied without max limits") @@ -42,7 +42,7 @@ def apply_cost_management_avg_usage(stack: ManagerStack, node_config: NodeConfig @staticmethod def apply_cost_management_hard_limits(stack: ManagerStack, node_config: NodeConfigData) -> None: # TODO: Consider creating RejectIfCostsExceeds variant for multiple values - proposal_plugins = [] + proposal_plugins = {} field_names = { "max_initial_price": "price_initial", "max_cpu_sec_price": "price_cpu_sec", @@ -52,12 +52,12 @@ def apply_cost_management_hard_limits(stack: ManagerStack, node_config: NodeConf for cost_field_name, coef_field_name in field_names.items(): cost_max_value = getattr(node_config.cost_management, cost_field_name, None) if cost_max_value is not None: - proposal_plugins.append( - RejectIfCostsExceeds(cost_max_value, LinearCoeffsCost(coef_field_name)), - ) + proposal_plugins[ + f"Reject if {coef_field_name} exceeds {cost_max_value}" + ] = RejectIfCostsExceeds(cost_max_value, LinearCoeffsCost(coef_field_name)) if proposal_plugins: - stack.extra_proposal_plugins.extend(proposal_plugins) + stack.extra_proposal_plugins.update(proposal_plugins) logger.debug("Cost management based on max limits applied") else: logger.debug("Cost management based on max limits is not enabled") diff --git a/ray_on_golem/server/services/golem/manager_stack.py b/ray_on_golem/server/services/golem/manager_stack.py index 2ebe4160..e22bea16 100644 --- a/ray_on_golem/server/services/golem/manager_stack.py +++ b/ray_on_golem/server/services/golem/manager_stack.py @@ -1,5 +1,5 @@ import logging -from typing import List, Optional +from typing import Dict, Optional from golem.managers import ( ActivityManager, @@ -21,8 +21,8 @@ class ManagerStack(BaseModel): proposal_manager: Optional[ProposalManager] agreement_manager: Optional[AgreementManager] activity_manager: Optional[ActivityManager] - extra_proposal_plugins: List[ProposalManagerPlugin] = Field(default_factory=list) - extra_proposal_scorers: List[ProposalScorer] = Field(default_factory=list) + extra_proposal_plugins: Dict[str, ProposalManagerPlugin] = Field(default_factory=dict) + extra_proposal_scorers: Dict[str, ProposalScorer] = Field(default_factory=dict) class Config: arbitrary_types_allowed = True diff --git a/ray_on_golem/server/services/golem/network_stats.py b/ray_on_golem/server/services/golem/network_stats.py index 28ee2ce0..f48175b5 100644 --- a/ray_on_golem/server/services/golem/network_stats.py +++ b/ray_on_golem/server/services/golem/network_stats.py @@ -139,7 +139,7 @@ async def shutdown(self) -> None: logger.info("Stopping GolemNetworkStatsService done") - async def run(self, provider_parameters: Dict, run_time_minutes: int) -> None: + async def run(self, provider_parameters: Dict, duration_minutes: int) -> None: network: str = provider_parameters["network"] budget: int = provider_parameters["budget"] node_config: NodeConfigData = NodeConfigData(**provider_parameters["node_config"]) @@ -152,7 +152,7 @@ async def run(self, provider_parameters: Dict, run_time_minutes: int) -> None: try: await asyncio.wait( [consume_proposals_task], - timeout=timedelta(minutes=run_time_minutes).total_seconds(), + timeout=timedelta(minutes=duration_minutes).total_seconds(), ) finally: consume_proposals_task.cancel() @@ -203,13 +203,9 @@ async def _create_stack( self._stats_plugin_factory.create_counter_plugin("Not blacklisted"), ] - for idx, plugin in enumerate(stack.extra_proposal_plugins): + for plugin_tag, plugin in stack.extra_proposal_plugins.items(): plugins.append(plugin) - plugins.append( - self._stats_plugin_factory.create_counter_plugin( - f"Passed {plugin.__class__.__name__} #{idx + 1}" - ) - ) + plugins.append(self._stats_plugin_factory.create_counter_plugin(f"Passed {plugin_tag}")) plugins.extend( [ @@ -217,7 +213,7 @@ async def _create_stack( min_size=50, max_size=1000, fill_at_start=True, - proposal_scorers=(*stack.extra_proposal_scorers,), + proposal_scorers=(*stack.extra_proposal_scorers.values(),), update_interval=timedelta(seconds=10), ), self._stats_plugin_factory.create_counter_plugin("Scored"), From 8733a0c05d31658531c336ac66642a07fe2624a6 Mon Sep 17 00:00:00 2001 From: Lucjan Dudek Date: Tue, 7 Nov 2023 15:32:58 +0100 Subject: [PATCH 06/11] CR chnages --- ray_on_golem/server/run.py | 44 ++++++++++--------- .../server/services/golem/network_stats.py | 2 +- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/ray_on_golem/server/run.py b/ray_on_golem/server/run.py index af3c771a..ba762452 100644 --- a/ray_on_golem/server/run.py +++ b/ray_on_golem/server/run.py @@ -174,7 +174,30 @@ async def golem_network_stats_service(registry_stats: bool) -> GolemNetworkStats await yagna_service.shutdown() +def webserver_main(args: argparse.Namespace): + logging.config.dictConfig(LOGGING_CONFIG) + app = create_application(args.port, args.self_shutdown, args.registry_stats) + + logger.info( + "Starting server... {}".format(", ".join(f"{k}={v}" for k, v in args.__dict__.items())) + ) + + try: + web.run_app( + app, + port=app["port"], + print=None, + shutdown_timeout=RAY_ON_GOLEM_SHUTDOWN_DEADLINE.total_seconds(), + ) + except Exception: + logger.info("Server unexpectedly died, bye!") + else: + logger.info("Stopping server done, bye!") + + async def network_stats_main(args: argparse.Namespace): + if args.enable_logging: + logging.config.dictConfig(LOGGING_CONFIG) with args.CLUSTER_CONFIG_FILE as file: config = yaml.safe_load(file.read()) provider_config = config["provider"] @@ -189,28 +212,9 @@ def main(): args = parse_sys_args() if args.command == "webserver": - logging.config.dictConfig(LOGGING_CONFIG) - app = create_application(args.port, args.self_shutdown, args.registry_stats) - - logger.info( - "Starting server... {}".format(", ".join(f"{k}={v}" for k, v in args.__dict__.items())) - ) - - try: - web.run_app( - app, - port=app["port"], - print=None, - shutdown_timeout=RAY_ON_GOLEM_SHUTDOWN_DEADLINE.total_seconds(), - ) - except Exception: - logger.info("Server unexpectedly died, bye!") - else: - logger.info("Stopping server done, bye!") + webserver_main(args) elif args.command == "network-stats": - if args.enable_logging: - logging.config.dictConfig(LOGGING_CONFIG) asyncio.run(network_stats_main(args)) diff --git a/ray_on_golem/server/services/golem/network_stats.py b/ray_on_golem/server/services/golem/network_stats.py index f48175b5..8aa9492f 100644 --- a/ray_on_golem/server/services/golem/network_stats.py +++ b/ray_on_golem/server/services/golem/network_stats.py @@ -147,7 +147,7 @@ async def run(self, provider_parameters: Dict, duration_minutes: int) -> None: stack = await self._create_stack(node_config, budget, network) await stack.start() - print("Gathering stats data...") + print(f"Gathering stats data for {duration_minutes} minutes...") consume_proposals_task = asyncio.create_task(self._consume_draft_proposals(stack)) try: await asyncio.wait( From 7a8eef4c3dc1d1c967cacd5dbcd5c03393858790 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 8 Nov 2023 14:39:29 +0100 Subject: [PATCH 07/11] message changes --- ray_on_golem/server/run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ray_on_golem/server/run.py b/ray_on_golem/server/run.py index ba762452..f2df363d 100644 --- a/ray_on_golem/server/run.py +++ b/ray_on_golem/server/run.py @@ -23,10 +23,10 @@ def parse_sys_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Ray on Golem's webserver.") + parser = argparse.ArgumentParser(description="Ray on Golem") subparsers = parser.add_subparsers(dest="command") - webserver_parser = subparsers.add_parser("webserver") + webserver_parser = subparsers.add_parser("webserver", description="Ray on Golem's webserver.") webserver_parser.add_argument( "-p", "--port", From 13448bc58ebf95f45509f9385b97524e9c00181e Mon Sep 17 00:00:00 2001 From: approxit Date: Thu, 9 Nov 2023 13:53:03 +0100 Subject: [PATCH 08/11] network_stats separation and click usage --- pyproject.toml | 2 +- ray_on_golem/__main__.py | 3 + ray_on_golem/main.py | 24 ++ ray_on_golem/network_stats/__init__.py | 3 + ray_on_golem/network_stats/main.py | 74 ++++++ .../network_stats/services/__init__.py | 1 + .../services}/network_stats.py | 10 +- ray_on_golem/provider/node_provider.py | 2 +- ray_on_golem/server/__init__.py | 3 + ray_on_golem/server/main.py | 140 +++++++++++ ray_on_golem/server/run.py | 222 ------------------ ray_on_golem/utils.py | 8 + 12 files changed, 263 insertions(+), 229 deletions(-) create mode 100644 ray_on_golem/__main__.py create mode 100644 ray_on_golem/main.py create mode 100644 ray_on_golem/network_stats/__init__.py create mode 100644 ray_on_golem/network_stats/main.py create mode 100644 ray_on_golem/network_stats/services/__init__.py rename ray_on_golem/{server/services/golem => network_stats/services}/network_stats.py (96%) create mode 100644 ray_on_golem/server/main.py delete mode 100644 ray_on_golem/server/run.py diff --git a/pyproject.toml b/pyproject.toml index ce123313..ba5b6e10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ click = "^8" pydantic = "<2" [tool.poetry.scripts] -ray-on-golem = "ray_on_golem.server.run:main" +ray-on-golem = "ray_on_golem.main:main" [tool.poetry.group.dev.dependencies] poethepoet = "^0.22.0" diff --git a/ray_on_golem/__main__.py b/ray_on_golem/__main__.py new file mode 100644 index 00000000..a9fc2409 --- /dev/null +++ b/ray_on_golem/__main__.py @@ -0,0 +1,3 @@ +from ray_on_golem.main import main + +main() diff --git a/ray_on_golem/main.py b/ray_on_golem/main.py new file mode 100644 index 00000000..824e1ea6 --- /dev/null +++ b/ray_on_golem/main.py @@ -0,0 +1,24 @@ +import click + +from ray_on_golem.network_stats import main as network_stats +from ray_on_golem.server import main as webserver +from ray_on_golem.utils import prepare_tmp_dir + + +@click.group() +def cli(): + pass + + +cli.add_command(network_stats) +cli.add_command(webserver) + + +def main(): + prepare_tmp_dir() + + cli() + + +if __name__ == "__main__": + main() diff --git a/ray_on_golem/network_stats/__init__.py b/ray_on_golem/network_stats/__init__.py new file mode 100644 index 00000000..6d1f632c --- /dev/null +++ b/ray_on_golem/network_stats/__init__.py @@ -0,0 +1,3 @@ +from ray_on_golem.network_stats.main import main + +__all__ = ("main",) diff --git a/ray_on_golem/network_stats/main.py b/ray_on_golem/network_stats/main.py new file mode 100644 index 00000000..42c587c6 --- /dev/null +++ b/ray_on_golem/network_stats/main.py @@ -0,0 +1,74 @@ +import asyncio +import logging +import logging.config +from contextlib import asynccontextmanager +from typing import Dict + +import click +import yaml + +from ray_on_golem.network_stats.services import NetworkStatsService +from ray_on_golem.server.services import YagnaService +from ray_on_golem.server.settings import LOGGING_CONFIG, YAGNA_PATH +from ray_on_golem.utils import prepare_tmp_dir + + +@click.command( + name="network-stats", + short_help="Run Golem Network statistics.", + help="Run Golem Network statistics based on given cluster config file.", +) +@click.argument("cluster-config-file", type=click.Path(exists=True)) +@click.option( + "-d", + "--duration", + type=int, + default=5, + show_default=True, + help="Set for how long gather stats, in minutes.", +) +@click.option( + "--enable-logging", + is_flag=True, + default=True, + show_default=True, + help="Enable verbose logging.", +) +def main(cluster_config_file: str, duration: int, enable_logging: bool): + if enable_logging: + logging.config.dictConfig(LOGGING_CONFIG) + + with open(cluster_config_file) as file: + config = yaml.safe_load(file.read()) + + asyncio.run(_network_stats(config, duration)) + + +async def _network_stats(config: Dict, duration: int): + provider_config = config["provider"] + + async with golem_network_stats_service( + provider_config["enable_registry_stats"] + ) as stats_service: + await stats_service.run(provider_config["parameters"], duration) + + +@asynccontextmanager +async def golem_network_stats_service(registry_stats: bool) -> NetworkStatsService: + golem_network_stats_service: NetworkStatsService = NetworkStatsService(registry_stats) + yagna_service = YagnaService( + yagna_path=YAGNA_PATH, + ) + + await yagna_service.init() + await golem_network_stats_service.init(yagna_appkey=yagna_service.yagna_appkey) + + yield golem_network_stats_service + + await golem_network_stats_service.shutdown() + await yagna_service.shutdown() + + +if __name__ == "__main__": + prepare_tmp_dir() + main() diff --git a/ray_on_golem/network_stats/services/__init__.py b/ray_on_golem/network_stats/services/__init__.py new file mode 100644 index 00000000..a15706d3 --- /dev/null +++ b/ray_on_golem/network_stats/services/__init__.py @@ -0,0 +1 @@ +from ray_on_golem.network_stats.services.network_stats import NetworkStatsService diff --git a/ray_on_golem/server/services/golem/network_stats.py b/ray_on_golem/network_stats/services/network_stats.py similarity index 96% rename from ray_on_golem/server/services/golem/network_stats.py rename to ray_on_golem/network_stats/services/network_stats.py index 8aa9492f..f2697f4f 100644 --- a/ray_on_golem/server/services/golem/network_stats.py +++ b/ray_on_golem/network_stats/services/network_stats.py @@ -111,7 +111,7 @@ def create_counter_plugin(self, tag: str) -> ProposalCounterPlugin: return self._counters[tag] -class GolemNetworkStatsService: +class NetworkStatsService: def __init__(self, registry_stats: bool) -> None: self._registry_stats = registry_stats @@ -123,21 +123,21 @@ def __init__(self, registry_stats: bool) -> None: self._stats_plugin_factory = StatsPluginFactory() async def init(self, yagna_appkey: str) -> None: - logger.info("Starting GolemNetworkStatsService...") + logger.info("Starting NetworkStatsService...") self._golem = GolemNode(app_key=yagna_appkey) self._yagna_appkey = yagna_appkey await self._golem.start() - logger.info("Starting GolemNetworkStatsService done") + logger.info("Starting NetworkStatsService done") async def shutdown(self) -> None: - logger.info("Stopping GolemNetworkStatsService...") + logger.info("Stopping NetworkStatsService...") await self._golem.aclose() self._golem = None - logger.info("Stopping GolemNetworkStatsService done") + logger.info("Stopping NetworkStatsService done") async def run(self, provider_parameters: Dict, duration_minutes: int) -> None: network: str = provider_parameters["network"] diff --git a/ray_on_golem/provider/node_provider.py b/ray_on_golem/provider/node_provider.py index 23683929..76540889 100644 --- a/ray_on_golem/provider/node_provider.py +++ b/ray_on_golem/provider/node_provider.py @@ -14,7 +14,6 @@ from ray_on_golem.client.client import RayOnGolemClient from ray_on_golem.provider.ssh_command_runner import SSHCommandRunner from ray_on_golem.server.models import NodeConfigData, NodeId, ShutdownState -from ray_on_golem.server.run import prepare_tmp_dir from ray_on_golem.server.settings import ( LOGGING_DEBUG_PATH, RAY_ON_GOLEM_CHECK_DEADLINE, @@ -26,6 +25,7 @@ get_default_ssh_key_name, get_last_lines_from_file, is_running_on_golem_network, + prepare_tmp_dir, ) logger = logging.getLogger(__name__) diff --git a/ray_on_golem/server/__init__.py b/ray_on_golem/server/__init__.py index e69de29b..ebdf1f69 100644 --- a/ray_on_golem/server/__init__.py +++ b/ray_on_golem/server/__init__.py @@ -0,0 +1,3 @@ +from ray_on_golem.server.main import main + +__all__ = ("main",) diff --git a/ray_on_golem/server/main.py b/ray_on_golem/server/main.py new file mode 100644 index 00000000..6b8f3e1a --- /dev/null +++ b/ray_on_golem/server/main.py @@ -0,0 +1,140 @@ +import logging +import logging.config + +import click +from aiohttp import web + +from ray_on_golem.server.middlewares import error_middleware, trace_id_middleware +from ray_on_golem.server.services import GolemService, RayService, YagnaService +from ray_on_golem.server.settings import ( + LOGGING_CONFIG, + RAY_ON_GOLEM_SHUTDOWN_DEADLINE, + TMP_PATH, + WEBSOCAT_PATH, + YAGNA_PATH, +) +from ray_on_golem.server.views import routes +from ray_on_golem.utils import prepare_tmp_dir + +logger = logging.getLogger(__name__) + + +@click.command(name="webserver", help="Run Ray on Golem's webserver.") +@click.option( + "-p", + "--port", + type=int, + show_default=True, + default=4578, + help="Port for Ray on Golem's webserver to listen on.", +) +@click.option( + "--self-shutdown", + is_flag=True, + show_default=True, + help="Enable self-shutdown after last node termination.", +) +@click.option( + "--registry-stats/--no-registry-stats", + show_default=True, + default=True, + help="Enable collection of Golem Registry stats about resolved images.", +) +def main(port: int, self_shutdown: bool, registry_stats: bool): + logging.config.dictConfig(LOGGING_CONFIG) + + app = create_application(port, self_shutdown, registry_stats) + + logger.info(f"Starting server... {port=}, {self_shutdown=}, {registry_stats=}") + + try: + web.run_app( + app, + port=app["port"], + print=None, + shutdown_timeout=RAY_ON_GOLEM_SHUTDOWN_DEADLINE.total_seconds(), + ) + except Exception: + logger.info("Server unexpectedly died, bye!") + else: + logger.info("Stopping server done, bye!") + + +def create_application(port: int, self_shutdown: bool, registry_stats: bool) -> web.Application: + app = web.Application( + middlewares=[ + trace_id_middleware, + error_middleware, + ] + ) + + app["port"] = port + app["self_shutdown"] = self_shutdown + app["registry_stats"] = registry_stats + + app["yagna_service"] = YagnaService( + yagna_path=YAGNA_PATH, + ) + + app["golem_service"] = GolemService( + websocat_path=WEBSOCAT_PATH, + registry_stats=app["registry_stats"], + ) + + app["ray_service"] = RayService( + ray_on_golem_port=port, + golem_service=app["golem_service"], + tmp_path=TMP_PATH, + ) + + app.add_routes(routes) + app.cleanup_ctx.append(yagna_service_ctx) + app.cleanup_ctx.append(golem_service_ctx) + app.cleanup_ctx.append(ray_service_ctx) + app.on_startup.append(startup_print) + app.on_shutdown.append(shutdown_print) + + return app + + +async def startup_print(app: web.Application) -> None: + logger.info("Starting server done, listening on port {}".format(app["port"])) + + +async def shutdown_print(app: web.Application) -> None: + print("") # explicit new line to console to visually better handle ^C + logger.info("Stopping server gracefully, forcing after `%s`...", RAY_ON_GOLEM_SHUTDOWN_DEADLINE) + + +async def yagna_service_ctx(app: web.Application) -> None: + yagna_service: YagnaService = app["yagna_service"] + + await yagna_service.init() + + yield + + await yagna_service.shutdown() + + +async def golem_service_ctx(app: web.Application) -> None: + golem_service: GolemService = app["golem_service"] + yagna_service: YagnaService = app["yagna_service"] + + await golem_service.init(yagna_appkey=yagna_service.yagna_appkey) + + yield + + await golem_service.shutdown() + + +async def ray_service_ctx(app: web.Application) -> None: + ray_service: RayService = app["ray_service"] + + yield + + await ray_service.shutdown() + + +if __name__ == "__main__": + prepare_tmp_dir() + main() diff --git a/ray_on_golem/server/run.py b/ray_on_golem/server/run.py deleted file mode 100644 index f2df363d..00000000 --- a/ray_on_golem/server/run.py +++ /dev/null @@ -1,222 +0,0 @@ -import argparse -import asyncio -import logging -import logging.config -from contextlib import asynccontextmanager - -import yaml -from aiohttp import web - -from ray_on_golem.server.middlewares import error_middleware, trace_id_middleware -from ray_on_golem.server.services import GolemService, RayService, YagnaService -from ray_on_golem.server.services.golem.network_stats import GolemNetworkStatsService -from ray_on_golem.server.settings import ( - LOGGING_CONFIG, - RAY_ON_GOLEM_SHUTDOWN_DEADLINE, - TMP_PATH, - WEBSOCAT_PATH, - YAGNA_PATH, -) -from ray_on_golem.server.views import routes - -logger = logging.getLogger(__name__) - - -def parse_sys_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Ray on Golem") - subparsers = parser.add_subparsers(dest="command") - - webserver_parser = subparsers.add_parser("webserver", description="Ray on Golem's webserver.") - webserver_parser.add_argument( - "-p", - "--port", - type=int, - default=4578, - help="port for Ray on Golem's webserver to listen on, default: %(default)s", - ) - webserver_parser.add_argument( - "--self-shutdown", - action="store_true", - help="flag to enable self-shutdown after last node termination, default: %(default)s", - ) - webserver_parser.add_argument( - "--registry-stats", - action="store_true", - help="flag to enable collection of Golem Registry stats about resolved images, default: %(default)s", - ) - webserver_parser.add_argument( - "--no-registry-stats", - action="store_false", - dest="registry_stats", - ) - webserver_parser.add_argument("--no-self-shutdown", action="store_false", dest="self_shutdown") - webserver_parser.set_defaults(self_shutdown=False, registry_stats=True) - - stats_parser = subparsers.add_parser("network-stats") - stats_parser.add_argument( - "CLUSTER_CONFIG_FILE", - type=argparse.FileType("r"), - help="Cluster config yaml", - ) - stats_parser.add_argument( - "-d", - "--duration", - type=int, - dest="duration", - default=5, - help="For how long in minutes to gather stats, default: %(default)s", - ) - stats_parser.add_argument( - "-log", - "--enable-logging", - action="store_true", - dest="enable_logging", - help="flag to enable logging, default: %(default)s", - ) - - return parser.parse_args() - - -def prepare_tmp_dir(): - try: - TMP_PATH.mkdir(parents=True, exist_ok=True) - except OSError: - pass - - -def create_application(port: int, self_shutdown: bool, registry_stats: bool) -> web.Application: - app = web.Application( - middlewares=[ - trace_id_middleware, - error_middleware, - ] - ) - - app["port"] = port - app["self_shutdown"] = self_shutdown - app["registry_stats"] = registry_stats - - app["yagna_service"] = YagnaService( - yagna_path=YAGNA_PATH, - ) - - app["golem_service"] = GolemService( - websocat_path=WEBSOCAT_PATH, - registry_stats=app["registry_stats"], - ) - - app["ray_service"] = RayService( - ray_on_golem_port=port, - golem_service=app["golem_service"], - tmp_path=TMP_PATH, - ) - - app.add_routes(routes) - app.cleanup_ctx.append(yagna_service_ctx) - app.cleanup_ctx.append(golem_service_ctx) - app.cleanup_ctx.append(ray_service_ctx) - app.on_startup.append(startup_print) - app.on_shutdown.append(shutdown_print) - - return app - - -async def startup_print(app: web.Application) -> None: - logger.info("Starting server done, listening on port {}".format(app["port"])) - - -async def shutdown_print(app: web.Application) -> None: - logger.info("Stopping server gracefully, forcing after `%s`...", RAY_ON_GOLEM_SHUTDOWN_DEADLINE) - - -async def yagna_service_ctx(app: web.Application) -> None: - yagna_service: YagnaService = app["yagna_service"] - - await yagna_service.init() - - yield - - await yagna_service.shutdown() - - -async def golem_service_ctx(app: web.Application) -> None: - golem_service: GolemService = app["golem_service"] - yagna_service: YagnaService = app["yagna_service"] - - await golem_service.init(yagna_appkey=yagna_service.yagna_appkey) - - yield - - await golem_service.shutdown() - - -async def ray_service_ctx(app: web.Application) -> None: - ray_service: RayService = app["ray_service"] - - yield - - await ray_service.shutdown() - - -@asynccontextmanager -async def golem_network_stats_service(registry_stats: bool) -> GolemNetworkStatsService: - golem_network_stats_service: GolemNetworkStatsService = GolemNetworkStatsService(registry_stats) - yagna_service = YagnaService( - yagna_path=YAGNA_PATH, - ) - - await yagna_service.init() - await golem_network_stats_service.init(yagna_appkey=yagna_service.yagna_appkey) - - yield golem_network_stats_service - - await golem_network_stats_service.shutdown() - await yagna_service.shutdown() - - -def webserver_main(args: argparse.Namespace): - logging.config.dictConfig(LOGGING_CONFIG) - app = create_application(args.port, args.self_shutdown, args.registry_stats) - - logger.info( - "Starting server... {}".format(", ".join(f"{k}={v}" for k, v in args.__dict__.items())) - ) - - try: - web.run_app( - app, - port=app["port"], - print=None, - shutdown_timeout=RAY_ON_GOLEM_SHUTDOWN_DEADLINE.total_seconds(), - ) - except Exception: - logger.info("Server unexpectedly died, bye!") - else: - logger.info("Stopping server done, bye!") - - -async def network_stats_main(args: argparse.Namespace): - if args.enable_logging: - logging.config.dictConfig(LOGGING_CONFIG) - with args.CLUSTER_CONFIG_FILE as file: - config = yaml.safe_load(file.read()) - provider_config = config["provider"] - async with golem_network_stats_service( - provider_config["enable_registry_stats"] - ) as stats_service: - await stats_service.run(provider_config["parameters"], args.duration) - - -def main(): - prepare_tmp_dir() - args = parse_sys_args() - - if args.command == "webserver": - webserver_main(args) - - elif args.command == "network-stats": - asyncio.run(network_stats_main(args)) - - -if __name__ == "__main__": - main() diff --git a/ray_on_golem/utils.py b/ray_on_golem/utils.py index 8193e18a..48156cf0 100644 --- a/ray_on_golem/utils.py +++ b/ray_on_golem/utils.py @@ -11,6 +11,7 @@ from aiohttp.web_runner import GracefulExit from ray_on_golem.exceptions import RayOnGolemError +from ray_on_golem.server.settings import TMP_PATH async def run_subprocess( @@ -77,3 +78,10 @@ def rolloverLogFiles(): def get_last_lines_from_file(file_path: Path, max_lines: int) -> str: with file_path.open() as file: return "".join(deque(file, max_lines)) + + +def prepare_tmp_dir(): + try: + TMP_PATH.mkdir(parents=True, exist_ok=True) + except OSError: + pass From c0cc0bfd2f5f13839a5e788c13d175de944448ba Mon Sep 17 00:00:00 2001 From: approxit Date: Thu, 9 Nov 2023 15:16:04 +0100 Subject: [PATCH 09/11] no config bootstrap in network stats fix --- golem-cluster.override.3-disable-stats.yaml | 3 +- ray_on_golem/network_stats/main.py | 23 +++++----- ray_on_golem/provider/node_provider.py | 47 ++++++++++++--------- 3 files changed, 42 insertions(+), 31 deletions(-) diff --git a/golem-cluster.override.3-disable-stats.yaml b/golem-cluster.override.3-disable-stats.yaml index 127ea2a2..87dbbd58 100644 --- a/golem-cluster.override.3-disable-stats.yaml +++ b/golem-cluster.override.3-disable-stats.yaml @@ -1,2 +1,3 @@ provider: - enable_registry_stats: false + params: + enable_registry_stats: false diff --git a/ray_on_golem/network_stats/main.py b/ray_on_golem/network_stats/main.py index 42c587c6..badbdaaf 100644 --- a/ray_on_golem/network_stats/main.py +++ b/ray_on_golem/network_stats/main.py @@ -8,6 +8,7 @@ import yaml from ray_on_golem.network_stats.services import NetworkStatsService +from ray_on_golem.provider.node_provider import GolemNodeProvider from ray_on_golem.server.services import YagnaService from ray_on_golem.server.settings import LOGGING_CONFIG, YAGNA_PATH from ray_on_golem.utils import prepare_tmp_dir @@ -30,7 +31,7 @@ @click.option( "--enable-logging", is_flag=True, - default=True, + default=False, show_default=True, help="Enable verbose logging.", ) @@ -41,31 +42,31 @@ def main(cluster_config_file: str, duration: int, enable_logging: bool): with open(cluster_config_file) as file: config = yaml.safe_load(file.read()) + GolemNodeProvider._apply_config_defaults(config) + asyncio.run(_network_stats(config, duration)) async def _network_stats(config: Dict, duration: int): - provider_config = config["provider"] + provider_params = config["provider"]["parameters"] - async with golem_network_stats_service( - provider_config["enable_registry_stats"] - ) as stats_service: - await stats_service.run(provider_config["parameters"], duration) + async with network_stats_service(provider_params["enable_registry_stats"]) as stats_service: + await stats_service.run(provider_params, duration) @asynccontextmanager -async def golem_network_stats_service(registry_stats: bool) -> NetworkStatsService: - golem_network_stats_service: NetworkStatsService = NetworkStatsService(registry_stats) +async def network_stats_service(registry_stats: bool) -> NetworkStatsService: + network_stats_service: NetworkStatsService = NetworkStatsService(registry_stats) yagna_service = YagnaService( yagna_path=YAGNA_PATH, ) await yagna_service.init() - await golem_network_stats_service.init(yagna_appkey=yagna_service.yagna_appkey) + await network_stats_service.init(yagna_appkey=yagna_service.yagna_appkey) - yield golem_network_stats_service + yield network_stats_service - await golem_network_stats_service.shutdown() + await network_stats_service.shutdown() await yagna_service.shutdown() diff --git a/ray_on_golem/provider/node_provider.py b/ray_on_golem/provider/node_provider.py index 76540889..baf20029 100644 --- a/ray_on_golem/provider/node_provider.py +++ b/ray_on_golem/provider/node_provider.py @@ -57,38 +57,27 @@ def __init__(self, provider_config: dict, cluster_name: str): def bootstrap_config(cls, cluster_config: Dict[str, Any]) -> Dict[str, Any]: config = deepcopy(cluster_config) - provider_parameters: Dict = config["provider"]["parameters"] - provider_parameters.setdefault("webserver_port", 4578) - provider_parameters.setdefault("enable_registry_stats", True) - provider_parameters.setdefault("network", "goerli") - provider_parameters.setdefault("budget", 1) + cls._apply_config_defaults(config) + provider_parameters = config["provider"]["parameters"] ray_on_golem_client = cls._get_ray_on_golem_client_instance( provider_parameters["webserver_port"], provider_parameters["enable_registry_stats"], ) - auth: Dict = config["auth"] - auth.setdefault("ssh_user", "root") - - if "ssh_private_key" not in auth: - ssh_key_path = TMP_PATH / get_default_ssh_key_name(config["cluster_name"]) - auth["ssh_private_key"] = str(ssh_key_path) - - if not ssh_key_path.exists(): + auth = config["auth"] + default_ssh_private_key = TMP_PATH / get_default_ssh_key_name(config["cluster_name"]) + if auth["ssh_private_key"] == default_ssh_private_key: + if not default_ssh_private_key.exists(): ssh_key_base64 = ray_on_golem_client.get_or_create_default_ssh_key( config["cluster_name"] ) # FIXME: mitigate double file writing on local machine as get_or_create_default_ssh_key creates the file - ssh_key_path.parent.mkdir(parents=True, exist_ok=True) - with ssh_key_path.open("w") as f: + default_ssh_private_key.parent.mkdir(parents=True, exist_ok=True) + with default_ssh_private_key.open("w") as f: f.write(ssh_key_base64) - # copy ssh details to provider namespace for cluster creation in __init__ - provider_parameters["_ssh_private_key"] = auth["ssh_private_key"] - provider_parameters["_ssh_user"] = auth["ssh_user"] - global_event_system.execute_callback( CreateClusterEvent.ssh_keypair_downloaded, {"ssh_key_path": auth["ssh_private_key"]}, @@ -173,6 +162,26 @@ def external_ip(self, node_id: NodeId) -> str: def set_node_tags(self, node_id: NodeId, tags: Dict) -> None: self._ray_on_golem_client.set_node_tags(node_id, tags) + @staticmethod + def _apply_config_defaults(config: Dict[str, Any]) -> None: + provider_parameters: Dict = config["provider"]["parameters"] + provider_parameters.setdefault("webserver_port", 4578) + provider_parameters.setdefault("enable_registry_stats", True) + provider_parameters.setdefault("network", "goerli") + provider_parameters.setdefault("budget", 1) + + auth: Dict = config.setdefault("auth", {}) + auth.setdefault("ssh_user", "root") + + if "ssh_private_key" not in auth: + auth["ssh_private_key"] = str( + TMP_PATH / get_default_ssh_key_name(config["cluster_name"]) + ) + + # copy ssh details to provider namespace for cluster creation in __init__ + provider_parameters["_ssh_private_key"] = auth["ssh_private_key"] + provider_parameters["_ssh_user"] = auth["ssh_user"] + @staticmethod def _start_webserver( ray_on_golem_client: RayOnGolemClient, From e211546270a76561cb0297d55f793d78ad38e84b Mon Sep 17 00:00:00 2001 From: approxit Date: Thu, 9 Nov 2023 15:37:50 +0100 Subject: [PATCH 10/11] fix --- ray_on_golem/provider/node_provider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray_on_golem/provider/node_provider.py b/ray_on_golem/provider/node_provider.py index baf20029..c9d0f6a8 100644 --- a/ray_on_golem/provider/node_provider.py +++ b/ray_on_golem/provider/node_provider.py @@ -67,7 +67,7 @@ def bootstrap_config(cls, cluster_config: Dict[str, Any]) -> Dict[str, Any]: auth = config["auth"] default_ssh_private_key = TMP_PATH / get_default_ssh_key_name(config["cluster_name"]) - if auth["ssh_private_key"] == default_ssh_private_key: + if auth["ssh_private_key"] == str(default_ssh_private_key): if not default_ssh_private_key.exists(): ssh_key_base64 = ray_on_golem_client.get_or_create_default_ssh_key( config["cluster_name"] From 2ba3f06f8504119cef68488c9cf8207305ca3375 Mon Sep 17 00:00:00 2001 From: approxit Date: Thu, 9 Nov 2023 16:00:43 +0100 Subject: [PATCH 11/11] review changes --- ray_on_golem/network_stats/main.py | 5 ++--- ray_on_golem/server/main.py | 9 +++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ray_on_golem/network_stats/main.py b/ray_on_golem/network_stats/main.py index badbdaaf..dd26fc62 100644 --- a/ray_on_golem/network_stats/main.py +++ b/ray_on_golem/network_stats/main.py @@ -18,6 +18,7 @@ name="network-stats", short_help="Run Golem Network statistics.", help="Run Golem Network statistics based on given cluster config file.", + context_settings={"show_default": True}, ) @click.argument("cluster-config-file", type=click.Path(exists=True)) @click.option( @@ -25,14 +26,12 @@ "--duration", type=int, default=5, - show_default=True, - help="Set for how long gather stats, in minutes.", + help="Set the duration of the statistics gathering process, in minutes.", ) @click.option( "--enable-logging", is_flag=True, default=False, - show_default=True, help="Enable verbose logging.", ) def main(cluster_config_file: str, duration: int, enable_logging: bool): diff --git a/ray_on_golem/server/main.py b/ray_on_golem/server/main.py index 6b8f3e1a..4fca5d8c 100644 --- a/ray_on_golem/server/main.py +++ b/ray_on_golem/server/main.py @@ -19,24 +19,25 @@ logger = logging.getLogger(__name__) -@click.command(name="webserver", help="Run Ray on Golem's webserver.") +@click.command( + name="webserver", + help="Run Ray on Golem's webserver.", + context_settings={"show_default": True}, +) @click.option( "-p", "--port", type=int, - show_default=True, default=4578, help="Port for Ray on Golem's webserver to listen on.", ) @click.option( "--self-shutdown", is_flag=True, - show_default=True, help="Enable self-shutdown after last node termination.", ) @click.option( "--registry-stats/--no-registry-stats", - show_default=True, default=True, help="Enable collection of Golem Registry stats about resolved images.", )