Remove redundant joinOutputRows metric [databricks] #10348

Merged 1 commit on Feb 1, 2024
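Summary: the joinOutputRows metric is redundant because every join output iterator in this diff bumps it and numOutputRows by the same cb.numRows() for each output batch, so the two counters can never differ. A minimal, self-contained Scala sketch of that redundancy (LongMetric and the batch row counts below are hypothetical stand-ins for the plugin's GpuMetric machinery, not code from this PR):

// Hypothetical stand-in for GpuMetric: a simple additive Long counter.
final class LongMetric {
  private var total = 0L
  def +=(n: Long): Unit = total += n
  def value: Long = total
}

object JoinOutputRowsRedundancy {
  def main(args: Array[String]): Unit = {
    val numOutputRows  = new LongMetric
    val joinOutputRows = new LongMetric
    // Stand-in for cb.numRows() of each batch produced by a join iterator.
    val batchRowCounts = Seq(1024L, 512L, 2048L)
    batchRowCounts.foreach { rows =>
      joinOutputRows += rows // the duplicate update removed by this PR
      numOutputRows  += rows // the metric that remains
    }
    // The counters can never diverge, so the extra metric adds no information.
    assert(joinOutputRows.value == numOutputRows.value)
    println(s"numOutputRows = ${numOutputRows.value}, joinOutputRows = ${joinOutputRows.value}")
  }
}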
@@ -63,7 +63,6 @@ object GpuMetric extends Logging {
val SORT_TIME = "sortTime"
val AGG_TIME = "computeAggTime"
val JOIN_TIME = "joinTime"
- val JOIN_OUTPUT_ROWS = "joinOutputRows"
val FILTER_TIME = "filterTime"
val BUILD_DATA_SIZE = "buildDataSize"
val BUILD_TIME = "buildTime"
@@ -98,7 +97,6 @@ object GpuMetric extends Logging {
val DESCRIPTION_SORT_TIME = "sort time"
val DESCRIPTION_AGG_TIME = "aggregation time"
val DESCRIPTION_JOIN_TIME = "join time"
- val DESCRIPTION_JOIN_OUTPUT_ROWS = "join output rows"
val DESCRIPTION_FILTER_TIME = "filter time"
val DESCRIPTION_BUILD_DATA_SIZE = "build side size"
val DESCRIPTION_BUILD_TIME = "build time"
@@ -127,8 +127,7 @@ case class GpuShuffledHashJoinExec(
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
- JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME),
- JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS))
+ JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME))

override def requiredChildDistribution: Seq[Distribution] =
Seq(GpuHashPartitioning.getDistribution(cpuLeftKeys),
@@ -170,7 +169,6 @@ case class GpuShuffledHashJoinExec(
val opTime = gpuLongMetric(OP_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
val numPartitions = RapidsConf.NUM_SUB_PARTITIONS.get(conf)
val subPartConf = RapidsConf.HASH_SUB_PARTITION_TEST_ENABLED.get(conf)
.map(_ && RapidsConf.TEST_CONF.get(conf))
@@ -202,7 +200,7 @@
}
// doJoin will close singleBatch
doJoin(singleBatch, maybeBufferedStreamIter, realTarget,
- numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime)
+ numOutputRows, numOutputBatches, opTime, joinTime)
case Right(builtBatchIter) =>
// For big joins, when the build data can not fit into a single batch.
val sizeBuildIter = builtBatchIter.map { cb =>
@@ -212,8 +210,7 @@
cb
}
doJoinBySubPartition(sizeBuildIter, maybeBufferedStreamIter, realTarget,
- numPartitions, numOutputRows, joinOutputRows, numOutputBatches,
- opTime, joinTime)
+ numPartitions, numOutputRows, numOutputBatches, opTime, joinTime)
}
}
}
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -122,7 +122,6 @@ class GpuCartesianRDD(
targetSize: Long,
opTime: GpuMetric,
joinTime: GpuMetric,
- joinOutputRows: GpuMetric,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric,
var rdd1: RDD[GpuSerializableBatch],
@@ -191,7 +190,6 @@
Cross, GpuBuildLeft, numFirstTableColumns, batch, streamIterator, streamAttributes,
targetSize, boundCondition,
numOutputRows = numOutputRows,
- joinOutputRows = joinOutputRows,
numOutputBatches = numOutputBatches,
opTime = opTime,
joinTime = joinTime)
@@ -236,8 +234,7 @@ case class GpuCartesianProductExec(
protected override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
- JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME),
- JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS))
+ JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME))

protected override def doExecute(): RDD[InternalRow] =
throw new IllegalStateException("This should only be called from columnar")
@@ -246,7 +243,6 @@ case class GpuCartesianProductExec(
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val joinTime = gpuLongMetric(JOIN_TIME)
- val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
val opTime = gpuLongMetric(OP_TIME)

val boundCondition = condition.map(GpuBindReferences.bindGpuReference(_, output))
@@ -282,7 +278,6 @@
targetSizeBytes,
opTime,
joinTime,
- joinOutputRows,
numOutputRows,
numOutputBatches,
left.executeColumnar().map(cb => new GpuSerializableBatch(cb)),
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -111,7 +111,6 @@ abstract class GpuBroadcastHashJoinExecBase(
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
- JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME))

@@ -180,7 +179,6 @@
val opTime = gpuLongMetric(OP_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
- val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

@@ -196,8 +194,7 @@
new CollectTimeIterator("broadcast join stream", it, streamTime),
allMetrics)
// builtBatch will be closed in doJoin
- doJoin(builtBatch, streamIter, targetSize,
- numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime)
+ doJoin(builtBatch, streamIter, targetSize, numOutputRows, numOutputBatches, opTime, joinTime)
}
}

@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -329,7 +329,6 @@ object GpuBroadcastNestedLoopJoinExecBase {
targetSize: Long,
boundCondition: Option[GpuExpression],
numOutputRows: GpuMetric,
- joinOutputRows: GpuMetric,
numOutputBatches: GpuMetric,
opTime: GpuMetric,
joinTime: GpuMetric): Iterator[ColumnarBatch] = {
@@ -355,7 +354,6 @@
}
}
joinIterator.map { cb =>
- joinOutputRows += cb.numRows()
numOutputRows += cb.numRows()
numOutputBatches += 1
cb
@@ -462,8 +460,7 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
BUILD_DATA_SIZE -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_BUILD_TIME),
- JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME),
- JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS))
+ JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME))

/** BuildRight means the right relation <=> the broadcast relation. */
val (streamed, buildPlan) = gpuBuildSide match {
@@ -613,7 +610,6 @@
if (output.isEmpty) {
doUnconditionalJoinRowCount(relation)
} else {
- val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val buildTime = gpuLongMetric(BUILD_TIME)
@@ -666,7 +662,6 @@
}
}
joinIterator.map { cb =>
- joinOutputRows += cb.numRows()
numOutputRows += cb.numRows()
numOutputBatches += 1
cb
@@ -773,7 +768,6 @@
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val opTime = gpuLongMetric(OP_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
- val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
val nestedLoopJoinType = joinType
val buildSide = gpuBuildSide
streamed.executeColumnar().mapPartitions { streamedIter =>
@@ -791,7 +785,6 @@
spillableBuiltBatch,
lazyStream, streamAttributes, targetSizeBytes, boundCondition,
numOutputRows = numOutputRows,
- joinOutputRows = joinOutputRows,
numOutputBatches = numOutputBatches,
opTime = opTime,
joinTime = joinTime)
@@ -1080,7 +1080,6 @@ trait GpuHashJoin extends GpuExec {
stream: Iterator[ColumnarBatch],
targetSize: Long,
numOutputRows: GpuMetric,
- joinOutputRows: GpuMetric,
numOutputBatches: GpuMetric,
opTime: GpuMetric,
joinTime: GpuMetric): Iterator[ColumnarBatch] = {
@@ -1140,7 +1139,6 @@
}

joinIterator.map { cb =>
- joinOutputRows += cb.numRows()
numOutputRows += cb.numRows()
numOutputBatches += 1
cb
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -554,7 +554,6 @@ trait GpuSubPartitionHashJoin extends Logging { self: GpuHashJoin =>
targetSize: Long,
numPartitions: Int,
numOutputRows: GpuMetric,
- joinOutputRows: GpuMetric,
numOutputBatches: GpuMetric,
opTime: GpuMetric,
joinTime: GpuMetric): Iterator[ColumnarBatch] = {
@@ -594,7 +593,7 @@ trait GpuSubPartitionHashJoin extends Logging { self: GpuHashJoin =>
}
// Leverage the original join iterators
val joinIter = doJoin(buildCb, streamIter, targetSize,
- numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime)
+ numOutputRows, numOutputBatches, opTime, joinTime)
Some(joinIter)
}
}
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -88,7 +88,6 @@ case class GpuBroadcastHashJoinExec(

override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
- JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME),
NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
@@ -159,7 +158,6 @@ case class GpuBroadcastHashJoinExec(
val opTime = gpuLongMetric(OP_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
- val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

@@ -180,8 +178,7 @@
new CollectTimeIterator("executor broadcast join stream", it, streamTime),
allMetrics)
// builtBatch will be closed in doJoin
- doJoin(builtBatch, streamIter, targetSize,
- numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime)
+ doJoin(builtBatch, streamIter, targetSize, numOutputRows, numOutputBatches, opTime, joinTime)
}
}

@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -146,7 +146,6 @@ case class GpuBroadcastNestedLoopJoinExec(
BUILD_DATA_SIZE -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_BUILD_TIME),
JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME),
- JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES),
CONCAT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_TIME)