Skip to content

Commit

Permalink
Remove unnecessary digests from queryinfo
Browse files Browse the repository at this point in the history
  • Loading branch information
raunaqmorarka authored and wendigo committed Nov 8, 2024
1 parent d79e9be commit f157121
Show file tree
Hide file tree
Showing 12 changed files with 195 additions and 35 deletions.
24 changes: 12 additions & 12 deletions core/trino-main/src/main/java/io/trino/event/QueryMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -762,18 +762,18 @@ private static void populateStageOutputBufferUtilization(StageInfo stageInfo, Im
stageInfo.getStageId().getId(),
stageInfo.getTasks().size(),
// scale ratio to percentages
utilization.getP01() * 100,
utilization.getP05() * 100,
utilization.getP10() * 100,
utilization.getP25() * 100,
utilization.getP50() * 100,
utilization.getP75() * 100,
utilization.getP90() * 100,
utilization.getP95() * 100,
utilization.getP99() * 100,
utilization.getMin() * 100,
utilization.getMax() * 100,
Duration.ofNanos(utilization.getTotal())));
utilization.p01() * 100,
utilization.p05() * 100,
utilization.p10() * 100,
utilization.p25() * 100,
utilization.p50() * 100,
utilization.p75() * 100,
utilization.p90() * 100,
utilization.p95() * 100,
utilization.p99() * 100,
utilization.min() * 100,
utilization.max() * 100,
Duration.ofNanos(utilization.total())));
});
for (StageInfo subStage : stageInfo.getSubStages()) {
populateStageOutputBufferUtilization(subStage, utilizations);
Expand Down
40 changes: 40 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/QueryInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,4 +438,44 @@ public String toString()
.add("fieldNames", fieldNames)
.toString();
}

public QueryInfo pruneDigests()
{
return new QueryInfo(
queryId,
session,
state,
self,
fieldNames,
query,
preparedQuery,
queryStats,
setCatalog,
setSchema,
setPath,
setAuthorizationUser,
resetAuthorizationUser,
setSessionProperties,
resetSessionProperties,
setRoles,
addedPreparedStatements,
deallocatedPreparedStatements,
startedTransactionId,
clearTransactionId,
updateType,
outputStage.map(StageInfo::pruneDigests),
failureInfo,
errorCode,
warnings,
inputs,
output,
referencedTables,
routines,
finalQueryInfo,
resourceGroupId,
queryType,
retryPolicy,
pruned,
version);
}
}
16 changes: 16 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/StageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

@Immutable
Expand Down Expand Up @@ -167,6 +168,21 @@ public StageInfo withSubStages(List<StageInfo> subStages)
failureCause);
}

public StageInfo pruneDigests()
{
return new StageInfo(
stageId,
state,
plan,
coordinatorOnly,
types,
stageStats,
tasks.stream().map(TaskInfo::pruneDigests).collect(toImmutableList()),
subStages.stream().map(StageInfo::pruneDigests).collect(toImmutableList()),
tables,
failureCause);
}

public static StageInfo createInitial(QueryId queryId, StageState state, PlanFragment fragment)
{
return new StageInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
succinctDuration(inputBlockedTime, NANOSECONDS),
succinctDuration(failedInputBlockedTime, NANOSECONDS),
succinctBytes(bufferedDataSize),
TDigestHistogram.merge(bufferUtilizationHistograms.build()),
TDigestHistogram.merge(bufferUtilizationHistograms.build()).map(DistributionSnapshot::new),
succinctBytes(outputDataSize),
succinctBytes(failedOutputDataSize),
outputPositions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.airlift.units.Duration;
import io.trino.operator.BlockedReason;
import io.trino.operator.OperatorStats;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.eventlistener.StageGcStatistics;
import org.joda.time.DateTime;

Expand Down Expand Up @@ -102,7 +101,7 @@ public class StageStats
private final Duration failedInputBlockedTime;

private final DataSize bufferedDataSize;
private final Optional<TDigestHistogram> outputBufferUtilization;
private final Optional<io.trino.execution.DistributionSnapshot> outputBufferUtilization;
private final DataSize outputDataSize;
private final DataSize failedOutputDataSize;
private final long outputPositions;
Expand Down Expand Up @@ -177,7 +176,7 @@ public StageStats(
@JsonProperty("failedInputBlockedTime") Duration failedInputBlockedTime,

@JsonProperty("bufferedDataSize") DataSize bufferedDataSize,
@JsonProperty("outputBufferUtilization") Optional<TDigestHistogram> outputBufferUtilization,
@JsonProperty("outputBufferUtilization") Optional<io.trino.execution.DistributionSnapshot> outputBufferUtilization,
@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize,
@JsonProperty("outputPositions") long outputPositions,
Expand Down Expand Up @@ -562,7 +561,7 @@ public DataSize getBufferedDataSize()
}

@JsonProperty
public Optional<TDigestHistogram> getOutputBufferUtilization()
public Optional<io.trino.execution.DistributionSnapshot> getOutputBufferUtilization()
{
return outputBufferUtilization;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public TaskInfo pruneSpoolingOutputStats()
return new TaskInfo(taskStatus, lastHeartbeat, outputBuffers.pruneSpoolingOutputStats(), noMoreSplits, stats, estimatedMemory, needsPlan);
}

public TaskInfo pruneDigests()
{
return new TaskInfo(taskStatus, lastHeartbeat, outputBuffers, noMoreSplits, stats.pruneDigests(), estimatedMemory, needsPlan);
}

@Override
public String toString()
{
Expand Down
48 changes: 48 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/PipelineStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

@Immutable
Expand Down Expand Up @@ -506,6 +507,53 @@ public PipelineStats summarize()
ImmutableList.of());
}

