Add PER_QUERY_MEMORY_LIMIT spilling strategy #15898
Conversation
Force-pushed from 204edbc to ab7683b.
It seems like this can revoke one query at a time, but it forgoes revoking many queries when the node is out of memory.
When something exceeds the memory limits are we blocking the driver?
If the node runs out of memory it still goes down the regular memory killer path right?
that's correct
the revoking happens right after memory reservation before the memory future is returned, so I guess so.
yeah, if the node is out of memory it goes through the regular memory killer path
I'm working on fixing the test. The task executor looks like it is actually running these tasks somehow and so memory gets allocated for the query aside from what the test is explicitly setting. Depending on timing that causes some of the operators to start spilling before the test expects them to.
{
    try {
        QueryContext queryContext = queryContextSupplier.apply(queryId);
        long queryTotalMemoryUse = getTotalMemoryUse(queryContext);
Seems like this is a bit unfortunate in that every time a query's memory limit increases it will contend for the QueryContext lock, the MemoryPool lock, release the MemoryPool lock, and then acquire the MemoryPool lock two more times inside of MemoryPool#getTotalMemoryUse. Maybe the MemoryPool listener should accept the memory reservation values as an argument so that it doesn't have to call right back into the MemoryPool to get them?
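For illustration, the suggested listener shape might look roughly like this (a sketch only; the interface name and exact parameters are assumptions based on this thread and the commit messages below, not necessarily the final diff):

```java
@FunctionalInterface
public interface MemoryPoolListener
{
    // The pool hands the already-computed total to the listener, so the listener never
    // needs to call back into MemoryPool (and re-acquire its lock) to get the value.
    void onMemoryReserved(MemoryPool memoryPool, QueryId queryId, long queryTotalMemoryUse);
}
```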
{
    if (checkPending.getAndSet(false)) {
        // order tasks by decreasing revocable memory so that we revoke from the tasks with highest revocable memory first
        PriorityQueue<TaskContext> queryTaskContexts = new PriorityQueue<>(Comparator.comparing(TaskContext::getRevocableMemoryReservation).reversed());
Worth noting that TaskContext#getRevocableMemoryReservation is not necessarily going to be stable between calls through the Comparator (and will contend for locks on each access). You might consider capturing a snapshot of these values before sorting them into the priority queue or maybe skipping the sort altogether (how often does a task have a non-zero but trivial amount of memory to revoke so as to not be a NOOP but still be a waste of time to revoke out of order?).
I kept an ordering so we know what order to expect revoking for testing purposes, but changed to order by taskId which shouldn't require a lock.
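For illustration, that ordering might be as simple as the following (a sketch; it assumes TaskContext exposes its TaskId, which is fixed for the lifetime of the task):

```java
// Order by task id: it is stable between comparisons and requires no locking, unlike the
// live revocable-memory reservation. Comparing the string form avoids assuming that
// TaskId itself implements Comparable.
PriorityQueue<TaskContext> queryTaskContexts = new PriorityQueue<>(
        Comparator.comparing((TaskContext taskContext) -> taskContext.getTaskId().toString()));
```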
// Technically not thread safe but should be fine since we only call this on PostConstruct and PreDestroy.
// PreDestroy isn't called until server shuts down / in between tests.
@Nullable
private ScheduledFuture<?> scheduledFuture;
I might be missing something, but this field does not appear to ever be assigned to.
ah yes, thanks. I copied this class from the TaskThresholdMemoryRevokingScheduler but got rid of the regular asynchronous checks since they didn't seem useful.
Force-pushed from 12ec118 to a587e78.
So my point in mentioning those three things is that #1 is that we don't really want to lose revocation due to aggregate memory pressure; we don't want to hit the cluster OOM killer unnecessarily. Additionally, the driver won't block if it exceeds its revocable limit, it just triggers revocation?
I agree and can add that as a follow up (it also doesn't exist for the TaskThresholdMemoryRevokingScheduler).
yes, but the OOM killer kills only after a fixed delay, so I don't expect it to be so common that we'd trigger the oom killer before revoking.
Yes, that's true. Revocation will be triggered immediately but the driver won't block. Should I make it blocking (i.e. directly call revokeMemory vs. using the taskManagementExecutor)?
The OOM killer runs randomly (relative to revocation timing) so it's random? And revocation is slow because it's all disk IO? I would kind of expect under pressure it starts killing things and in batch we kind of want to have periods of heavy pressure because it's throughput oriented, but I could completely be wrong here and it works just fine.
I don't want to dictate how it blocks, but I think yes, if there is a memory limit it should probably block in the driver loop. I think we seem to have a pattern where we don't just block the threads inside the actual processing, we wait for them to come back to the main driver loop? Not 100% sure why.
It's a fixed delay from when we notice we're out of memory (which only happens when we reserve new memory), so it's not quite random. But actually, I just checked and the OOM killer excludes revocable bytes, so it only considers a node blocked if the non-revocable bytes exceed the pool size. That probably actually makes it more important to spill whenever the pool is full.
The test is fixed. The other random test failure is unrelated.
Oh interesting, good find. I had a hard time understanding how blocked nodes are actually determined WRT revocable bytes.
presto/presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryPool.java, lines 157 to 159 in 2ad67dc
        valueOrZero(queryMemoryReservations.get(queryId)) + valueOrZero(queryMemoryRevocableReservations.get(queryId))));
}

private long valueOrZero(Long l)
nit: could be static
listeners.forEach(listener -> listener.onMemoryReserved(
        this,
        queryId,
        valueOrZero(queryMemoryReservations.get(queryId)) + valueOrZero(queryMemoryRevocableReservations.get(queryId))));
Not a big deal since we only expect one listener to be registered, but to be safe you probably want to hoist the memory reservation computation out of the loop.
Also, why not queryMemoryReservations.getOrDefault(queryId, 0L)?
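Putting both suggestions together, the notification might look roughly like this (a sketch based on the hunk above; the field and listener names are taken from that hunk):

```java
// Compute the total once, outside the listener loop, using getOrDefault instead of a
// null-handling helper; every registered listener then gets the same value.
long queryTotalMemoryUse = queryMemoryReservations.getOrDefault(queryId, 0L)
        + queryMemoryRevocableReservations.getOrDefault(queryId, 0L);
listeners.forEach(listener -> listener.onMemoryReserved(this, queryId, queryTotalMemoryUse));
```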
    return;
}

if (checkPending.compareAndSet(false, true)) {
Seems like this is only going to allow one query to be revoked at a time and would permit queries to skip revoking if another unrelated query is being revoked. One option I suppose would be to use a concurrent set of query ids being revoked, but another might be to enforce this whole thing inside of the QueryContext itself since you could trigger that reliably while holding the lock.
That's a great catch. To clarify your suggestion, do you mean checkPending should go in QueryContext or the whole revokeMemory() method?
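For reference, the "concurrent set of query ids being revoked" option might look roughly like this (hypothetical field and method names; taskManagementExecutor is assumed to be the executor mentioned above, and this is only a sketch of the gating):

```java
// At most one in-flight revocation check per query; unrelated queries never block each other.
private final Set<QueryId> revokingQueries = ConcurrentHashMap.newKeySet();

private void scheduleRevoking(QueryContext queryContext, long maxTotalMemory)
{
    QueryId queryId = queryContext.getQueryId();
    if (revokingQueries.add(queryId)) {
        taskManagementExecutor.execute(() -> {
            try {
                revokeMemory(queryContext, maxTotalMemory);
            }
            finally {
                // Re-open the gate. A reservation that arrived while the gate was closed was
                // skipped entirely, which is the lost-wakeup concern discussed further down.
                revokingQueries.remove(queryId);
            }
        });
    }
}
```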
Force-pushed from a587e78 to 06a441d.
    }

    log.debug("Scheduling check for %s", queryId);
    scheduleRevoking(queryContext, queryTotalMemoryUse, maxTotalMemory);
How likely are we to repeatedly revoke the same query, because when we schedule revoking we don't check if we already attempted revoking?
And filtering out so we have fewer of those is also pretty sketchy because it's a lost wakeup scenario if we rely on having revocation take place to make progress.
The whole arrangement is actually a bit sketchy when you don't have a periodic check to unblock things.
The way this arrangement normally works safely is that when we process the revocation task we first do the revocation, then check if revocation is necessary/would be beneficial and go do it again, and keep looping until we have resolved the "needs revocation" condition.
The trick is that we have to do the check for "needs more revocation" after we have allowed the submission of new revocation tasks in order to avoid lost wakeups.
Hope this actually makes sense.
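For what it's worth, that looping arrangement might look roughly like this (a sketch using names that appear elsewhere in this thread, with checkPending as an AtomicBoolean; not the PR's actual code):

```java
// Runs on the revocation executor after checkPending was set to true by the reservation callback.
private void revocationTask(QueryContext queryContext, long maxTotalMemory)
{
    do {
        revokeMemory(queryContext, maxTotalMemory);
        // Re-open the gate *before* re-checking: a reservation arriving right now either
        // schedules a fresh task itself or is caught by the re-check below, so no wakeup is lost.
        checkPending.set(false);
    }
    while (getTotalQueryMemoryReservation(queryContext) > maxTotalMemory
            && checkPending.compareAndSet(false, true));
}
```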
yeah, just spoke about this with @pettyjamesm and decided to remove the checkPending setting. It wasn't working as intended, and if you're allocating more memory, it's not unlikely that you do need to revoke more. However, you'll probably over-revoke since the queryTotalMemoryUse won't be correct (doesn't take into account memory that's been revoked in the meantime).
I could change it to allow only one query at a time, and then deal with the issue of possibly not revoking when you need to.
Possible ways to deal with not revoking memory when you should, if I do change it:
- Adding the periodic check like the other spill strategies have. The other memory revoking strategies use this, but it always seemed pretty hacky to me (and we'd have to add calls to get the memory reservation for each query, so that seems strictly worse than option 2)
- When you do revoke for a query, do it on a loop as you suggested until there is nothing else to be done. The downside is that we'd have to call for the queryTotalMemory usage from the memory pool in the loop during revokeMemory(). We just got rid of that to avoid extra locking.
- wait during memory reservation if there's memory revocation scheduling in progress for the query. we'd have to be super careful about how the locking would work though.
What do you think?
Re #1
I think you don't need it to cover for lost wakeups. It's more useful for triggering revocation across multiple queries, but really do you even need it for that, because we do basically global memory accounting anyways and so we know if the node is blocked (or approaching it) and memory revocation is needed?
Re #2
The loop will usually only have to run once, and only when revocation needs to occur. It's relatively rare so I wouldn't be concerned about the extra locking.
I think @pettyjamesm was concerned about what happens in the non-revocation required case not the revocation required case. 99% of the time it's non-revocation and we want to avoid acquiring the lock, but the other 1% (or 0.1%) it's probably fine.
My concern about extra revocation is that it means we will create many small files for something like aggregation.
> I think @pettyjamesm was concerned about what happens in the non-revocation required case not the revocation required case. 99% of the time it's non-revocation and we want to avoid acquiring the lock, but the other 1% (or 0.1%) it's probably fine.

Correct. I think a looping revoke / check for more revoking strategy that makes sure that no more than one revoke per query is in flight at once is probably not too big of a deal (famous last words).

> My concern about extra revocation is that it means we will create many small files for something like aggregation.

That hadn't occurred to me, but does sound like a potential cause for concern. That seems to be unavoidable regardless of whether the revokes are concurrent or looping per query though, right?
as per discussion, updated to recheck the total memory reservation before starting revoking, so we don't do extra revocation. No need for a loop.
Force-pushed from 06a441d to 8ccc9a8.
Force-pushed from 8ccc9a8 to e8064f0.
synchronized void revokeMemory(QueryContext queryContext, long maxTotalMemory)
{
    // get a fresh value for queryTotalMemory in case it's changed (e.g. by a previous revocation request)
    long queryTotalMemory = getTotalQueryMemoryReservation(queryContext);
Does this actually reflect in-progress revocation bytes? I think this will return all the bytes that are already being revoked, if I am reading OperatorContext.requestMemoryRevoking correctly.
I think the behavior we want is to fully check the entire condition from onMemoryReserved as if any in-flight revocations had already completed.
I think we (at least I) thought that QueryLimitMemoryRevokingScheduler#revokeMemory(QueryContext, long) being synchronized was enough to ensure that concurrent revokes wouldn't happen, but you're right; looking at OperatorContext#requestMemoryRevoking() indicates that the actual revoke is performed by the driver loop asynchronously and could very well still be in progress.
The synchronized just guarantees that nothing else is requesting revocation at the same time as you are. As for the requested revocations not being completed, I don't think it matters because we revoke in the same task order, so if the revocable memory is still being included, it would be included in both the total memory and the number of revoked bytes (possibly with some slight differences if some extra memory got actually revoked between when we calculated the total memory and called requestMemoryRevoke). It would just mean that an operator can get requestMemoryRevoking called multiple times. It won't have any effect though.
This is the same as for the other memory revoking strategies where you can have sequential memory reservations before the memory is actually revoked. Looking closer, I'm actually not sure what "checkPending" gets you for those other strategies. It doesn't even wait for the revocation loop to complete before setting checkPending to false.
sorry, just double checked, and requestMemoryRevoke returns 0 bytes if memory revoking has already been requested. I'm not sure if that makes sense. Maybe I should change it to return the number of bytes still to be revoked, even if it's in process.
It's still the case that I think the other strategies have the exact same issue.
So confusing :-) OK so it will do the revocation, but it's mostly a no-op unless previous revocations have completed and it can trigger a new revocation. But if it triggers a new revocation it will probably be based on the number of bytes currently needing to be revoked so I think it's fine?
Also on this pass of review I am now noticing that it's revoking tasks based on task id instead of how many bytes would be freed up, which is possibly less efficient since you could end up spilling things like joins with very small build sides and then you need to spill the entire probe when it wasn't actually necessary.
To solve the stability issue maybe just put them in a tree map based on the size and use that?
lol #15898 (comment). but also fair point about the joins with small build sides and giant probes. I'll switch to a tree map ordered by size.
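A possible shape for that ordering, sketched below; taskContexts, revocableBytesOf, bytesToRevoke, and requestRevoking are placeholders for whatever the scheduler already has access to. Snapshotting the sizes once also sidesteps the earlier comparator-stability and locking concern.

```java
// Snapshot each task's revocable bytes once, then revoke from the largest reservations first.
// A raw TreeMap<Long, TaskContext> would silently drop tasks with equal sizes, so either key
// it on (size, taskId) or just sort a list against the snapshot, as below.
Map<TaskContext, Long> snapshot = new HashMap<>();
for (TaskContext taskContext : taskContexts) {
    snapshot.put(taskContext, revocableBytesOf(taskContext));
}
List<TaskContext> ordered = new ArrayList<>(taskContexts);
ordered.sort(Comparator.comparing((TaskContext taskContext) -> snapshot.get(taskContext)).reversed());

long remainingBytesToRevoke = bytesToRevoke;
for (TaskContext taskContext : ordered) {
    if (remainingBytesToRevoke <= 0) {
        break;
    }
    remainingBytesToRevoke -= requestRevoking(taskContext, remainingBytesToRevoke);
}
```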
Heh. The actual optimal choice of what to spill is probably something like "prefer to spill aggregation unless there is a build side that is greater than some threshold to spill". But that is a wild guess and to do this optimally would be a project in and of itself.
Force-pushed from e8064f0 to be8d433.
I added a commit that changes requestMemoryRevoking() to return the number of bytes still to be revoked, even if revocation has already been requested.
The test has become flaky. I ran 100 invocations locally and it failed 20-30% of the time. I believe it's because we end up with a few revokeMemory() calls queued up (because of the random other memory allocations that occur in the test tasks outside of what we set). One of them will get the totalMemoryLimit before we reset the memory for the task we revoked, but the actual memory revocation request will happen after that memory has been freed. This is causing additional spilling that we did not expect. Which suggests that the same could happen in production too. We may have to switch to one scheduleRevoke per-query + some kind of loop to check if we're done.
Force-pushed from be8d433 to 05b5267.
Updated with the following changes:
Force-pushed from 05b5267 to a6a85a6.
Updated with:
Force-pushed from a6a85a6 to a16c8d0.
@pettyjamesm @aweisberg this is ready for review again
{
    QueryId queryId = queryContext.getQueryId();
    MemoryPool memoryPool = queryContext.getMemoryPool();
    while (!revocationRequestedByQuery.remove(queryId, false)) {
I think this ordering works and removes duplicate revocation attempts.
What is the strategy around the node being blocked and needing revocation because it is out of memory? Say N queries each below the revocation limit, and N queries that have no revocable memory, but need to allocate? There will be no revocation until the N queries below the revocation limit hit the revocation limit for their particular query?
I know we talked about this, but I don't know what our final strategy was?
It seems to me like it's two parallel set of revocation conditions and actions. One is to revoke across queries and one is to revoke individual queries based on their own memory usage in isolation?
yes, the TaskThresholdMemoryRevokingScheduler has the same problem. The plan is to have a follow up PR that allows enabling both strategies (query limit based and memory pool based).
Ah OK. So we would have multiple strategies executed in sequence? I am not sure that is my favorite approach. Between DRY and having the implementation self-contained I feel like I would prefer the latter. Another reason is to reduce the repetition fetching values needed to determine what to do.
Another weirdness there is task execution order, but maybe they have a shared executor so it is a consistent order?
I'll look more into what's the right way to combine them.
private void onMemoryReserved(MemoryPool memoryPool, QueryId queryId, long queryTotalMemoryUse)
{
    try {
        QueryContext queryContext = queryContextSupplier.apply(queryId);
Probably worth checking for null here instead of letting this blow up with an NPE if, for whatever reason, the query context mapping isn't found.
            scheduleRevoking(queryContext, maxTotalMemory);
        }
    }
    catch (Throwable e) {
You might consider catching Exception here instead; catching Throwable can accidentally prevent Error from propagating upwards and being fatal even when it should be (e.g. OutOfMemoryError, NoSuchMethodError, etc).
try {
    revokeMemory(queryContext, maxTotalMemory);
}
catch (Throwable e) {
Ditto here about Exception vs Throwable.
QueryId queryId = queryContext.getQueryId();
MemoryPool memoryPool = queryContext.getMemoryPool();
while (!revocationRequestedByQuery.remove(queryId, false)) {
    revocationRequestedByQuery.put(queryId, false);
After thinking about this for ~5 minutes, I'm pretty sure that there isn't a meaningful race or leak condition between putting true into the map originally, the loop conditional, and this put of false. Probably merits a comment to describe what's going on with that while (!revocationRequestedByQuery.remove(queryId, false)) { statement though.
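Something along those lines, perhaps (a sketch of the pattern with the kind of comment being asked for; requestRevoking here is a placeholder for the actual revocation body):

```java
// The scheduler does revocationRequestedByQuery.put(queryId, true) before submitting this task.
// Each pass claims the request by resetting the flag to false, performs the revocation, and then
// retries remove(queryId, false): if a new reservation flipped the flag back to true in the
// meantime the remove fails and we go around again, so no revocation request is lost.
while (!revocationRequestedByQuery.remove(queryId, false)) {
    revocationRequestedByQuery.put(queryId, false);
    requestRevoking(queryContext, maxTotalMemory); // placeholder for the actual revocation step
}
```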
Simplify to a functional interface and add queryId and memoryReservation arg in preparation for per-query spill strategy.
Simplify to functional interface, remove unused parameter, and make clearer that it's only called on revocable memory reservation.
so that it can be used by the PerQueryMemoryRevokingScheduler
Add spilling strategy that will spill whenever the user + system + revocable memory for a query on a node exceeds the maxTotalMemoryPerNode. This allows queries to have consistent behavior regardless of the cluster load, while also ensuring that queries don't use more memory than the cluster can support.
Catch Exception instead of Throwable to make sure Errors that should be fatal get propagated (e.g. OutOfMemoryError, etc.)
Force-pushed from 5252992 to 0d5df77.
@pettyjamesm addressed comments. Also switched the other MemoryRevokingSchedulers to catch Exception instead of Throwable for the same reason.
Test plan - new unit test