
Limit coroutines using a pool instead of chunks #1544

Merged — 4 commits merged into fsspec:master from sisp:perf/coroutine-pool on Mar 15, 2024

Conversation

@sisp (Contributor) commented Mar 13, 2024

I've changed the implementation of the function _run_coros_in_chunks() to use a coroutine pool instead of running coroutines in chunks.

The goal of this function is to limit the number of simultaneous coroutines, but chunking may lead to inferior throughput when coroutines have different execution times, as the next chunk will run only once all coroutines in the previous chunk have completed. In contrast, a coroutine pool will run the next coroutine in the queue as soon as any of the running coroutines completes.
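To illustrate the difference, here is a minimal sketch of the two strategies (hypothetical function names and a plain list of coroutines for simplicity; these are not the actual fsspec signatures):

```python
import asyncio

async def run_in_chunks(coros, batch_size):
    # Chunked limiting: the next chunk starts only after *every*
    # coroutine in the current chunk has finished, so one slow
    # coroutine stalls everything queued behind its chunk.
    results = []
    for i in range(0, len(coros), batch_size):
        results.extend(await asyncio.gather(*coros[i : i + batch_size]))
    return results

async def run_with_pool(coros, batch_size):
    # Pool-style limiting: a semaphore caps how many coroutines run
    # at once; the next one starts as soon as any slot frees up.
    semaphore = asyncio.Semaphore(batch_size)

    async def worker(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(worker(c) for c in coros))
```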

fsspec/asyn.py (outdated review thread, resolved)
@sisp (Contributor, Author) commented Mar 13, 2024

The check CI / gcsfs-pytest is failing with this error:

FAILED gcsfs/gcsfs/tests/test_core.py::test_sign - ImportError: cannot import name 'storage' from 'google.cloud' (unknown location)

But I doubt it's related to this PR.

fsspec/asyn.py (outdated):

```python
semaphore = asyncio.BoundedSemaphore(batch_size)

async def _worker(coro):
    async with semaphore:
```
Member:

With this approach, I agree that a limited number of IO tasks are concurrently running/waiting, but is there any potential downside of simply having a very large number of tasks created and waiting on this semaphore? The previous incantation would only have the given number of top-level tasks at a time.

@sisp (Contributor, Author):

Good question. I'm not aware of any downsides, but I can't prove there aren't any. I was also considering using asyncio.as_completed and passing an iterable of asyncio.Task objects, but it also materializes the iterable as a set internally.

Member:

Let's leave this question open for a little while, to see if any knowledgeable person comments.

@sisp (Contributor, Author):

FWIW, I've used this implementation to limit the simultaneous execution of 70,000 coroutines and it worked fine.

@sisp (Contributor, Author):

You might have a point: https://stackoverflow.com/a/62404509. I wonder whether we could use the queue-based approach shown there instead.
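For reference, the queue-based pattern from that answer looks roughly like this (a sketch, not the PR's code; the function name and signature are illustrative):

```python
import asyncio

async def run_with_workers(coros, batch_size):
    # Queue-based limiting: only `batch_size` worker tasks exist at all,
    # so there is never a large number of tasks parked on a semaphore.
    queue = asyncio.Queue()
    for item in enumerate(coros):
        queue.put_nowait(item)

    results = [None] * len(coros)

    async def worker():
        while not queue.empty():
            # No await between the empty() check and get_nowait(),
            # so another worker cannot steal the item in between.
            i, coro = queue.get_nowait()
            results[i] = await coro

    await asyncio.gather(*(worker() for _ in range(batch_size)))
    return results
```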

@sisp (Contributor, Author):

I've pushed an implementation based on asyncio.wait inspired by https://death.andgravity.com/limit-concurrency#asyncio-wait. I think this looks good and tests are passing.
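The pattern from the linked article, in simplified sketch form (it ignores result ordering and exception handling, which a real implementation may need to deal with):

```python
import asyncio

async def run_with_wait(coros, batch_size):
    # asyncio.wait-based limiting: keep at most `batch_size` tasks in
    # flight, and top the set back up whenever any of them completes.
    coros = iter(coros)
    pending = set()
    results = []
    while True:
        # Refill the in-flight set up to the concurrency limit.
        while len(pending) < batch_size:
            try:
                coro = next(coros)
            except StopIteration:
                break
            pending.add(asyncio.ensure_future(coro))
        if not pending:
            break
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        results.extend(task.result() for task in done)
    return results
```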

Contributor:

I actually think the linked solution may be more efficient. Does this solution run into the issue that the next batch does not start until the current batch finishes? That would mean that, with a large batch size, future workers are stalled waiting on the slowest worker. Or am I misunderstanding the current batched async method?

@sisp (Contributor, Author):

> Does this solution run into the issue that the next batch does not start until the current batch finishes? That means with large batch size, future workers will be stalled waiting on the slowest worker?

Correct, the slowest coroutine in a batch determines the runtime of the whole batch, and the next batch won't start before all coroutines in the current batch have finished.
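A toy timing example of the stall (illustrative numbers only, using plain sleeps in place of IO):

```python
import asyncio
import time

async def main():
    durations = [3, 1, 1, 1]
    batch_size = 2

    # Chunked: batch [3, 1] takes 3 s, then batch [1, 1] takes 1 s -> ~4 s.
    start = time.monotonic()
    for i in range(0, len(durations), batch_size):
        await asyncio.gather(
            *(asyncio.sleep(d) for d in durations[i : i + batch_size])
        )
    print(f"chunked: {time.monotonic() - start:.1f}s")  # ~4.0s

    # Pooled: the 1 s coroutines flow through the free slot while the
    # 3 s coroutine is still running -> ~3 s.
    semaphore = asyncio.Semaphore(batch_size)

    async def limited(d):
        async with semaphore:
            await asyncio.sleep(d)

    start = time.monotonic()
    await asyncio.gather(*(limited(d) for d in durations))
    print(f"pooled: {time.monotonic() - start:.1f}s")  # ~3.0s

asyncio.run(main())
```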

Contributor:

I think some of the solutions I linked would have alleviated that issue by eagerly starting coroutines from the next batch.

@sisp (Contributor, Author):

The solution that @martindurant merged is essentially https://death.andgravity.com/limit-concurrency#asyncio-wait, which you linked.

@sisp force-pushed the perf/coroutine-pool branch from bbb8b3d to 5890050 on March 15, 2024 at 09:38, and requested a review from @martindurant at 09:47.
fsspec/asyn.py (review thread, resolved)
@martindurant merged commit f2f4c26 into fsspec:master on Mar 15, 2024 — 10 checks passed.
@sisp deleted the perf/coroutine-pool branch on March 15, 2024 at 19:46.