Skip to content

Commit

Permalink
Delay agreement termination until activity is destroyed
Browse files Browse the repository at this point in the history
  • Loading branch information
azawlocki committed Apr 2, 2021
1 parent 82fd6e3 commit 527d59c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
8 changes: 1 addition & 7 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,6 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->
)
)
emit(events.WorkerFinished(agr_id=agreement.id))
# Don't reuse the agreement after activity creation failed
await agreements_pool.release_agreement(agreement.id, allow_reuse=False)
raise
async with act:
emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id))
Expand Down Expand Up @@ -531,11 +529,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->
agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore
)
)
# Don't reuse the agreement after failure
await agreements_pool.release_agreement(
agreement.id, allow_reuse=False
)
return
raise

await accept_payment_for_agreement(agreement.id)
emit(events.WorkerFinished(agr_id=agreement.id))
Expand Down
4 changes: 3 additions & 1 deletion yapapi/executor/agreements_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ async def cycle(self):
continue
task = buffered_agreement.worker_task
if task is not None and task.done():
await self.release_agreement(buffered_agreement.agreement.id)
await self.release_agreement(
buffered_agreement.agreement.id, allow_reuse=task.exception() is None
)

async def add_proposal(self, score: float, proposal: OfferProposal) -> None:
"""Adds providers' proposal to the pool of available proposals"""
Expand Down
7 changes: 7 additions & 0 deletions yapapi/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,13 @@ def _handle(self, event: events.Event):
reason = str(exc) or repr(exc) or "unexpected error"
self.logger.error("Error when shutting down Executor: %s", reason)

elif isinstance(event, events.AgreementTerminated):
if event.reason.get("golem.requestor.code") == "Success":
pass
else:
prov_info = self.agreement_provider_info[event.agr_id]
self.logger.info(f"Terminated agreement with {prov_info.name}")


def log_summary(wrapped_emitter: Optional[Callable[[events.Event], None]] = None):
"""Output a summary of computation.
Expand Down

0 comments on commit 527d59c

Please sign in to comment.