Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust README.md to changes in the API #451

Merged
merged 2 commits into from
Jun 10, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 75 additions & 152 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,193 +39,116 @@ do that in the [yagna repository](https://github.com/golemfactory/yagna) and in
read files) with it. This image needs to be packed and uploaded into Golem's image repository
using our dedicated tool - [`gvmkit-build`](https://pypi.org/project/gvmkit-build/).
* **create your requestor agent** - this is where `yapapi` comes in. Utilizing our high-level
API, the creation of a requestor agent should be straighforward and require minimal effort.
You can use examples contained in this repository (blender and hashcat) as references.
API, the creation of a requestor agent should be straightforward and require minimal effort.
You can use examples contained in this repository as references, the directory
[`examples/hello-world/`](https://github.com/golemfactory/yapapi/tree/master/examples/hello-world)
contains minimal examples of fully functional requestor agents and
is therefore the best place to start exploring.


### Components

There are a few components that are crucial for any requestor agent app:

#### Executor

The heart of the high-level API is the requestor's task executor (`yapapi.Executor`).
You tell it, among others, which package (VM image) will be used to run your task,
how much you'd like to pay and how many providers you'd like to involve in the execution.
Finally, you feed it the worker script and a list of `Task` objects to execute on providers.

#### Worker script

The `worker` will most likely be the very core of your requestor app. You need to define
this function in your agent code and then you pass it to the Executor.

It receives a `WorkContext` (`yapapi.WorkContext`) object that serves
#### Golem

The heart of the high-level API is the `Golem` class (`yapapi.Golem`), which serves as
the "engine" of a requestor agent. `Golem` is responsible for finding providers interested in
the jobs you want to execute, negotiating agreements with them and processing payments. It also
implements core functionality required for sending commands to providers that
agreed to run your tasks.
azawlocki marked this conversation as resolved.
Show resolved Hide resolved

`Golem` provides two entry points for executing jobs on the Golem network, corresponding to the
two basic modes of operation of a requestor agent:

* The method `execute_tasks` allows you to submit a _task-based_ job for execution. Arguments
to this method must include a sequence of independent _tasks_ (units of work) to be distributed
among providers, a _payload_ (a VM image) required to compute them, and a _worker_ function, which
will be used to convert each task to a sequence of steps to be executed on a provider. You may also
specify the timeout for the whole job, the maximum number of providers used at any given time,
and the maximum amount that you want to spend.

* The method `run_service` allows you, as you probably guessed, to run a _service_ on Golem.
Instead of a task-processing worker function, an argument to `run_service` is a
class (a subclass of `yapapi.Service`) that implements the behaviour of your service in various
stages of its lifecycle (when it's _starting_, _running_ etc.). Additionally, you may specify
the number of _service instances_ you want to run and the service expiration datetime.

Prior to version `0.6.0`, only task-based jobs could be executed. They are probably conceptually
simpler than services, and therefore in this `README` we focus on jobs of this type.
For more information on both types of jobs please refer to our [handbook](https://handbook.golem.network).
azawlocki marked this conversation as resolved.
Show resolved Hide resolved

#### Worker function

The worker will most likely be the very core of your task-based requestor app.
You need to define this function in your agent code and then you pass it (as the value of
the `worker` parameter) to the `execute_tasks` method of `Golem`.

The worker receives a _work context_ (`yapapi.WorkContext`) object that serves
as an interface between your script and the execution unit within the provider.
Using the work context, you define the steps that the provider needs to execute in order
to complete the job you're giving them - e.g. transferring files to and from the provider
or running commands within the execution unit on the provider's end.

Depending on the number of workers, and thus, the maximum number of providers that your
Executor utilizes in parallel, a single worker may tackle several tasks
(units of your work) and you can differentiate the steps that need to happen once
Depending on the number of workers, and thus, the maximum number of providers that
`execute_tasks` utilizes in parallel, a single worker may tackle several tasks
and you can differentiate the steps that need to happen once
per worker run, which usually means once per provider node - but that depends on the
exact implementation of your worker function - from those that happen for each
individual unit of work. An example of the former would be an upload of a source
file that's common to each fragment; and of the latter - a step that triggers the
processing of the file using a set of parameters specified in the `Task` data.
task. An example of the former would be an upload of a source
file that's common to each task; and of the latter - a step that triggers the
processing of the file using a set of parameters specified for a particular task.

#### Task

The `Task` (`yapapi.Task`) object describes a unit of work that your application needs
The _task_ (`yapapi.Task`) object describes a unit of work that your application needs
to carry out.

The Executor will feed an instance of your worker - bound to a single provider node -
A `Golem` object will feed an instance of your worker - bound to a single provider node -
azawlocki marked this conversation as resolved.
Show resolved Hide resolved
with `Task` objects. The worker will be responsible for completing those tasks. Typically,
it will turn each task into a sequence of steps to be executed in a single run
of the execution script on a provider's machine, in order to compute the task's result.


### Example

An example Golem application, using a Docker image containing the Blender renderer:

An example task-based Golem application, using a Docker image containing Alpine Linux installation
azawlocki marked this conversation as resolved.
Show resolved Hide resolved
(Python file with the example and the Dockerfile for the image reside in
[`examples/hello-world/`](https://github.com/golemfactory/yapapi/tree/master/examples/hello-world)):
```python
import asyncio
from datetime import datetime, timedelta
import pathlib
import sys

from yapapi import (
Executor,
NoPaymentAccountError,
Task,
__version__ as yapapi_version,
WorkContext,
windows_event_loop_fix,
)
from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa
from typing import AsyncIterable

from yapapi import Golem, Task, WorkContext
from yapapi.log import enable_default_logger
from yapapi.payload import vm
from yapapi.rest.activity import BatchTimeoutError


async def main(subnet_tag, driver=None, network=None):
async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
async for task in tasks:
context.run("/bin/sh", "-c", "date")

future_results = yield context.commit()
results = await future_results
task.accept_result(result=results[-1])


async def main():
package = await vm.repo(
image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
min_mem_gib=0.5,
min_storage_gib=2.0,
image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
)

async def worker(ctx: WorkContext, tasks):
script_dir = pathlib.Path(__file__).resolve().parent
scene_path = str(script_dir / "cubes.blend")
ctx.send_file(scene_path, "/golem/resource/scene.blend")
async for task in tasks:
frame = task.data
crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}]
ctx.send_json(
"/golem/work/params.json",
{
"scene_file": "/golem/resource/scene.blend",
"resolution": (400, 300),
"use_compositing": False,
"crops": crops,
"samples": 100,
"frames": [frame],
"output_format": "PNG",
"RESOURCES_DIR": "/golem/resources",
"WORK_DIR": "/golem/work",
"OUTPUT_DIR": "/golem/output",
},
)
ctx.run("/golem/entrypoints/run-blender.sh")
output_file = f"output_{frame}.png"
ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file)
try:
# Set timeout for executing the script on the provider. Two minutes is plenty
# of time for computing a single frame, for other tasks it may be not enough.
# If the timeout is exceeded, this worker instance will be shut down and all
# remaining tasks, including the current one, will be computed by other providers.
yield ctx.commit(timeout=timedelta(seconds=120))
# TODO: Check if job results are valid
# and reject by: task.reject_task(reason = 'invalid file')
task.accept_result(result=output_file)
except BatchTimeoutError:
print(
f"Task {task} timed out on {ctx.provider_name}, time: {task.running_time}"
)
raise

frames: range = range(0, 60, 10)
init_overhead = 3
min_timeout, max_timeout = 6, 30

timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout))

async with Executor(
package=package,
max_workers=3,
budget=10.0,
timeout=timeout,
subnet_tag=subnet_tag,
driver=driver,
network=network,
event_consumer=log_summary(),
) as executor:

sys.stderr.write(
f"yapapi version: {yapapi_version}\n"
f"Using subnet: {subnet_tag}, "
f"payment driver: {executor.driver}, "
f"and network: {executor.network}\n"
)

num_tasks = 0
start_time = datetime.now()

async for task in executor.submit(worker, [Task(data=frame) for frame in frames]):
num_tasks += 1
print(
f"Task computed: {task}, result: {task.result}, time: {task.running_time}"
)

print(
f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}"
)
tasks = [Task(data=None)]

async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem:
async for completed in golem.execute_tasks(worker, tasks, payload=package):
print(completed.result.stdout)

if __name__ == "__main__":
# This is only required when running on Windows with Python prior to 3.8:
windows_event_loop_fix()

enable_default_logger()
if __name__ == "__main__":
enable_default_logger(log_file="hello.log")

loop = asyncio.get_event_loop()
task = loop.create_task(
main(subnet_tag="devnet-beta.1", driver="zksync", network="rinkeby")
)

try:
loop.run_until_complete(task)
except NoPaymentAccountError as e:
handbook_url = (
"https://handbook.golem.network/requestor-tutorials/"
"flash-tutorial-of-requestor-development"
)
print(
f"No payment account initialized for driver `{e.required_driver}` "
f"and network `{e.required_network}`.\n\n"
f"See {handbook_url} on how to initialize payment accounts for a requestor node."
)
except KeyboardInterrupt:
print(
"Shutting down gracefully, please wait a short while "
"or press Ctrl+C to exit immediately..."
)
task.cancel()
try:
loop.run_until_complete(task)
print(
f"Shutdown completed, thank you for waiting!"
)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
task = loop.create_task(main())
loop.run_until_complete(task)
```