Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove implicit_init from the WorkContext #647

Merged
merged 11 commits into from
Sep 15, 2021
2 changes: 2 additions & 0 deletions examples/simple-service-poc/simple_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ async def get_payload():

async def start(self):
# handler responsible for starting the service
async for script in super().start():
yield script
self._ctx.run(self.SIMPLE_SERVICE_CTL, "--start")
yield self._ctx.commit()

Expand Down
3 changes: 3 additions & 0 deletions tests/goth_tests/test_instance_restart/requestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ async def start(self):

global instances_started

async for script in super().start():
yield script

self._ctx.run("/bin/echo", "STARTING", str(instances_started + 1))
future_results = yield self._ctx.commit()
results = await future_results
Expand Down
25 changes: 2 additions & 23 deletions tests/script/test_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from yapapi.events import CommandExecuted
from yapapi.script import Script
from yapapi.script.command import Deploy, Start

if sys.version_info >= (3, 8):
from tests.factories.context import WorkContextFactory
Expand Down Expand Up @@ -98,36 +97,16 @@ async def test_download_json(self):
self._assert_src_path(script, src_path)
assert self._on_download_executed

@pytest.mark.asyncio
async def test_implicit_init(self):
work_context = WorkContextFactory()
script = work_context.new_script()

# first script, should include implicit deploy and start cmds
await script._before()
assert len(script._commands) == 2
deploy_cmd = script._commands[0]
assert isinstance(deploy_cmd, Deploy)
start_cmd = script._commands[1]
assert isinstance(start_cmd, Start)
assert work_context._started

# second script, should not include implicit deploy and start
script = work_context.new_script()
script.run("/some/cmd")
await script._before()
assert len(script._commands) == 1

@pytest.mark.asyncio
async def test_cmd_result(self):
work_context = WorkContextFactory()
script = work_context.new_script()
future_result = script.run("/some/cmd", 1)

await script._before()
run_cmd = script._commands[2]
run_cmd = script._commands[0]
result = CommandExecuted(
"job_id", "agr_id", "script_id", 2, command=run_cmd.evaluate(work_context)
"job_id", "agr_id", "script_id", 0, command=run_cmd.evaluate(work_context)
)
script._set_cmd_result(result)

Expand Down
3 changes: 0 additions & 3 deletions yapapi/ctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,13 @@ def __init__(
agreement_details: AgreementDetails,
storage: StorageProvider,
emitter: Optional[Callable[[StorageEvent], None]] = None,
implicit_init: bool = True,
):
self._activity = activity
self._agreement_details = agreement_details
self._storage: StorageProvider = storage
self._emitter: Optional[Callable[[StorageEvent], None]] = emitter
self._implicit_init = implicit_init

self._pending_steps: List[Work] = []
self._started: bool = False

self.__payment_model: Optional[ComLinear] = None
self.__script: Script = self.new_script()
Expand Down
21 changes: 21 additions & 0 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from yapapi.rest.activity import Activity
from yapapi.script import Script
from yapapi.engine import _Engine, Job
from yapapi.script.command import Deploy, Start
import yapapi.utils

from .task import Task, TaskStatus
Expand Down Expand Up @@ -56,13 +57,15 @@ def __init__(
*,
_engine: _Engine,
payload: Payload,
implicit_init: bool,
max_workers: int = 5,
timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT,
):
logger.debug("Creating Executor instance; parameters: %s", locals())

self._engine = _engine
self._payload = payload
self._implicit_init = implicit_init
self._timeout = timeout
self._max_workers = max_workers

Expand Down Expand Up @@ -208,6 +211,12 @@ async def task_generator() -> AsyncIterator[Task[D, R]]:
)

batch_generator = worker(work_context, task_generator())

if self._implicit_init:
await self._perform_implicit_init(
work_context, job.id, agreement.id, activity
)

