Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blue/executor split #365

Merged
merged 40 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
fe7cb1b
+ comment to "decrease-scores" strategy
shadeofblue May 5, 2021
6da6bff
splitting executor...
shadeofblue May 5, 2021
46479f3
Merge branch 'master' into blue/executor-split
shadeofblue May 13, 2021
cea1705
s/Engine/Golem/
shadeofblue May 13, 2021
dc8a2ac
Merge branch 'master' into blue/executor-split
shadeofblue May 13, 2021
0857796
Merge remote-tracking branch 'origin/master' into blue/executor-split
azawlocki May 14, 2021
a617f40
Add method Golem.create_demand_builder()
azawlocki May 17, 2021
91ad481
Make Job a toplevel class
azawlocki May 17, 2021
d240e14
Add Golem.create_activity(), add Activity._stream_events field
azawlocki May 17, 2021
9337107
move `agreements_pool` to Job
shadeofblue May 17, 2021
38a0148
Working blender example (without payments-related features)
azawlocki May 17, 2021
3315794
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
azawlocki May 17, 2021
feb956d
move payment methods to `Golem`
shadeofblue May 17, 2021
2dbf655
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
shadeofblue May 17, 2021
9c3e6af
move debit note and invoice processing to `Golem`
shadeofblue May 17, 2021
0794947
use `payload` instead of `package` when instantiating the Executor
shadeofblue May 17, 2021
79d4e89
Move `process_batches()` from `Executor` to `Golem`
azawlocki May 17, 2021
5eafa01
Make Executor backward compatible, so that original blender.py works
azawlocki May 18, 2021
d4ed1ef
Merge branch 'master' into blue/executor-split
shadeofblue May 18, 2021
907b97a
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
shadeofblue May 18, 2021
e131548
move `storage_manager` to Golem
shadeofblue May 18, 2021
de300fc
bug
shadeofblue May 18, 2021
2199cd4
Merge branch 'master' into blue/executor-split
shadeofblue May 18, 2021
e51ef68
rename the "old" `blender.py` to `blener-deprecated.py`
shadeofblue May 18, 2021
7429ed9
Golem.execute_task() -> Golem.execute_tasks() & other minor changes
azawlocki May 18, 2021
71181ce
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
azawlocki May 18, 2021
142d3a3
black
shadeofblue May 18, 2021
c53903c
Merge branch 'master' into blue/executor-split
shadeofblue May 18, 2021
9d8a9a3
move `execute_tasks` to the end of `Golem`
shadeofblue May 18, 2021
8c13ca0
Fixes in executor/__init__.py to pass unit tests
azawlocki May 18, 2021
5dac94b
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
azawlocki May 18, 2021
b0df49c
Changes in events/SummaryLogger to run yacat withour errors
azawlocki May 19, 2021
b1627de
Update yapapi/executor/__init__.py
shadeofblue May 20, 2021
99de163
fix tests
shadeofblue May 20, 2021
61a2896
fix script vs task events confusion
shadeofblue May 20, 2021
68da31a
fix test, black
shadeofblue May 20, 2021
be30dbf
Add `message` field to `CommandExecuted` event
azawlocki May 20, 2021
37e4c01
Adjust integration tests to changed summary logger messages
azawlocki May 20, 2021
1973569
Make `AsyncWrapper` an async context manager
azawlocki May 21, 2021
a2e5c2d
Address CR suggestions
azawlocki May 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions examples/blender/blender-deprecated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#!/usr/bin/env python3
import asyncio
from datetime import datetime, timedelta
import pathlib
import sys

from yapapi import (
Executor,
NoPaymentAccountError,
Task,
__version__ as yapapi_version,
WorkContext,
windows_event_loop_fix,
)
from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa
from yapapi.payload import vm
from yapapi.rest.activity import BatchTimeoutError

examples_dir = pathlib.Path(__file__).resolve().parent.parent
sys.path.append(str(examples_dir))

from utils import (
build_parser,
TEXT_COLOR_CYAN,
TEXT_COLOR_DEFAULT,
TEXT_COLOR_RED,
TEXT_COLOR_YELLOW,
)


async def main(subnet_tag, driver=None, network=None):
package = await vm.repo(
image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
min_mem_gib=0.5,
min_storage_gib=2.0,
)

