diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 013aa4e66..681a1c3f5 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -479,6 +479,7 @@ async def process_batches( while True: batch, exec_options = unpack_work_item(item) + # TODO: `task_id` should really be `batch_id`, but then we should also rename # `task_id` field of several events (e.g. `ScriptSent`) script_id = str(next(exescript_ids)) @@ -528,14 +529,14 @@ async def get_batch_results() -> List[events.CommandEvent]: future_results = loop.create_future() results = await get_batch_results() future_results.set_result(results) - item = await command_generator.asend(future_results) - except StopAsyncIteration: - raise - except Exception: + except Exception as e: # Raise the exception in `command_generator` (the `worker` coroutine). # If the client code is able to handle it then we'll proceed with # subsequent batches. Otherwise the worker finishes with error. item = await command_generator.athrow(*sys.exc_info()) + else: + item = await command_generator.asend(future_results) + else: # Schedule the coroutine in a separate asyncio task future_results = loop.create_task(get_batch_results())