From cda4ca3aa9dc380e5fe212453e414e4b86376575 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 7 Jul 2021 12:05:40 +0200 Subject: [PATCH 1/6] parametrize the display of usage in the simple service --- examples/simple-service-poc/simple_service.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/examples/simple-service-poc/simple_service.py b/examples/simple-service-poc/simple_service.py index 5d545f280..31abd7437 100755 --- a/examples/simple-service-poc/simple_service.py +++ b/examples/simple-service-poc/simple_service.py @@ -39,9 +39,10 @@ class SimpleService(Service): SIMPLE_SERVICE = "/golem/run/simple_service.py" SIMPLE_SERVICE_CTL = "/golem/run/simulate_observations_ctl.py" - def __init__(self, *args, instance_name, **kwargs): + def __init__(self, *args, instance_name, show_usage: bool = False, **kwargs): super().__init__(*args, **kwargs) self.name = instance_name + self._show_usage = show_usage @staticmethod async def get_payload(): @@ -81,18 +82,20 @@ async def run(self): steps = self._ctx.commit() yield steps - print(f" --- {self._ctx.provider_name} USAGE: {await self._ctx.get_usage()}") - print(f" --- {self._ctx.provider_name} STATE: {await self._ctx.get_state()}") - print(f" --- {self._ctx.provider_name} COST: {await self._ctx.get_cost()}") + if self._show_usage: + print(f" --- {self.name} USAGE: {await self._ctx.get_usage()}") + print(f" --- {self.name} STATE: {await self._ctx.get_state()}") + print(f" --- {self.name} COST: {await self._ctx.get_cost()}") async def shutdown(self): # handler reponsible for executing operations on shutdown self._ctx.run(self.SIMPLE_SERVICE_CTL, "--stop") yield self._ctx.commit() - print(f" --- {self._ctx.provider_name} COST: {await self._ctx.get_cost()}") + if self._show_usage: + print(f" --- {self.name} COST: {await self._ctx.get_cost()}") -async def main(subnet_tag, running_time, driver=None, network=None, num_instances=1): +async def main(subnet_tag, running_time, driver=None, network=None, num_instances=1, show_usage=False): async with Golem( budget=1.0, subnet_tag=subnet_tag, @@ -120,7 +123,7 @@ async def main(subnet_tag, running_time, driver=None, network=None, num_instance cluster = await golem.run_service( SimpleService, instance_params=[ - {"instance_name": f"simple-service-{i+1}"} for i in range(num_instances) + {"instance_name": f"simple-service-{i+1}", "show_usage": show_usage} for i in range(num_instances) ], expiration=datetime.now(timezone.utc) + timedelta(minutes=120), ) @@ -184,7 +187,8 @@ def still_starting(): "(in seconds, default: %(default)s)" ), ) - parser.add_argument("--num-instances", type=int, default=1) + parser.add_argument("--num-instances", type=int, default=1, help="The number of instances of the service to spawn") + parser.add_argument("--show-usage", action="store_true", help="Show usage and cost of each instance while running.") now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S") parser.set_defaults(log_file=f"simple-service-yapapi-{now}.log") args = parser.parse_args() @@ -207,6 +211,7 @@ def still_starting(): driver=args.driver, network=args.network, num_instances=args.num_instances, + show_usage=args.show_usage, ) ) From 871ed5e99ced7222be5018ee8735fecdc646b107 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 7 Jul 2021 12:35:48 +0200 Subject: [PATCH 2/6] black ... --- examples/simple-service-poc/simple_service.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/examples/simple-service-poc/simple_service.py b/examples/simple-service-poc/simple_service.py index 31abd7437..9d7046444 100755 --- a/examples/simple-service-poc/simple_service.py +++ b/examples/simple-service-poc/simple_service.py @@ -95,7 +95,9 @@ async def shutdown(self): print(f" --- {self.name} COST: {await self._ctx.get_cost()}") -async def main(subnet_tag, running_time, driver=None, network=None, num_instances=1, show_usage=False): +async def main( + subnet_tag, running_time, driver=None, network=None, num_instances=1, show_usage=False +): async with Golem( budget=1.0, subnet_tag=subnet_tag, @@ -123,7 +125,8 @@ async def main(subnet_tag, running_time, driver=None, network=None, num_instance cluster = await golem.run_service( SimpleService, instance_params=[ - {"instance_name": f"simple-service-{i+1}", "show_usage": show_usage} for i in range(num_instances) + {"instance_name": f"simple-service-{i+1}", "show_usage": show_usage} + for i in range(num_instances) ], expiration=datetime.now(timezone.utc) + timedelta(minutes=120), ) @@ -187,8 +190,17 @@ def still_starting(): "(in seconds, default: %(default)s)" ), ) - parser.add_argument("--num-instances", type=int, default=1, help="The number of instances of the service to spawn") - parser.add_argument("--show-usage", action="store_true", help="Show usage and cost of each instance while running.") + parser.add_argument( + "--num-instances", + type=int, + default=1, + help="The number of instances of the service to spawn", + ) + parser.add_argument( + "--show-usage", + action="store_true", + help="Show usage and cost of each instance while running.", + ) now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S") parser.set_defaults(log_file=f"simple-service-yapapi-{now}.log") args = parser.parse_args() From 979628d0136a7dd1b2c9ae2645137c6fe44e41d8 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 7 Jul 2021 12:38:23 +0200 Subject: [PATCH 3/6] add type for `instance_name` --- examples/simple-service-poc/simple_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/simple-service-poc/simple_service.py b/examples/simple-service-poc/simple_service.py index 9d7046444..b276c8609 100755 --- a/examples/simple-service-poc/simple_service.py +++ b/examples/simple-service-poc/simple_service.py @@ -39,7 +39,7 @@ class SimpleService(Service): SIMPLE_SERVICE = "/golem/run/simple_service.py" SIMPLE_SERVICE_CTL = "/golem/run/simulate_observations_ctl.py" - def __init__(self, *args, instance_name, show_usage: bool = False, **kwargs): + def __init__(self, *args, instance_name: str, show_usage: bool = False, **kwargs): super().__init__(*args, **kwargs) self.name = instance_name self._show_usage = show_usage From 110bfd0575a6976c218028d6cf63a9e6504f63b3 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 7 Jul 2021 15:42:39 +0200 Subject: [PATCH 4/6] better usage reporting --- examples/blender/blender.py | 20 +++++++-- examples/simple-service-poc/simple_service.py | 26 ++++++++++-- examples/utils.py | 7 ++++ yapapi/ctx.py | 42 +++++++++++++++---- yapapi/log.py | 3 +- yapapi/props/com.py | 7 ++++ yapapi/utils.py | 5 +++ 7 files changed, 95 insertions(+), 15 deletions(-) diff --git a/examples/blender/blender.py b/examples/blender/blender.py index 0b3c0161e..77f7878c1 100755 --- a/examples/blender/blender.py +++ b/examples/blender/blender.py @@ -25,6 +25,8 @@ TEXT_COLOR_DEFAULT, TEXT_COLOR_RED, TEXT_COLOR_YELLOW, + TEXT_COLOR_MAGENTA, + format_usage, ) @@ -79,9 +81,21 @@ async def worker(ctx: WorkContext, tasks): raise if show_usage: - print(f" --- {ctx.provider_name} USAGE: {await ctx.get_usage()}") - print(f" --- {ctx.provider_name} STATE: {await ctx.get_state()}") - print(f" --- {ctx.provider_name} COST: {await ctx.get_cost()}") + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {ctx.provider_name} STATE: {await ctx.get_raw_state()}" + f"{TEXT_COLOR_DEFAULT}" + ) + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {ctx.provider_name} USAGE: {format_usage(await ctx.get_usage())}" + f"{TEXT_COLOR_DEFAULT}" + ) + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {ctx.provider_name} COST: {await ctx.get_cost()}" + f"{TEXT_COLOR_DEFAULT}" + ) # Iterator over the frame indices that we want to render frames: range = range(0, 60, 10) diff --git a/examples/simple-service-poc/simple_service.py b/examples/simple-service-poc/simple_service.py index b276c8609..d22a4e427 100755 --- a/examples/simple-service-poc/simple_service.py +++ b/examples/simple-service-poc/simple_service.py @@ -30,6 +30,8 @@ TEXT_COLOR_DEFAULT, TEXT_COLOR_RED, TEXT_COLOR_YELLOW, + TEXT_COLOR_MAGENTA, + format_usage, ) STARTING_TIMEOUT = timedelta(minutes=4) @@ -83,16 +85,32 @@ async def run(self): yield steps if self._show_usage: - print(f" --- {self.name} USAGE: {await self._ctx.get_usage()}") - print(f" --- {self.name} STATE: {await self._ctx.get_state()}") - print(f" --- {self.name} COST: {await self._ctx.get_cost()}") + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {self.name} STATE: {await self._ctx.get_raw_state()}" + f"{TEXT_COLOR_DEFAULT}" + ) + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {self.name} USAGE: {format_usage(await self._ctx.get_usage())}" + f"{TEXT_COLOR_DEFAULT}" + ) + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {self.name} COST: {await self._ctx.get_cost()}" + f"{TEXT_COLOR_DEFAULT}" + ) async def shutdown(self): # handler reponsible for executing operations on shutdown self._ctx.run(self.SIMPLE_SERVICE_CTL, "--stop") yield self._ctx.commit() if self._show_usage: - print(f" --- {self.name} COST: {await self._ctx.get_cost()}") + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {self.name} COST: {await self._ctx.get_cost()}" + f"{TEXT_COLOR_DEFAULT}" + ) async def main( diff --git a/examples/utils.py b/examples/utils.py index 0444102c8..24c9a811f 100644 --- a/examples/utils.py +++ b/examples/utils.py @@ -37,3 +37,10 @@ def build_parser(description: str) -> argparse.ArgumentParser: help="Log file for YAPAPI; default: %(default)s", ) return parser + + +def format_usage(usage): + return { + "current_usage": {k.name: v for k, v in usage["current_usage"].items()}, + "timestamp": usage["timestamp"].isoformat(sep=" "), + } diff --git a/yapapi/ctx.py b/yapapi/ctx.py index fdfabc86a..6fb284c3d 100644 --- a/yapapi/ctx.py +++ b/yapapi/ctx.py @@ -2,17 +2,23 @@ import enum import json from dataclasses import dataclass -from datetime import timedelta +from datetime import timedelta, datetime +import logging from os import PathLike from functools import partial from pathlib import Path from typing import Callable, Iterable, Optional, Dict, List, Tuple, Union, Any, Awaitable +from ya_activity.models import ActivityUsage, ActivityState + from yapapi.events import DownloadStarted, DownloadFinished from yapapi.props.com import ComLinear from yapapi.storage import StorageProvider, Source, Destination, DOWNLOAD_BYTES_LIMIT_DEFAULT from yapapi.rest.market import AgreementDetails from yapapi.rest.activity import Activity +from yapapi.utils import get_local_timezone + +logger = logging.getLogger(__name__) class CommandContainer: @@ -312,6 +318,8 @@ def __init__( self._pending_steps: List[Work] = [] self._started: bool = False + self.__payment_model: Optional[ComLinear] = None + @property def id(self) -> str: """Unique identifier for this work context.""" @@ -331,7 +339,10 @@ def _payment_model(self) -> ComLinear: # automatic casting of the payment model-related properties to an appropriate model # inheriting from `Com` - return self._agreement_details.provider_view.extract(ComLinear) + if not self.__payment_model: + self.__payment_model = self._agreement_details.provider_view.extract(ComLinear) + + return self.__payment_model def __prepare(self): if not self._started and self._implicit_init: @@ -459,16 +470,33 @@ def commit(self, timeout: Optional[timedelta] = None) -> Work: self._pending_steps = [] return Steps(*steps, timeout=timeout) - async def get_usage(self): + async def get_raw_usage(self) -> ActivityUsage: + """Get the raw usage vector for the activity bound to this work context. + The value comes directly from the low level API and is not interpreted in any way.""" usage = await self._activity.usage() + logger.debug(f"WorkContext raw usage: id={self.id}, usage={usage}") + return usage + + async def get_usage(self): + """Get the current usage for the activity bound to this work context.""" + raw_usage = await self.get_raw_usage() + usage = {} + if raw_usage.current_usage: + usage["current_usage"] = self._payment_model.usage_as_dict(raw_usage.current_usage) + if raw_usage.timestamp: + usage["timestamp"] = datetime.fromtimestamp( + raw_usage.timestamp, tz=get_local_timezone() + ) return usage - async def get_state(self): - state = await self._activity.state() - return state + async def get_raw_state(self) -> ActivityState: + """Get the state activity bound to this work context. + The value comes directly from the low level API and is not interpreted in any way.""" + return await self._activity.state() async def get_cost(self) -> Optional[float]: - usage = await self.get_usage() + """Get the accumulated cost of the activity based on the reported usage.""" + usage = await self.get_raw_usage() if usage.current_usage: return self._payment_model.calculate_cost(usage.current_usage) return None diff --git a/yapapi/log.py b/yapapi/log.py index d7f31d2bb..eb71dd45d 100644 --- a/yapapi/log.py +++ b/yapapi/log.py @@ -61,6 +61,7 @@ from yapapi import events, __version__ as yapapi_version from yapapi.services import MAX_AGREEMENT_EXPIRATION, MIN_AGREEMENT_EXPIRATION from yapapi.rest.activity import CommandExecutionError +from yapapi.utils import get_local_timezone event_logger = logging.getLogger("yapapi.events") executor_logger = logging.getLogger("yapapi.executor") @@ -74,7 +75,7 @@ class _YagnaDatetimeFormatter(logging.Formatter): """Custom log Formatter that formats datetime using the same convention yagna uses.""" - LOCAL_TZ = datetime.now(timezone.utc).astimezone().tzinfo + LOCAL_TZ = get_local_timezone() def formatTime(self, record: logging.LogRecord, datefmt=None): """Format datetime; example: `2021-06-11T14:55:43.156.123+0200`.""" diff --git a/yapapi/props/com.py b/yapapi/props/com.py index 5d4034c01..4fb6e0501 100644 --- a/yapapi/props/com.py +++ b/yapapi/props/com.py @@ -39,6 +39,10 @@ class Com(Model): def calculate_cost(self, usage: List) -> float: """Calculate the cost by applying the provided usage vector to the underlying pricing model.""" + @abc.abstractmethod + def usage_as_dict(self, usage: List) -> Dict: + """Return usage as a dictionary where keys are the appropriate usage counters.""" + @dataclass(frozen=True) class ComLinear(Com): @@ -75,3 +79,6 @@ def price_for(self) -> Dict[Counter, float]: def calculate_cost(self, usage: List): usage = usage + [1.0] # append the "usage" of the fixed component return sum([self.linear_coeffs[i] * usage[i] for i in range(len(self.linear_coeffs))]) + + def usage_as_dict(self, usage: List) -> Dict[Counter, float]: + return {Counter(self.usage_vector[i]): usage[i] for i in range(len(usage))} diff --git a/yapapi/utils.py b/yapapi/utils.py index c0cc0e5d7..e1192075c 100644 --- a/yapapi/utils.py +++ b/yapapi/utils.py @@ -2,6 +2,7 @@ import asyncio import logging from typing import AsyncContextManager, Callable, Optional +from datetime import datetime, timezone, tzinfo import warnings @@ -87,3 +88,7 @@ def show_module_deprecation_warning(old_module: str, new_module: str, since_vers category=DeprecationWarning, stacklevel=2, ) + + +def get_local_timezone() -> Optional[tzinfo]: + return datetime.now(timezone.utc).astimezone().tzinfo From cfaa1bee207478e5510a85c0e932692dd492d35b Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 7 Jul 2021 17:41:23 +0200 Subject: [PATCH 5/6] actually, a dataclass instead of dict (for better typing support et al) --- examples/simple-service-poc/simple_service.py | 3 ++ examples/utils.py | 4 +-- yapapi/ctx.py | 31 ++++++++++++------- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/examples/simple-service-poc/simple_service.py b/examples/simple-service-poc/simple_service.py index d22a4e427..7e52d9b3f 100755 --- a/examples/simple-service-poc/simple_service.py +++ b/examples/simple-service-poc/simple_service.py @@ -41,6 +41,9 @@ class SimpleService(Service): SIMPLE_SERVICE = "/golem/run/simple_service.py" SIMPLE_SERVICE_CTL = "/golem/run/simulate_observations_ctl.py" + def __repr__(self): + return f"<{self.__class__.__name__}: {self.name}>" + def __init__(self, *args, instance_name: str, show_usage: bool = False, **kwargs): super().__init__(*args, **kwargs) self.name = instance_name diff --git a/examples/utils.py b/examples/utils.py index 24c9a811f..cb0edf080 100644 --- a/examples/utils.py +++ b/examples/utils.py @@ -41,6 +41,6 @@ def build_parser(description: str) -> argparse.ArgumentParser: def format_usage(usage): return { - "current_usage": {k.name: v for k, v in usage["current_usage"].items()}, - "timestamp": usage["timestamp"].isoformat(sep=" "), + "current_usage": {k.name: v for k, v in usage.current_usage.items()}, + "timestamp": usage.timestamp.isoformat(sep=" ") if usage.timestamp else None, } diff --git a/yapapi/ctx.py b/yapapi/ctx.py index 6fb284c3d..3b3b05cfe 100644 --- a/yapapi/ctx.py +++ b/yapapi/ctx.py @@ -1,7 +1,7 @@ import abc import enum import json -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import timedelta, datetime import logging from os import PathLike @@ -9,10 +9,13 @@ from pathlib import Path from typing import Callable, Iterable, Optional, Dict, List, Tuple, Union, Any, Awaitable -from ya_activity.models import ActivityUsage, ActivityState +from ya_activity.models import ( + ActivityUsage as yaa_ActivityUsage, + ActivityState as yaa_ActivityState, +) from yapapi.events import DownloadStarted, DownloadFinished -from yapapi.props.com import ComLinear +from yapapi.props.com import ComLinear, Counter from yapapi.storage import StorageProvider, Source, Destination, DOWNLOAD_BYTES_LIMIT_DEFAULT from yapapi.rest.market import AgreementDetails from yapapi.rest.activity import Activity @@ -470,26 +473,24 @@ def commit(self, timeout: Optional[timedelta] = None) -> Work: self._pending_steps = [] return Steps(*steps, timeout=timeout) - async def get_raw_usage(self) -> ActivityUsage: + async def get_raw_usage(self) -> yaa_ActivityUsage: """Get the raw usage vector for the activity bound to this work context. The value comes directly from the low level API and is not interpreted in any way.""" usage = await self._activity.usage() logger.debug(f"WorkContext raw usage: id={self.id}, usage={usage}") return usage - async def get_usage(self): + async def get_usage(self) -> "ActivityUsage": """Get the current usage for the activity bound to this work context.""" raw_usage = await self.get_raw_usage() - usage = {} + usage = ActivityUsage() if raw_usage.current_usage: - usage["current_usage"] = self._payment_model.usage_as_dict(raw_usage.current_usage) + usage.current_usage = self._payment_model.usage_as_dict(raw_usage.current_usage) if raw_usage.timestamp: - usage["timestamp"] = datetime.fromtimestamp( - raw_usage.timestamp, tz=get_local_timezone() - ) + usage.timestamp = datetime.fromtimestamp(raw_usage.timestamp, tz=get_local_timezone()) return usage - async def get_raw_state(self) -> ActivityState: + async def get_raw_state(self) -> yaa_ActivityState: """Get the state activity bound to this work context. The value comes directly from the low level API and is not interpreted in any way.""" return await self._activity.state() @@ -556,3 +557,11 @@ def to_dict(self) -> Dict: def is_streaming(self) -> bool: return self.mode == CaptureMode.STREAM + + +@dataclass +class ActivityUsage: + """A high-level representation of activity usage record.""" + + current_usage: Dict[Counter, float] = field(default_factory=dict) + timestamp: Optional[datetime] = None From 0846129f2578fb27574abf98b2d6b416c844ef3d Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Thu, 8 Jul 2021 10:51:38 +0200 Subject: [PATCH 6/6] human linter ;) --- yapapi/ctx.py | 16 ++++++++++++---- yapapi/utils.py | 2 +- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/yapapi/ctx.py b/yapapi/ctx.py index 3b3b05cfe..ba28e9c4e 100644 --- a/yapapi/ctx.py +++ b/yapapi/ctx.py @@ -1,8 +1,8 @@ import abc -import enum -import json from dataclasses import dataclass, field from datetime import timedelta, datetime +import enum +import json import logging from os import PathLike from functools import partial @@ -475,13 +475,17 @@ def commit(self, timeout: Optional[timedelta] = None) -> Work: async def get_raw_usage(self) -> yaa_ActivityUsage: """Get the raw usage vector for the activity bound to this work context. - The value comes directly from the low level API and is not interpreted in any way.""" + + The value comes directly from the low level API and is not interpreted in any way. + """ + usage = await self._activity.usage() logger.debug(f"WorkContext raw usage: id={self.id}, usage={usage}") return usage async def get_usage(self) -> "ActivityUsage": """Get the current usage for the activity bound to this work context.""" + raw_usage = await self.get_raw_usage() usage = ActivityUsage() if raw_usage.current_usage: @@ -492,11 +496,15 @@ async def get_usage(self) -> "ActivityUsage": async def get_raw_state(self) -> yaa_ActivityState: """Get the state activity bound to this work context. - The value comes directly from the low level API and is not interpreted in any way.""" + + The value comes directly from the low level API and is not interpreted in any way. + """ + return await self._activity.state() async def get_cost(self) -> Optional[float]: """Get the accumulated cost of the activity based on the reported usage.""" + usage = await self.get_raw_usage() if usage.current_usage: return self._payment_model.calculate_cost(usage.current_usage) diff --git a/yapapi/utils.py b/yapapi/utils.py index e1192075c..fbcbf162c 100644 --- a/yapapi/utils.py +++ b/yapapi/utils.py @@ -1,8 +1,8 @@ """Utility functions and classes used within the `yapapi.executor` package.""" import asyncio +from datetime import datetime, timezone, tzinfo import logging from typing import AsyncContextManager, Callable, Optional -from datetime import datetime, timezone, tzinfo import warnings