Skip to content

Commit

Permalink
Handle activity creation failures in spawn_instance (#458)
Browse files Browse the repository at this point in the history
Co-authored-by: shadeofblue <[email protected]>
  • Loading branch information
azawlocki and shadeofblue authored Jun 17, 2021
1 parent 7beca1c commit 787ad78
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions yapapi/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,15 @@ async def spawn_instance(self) -> None:

logger.debug("spawning instance within %s", self)
spawned = False
agreement_id: Optional[str] # set in start_worker

async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> None:
nonlocal spawned

nonlocal agreement_id, spawned

agreement_id = agreement.id
self.emit(events.WorkerStarted(agr_id=agreement.id))

try:
act = await self._engine.create_activity(agreement.id)
except Exception:
Expand All @@ -430,12 +435,12 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->
agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore
)
)
self.emit(events.WorkerFinished(agr_id=agreement.id))
raise

async with act:
spawned = True
self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id))

self._engine.accept_debit_notes_for_agreement(agreement.id)
work_context = WorkContext(
act.id, node_info, self._engine.storage_manager, emitter=self.emit
Expand Down Expand Up @@ -463,13 +468,6 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->
)
)
self.emit(events.WorkerFinished(agr_id=agreement.id))
except Exception:
self.emit(
events.WorkerFinished(
agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore
)
)
raise
finally:
await self._engine.accept_payments_for_agreement(agreement.id)
await self._job.agreements_pool.release_agreement(
Expand All @@ -479,12 +477,22 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->
loop = asyncio.get_event_loop()

while not spawned:
agreement_id = None
await asyncio.sleep(1.0)
task = await self._job.agreements_pool.use_agreement(
lambda agreement, node: loop.create_task(start_worker(agreement, node))
)
if task:
if not task:
continue
try:
await task
except Exception:
if agreement_id:
self.emit(events.WorkerFinished(agr_id=agreement_id, exc_info=sys.exc_info()))
else:
# This shouldn't happen, we may log and return as well
logger.error("Failed to spawn instance", exc_info=True)
return

def stop_instance(self, service: Service):
"""Stop the specific service instance belonging to this Cluster."""
Expand Down

0 comments on commit 787ad78

Please sign in to comment.