Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Usage reporting tweaks #536

Merged
merged 6 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions examples/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
TEXT_COLOR_DEFAULT,
TEXT_COLOR_RED,
TEXT_COLOR_YELLOW,
TEXT_COLOR_MAGENTA,
format_usage,
)


Expand Down Expand Up @@ -79,9 +81,21 @@ async def worker(ctx: WorkContext, tasks):
raise

if show_usage:
print(f" --- {ctx.provider_name} USAGE: {await ctx.get_usage()}")
print(f" --- {ctx.provider_name} STATE: {await ctx.get_state()}")
print(f" --- {ctx.provider_name} COST: {await ctx.get_cost()}")
print(
f"{TEXT_COLOR_MAGENTA}"
f" --- {ctx.provider_name} STATE: {await ctx.get_raw_state()}"
f"{TEXT_COLOR_DEFAULT}"
)
print(
f"{TEXT_COLOR_MAGENTA}"
f" --- {ctx.provider_name} USAGE: {format_usage(await ctx.get_usage())}"
f"{TEXT_COLOR_DEFAULT}"
)
print(
f"{TEXT_COLOR_MAGENTA}"
f" --- {ctx.provider_name} COST: {await ctx.get_cost()}"
f"{TEXT_COLOR_DEFAULT}"
)
azawlocki marked this conversation as resolved.
Show resolved Hide resolved

# Iterator over the frame indices that we want to render
frames: range = range(0, 60, 10)
Expand Down
54 changes: 46 additions & 8 deletions examples/simple-service-poc/simple_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
TEXT_COLOR_DEFAULT,
TEXT_COLOR_RED,
TEXT_COLOR_YELLOW,
TEXT_COLOR_MAGENTA,
format_usage,
)

STARTING_TIMEOUT = timedelta(minutes=4)
Expand All @@ -39,9 +41,13 @@ class SimpleService(Service):
SIMPLE_SERVICE = "/golem/run/simple_service.py"
SIMPLE_SERVICE_CTL = "/golem/run/simulate_observations_ctl.py"

def __init__(self, *args, instance_name, **kwargs):
def __repr__(self):
return f"<{self.__class__.__name__}: {self.name}>"

def __init__(self, *args, instance_name: str, show_usage: bool = False, **kwargs):
super().__init__(*args, **kwargs)
self.name = instance_name
self._show_usage = show_usage

@staticmethod
async def get_payload():
Expand Down Expand Up @@ -81,18 +87,38 @@ async def run(self):
steps = self._ctx.commit()
yield steps

print(f" --- {self._ctx.provider_name} USAGE: {await self._ctx.get_usage()}")
print(f" --- {self._ctx.provider_name} STATE: {await self._ctx.get_state()}")
print(f" --- {self._ctx.provider_name} COST: {await self._ctx.get_cost()}")
if self._show_usage:
print(
f"{TEXT_COLOR_MAGENTA}"
f" --- {self.name} STATE: {await self._ctx.get_raw_state()}"
f"{TEXT_COLOR_DEFAULT}"
)
print(
f"{TEXT_COLOR_MAGENTA}"
f" --- {self.name} USAGE: {format_usage(await self._ctx.get_usage())}"
f"{TEXT_COLOR_DEFAULT}"
)
print(
f"{TEXT_COLOR_MAGENTA}"
f" --- {self.name} COST: {await self._ctx.get_cost()}"
f"{TEXT_COLOR_DEFAULT}"
)

async def shutdown(self):
# handler reponsible for executing operations on shutdown
self._ctx.run(self.SIMPLE_SERVICE_CTL, "--stop")
yield self._ctx.commit()
print(f" --- {self._ctx.provider_name} COST: {await self._ctx.get_cost()}")
if self._show_usage:
print(
f"{TEXT_COLOR_MAGENTA}"
f" --- {self.name} COST: {await self._ctx.get_cost()}"
f"{TEXT_COLOR_DEFAULT}"
)


