Skip to content

Commit

Permalink
Better implicit_init implementation in Executor
Browse files Browse the repository at this point in the history
  • Loading branch information
johny-b committed Sep 14, 2021
1 parent 52c7b54 commit bc86bae
Showing 1 changed file with 19 additions and 4 deletions.
23 changes: 19 additions & 4 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from yapapi.rest.activity import Activity
from yapapi.script import Script
from yapapi.engine import _Engine, Job
from yapapi.script.command import Deploy, Start
import yapapi.utils

from .task import Task, TaskStatus
Expand Down Expand Up @@ -188,10 +189,6 @@ async def _worker(
) -> None:
nonlocal job

if self._implicit_init:
work_context.deploy()
work_context.start()

with work_queue.new_consumer() as consumer:
try:

Expand All @@ -214,6 +211,12 @@ async def task_generator() -> AsyncIterator[Task[D, R]]:
)

batch_generator = worker(work_context, task_generator())

if self._implicit_init:
await self._perform_implicit_init(
work_context, job.id, agreement.id, activity
)

try:
await self._engine.process_batches(
job.id, agreement.id, activity, batch_generator
Expand Down Expand Up @@ -368,3 +371,15 @@ async def worker_starter() -> None:
logger.debug(
"Got error when waiting for services to finish", exc_info=True, job_id=job.id
)

async def _perform_implicit_init(self, ctx, job_id, agreement_id, activity):
async def implicit_init():
script = ctx.new_script()
script.add(Deploy())
script.add(Start())
yield script

try:
await self._engine.process_batches(job_id, agreement_id, activity, implicit_init())
except StopAsyncIteration:
pass

0 comments on commit bc86bae

Please sign in to comment.