Skip to content

Commit

Permalink
Merge pull request #132 from golemfactory/hotfix/118-retrieve-task-ex…
Browse files Browse the repository at this point in the history
…ceptions

Retrieve task exceptions before exiting Executor.submit()
  • Loading branch information
azawlocki authored Nov 19, 2020
2 parents a52e427 + 3fed2cc commit e53fcca
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 38 deletions.
88 changes: 52 additions & 36 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
An implementation of the new Golem's task executor.
"""
from asyncio import CancelledError
from datetime import datetime, timedelta, timezone
from decimal import Decimal
import os
Expand Down Expand Up @@ -397,17 +398,26 @@ async def worker_starter() -> None:
process_invoices_job = loop.create_task(process_invoices())
wait_until_done = loop.create_task(work_queue.wait_until_done())
# Py38: find_offers_task.set_name('find_offers_task')

get_offers_deadline = datetime.now(timezone.utc) + self._conf.get_offers_timeout
get_done_task: Optional[asyncio.Task] = None
initial_services = {
find_offers_task,
loop.create_task(worker_starter()),
process_invoices_job,
wait_until_done,
}
services = initial_services.copy()

try:
get_done_task: Optional[asyncio.Task] = None
services = {
find_offers_task,
loop.create_task(worker_starter()),
process_invoices_job,
wait_until_done,
}
async def _cancel_tasks_silently() -> None:
for task in initial_services.union(services).union(workers):
try:
task.cancel()
await asyncio.wait_for(task, timeout=0.1)
except (Exception, CancelledError, KeyboardInterrupt) as e:
pass

try:
while wait_until_done in services or not done_queue.empty():

now = datetime.now(timezone.utc)
Expand Down Expand Up @@ -451,38 +461,44 @@ async def worker_starter() -> None:

emit(events.ComputationFinished())

except Exception as e:
if (
not isinstance(e, (KeyboardInterrupt, asyncio.CancelledError))
and self._conf.traceback
):
traceback.print_exc()
except (Exception, CancelledError, KeyboardInterrupt) as e:

emit(events.ComputationFinished(exc_info=sys.exc_info())) # type: ignore
if not isinstance(e, (CancelledError, KeyboardInterrupt)):
raise

finally:
payment_closing = True
find_offers_task.cancel()

try:
if workers:
for worker_task in workers:
worker_task.cancel()
await asyncio.wait(workers, timeout=15, return_when=asyncio.ALL_COMPLETED)
except Exception:
if self._conf.traceback:
traceback.print_exc()

for worker_task in workers:
worker_task.cancel()
await asyncio.wait(
workers.union({find_offers_task, process_invoices_job}),
timeout=5,
return_when=asyncio.ALL_COMPLETED,
)
payment_closing = True
if agreements_to_pay:
await asyncio.wait(
{process_invoices_job}, timeout=15, return_when=asyncio.ALL_COMPLETED
)
payment_closing = True
find_offers_task.cancel()
try:
if workers:
for worker_task in workers:
worker_task.cancel()
await asyncio.wait(workers, timeout=15, return_when=asyncio.ALL_COMPLETED)
except Exception:
if self._conf.traceback:
traceback.print_exc()

for worker_task in workers:
worker_task.cancel()
await asyncio.wait(
workers.union({find_offers_task, process_invoices_job}),
timeout=5,
return_when=asyncio.ALL_COMPLETED,
)
finally:
await _cancel_tasks_silently()

try:
payment_closing = True
if agreements_to_pay:
await asyncio.wait(
{process_invoices_job}, timeout=15, return_when=asyncio.ALL_COMPLETED
)
finally:
await _cancel_tasks_silently()

async def _create_allocations(self) -> rest.payment.MarketDecoration:
if not self._budget_allocations:
Expand Down
4 changes: 2 additions & 2 deletions yapapi/rest/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
# w/o for some buggy providers which do not kill exe-unit
# on destroy_activity event.
if exc_type:
_log.info(
_log.debug(
"activity %s CLOSE for [%s] %s",
self._id,
exc_type.__name__,
Expand Down Expand Up @@ -112,7 +112,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
with contextlib.suppress(yexc.ApiException):
await self._api.destroy_activity(self._id)
if exc_type:
_log.info("activity %s CLOSE done", self._id)
_log.debug("activity %s CLOSE done", self._id)


@dataclass
Expand Down

0 comments on commit e53fcca

Please sign in to comment.