From df0f8ba1b4c7dca2db2362ec4ca4a9412c2bf5ef Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Fri, 10 Sep 2021 13:01:32 +0200 Subject: [PATCH 1/9] replace implicit init with an explicit init --- yapapi/ctx.py | 2 -- yapapi/executor/__init__.py | 3 +++ yapapi/script/__init__.py | 8 -------- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/yapapi/ctx.py b/yapapi/ctx.py index 85fb4c3fa..117fbfd6a 100644 --- a/yapapi/ctx.py +++ b/yapapi/ctx.py @@ -86,13 +86,11 @@ def __init__( agreement_details: AgreementDetails, storage: StorageProvider, emitter: Optional[Callable[[StorageEvent], None]] = None, - implicit_init: bool = True, ): self._activity = activity self._agreement_details = agreement_details self._storage: StorageProvider = storage self._emitter: Optional[Callable[[StorageEvent], None]] = emitter - self._implicit_init = implicit_init self._pending_steps: List[Work] = [] self._started: bool = False diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 921d2f863..ad9535752 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -186,6 +186,9 @@ async def _worker( ) -> None: nonlocal job + work_context.deploy() + work_context.start() + with work_queue.new_consumer() as consumer: try: diff --git a/yapapi/script/__init__.py b/yapapi/script/__init__.py index 8c7791523..377d7777f 100644 --- a/yapapi/script/__init__.py +++ b/yapapi/script/__init__.py @@ -36,9 +36,6 @@ class Script: instance is meant to be yielded from a worker function (work generator pattern). Commands will be run in the order in which they were added to the script. - If the `WorkContext` instance this `Script` uses has the field `_implicit_init` set to `True`, - the first yielded script is going to be prepended with `Deploy` and - `Start` commands. """ timeout: Optional[timedelta] @@ -88,11 +85,6 @@ async def _after(self): async def _before(self): """Hook which is executed before the script is evaluated and sent to the provider.""" - if not self._ctx._started and self._ctx._implicit_init: - self._commands.insert(0, Deploy()) - self._commands.insert(1, Start()) - self._ctx._started = True - for cmd in self._commands: await cmd.before(self._ctx) From 6479eb46dc0f006b7c985779b250bedbd1d0a866 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Fri, 10 Sep 2021 13:12:06 +0200 Subject: [PATCH 2/9] WorkContext._started is no longer used --- yapapi/ctx.py | 1 - 1 file changed, 1 deletion(-) diff --git a/yapapi/ctx.py b/yapapi/ctx.py index 117fbfd6a..daa82ba91 100644 --- a/yapapi/ctx.py +++ b/yapapi/ctx.py @@ -93,7 +93,6 @@ def __init__( self._emitter: Optional[Callable[[StorageEvent], None]] = emitter self._pending_steps: List[Work] = [] - self._started: bool = False self.__payment_model: Optional[ComLinear] = None self.__script: Script = self.new_script() From e6934a0a88ef77b27e0cf5f3b96300587c5c23b6 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Fri, 10 Sep 2021 13:18:37 +0200 Subject: [PATCH 3/9] remove implicit_init test and fix other test --- tests/script/test_script.py | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/tests/script/test_script.py b/tests/script/test_script.py index 14f154e16..3861024f3 100644 --- a/tests/script/test_script.py +++ b/tests/script/test_script.py @@ -6,7 +6,6 @@ from yapapi.events import CommandExecuted from yapapi.script import Script -from yapapi.script.command import Deploy, Start if sys.version_info >= (3, 8): from tests.factories.context import WorkContextFactory @@ -98,26 +97,6 @@ async def test_download_json(self): self._assert_src_path(script, src_path) assert self._on_download_executed - @pytest.mark.asyncio - async def test_implicit_init(self): - work_context = WorkContextFactory() - script = work_context.new_script() - - # first script, should include implicit deploy and start cmds - await script._before() - assert len(script._commands) == 2 - deploy_cmd = script._commands[0] - assert isinstance(deploy_cmd, Deploy) - start_cmd = script._commands[1] - assert isinstance(start_cmd, Start) - assert work_context._started - - # second script, should not include implicit deploy and start - script = work_context.new_script() - script.run("/some/cmd") - await script._before() - assert len(script._commands) == 1 - @pytest.mark.asyncio async def test_cmd_result(self): work_context = WorkContextFactory() @@ -125,9 +104,9 @@ async def test_cmd_result(self): future_result = script.run("/some/cmd", 1) await script._before() - run_cmd = script._commands[2] + run_cmd = script._commands[0] result = CommandExecuted( - "job_id", "agr_id", "script_id", 2, command=run_cmd.evaluate(work_context) + "job_id", "agr_id", "script_id", 0, command=run_cmd.evaluate(work_context) ) script._set_cmd_result(result) From 49542c46fe95791e0566cce7edba6f9c66c85c75 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Fri, 10 Sep 2021 14:29:50 +0200 Subject: [PATCH 4/9] add explicit deploy and start in example and tests --- examples/simple-service-poc/simple_service.py | 2 ++ tests/goth_tests/test_instance_restart/requestor.py | 3 +++ 2 files changed, 5 insertions(+) diff --git a/examples/simple-service-poc/simple_service.py b/examples/simple-service-poc/simple_service.py index 7e52d9b3f..0e4d3cde5 100755 --- a/examples/simple-service-poc/simple_service.py +++ b/examples/simple-service-poc/simple_service.py @@ -59,6 +59,8 @@ async def get_payload(): async def start(self): # handler responsible for starting the service + self._ctx.deploy() + self._ctx.start() self._ctx.run(self.SIMPLE_SERVICE_CTL, "--start") yield self._ctx.commit() diff --git a/tests/goth_tests/test_instance_restart/requestor.py b/tests/goth_tests/test_instance_restart/requestor.py index 4895042e2..2ca49829d 100755 --- a/tests/goth_tests/test_instance_restart/requestor.py +++ b/tests/goth_tests/test_instance_restart/requestor.py @@ -39,6 +39,9 @@ async def start(self): global instances_started + self._ctx.deploy() + self._ctx.start() + self._ctx.run("/bin/echo", "STARTING", str(instances_started + 1)) future_results = yield self._ctx.commit() results = await future_results From 85f157c2a9013239c64a9676cd7c20c53cd43c97 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Mon, 13 Sep 2021 13:42:56 +0200 Subject: [PATCH 5/9] implicit_init in Golem.execute_tasks --- yapapi/executor/__init__.py | 7 +++++-- yapapi/golem.py | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index ad9535752..55183fe62 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -56,6 +56,7 @@ def __init__( *, _engine: _Engine, payload: Payload, + implicit_init: bool, max_workers: int = 5, timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT, ): @@ -63,6 +64,7 @@ def __init__( self._engine = _engine self._payload = payload + self._implicit_init = implicit_init self._timeout = timeout self._max_workers = max_workers @@ -186,8 +188,9 @@ async def _worker( ) -> None: nonlocal job - work_context.deploy() - work_context.start() + if self._implicit_init: + work_context.deploy() + work_context.start() with work_queue.new_consumer() as consumer: try: diff --git a/yapapi/golem.py b/yapapi/golem.py index a74ded1da..3455c1bec 100644 --- a/yapapi/golem.py +++ b/yapapi/golem.py @@ -153,6 +153,7 @@ async def execute_tasks( max_workers: Optional[int] = None, timeout: Optional[timedelta] = None, job_id: Optional[str] = None, + implicit_init: bool = True, ) -> AsyncIterator[Task[D, R]]: """Submit a sequence of tasks to be executed on providers. @@ -170,6 +171,9 @@ async def execute_tasks( :param timeout: timeout for computing all tasks, passed to the `Executor` instance :param job_id: an optional string to identify the job created by this method. Passed as the value of the `id` parameter to `Job()`. + :param implicit_init: True -> `ctx.deploy()` and `ctx.start()` will be called internally by the `Executor`. + False -> those calls must be in the `worker` function + :return: an iterator that yields completed `Task` objects example usage: @@ -193,7 +197,7 @@ async def worker(context: WorkContext, tasks: AsyncIterable[Task]): ``` """ - kwargs: Dict[str, Any] = {"payload": payload} + kwargs: Dict[str, Any] = {"payload": payload, "implicit_init": implicit_init} if max_workers: kwargs["max_workers"] = max_workers if timeout: From ea1a857442d14bdb13aced4a0efc76b4bd0c09b1 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Mon, 13 Sep 2021 14:30:13 +0200 Subject: [PATCH 6/9] improved Service.start() documentation --- yapapi/services.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/yapapi/services.py b/yapapi/services.py index c6638b739..f121ffdb2 100644 --- a/yapapi/services.py +++ b/yapapi/services.py @@ -244,7 +244,8 @@ async def start(self) -> AsyncGenerator[Script, Awaitable[List[events.CommandEve Should perform the minimum set of operations after which the instance of a service can be treated as "started", or, in other words, ready to receive service requests. It's up to the developer of the specific Service class to decide what exact operations constitute a - service startup. + service startup. In the most common scenario `ctx.deploy()` and `ctx.start()` are required, + check the `Default implementation` section for more details. As a handler implementing the [work generator pattern](https://handbook.golem.network/requestor-tutorials/golem-application-fundamentals/hl-api-work-generator-pattern), it's expected to be a generator that yields `Script`s (generated using the service's @@ -285,15 +286,16 @@ async def start(self): Additionally, it also assumes that the exe-unit doesn't need any additional parameters in its `start()` call (e.g. for the VM runtime, all the required parameters are already - passed as part of the agreement between the requestor and the provider). + passed as part of the agreement between the requestor and the provider), and parameters + passed to `deploy()` are returned by `self.get_deploy_args()` method. Therefore, this default implementation performs the minimum required for a VM payload to start responding to `run` commands. If your service requires any additional operations - - you'll need to override this method (possibly starting with a call to `super().start()`) - to add appropriate preparatory steps. + you'll need to override this method (possibly starting with yielding everything yielded by + `super().start()`) to add appropriate preparatory steps. In case of runtimes other than VM, `deploy` and/or `start` might be optional or altogether - disallowed, plus `start` itself might take some parameters. It is up to the author of the + disallowed, plus `deploy`/`start` itself might take some parameters. It is up to the author of the specific `Service` implementation that uses such a payload to adjust this method accordingly based on the requirements for the given runtime/exe-unit type. """ From 65fdb30f40493346a067f1d2140a5a6a47d88a61 Mon Sep 17 00:00:00 2001 From: johny-b <33967107+johny-b@users.noreply.github.com> Date: Tue, 14 Sep 2021 10:00:17 +0200 Subject: [PATCH 7/9] Apply suggestions from code review Co-authored-by: shadeofblue --- examples/simple-service-poc/simple_service.py | 4 ++-- tests/goth_tests/test_instance_restart/requestor.py | 4 ++-- yapapi/services.py | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/simple-service-poc/simple_service.py b/examples/simple-service-poc/simple_service.py index 0e4d3cde5..8d9e8706b 100755 --- a/examples/simple-service-poc/simple_service.py +++ b/examples/simple-service-poc/simple_service.py @@ -59,8 +59,8 @@ async def get_payload(): async def start(self): # handler responsible for starting the service - self._ctx.deploy() - self._ctx.start() + async for script in super().start(): + yield script self._ctx.run(self.SIMPLE_SERVICE_CTL, "--start") yield self._ctx.commit() diff --git a/tests/goth_tests/test_instance_restart/requestor.py b/tests/goth_tests/test_instance_restart/requestor.py index 2ca49829d..f7c820dfa 100755 --- a/tests/goth_tests/test_instance_restart/requestor.py +++ b/tests/goth_tests/test_instance_restart/requestor.py @@ -39,8 +39,8 @@ async def start(self): global instances_started - self._ctx.deploy() - self._ctx.start() + async for script in super().start(): + yield script self._ctx.run("/bin/echo", "STARTING", str(instances_started + 1)) future_results = yield self._ctx.commit() diff --git a/yapapi/services.py b/yapapi/services.py index f121ffdb2..94e924f24 100644 --- a/yapapi/services.py +++ b/yapapi/services.py @@ -291,11 +291,11 @@ async def start(self): Therefore, this default implementation performs the minimum required for a VM payload to start responding to `run` commands. If your service requires any additional operations - - you'll need to override this method (possibly starting with yielding everything yielded by - `super().start()`) to add appropriate preparatory steps. + you'll need to override this method (possibly first yielding from the parent - `super().start()` - generator ) + to add appropriate preparatory steps. In case of runtimes other than VM, `deploy` and/or `start` might be optional or altogether - disallowed, plus `deploy`/`start` itself might take some parameters. It is up to the author of the + disallowed, plus `deploy`/`start` themselves may take some parameters. It is up to the author of the specific `Service` implementation that uses such a payload to adjust this method accordingly based on the requirements for the given runtime/exe-unit type. """ From 52c7b54deb31ccf1f7586c033db263e1bab7b809 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Tue, 14 Sep 2021 10:24:34 +0200 Subject: [PATCH 8/9] black --- yapapi/services.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yapapi/services.py b/yapapi/services.py index 94e924f24..357bcd217 100644 --- a/yapapi/services.py +++ b/yapapi/services.py @@ -291,7 +291,7 @@ async def start(self): Therefore, this default implementation performs the minimum required for a VM payload to start responding to `run` commands. If your service requires any additional operations - - you'll need to override this method (possibly first yielding from the parent - `super().start()` - generator ) + you'll need to override this method (possibly first yielding from the parent - `super().start()` - generator) to add appropriate preparatory steps. In case of runtimes other than VM, `deploy` and/or `start` might be optional or altogether From bc86baee32ea85bf3ff2f585f994787e818d8125 Mon Sep 17 00:00:00 2001 From: Jan Betley Date: Tue, 14 Sep 2021 13:05:28 +0200 Subject: [PATCH 9/9] Better implicit_init implementation in Executor --- yapapi/executor/__init__.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 55183fe62..46bfa8192 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -23,6 +23,7 @@ from yapapi.rest.activity import Activity from yapapi.script import Script from yapapi.engine import _Engine, Job +from yapapi.script.command import Deploy, Start import yapapi.utils from .task import Task, TaskStatus @@ -188,10 +189,6 @@ async def _worker( ) -> None: nonlocal job - if self._implicit_init: - work_context.deploy() - work_context.start() - with work_queue.new_consumer() as consumer: try: @@ -214,6 +211,12 @@ async def task_generator() -> AsyncIterator[Task[D, R]]: ) batch_generator = worker(work_context, task_generator()) + + if self._implicit_init: + await self._perform_implicit_init( + work_context, job.id, agreement.id, activity + ) + try: await self._engine.process_batches( job.id, agreement.id, activity, batch_generator @@ -368,3 +371,15 @@ async def worker_starter() -> None: logger.debug( "Got error when waiting for services to finish", exc_info=True, job_id=job.id ) + + async def _perform_implicit_init(self, ctx, job_id, agreement_id, activity): + async def implicit_init(): + script = ctx.new_script() + script.add(Deploy()) + script.add(Start()) + yield script + + try: + await self._engine.process_batches(job_id, agreement_id, activity, implicit_init()) + except StopAsyncIteration: + pass