Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scx1332/golem registry integration #1138

Merged
merged 27 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
2c17812
Added image tag selection for blender
scx1332 Jun 1, 2023
74b9d8b
Working on registry integration
scx1332 Jun 21, 2023
8174017
Working on resolving images using golem registry
scx1332 Jun 21, 2023
0ae907e
Working on resolving packages
scx1332 Jun 21, 2023
a2ebe7e
Modified blender example only
scx1332 Jun 21, 2023
0d42e76
Working on blender example
scx1332 Jun 21, 2023
b822d2c
Revert init.py
scx1332 Jun 21, 2023
2c9b50a
Removed srv test (no longer needed)
scx1332 Jun 21, 2023
a0de9e5
Trying to fix styling
scx1332 Jun 21, 2023
26b6795
format
scx1332 Jun 21, 2023
cb0943d
Fix type checkings
scx1332 Jul 1, 2023
9a01040
Merge branch 'master' into scx1332/golem_registry_integration
scx1332 Jul 1, 2023
5090bdc
Merge branch 'master' into scx1332/golem_registry_integration
scx1332 Jul 20, 2023
d3b760a
Revert accidental commit
scx1332 Jul 20, 2023
eb33aca
revert accidental commit
scx1332 Jul 20, 2023
95d98df
Update
scx1332 Jul 20, 2023
b68048c
fix
scx1332 Jul 20, 2023
eaa3a3d
formatting
scx1332 Jul 20, 2023
e39174c
revert the blender example, add registry usage as a separate example
shadeofblue Aug 14, 2023
de43e7e
pull request review
shadeofblue Aug 14, 2023
d8c0e8b
add tests for vm.repo
shadeofblue Aug 14, 2023
bda5982
remove overeager checks
shadeofblue Aug 16, 2023
5505c94
throw PackageException instead of the generic ValueError when resolvi…
shadeofblue Aug 16, 2023
77ae913
use `dev_mode` argument to `vm.repo` to mark resolve queries as "dev …
shadeofblue Aug 16, 2023
373df26
remove `srvresolver` from requirements, add re-run for the connection…
shadeofblue Aug 17, 2023
242149e
remove irrelevant `todo` ;)
shadeofblue Aug 17, 2023
9a062fa
add one more error to retry list
shadeofblue Aug 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 207 additions & 0 deletions examples/blender/blender_registry_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#!/usr/bin/env python3
import pathlib
import sys
from datetime import datetime, timedelta

from yapapi import Golem, Task, WorkContext
from yapapi.payload import vm
from yapapi.rest.activity import BatchTimeoutError

examples_dir = pathlib.Path(__file__).resolve().parent.parent
sys.path.append(str(examples_dir))

from utils import (
TEXT_COLOR_CYAN,
TEXT_COLOR_DEFAULT,
TEXT_COLOR_MAGENTA,
TEXT_COLOR_RED,
build_parser,
format_usage,
print_env_info,
run_golem_example,
)


