Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple scripts from WorkContext #609

Merged
merged 38 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
fef588d
Create Script class, move relevant logic to its package
kmazurek Aug 19, 2021
9484671
WIP: add evaluation to script steps
kmazurek Aug 19, 2021
b767676
WIP: add new_script to WorkContext
kmazurek Aug 23, 2021
22373aa
Add timeout and wait_for_results to Script init
kmazurek Aug 24, 2021
9b5010b
Make script commands a public interface
kmazurek Aug 24, 2021
0351664
Enable awaiting individual commands of a script
kmazurek Aug 26, 2021
5f385c3
Make Script._serialize synchronous
kmazurek Aug 26, 2021
efaf2c3
Add direct step calls to Script
kmazurek Aug 26, 2021
6f5fcfd
Make WorkContext backward compatible with Script
kmazurek Aug 26, 2021
2af95d3
Merge branch 'master' into km/context-script-decouple
kmazurek Aug 27, 2021
3742e18
Fix isinstance check in _set_cmd_result
kmazurek Aug 30, 2021
063c684
Update type hint on *args
kmazurek Aug 30, 2021
8278256
Adapt WorkContext unit tests
kmazurek Aug 30, 2021
6a95fc7
Add Script unit tests
kmazurek Aug 31, 2021
3d9c123
WIP: test agr termination debugging
kmazurek Sep 1, 2021
cc27a0a
Fix script handling in process_batches
kmazurek Sep 1, 2021
9d7f2aa
Update Script unit tests
kmazurek Sep 1, 2021
d8430ed
Update types based on mypy
kmazurek Sep 1, 2021
92775fc
Remove unused classes
kmazurek Sep 1, 2021
8214af1
Address flake8 errors
kmazurek Sep 1, 2021
69150eb
Fix AsyncMock imports for Python <3.8
kmazurek Sep 1, 2021
f788d54
Merge branch 'master' into km/context-script-decouple
kmazurek Sep 1, 2021
8dfea06
Add missing docstrings
kmazurek Sep 1, 2021
3386271
Merge branch 'master' into km/context-script-decouple
kmazurek Sep 2, 2021
f503432
Add Script docstring
kmazurek Sep 2, 2021
5378db3
Reverse order of stdout and stderr in Run#__init__
kmazurek Sep 3, 2021
82d2cd7
Change Script fields to be instance variables
kmazurek Sep 3, 2021
4281648
Add id property to Script instances
kmazurek Sep 3, 2021
c086011
Move command result future to Command field
kmazurek Sep 7, 2021
e574447
Update type hint on Command result
kmazurek Sep 7, 2021
fbb32b1
Add return statements to WorkContext command methods
kmazurek Sep 8, 2021
53d23eb
Add a factory for WorkContext objects in tests
kmazurek Sep 8, 2021
4741009
Rename send_* methods to upload_* on Script
kmazurek Sep 8, 2021
37f3dfe
Make stdout and stderr optional on Script.run
kmazurek Sep 8, 2021
64f4a17
Remove redundant condition from Run.evaluate
kmazurek Sep 8, 2021
804f765
Update yapapi/script/__init__.py
kmazurek Sep 8, 2021
b9f4d32
Merge branch 'master' into km/context-script-decouple
kmazurek Sep 8, 2021
41d731e
Merge branch 'master' into km/context-script-decouple
kmazurek Sep 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 43 additions & 31 deletions yapapi/ctx.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import abc
from copy import copy
from dataclasses import dataclass, field
from datetime import timedelta, datetime
from deprecated import deprecated # type: ignore
import enum
import json
import logging
Expand All @@ -16,6 +18,7 @@

