Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

B0.6 #450

Merged
merged 3 commits into from
Jun 10, 2021
Merged

B0.6 #450

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions yapapi/golem.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,37 @@


class Golem(_Engine):
"""The main entrypoint of Golem's high-level API.

Provides two methods that reflect the two modes of operation, or two types of jobs
that can currently be executed on the Golem network.

The first one - `execute_tasks` - instructs `Golem` to take a sequence of tasks
that the user wishes to compute on Golem and distributes those among the providers.

The second one - `run_service` - instructs `Golem` to spawn a certain number of instances
of a service based on a single service specification (a specialized implementation
inheriting from `yapapi.Service`).

While the two models are not necessarily completely disjoint - in that we can create a
service that exists to process a certain number of computations and, similarly, we can
use the task model to run some service - the main difference lies in the lifetime of
such a job.

Whereas a task-based job exists for the purpose of computing the specific
sequence of tasks and is done once the whole sequence has been processed, the
service-based job is created for a potentially indefinite period and the services
spawned within it are kept alive for as long as they're needed.

Additionally, the service interface provides a way to easily define handlers for
certain, discrete phases of a lifetime of each service instance - startup, running and
shutdown.

As `Golem`'s job includes tracking and executing payments for activities spawned by
either mode of operation, it's usually good to have just one instance of `Golem` active
at any given time.
"""

async def execute_tasks(
self,
worker: Callable[
Expand Down Expand Up @@ -56,6 +87,26 @@ async def execute_tasks(
:param timeout: timeout for computing all tasks, passed to the `Executor` instance
:param budget: budget for computing all tasks, passed to the `Executor` instance
:return: an iterator that yields completed `Task` objects

example usage:

```python
async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
async for task in tasks:
context.run("/bin/sh", "-c", "date")

future_results = yield context.commit()
results = await future_results
task.accept_result(result=results[-1])

package = await vm.repo(
image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
)

async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem:
async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package):
print(completed.result.stdout)
```
"""

kwargs: Dict[str, Any] = {"payload": payload}
Expand Down Expand Up @@ -85,6 +136,54 @@ async def run_service(
payload specified by the `get_payload()` method of `service_class` is used
:param expiration: optional expiration datetime for the service
:return: a `Cluster` of service instances

example usage:

```python
DATE_OUTPUT_PATH = "/golem/work/date.txt"
REFRESH_INTERVAL_SEC = 5


class DateService(Service):
@staticmethod
async def get_payload():
return await vm.repo(
image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
)

async def start(self):
# every `DATE_POLL_INTERVAL` write output of `date` to `DATE_OUTPUT_PATH`
self._ctx.run(
"/bin/sh",
"-c",
f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &",
)
yield self._ctx.commit()

async def run(self):
while True:
await asyncio.sleep(REFRESH_INTERVAL_SEC)
self._ctx.run(
"/bin/sh",
"-c",
f"cat {DATE_OUTPUT_PATH}",
)

future_results = yield self._ctx.commit()
results = await future_results
print(results[0].stdout.strip())


async def main():
async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem:
cluster = await golem.run_service(DateService, num_instances=1)
start_time = datetime.now()

while datetime.now() < start_time + timedelta(minutes=1):
for num, instance in enumerate(cluster.instances):
print(f"Instance {num} is {instance.state.value} on {instance.provider_name}")
await asyncio.sleep(REFRESH_INTERVAL_SEC)
```
"""

from .services import Cluster # avoid circular dependency
Expand Down
20 changes: 11 additions & 9 deletions yapapi/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,8 @@ async def start(self):
async def run(self):
"""Implement the `running` state of the service."""

while True:
await asyncio.sleep(10)
yield
await asyncio.Future()
yield

async def shutdown(self):
"""Implement the `stopping` state of the service."""
Expand Down Expand Up @@ -402,12 +401,15 @@ async def _run_instance(self, ctx: WorkContext):

logger.debug("No handler for %s in state %s", instance.service, instance.state.value)

if batch_task:
batch_task.cancel()
await batch_task
if signal_task:
signal_task.cancel()
await signal_task
try:
if batch_task:
batch_task.cancel()
await batch_task
if signal_task:
signal_task.cancel()
await signal_task
except asyncio.CancelledError:
pass

logger.info("%s decomissioned", instance.service)

Expand Down