async def start(subnet_tag, package, payment_driver=None, payment_network=None, show_usage=False):
    """Render selected frames of the bundled `cubes.blend` scene on Golem providers.

    :param subnet_tag: forwarded to `Golem` as `subnet_tag`
    :param package: the payload forwarded to `Golem.execute_tasks`
    :param payment_driver: forwarded to `Golem` as `payment_driver`
    :param payment_network: forwarded to `Golem` as `payment_network`
    :param show_usage: if true, print the activity's raw state, usage and cost after each task
    """

    async def worker(ctx: WorkContext, tasks):
        """Yield one script per task: upload params, run Blender, download the rendered frame."""
        script_dir = pathlib.Path(__file__).resolve().parent
        scene_path = str(script_dir / "cubes.blend")

        # Set timeout for the first script executed on the provider. Usually, 30 seconds
        # should be more than enough for computing a single frame of the provided scene,
        # however a provider may require more time for the first task if it needs to download
        # the VM image first. Once downloaded, the VM image will be cached and other tasks that use
        # that image will be computed faster.
        script = ctx.new_script(timeout=timedelta(minutes=10))
        # The scene is uploaded only once, as part of the first script.
        script.upload_file(scene_path, "/golem/resource/scene.blend")

        async for task in tasks:
            frame = task.data
            crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}]
            script.upload_json(
                {
                    "scene_file": "/golem/resource/scene.blend",
                    "resolution": (400, 300),
                    "use_compositing": False,
                    "crops": crops,
                    "samples": 100,
                    "frames": [frame],
                    "output_format": "PNG",
                    "RESOURCES_DIR": "/golem/resources",
                    "WORK_DIR": "/golem/work",
                    "OUTPUT_DIR": "/golem/output",
                },
                "/golem/work/params.json",
            )

            script.run("/golem/entrypoints/run-blender.sh")
            output_file = f"output_{frame}.png"
            script.download_file(f"/golem/output/out{frame:04d}.png", output_file)
            try:
                yield script
                # TODO: Check if job results are valid
                # and reject by: task.reject_task(reason = 'invalid file')
                task.accept_result(result=output_file)
            except BatchTimeoutError:
                print(
                    f"{TEXT_COLOR_RED}"
                    f"Task {task} timed out on {ctx.provider_name}, time: {task.running_time}"
                    f"{TEXT_COLOR_DEFAULT}"
                )
                raise

            # reinitialize the script which we send to the engine to compute subsequent frames
            script = ctx.new_script(timeout=timedelta(minutes=1))

            if show_usage:
                raw_state = await ctx.get_raw_state()
                usage = format_usage(await ctx.get_usage())
                cost = await ctx.get_cost()
                print(
                    f"{TEXT_COLOR_MAGENTA}"
                    f" --- {ctx.provider_name} STATE: {raw_state}\n"
                    f" --- {ctx.provider_name} USAGE: {usage}\n"
                    f" --- {ctx.provider_name}  COST: {cost}"
                    f"{TEXT_COLOR_DEFAULT}"
                )

    # Iterator over the frame indices that we want to render
    frames: range = range(0, 60, 10)
    # Worst-case overhead, in minutes, for initialization (negotiation, file transfer etc.)
    # TODO: make this dynamic, e.g. depending on the size of files to transfer
    init_overhead = 3
    # Providers will not accept work if the timeout is outside of the [5 min, 30min] range.
    # We increase the lower bound to 6 min to account for the time needed for our demand to
    # reach the providers.
    min_timeout, max_timeout = 6, 30

    # Budget two minutes per frame plus the overhead, clamped to [min_timeout, max_timeout].
    timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout))

    async with Golem(
        budget=10.0,
        subnet_tag=subnet_tag,
        payment_driver=payment_driver,
        payment_network=payment_network,
    ) as golem:
        print_env_info(golem)

        num_tasks = 0
        start_time = datetime.now()

        completed_tasks = golem.execute_tasks(
            worker,
            [Task(data=frame) for frame in frames],
            payload=package,
            max_workers=3,
            timeout=timeout,
        )
        async for task in completed_tasks:
            num_tasks += 1
            print(
                f"{TEXT_COLOR_CYAN}"
                f"Task computed: {task}, result: {task.result}, time: {task.running_time}"
                f"{TEXT_COLOR_DEFAULT}"
            )

        print(
            f"{TEXT_COLOR_CYAN}"
            f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}"
            f"{TEXT_COLOR_DEFAULT}"
        )


