Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blue/executor split #365

Merged
merged 40 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
fe7cb1b
+ comment to "decrease-scores" strategy
shadeofblue May 5, 2021
6da6bff
splitting executor...
shadeofblue May 5, 2021
46479f3
Merge branch 'master' into blue/executor-split
shadeofblue May 13, 2021
cea1705
s/Engine/Golem/
shadeofblue May 13, 2021
dc8a2ac
Merge branch 'master' into blue/executor-split
shadeofblue May 13, 2021
0857796
Merge remote-tracking branch 'origin/master' into blue/executor-split
azawlocki May 14, 2021
a617f40
Add method Golem.create_demand_builder()
azawlocki May 17, 2021
91ad481
Make Job a toplevel class
azawlocki May 17, 2021
d240e14
Add Golem.create_activity(), add Activity._stream_events field
azawlocki May 17, 2021
9337107
move `agreements_pool` to Job
shadeofblue May 17, 2021
38a0148
Working blender example (without payments-related features)
azawlocki May 17, 2021
3315794
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
azawlocki May 17, 2021
feb956d
move payment methods to `Golem`
shadeofblue May 17, 2021
2dbf655
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
shadeofblue May 17, 2021
9c3e6af
move debit note and invoice processing to `Golem`
shadeofblue May 17, 2021
0794947
use `payload` instead of `package` when instantiating the Executor
shadeofblue May 17, 2021
79d4e89
Move `process_batches()` from `Executor` to `Golem`
azawlocki May 17, 2021
5eafa01
Make Executor backward compatible, so that original blender.py works
azawlocki May 18, 2021
d4ed1ef
Merge branch 'master' into blue/executor-split
shadeofblue May 18, 2021
907b97a
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
shadeofblue May 18, 2021
e131548
move `storage_manager` to Golem
shadeofblue May 18, 2021
de300fc
bug
shadeofblue May 18, 2021
2199cd4
Merge branch 'master' into blue/executor-split
shadeofblue May 18, 2021
e51ef68
rename the "old" `blender.py` to `blener-deprecated.py`
shadeofblue May 18, 2021
7429ed9
Golem.execute_task() -> Golem.execute_tasks() & other minor changes
azawlocki May 18, 2021
71181ce
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
azawlocki May 18, 2021
142d3a3
black
shadeofblue May 18, 2021
c53903c
Merge branch 'master' into blue/executor-split
shadeofblue May 18, 2021
9d8a9a3
move `execute_tasks` to the end of `Golem`
shadeofblue May 18, 2021
8c13ca0
Fixes in executor/__init__.py to pass unit tests
azawlocki May 18, 2021
5dac94b
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
azawlocki May 18, 2021
b0df49c
Changes in events/SummaryLogger to run yacat withour errors
azawlocki May 19, 2021
b1627de
Update yapapi/executor/__init__.py
shadeofblue May 20, 2021
99de163
fix tests
shadeofblue May 20, 2021
61a2896
fix script vs task events confusion
shadeofblue May 20, 2021
68da31a
fix test, black
shadeofblue May 20, 2021
be30dbf
Add `message` field to `CommandExecuted` event
azawlocki May 20, 2021
37e4c01
Adjust integration tests to changed summary logger messages
azawlocki May 20, 2021
1973569
Make `AsyncWrapper` an async context manager
azawlocki May 21, 2021
a2e5c2d
Address CR suggestions
azawlocki May 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions examples/blender/blender-deprecated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#!/usr/bin/env python3
import asyncio
from datetime import datetime, timedelta
import pathlib
import sys

from yapapi import (
Executor,
NoPaymentAccountError,
Task,
__version__ as yapapi_version,
WorkContext,
windows_event_loop_fix,
)
from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa
from yapapi.payload import vm
from yapapi.rest.activity import BatchTimeoutError

examples_dir = pathlib.Path(__file__).resolve().parent.parent
sys.path.append(str(examples_dir))

from utils import (
build_parser,
TEXT_COLOR_CYAN,
TEXT_COLOR_DEFAULT,
TEXT_COLOR_RED,
TEXT_COLOR_YELLOW,
)


async def main(subnet_tag, driver=None, network=None):
package = await vm.repo(
image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
min_mem_gib=0.5,
min_storage_gib=2.0,
)

