diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 55183fe62..46bfa8192 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -23,6 +23,7 @@ from yapapi.rest.activity import Activity from yapapi.script import Script from yapapi.engine import _Engine, Job +from yapapi.script.command import Deploy, Start import yapapi.utils from .task import Task, TaskStatus @@ -188,10 +189,6 @@ async def _worker( ) -> None: nonlocal job - if self._implicit_init: - work_context.deploy() - work_context.start() - with work_queue.new_consumer() as consumer: try: @@ -214,6 +211,12 @@ async def task_generator() -> AsyncIterator[Task[D, R]]: ) batch_generator = worker(work_context, task_generator()) + + if self._implicit_init: + await self._perform_implicit_init( + work_context, job.id, agreement.id, activity + ) + try: await self._engine.process_batches( job.id, agreement.id, activity, batch_generator @@ -368,3 +371,15 @@ async def worker_starter() -> None: logger.debug( "Got error when waiting for services to finish", exc_info=True, job_id=job.id ) + + async def _perform_implicit_init(self, ctx, job_id, agreement_id, activity): + async def implicit_init(): + script = ctx.new_script() + script.add(Deploy()) + script.add(Start()) + yield script + + try: + await self._engine.process_batches(job_id, agreement_id, activity, implicit_init()) + except StopAsyncIteration: + pass