From 5cbd461b43525acfc1c3c703e53163a838637274 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 9 Jun 2021 17:13:28 +0200 Subject: [PATCH 1/3] the docstring for `Golem` (#447) * the docstring for `Golem` * Apply suggestions from code review * resolve the ambiguity that @zakaprov mentioned, address a few more remarks by @zakaprov and @azawlocki --- yapapi/golem.py | 99 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/yapapi/golem.py b/yapapi/golem.py index a566e3f84..2f96e1df6 100644 --- a/yapapi/golem.py +++ b/yapapi/golem.py @@ -28,6 +28,37 @@ class Golem(_Engine): + """The main entrypoint of Golem's high-level API. + + Provides two methods that reflect the two modes of operation, or two types of jobs + that can currently be executed on the Golem network. + + The first one - `execute_tasks` - instructs `Golem` to take a sequence of tasks + that the user wishes to compute on Golem and distributes those among the providers. + + The second one - `run_service` - instructs `Golem` to spawn a certain number of instances + of a service based on a single service specification (a specialized implementation + inheriting from `yapapi.Service`). + + While the two models are not necessarily completely disjoint - in that we can create a + service that exists to process a certain number of computations and, similarly, we can + use the task model to run some service - the main difference lies in the lifetime of + such a job. + + Whereas a task-based job exists for the purpose of computing the specific + sequence of tasks and is done once the whole sequence has been processed, the + service-based job is created for a potentially indefinite period and the services + spawned within it are kept alive for as long as they're needed. + + Additionally, the service interface provides a way to easily define handlers for + certain, discrete phases of a lifetime of each service instance - startup, running and + shutdown. + + As `Golem`'s job includes tracking and executing payments for activities spawned by + either mode of operation, it's usually good to have just one instance of `Golem` active + at any given time. + """ + async def execute_tasks( self, worker: Callable[ @@ -56,6 +87,26 @@ async def execute_tasks( :param timeout: timeout for computing all tasks, passed to the `Executor` instance :param budget: budget for computing all tasks, passed to the `Executor` instance :return: an iterator that yields completed `Task` objects + + example usage: + + ```python + async def worker(context: WorkContext, tasks: AsyncIterable[Task]): + async for task in tasks: + context.run("/bin/sh", "-c", "date") + + future_results = yield context.commit() + results = await future_results + task.accept_result(result=results[-1]) + + package = await vm.repo( + image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376", + ) + + async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem: + async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package): + print(completed.result.stdout) + ``` """ kwargs: Dict[str, Any] = {"payload": payload} @@ -85,6 +136,54 @@ async def run_service( payload specified by the `get_payload()` method of `service_class` is used :param expiration: optional expiration datetime for the service :return: a `Cluster` of service instances + + example usage: + + ```python + DATE_OUTPUT_PATH = "/golem/work/date.txt" + REFRESH_INTERVAL_SEC = 5 + + + class DateService(Service): + @staticmethod + async def get_payload(): + return await vm.repo( + image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376", + ) + + async def start(self): + # every `DATE_POLL_INTERVAL` write output of `date` to `DATE_OUTPUT_PATH` + self._ctx.run( + "/bin/sh", + "-c", + f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &", + ) + yield self._ctx.commit() + + async def run(self): + while True: + await asyncio.sleep(REFRESH_INTERVAL_SEC) + self._ctx.run( + "/bin/sh", + "-c", + f"cat {DATE_OUTPUT_PATH}", + ) + + future_results = yield self._ctx.commit() + results = await future_results + print(results[0].stdout.strip()) + + + async def main(): + async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem: + cluster = await golem.run_service(DateService, num_instances=1) + start_time = datetime.now() + + while datetime.now() < start_time + timedelta(minutes=1): + for num, instance in enumerate(cluster.instances): + print(f"Instance {num} is {instance.state.value} on {instance.provider_name}") + await asyncio.sleep(REFRESH_INTERVAL_SEC) + ``` """ from .services import Cluster # avoid circular dependency From 4b2844e9250a6349caf6b9cca81a4da7c5c6da87 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 9 Jun 2021 19:48:22 +0200 Subject: [PATCH 2/3] silence `asyncio.CancelledError` when awaiting the just-cancelled task (#449) --- yapapi/services.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/yapapi/services.py b/yapapi/services.py index 3440ca631..208a74cc6 100644 --- a/yapapi/services.py +++ b/yapapi/services.py @@ -402,12 +402,15 @@ async def _run_instance(self, ctx: WorkContext): logger.debug("No handler for %s in state %s", instance.service, instance.state.value) - if batch_task: - batch_task.cancel() - await batch_task - if signal_task: - signal_task.cancel() - await signal_task + try: + if batch_task: + batch_task.cancel() + await batch_task + if signal_task: + signal_task.cancel() + await signal_task + except asyncio.CancelledError: + pass logger.info("%s decomissioned", instance.service) From 017ba49a198a39d92f4fa43ba2c033d57a4602ed Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 9 Jun 2021 22:48:51 +0200 Subject: [PATCH 3/3] make the default `Service.run()` await indefinitely instead of yielding `None` in a loop (#448) --- yapapi/services.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/yapapi/services.py b/yapapi/services.py index 208a74cc6..97c314a2c 100644 --- a/yapapi/services.py +++ b/yapapi/services.py @@ -162,9 +162,8 @@ async def start(self): async def run(self): """Implement the `running` state of the service.""" - while True: - await asyncio.sleep(10) - yield + await asyncio.Future() + yield async def shutdown(self): """Implement the `stopping` state of the service."""