async def worker(ctx: WorkContext, tasks):
script_dir = pathlib.Path(__file__).resolve().parent
scene_path = str(script_dir / "cubes.blend")
ctx.send_file(scene_path, "/golem/resource/scene.blend")
async for task in tasks:
frame = task.data
crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}]
ctx.send_json(
"/golem/work/params.json",
{
"scene_file": "/golem/resource/scene.blend",
"resolution": (400, 300),
"use_compositing": False,
"crops": crops,
"samples": 100,
"frames": [frame],
"output_format": "PNG",
"RESOURCES_DIR": "/golem/resources",
"WORK_DIR": "/golem/work",
"OUTPUT_DIR": "/golem/output",
},
)
ctx.run("/golem/entrypoints/run-blender.sh")
output_file = f"output_{frame}.png"
ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file)
try:
# Set timeout for executing the script on the provider. Usually, 30 seconds
# should be more than enough for computing a single frame, however a provider
# may require more time for the first task if it needs to download a VM image
# first. Once downloaded, the VM image will be cached and other tasks that use
# that image will be computed faster.
yield ctx.commit(timeout=timedelta(minutes=10))
# TODO: Check if job results are valid
# and reject by: task.reject_task(reason = 'invalid file')
task.accept_result(result=output_file)
except BatchTimeoutError:
print(
f"{TEXT_COLOR_RED}"
f"Task {task} timed out on {ctx.provider_name}, time: {task.running_time}"
f"{TEXT_COLOR_DEFAULT}"
)
raise

# Iterator over the frame indices that we want to render
frames: range = range(0, 60, 10)
# Worst-case overhead, in minutes, for initialization (negotiation, file transfer etc.)
# TODO: make this dynamic, e.g. depending on the size of files to transfer
init_overhead = 3
# Providers will not accept work if the timeout is outside of the [5 min, 30min] range.
# We increase the lower bound to 6 min to account for the time needed for our demand to
# reach the providers.
min_timeout, max_timeout = 6, 30

timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout))

# By passing `event_consumer=log_summary()` we enable summary logging.
# See the documentation of the `yapapi.log` module on how to set
# the level of detail and format of the logged information.
async with Executor(
payload=package,
max_workers=3,
budget=10.0,
timeout=timeout,
subnet_tag=subnet_tag,
driver=driver,
network=network,
event_consumer=log_summary(log_event_repr),
) as executor:

print(
f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n"
f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, "
f"payment driver: {TEXT_COLOR_YELLOW}{executor.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{executor.network}{TEXT_COLOR_DEFAULT}\n"
)

num_tasks = 0
start_time = datetime.now()

async for task in executor.submit(worker, [Task(data=frame) for frame in frames]):
num_tasks += 1
print(
f"{TEXT_COLOR_CYAN}"
f"Task computed: {task}, result: {task.result}, time: {task.running_time}"
f"{TEXT_COLOR_DEFAULT}"
)

print(
f"{TEXT_COLOR_CYAN}"
f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}"
f"{TEXT_COLOR_DEFAULT}"
)


if __name__ == "__main__":
parser = build_parser("Render a Blender scene")
now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"blender-yapapi-{now}.log")
args = parser.parse_args()

# This is only required when running on Windows with Python prior to 3.8:
windows_event_loop_fix()

enable_default_logger(
log_file=args.log_file,
debug_activity_api=True,
debug_market_api=True,
debug_payment_api=True,
)

loop = asyncio.get_event_loop()
task = loop.create_task(
main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network)
)

try:
loop.run_until_complete(task)
except NoPaymentAccountError as e:
handbook_url = (
"https://handbook.golem.network/requestor-tutorials/"
"flash-tutorial-of-requestor-development"
)
print(
f"{TEXT_COLOR_RED}"
f"No payment account initialized for driver `{e.required_driver}` "
f"and network `{e.required_network}`.\n\n"
f"See {handbook_url} on how to initialize payment accounts for a requestor node."
f"{TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
print(
f"{TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a short while "
"or press Ctrl+C to exit immediately..."
f"{TEXT_COLOR_DEFAULT}"
)
task.cancel()
try:
loop.run_until_complete(task)
print(
f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}"
)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
22 changes: 13 additions & 9 deletions examples/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys

