Skip to content

Commit

Permalink
Merge pull request #365 from golemfactory/blue/executor-split
Browse files Browse the repository at this point in the history
Blue/executor split
  • Loading branch information
azawlocki authored May 21, 2021
2 parents 4214f34 + a2e5c2d commit b70ab8e
Show file tree
Hide file tree
Showing 16 changed files with 1,216 additions and 776 deletions.
181 changes: 181 additions & 0 deletions examples/blender/blender-deprecated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#!/usr/bin/env python3
import asyncio
from datetime import datetime, timedelta
import pathlib
import sys

from yapapi import (
Executor,
NoPaymentAccountError,
Task,
__version__ as yapapi_version,
WorkContext,
windows_event_loop_fix,
)
from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa
from yapapi.payload import vm
from yapapi.rest.activity import BatchTimeoutError

examples_dir = pathlib.Path(__file__).resolve().parent.parent
sys.path.append(str(examples_dir))

from utils import (
build_parser,
TEXT_COLOR_CYAN,
TEXT_COLOR_DEFAULT,
TEXT_COLOR_RED,
TEXT_COLOR_YELLOW,
)


async def main(subnet_tag, driver=None, network=None):
package = await vm.repo(
image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
min_mem_gib=0.5,
min_storage_gib=2.0,
)

async def worker(ctx: WorkContext, tasks):
script_dir = pathlib.Path(__file__).resolve().parent
scene_path = str(script_dir / "cubes.blend")
ctx.send_file(scene_path, "/golem/resource/scene.blend")
async for task in tasks:
frame = task.data
crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}]
ctx.send_json(
"/golem/work/params.json",
{
"scene_file": "/golem/resource/scene.blend",
"resolution": (400, 300),
"use_compositing": False,
"crops": crops,
"samples": 100,
"frames": [frame],
"output_format": "PNG",
"RESOURCES_DIR": "/golem/resources",
"WORK_DIR": "/golem/work",
"OUTPUT_DIR": "/golem/output",
},
)
ctx.run("/golem/entrypoints/run-blender.sh")
output_file = f"output_{frame}.png"
ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file)
try:
# Set timeout for executing the script on the provider. Usually, 30 seconds
# should be more than enough for computing a single frame, however a provider
# may require more time for the first task if it needs to download a VM image
# first. Once downloaded, the VM image will be cached and other tasks that use
# that image will be computed faster.
yield ctx.commit(timeout=timedelta(minutes=10))
# TODO: Check if job results are valid
# and reject by: task.reject_task(reason = 'invalid file')
task.accept_result(result=output_file)
except BatchTimeoutError:
print(
f"{TEXT_COLOR_RED}"
f"Task {task} timed out on {ctx.provider_name}, time: {task.running_time}"
f"{TEXT_COLOR_DEFAULT}"
)
raise

# Iterator over the frame indices that we want to render
frames: range = range(0, 60, 10)
# Worst-case overhead, in minutes, for initialization (negotiation, file transfer etc.)
# TODO: make this dynamic, e.g. depending on the size of files to transfer
init_overhead = 3
# Providers will not accept work if the timeout is outside of the [5 min, 30min] range.
# We increase the lower bound to 6 min to account for the time needed for our demand to
# reach the providers.
min_timeout, max_timeout = 6, 30

timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout))

# By passing `event_consumer=log_summary()` we enable summary logging.
# See the documentation of the `yapapi.log` module on how to set
# the level of detail and format of the logged information.
async with Executor(
payload=package,
max_workers=3,
budget=10.0,
timeout=timeout,
subnet_tag=subnet_tag,
driver=driver,
network=network,
event_consumer=log_summary(log_event_repr),
) as executor:

print(
f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n"
f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, "
f"payment driver: {TEXT_COLOR_YELLOW}{executor.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{executor.network}{TEXT_COLOR_DEFAULT}\n"
)

num_tasks = 0
start_time = datetime.now()