from yapapi.events import DownloadStarted, DownloadFinished
from yapapi.props.com import ComLinear, Counter
from yapapi.script import Script
from yapapi.storage import StorageProvider, Source, Destination, DOWNLOAD_BYTES_LIMIT_DEFAULT
from yapapi.rest.market import AgreementDetails
from yapapi.rest.activity import Activity
Expand Down Expand Up @@ -326,6 +329,7 @@ def __init__(
self._started: bool = False

self.__payment_model: Optional[ComLinear] = None
self.__script: Optional[Script] = None

@property
def id(self) -> str:
Expand All @@ -352,28 +356,32 @@ def _payment_model(self) -> ComLinear:
return self.__payment_model

def __prepare(self):
if not self._started and self._implicit_init:
self.deploy()
self.start()
self._started = True
if not self.__script:
self.__script = Script(self)

def begin(self):
pass
def new_script(self):
"""Stuff."""
return Script(self)

@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def deploy(self):
"""Schedule a Deploy command."""
self._implicit_init = False
self._pending_steps.append(_Deploy())
self.__prepare()
self.__script.deploy()

@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def start(self, *args: str):
"""Schedule a Start command."""
self._implicit_init = False
self._pending_steps.append(_Start(*args))
self.__prepare()
self.__script.start(*args)

@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def terminate(self):
"""Schedule a Terminate command."""
self._pending_steps.append(_Terminate())
self.__prepare()
self.__script.terminate()

@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def send_json(self, json_path: str, data: dict):
"""Schedule sending JSON data to the provider.

Expand All @@ -382,8 +390,9 @@ def send_json(self, json_path: str, data: dict):
:return: None
"""
self.__prepare()
self._pending_steps.append(_SendJson(self._storage, data, json_path))
self.__script.send_json(data, json_path)
johny-b marked this conversation as resolved.
Show resolved Hide resolved

@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def send_bytes(self, dst_path: str, data: bytes):
"""Schedule sending bytes data to the provider.

Expand All @@ -392,8 +401,9 @@ def send_bytes(self, dst_path: str, data: bytes):
:return: None
"""
self.__prepare()
self._pending_steps.append(_SendBytes(self._storage, data, dst_path))
self.__script.send_bytes(data, dst_path)
johny-b marked this conversation as resolved.
Show resolved Hide resolved

@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def send_file(self, src_path: str, dst_path: str):
"""Schedule sending file to the provider.

Expand All @@ -402,8 +412,9 @@ def send_file(self, src_path: str, dst_path: str):
:return: None
"""
self.__prepare()
self._pending_steps.append(_SendFile(self._storage, src_path, dst_path))
self.__script.send_file(src_path, dst_path)

@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def run(
self,
cmd: str,
Expand All @@ -417,12 +428,10 @@ def run(
:param env: optional dictionary with environmental variables
:return: None
"""
stdout = CaptureContext.build(mode="stream")
stderr = CaptureContext.build(mode="stream")

self.__prepare()
self._pending_steps.append(_Run(cmd, *args, env=env, stdout=stdout, stderr=stderr))
self.__script.run(cmd, *args, env=env)

@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def download_file(self, src_path: str, dst_path: str):
"""Schedule downloading remote file from the provider.

Expand All @@ -431,51 +440,54 @@ def download_file(self, src_path: str, dst_path: str):
:return: None
"""
self.__prepare()
self._pending_steps.append(_ReceiveFile(self._storage, src_path, dst_path, self._emitter))
self.__script.download_file(src_path, dst_path)

@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def download_bytes(
self,
src_path: str,
on_download: Callable[[bytes], Awaitable],
limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT,
):
"""Schedule downloading a remote file as bytes

:param src_path: remote (provider) path
:param on_download: the callable to run on the received data
:param limit: the maximum length of the expected byte string
:return None
"""
self.__prepare()
self._pending_steps.append(
_ReceiveBytes(self._storage, src_path, on_download, limit, self._emitter)
)
self.__script.download_bytes(src_path, on_download, limit)

@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def download_json(
self,
src_path: str,
on_download: Callable[[Any], Awaitable],
limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT,
):
"""Schedule downloading a remote file as JSON

:param src_path: remote (provider) path
:param on_download: the callable to run on the received JSON data
:param limit: the maximum length of the expected remote file
:return None
"""
self.__prepare()
self._pending_steps.append(
_ReceiveJson(self._storage, src_path, on_download, limit, self._emitter)
)
self.__script.download_json(src_path, on_download, limit)

def commit(self, timeout: Optional[timedelta] = None) -> Work:
@deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script")
def commit(self, timeout: Optional[timedelta] = None) -> Script:
"""Creates a sequence of commands to be sent to provider.

:return: Work object containing the sequence of commands
scheduled within this work context before calling this method)
:return: Script object containing the sequence of commands
scheduled within this work context before calling this method
"""
steps = self._pending_steps
self._pending_steps = []
return Steps(*steps, timeout=timeout)
if timeout:
self.__script.timeout = timeout
script_to_commit = copy(self.__script)
self.__script = None
return script_to_commit

async def get_raw_usage(self) -> yaa_ActivityUsage:
"""Get the raw usage vector for the activity bound to this work context.
Expand Down
47 changes: 17 additions & 30 deletions yapapi/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
from yapapi.props.builder import DemandBuilder, DemandDecorator
from yapapi.rest.activity import CommandExecutionError, Activity
from yapapi.rest.market import Agreement, AgreementDetails, OfferProposal, Subscription
from yapapi.script import Script
from yapapi.script.command import BatchCommand
from yapapi.storage import gftp
from yapapi.strategy import (
DecreaseScoreForUnconfirmedAgreement,
Expand Down Expand Up @@ -576,66 +578,51 @@ async def process_batches(
job_id: str,
agreement_id: str,
activity: rest.activity.Activity,
command_generator: AsyncGenerator[WorkItem, Awaitable[List[events.CommandEvent]]],
command_generator: AsyncGenerator[Script, Awaitable[List[events.CommandEvent]]],
) -> None:
"""Send command batches produced by `command_generator` to `activity`."""

item = await command_generator.__anext__()
script: Script = await command_generator.__anext__()

while True:

batch, exec_options = _unpack_work_item(item)

# TODO: `task_id` should really be `batch_id`, but then we should also rename
# `task_id` field of several events (e.g. `ScriptSent`)
script_id = str(next(exescript_ids))
johny-b marked this conversation as resolved.
Show resolved Hide resolved

if batch.timeout:
if exec_options.batch_timeout:
logger.warning(
"Overriding batch timeout set with commit(batch_timeout)"
"by the value set in exec options"
)
else:
exec_options.batch_timeout = batch.timeout

batch_deadline = (
datetime.now(timezone.utc) + exec_options.batch_timeout
if exec_options.batch_timeout
else None
)
batch_deadline = datetime.now(timezone.utc) + script.timeout if script.timeout else None
johny-b marked this conversation as resolved.
Show resolved Hide resolved

cc = CommandContainer()
try:
await batch.prepare()
batch.register(cc)
remote = await activity.send(cc.commands(), deadline=batch_deadline)
await script._before()
batch: List[BatchCommand] = script._evaluate()
remote = await activity.send(batch, deadline=batch_deadline)
except Exception:
item = await command_generator.athrow(*sys.exc_info())
continue

cmds = cc.commands()
self.emit(
events.ScriptSent(
job_id=job_id, agr_id=agreement_id, script_id=script_id, cmds=cmds
job_id=job_id, agr_id=agreement_id, script_id=script_id, cmds=batch
)
)

async def get_batch_results() -> List[events.CommandEvent]:
results = []
async for evt_ctx in remote:
evt = evt_ctx.event(
job_id=job_id, agr_id=agreement_id, script_id=script_id, cmds=cmds
job_id=job_id, agr_id=agreement_id, script_id=script_id, cmds=batch
)
self.emit(evt)
results.append(evt)
if isinstance(evt, events.CommandExecuted) and not evt.success:
raise CommandExecutionError(evt.command, evt.message, evt.stderr)
if isinstance(evt, events.CommandExecuted):
if evt.success:
script._set_cmd_result(evt)
else:
raise CommandExecutionError(evt.command, evt.message, evt.stderr)

self.emit(
events.GettingResults(job_id=job_id, agr_id=agreement_id, script_id=script_id)
)
await batch.post()
await script._after()
self.emit(
events.ScriptFinished(job_id=job_id, agr_id=agreement_id, script_id=script_id)
)
Expand All @@ -644,7 +631,7 @@ async def get_batch_results() -> List[events.CommandEvent]:

loop = asyncio.get_event_loop()

if exec_options.wait_for_results:
if script.wait_for_results:
# Block until the results are available
try:
future_results = loop.create_future()
Expand Down
Loading