From 03c5f55b28a8a78fafd88616a294289edd4b1759 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Fri, 21 May 2021 15:30:49 +0200 Subject: [PATCH 1/2] Move command_generator.asend() outside of try...except in process_batches --- yapapi/executor/__init__.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 013aa4e66..681a1c3f5 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -479,6 +479,7 @@ async def process_batches( while True: batch, exec_options = unpack_work_item(item) + # TODO: `task_id` should really be `batch_id`, but then we should also rename # `task_id` field of several events (e.g. `ScriptSent`) script_id = str(next(exescript_ids)) @@ -528,14 +529,14 @@ async def get_batch_results() -> List[events.CommandEvent]: future_results = loop.create_future() results = await get_batch_results() future_results.set_result(results) - item = await command_generator.asend(future_results) - except StopAsyncIteration: - raise - except Exception: + except Exception as e: # Raise the exception in `command_generator` (the `worker` coroutine). # If the client code is able to handle it then we'll proceed with # subsequent batches. Otherwise the worker finishes with error. item = await command_generator.athrow(*sys.exc_info()) + else: + item = await command_generator.asend(future_results) + else: # Schedule the coroutine in a separate asyncio task future_results = loop.create_task(get_batch_results()) From 9cbee0f1e473cb14d6e85701e09bc0293cdffe16 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Mon, 24 May 2021 09:33:58 +0200 Subject: [PATCH 2/2] Don't wait for new items if there are rescheduled ones in SmartQueue --- tests/executor/test_smartq.py | 30 ++++++++++++++++++++++++++++++ yapapi/executor/_smartq.py | 8 +++++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/tests/executor/test_smartq.py b/tests/executor/test_smartq.py index 2b7a139a8..bad82a4f7 100644 --- a/tests/executor/test_smartq.py +++ b/tests/executor/test_smartq.py @@ -109,3 +109,33 @@ async def invalid_worker(q): print("w end") assert outputs == {1, 2, 3} + + +@pytest.mark.asyncio +async def test_has_unassigned_items_doesnt_block(): + """Test if `has_unassigned_items` does not block if there are rescheduled items.""" + + loop = asyncio.get_event_loop() + + async def blocking_task_source(): + yield 1 + await asyncio.sleep(10) + assert False, "Task iterator should not block" + + q = SmartQueue(blocking_task_source()) + + async def worker(): + with q.new_consumer() as con: + async for handle in con: + print("Item retrieved:", handle.data) + # This will yield control to the other worker + await asyncio.sleep(0.1) + await q.reschedule(handle) + # After the item is rescheduled, the second worker should + # pick it up, instead of waiting until `blocking()` fails. + print("Item rescheduled") + return + + worker_1 = loop.create_task(worker()) + worker_2 = loop.create_task(worker()) + await asyncio.gather(worker_1, worker_2) diff --git a/yapapi/executor/_smartq.py b/yapapi/executor/_smartq.py index 710fb11c4..bd5e592d5 100644 --- a/yapapi/executor/_smartq.py +++ b/yapapi/executor/_smartq.py @@ -126,7 +126,13 @@ async def has_unassigned_items(self) -> bool: A queue has unassigned items iff the next call to `get()` will immediately return some item, without waiting for an item that is currently "in progress" to be rescheduled. """ - return await self.has_new_items() or bool(self._rescheduled_items) + while True: + if self._rescheduled_items: + return True + try: + return await asyncio.wait_for(self.has_new_items(), 1.0) + except asyncio.TimeoutError: + pass def new_consumer(self) -> "Consumer[Item]": return Consumer(self)