diff --git a/examples/blender/start_stop_blender.py b/examples/blender/start_stop_blender.py new file mode 100755 index 000000000..2bac33f1c --- /dev/null +++ b/examples/blender/start_stop_blender.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 + +"""Other version of the blender example. + +Instead of using Golem in the default way - as a context manager - we directly call `Golem.start()` and `Golem.stop()`. +This way of using Golem might be more convenient for some specific use cases (although doesn't change a lot +in the blender example). +""" + +import asyncio +from datetime import datetime, timedelta +import pathlib +import sys + +from yapapi import ( + Golem, + NoPaymentAccountError, + Task, + __version__ as yapapi_version, + WorkContext, + windows_event_loop_fix, +) +from yapapi.log import enable_default_logger +from yapapi.payload import vm +from yapapi.rest.activity import BatchTimeoutError + +examples_dir = pathlib.Path(__file__).resolve().parent.parent +sys.path.append(str(examples_dir)) + +from utils import ( + build_parser, + TEXT_COLOR_CYAN, + TEXT_COLOR_DEFAULT, + TEXT_COLOR_RED, + TEXT_COLOR_YELLOW, + TEXT_COLOR_MAGENTA, + format_usage, +) + + +async def main(golem, show_usage=False): + print( + f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n" + f"Using subnet: {TEXT_COLOR_YELLOW}{golem.subnet_tag}{TEXT_COLOR_DEFAULT}, " + f"payment driver: {TEXT_COLOR_YELLOW}{golem.payment_driver}{TEXT_COLOR_DEFAULT}, " + f"and network: {TEXT_COLOR_YELLOW}{golem.payment_network}{TEXT_COLOR_DEFAULT}\n" + ) + + package = await vm.repo( + image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae", + min_mem_gib=0.5, + min_storage_gib=2.0, + ) + + async def worker(ctx: WorkContext, tasks): + script_dir = pathlib.Path(__file__).resolve().parent + scene_path = str(script_dir / "cubes.blend") + ctx.send_file(scene_path, "/golem/resource/scene.blend") + async for task in tasks: + frame = task.data + crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}] + ctx.send_json( + "/golem/work/params.json", + { + "scene_file": "/golem/resource/scene.blend", + "resolution": (400, 300), + "use_compositing": False, + "crops": crops, + "samples": 100, + "frames": [frame], + "output_format": "PNG", + "RESOURCES_DIR": "/golem/resources", + "WORK_DIR": "/golem/work", + "OUTPUT_DIR": "/golem/output", + }, + ) + ctx.run("/golem/entrypoints/run-blender.sh") + output_file = f"output_{frame}.png" + ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file) + try: + # Set timeout for executing the script on the provider. Usually, 30 seconds + # should be more than enough for computing a single frame, however a provider + # may require more time for the first task if it needs to download a VM image + # first. Once downloaded, the VM image will be cached and other tasks that use + # that image will be computed faster. + yield ctx.commit(timeout=timedelta(minutes=10)) + # TODO: Check if job results are valid + # and reject by: task.reject_task(reason = 'invalid file') + task.accept_result(result=output_file) + except BatchTimeoutError: + print( + f"{TEXT_COLOR_RED}" + f"Task {task} timed out on {ctx.provider_name}, time: {task.running_time}" + f"{TEXT_COLOR_DEFAULT}" + ) + raise + + if show_usage: + raw_state = await ctx.get_raw_state() + usage = format_usage(await ctx.get_usage()) + cost = await ctx.get_cost() + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {ctx.provider_name} STATE: {raw_state}" + f"{TEXT_COLOR_DEFAULT}" + ) + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {ctx.provider_name} USAGE: {usage}" + f"{TEXT_COLOR_DEFAULT}" + ) + print( + f"{TEXT_COLOR_MAGENTA}" + f" --- {ctx.provider_name} COST: {cost}" + f"{TEXT_COLOR_DEFAULT}" + ) + + # Iterator over the frame indices that we want to render + frames: range = range(0, 60, 10) + # Worst-case overhead, in minutes, for initialization (negotiation, file transfer etc.) + # TODO: make this dynamic, e.g. depending on the size of files to transfer + init_overhead = 3 + # Providers will not accept work if the timeout is outside of the [5 min, 30min] range. + # We increase the lower bound to 6 min to account for the time needed for our demand to + # reach the providers. + min_timeout, max_timeout = 6, 30 + + timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout)) + num_tasks = 0 + start_time = datetime.now() + + await golem.start() + completed_tasks = golem.execute_tasks( + worker, + [Task(data=frame) for frame in frames], + payload=package, + max_workers=3, + timeout=timeout, + ) + async for task in completed_tasks: + num_tasks += 1 + print( + f"{TEXT_COLOR_CYAN}" + f"Task computed: {task}, result: {task.result}, time: {task.running_time}" + f"{TEXT_COLOR_DEFAULT}" + ) + + print( + f"{TEXT_COLOR_CYAN}" + f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}" + f"{TEXT_COLOR_DEFAULT}" + ) + await golem.stop() + + +if __name__ == "__main__": + parser = build_parser("Render a Blender scene") + parser.add_argument("--show-usage", action="store_true", help="show activity usage and cost") + now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S") + parser.set_defaults(log_file=f"blender-yapapi-{now}.log") + args = parser.parse_args() + + # This is only required when running on Windows with Python prior to 3.8: + windows_event_loop_fix() + + enable_default_logger( + log_file=args.log_file, + debug_activity_api=True, + debug_market_api=True, + debug_payment_api=True, + ) + + loop = asyncio.get_event_loop() + + golem = Golem( + budget=10, + subnet_tag=args.subnet_tag, + payment_driver=args.payment_driver, + payment_network=args.payment_network, + ) + task = loop.create_task(main(golem, show_usage=args.show_usage)) + + try: + loop.run_until_complete(task) + except NoPaymentAccountError as e: + handbook_url = ( + "https://handbook.golem.network/requestor-tutorials/" + "flash-tutorial-of-requestor-development" + ) + print( + f"{TEXT_COLOR_RED}" + f"No payment account initialized for driver `{e.required_driver}` " + f"and network `{e.required_network}`.\n\n" + f"See {handbook_url} on how to initialize payment accounts for a requestor node." + f"{TEXT_COLOR_DEFAULT}" + ) + except KeyboardInterrupt: + print( + f"{TEXT_COLOR_YELLOW}" + "Shutting down gracefully, please wait a short while " + "or press Ctrl+C to exit immediately..." + f"{TEXT_COLOR_DEFAULT}" + ) + task.cancel() + try: + loop.run_until_complete(task) + print( + f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}" + ) + except (asyncio.CancelledError, KeyboardInterrupt): + pass diff --git a/yapapi/engine.py b/yapapi/engine.py index f28f1a4f7..ec95b1245 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -175,6 +175,8 @@ def __init__( self._services: Set[asyncio.Task] = set() self._stack = AsyncExitStack() + self._started = False + async def create_demand_builder( self, expiration_time: datetime, payload: Payload ) -> DemandBuilder: @@ -212,6 +214,11 @@ def subnet_tag(self) -> Optional[str]: """Return the name of the subnet used by this engine, or `None` if it is not set.""" return self._subnet + @property + def started(self) -> bool: + """Return `True` if this instance is initialized, `False` otherwise.""" + return self._started + def emit(self, event: events.Event) -> None: """Emit an event to be consumed by this engine's event consumer.""" if self._wrapped_consumer: @@ -276,6 +283,8 @@ def report_shutdown(*exc_info): stack.push_async_exit(self._shutdown) + self._started = True + async def add_to_async_context(self, async_context_manager: AsyncContextManager) -> None: await self._stack.enter_async_context(async_context_manager) diff --git a/yapapi/golem.py b/yapapi/golem.py index 166d15555..401baa8d8 100644 --- a/yapapi/golem.py +++ b/yapapi/golem.py @@ -1,3 +1,4 @@ +import asyncio import sys from datetime import datetime, timedelta import json @@ -125,6 +126,7 @@ def __init__( } self._engine: _Engine = self._get_new_engine() + self._engine_state_lock = asyncio.Lock() @property def driver(self) -> str: @@ -164,19 +166,41 @@ def subnet_tag(self) -> Optional[str]: """Return the name of the subnet, or `None` if it is not set.""" return self._engine.subnet_tag - async def __aenter__(self) -> "Golem": + @property + def operative(self) -> bool: + """Return True if Golem started and didn't stop""" + return self._engine.started + + async def start(self) -> None: try: - await self._engine.start() - return self + async with self._engine_state_lock: + if self.operative: + # Something started us before we got to the locked part + return + await self._engine.start() except: - await self.__aexit__(*sys.exc_info()) + await self._stop_with_exc_info(*sys.exc_info()) raise + async def stop(self) -> None: + await self._stop_with_exc_info(None, None, None) + + async def __aenter__(self) -> "Golem": + await self.start() + return self + async def __aexit__(self, *exc_info) -> Optional[bool]: - res = await self._engine.stop(*exc_info) + return await self._stop_with_exc_info(*exc_info) + + async def _stop_with_exc_info(self, *exc_info) -> Optional[bool]: + async with self._engine_state_lock: + if not self.operative: + # Something stopped us before we got to the locked part + return None + res = await self._engine.stop(*exc_info) - # Engine that was stopped is not usable anymore, there is no "full" cleanup - # That's why here we replace it with a fresh one + # Engine that was stopped is not usable anymore, there is no "full" cleanup. + # That's why here we replace it with a fresh one. self._engine = self._get_new_engine() return res