From 6c09bebb547099e6a781bedc7025212130dc2fb5 Mon Sep 17 00:00:00 2001 From: Rebecca Schlussel Date: Thu, 8 Apr 2021 14:04:43 -0400 Subject: [PATCH] Move getMemoryAlreadyBeingRevoked to utility class so that it can be used by the PerQueryMemoryRevokingScheduler --- .../execution/MemoryRevokingScheduler.java | 39 +++---------- .../MemoryRevokingSchedulerUtils.java | 57 +++++++++++++++++++ 2 files changed, 64 insertions(+), 32 deletions(-) create mode 100644 presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingSchedulerUtils.java diff --git a/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingScheduler.java b/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingScheduler.java index 62d14c92bec9..be1bddadaec5 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingScheduler.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingScheduler.java @@ -17,7 +17,6 @@ import com.facebook.presto.memory.LocalMemoryManager; import com.facebook.presto.memory.MemoryPool; import com.facebook.presto.memory.MemoryPoolListener; -import com.facebook.presto.memory.TraversingQueryContextVisitor; import com.facebook.presto.memory.VoidTraversingQueryContextVisitor; import com.facebook.presto.operator.OperatorContext; import com.facebook.presto.operator.PipelineContext; @@ -49,6 +48,7 @@ import static com.facebook.presto.execution.MemoryRevokingUtils.getMemoryPools; import static com.facebook.presto.sql.analyzer.FeaturesConfig.TaskSpillingStrategy.PER_TASK_MEMORY_THRESHOLD; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.SECONDS; @@ -226,37 +226,12 @@ private boolean memoryRevokingNeeded(MemoryPool memoryPool) private long getMemoryAlreadyBeingRevoked(List sqlTasks, long targetRevokingLimit) { - TraversingQueryContextVisitor visitor = new TraversingQueryContextVisitor() - { - @Override - public Long visitOperatorContext(OperatorContext operatorContext, Void context) - { - if (operatorContext.isMemoryRevokingRequested()) { - return operatorContext.getReservedRevocableBytes(); - } - return 0L; - } - - @Override - public Long mergeResults(List childrenResults) - { - return childrenResults.stream() - .mapToLong(i -> i).sum(); - } - }; - - long currentRevoking = 0; - for (SqlTask task : sqlTasks) { - Optional taskContext = task.getTaskContext(); - if (taskContext.isPresent()) { - currentRevoking += taskContext.get().accept(visitor, null); - if (currentRevoking > targetRevokingLimit) { - // Return early, target value exceeded and revoking will not occur - return currentRevoking; - } - } - } - return currentRevoking; + List taskContexts = sqlTasks.stream() + .map(SqlTask::getTaskContext) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toImmutableList()); + return MemoryRevokingSchedulerUtils.getMemoryAlreadyBeingRevoked(taskContexts, targetRevokingLimit); } private void requestRevoking(MemoryPoolId memoryPoolId, ArrayList sqlTasks, long remainingBytesToRevoke) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingSchedulerUtils.java b/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingSchedulerUtils.java new file mode 100644 index 000000000000..cc404c81be77 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingSchedulerUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.execution; + +import com.facebook.presto.memory.TraversingQueryContextVisitor; +import com.facebook.presto.operator.OperatorContext; +import com.facebook.presto.operator.TaskContext; + +import java.util.Collection; +import java.util.List; + +public class MemoryRevokingSchedulerUtils +{ + private MemoryRevokingSchedulerUtils() {} + + public static long getMemoryAlreadyBeingRevoked(Collection taskContexts, long targetRevokingLimit) + { + TraversingQueryContextVisitor visitor = new TraversingQueryContextVisitor() + { + @Override + public Long visitOperatorContext(OperatorContext operatorContext, Void context) + { + if (operatorContext.isMemoryRevokingRequested()) { + return operatorContext.getReservedRevocableBytes(); + } + return 0L; + } + + @Override + public Long mergeResults(List childrenResults) + { + return childrenResults.stream() + .mapToLong(i -> i).sum(); + } + }; + long currentRevoking = 0; + for (TaskContext taskContext : taskContexts) { + currentRevoking += taskContext.accept(visitor, null); + if (currentRevoking > targetRevokingLimit) { + // Return early, target value exceeded and revoking will not occur + return currentRevoking; + } + } + return currentRevoking; + } +}