async for task in executor.submit(worker, [Task(data=frame) for frame in frames]):
num_tasks += 1
print(
f"{TEXT_COLOR_CYAN}"
f"Task computed: {task}, result: {task.result}, time: {task.running_time}"
f"{TEXT_COLOR_DEFAULT}"
)

print(
f"{TEXT_COLOR_CYAN}"
f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}"
f"{TEXT_COLOR_DEFAULT}"
)


if __name__ == "__main__":
parser = build_parser("Render a Blender scene")
now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"blender-yapapi-{now}.log")
args = parser.parse_args()

# This is only required when running on Windows with Python prior to 3.8:
windows_event_loop_fix()

enable_default_logger(
log_file=args.log_file,
debug_activity_api=True,
debug_market_api=True,
debug_payment_api=True,
)

loop = asyncio.get_event_loop()
task = loop.create_task(
main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network)
)

try:
loop.run_until_complete(task)
except NoPaymentAccountError as e:
handbook_url = (
"https://handbook.golem.network/requestor-tutorials/"
"flash-tutorial-of-requestor-development"
)
print(
f"{TEXT_COLOR_RED}"
f"No payment account initialized for driver `{e.required_driver}` "
f"and network `{e.required_network}`.\n\n"
f"See {handbook_url} on how to initialize payment accounts for a requestor node."
f"{TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
print(
f"{TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a short while "
"or press Ctrl+C to exit immediately..."
f"{TEXT_COLOR_DEFAULT}"
)
task.cancel()
try:
loop.run_until_complete(task)
print(
f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}"
)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
22 changes: 13 additions & 9 deletions examples/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys

