From c099f39be3de9f47dfd5641164e455da60ef4670 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Thu, 10 Jun 2021 10:49:32 +0200 Subject: [PATCH 1/5] Adjust README.md to changes in the API (#451) * Adjust README.md to changes in the API * Apply suggestions from code review Co-authored-by: shadeofblue Co-authored-by: shadeofblue --- README.md | 216 +++++++++++++++++------------------------------------- 1 file changed, 68 insertions(+), 148 deletions(-) diff --git a/README.md b/README.md index 2e8f50fe9..be20fcfb0 100644 --- a/README.md +++ b/README.md @@ -39,46 +39,69 @@ do that in the [yagna repository](https://github.com/golemfactory/yagna) and in read files) with it. This image needs to be packed and uploaded into Golem's image repository using our dedicated tool - [`gvmkit-build`](https://pypi.org/project/gvmkit-build/). * **create your requestor agent** - this is where `yapapi` comes in. Utilizing our high-level - API, the creation of a requestor agent should be straighforward and require minimal effort. - You can use examples contained in this repository (blender and hashcat) as references. + API, the creation of a requestor agent should be straightforward and require minimal effort. + You can use examples contained in this repository as references, the directory + [`examples/hello-world/`](https://github.com/golemfactory/yapapi/tree/master/examples/hello-world) + contains minimal examples of fully functional requestor agents and + is therefore the best place to start exploring. + ### Components There are a few components that are crucial for any requestor agent app: -#### Executor +#### Golem + +The heart of the high-level API is the `Golem` class (`yapapi.Golem`), which serves as +the "engine" of a requestor agent. `Golem` is responsible for finding providers interested in +the jobs you want to execute, negotiating agreements with them and processing payments. 
It also +implements core functionality required to execute commands on providers that have signed such agreements. + +`Golem` provides two entry points for executing jobs on the Golem network, corresponding to the +two basic modes of operation of a requestor agent: + +* The method `execute_tasks` allows you to submit a _task-based_ job for execution. Arguments +to this method must include a sequence of independent _tasks_ (units of work) to be distributed +among providers, a _payload_ (a VM image) required to compute them, and a _worker_ function, which +will be used to convert each task to a sequence of steps to be executed on a provider. You may also +specify the timeout for the whole job, the maximum number of providers used at any given time, +and the maximum amount that you want to spend. -The heart of the high-level API is the requestor's task executor (`yapapi.Executor`). -You tell it, among others, which package (VM image) will be used to run your task, -how much you'd like to pay and how many providers you'd like to involve in the execution. -Finally, you feed it the worker script and a list of `Task` objects to execute on providers. +* The method `run_service` allows you, as you probably guessed, to run a _service_ on Golem. +Instead of a task-processing worker function, an argument to `run_service` is a +class (a subclass of `yapapi.Service`) that implements the behaviour of your service in various +stages of its lifecycle (when it's _starting_, _running_ etc.). Additionally, you may specify +the number of _service instances_ you want to run and the service expiration datetime. -#### Worker script +Prior to version `0.6.0`, only task-based jobs could be executed. For more information on both types of jobs please refer to our [handbook](https://handbook.golem.network). + +#### Worker function -The `worker` will most likely be the very core of your requestor app. You need to define -this function in your agent code and then you pass it to the Executor. 
+The worker will most likely be the very core of your task-based requestor app. +You need to define this function in your agent code and then you pass it (as the value of +the `worker` parameter) to the `execute_tasks` method of `Golem`. -It receives a `WorkContext` (`yapapi.WorkContext`) object that serves +The worker receives a _work context_ (`yapapi.WorkContext`) object that serves as an interface between your script and the execution unit within the provider. Using the work context, you define the steps that the provider needs to execute in order to complete the job you're giving them - e.g. transferring files to and from the provider or running commands within the execution unit on the provider's end. -Depending on the number of workers, and thus, the maximum number of providers that your -Executor utilizes in parallel, a single worker may tackle several tasks -(units of your work) and you can differentiate the steps that need to happen once +Depending on the number of workers, and thus, the maximum number of providers that +`execute_tasks` utilizes in parallel, a single worker may tackle several tasks +and you can differentiate the steps that need to happen once per worker run, which usually means once per provider node - but that depends on the exact implementation of your worker function - from those that happen for each -individual unit of work. An example of the former would be an upload of a source -file that's common to each fragment; and of the latter - a step that triggers the -processing of the file using a set of parameters specified in the `Task` data. +task. An example of the former would be an upload of a source +file that's common to each task; and of the latter - a step that triggers the +processing of the file using a set of parameters specified for a particular task. 
#### Task -The `Task` (`yapapi.Task`) object describes a unit of work that your application needs +The _task_ (`yapapi.Task`) object describes a unit of work that your application needs to carry out. -The Executor will feed an instance of your worker - bound to a single provider node - +`Golem` will feed an instance of your worker - bound to a single provider node - with `Task` objects. The worker will be responsible for completing those tasks. Typically, it will turn each task into a sequence of steps to be executed in a single run of the execution script on a provider's machine, in order to compute the task's result. @@ -86,146 +109,43 @@ of the execution script on a provider's machine, in order to compute the task's ### Example -An example Golem application, using a Docker image containing the Blender renderer: - +An example task-based Golem application, using a minimal Docker image +(Python file with the example and the Dockerfile for the image reside in +[`examples/hello-world/`](https://github.com/golemfactory/yapapi/tree/master/examples/hello-world)): ```python import asyncio -from datetime import datetime, timedelta -import pathlib -import sys - -from yapapi import ( - Executor, - NoPaymentAccountError, - Task, - __version__ as yapapi_version, - WorkContext, - windows_event_loop_fix, -) -from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa +from typing import AsyncIterable + +from yapapi import Golem, Task, WorkContext +from yapapi.log import enable_default_logger from yapapi.payload import vm -from yapapi.rest.activity import BatchTimeoutError -async def main(subnet_tag, driver=None, network=None): +async def worker(context: WorkContext, tasks: AsyncIterable[Task]): + async for task in tasks: + context.run("/bin/sh", "-c", "date") + + future_results = yield context.commit() + results = await future_results + task.accept_result(result=results[-1]) + + +async def main(): package = await vm.repo( - 
image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae", - min_mem_gib=0.5, - min_storage_gib=2.0, + image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376", ) - async def worker(ctx: WorkContext, tasks): - script_dir = pathlib.Path(__file__).resolve().parent - scene_path = str(script_dir / "cubes.blend") - ctx.send_file(scene_path, "/golem/resource/scene.blend") - async for task in tasks: - frame = task.data - crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}] - ctx.send_json( - "/golem/work/params.json", - { - "scene_file": "/golem/resource/scene.blend", - "resolution": (400, 300), - "use_compositing": False, - "crops": crops, - "samples": 100, - "frames": [frame], - "output_format": "PNG", - "RESOURCES_DIR": "/golem/resources", - "WORK_DIR": "/golem/work", - "OUTPUT_DIR": "/golem/output", - }, - ) - ctx.run("/golem/entrypoints/run-blender.sh") - output_file = f"output_{frame}.png" - ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file) - try: - # Set timeout for executing the script on the provider. Two minutes is plenty - # of time for computing a single frame, for other tasks it may be not enough. - # If the timeout is exceeded, this worker instance will be shut down and all - # remaining tasks, including the current one, will be computed by other providers. 
- yield ctx.commit(timeout=timedelta(seconds=120)) - # TODO: Check if job results are valid - # and reject by: task.reject_task(reason = 'invalid file') - task.accept_result(result=output_file) - except BatchTimeoutError: - print( - f"Task {task} timed out on {ctx.provider_name}, time: {task.running_time}" - ) - raise - - frames: range = range(0, 60, 10) - init_overhead = 3 - min_timeout, max_timeout = 6, 30 - - timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout)) - - async with Executor( - package=package, - max_workers=3, - budget=10.0, - timeout=timeout, - subnet_tag=subnet_tag, - driver=driver, - network=network, - event_consumer=log_summary(), - ) as executor: - - sys.stderr.write( - f"yapapi version: {yapapi_version}\n" - f"Using subnet: {subnet_tag}, " - f"payment driver: {executor.driver}, " - f"and network: {executor.network}\n" - ) - - num_tasks = 0 - start_time = datetime.now() - - async for task in executor.submit(worker, [Task(data=frame) for frame in frames]): - num_tasks += 1 - print( - f"Task computed: {task}, result: {task.result}, time: {task.running_time}" - ) - - print( - f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}" - ) + tasks = [Task(data=None)] + async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem: + async for completed in golem.execute_tasks(worker, tasks, payload=package): + print(completed.result.stdout) -if __name__ == "__main__": - # This is only required when running on Windows with Python prior to 3.8: - windows_event_loop_fix() - enable_default_logger() +if __name__ == "__main__": + enable_default_logger(log_file="hello.log") loop = asyncio.get_event_loop() - task = loop.create_task( - main(subnet_tag="devnet-beta.1", driver="zksync", network="rinkeby") - ) - - try: - loop.run_until_complete(task) - except NoPaymentAccountError as e: - handbook_url = ( - "https://handbook.golem.network/requestor-tutorials/" - "flash-tutorial-of-requestor-development" 
- ) - print( - f"No payment account initialized for driver `{e.required_driver}` " - f"and network `{e.required_network}`.\n\n" - f"See {handbook_url} on how to initialize payment accounts for a requestor node." - ) - except KeyboardInterrupt: - print( - "Shutting down gracefully, please wait a short while " - "or press Ctrl+C to exit immediately..." - ) - task.cancel() - try: - loop.run_until_complete(task) - print( - f"Shutdown completed, thank you for waiting!" - ) - except (asyncio.CancelledError, KeyboardInterrupt): - pass + task = loop.create_task(main()) + loop.run_until_complete(task) ``` From cdf69f9bb9c794c4d80ae46c641c9ee9d4a09500 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Thu, 10 Jun 2021 11:22:53 +0200 Subject: [PATCH 2/5] Always cancel process_invoices_job when shutting down Golem (#452) --- yapapi/engine.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/yapapi/engine.py b/yapapi/engine.py index e296a0f90..9c7b8dda7 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -266,20 +266,34 @@ async def _shutdown(self, *exc_info): from yapapi.log import pluralize logger.info("Golem is shutting down...") + # Wait until all computations are finished await asyncio.gather(*[job.finished.wait() for job in self._jobs]) logger.info("All jobs have finished") self._payment_closing = True + # Cancel all services except the one that processes invoices for task in self._services: if task is not self._process_invoices_job: task.cancel() - if self._process_invoices_job and not any( - True for job in self._jobs if job.agreements_pool.confirmed > 0 - ): - logger.debug("No need to wait for invoices.") + # Wait for some time for invoices for unpaid agreements, + # then cancel the invoices service + if self._process_invoices_job: + + if self._agreements_to_pay: + logger.info( + "%s still unpaid, waiting for invoices...", + pluralize(len(self._agreements_to_pay), "agreement"), + ) + try: + await 
asyncio.wait_for(self._process_invoices_job, timeout=30) + except asyncio.TimeoutError: + logger.debug("process_invoices_job cancelled") + if self._agreements_to_pay: + logger.warning("Unpaid agreements: %s", self._agreements_to_pay) + self._process_invoices_job.cancel() try: @@ -292,20 +306,6 @@ async def _shutdown(self, *exc_info): except Exception: logger.debug("Got error when waiting for services to finish", exc_info=True) - if self._agreements_to_pay and self._process_invoices_job: - logger.info( - "%s still unpaid, waiting for invoices...", - pluralize(len(self._agreements_to_pay), "agreement"), - ) - try: - await asyncio.wait_for(self._process_invoices_job, timeout=30) - except asyncio.TimeoutError: - logger.debug("process_invoices_job cancelled") - if self._agreements_to_pay: - logger.warning("Unpaid agreements: %s", self._agreements_to_pay) - - await asyncio.gather(*[job.finished.wait() for job in self._jobs]) - async def __aexit__(self, exc_type, exc_val, exc_tb): await self._stack.aclose() From 6cb6be78a677219c9ce42e074af405d107868244 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Thu, 10 Jun 2021 11:25:59 +0200 Subject: [PATCH 3/5] Update project version to 0.6.0-alpha.1 (#453) --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 64be2989b..c6241aa61 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "yapapi" -version = "0.6.0-alpha.0" +version = "0.6.0-alpha.1" description = "High-level Python API for the New Golem" authors = ["Przemysław K. 
Rekucki ", "GolemFactory "] license = "LGPL-3.0-or-later" From d5c56344eafd1329a93e0fa468aaac27d8e01f2c Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 16 Jun 2021 15:16:01 +0200 Subject: [PATCH 4/5] version 0.6.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c6241aa61..6d359a30b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "yapapi" -version = "0.6.0-alpha.1" +version = "0.6.0" description = "High-level Python API for the New Golem" authors = ["Przemysław K. Rekucki ", "GolemFactory "] license = "LGPL-3.0-or-later" From c26318ad1942d0ce8fbb9bc6f30089a838bb68c9 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 16 Jun 2021 19:40:03 +0200 Subject: [PATCH 5/5] - unneeded constants in the simple-service-poc example (#462) --- examples/simple-service-poc/simple_service.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/examples/simple-service-poc/simple_service.py b/examples/simple-service-poc/simple_service.py index 9a7403950..0d07dd7e8 100644 --- a/examples/simple-service-poc/simple_service.py +++ b/examples/simple-service-poc/simple_service.py @@ -37,8 +37,6 @@ class SimpleService(Service): - STATS_PATH = "/golem/out/stats" - PLOT_INFO_PATH = "/golem/out/plot" SIMPLE_SERVICE = "/golem/run/simple_service.py" SIMPLE_SERVICE_CTL = "/golem/run/simulate_observations_ctl.py"
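
Note on PATCH 2/5: the reordered `_shutdown` logic follows a common asyncio pattern: give a background task a bounded grace period while work is still pending, then cancel it unconditionally. The sketch below illustrates that pattern in isolation using only the standard library. It is not the yapapi implementation; `drain_then_cancel`, `invoice_loop`, `demo`, the `pending` set, and the agreement ids are hypothetical names chosen for illustration, and the timings are arbitrary.

```python
import asyncio
import contextlib


async def drain_then_cancel(task: asyncio.Task, pending: set, grace: float) -> bool:
    """Wait up to `grace` seconds for `task` if work is pending, then always cancel it.

    Returns True if nothing was left pending at the end.
    """
    if pending:
        try:
            # wait_for() cancels `task` itself if the timeout expires.
            await asyncio.wait_for(task, timeout=grace)
        except asyncio.TimeoutError:
            pass
    # Cancel unconditionally so the task never outlives shutdown;
    # cancelling a task that has already finished is a no-op.
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return not pending


async def invoice_loop(pending: set, delay: float) -> None:
    """Hypothetical stand-in for an invoice processor: settles one item per `delay` seconds."""
    while pending:
        await asyncio.sleep(delay)
        pending.pop()


async def demo() -> tuple:
    # Case 1: the processor settles everything well within the grace period.
    fast = {"agr-1", "agr-2"}
    ok_fast = await drain_then_cancel(
        asyncio.create_task(invoice_loop(fast, 0.01)), fast, grace=1.0
    )
    # Case 2: the processor is too slow, so it is cancelled with work left over.
    slow = {"agr-3"}
    ok_slow = await drain_then_cancel(
        asyncio.create_task(invoice_loop(slow, 10.0)), slow, grace=0.05
    )
    return ok_fast, ok_slow


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the first call reports that everything was settled and the second reports leftover work, which corresponds to the "Unpaid agreements" warning branch in the patch. Placing the `cancel()` call outside the `if pending:` branch mirrors the intent stated in the commit subject: the background task is cancelled on every shutdown path, not only when there was something to wait for.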