diff --git a/fsspec/asyn.py b/fsspec/asyn.py index fb4e05e74..a040efc4b 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -239,20 +239,35 @@ async def _run_coros_in_chunks( batch_size = len(coros) assert batch_size > 0 - results = [] - for start in range(0, len(coros), batch_size): - chunk = [ - asyncio.Task(asyncio.wait_for(c, timeout=timeout)) - for c in coros[start : start + batch_size] - ] - if callback is not DEFAULT_CALLBACK: - [ - t.add_done_callback(lambda *_, **__: callback.relative_update(1)) - for t in chunk - ] - results.extend( - await asyncio.gather(*chunk, return_exceptions=return_exceptions), - ) + + async def _run_coro(coro, i): + try: + return await asyncio.wait_for(coro, timeout=timeout), i + except Exception as e: + if not return_exceptions: + raise + return e, i + finally: + callback.relative_update(1) + + i = 0 + n = len(coros) + results = [None] * n + pending = set() + + while pending or i < n: + while len(pending) < batch_size and i < n: + pending.add(asyncio.ensure_future(_run_coro(coros[i], i))) + i += 1 + + if not pending: + break + + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + while done: + result, k = await done.pop() + results[k] = result + return results