public PipelineStats pruneDigests()
{
return new PipelineStats(
pipelineId,
firstStartTime,
lastStartTime,
lastEndTime,
inputPipeline,
outputPipeline,
totalDrivers,
queuedDrivers,
queuedPartitionedDrivers,
queuedPartitionedSplitsWeight,
runningDrivers,
runningPartitionedDrivers,
runningPartitionedSplitsWeight,
blockedDrivers,
completedDrivers,
userMemoryReservation,
revocableMemoryReservation,
queuedTime,
elapsedTime,
totalScheduledTime,
totalCpuTime,
totalBlockedTime,
fullyBlocked,
blockedReasons,
physicalInputDataSize,
physicalInputPositions,
physicalInputReadTime,
internalNetworkInputDataSize,
internalNetworkInputPositions,
rawInputDataSize,
rawInputPositions,
processedInputDataSize,
processedInputPositions,
inputBlockedTime,
outputDataSize,
outputPositions,
outputBlockedTime,
physicalWrittenDataSize,
operatorSummaries.stream()
.map(io.trino.execution.DistributionSnapshot::pruneOperatorStats)
.collect(toImmutableList()),
drivers);
}

private static List<OperatorStats> summarizeOperatorStats(List<OperatorStats> operatorSummaries)
{
// Use an exact size ImmutableList builder to avoid a redundant copy in the PipelineStats constructor
Expand Down
51 changes: 51 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/TaskStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -655,6 +656,56 @@ public TaskStats summarizeFinal()
summarizePipelineStats(pipelines));
}

public TaskStats pruneDigests()
{
return new TaskStats(
createTime,
firstStartTime,
lastStartTime,
terminatingStartTime,
lastEndTime,
endTime,
elapsedTime,
queuedTime,
totalDrivers,
queuedDrivers,
queuedPartitionedDrivers,
queuedPartitionedSplitsWeight,
runningDrivers,
runningPartitionedDrivers,
runningPartitionedSplitsWeight,
blockedDrivers,
completedDrivers,
cumulativeUserMemory,
userMemoryReservation,
peakUserMemoryReservation,
revocableMemoryReservation,
totalScheduledTime,
totalCpuTime,
totalBlockedTime,
fullyBlocked,
blockedReasons,
physicalInputDataSize,
physicalInputPositions,
physicalInputReadTime,
internalNetworkInputDataSize,
internalNetworkInputPositions,
rawInputDataSize,
rawInputPositions,
processedInputDataSize,
processedInputPositions,
inputBlockedTime,
outputDataSize,
outputPositions,
outputBlockedTime,
writerInputDataSize,
physicalWrittenDataSize,
maxWriterCount,
fullGcCount,
fullGcTime,
pipelines.stream().map(PipelineStats::pruneDigests).collect(toImmutableList()));
}

