Skip to content

Commit

Permalink
Golem.start() and Golem.stop() (#644)
Browse files Browse the repository at this point in the history
Golem can be used in a non-contextmanager mode.
Stopped instance can be started again.
  • Loading branch information
johny-b authored Sep 22, 2021
1 parent 5012c8d commit 5e14c4f
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 7 deletions.
211 changes: 211 additions & 0 deletions examples/blender/start_stop_blender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
#!/usr/bin/env python3

"""Other version of the blender example.
Instead of using Golem in the default way - as a context manager - we directly call `Golem.start()` and `Golem.stop()`.
This way of using Golem might be more convenient for some specific use cases (although doesn't change a lot
in the blender example).
"""

import asyncio
from datetime import datetime, timedelta
import pathlib
import sys

from yapapi import (
Golem,
NoPaymentAccountError,
Task,
__version__ as yapapi_version,
WorkContext,
windows_event_loop_fix,
)
from yapapi.log import enable_default_logger
from yapapi.payload import vm
from yapapi.rest.activity import BatchTimeoutError

examples_dir = pathlib.Path(__file__).resolve().parent.parent
sys.path.append(str(examples_dir))

from utils import (
build_parser,
TEXT_COLOR_CYAN,
TEXT_COLOR_DEFAULT,
TEXT_COLOR_RED,
TEXT_COLOR_YELLOW,
TEXT_COLOR_MAGENTA,
format_usage,
)


async def main(golem, show_usage=False):
    """Render a few frames of the example Blender scene with an explicitly started Golem.

    Instead of entering `golem` as an async context manager, this coroutine calls
    `golem.start()` before submitting work and `golem.stop()` afterwards. The stop
    call is placed in a `finally` clause so that it also runs when task execution
    fails or when this coroutine is cancelled.

    Args:
        golem: a configured `Golem` instance that has not been started yet.
        show_usage: when true, print the activity's raw state, usage and cost
            after each task has been processed.
    """
    print(
        f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n"
        f"Using subnet: {TEXT_COLOR_YELLOW}{golem.subnet_tag}{TEXT_COLOR_DEFAULT}, "
        f"payment driver: {TEXT_COLOR_YELLOW}{golem.payment_driver}{TEXT_COLOR_DEFAULT}, "
        f"and network: {TEXT_COLOR_YELLOW}{golem.payment_network}{TEXT_COLOR_DEFAULT}\n"
    )

    package = await vm.repo(
        image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
        min_mem_gib=0.5,
        min_storage_gib=2.0,
    )

    async def worker(ctx: WorkContext, tasks):
        """Render one frame per task on a single provider.

        The scene file is sent once, before the task loop. For every task the
        render parameters are uploaded, the render script is run and the
        resulting PNG is downloaded; the batch is then committed with a timeout.
        """
        script_dir = pathlib.Path(__file__).resolve().parent
        scene_path = str(script_dir / "cubes.blend")
        ctx.send_file(scene_path, "/golem/resource/scene.blend")
        async for task in tasks:
            frame = task.data
            crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}]
            ctx.send_json(
                "/golem/work/params.json",
                {
                    "scene_file": "/golem/resource/scene.blend",
                    "resolution": (400, 300),
                    "use_compositing": False,
                    "crops": crops,
                    "samples": 100,
                    "frames": [frame],
                    "output_format": "PNG",
                    # NOTE(review): the scene is uploaded to "/golem/resource/" (singular)
                    # while this entry says "/golem/resources" - confirm which path the
                    # image actually expects before changing either one.
                    "RESOURCES_DIR": "/golem/resources",
                    "WORK_DIR": "/golem/work",
                    "OUTPUT_DIR": "/golem/output",
                },
            )
            ctx.run("/golem/entrypoints/run-blender.sh")
            output_file = f"output_{frame}.png"
            ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file)
            try:
                # Set timeout for executing the script on the provider. Usually, 30 seconds
                # should be more than enough for computing a single frame, however a provider
                # may require more time for the first task if it needs to download a VM image
                # first. Once downloaded, the VM image will be cached and other tasks that use
                # that image will be computed faster.
                yield ctx.commit(timeout=timedelta(minutes=10))
                # TODO: Check if job results are valid
                # and reject by: task.reject_task(reason = 'invalid file')
                task.accept_result(result=output_file)
            except BatchTimeoutError:
                print(
                    f"{TEXT_COLOR_RED}"
                    f"Task {task} timed out on {ctx.provider_name}, time: {task.running_time}"
                    f"{TEXT_COLOR_DEFAULT}"
                )
                raise

            if show_usage:
                raw_state = await ctx.get_raw_state()
                usage = format_usage(await ctx.get_usage())
                cost = await ctx.get_cost()
                print(
                    f"{TEXT_COLOR_MAGENTA}"
                    f" --- {ctx.provider_name} STATE: {raw_state}"
                    f"{TEXT_COLOR_DEFAULT}"
                )
                print(
                    f"{TEXT_COLOR_MAGENTA}"
                    f" --- {ctx.provider_name} USAGE: {usage}"
                    f"{TEXT_COLOR_DEFAULT}"
                )
                print(
                    f"{TEXT_COLOR_MAGENTA}"
                    f" --- {ctx.provider_name} COST: {cost}"
                    f"{TEXT_COLOR_DEFAULT}"
                )

    # Iterator over the frame indices that we want to render
    frames: range = range(0, 60, 10)
    # Worst-case overhead, in minutes, for initialization (negotiation, file transfer etc.)
    # TODO: make this dynamic, e.g. depending on the size of files to transfer
    init_overhead = 3
    # Providers will not accept work if the timeout is outside of the [5 min, 30min] range.
    # We increase the lower bound to 6 min to account for the time needed for our demand to
    # reach the providers.
    min_timeout, max_timeout = 6, 30

    # Two minutes per frame plus the init overhead, clamped to [min_timeout, max_timeout].
    timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout))
    num_tasks = 0
    start_time = datetime.now()

    await golem.start()
    try:
        completed_tasks = golem.execute_tasks(
            worker,
            [Task(data=frame) for frame in frames],
            payload=package,
            max_workers=3,
            timeout=timeout,
        )
        async for task in completed_tasks:
            num_tasks += 1
            print(
                f"{TEXT_COLOR_CYAN}"
                f"Task computed: {task}, result: {task.result}, time: {task.running_time}"
                f"{TEXT_COLOR_DEFAULT}"
            )

        print(
            f"{TEXT_COLOR_CYAN}"
            f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}"
            f"{TEXT_COLOR_DEFAULT}"
        )
    finally:
        # Without a context manager nothing else stops the started instance, so do it
        # here on every exit path: normal completion, an error raised while executing
        # tasks, or cancellation of this coroutine (e.g. after Ctrl+C).
        await golem.stop()