async def main(subnet_tag, running_time, driver=None, network=None, num_instances=1):
async def main(
subnet_tag, running_time, driver=None, network=None, num_instances=1, show_usage=False
):
async with Golem(
budget=1.0,
subnet_tag=subnet_tag,
Expand Down Expand Up @@ -120,7 +146,8 @@ async def main(subnet_tag, running_time, driver=None, network=None, num_instance
cluster = await golem.run_service(
SimpleService,
instance_params=[
{"instance_name": f"simple-service-{i+1}"} for i in range(num_instances)
{"instance_name": f"simple-service-{i+1}", "show_usage": show_usage}
for i in range(num_instances)
],
expiration=datetime.now(timezone.utc) + timedelta(minutes=120),
)
Expand Down Expand Up @@ -184,7 +211,17 @@ def still_starting():
"(in seconds, default: %(default)s)"
),
)
parser.add_argument("--num-instances", type=int, default=1)
parser.add_argument(
"--num-instances",
type=int,
default=1,
help="The number of instances of the service to spawn",
)
parser.add_argument(
"--show-usage",
action="store_true",
help="Show usage and cost of each instance while running.",
)
now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"simple-service-yapapi-{now}.log")
args = parser.parse_args()
Expand All @@ -207,6 +244,7 @@ def still_starting():
driver=args.driver,
network=args.network,
num_instances=args.num_instances,
show_usage=args.show_usage,
)
)

Expand Down
7 changes: 7 additions & 0 deletions examples/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,10 @@ def build_parser(description: str) -> argparse.ArgumentParser:
help="Log file for YAPAPI; default: %(default)s",
)
return parser


def format_usage(usage):
return {
"current_usage": {k.name: v for k, v in usage.current_usage.items()},
"timestamp": usage.timestamp.isoformat(sep=" ") if usage.timestamp else None,
}
63 changes: 54 additions & 9 deletions yapapi/ctx.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
import abc
from dataclasses import dataclass, field
from datetime import timedelta, datetime
Comment on lines +2 to +3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should go before import enum

import enum
import json
from dataclasses import dataclass
from datetime import timedelta
import logging
from os import PathLike
from functools import partial
from pathlib import Path
from typing import Callable, Iterable, Optional, Dict, List, Tuple, Union, Any, Awaitable

from ya_activity.models import (
ActivityUsage as yaa_ActivityUsage,
ActivityState as yaa_ActivityState,
)

from yapapi.events import DownloadStarted, DownloadFinished
from yapapi.props.com import ComLinear
from yapapi.props.com import ComLinear, Counter
from yapapi.storage import StorageProvider, Source, Destination, DOWNLOAD_BYTES_LIMIT_DEFAULT
from yapapi.rest.market import AgreementDetails
from yapapi.rest.activity import Activity
from yapapi.utils import get_local_timezone

logger = logging.getLogger(__name__)


