[trinodb#152] Refine data skipping metrics
杨金德 committed Sep 1, 2022
1 parent e65cc75 commit 6750bbf
Showing 22 changed files with 620 additions and 179 deletions.

@@ -254,9 +254,9 @@ public void recordGetSplitTime(long start)
         stateMachine.recordGetSplitTime(start);
     }
 
-    public void updateConnectorMetrics(Metrics newConnectorMetrics)
+    public void updateConnectorMetrics(Metrics newConnectorMetrics, PlanNodeId planNodeId)
     {
-        stateMachine.updateConnectorMetrics(newConnectorMetrics);
+        stateMachine.updateConnectorMetrics(newConnectorMetrics, planNodeId);
     }
 
     private synchronized void updateTaskStatus(TaskStatus status)

core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java

@@ -23,10 +23,14 @@
 import io.trino.operator.BlockedReason;
 import io.trino.operator.OperatorStats;
 import io.trino.operator.PipelineStats;
+import io.trino.operator.ScanFilterAndProjectOperator;
 import io.trino.operator.TaskStats;
 import io.trino.spi.eventlistener.StageGcStatistics;
+import io.trino.spi.metrics.DataSkippingMetrics;
 import io.trino.spi.metrics.Metrics;
+import io.trino.spi.metrics.TableLevelDataSkippingMetrics;
 import io.trino.sql.planner.PlanFragment;
+import io.trino.sql.planner.plan.PlanNode;
 import io.trino.sql.planner.plan.PlanNodeId;
 import io.trino.sql.planner.plan.TableScanNode;
 import io.trino.util.Failures;
@@ -41,6 +45,8 @@
 import java.util.Optional;
 import java.util.OptionalDouble;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -50,6 +56,7 @@
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static io.airlift.units.DataSize.succinctBytes;
 import static io.airlift.units.Duration.succinctDuration;
 import static io.trino.execution.StageState.ABORTED;
@@ -60,6 +67,7 @@
 import static io.trino.execution.StageState.RUNNING;
 import static io.trino.execution.StageState.SCHEDULING;
 import static io.trino.execution.StageState.TERMINAL_STAGE_STATES;
+import static io.trino.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
 import static java.lang.Math.max;
 import static java.lang.Math.min;
 import static java.lang.Math.toIntExact;
@@ -71,6 +79,7 @@
 public class StageStateMachine
 {
     private static final Logger log = Logger.get(StageStateMachine.class);
+    private static final String TABLE_LEVEL_METRICS_SUFFIX = "_table_level";
 
     private final StageId stageId;
     private final PlanFragment fragment;
@@ -91,6 +100,7 @@ public class StageStateMachine
     private final AtomicLong currentTotalMemory = new AtomicLong();
 
     private final AtomicReference<Metrics> connectorMetrics = new AtomicReference<>(Metrics.EMPTY);
+    private final ConcurrentMap<PlanNodeId, PlanNodeId> projectOrFilterNodeToTableScan = new ConcurrentHashMap<>();
 
     public StageStateMachine(
             StageId stageId,
@@ -221,9 +231,27 @@ public void updateMemoryUsage(long deltaUserMemoryInBytes, long deltaRevocableMe
         peakRevocableMemory.updateAndGet(currentPeakValue -> max(currentRevocableMemory.get(), currentPeakValue));
     }
 
-    public void updateConnectorMetrics(Metrics newConnectorMetrics)
+    public void updateConnectorMetrics(Metrics newConnectorMetrics, PlanNodeId planNodeId)
     {
-        connectorMetrics.updateAndGet(metrics -> metrics.mergeWith(newConnectorMetrics));
+        if (isValidTableScanNodeId(planNodeId)) {
+            connectorMetrics.updateAndGet(metrics -> metrics.mergeWith(processTableLevelMetrics(newConnectorMetrics, planNodeId)));
+        }
     }
+
+    private boolean isValidTableScanNodeId(PlanNodeId planNodeId)
+    {
+        return planNodeId != null && tables.containsKey(planNodeId);
+    }
+
+    private Metrics processTableLevelMetrics(Metrics metrics, PlanNodeId planNodeId)
+    {
+        String table = stageId.getId() + ":" + planNodeId + ":" + tables.get(planNodeId).getTableName().asSchemaTableName().toString();
+        return new Metrics(metrics.getMetrics().entrySet().stream()
+                .filter(entry -> entry.getValue() instanceof DataSkippingMetrics)
+                .collect(toImmutableMap(
+                        entry -> entry.getKey() + TABLE_LEVEL_METRICS_SUFFIX,
+                        entry -> new TableLevelDataSkippingMetrics(ImmutableMap.of(table, (DataSkippingMetrics) entry.getValue())))));
+    }
 
     public BasicStageStats getBasicStageStats(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
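
The new processTableLevelMetrics keeps only the DataSkippingMetrics entries of the connector metrics, renames each key with the _table_level suffix, and nests the value in a single-entry map keyed by stageId:planNodeId:schema.table. A minimal, runnable sketch of that reshaping with plain maps (the DataSkipping record, metric names, and table key below are illustrative stand-ins, not the patch's actual DataSkippingMetrics and TableLevelDataSkippingMetrics types):

import java.util.Map;
import java.util.stream.Collectors;

public class TableLevelMetricsSketch
{
    // Stand-in for the patch's DataSkippingMetrics value type
    record DataSkipping(long skippedSplits) {}

    public static void main(String[] args)
    {
        // Connector metrics as a scan operator might report them; only
        // data-skipping entries survive the filter below.
        Map<String, Object> metrics = Map.of(
                "dataSkipping", new DataSkipping(42),
                "cpuTimeNanos", 1_234L);

        // stageId + ":" + planNodeId + ":" + schemaTableName (hypothetical values)
        String tableKey = "1:0:tpch.lineitem";

        Map<String, Object> tableLevel = metrics.entrySet().stream()
                .filter(entry -> entry.getValue() instanceof DataSkipping)
                .collect(Collectors.toMap(
                        entry -> entry.getKey() + "_table_level",
                        entry -> Map.of(tableKey, entry.getValue())));

        // Prints {dataSkipping_table_level={1:0:tpch.lineitem=DataSkipping[skippedSplits=42]}}
        System.out.println(tableLevel);
    }
}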
@@ -472,8 +500,6 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
 
             physicalWrittenDataSize += taskStats.getPhysicalWrittenDataSize().toBytes();
 
-            connectorMetricsAccumulator.add(taskStats.getConnectorMetrics());
-
             fullGcCount += taskStats.getFullGcCount();
             fullGcTaskCount += taskStats.getFullGcCount() > 0 ? 1 : 0;
 
@@ -486,6 +512,18 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
                 for (OperatorStats operatorStats : pipeline.getOperatorSummaries()) {
                     String id = pipeline.getPipelineId() + "." + operatorStats.getOperatorId();
                     operatorToStats.compute(id, (k, v) -> v == null ? operatorStats : v.add(operatorStats));
+
+                    connectorMetricsAccumulator.add(operatorStats.getConnectorMetrics());
+
+                    // generate table level data skipping metrics for TableScan node
+                    PlanNodeId planNodeId = operatorStats.getPlanNodeId();
+                    if (planNodeId != null && ScanFilterAndProjectOperator.class.getSimpleName().equals(operatorStats.getOperatorType())) {
+                        // need to map Project/Filter node to TableScan node for ScanFilterAndProjectOperator
+                        planNodeId = getTableScanNodeFromProjectOrFilter(planNodeId);
+                    }
+                    if (isValidTableScanNodeId(planNodeId)) {
+                        connectorMetricsAccumulator.add(processTableLevelMetrics(operatorStats.getConnectorMetrics(), planNodeId));
+                    }
                 }
             }
         }
@@ -565,6 +603,25 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
                 failureInfo);
     }
 
+    private PlanNodeId getTableScanNodeFromProjectOrFilter(PlanNodeId planNodeId)
+    {
+        if (projectOrFilterNodeToTableScan.containsKey(planNodeId)) {
+            return projectOrFilterNodeToTableScan.get(planNodeId);
+        }
+
+        PlanNode planNode = searchFrom(fragment.getRoot())
+                .where(node -> node.getId().equals(planNodeId))
+                .findSingle()
+                .get();
+        PlanNodeId tableScanNodeId = searchFrom(planNode)
+                .where(TableScanNode.class::isInstance)
+                .findSingle()
+                .get()
+                .getId();
+        projectOrFilterNodeToTableScan.put(planNodeId, tableScanNodeId);
+        return tableScanNodeId;
+    }
+
     public void recordGetSplitTime(long startNanos)
     {
         long elapsedNanos = System.nanoTime() - startNanos;
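
As the diff's comment notes, a ScanFilterAndProjectOperator reports the id of the fused Project/Filter node rather than the underlying TableScan, so getTableScanNodeFromProjectOrFilter walks the fragment to find the scan and caches the answer in a ConcurrentMap. Because the mapping is deterministic for a given fragment, the containsKey/put pair could equally be written with computeIfAbsent; a small self-contained sketch of that caching pattern, using hypothetical String ids in place of PlanNodeId and a stubbed traversal:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PlanNodeMappingSketch
{
    private final Map<String, String> projectOrFilterToTableScan = new ConcurrentHashMap<>();

    public String tableScanFor(String projectOrFilterNodeId)
    {
        // computeIfAbsent runs the traversal at most once per node id; safe
        // here because the mapping never changes for a given plan fragment.
        return projectOrFilterToTableScan.computeIfAbsent(projectOrFilterNodeId, this::resolveByPlanTraversal);
    }

    // Stand-in for the searchFrom(fragment.getRoot())... traversal in the patch
    private String resolveByPlanTraversal(String nodeId)
    {
        return "tableScan-under-" + nodeId;
    }

    public static void main(String[] args)
    {
        PlanNodeMappingSketch sketch = new PlanNodeMappingSketch();
        System.out.println(sketch.tableScanFor("project-3")); // resolves and caches
        System.out.println(sketch.tableScanFor("project-3")); // served from cache
    }
}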
@@ -515,9 +515,9 @@ public void recordGetSplitTime(long start)
         stage.recordGetSplitTime(start);
     }
 
-    public void updateConnectorMetrics(Metrics newConnectorMetrics)
+    public void updateConnectorMetrics(Metrics newConnectorMetrics, PlanNodeId planNodeId)
     {
-        stage.updateConnectorMetrics(newConnectorMetrics);
+        stage.updateConnectorMetrics(newConnectorMetrics, planNodeId);
     }
 
     public StageId getStageId()
@@ -394,7 +394,8 @@ else if (pendingSplits.isEmpty()) {
                 whenFinishedOrNewLifespanAdded.set(null);
                 // fall through
             case FINISHED:
-                splitSource.getMetrics().ifPresent(stageExecution::updateConnectorMetrics);
+                splitSource.getMetrics().ifPresent(metrics ->
+                        stageExecution.updateConnectorMetrics(metrics, partitionedNode));
                 return new ScheduleResult(
                         true,
                         overallNewTasks.build(),

14 changes: 0 additions & 14 deletions core/trino-main/src/main/java/io/trino/operator/TaskContext.java
@@ -34,7 +34,6 @@
 import io.trino.memory.QueryContextVisitor;
 import io.trino.memory.context.LocalMemoryContext;
 import io.trino.memory.context.MemoryTrackingContext;
-import io.trino.spi.metrics.Metrics;
 import io.trino.spi.predicate.Domain;
 import io.trino.sql.planner.LocalDynamicFiltersCollector;
 import io.trino.sql.planner.plan.DynamicFilterId;
@@ -90,9 +89,6 @@ public class TaskContext
 
     private final List<PipelineContext> pipelineContexts = new CopyOnWriteArrayList<>();
 
-    // update only if task is done to avoid redundant computation
-    private final AtomicReference<Metrics> connectorMetrics = new AtomicReference<>(Metrics.EMPTY);
-
     private final boolean perOperatorCpuTimerEnabled;
     private final boolean cpuTimerEnabled;
 
@@ -238,15 +234,6 @@ private void updateStatsIfDone(TaskState newState)
             endNanos.compareAndSet(0, System.nanoTime());
             endFullGcCount.compareAndSet(-1, majorGcCount);
             endFullGcTimeNanos.compareAndSet(-1, majorGcTime);
-
-            // collect connector metrics directly from operators
-            Metrics.Accumulator connectorMetricsAccumulator = Metrics.accumulator();
-            pipelineContexts.stream()
-                    .map(PipelineContext::getPipelineStats)
-                    .flatMap(pipelineStats -> pipelineStats.getOperatorSummaries().stream())
-                    .map(OperatorStats::getConnectorMetrics)
-                    .forEach(connectorMetricsAccumulator::add);
-            connectorMetrics.compareAndSet(Metrics.EMPTY, connectorMetricsAccumulator.get());
         }
     }
 
@@ -596,7 +583,6 @@ public TaskStats getTaskStats()
                 succinctBytes(outputDataSize),
                 outputPositions,
                 succinctBytes(physicalWrittenDataSize),
-                connectorMetrics.get(),
                 fullGcCount,
                 fullGcTime,
                 pipelineStats);
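
With this removal, a task no longer snapshots connector metrics when it finishes; StageStateMachine.getStageInfo now accumulates them straight from each operator's OperatorStats (see above). The deleted block used Trino's Metrics.Accumulator over the pipeline summaries; a self-contained sketch of the same flatten-and-fold pattern, with plain longs and hypothetical record types standing in for Metrics, PipelineStats, and OperatorStats:

import java.util.List;

public class AccumulatorSketch
{
    record OperatorSummary(long skippedBytes) {}

    record Pipeline(List<OperatorSummary> operatorSummaries) {}

    public static void main(String[] args)
    {
        List<Pipeline> pipelines = List.of(
                new Pipeline(List.of(new OperatorSummary(10), new OperatorSummary(5))),
                new Pipeline(List.of(new OperatorSummary(7))));

        // Flatten pipelines -> operators and fold their metric into one total,
        // as the removed TaskContext block did with Metrics.Accumulator.
        long total = pipelines.stream()
                .flatMap(pipeline -> pipeline.operatorSummaries().stream())
                .mapToLong(OperatorSummary::skippedBytes)
                .sum();

        System.out.println(total); // 22
    }
}
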
16 changes: 0 additions & 16 deletions core/trino-main/src/main/java/io/trino/operator/TaskStats.java
@@ -19,7 +19,6 @@
 import com.google.common.collect.ImmutableSet;
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
-import io.trino.spi.metrics.Metrics;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -85,8 +84,6 @@ public class TaskStats
 
     private final DataSize physicalWrittenDataSize;
 
-    private final Metrics connectorMetrics;
-
     private final int fullGcCount;
     private final Duration fullGcTime;
 
@@ -134,7 +131,6 @@ public TaskStats(DateTime createTime, DateTime endTime)
                 DataSize.ofBytes(0),
                 0,
                 DataSize.ofBytes(0),
-                Metrics.EMPTY,
                 0,
                 new Duration(0, MILLISECONDS),
                 ImmutableList.of());
@@ -193,8 +189,6 @@ public TaskStats(
 
             @JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize,
 
-            @JsonProperty("connectorMetrics") Metrics connectorMetrics,
-
             @JsonProperty("fullGcCount") int fullGcCount,
             @JsonProperty("fullGcTime") Duration fullGcTime,
 
@@ -268,8 +262,6 @@ public TaskStats(
 
         this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "physicalWrittenDataSize is null");
 
-        this.connectorMetrics = requireNonNull(connectorMetrics, "connectorMetrics is null");
-
         checkArgument(fullGcCount >= 0, "fullGcCount is negative");
         this.fullGcCount = fullGcCount;
         this.fullGcTime = requireNonNull(fullGcTime, "fullGcTime is null");
@@ -497,12 +489,6 @@ public DataSize getPhysicalWrittenDataSize()
         return physicalWrittenDataSize;
     }
 
-    @JsonProperty
-    public Metrics getConnectorMetrics()
-    {
-        return connectorMetrics;
-    }
-
     @JsonProperty
     public List<PipelineStats> getPipelines()
     {
@@ -588,7 +574,6 @@ public TaskStats summarize()
                 outputDataSize,
                 outputPositions,
                 physicalWrittenDataSize,
-                connectorMetrics,
                 fullGcCount,
                 fullGcTime,
                 ImmutableList.of());
@@ -637,7 +622,6 @@ public TaskStats summarizeFinal()
                 outputDataSize,
                 outputPositions,
                 physicalWrittenDataSize,
-                connectorMetrics,
                 fullGcCount,
                 fullGcTime,
                 summarizePipelineStats(pipelines));


2 changes: 1 addition & 1 deletion core/trino-main/src/main/resources/webapp/dist/index.js


2 changes: 1 addition & 1 deletion core/trino-main/src/main/resources/webapp/dist/plan.js


4 changes: 2 additions & 2 deletions core/trino-main/src/main/resources/webapp/dist/query.js


2 changes: 1 addition & 1 deletion core/trino-main/src/main/resources/webapp/dist/stage.js


2 changes: 1 addition & 1 deletion core/trino-main/src/main/resources/webapp/dist/worker.js