if __name__ == "__main__":
parser = build_parser("Render a Blender scene")
parser.add_argument("--show-usage", action="store_true", help="show activity usage and cost")
now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"blender-yapapi-{now}.log")
args = parser.parse_args()

# This is only required when running on Windows with Python prior to 3.8:
windows_event_loop_fix()

enable_default_logger(
log_file=args.log_file,
debug_activity_api=True,
debug_market_api=True,
debug_payment_api=True,
)

loop = asyncio.get_event_loop()

golem = Golem(
budget=10,
subnet_tag=args.subnet_tag,
payment_driver=args.payment_driver,
payment_network=args.payment_network,
)
task = loop.create_task(main(golem, show_usage=args.show_usage))

try:
loop.run_until_complete(task)
except NoPaymentAccountError as e:
handbook_url = (
"https://handbook.golem.network/requestor-tutorials/"
"flash-tutorial-of-requestor-development"
)
print(
f"{TEXT_COLOR_RED}"
f"No payment account initialized for driver `{e.required_driver}` "
f"and network `{e.required_network}`.\n\n"
f"See {handbook_url} on how to initialize payment accounts for a requestor node."
f"{TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
print(
f"{TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a short while "
"or press Ctrl+C to exit immediately..."
f"{TEXT_COLOR_DEFAULT}"
)
task.cancel()
try:
loop.run_until_complete(task)
print(
f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}"
)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
9 changes: 9 additions & 0 deletions yapapi/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ def __init__(
self._services: Set[asyncio.Task] = set()
self._stack = AsyncExitStack()

self._started = False

async def create_demand_builder(
self, expiration_time: datetime, payload: Payload
) -> DemandBuilder:
Expand Down Expand Up @@ -212,6 +214,11 @@ def subnet_tag(self) -> Optional[str]:
"""Return the name of the subnet used by this engine, or `None` if it is not set."""
return self._subnet

@property
def started(self) -> bool:
    """Return `True` if this instance is initialized, `False` otherwise.

    Read-only view of the private `_started` flag.
    NOTE(review): in the visible hunks the flag is `False` after `__init__` and is
    set to `True` once the shutdown callback has been registered; no visible code
    resets it - confirm against the full file.
    """
    return self._started

def emit(self, event: events.Event) -> None:
"""Emit an event to be consumed by this engine's event consumer."""
if self._wrapped_consumer:
Expand Down Expand Up @@ -276,6 +283,8 @@ def report_shutdown(*exc_info):

stack.push_async_exit(self._shutdown)

self._started = True

async def add_to_async_context(self, async_context_manager: AsyncContextManager) -> None:
await self._stack.enter_async_context(async_context_manager)

Expand Down
38 changes: 31 additions & 7 deletions yapapi/golem.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import sys
from datetime import datetime, timedelta
import json
Expand Down Expand Up @@ -125,6 +126,7 @@ def __init__(
}

self._engine: _Engine = self._get_new_engine()
self._engine_state_lock = asyncio.Lock()

@property
def driver(self) -> str:
Expand Down Expand Up @@ -164,19 +166,41 @@ def subnet_tag(self) -> Optional[str]:
"""Return the name of the subnet, or `None` if it is not set."""
return self._engine.subnet_tag

async def __aenter__(self) -> "Golem":
@property
def operative(self) -> bool:
    """Return `True` if Golem has been started and has not been stopped since.

    The value is read from the `started` property of the current engine.
    """
    return self._engine.started

async def start(self) -> None:
try:
await self._engine.start()
return self
async with self._engine_state_lock:
if self.operative:
# Something started us before we got to the locked part
return
await self._engine.start()
except:
await self.__aexit__(*sys.exc_info())
await self._stop_with_exc_info(*sys.exc_info())
raise

async def stop(self) -> None:
    """Stop Golem when it is used without `async with`.

    Calls `_stop_with_exc_info` with an all-`None` exception triple, i.e. as if
    no exception were being handled.
    """
    await self._stop_with_exc_info(None, None, None)

async def __aenter__(self) -> "Golem":
await self.start()
return self

async def __aexit__(self, *exc_info) -> Optional[bool]:
res = await self._engine.stop(*exc_info)
return await self._stop_with_exc_info(*exc_info)

async def _stop_with_exc_info(self, *exc_info) -> Optional[bool]:
async with self._engine_state_lock:
if not self.operative:
# Something stopped us before we got to the locked part
return None
res = await self._engine.stop(*exc_info)

# Engine that was stopped is not usable anymore, there is no "full" cleanup
# That's why here we replace it with a fresh one
# Engine that was stopped is not usable anymore, there is no "full" cleanup.
# That's why here we replace it with a fresh one.
self._engine = self._get_new_engine()
return res

Expand Down

0 comments on commit 5e14c4f

Please sign in to comment.