
Commit

Merge pull request #450 from golemfactory/b0.6
B0.6
shadeofblue authored Jun 10, 2021
2 parents 0ee6bb8 + 017ba49 commit 7fa69ee
Showing 2 changed files with 110 additions and 9 deletions.
99 changes: 99 additions & 0 deletions yapapi/golem.py
@@ -28,6 +28,37 @@


class Golem(_Engine):
"""The main entrypoint of Golem's high-level API.
Provides two methods that reflect the two modes of operation, or two types of jobs
that can currently be executed on the Golem network.
The first one - `execute_tasks` - instructs `Golem` to take a sequence of tasks
that the user wishes to compute on Golem and distributes those among the providers.
The second one - `run_service` - instructs `Golem` to spawn a certain number of instances
of a service based on a single service specification (a specialized implementation
inheriting from `yapapi.Service`).
While the two models are not necessarily completely disjoint - in that we can create a
service that exists to process a certain number of computations and, similarly, we can
use the task model to run some service - the main difference lies in the lifetime of
such a job.
Whereas a task-based job exists for the purpose of computing the specific
sequence of tasks and is done once the whole sequence has been processed, the
service-based job is created for a potentially indefinite period and the services
spawned within it are kept alive for as long as they're needed.
Additionally, the service interface provides a way to easily define handlers for
certain, discrete phases of a lifetime of each service instance - startup, running and
shutdown.
As `Golem`'s job includes tracking and executing payments for activities spawned by
either mode of operation, it's usually good to have just one instance of `Golem` active
at any given time.
"""

async def execute_tasks(
self,
worker: Callable[
@@ -56,6 +87,26 @@ async def execute_tasks(
:param timeout: timeout for computing all tasks, passed to the `Executor` instance
:param budget: budget for computing all tasks, passed to the `Executor` instance
:return: an iterator that yields completed `Task` objects
example usage:
```python
async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
    async for task in tasks:
        context.run("/bin/sh", "-c", "date")

        future_results = yield context.commit()
        results = await future_results
        task.accept_result(result=results[-1])


package = await vm.repo(
    image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
)


async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem:
    async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package):
        print(completed.result.stdout)
```
"""

kwargs: Dict[str, Any] = {"payload": payload}
@@ -85,6 +136,54 @@ async def run_service(
payload specified by the `get_payload()` method of `service_class` is used
:param expiration: optional expiration datetime for the service
:return: a `Cluster` of service instances
example usage:
```python
DATE_OUTPUT_PATH = "/golem/work/date.txt"
REFRESH_INTERVAL_SEC = 5


class DateService(Service):
    @staticmethod
    async def get_payload():
        return await vm.repo(
            image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
        )

    async def start(self):
        # every `REFRESH_INTERVAL_SEC` seconds, write the output of `date` to `DATE_OUTPUT_PATH`
        self._ctx.run(
            "/bin/sh",
            "-c",
            f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &",
        )
        yield self._ctx.commit()

    async def run(self):
        while True:
            await asyncio.sleep(REFRESH_INTERVAL_SEC)
            self._ctx.run(
                "/bin/sh",
                "-c",
                f"cat {DATE_OUTPUT_PATH}",
            )

            future_results = yield self._ctx.commit()
            results = await future_results
            print(results[0].stdout.strip())


async def main():
    async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem:
        cluster = await golem.run_service(DateService, num_instances=1)

        start_time = datetime.now()
        while datetime.now() < start_time + timedelta(minutes=1):
            for num, instance in enumerate(cluster.instances):
                print(f"Instance {num} is {instance.state.value} on {instance.provider_name}")
            await asyncio.sleep(REFRESH_INTERVAL_SEC)
```
"""

from .services import Cluster # avoid circular dependency
20 changes: 11 additions & 9 deletions yapapi/services.py
@@ -162,9 +162,8 @@ async def start(self):
async def run(self):
    """Implement the `running` state of the service."""

-    while True:
-        await asyncio.sleep(10)
-        yield
+    await asyncio.Future()
+    yield

async def shutdown(self):
    """Implement the `stopping` state of the service."""
@@ -402,12 +401,15 @@ async def _run_instance(self, ctx: WorkContext):

logger.debug("No handler for %s in state %s", instance.service, instance.state.value)

-if batch_task:
-    batch_task.cancel()
-    await batch_task
-if signal_task:
-    signal_task.cancel()
-    await signal_task
+try:
+    if batch_task:
+        batch_task.cancel()
+        await batch_task
+    if signal_task:
+        signal_task.cancel()
+        await signal_task
+except asyncio.CancelledError:
+    pass

logger.info("%s decomissioned", instance.service)
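The second hunk exists because awaiting a task immediately after cancelling it re-raises `CancelledError` in the awaiting coroutine; wrapping the cleanup in `try`/`except` lets decommissioning complete quietly. A self-contained sketch of the cancel-then-await pattern, with illustrative names not taken from the diff:

```python
import asyncio


async def background_job():
    # stands in for the batch / signal-handling tasks spawned for a service instance
    await asyncio.sleep(3600)


async def decommission(task: asyncio.Task) -> None:
    # cancel the helper task and wait for it to actually finish;
    # the await re-raises CancelledError, which we deliberately swallow
    try:
        task.cancel()
        await task
    except asyncio.CancelledError:
        pass


async def main():
    task = asyncio.create_task(background_job())
    await asyncio.sleep(0)  # let the job start running
    await decommission(task)
    print("cleaned up:", task.cancelled())


asyncio.run(main())
```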

Expand Down
