diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 3d897ec4af7f6..0be4a61f27587 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -312,23 +312,31 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) * Represents a partitioning where rows are split across partitions based on transforms defined * by `expressions`. `partitionValues`, if defined, should contain value of partition key(s) in * ascending order, after evaluated by the transforms in `expressions`, for each input partition. - * In addition, its length must be the same as the number of input partitions (and thus is a 1-1 - * mapping). The `partitionValues` may contain duplicated partition values. + * In addition, its length must be the same as the number of Spark partitions (and thus is a 1-1 + * mapping), and each row in `partitionValues` must be unique. * - * For example, if `expressions` is `[years(ts_col)]`, then a valid value of `partitionValues` is - * `[0, 1, 2]`, which represents 3 input partitions with distinct partition values. All rows - * in each partition have the same value for column `ts_col` (which is of timestamp type), after - * being applied by the `years` transform. + * The `originalPartitionValues`, on the other hand, are partition values from the original input + * splits returned by data sources. It may contain duplicated values. * - * On the other hand, `[0, 0, 1]` is not a valid value for `partitionValues` since `0` is - * duplicated twice. 
+ * For example, if a data source reports partition transform expressions `[years(ts_col)]` with 4 + * input splits whose corresponding partition values are `[0, 1, 2, 2]`, then the `expressions` + * in this case is `[years(ts_col)]`, while `partitionValues` is `[0, 1, 2]`, which + * represents 3 Spark partitions with distinct partition values. All rows in each partition have + * the same value for column `ts_col` (which is of timestamp type), after being applied by the + * `years` transform. This is generated after combining the two splits with partition value `2` + * into a single Spark partition. + * + * On the other hand, in this example `[0, 1, 2, 2]` is the value of `originalPartitionValues` + * which is calculated from the original input splits. * * @param expressions partition expressions for the partitioning. * @param numPartitions the number of partitions - * @param partitionValues the values for the cluster keys of the distribution, must be - * in ascending order. + * @param partitionValues the values for the final cluster keys (that is, after applying grouping + * on the input splits according to `expressions`) of the distribution, + * must be in ascending order, and must NOT contain duplicated values. + * @param originalPartitionValues the original input partition values before any grouping has been - * applied, must be in ascending order. 
+ * applied, must be in ascending order, and may contain duplicated + * values */ case class KeyGroupedPartitioning( expressions: Seq[Expression], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index ea080924ef332..932ac0f5a1b15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -141,9 +141,9 @@ case class BatchScanExec( (splits.head.asInstanceOf[HasPartitionKey].partitionKey(), splits) }) - // This means the input partitions are not grouped by partition values. We'll need to - // check `groupByPartitionValues` and decide whether to group and replicate splits - // within a partition. + // When partially clustered, the input partitions are not grouped by partition + // values. Here we'll need to check `commonPartitionValues` and decide how to group + // and replicate splits within a partition. if (spjParams.commonPartitionValues.isDefined && spjParams.applyPartialClustering) { // A mapping from the common partition values to how many splits the partition // should contain.