Skip to content

Commit

Permalink
Move getMemoryAlreadyBeingRevoked to utility class
Browse files Browse the repository at this point in the history
so that it can be used by the PerQueryMemoryRevokingScheduler
  • Loading branch information
rschlussel committed Apr 23, 2021
1 parent 8269d72 commit 6c09beb
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.facebook.presto.memory.LocalMemoryManager;
import com.facebook.presto.memory.MemoryPool;
import com.facebook.presto.memory.MemoryPoolListener;
import com.facebook.presto.memory.TraversingQueryContextVisitor;
import com.facebook.presto.memory.VoidTraversingQueryContextVisitor;
import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.PipelineContext;
Expand Down Expand Up @@ -49,6 +48,7 @@
import static com.facebook.presto.execution.MemoryRevokingUtils.getMemoryPools;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.TaskSpillingStrategy.PER_TASK_MEMORY_THRESHOLD;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -226,37 +226,12 @@ private boolean memoryRevokingNeeded(MemoryPool memoryPool)

private long getMemoryAlreadyBeingRevoked(List<SqlTask> sqlTasks, long targetRevokingLimit)
{
TraversingQueryContextVisitor<Void, Long> visitor = new TraversingQueryContextVisitor<Void, Long>()
{
@Override
public Long visitOperatorContext(OperatorContext operatorContext, Void context)
{
if (operatorContext.isMemoryRevokingRequested()) {
return operatorContext.getReservedRevocableBytes();
}
return 0L;
}

@Override
public Long mergeResults(List<Long> childrenResults)
{
return childrenResults.stream()
.mapToLong(i -> i).sum();
}
};

long currentRevoking = 0;
for (SqlTask task : sqlTasks) {
Optional<TaskContext> taskContext = task.getTaskContext();
if (taskContext.isPresent()) {
currentRevoking += taskContext.get().accept(visitor, null);
if (currentRevoking > targetRevokingLimit) {
// Return early, target value exceeded and revoking will not occur
return currentRevoking;
}
}
}
return currentRevoking;
List<TaskContext> taskContexts = sqlTasks.stream()
.map(SqlTask::getTaskContext)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());
return MemoryRevokingSchedulerUtils.getMemoryAlreadyBeingRevoked(taskContexts, targetRevokingLimit);
}

private void requestRevoking(MemoryPoolId memoryPoolId, ArrayList<SqlTask> sqlTasks, long remainingBytesToRevoke)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import com.facebook.presto.memory.TraversingQueryContextVisitor;
import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.TaskContext;

import java.util.Collection;
import java.util.List;

public class MemoryRevokingSchedulerUtils
{
private MemoryRevokingSchedulerUtils() {}

public static long getMemoryAlreadyBeingRevoked(Collection<TaskContext> taskContexts, long targetRevokingLimit)
{
TraversingQueryContextVisitor<Void, Long> visitor = new TraversingQueryContextVisitor<Void, Long>()
{
@Override
public Long visitOperatorContext(OperatorContext operatorContext, Void context)
{
if (operatorContext.isMemoryRevokingRequested()) {
return operatorContext.getReservedRevocableBytes();
}
return 0L;
}

@Override
public Long mergeResults(List<Long> childrenResults)
{
return childrenResults.stream()
.mapToLong(i -> i).sum();
}
};
long currentRevoking = 0;
for (TaskContext taskContext : taskContexts) {
currentRevoking += taskContext.accept(visitor, null);
if (currentRevoking > targetRevokingLimit) {
// Return early, target value exceeded and revoking will not occur
return currentRevoking;
}
}
return currentRevoking;
}
}

0 comments on commit 6c09beb

Please sign in to comment.