class CommandContainer:
Expand Down Expand Up @@ -312,6 +321,8 @@ def __init__(
self._pending_steps: List[Work] = []
self._started: bool = False

self.__payment_model: Optional[ComLinear] = None

@property
def id(self) -> str:
"""Unique identifier for this work context."""
Expand All @@ -331,7 +342,10 @@ def _payment_model(self) -> ComLinear:
# automatic casting of the payment model-related properties to an appropriate model
# inheriting from `Com`

return self._agreement_details.provider_view.extract(ComLinear)
if not self.__payment_model:
self.__payment_model = self._agreement_details.provider_view.extract(ComLinear)

return self.__payment_model

def __prepare(self):
if not self._started and self._implicit_init:
Expand Down Expand Up @@ -459,16 +473,39 @@ def commit(self, timeout: Optional[timedelta] = None) -> Work:
self._pending_steps = []
return Steps(*steps, timeout=timeout)

async def get_usage(self):
async def get_raw_usage(self) -> yaa_ActivityUsage:
"""Get the raw usage vector for the activity bound to this work context.

The value comes directly from the low level API and is not interpreted in any way.
"""

usage = await self._activity.usage()
logger.debug(f"WorkContext raw usage: id={self.id}, usage={usage}")
return usage

async def get_usage(self) -> "ActivityUsage":
"""Get the current usage for the activity bound to this work context."""

raw_usage = await self.get_raw_usage()
usage = ActivityUsage()
if raw_usage.current_usage:
usage.current_usage = self._payment_model.usage_as_dict(raw_usage.current_usage)
if raw_usage.timestamp:
usage.timestamp = datetime.fromtimestamp(raw_usage.timestamp, tz=get_local_timezone())
return usage

async def get_state(self):
state = await self._activity.state()
return state
async def get_raw_state(self) -> yaa_ActivityState:
"""Get the state activity bound to this work context.

The value comes directly from the low level API and is not interpreted in any way.
"""

return await self._activity.state()

async def get_cost(self) -> Optional[float]:
usage = await self.get_usage()
"""Get the accumulated cost of the activity based on the reported usage."""

usage = await self.get_raw_usage()
if usage.current_usage:
return self._payment_model.calculate_cost(usage.current_usage)
return None
Expand Down Expand Up @@ -528,3 +565,11 @@ def to_dict(self) -> Dict:

def is_streaming(self) -> bool:
return self.mode == CaptureMode.STREAM


@dataclass
class ActivityUsage:
"""A high-level representation of activity usage record."""

current_usage: Dict[Counter, float] = field(default_factory=dict)
timestamp: Optional[datetime] = None
3 changes: 2 additions & 1 deletion yapapi/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from yapapi import events, __version__ as yapapi_version
from yapapi.services import MAX_AGREEMENT_EXPIRATION, MIN_AGREEMENT_EXPIRATION
from yapapi.rest.activity import CommandExecutionError
from yapapi.utils import get_local_timezone

event_logger = logging.getLogger("yapapi.events")
executor_logger = logging.getLogger("yapapi.executor")
Expand All @@ -74,7 +75,7 @@
class _YagnaDatetimeFormatter(logging.Formatter):
"""Custom log Formatter that formats datetime using the same convention yagna uses."""

LOCAL_TZ = datetime.now(timezone.utc).astimezone().tzinfo
LOCAL_TZ = get_local_timezone()

def formatTime(self, record: logging.LogRecord, datefmt=None):
"""Format datetime; example: `2021-06-11T14:55:43.156.123+0200`."""
Expand Down
7 changes: 7 additions & 0 deletions yapapi/props/com.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class Com(Model):
def calculate_cost(self, usage: List) -> float:
"""Calculate the cost by applying the provided usage vector to the underlying pricing model."""

@abc.abstractmethod
def usage_as_dict(self, usage: List) -> Dict:
"""Return usage as a dictionary where keys are the appropriate usage counters."""


@dataclass(frozen=True)
class ComLinear(Com):
Expand Down Expand Up @@ -75,3 +79,6 @@ def price_for(self) -> Dict[Counter, float]:
def calculate_cost(self, usage: List):
usage = usage + [1.0] # append the "usage" of the fixed component
return sum([self.linear_coeffs[i] * usage[i] for i in range(len(self.linear_coeffs))])

def usage_as_dict(self, usage: List) -> Dict[Counter, float]:
return {Counter(self.usage_vector[i]): usage[i] for i in range(len(usage))}
5 changes: 5 additions & 0 deletions yapapi/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Utility functions and classes used within the `yapapi.executor` package."""
import asyncio
from datetime import datetime, timezone, tzinfo
import logging
from typing import AsyncContextManager, Callable, Optional
import warnings
Expand Down Expand Up @@ -87,3 +88,7 @@ def show_module_deprecation_warning(old_module: str, new_module: str, since_vers
category=DeprecationWarning,
stacklevel=2,
)


def get_local_timezone() -> Optional[tzinfo]:
return datetime.now(timezone.utc).astimezone().tzinfo