from yapapi import (
Executor,
Golem,
NoPaymentAccountError,
Task,
__version__ as yapapi_version,
Expand Down Expand Up @@ -93,28 +93,32 @@ async def worker(ctx: WorkContext, tasks):
# By passing `event_consumer=log_summary()` we enable summary logging.
# See the documentation of the `yapapi.log` module on how to set
# the level of detail and format of the logged information.
async with Executor(
payload=package,
max_workers=3,
async with Golem(
budget=10.0,
timeout=timeout,
subnet_tag=subnet_tag,
driver=driver,
network=network,
event_consumer=log_summary(log_event_repr),
) as executor:
) as golem:

print(
f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n"
f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, "
f"payment driver: {TEXT_COLOR_YELLOW}{executor.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{executor.network}{TEXT_COLOR_DEFAULT}\n"
f"payment driver: {TEXT_COLOR_YELLOW}{golem.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{golem.network}{TEXT_COLOR_DEFAULT}\n"
)

num_tasks = 0
start_time = datetime.now()

async for task in executor.submit(worker, [Task(data=frame) for frame in frames]):
completed_tasks = golem.execute_tasks(
worker,
[Task(data=frame) for frame in frames],
payload=package,
max_workers=3,
timeout=timeout,
)
async for task in completed_tasks:
num_tasks += 1
print(
f"{TEXT_COLOR_CYAN}"
Expand Down
129 changes: 75 additions & 54 deletions tests/executor/test_async_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,54 @@
from yapapi.executor.utils import AsyncWrapper


def test_keyboard_interrupt(event_loop):
def test_async_wrapper_ordering():
"""Test if AsyncWrapper preserves order of calls."""

input_ = list(range(10))
output = []

def func(n):
output.append(n)

async def main():
async with AsyncWrapper(func) as wrapper:
for n in input_:
wrapper.async_call(n)

asyncio.get_event_loop().run_until_complete(main())
assert output == input_


def test_keyboard_interrupt():
"""Test if AsyncWrapper handles KeyboardInterrupt by passing it to the event loop."""

def func(interrupt):
if interrupt:
raise KeyboardInterrupt

wrapper = AsyncWrapper(func, event_loop)

async def main():
for _ in range(100):
wrapper.async_call(False)
# This will raise KeyboardInterrupt in the wrapper's worker task
wrapper.async_call(True)
await asyncio.sleep(0.01)
async with AsyncWrapper(func) as wrapper:
for _ in range(100):
wrapper.async_call(False)
# This will raise KeyboardInterrupt in the wrapper's worker task
wrapper.async_call(True)
await asyncio.sleep(0.01)

task = event_loop.create_task(main())
loop = asyncio.get_event_loop()
task = loop.create_task(main())
with pytest.raises(KeyboardInterrupt):
event_loop.run_until_complete(task)
loop.run_until_complete(task)

# Make sure the main task did not get KeyboardInterrupt
assert not task.done()

# Make sure the wrapper can still make calls, it's worker task shouldn't exit
wrapper.async_call(False)
with pytest.raises(asyncio.CancelledError):
task.cancel()
loop.run_until_complete(task)


def test_stop_doesnt_deadlock(event_loop):
"""Test if the AsyncWrapper.stop() coroutine completes after an AsyncWrapper is interrupted.
def test_aexit_doesnt_deadlock():
"""Test if the AsyncWrapper.__aexit__() completes after an AsyncWrapper is interrupted.

See https://github.com/golemfactory/yapapi/issues/238.
"""
Expand All @@ -46,55 +65,57 @@ def func(interrupt):
async def main():
""""This coroutine mimics how an AsyncWrapper is used in an Executor."""

wrapper = AsyncWrapper(func, event_loop)
try:
# Queue some calls
for _ in range(10):
wrapper.async_call(False)
wrapper.async_call(True)
for _ in range(10):
wrapper.async_call(False)
# Sleep until cancelled
await asyncio.sleep(30)
assert False, "Sleep should be cancelled"
except asyncio.CancelledError:
# This call should exit without timeout
await asyncio.wait_for(wrapper.stop(), timeout=30.0)

task = event_loop.create_task(main())
async with AsyncWrapper(func) as wrapper:
try:
# Queue some calls
for _ in range(10):
wrapper.async_call(False)
wrapper.async_call(True)
for _ in range(10):
wrapper.async_call(False)
# Sleep until cancelled
await asyncio.sleep(30)
assert False, "Sleep should be cancelled"
except asyncio.CancelledError:
pass

loop = asyncio.get_event_loop()
task = loop.create_task(main())
try:
event_loop.run_until_complete(task)
loop.run_until_complete(task)
assert False, "Expected KeyboardInterrupt"
except KeyboardInterrupt:
task.cancel()
event_loop.run_until_complete(task)
loop.run_until_complete(task)


def test_stop_doesnt_wait(event_loop):
"""Test if the AsyncWrapper.stop() coroutine prevents new calls from be queued."""
def test_cancel_doesnt_wait():
"""Test if the AsyncWrapper stops processing calls when it's cancelled."""

def func():
time.sleep(0.1)
pass
num_calls = 0

wrapper = AsyncWrapper(func, event_loop)
def func(d):
print("Calling func()")
nonlocal num_calls
num_calls += 1
time.sleep(d)

async def main():
with pytest.raises(RuntimeError):
for n in range(100):
wrapper.async_call()
await asyncio.sleep(0.01)
# wrapper should be stopped before all calls are made
assert False, "Should raise RuntimeError"

async def stop():
await asyncio.sleep(0.1)
await wrapper.stop()
try:
async with AsyncWrapper(func) as wrapper:
for _ in range(10):
wrapper.async_call(0.1)
except asyncio.CancelledError:
pass

task = event_loop.create_task(main())
event_loop.create_task(stop())
try:
event_loop.run_until_complete(task)
except KeyboardInterrupt:
async def cancel():
await asyncio.sleep(0.05)
print("Cancelling!")
task.cancel()
event_loop.run_until_complete(task)

loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.create_task(cancel())
loop.run_until_complete(task)

assert num_calls < 10
Loading