diff --git a/examples/blender/blender_registry_usage.py b/examples/blender/blender_registry_usage.py new file mode 100755 index 000000000..6ebb9a6aa --- /dev/null +++ b/examples/blender/blender_registry_usage.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +import pathlib +import sys +from datetime import datetime, timedelta + +from yapapi import Golem, Task, WorkContext +from yapapi.payload import vm +from yapapi.rest.activity import BatchTimeoutError + +examples_dir = pathlib.Path(__file__).resolve().parent.parent +sys.path.append(str(examples_dir)) + +from utils import ( + TEXT_COLOR_CYAN, + TEXT_COLOR_DEFAULT, + TEXT_COLOR_MAGENTA, + TEXT_COLOR_RED, + build_parser, + format_usage, + print_env_info, + run_golem_example, +) + + +async def start(subnet_tag, package, payment_driver=None, payment_network=None, show_usage=False): + async def worker(ctx: WorkContext, tasks): + script_dir = pathlib.Path(__file__).resolve().parent + scene_path = str(script_dir / "cubes.blend") + + # Set timeout for the first script executed on the provider. Usually, 30 seconds + # should be more than enough for computing a single frame of the provided scene, + # however a provider may require more time for the first task if it needs to download + # the VM image first. Once downloaded, the VM image will be cached and other tasks that use + # that image will be computed faster. + script = ctx.new_script(timeout=timedelta(minutes=10)) + script.upload_file(scene_path, "/golem/resource/scene.blend") + + async for task in tasks: + frame = task.data + crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}] + script.upload_json( + { + "scene_file": "/golem/resource/scene.blend", + "resolution": (400, 300), + "use_compositing": False, + "crops": crops, + "samples": 100, + "frames": [frame], + "output_format": "PNG", + "RESOURCES_DIR": "/golem/resources", + "WORK_DIR": "/golem/work", + "OUTPUT_DIR": "/golem/output", + }, + "/golem/work/params.json", + ) + + script.run("/golem/entrypoints/run-blender.sh") + output_file = f"output_{frame}.png" + script.download_file(f"/golem/output/out{frame:04d}.png", output_file) + try: + yield script + # TODO: Check if job results are valid + # and reject by: task.reject_task(reason = 'invalid file') + task.accept_result(result=output_file) + except BatchTimeoutError: + print( + f"{TEXT_COLOR_RED}" + f"Task {task} timed out on {ctx.provider_name}, time: {task.running_time}" + f"{TEXT_COLOR_DEFAULT}" + ) + raise + + # reinitialize the script which we send to the engine to compute subsequent frames + script = ctx.new_script(timeout=timedelta(minutes=1)) + + if show_usage: + raw_state = await ctx.get_raw_state() + usage = format_usage(await ctx.get_usage()) + cost = await ctx.get_cost() + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {ctx.provider_name} STATE: {raw_state}\n" + f" --- {ctx.provider_name} USAGE: {usage}\n" + f" --- {ctx.provider_name} COST: {cost}" + f"{TEXT_COLOR_DEFAULT}" + ) + + # Iterator over the frame indices that we want to render + frames: range = range(0, 60, 10) + # Worst-case overhead, in minutes, for initialization (negotiation, file transfer etc.) + # TODO: make this dynamic, e.g. depending on the size of files to transfer + init_overhead = 3 + # Providers will not accept work if the timeout is outside of the [5 min, 30min] range. + # We increase the lower bound to 6 min to account for the time needed for our demand to + # reach the providers. + min_timeout, max_timeout = 6, 30 + + timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout)) + + async with Golem( + budget=10.0, + subnet_tag=subnet_tag, + payment_driver=payment_driver, + payment_network=payment_network, + ) as golem: + print_env_info(golem) + + num_tasks = 0 + start_time = datetime.now() + + completed_tasks = golem.execute_tasks( + worker, + [Task(data=frame) for frame in frames], + payload=package, + max_workers=3, + timeout=timeout, + ) + async for task in completed_tasks: + num_tasks += 1 + print( + f"{TEXT_COLOR_CYAN}" + f"Task computed: {task}, result: {task.result}, time: {task.running_time}" + f"{TEXT_COLOR_DEFAULT}" + ) + + print( + f"{TEXT_COLOR_CYAN}" + f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}" + f"{TEXT_COLOR_DEFAULT}" + ) + + +async def create_package(args, default_image_tag): + # Use golem/blender:latest image tag, + # you can overwrite this option with --image-tag or --image-hash + if args.image_url and args.image_tag: + raise ValueError("Only one of --image-url and --image-tag can be specified") + if args.image_url and not args.image_hash: + raise ValueError("--image-url requires --image-hash to be specified") + if args.image_hash and args.image_tag: + raise ValueError("Only one of --image-hash and --image-tag can be specified") + elif args.image_hash: + image_tag = None + else: + image_tag = args.image_tag or default_image_tag + + # resolve image by tag, hash or direct link + package = await vm.repo( + image_tag=image_tag, + image_hash=args.image_hash, + image_url=args.image_url, + image_use_https=args.image_use_https, + # only run on provider nodes that have more than 0.5gb of RAM available + min_mem_gib=0.5, + # only run on provider nodes that have more than 2gb of storage space available + min_storage_gib=2.0, + # only run on provider nodes which a certain number of CPU threads (logical CPU cores) + # available + min_cpu_threads=args.min_cpu_threads, + ) + return package + + +async def main(args): + # Create a package using options specified in the command line + package = await create_package(args, default_image_tag="golem/blender:latest") + + await start( + subnet_tag=args.subnet_tag, + package=package, + payment_driver=args.payment_driver, + payment_network=args.payment_network, + show_usage=args.show_usage, + ) + + +if __name__ == "__main__": + parser = build_parser("Render a Blender scene") + parser.add_argument("--show-usage", action="store_true", help="show activity usage and cost") + parser.add_argument( + "--min-cpu-threads", + type=int, + default=1, + help="require the provider nodes to have at least this number of available CPU threads", + ) + parser.add_argument( + "--image-tag", help="Image tag to use when resolving image url from Golem Registry" + ) + parser.add_argument( + "--image-hash", help="Image hash to use when resolving image url from Golem Registry" + ) + parser.add_argument( + "--image-url", help="Direct image url to use instead of resolving from Golem Registry" + ) + parser.add_argument( + "--image-use-https", + help="Whether to use https when resolving image url from Golem Registry", + action="store_true", + ) + now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S") + parser.set_defaults(log_file=f"blender-yapapi-{now}.log") + cmd_args = parser.parse_args() + + run_golem_example( + main(args=cmd_args), + log_file=cmd_args.log_file, + ) diff --git a/pyproject.toml b/pyproject.toml index 8bdcc4e2b..43c8a5059 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,6 @@ jsonrpc-base = "^1.0.3" ya-aioclient = "^0.6.4" toml = "^0.10.1" -srvresolver = "^0.3.5" colorama = "^0.4.4" semantic-version = "^2.8" attrs = ">=19.3" @@ -82,7 +81,7 @@ _format_black = "black ." tests_unit = {cmd = "pytest --cov=yapapi --cov-report html --cov-report term -sv --ignore tests/goth_tests", help = "Run only unit tests"} tests_integration_init = { sequence = ["_gothv_env", "_gothv_requirements", "_gothv_assets"], help="Initialize the integration test environment"} -tests_integration = { cmd = ".envs/yapapi-goth/bin/python -m pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=false --config-path tests/goth_tests/assets/goth-config.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError", help = "Run the integration tests"} +tests_integration = { cmd = ".envs/yapapi-goth/bin/python -m pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=false --config-path tests/goth_tests/assets/goth-config.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError --only-rerun requests.exceptions.ConnectionError --only-rerun OSError", help = "Run the integration tests"} _gothv_env = "python -m venv .envs/yapapi-goth" _gothv_requirements = ".envs/yapapi-goth/bin/pip install -U --extra-index-url https://test.pypi.org/simple/ goth==0.14.1 pip pytest pytest-asyncio pytest-rerunfailures pexpect" _gothv_assets = ".envs/yapapi-goth/bin/python -m goth create-assets tests/goth_tests/assets" diff --git a/tests/payload/test_repo.py b/tests/payload/test_repo.py new file mode 100644 index 000000000..6a32268e2 --- /dev/null +++ b/tests/payload/test_repo.py @@ -0,0 +1,104 @@ +from unittest.mock import AsyncMock + +import pytest + +from yapapi.payload import vm +from yapapi.payload.package import PackageException + +_MOCK_HTTP_ADDR = "http://test.address/" +_MOCK_HTTPS_ADDR = "https://test.address/" +_MOCK_SHA3 = "abcdef124356789" +_MOCK_SIZE = 2**24 + + +async def _mock_response(*args, **kwargs): + mock = AsyncMock() + mock.status = 200 + mock.json.return_value = { + "http": _MOCK_HTTP_ADDR, + "https": _MOCK_HTTPS_ADDR, + "sha3": _MOCK_SHA3, + "size": _MOCK_SIZE, + } + return mock + + +@pytest.mark.parametrize( + "image_hash, image_tag, image_url, image_use_https, " + "expected_url, expected_error, expected_error_msg", + ( + ("testhash", None, None, False, f"hash:sha3:{_MOCK_SHA3}:{_MOCK_HTTP_ADDR}", None, ""), + (None, "testtag", None, False, f"hash:sha3:{_MOCK_SHA3}:{_MOCK_HTTP_ADDR}", None, ""), + ("testhash", None, None, True, f"hash:sha3:{_MOCK_SHA3}:{_MOCK_HTTPS_ADDR}", None, ""), + (None, "testtag", None, True, f"hash:sha3:{_MOCK_SHA3}:{_MOCK_HTTPS_ADDR}", None, ""), + ("testhash", None, "http://image", False, "hash:sha3:testhash:http://image", None, ""), + ( + None, + None, + None, + False, + None, + PackageException, + "Either an image_hash or an image_tag is required " + "to resolve an image URL from the Golem Registry", + ), + ( + None, + None, + "http://image", + False, + None, + PackageException, + "An image_hash is required when using a direct image_url", + ), + ( + None, + "testtag", + "http://image", + False, + None, + PackageException, + "An image_tag can only be used when resolving " + "from Golem Registry, not with a direct image_url", + ), + ( + "testhash", + "testtag", + None, + False, + None, + PackageException, + "Golem Registry images can be resolved by either " + "an image_hash or by an image_tag but not both", + ), + ), +) +@pytest.mark.asyncio +async def test_repo( + monkeypatch, + image_hash, + image_tag, + image_url, + image_use_https, + expected_url, + expected_error, + expected_error_msg, +): + monkeypatch.setattr("aiohttp.ClientSession.get", _mock_response) + monkeypatch.setattr("aiohttp.ClientSession.head", _mock_response) + + package_awaitable = vm.repo( + image_hash=image_hash, + image_tag=image_tag, + image_url=image_url, + image_use_https=image_use_https, + ) + + if expected_error: + with pytest.raises(expected_error) as e: + _ = await package_awaitable + assert expected_error_msg in str(e) + else: + package = await package_awaitable + url = await package.resolve_url() + assert url == expected_url diff --git a/tests/payload/test_vm.py b/tests/payload/test_vm.py deleted file mode 100644 index 000b01441..000000000 --- a/tests/payload/test_vm.py +++ /dev/null @@ -1,28 +0,0 @@ -from unittest import mock - -from dns.exception import DNSException -from srvresolver.srv_resolver import SRVRecord # type: ignore - -from yapapi.payload.vm import _FALLBACK_REPO_URL, resolve_repo_srv - -_MOCK_HOST = "non.existent.domain" -_MOCK_PORT = 9999 - - -@mock.patch( - "yapapi.payload.vm.SRVResolver.resolve_random", - mock.Mock(return_value=SRVRecord(host=_MOCK_HOST, port=_MOCK_PORT, weight=1, priority=1)), -) -def test_resolve_srv(): - assert resolve_repo_srv("") == f"http://{_MOCK_HOST}:{_MOCK_PORT}" - - -@mock.patch("yapapi.payload.vm.SRVResolver.resolve_random", mock.Mock(side_effect=DNSException())) -def test_resolve_srv_exception(): - # should be: - - # with pytest.raises(DNSException): - # assert resolve_repo_srv("") - - # temporary work-around: - assert resolve_repo_srv("") == _FALLBACK_REPO_URL diff --git a/yapapi/payload/package.py b/yapapi/payload/package.py index 039b35aba..348e352d9 100644 --- a/yapapi/payload/package.py +++ b/yapapi/payload/package.py @@ -1,10 +1,14 @@ import abc +import logging +from typing import Optional import aiohttp from dataclasses import dataclass from yapapi.payload import Payload +logger = logging.getLogger(__name__) + class PackageException(Exception): """Exception raised on any problems related to the package repository.""" @@ -19,7 +23,7 @@ async def resolve_url(self) -> str: """Return package URL.""" -async def resolve_package_url(image_url: str, image_hash: str) -> str: +async def check_package_url(image_url: str, image_hash: str) -> str: async with aiohttp.ClientSession() as client: resp = await client.head(image_url, allow_redirects=True) if resp.status != 200: @@ -28,11 +32,74 @@ async def resolve_package_url(image_url: str, image_hash: str) -> str: return f"hash:sha3:{image_hash}:{image_url}" -async def resolve_package_repo_url(repo_url: str, image_hash: str) -> str: +def _sizeof_fmt(num, suffix="B"): + for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: + if abs(num) < 1024.0: + return f"{num:3.2f}{unit}{suffix}" + num /= 1024.0 + return f"{num:.1f}Yi{suffix}" + + +async def resolve_package_url( + repo_url: str, + image_tag: Optional[str] = None, + image_hash: Optional[str] = None, + image_use_https: bool = False, + dev_mode: bool = False, +) -> str: + params = {} + + if image_tag: + params["tag"] = image_tag + + if image_hash: + params["hash"] = image_hash + + if not params: + raise PackageException( + "Either an image_hash or an image_tag is required " + "to resolve an image URL from the Golem Registry." + ) + + if "tag" in params and "hash" in params: + raise PackageException( + "Golem Registry images can be resolved by " + "either an image_hash or by an image_tag but not both." + ) + + if dev_mode: + # if dev, skip usage statistics, pass dev option for statistics + params["dev"] = "true" + else: + params["count"] = "true" + async with aiohttp.ClientSession() as client: - resp = await client.get(f"{repo_url}/image.{image_hash}.link") + url = f"{repo_url}/v1/image/info" + logger.debug(f"Querying registry portal: url={url}, params={params}") + resp = await client.get(url, params=params) if resp.status != 200: - resp.raise_for_status() + try: + text = await resp.text() + except Exception as ex: + logger.error(f"Failed to get body of response: {ex}") + text = "N/A" - image_url = await resp.text() - return f"hash:sha3:{image_hash}:{image_url}" + logger.error(f"Failed to resolve image URL: {resp.status} {text}") + raise Exception(f"Failed to resolve image URL: {resp.status} {text}") + + json_resp = await resp.json() + + if image_use_https: + image_url = json_resp["https"] + else: + image_url = json_resp["http"] + image_hash = json_resp["sha3"] + image_size = json_resp["size"] + logger.debug( + f"Resolved image: " + f"url={image_url}, " + f"size={_sizeof_fmt(image_size)}, " + f"hash={image_hash}" + ) + + return f"hash:sha3:{image_hash}:{image_url}" diff --git a/yapapi/payload/vm.py b/yapapi/payload/vm.py index dec5dc0cc..6cda613dc 100644 --- a/yapapi/payload/vm.py +++ b/yapapi/payload/vm.py @@ -3,26 +3,17 @@ from typing import Final, List, Literal, Optional from dataclasses import dataclass -from dns.exception import DNSException -from srvresolver.srv_resolver import SRVRecord, SRVResolver - -from yapapi.payload.package import ( - Package, - PackageException, - resolve_package_repo_url, - resolve_package_url, -) + +from yapapi.payload.package import Package, PackageException, check_package_url, resolve_package_url from yapapi.props import base as prop_base from yapapi.props import inf from yapapi.props.builder import DemandBuilder, Model from yapapi.props.inf import INF_CORES, RUNTIME_VM, ExeUnitManifestRequest, ExeUnitRequest, InfBase -_DEFAULT_REPO_SRV: Final[str] = "_girepo._tcp.dev.golem.network" -_FALLBACK_REPO_URL: Final[str] = "http://girepo.dev.golem.network:8000" -_DEFAULT_TIMEOUT_SECONDS: Final[int] = 10 - logger = logging.getLogger(__name__) +DEFAULT_REPOSITORY_URL: Final[str] = "https://registry.golem.network" + VM_CAPS_VPN: str = "vpn" VM_CAPS_MANIFEST_SUPPORT: str = "manifest-support" @@ -150,26 +141,27 @@ async def manifest( @dataclass class _VmPackage(Package): - repo_url: str - image_hash: str - image_url: Optional[str] + image_url: str constraints: _VmConstraints async def resolve_url(self) -> str: - if not self.image_url: - return await resolve_package_repo_url(self.repo_url, self.image_hash) - return await resolve_package_url(self.image_url, self.image_hash) + return self.image_url async def decorate_demand(self, demand: DemandBuilder): - image_url = await self.resolve_url() demand.ensure(str(self.constraints)) - demand.add(VmRequest(package_url=image_url, package_format=VmPackageFormat.GVMKIT_SQUASH)) + demand.add( + VmRequest(package_url=self.image_url, package_format=VmPackageFormat.GVMKIT_SQUASH) + ) async def repo( *, - image_hash: str, + image_hash: Optional[str] = None, + image_tag: Optional[str] = None, image_url: Optional[str] = None, + image_use_https: bool = False, + repository_url: str = DEFAULT_REPOSITORY_URL, + dev_mode: bool = False, # noqa min_mem_gib: float = 0.5, min_storage_gib: float = 2.0, min_cpu_threads: int = 1, @@ -179,7 +171,10 @@ async def repo( Build a reference to application package. :param image_hash: hash of the package's image + :param image_tag: Tag of the package to resolve from Golem Registry :param image_url: URL of the package's image + :param image_use_https: whether to resolve to HTTPS or HTTP when using Golem Registry + :param repository_url: override the package repository location :param min_mem_gib: minimal memory required to execute application code :param min_storage_gib: minimal disk storage to execute tasks :param min_cpu_threads: minimal available logical CPU cores @@ -217,45 +212,39 @@ async def repo( min_mem_gib=0.5, # only run on provider nodes that have more than 2gb of storage space available min_storage_gib=2.0, - # only run on provider nodes which a certain number of CPU threads available + # only run on provider nodes with a certain number of CPU threads available min_cpu_threads=min_cpu_threads, ) """ + + if image_url: + if image_tag: + raise PackageException( + "An image_tag can only be used when resolving from Golem Registry, " + "not with a direct image_url." + ) + if not image_hash: + raise PackageException("An image_hash is required when using a direct image_url.") + logger.debug(f"Verifying if {image_url} exists.") + resolved_image_url = await check_package_url(image_url, image_hash) + else: + logger.debug( + f"Resolving image on {repository_url}: " + f"image_hash={image_hash}, image_tag={image_tag}, image_use_https={image_use_https}." + ) + resolved_image_url = await resolve_package_url( + repository_url, + image_hash=image_hash, + image_tag=image_tag, + image_use_https=image_use_https, + dev_mode=dev_mode, + ) + + logger.debug(f"Resolved image: {resolved_image_url}") + capabilities = capabilities or list() return _VmPackage( - repo_url=resolve_repo_srv(_DEFAULT_REPO_SRV), - image_hash=image_hash, - image_url=image_url, + image_url=resolved_image_url, constraints=_VmConstraints(min_mem_gib, min_storage_gib, min_cpu_threads, capabilities), ) - - -def resolve_repo_srv( - repo_srv: str, fallback_url=_FALLBACK_REPO_URL, timeout=_DEFAULT_TIMEOUT_SECONDS -) -> str: - """ - Get the url of the package repository based on its SRV record address. - - :param repo_srv: the SRV domain name - :param fallback_url: temporary hardcoded fallback url in case there's a problem resolving SRV - :param timeout: socket connection timeout in seconds - :return: the url of the package repository containing the port - :raises: PackageException if no valid service could be reached - """ - try: - try: - srv: Optional[SRVRecord] = SRVResolver.resolve_random(repo_srv, timeout=timeout) - except DNSException as e: - raise PackageException(f"Could not resolve Golem package repository address [{e}].") - - if not srv: - raise PackageException("Golem package repository is currently unavailable.") - except Exception as e: - # this is a temporary fallback for a problem resolving the SRV record - logger.warning( - "Problem resolving %s, falling back to %s, exception: %s", repo_srv, fallback_url, e - ) - return fallback_url - - return f"http://{srv.host}:{srv.port}"