Fix passing command errors from the engine to worker functions #394

Merged 3 commits on May 24, 2021
30 changes: 30 additions & 0 deletions tests/executor/test_smartq.py
@@ -109,3 +109,33 @@ async def invalid_worker(q):
print("w end")

assert outputs == {1, 2, 3}


+@pytest.mark.asyncio
+async def test_has_unassigned_items_doesnt_block():
+    """Test if `has_unassigned_items` does not block if there are rescheduled items."""
+
+    loop = asyncio.get_event_loop()
+
+    async def blocking_task_source():
+        yield 1
+        await asyncio.sleep(10)
+        assert False, "Task iterator should not block"
+
+    q = SmartQueue(blocking_task_source())
+
+    async def worker():
+        with q.new_consumer() as con:
+            async for handle in con:
+                print("Item retrieved:", handle.data)
+                # This will yield control to the other worker
+                await asyncio.sleep(0.1)
+                await q.reschedule(handle)
+                # After the item is rescheduled, the second worker should
+                # pick it up, instead of waiting until `blocking()` fails.
+                print("Item rescheduled")
+                return
+
+    worker_1 = loop.create_task(worker())
+    worker_2 = loop.create_task(worker())
+    await asyncio.gather(worker_1, worker_2)
9 changes: 5 additions & 4 deletions yapapi/executor/__init__.py
@@ -479,6 +479,7 @@ async def process_batches(
while True:

    batch, exec_options = unpack_work_item(item)

    # TODO: `task_id` should really be `batch_id`, but then we should also rename
    # `task_id` field of several events (e.g. `ScriptSent`)
    script_id = str(next(exescript_ids))
@@ -528,14 +529,14 @@ async def get_batch_results() -> List[events.CommandEvent]:
         future_results = loop.create_future()
         results = await get_batch_results()
         future_results.set_result(results)
-        item = await command_generator.asend(future_results)
-    except StopAsyncIteration:
-        raise
-    except Exception:
+    except Exception as e:
         # Raise the exception in `command_generator` (the `worker` coroutine).
         # If the client code is able to handle it then we'll proceed with
         # subsequent batches. Otherwise the worker finishes with error.
         item = await command_generator.athrow(*sys.exc_info())
+    else:
+        item = await command_generator.asend(future_results)

Comment on lines +538 to +539
Contributor Author

We shouldn't handle errors raised in `command_generator.asend()`, as they include exceptions thrown in the worker function.

 else:
     # Schedule the coroutine in a separate asyncio task
     future_results = loop.create_task(get_batch_results())
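The reasoning in the review comment on lines +538 to +539 can be illustrated with a small, self-contained sketch. It is not yapapi code: `CommandError`, `WorkerError`, `run_batch`, `worker`, `demo` and the simplified `process_batches` are hypothetical stand-ins, and `athrow()` is called in its single-argument form. Under those assumptions it shows that, with `asend()` in the `else` branch, an engine-side error is thrown into the worker, while an exception raised by the worker's own code reaches the caller untouched.

```python
import asyncio


class CommandError(Exception):
    """Hypothetical stand-in for a command error reported by the engine."""


class WorkerError(Exception):
    """Hypothetical stand-in for an error raised by the worker's own code."""


async def run_batch(batch, failing):
    # Hypothetical engine call: fails for batches listed in `failing`.
    if batch in failing:
        raise CommandError(f"{batch} failed")
    return f"{batch} ok"


async def worker(log):
    # Hypothetical worker: yields batches, receives a future with results.
    try:
        results = yield "batch-1"
        log.append(("batch-1 results", results.result()))
    except CommandError as e:
        # The worker recovers from the engine error and carries on.
        log.append(("worker handled", str(e)))
    results = yield "batch-2"
    log.append(("batch-2 results", results.result()))
    raise WorkerError("bug in worker code")


async def process_batches(command_generator, failing):
    loop = asyncio.get_running_loop()
    item = await command_generator.__anext__()
    while True:
        try:
            future_results = loop.create_future()
            future_results.set_result(await run_batch(item, failing))
        except Exception as e:
            # Only engine-side failures are thrown into the worker.
            item = await command_generator.athrow(e)
        else:
            # Outside the `try`: exceptions raised by the worker propagate as-is.
            item = await command_generator.asend(future_results)


def demo():
    log = []
    try:
        asyncio.run(process_batches(worker(log), failing={"batch-1"}))
    except WorkerError as e:
        return log, str(e)
    return log, None
```

Calling `demo()` returns the worker's log together with the message of the `WorkerError` that escaped `process_batches`. Had `asend()` stayed inside the `try` block, that same exception would have been caught by `except Exception` and thrown back into the worker.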
8 changes: 7 additions & 1 deletion yapapi/executor/_smartq.py
@@ -126,7 +126,13 @@ async def has_unassigned_items(self) -> bool:
         A queue has unassigned items iff the next call to `get()` will immediately return
         some item, without waiting for an item that is currently "in progress" to be rescheduled.
         """
-        return await self.has_new_items() or bool(self._rescheduled_items)
+        while True:
+            if self._rescheduled_items:
+                return True
+            try:
+                return await asyncio.wait_for(self.has_new_items(), 1.0)
+            except asyncio.TimeoutError:
+                pass
Comment on lines +129 to +135
Contributor Author

Instead of waiting indefinitely on a potentially blocking `await self.has_new_items()`, periodically check whether there are any `self._rescheduled_items`.


     def new_consumer(self) -> "Consumer[Item]":
         return Consumer(self)
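The polling approach described in the review comment on lines +129 to +135 can be tried in isolation with a minimal sketch. `TinyQueue`, its `rescheduled` list, the `poll_interval` parameter and `demo` are hypothetical names used only for illustration, not the actual `SmartQueue` implementation. The sketch assumes a task source that never produces a new item, so only the periodic re-check lets `has_unassigned_items()` return.

```python
import asyncio


class TinyQueue:
    """Hypothetical, much simplified stand-in for the queue in the diff."""

    def __init__(self):
        self.rescheduled = []
        # Never set in this sketch: the task source "blocks" forever.
        self._new_item = asyncio.Event()

    async def has_new_items(self) -> bool:
        await self._new_item.wait()
        return True

    async def has_unassigned_items(self, poll_interval: float = 0.05) -> bool:
        while True:
            if self.rescheduled:
                return True
            try:
                return await asyncio.wait_for(self.has_new_items(), poll_interval)
            except asyncio.TimeoutError:
                pass  # nothing new yet; re-check the rescheduled items


async def demo() -> bool:
    q = TinyQueue()

    async def reschedule_later():
        await asyncio.sleep(0.1)
        q.rescheduled.append("item")

    task = asyncio.create_task(reschedule_later())
    # The outer timeout is only a safety net so the sketch cannot hang.
    result = await asyncio.wait_for(q.has_unassigned_items(), 2.0)
    await task
    return result
```

Running `asyncio.run(demo())` returns once the item is rescheduled, even though `has_new_items()` never completes. Note that `asyncio.wait_for()` cancels the awaited coroutine on timeout, so this pattern assumes `has_new_items()` is safe to cancel and call again.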