Skip to content

Commit

Permalink
Skip coordinator in getWorkerMemoryInfo if not included in task proce…
Browse files Browse the repository at this point in the history
…ssing
  • Loading branch information
losipiuk committed Aug 20, 2024
1 parent 8edf5b4 commit 5583b5f
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.trino.execution.StageInfo;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.memory.LowMemoryKiller.ForQueryLowMemoryKiller;
import io.trino.memory.LowMemoryKiller.ForTaskLowMemoryKiller;
import io.trino.memory.LowMemoryKiller.RunningQueryInfo;
Expand Down Expand Up @@ -100,6 +101,7 @@ public class ClusterMemoryManager
private final JsonCodec<MemoryInfo> memoryInfoCodec;
private final DataSize maxQueryMemory;
private final DataSize maxQueryTotalMemory;
private final boolean includeCoordinator;
private final List<LowMemoryKiller> lowMemoryKillers;
private final AtomicLong totalAvailableProcessors = new AtomicLong();
private final AtomicLong clusterUserMemoryReservation = new AtomicLong();
Expand Down Expand Up @@ -129,7 +131,8 @@ public ClusterMemoryManager(
@ForTaskLowMemoryKiller LowMemoryKiller taskLowMemoryKiller,
@ForQueryLowMemoryKiller LowMemoryKiller queryLowMemoryKiller,
ServerConfig serverConfig,
MemoryManagerConfig config)
MemoryManagerConfig config,
NodeSchedulerConfig nodeSchedulerConfig)
{
checkState(serverConfig.isCoordinator(), "ClusterMemoryManager must not be bound on worker");

Expand All @@ -145,6 +148,7 @@ public ClusterMemoryManager(
queryLowMemoryKiller);
this.maxQueryMemory = config.getMaxQueryMemory();
this.maxQueryTotalMemory = config.getMaxQueryTotalMemory();
this.includeCoordinator = nodeSchedulerConfig.isIncludeCoordinator();

verify(maxQueryMemory.toBytes() <= maxQueryTotalMemory.toBytes(),
"maxQueryMemory cannot be greater than maxQueryTotalMemory");
Expand Down Expand Up @@ -485,6 +489,9 @@ public synchronized Map<String, Optional<MemoryInfo>> getWorkerMemoryInfo()
{
Map<String, Optional<MemoryInfo>> memoryInfo = new HashMap<>();
for (Entry<String, RemoteNodeMemory> entry : nodes.entrySet()) {
if (!includeCoordinator && entry.getValue().getNode().isCoordinator()) {
continue;
}
String workerId = entry.getKey();
memoryInfo.put(workerId, entry.getValue().getInfo());
}
Expand Down

0 comments on commit 5583b5f

Please sign in to comment.