try:
await self._engine.process_batches(
job.id, agreement.id, activity, batch_generator
Expand Down Expand Up @@ -362,3 +371,15 @@ async def worker_starter() -> None:
logger.debug(
"Got error when waiting for services to finish", exc_info=True, job_id=job.id
)

async def _perform_implicit_init(self, ctx, job_id, agreement_id, activity):
async def implicit_init():
script = ctx.new_script()
script.add(Deploy())
script.add(Start())
yield script

try:
await self._engine.process_batches(job_id, agreement_id, activity, implicit_init())
except StopAsyncIteration:
pass
6 changes: 5 additions & 1 deletion yapapi/golem.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ async def execute_tasks(
max_workers: Optional[int] = None,
timeout: Optional[timedelta] = None,
job_id: Optional[str] = None,
implicit_init: bool = True,
) -> AsyncIterator[Task[D, R]]:
"""Submit a sequence of tasks to be executed on providers.

Expand All @@ -170,6 +171,9 @@ async def execute_tasks(
:param timeout: timeout for computing all tasks, passed to the `Executor` instance
:param job_id: an optional string to identify the job created by this method.
Passed as the value of the `id` parameter to `Job()`.
:param implicit_init: True -> `ctx.deploy()` and `ctx.start()` will be called internally by the `Executor`.
False -> those calls must be in the `worker` function

:return: an iterator that yields completed `Task` objects

example usage:
Expand All @@ -193,7 +197,7 @@ async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
```
"""

kwargs: Dict[str, Any] = {"payload": payload}
kwargs: Dict[str, Any] = {"payload": payload, "implicit_init": implicit_init}
if max_workers:
kwargs["max_workers"] = max_workers
if timeout:
Expand Down
8 changes: 0 additions & 8 deletions yapapi/script/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ class Script:
instance is meant to be yielded from a worker function (work generator pattern).
Commands will be run in the order in which they were added to the script.

If the `WorkContext` instance this `Script` uses has the field `_implicit_init` set to `True`,
the first yielded script is going to be prepended with `Deploy` and
`Start` commands.
"""

timeout: Optional[timedelta]
Expand Down Expand Up @@ -88,11 +85,6 @@ async def _after(self):

async def _before(self):
"""Hook which is executed before the script is evaluated and sent to the provider."""
if not self._ctx._started and self._ctx._implicit_init:
self._commands.insert(0, Deploy())
self._commands.insert(1, Start())
self._ctx._started = True

for cmd in self._commands:
await cmd.before(self._ctx)

Expand Down
10 changes: 6 additions & 4 deletions yapapi/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ async def start(self) -> AsyncGenerator[Script, Awaitable[List[events.CommandEve
Should perform the minimum set of operations after which the instance of a service can be
treated as "started", or, in other words, ready to receive service requests. It's up to the
developer of the specific Service class to decide what exact operations constitute a
service startup.
service startup. In the most common scenario `ctx.deploy()` and `ctx.start()` are required,
check the `Default implementation` section for more details.

As a handler implementing the [work generator pattern](https://handbook.golem.network/requestor-tutorials/golem-application-fundamentals/hl-api-work-generator-pattern),
it's expected to be a generator that yields `Script`s (generated using the service's
Expand Down Expand Up @@ -285,15 +286,16 @@ async def start(self):

Additionally, it also assumes that the exe-unit doesn't need any additional parameters
in its `start()` call (e.g. for the VM runtime, all the required parameters are already
passed as part of the agreement between the requestor and the provider).
passed as part of the agreement between the requestor and the provider), and parameters
passed to `deploy()` are returned by `self.get_deploy_args()` method.

Therefore, this default implementation performs the minimum required for a VM payload to
start responding to `run` commands. If your service requires any additional operations -
you'll need to override this method (possibly starting with a call to `super().start()`)
you'll need to override this method (possibly first yielding from the parent - `super().start()` - generator)
to add appropriate preparatory steps.

In case of runtimes other than VM, `deploy` and/or `start` might be optional or altogether
disallowed, plus `start` itself might take some parameters. It is up to the author of the
disallowed, plus `deploy`/`start` themselves may take some parameters. It is up to the author of the
specific `Service` implementation that uses such a payload to adjust this method accordingly
based on the requirements for the given runtime/exe-unit type.
"""
Expand Down