private static List<PipelineStats> summarizePipelineStats(List<PipelineStats> pipelines)
{
// Use an exact size ImmutableList builder to avoid a redundant copy in the TaskStats constructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public Response getQueryInfo(@PathParam("queryId") QueryId queryId, @Context Htt
if (queryInfo.isPresent()) {
try {
checkCanViewQueryOwnedBy(sessionContextFactory.extractAuthorizedIdentity(servletRequest, httpHeaders), queryInfo.get().getSession().toIdentity(), accessControl);
return Response.ok(queryInfo.get()).build();
return Response.ok(queryInfo.get().pruneDigests()).build();
}
catch (AccessDeniedException e) {
throw new ForbiddenException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.trino.cost.PlanNodeStatsAndCostSummary;
import io.trino.cost.PlanNodeStatsEstimate;
import io.trino.cost.StatsAndCosts;
import io.trino.execution.DistributionSnapshot;
import io.trino.execution.QueryStats;
import io.trino.execution.StageInfo;
import io.trino.execution.StageStats;
Expand All @@ -41,7 +42,6 @@
import io.trino.metadata.Metadata;
import io.trino.metadata.ResolvedFunction;
import io.trino.metadata.TableHandle;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.expression.FunctionName;
import io.trino.spi.function.CatalogSchemaFunctionName;
Expand Down Expand Up @@ -507,22 +507,23 @@ private static String formatFragment(
stageStats.getPeakUserMemoryReservation().succinct(),
tasks.size(),
maxPeakTaskMemoryUsage.succinct()));
Optional<TDigestHistogram> outputBufferUtilization = stageInfo.get().getStageStats().getOutputBufferUtilization();
Optional<DistributionSnapshot> outputBufferUtilization = stageInfo.get().getStageStats().getOutputBufferUtilization();
if (verbose && outputBufferUtilization.isPresent()) {
builder.append(indentString(1))
.append(format("Output buffer active time: %s, buffer utilization distribution (%%): {p01=%s, p05=%s, p10=%s, p25=%s, p50=%s, p75=%s, p90=%s, p95=%s, p99=%s, max=%s}\n",
succinctNanos(outputBufferUtilization.get().getTotal()),
.append(format("Output buffer active time: %s, buffer utilization distribution (%%): {p01=%s, p05=%s, p10=%s, p25=%s, p50=%s, p75=%s, p90=%s, p95=%s, p99=%s, min=%s, max=%s}\n",
succinctNanos(outputBufferUtilization.get().total()),
// scale ratio to percentages
formatDouble(outputBufferUtilization.get().getP01() * 100),
formatDouble(outputBufferUtilization.get().getP05() * 100),
formatDouble(outputBufferUtilization.get().getP10() * 100),
formatDouble(outputBufferUtilization.get().getP25() * 100),
formatDouble(outputBufferUtilization.get().getP50() * 100),
formatDouble(outputBufferUtilization.get().getP75() * 100),
formatDouble(outputBufferUtilization.get().getP90() * 100),
formatDouble(outputBufferUtilization.get().getP95() * 100),
formatDouble(outputBufferUtilization.get().getP99() * 100),
formatDouble(outputBufferUtilization.get().getMax() * 100)));
formatDouble(outputBufferUtilization.get().p01() * 100),
formatDouble(outputBufferUtilization.get().p05() * 100),
formatDouble(outputBufferUtilization.get().p10() * 100),
formatDouble(outputBufferUtilization.get().p25() * 100),
formatDouble(outputBufferUtilization.get().p50() * 100),
formatDouble(outputBufferUtilization.get().p75() * 100),
formatDouble(outputBufferUtilization.get().p90() * 100),
formatDouble(outputBufferUtilization.get().p95() * 100),
formatDouble(outputBufferUtilization.get().p99() * 100),
formatDouble(outputBufferUtilization.get().min() * 100),
formatDouble(outputBufferUtilization.get().max() * 100)));
}

TDigest taskOutputDistribution = new TDigest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private static StageStats createStageStats(int value)
Duration.succinctDuration(value, SECONDS),
Duration.succinctDuration(value, SECONDS),
succinctBytes(value),
Optional.of(new TDigestHistogram(new TDigest())),
Optional.of(new DistributionSnapshot(new TDigestHistogram(new TDigest()))),
succinctBytes(value),
succinctBytes(value),
value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class TestStageStats
new Duration(202, NANOSECONDS),

DataSize.ofBytes(34),
Optional.of(getTDigestHistogram(10)),
Optional.of(new io.trino.execution.DistributionSnapshot(getTDigestHistogram(10))),
DataSize.ofBytes(35),
DataSize.ofBytes(36),
37,
Expand Down Expand Up @@ -182,7 +182,7 @@ private static void assertExpectedStageStats(StageStats actual)
assertThat(actual.getFailedInputBlockedTime()).isEqualTo(new Duration(202, NANOSECONDS));

assertThat(actual.getBufferedDataSize()).isEqualTo(DataSize.ofBytes(34));
assertThat(actual.getOutputBufferUtilization().get().getMax()).isEqualTo(9.0);
assertThat(actual.getOutputBufferUtilization().get().max()).isEqualTo(9.0);
assertThat(actual.getOutputDataSize()).isEqualTo(DataSize.ofBytes(35));
assertThat(actual.getFailedOutputDataSize()).isEqualTo(DataSize.ofBytes(36));
assertThat(actual.getOutputPositions()).isEqualTo(37);
Expand Down

0 comments on commit f157121

Please sign in to comment.