async def create_package(args, default_image_tag):
    """Build the VM payload from the image-related command line options.

    The image is selected by exactly one of: a registry tag (`--image-tag`, falling back
    to `default_image_tag`), a registry hash (`--image-hash`), or a direct link
    (`--image-url` together with `--image-hash`).

    :param args: parsed command line arguments; reads `image_tag`, `image_hash`, `image_url`,
        `image_use_https` and `min_cpu_threads`
    :param default_image_tag: tag used when neither `--image-tag` nor `--image-hash` is given
    :return: the package returned by `vm.repo`
    :raises ValueError: if the image options are combined in an unsupported way
    """
    # Reject contradictory option combinations before contacting the registry.
    if args.image_url and args.image_tag:
        raise ValueError("Only one of --image-url and --image-tag can be specified")
    if args.image_url and not args.image_hash:
        raise ValueError("--image-url requires --image-hash to be specified")
    if args.image_hash and args.image_tag:
        raise ValueError("Only one of --image-hash and --image-tag can be specified")

    # An explicit hash takes the place of a tag; otherwise fall back to the default tag.
    if args.image_hash:
        image_tag = None
    else:
        image_tag = args.image_tag or default_image_tag

    # resolve image by tag, hash or direct link
    package = await vm.repo(
        image_tag=image_tag,
        image_hash=args.image_hash,
        image_url=args.image_url,
        image_use_https=args.image_use_https,
        # only run on provider nodes that have more than 0.5gb of RAM available
        min_mem_gib=0.5,
        # only run on provider nodes that have more than 2gb of storage space available
        min_storage_gib=2.0,
        # only run on provider nodes which a certain number of CPU threads (logical CPU cores)
        # available
        min_cpu_threads=args.min_cpu_threads,
    )
    return package


async def main(args):
    """Resolve the payload from the command line options and run the rendering job.

    :param args: parsed command line arguments, as produced by the parser built in the
        script's `__main__` section
    """
    # Create a package using options specified in the command line
    package = await create_package(args, default_image_tag="golem/blender:latest")

    await start(
        subnet_tag=args.subnet_tag,
        package=package,
        payment_driver=args.payment_driver,
        payment_network=args.payment_network,
        show_usage=args.show_usage,
    )


if __name__ == "__main__":
    # Extend the shared example parser with the options specific to this script.
    parser = build_parser("Render a Blender scene")
    parser.add_argument("--show-usage", action="store_true", help="show activity usage and cost")
    parser.add_argument(
        "--min-cpu-threads",
        type=int,
        default=1,
        help="require the provider nodes to have at least this number of available CPU threads",
    )
    parser.add_argument(
        "--image-tag", help="Image tag to use when resolving image url from Golem Registry"
    )
    parser.add_argument(
        "--image-hash", help="Image hash to use when resolving image url from Golem Registry"
    )
    parser.add_argument(
        "--image-url", help="Direct image url to use instead of resolving from Golem Registry"
    )
    parser.add_argument(
        "--image-use-https",
        help="Whether to use https when resolving image url from Golem Registry",
        action="store_true",
    )
    # Timestamp the default log file name so that consecutive runs do not share a log.
    now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
    parser.set_defaults(log_file=f"blender-yapapi-{now}.log")
    cmd_args = parser.parse_args()

    run_golem_example(
        main(args=cmd_args),
        log_file=cmd_args.log_file,
    )
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ jsonrpc-base = "^1.0.3"

ya-aioclient = "^0.6.4"
toml = "^0.10.1"
srvresolver = "^0.3.5"
colorama = "^0.4.4"
semantic-version = "^2.8"
attrs = ">=19.3"
Expand Down Expand Up @@ -82,7 +81,7 @@ _format_black = "black ."

tests_unit = {cmd = "pytest --cov=yapapi --cov-report html --cov-report term -sv --ignore tests/goth_tests", help = "Run only unit tests"}
tests_integration_init = { sequence = ["_gothv_env", "_gothv_requirements", "_gothv_assets"], help="Initialize the integration test environment"}
tests_integration = { cmd = ".envs/yapapi-goth/bin/python -m pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=false --config-path tests/goth_tests/assets/goth-config.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError", help = "Run the integration tests"}
tests_integration = { cmd = ".envs/yapapi-goth/bin/python -m pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=false --config-path tests/goth_tests/assets/goth-config.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError --only-rerun requests.exceptions.ConnectionError --only-rerun OSError", help = "Run the integration tests"}
_gothv_env = "python -m venv .envs/yapapi-goth"
_gothv_requirements = ".envs/yapapi-goth/bin/pip install -U --extra-index-url https://test.pypi.org/simple/ goth==0.14.1 pip pytest pytest-asyncio pytest-rerunfailures pexpect"
_gothv_assets = ".envs/yapapi-goth/bin/python -m goth create-assets tests/goth_tests/assets"
Expand Down
104 changes: 104 additions & 0 deletions tests/payload/test_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from unittest.mock import AsyncMock

