AsyncBuffer #88
Conversation

crusaderky commented on Mar 21, 2023 (edited)
- Partially closes [WIP] Asynchronous SpillBuffer distributed#7686
Mostly nits. The one thing I really care about is the add_done_callback question. Most other things are API design questions, but they concern functionality that might cause surprising behavior.
zict/async_buffer.py
Outdated

```python
if self.evicting is not None:
    weight = min(weight, self.evicting)
if weight <= n:
    return

self.evicting = n

def done(_: asyncio.Future) -> None:
    self.evicting = None

# Note: this can get cancelled by LRU.close(), which in turn is
# triggered by Buffer.close()
future = self._offload(self.evict_until_below_target, n)
future.add_done_callback(done)
```
IIUC this can be called multiple times (even if it is locked), but the done callbacks of concurrently running evict_until_below_target calls would overwrite each other / the first one to finish would already reset evicting. Is this intentional?
You're right, fixed
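The fix is not shown in the thread, but one common pattern for this problem is to make each done callback reset the flag only if its own future is still the current one, so a stale callback from an earlier eviction cannot clobber a newer run. A minimal self-contained sketch follows; the names (`EvictSketch`, `start_evict`, `_offload`) and the guard via a stored `_evict_future` are assumptions for illustration, not necessarily the actual change made in the PR:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class EvictSketch:
    def __init__(self) -> None:
        self.evicting: int | None = None       # target weight of the running eviction
        self._evict_future: asyncio.Future | None = None
        self._executor = ThreadPoolExecutor(1)

    def _offload(self, fn, *args) -> asyncio.Future:
        # Run fn in the offload thread; returns an asyncio.Future
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(self._executor, fn, *args)

    def evict_until_below_target(self, n: int) -> None:
        pass  # placeholder for the real eviction loop

    def start_evict(self, n: int) -> None:
        self.evicting = n
        future = self._offload(self.evict_until_below_target, n)
        self._evict_future = future

        def done(_: asyncio.Future) -> None:
            # Guard: only the most recently started eviction resets the flag,
            # so a stale callback cannot clobber a newer eviction's state
            if self._evict_future is future:
                self.evicting = None
                self._evict_future = None

        future.add_done_callback(done)
```

With this guard, starting a second eviction while the first is still running leaves `evicting` set until the second one completes, regardless of callback ordering.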
zict/async_buffer.py
Outdated

```python
if self.executor is None:
    self.executor = ThreadPoolExecutor(
        1, thread_name_prefix="zict.Buffer offloader"
    )
```
nit: From an API POV I would suggest allowing the init to take a ThreadPoolExecutor; if none is provided, we initialize one during initialization of the class.
Done
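The suggested dependency-injection pattern can be sketched as follows. The class name `OffloadingBuffer` and the `_own_executor` flag are illustrative assumptions, not the actual zict API; the point is that a caller-supplied executor is used as-is, while a privately created one is owned (and later shut down) by the class:

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor


class OffloadingBuffer:
    def __init__(self, executor: ThreadPoolExecutor | None = None) -> None:
        if executor is not None:
            # Caller-provided executor: the caller manages its lifecycle
            self.executor = executor
            self._own_executor = False
        else:
            # No executor given: create a single-threaded one eagerly
            self.executor = ThreadPoolExecutor(
                1, thread_name_prefix="zict.Buffer offloader"
            )
            self._own_executor = True

    def close(self) -> None:
        # Only shut down an executor we created ourselves
        if self._own_executor:
            self.executor.shutdown()
```

This removes the lazy `if self.executor is None` branch from the hot path and lets multiple buffers share one pool.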
```python
def async_get(
    self, keys: Collection[KT], missing: Literal["raise", "omit"] = "raise"
) -> asyncio.Future[dict[KT, VT]]:
```
Mostly curious, and I don't think we should change this now, but why does the API accept multiple keys? Is this a requirement or an optimization? The reason I'm asking is that I think this drives some complexity, and I would expect missing not to be required otherwise. I'm not entirely sure about this.
It's an optimization to avoid having an extremely long queue to the thread pool, with each individual key incurring thread synchronization overhead.
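The batching rationale can be illustrated with a toy example: fetching many keys in a single offloaded call crosses the thread boundary once, whereas one call per key pays the executor handoff for every key. The `store` dict and `get_batch` helper below are made up for the example and are not zict code:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(1)
store = {f"k{i}": i for i in range(100)}  # stand-in for the slow mapping


def get_batch(keys):
    # One thread hop retrieves the whole batch
    return {k: store[k] for k in keys}


async def async_get(keys):
    # Single run_in_executor call for N keys, instead of N calls;
    # the per-call executor handoff cost is paid once
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, get_batch, list(keys))


result = asyncio.run(async_get(["k1", "k2", "k3"]))
```

With thousands of pending reads, the per-key variant would enqueue thousands of tiny jobs; the batched variant keeps the queue to the offload thread short.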
```python
# Do not pull keys towards the top of the LRU unless they are all available.
# This matters when there is a very long queue of async_get futures.
d = self.fast.get_all_or_nothing(keys)
```
Strictly speaking, fast doesn't need to be an LRU, does it? In that case, I suspect we'll get an AttributeError here, won't we?
Buffer.fast is always an LRU:

Lines 79 to 85 in b96afc4

```python
self.fast = LRU(
    n,
    fast,
    weight=weight,
    on_evict=[self.fast_to_slow],
    on_cancel_evict=[self._cancel_evict],
)
```
@fjetter all review comments have been addressed
Ouch. That was a merge without squash. I force-pushed to fix it; could we force squashing in the repo like we already did in distributed?