async def worker(ctx: WorkContext, tasks):
script_dir = pathlib.Path(__file__).resolve().parent
scene_path = str(script_dir / "cubes.blend")
ctx.send_file(scene_path, "/golem/resource/scene.blend")
async for task in tasks:
frame = task.data
crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}]
ctx.send_json(
"/golem/work/params.json",
{
"scene_file": "/golem/resource/scene.blend",
"resolution": (400, 300),
"use_compositing": False,
"crops": crops,
"samples": 100,
"frames": [frame],
"output_format": "PNG",
"RESOURCES_DIR": "/golem/resources",
"WORK_DIR": "/golem/work",
"OUTPUT_DIR": "/golem/output",
},
)
ctx.run("/golem/entrypoints/run-blender.sh")
output_file = f"output_{frame}.png"
ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file)
try:
# Set timeout for executing the script on the provider. Usually, 30 seconds
# should be more than enough for computing a single frame, however a provider
# may require more time for the first task if it needs to download a VM image
# first. Once downloaded, the VM image will be cached and other tasks that use
# that image will be computed faster.
yield ctx.commit(timeout=timedelta(minutes=10))
# TODO: Check if job results are valid
# and reject by: task.reject_task(reason = 'invalid file')
task.accept_result(result=output_file)
except BatchTimeoutError:
print(
f"{TEXT_COLOR_RED}"
f"Task {task} timed out on {ctx.provider_name}, time: {task.running_time}"
f"{TEXT_COLOR_DEFAULT}"
)
raise

# Iterator over the frame indices that we want to render
frames: range = range(0, 60, 10)
# Worst-case overhead, in minutes, for initialization (negotiation, file transfer etc.)
# TODO: make this dynamic, e.g. depending on the size of files to transfer
init_overhead = 3
# Providers will not accept work if the timeout is outside of the [5 min, 30min] range.
# We increase the lower bound to 6 min to account for the time needed for our demand to
# reach the providers.
min_timeout, max_timeout = 6, 30

timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout))

# By passing `event_consumer=log_summary()` we enable summary logging.
# See the documentation of the `yapapi.log` module on how to set
# the level of detail and format of the logged information.
async with Executor(
payload=package,
max_workers=3,
budget=10.0,
timeout=timeout,
subnet_tag=subnet_tag,
driver=driver,
network=network,
event_consumer=log_summary(log_event_repr),
) as executor:

print(
f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n"
f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, "
f"payment driver: {TEXT_COLOR_YELLOW}{executor.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{executor.network}{TEXT_COLOR_DEFAULT}\n"
)

num_tasks = 0
start_time = datetime.now()

async for task in executor.submit(worker, [Task(data=frame) for frame in frames]):
num_tasks += 1
print(
f"{TEXT_COLOR_CYAN}"
f"Task computed: {task}, result: {task.result}, time: {task.running_time}"
f"{TEXT_COLOR_DEFAULT}"
)

print(
f"{TEXT_COLOR_CYAN}"
f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}"
f"{TEXT_COLOR_DEFAULT}"
)


if __name__ == "__main__":
parser = build_parser("Render a Blender scene")
now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"blender-yapapi-{now}.log")
args = parser.parse_args()

# This is only required when running on Windows with Python prior to 3.8:
windows_event_loop_fix()

enable_default_logger(
log_file=args.log_file,
debug_activity_api=True,
debug_market_api=True,
debug_payment_api=True,
)

loop = asyncio.get_event_loop()
task = loop.create_task(
main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network)
)

try:
loop.run_until_complete(task)
except NoPaymentAccountError as e:
handbook_url = (
"https://handbook.golem.network/requestor-tutorials/"
"flash-tutorial-of-requestor-development"
)
print(
f"{TEXT_COLOR_RED}"
f"No payment account initialized for driver `{e.required_driver}` "
f"and network `{e.required_network}`.\n\n"
f"See {handbook_url} on how to initialize payment accounts for a requestor node."
f"{TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
print(
f"{TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a short while "
"or press Ctrl+C to exit immediately..."
f"{TEXT_COLOR_DEFAULT}"
)
task.cancel()
try:
loop.run_until_complete(task)
print(
f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}"
)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
21 changes: 13 additions & 8 deletions examples/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
WorkContext,
windows_event_loop_fix,
)
from yapapi.executor import Golem
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's most likely not the scope of this PR but it would be best to make this from yapapi import Golem, either by actually moving the code or just importing it in the base package.