import pytest

from yapapi.payload import vm
from yapapi.payload.package import PackageException

_MOCK_HTTP_ADDR = "http://test.address/"
_MOCK_HTTPS_ADDR = "https://test.address/"
_MOCK_SHA3 = "abcdef124356789"
_MOCK_SIZE = 2**24


async def _mock_response(*args, **kwargs):
    """Stand in for `aiohttp.ClientSession.get` / `.head` in the tests below.

    Accepts and ignores any arguments, and returns a mock with `status` 200 whose awaited
    `json()` yields the mocked image description (http/https addresses, sha3 and size).
    """
    mock = AsyncMock()
    mock.status = 200
    mock.json.return_value = {
        "http": _MOCK_HTTP_ADDR,
        "https": _MOCK_HTTPS_ADDR,
        "sha3": _MOCK_SHA3,
        "size": _MOCK_SIZE,
    }
    return mock


@pytest.mark.parametrize(
    "image_hash, image_tag, image_url, image_use_https, "
    "expected_url, expected_error, expected_error_msg",
    (
        ("testhash", None, None, False, f"hash:sha3:{_MOCK_SHA3}:{_MOCK_HTTP_ADDR}", None, ""),
        (None, "testtag", None, False, f"hash:sha3:{_MOCK_SHA3}:{_MOCK_HTTP_ADDR}", None, ""),
        ("testhash", None, None, True, f"hash:sha3:{_MOCK_SHA3}:{_MOCK_HTTPS_ADDR}", None, ""),
        (None, "testtag", None, True, f"hash:sha3:{_MOCK_SHA3}:{_MOCK_HTTPS_ADDR}", None, ""),
        ("testhash", None, "http://image", False, "hash:sha3:testhash:http://image", None, ""),
        (
            None,
            None,
            None,
            False,
            None,
            PackageException,
            "Either an image_hash or an image_tag is required "
            "to resolve an image URL from the Golem Registry",
        ),
        (
            None,
            None,
            "http://image",
            False,
            None,
            PackageException,
            "An image_hash is required when using a direct image_url",
        ),
        (
            None,
            "testtag",
            "http://image",
            False,
            None,
            PackageException,
            "An image_tag can only be used when resolving "
            "from Golem Registry, not with a direct image_url",
        ),
        (
            "testhash",
            "testtag",
            None,
            False,
            None,
            PackageException,
            "Golem Registry images can be resolved by either "
            "an image_hash or by an image_tag but not both",
        ),
    ),
)
@pytest.mark.asyncio
async def test_repo(
    monkeypatch,
    image_hash,
    image_tag,
    image_url,
    image_use_https,
    expected_url,
    expected_error,
    expected_error_msg,
):
    """Check `vm.repo` URL resolution and argument validation against a mocked registry.

    Each parametrized case either expects `resolve_url()` to return `expected_url`, or
    expects awaiting `vm.repo(...)` to raise `expected_error` with `expected_error_msg`
    contained in the exception's message.
    """
    # No real HTTP traffic: both request methods return the canned registry response.
    monkeypatch.setattr("aiohttp.ClientSession.get", _mock_response)
    monkeypatch.setattr("aiohttp.ClientSession.head", _mock_response)

    package_awaitable = vm.repo(
        image_hash=image_hash,
        image_tag=image_tag,
        image_url=image_url,
        image_use_https=image_use_https,
    )

    if expected_error:
        with pytest.raises(expected_error) as exc_info:
            _ = await package_awaitable
        # Must run after the `with` block: statements following the raising call inside
        # the block are never executed. Compare against the exception itself rather than
        # the string form of pytest's ExceptionInfo wrapper.
        assert expected_error_msg in str(exc_info.value)
    else:
        package = await package_awaitable
        url = await package.resolve_url()
        assert url == expected_url
28 changes: 0 additions & 28 deletions tests/payload/test_vm.py

This file was deleted.

Loading