Skip to content

Commit

Permalink
Remove the standalone Executor (#635)
Browse files Browse the repository at this point in the history
Executor initialization requires now passing an _engine,
other interfaces were already deprecated for some time.

Also: some additional cleanup of the Executor class.
  • Loading branch information
johny-b authored Sep 8, 2021
1 parent 476fce2 commit e034735
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 381 deletions.
177 changes: 0 additions & 177 deletions examples/blender/blender-deprecated.py

This file was deleted.

19 changes: 11 additions & 8 deletions tests/goth_tests/test_agreement_termination/requestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import timedelta
import logging

from yapapi import Executor, Task, WorkContext
from yapapi import Golem, Task, WorkContext
from yapapi.log import enable_default_logger
from yapapi.payload import vm

Expand All @@ -20,7 +20,7 @@ async def main():
first_worker = True

async def worker(ctx: WorkContext, tasks):
"""A worker function for `Executor.submit()`.
"""A worker function for `Golem.execute_tasks()`.
The first call to this function will produce a worker
that sends an invalid `run` command to the provider.
Expand Down Expand Up @@ -48,18 +48,21 @@ async def worker(ctx: WorkContext, tasks):

task.accept_result()

async with Executor(
package=package,
max_workers=1,
async with Golem(
budget=10.0,
timeout=timedelta(minutes=6),
subnet_tag="goth",
driver="zksync",
network="rinkeby",
) as executor:
) as golem:

tasks = [Task(data=n) for n in range(6)]
async for task in executor.submit(worker, tasks):
async for task in golem.execute_tasks(
worker,
tasks,
package,
max_workers=1,
timeout=timedelta(minutes=6),
):
print(f"Task computed: {task}, time: {task.running_time}")

print("All tasks computed")
Expand Down
18 changes: 10 additions & 8 deletions tests/goth_tests/test_async_task_generation/requestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
from datetime import timedelta
import pathlib
import sys
from typing import AsyncGenerator

from yapapi import Executor, Task
from yapapi import Golem, Task
from yapapi.log import enable_default_logger, log_event_repr
from yapapi.package import vm

Expand All @@ -26,14 +25,11 @@ async def worker(work_ctx, tasks):
yield work_ctx.commit()
task.accept_result(result=task.data)

async with Executor(
async with Golem(
budget=10.0,
package=vm_package,
max_workers=1,
subnet_tag="goth",
timeout=timedelta(minutes=6),
event_consumer=log_event_repr,
) as executor:
) as golem:

# We use an async task generator that yields tasks removed from
# an async queue. Each computed task will potentially spawn
Expand All @@ -53,7 +49,13 @@ async def input_generator():
break
yield task

async for task in executor.submit(worker, input_generator()):
async for task in golem.execute_tasks(
worker,
input_generator(),
vm_package,
max_workers=1,
timeout=timedelta(minutes=6),
):
print("task result:", task.result, file=sys.stderr)
for n in range(task.result):
await task_queue.put(Task(data=task.result - 1))
Expand Down
18 changes: 10 additions & 8 deletions tests/goth_tests/test_multiactivity_agreement/requestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import timedelta
import logging

from yapapi import Executor, Task
from yapapi import Golem, Task
from yapapi.log import enable_default_logger, log_event_repr # noqa
from yapapi.package import vm

Expand All @@ -25,17 +25,19 @@ async def worker(work_ctx, tasks):
task.accept_result()
return

async with Executor(
async with Golem(
budget=10.0,
package=vm_package,
max_workers=1,
subnet_tag="goth",
timeout=timedelta(minutes=6),
event_consumer=log_event_repr,
) as executor:

) as golem:
tasks = [Task(data=n) for n in range(3)]
async for task in executor.submit(worker, tasks):
async for task in golem.execute_tasks(
worker,
tasks,
vm_package,
max_workers=1,
timeout=timedelta(minutes=6),
):
print(f"Task computed: {task}")


Expand Down
19 changes: 11 additions & 8 deletions tests/goth_tests/test_resubscription.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Test if subscription expiration is handled correctly by Executor"""
"""Test if subscription expiration is handled correctly by Golem"""
from datetime import timedelta
import logging
import os
Expand All @@ -19,7 +19,7 @@
from goth.runner.log import configure_logging
from goth.runner.probe import RequestorProbe

from yapapi import Executor, Task
from yapapi import Golem, Task
from yapapi.events import (
Event,
ComputationStarted,
Expand Down Expand Up @@ -172,16 +172,19 @@ async def worker(work_ctx, tasks):
yield work_ctx.commit()
task.accept_result()

async with Executor(
async with Golem(
budget=10.0,
package=vm_package,
max_workers=1,
timeout=timedelta(seconds=30),
event_consumer=monitor.add_event_sync,
) as executor:
) as golem:

task: Task # mypy needs this for some reason
async for task in executor.submit(worker, [Task(data=n) for n in range(20)]):
async for task in golem.execute_tasks(
worker,
[Task(data=n) for n in range(20)],
vm_package,
max_workers=1,
timeout=timedelta(seconds=30),
):
logger.info("Task %d computed", task.data)

await monitor.stop()
Expand Down
Loading

0 comments on commit e034735

Please sign in to comment.