Copy link
Contributor Author

@shadeofblue shadeofblue May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say this should be in the scope of the changes to be frozen - but I'll let @azawlocki decide if he'd like to proceed with reshuffling of code between the files within the scope of this pull request...

from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa
from yapapi.payload import vm
from yapapi.rest.activity import BatchTimeoutError
Expand Down Expand Up @@ -93,28 +94,32 @@ async def worker(ctx: WorkContext, tasks):
# By passing `event_consumer=log_summary()` we enable summary logging.
# See the documentation of the `yapapi.log` module on how to set
# the level of detail and format of the logged information.
async with Executor(
payload=package,
max_workers=3,
async with Golem(
budget=10.0,
timeout=timeout,
subnet_tag=subnet_tag,
driver=driver,
network=network,
event_consumer=log_summary(log_event_repr),
) as executor:
) as golem:

print(
f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n"
f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, "
f"payment driver: {TEXT_COLOR_YELLOW}{executor.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{executor.network}{TEXT_COLOR_DEFAULT}\n"
f"payment driver: {TEXT_COLOR_YELLOW}{driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{network}{TEXT_COLOR_DEFAULT}\n"
)

num_tasks = 0
start_time = datetime.now()

async for task in executor.submit(worker, [Task(data=frame) for frame in frames]):
completed_tasks = golem.execute_tasks(
worker,
[Task(data=frame) for frame in frames],
payload=package,
max_workers=3,
timeout=timeout,
)
async for task in completed_tasks:
num_tasks += 1
print(
f"{TEXT_COLOR_CYAN}"
Expand Down
18 changes: 9 additions & 9 deletions tests/executor/test_payment_platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ async def test_no_accounts_raises(monkeypatch):

monkeypatch.setattr(Payment, "accounts", _mock_accounts_iterator())

async with Executor(package=mock.Mock(), budget=10.0) as executor:
with pytest.raises(NoPaymentAccountError):
with pytest.raises(NoPaymentAccountError):
async with Executor(package=mock.Mock(), budget=10.0) as executor:
async for _ in executor.submit(worker=mock.Mock(), data=mock.Mock()):
pass

Expand All @@ -90,15 +90,15 @@ async def test_no_matching_account_raises(monkeypatch):
),
)

async with Executor(
package=mock.Mock(), budget=10.0, driver="matching-driver", network="matching-network"
) as executor:
with pytest.raises(NoPaymentAccountError) as exc_info:
with pytest.raises(NoPaymentAccountError) as exc_info:
async with Executor(
package=mock.Mock(), budget=10.0, driver="matching-driver", network="matching-network"
) as executor:
async for _ in executor.submit(worker=mock.Mock(), data=mock.Mock()):
pass
exc = exc_info.value
assert exc.required_driver == "matching-driver"
assert exc.required_network == "matching-network"
exc = exc_info.value
assert exc.required_driver == "matching-driver"
assert exc.required_network == "matching-network"


@pytest.mark.asyncio
Expand Down
10 changes: 5 additions & 5 deletions tests/executor/test_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest
from unittest.mock import Mock

from yapapi.executor import Executor
from yapapi.executor import Golem
from yapapi.executor.strategy import (
DecreaseScoreForUnconfirmedAgreement,
LeastExpensiveLinearPayuMS,
Expand Down Expand Up @@ -130,8 +130,8 @@ async def test_default_strategy_type(monkeypatch):

monkeypatch.setattr(yapapi.rest, "Configuration", Mock)

executor = Executor(package=Mock(), budget=1.0)
default_strategy = executor.strategy
golem = Golem(budget=1.0)
default_strategy = golem.strategy
assert isinstance(default_strategy, DecreaseScoreForUnconfirmedAgreement)
assert isinstance(default_strategy.base_strategy, LeastExpensiveLinearPayuMS)

Expand All @@ -143,8 +143,8 @@ async def test_user_strategy_not_modified(monkeypatch):
monkeypatch.setattr(yapapi.rest, "Configuration", Mock)

user_strategy = Mock()
executor = Executor(package=Mock(), budget=1.0, strategy=user_strategy)
assert executor.strategy == user_strategy
golem = Golem(budget=1.0, strategy=user_strategy)
assert golem.strategy == user_strategy


class TestLeastExpensiveLinearPayuMS:
Expand Down
Loading