from yapapi import (
Executor,
Golem,
NoPaymentAccountError,
Task,
__version__ as yapapi_version,
Expand Down Expand Up @@ -93,28 +93,32 @@ async def worker(ctx: WorkContext, tasks):
# By passing `event_consumer=log_summary()` we enable summary logging.
# See the documentation of the `yapapi.log` module on how to set
# the level of detail and format of the logged information.
async with Executor(
payload=package,
max_workers=3,
async with Golem(
budget=10.0,
timeout=timeout,
subnet_tag=subnet_tag,
driver=driver,
network=network,
event_consumer=log_summary(log_event_repr),
) as executor:
) as golem:

print(
f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n"
f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, "
f"payment driver: {TEXT_COLOR_YELLOW}{executor.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{executor.network}{TEXT_COLOR_DEFAULT}\n"
f"payment driver: {TEXT_COLOR_YELLOW}{golem.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{golem.network}{TEXT_COLOR_DEFAULT}\n"
)

num_tasks = 0
start_time = datetime.now()

async for task in executor.submit(worker, [Task(data=frame) for frame in frames]):
completed_tasks = golem.execute_tasks(
worker,
[Task(data=frame) for frame in frames],
payload=package,
max_workers=3,
timeout=timeout,
)
async for task in completed_tasks:
num_tasks += 1
print(
f"{TEXT_COLOR_CYAN}"
Expand Down
129 changes: 75 additions & 54 deletions tests/executor/test_async_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,54 @@
from yapapi.executor.utils import AsyncWrapper


def test_keyboard_interrupt(event_loop):
def test_async_wrapper_ordering():
"""Test if AsyncWrapper preserves order of calls."""

input_ = list(range(10))
output = []

def func(n):
output.append(n)

async def main():
async with AsyncWrapper(func) as wrapper:
for n in input_:
wrapper.async_call(n)

asyncio.get_event_loop().run_until_complete(main())
assert output == input_


def test_keyboard_interrupt():
"""Test if AsyncWrapper handles KeyboardInterrupt by passing it to the event loop."""

def func(interrupt):
if interrupt:
raise KeyboardInterrupt

wrapper = AsyncWrapper(func, event_loop)

async def main():
for _ in range(100):
wrapper.async_call(False)
# This will raise KeyboardInterrupt in the wrapper's worker task
wrapper.async_call(True)
await asyncio.sleep(0.01)
async with AsyncWrapper(func) as wrapper:
for _ in range(100):
wrapper.async_call(False)
# This will raise KeyboardInterrupt in the wrapper's worker task
wrapper.async_call(True)
await asyncio.sleep(0.01)

task = event_loop.create_task(main())
loop = asyncio.get_event_loop()
task = loop.create_task(main())
with pytest.raises(KeyboardInterrupt):
event_loop.run_until_complete(task)
loop.run_until_complete(task)

# Make sure the main task did not get KeyboardInterrupt
assert not task.done()

# Make sure the wrapper can still make calls, it's worker task shouldn't exit
wrapper.async_call(False)
with pytest.raises(asyncio.CancelledError):
task.cancel()
loop.run_until_complete(task)


def test_stop_doesnt_deadlock(event_loop):
"""Test if the AsyncWrapper.stop() coroutine completes after an AsyncWrapper is interrupted.
def test_aexit_doesnt_deadlock():
"""Test if the AsyncWrapper.__aexit__() completes after an AsyncWrapper is interrupted.
See https://github.com/golemfactory/yapapi/issues/238.
"""
Expand All @@ -46,55 +65,57 @@ def func(interrupt):
async def main():
""""This coroutine mimics how an AsyncWrapper is used in an Executor."""

wrapper = AsyncWrapper(func, event_loop)
try:
# Queue some calls
for _ in range(10):
wrapper.async_call(False)
wrapper.async_call(True)
for _ in range(10):
wrapper.async_call(False)
# Sleep until cancelled
await asyncio.sleep(30)
assert False, "Sleep should be cancelled"
except asyncio.CancelledError:
# This call should exit without timeout
await asyncio.wait_for(wrapper.stop(), timeout=30.0)

task = event_loop.create_task(main())
async with AsyncWrapper(func) as wrapper:
try:
# Queue some calls
for _ in range(10):
wrapper.async_call(False)
wrapper.async_call(True)
for _ in range(10):
wrapper.async_call(False)
# Sleep until cancelled
await asyncio.sleep(30)
assert False, "Sleep should be cancelled"
except asyncio.CancelledError:
pass

loop = asyncio.get_event_loop()
task = loop.create_task(main())
try:
event_loop.run_until_complete(task)
loop.run_until_complete(task)
assert False, "Expected KeyboardInterrupt"
except KeyboardInterrupt:
task.cancel()
event_loop.run_until_complete(task)
loop.run_until_complete(task)


def test_stop_doesnt_wait(event_loop):
"""Test if the AsyncWrapper.stop() coroutine prevents new calls from be queued."""
def test_cancel_doesnt_wait():
"""Test if the AsyncWrapper stops processing calls when it's cancelled."""

def func():
time.sleep(0.1)
pass
num_calls = 0

wrapper = AsyncWrapper(func, event_loop)
def func(d):
print("Calling func()")
nonlocal num_calls
num_calls += 1
time.sleep(d)

async def main():
with pytest.raises(RuntimeError):
for n in range(100):
wrapper.async_call()
await asyncio.sleep(0.01)
# wrapper should be stopped before all calls are made
assert False, "Should raise RuntimeError"

async def stop():
await asyncio.sleep(0.1)
await wrapper.stop()
try:
async with AsyncWrapper(func) as wrapper:
for _ in range(10):
wrapper.async_call(0.1)
except asyncio.CancelledError:
pass

task = event_loop.create_task(main())
event_loop.create_task(stop())
try:
event_loop.run_until_complete(task)
except KeyboardInterrupt:
async def cancel():
await asyncio.sleep(0.05)
print("Cancelling!")
task.cancel()
event_loop.run_until_complete(task)

loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.create_task(cancel())
loop.run_until_complete(task)

assert num_calls < 10
Loading

0 comments on commit b70ab8e

Please sign in to comment.