[SPARK-49163][SQL] Attempt to create table based on broken parquet partition data should return user-facing error

### What changes were proposed in this pull request?

Create an example parquet table with partitions and insert data in Spark:
```
create table t(col1 string, col2 string, col3 string) using parquet location 'some/path/parquet-test' partitioned by (col1, col2);
insert into t (col1, col2, col3) values ('a', 'b', 'c');
```
Go into the `parquet-test` path in the filesystem and copy the parquet data file from the `col1=a/col2=b` directory into `col1=a`, as sketched below.
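A minimal sketch of that corruption step, assuming the table location used above (the part file name is generated by Spark, so the lookup below is only illustrative):
```
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

val base = Paths.get("some/path/parquet-test")
val leaf = base.resolve("col1=a/col2=b")
// Pick the first parquet data file in the leaf partition directory...
val dataFile = Files.list(leaf).iterator().asScala
  .find(_.getFileName.toString.endsWith(".parquet"))
  .get
// ...and copy it one level up into the non-leaf `col1=a` directory,
// so that a data file now lives outside a leaf directory.
Files.copy(dataFile, base.resolve("col1=a").resolve(dataFile.getFileName))
```
After that, try to create a new table based on the parquet data in Spark: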
```
create table broken_table using parquet location 'some/path/parquet-test';
```
This query fails with an internal error. Stack trace excerpt:
```
org.apache.spark.SparkException: [INTERNAL_ERROR] Eagerly executed command failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000
...
Caused by: java.lang.AssertionError: assertion failed: Conflicting partition column names detected:
        Partition column name list #0: col1
        Partition column name list #1: col1, col2
For partitioned table directories, data files should only live in leaf directories.
And directories at the same level should have the same partition column name.
Please check the following directories for unexpected files or inconsistent partition column names:
        file:some/path/parquet-test/col1=a
        file:some/path/parquet-test/col1=a/col2=b
  at scala.Predef$.assert(Predef.scala:279)
  at org.apache.spark.sql.execution.datasources.PartitioningUtils$.resolvePartitions(PartitioningUtils.scala:391)
...
```
Fix this by converting the internal error into a user-facing error.
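
With the fix, the same read fails with a `SparkRuntimeException` carrying the new error condition, which can be checked programmatically. A minimal sketch, assuming a `SparkSession` named `spark` and the broken layout above:
```
import org.apache.spark.SparkRuntimeException

try {
  spark.read.parquet("some/path/parquet-test")
} catch {
  case e: SparkRuntimeException =>
    // A documented error condition instead of an internal AssertionError.
    assert(e.getErrorClass == "CONFLICTING_PARTITION_COLUMN_NAMES")
}
```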

### Why are the changes needed?

Replaces the internal error with a user-facing one for a valid sequence of Spark SQL operations.

### Does this PR introduce _any_ user-facing change?

Yes, the user now sees a regular, user-facing error instead of an internal error.

### How was this patch tested?

Added checks to `ParquetPartitionDiscoverySuite` that simulate the described scenario by manually breaking the parquet table layout in the filesystem.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47668 from nikolamand-db/SPARK-49163.

Authored-by: Nikola Mandic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
nikolamand-db authored and cloud-fan committed Aug 9, 2024
1 parent 731a104 commit fd3069a
Showing 5 changed files with 108 additions and 47 deletions.
11 changes: 11 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -625,6 +625,17 @@
],
"sqlState" : "40000"
},
"CONFLICTING_PARTITION_COLUMN_NAMES" : {
"message" : [
"Conflicting partition column names detected:",
"<distinctPartColLists>",
"For partitioned table directories, data files should only live in leaf directories.",
"And directories at the same level should have the same partition column name.",
"Please check the following directories for unexpected files or inconsistent partition column names:",
"<suspiciousPaths>"
],
"sqlState" : "KD009"
},
"CONNECT" : {
"message" : [
"Generic Spark Connect error."
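Note: the `<distinctPartColLists>` and `<suspiciousPaths>` placeholders in the message template above are filled from the `messageParameters` map supplied when the error is raised. A simplified, illustrative sketch of that substitution (Spark's real JSON-based error reader is more involved):
```
// Illustrative only: substitute <key> placeholders from a parameter map.
val template =
  "Conflicting partition column names detected:<distinctPartColLists>" +
    "Please check the following directories:<suspiciousPaths>"
val params = Map(
  "distinctPartColLists" -> "\n\tPartition column name list #0: col1\n",
  "suspiciousPaths" -> "\n\tfile:some/path/parquet-test/col1=a")
val message = params.foldLeft(template) {
  case (msg, (key, value)) => msg.replace(s"<$key>", value)
}
```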
QueryExecutionErrors.scala
@@ -2837,4 +2837,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
"parameter" -> toSQLId("unit"),
"invalidValue" -> s"'$invalidValue'"))
}

def conflictingPartitionColumnNamesError(
distinctPartColLists: Seq[String],
suspiciousPaths: Seq[Path]): SparkRuntimeException = {
new SparkRuntimeException(
errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
messageParameters = Map(
"distinctPartColLists" -> distinctPartColLists.mkString("\n\t", "\n\t", "\n"),
"suspiciousPaths" -> suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
)
)
}
}
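The `mkString` separators above reproduce the indentation of the original assertion message. For example (illustrative values):
```
// start "\n\t", separator "\n\t", end "\n":
Seq("col1", "col1, col2").mkString("\n\t", "\n\t", "\n")
// => "\n\tcol1\n\tcol1, col2\n"
```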
PartitioningUtils.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -386,9 +387,9 @@
} else {
pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase()))
}
assert(
partColNames.distinct.size == 1,
listConflictingPartitionColumns(pathsWithPartitionValues))
if (partColNames.distinct.size != 1) {
throw conflictingPartitionColumnsError(pathsWithPartitionValues)
}

// Resolves possible type conflicts for each column
val values = pathsWithPartitionValues.map(_._2)
@@ -404,8 +405,8 @@
}
}

private[datasources] def listConflictingPartitionColumns(
pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
private[datasources] def conflictingPartitionColumnsError(
pathWithPartitionValues: Seq[(Path, PartitionValues)]): SparkRuntimeException = {
val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct

def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
@@ -423,13 +424,8 @@
// Lists out those non-leaf partition directories that also contain files
val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)

s"Conflicting partition column names detected:\n" +
distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
"For partitioned table directories, data files should only live in leaf directories.\n" +
"And directories at the same level should have the same partition column name.\n" +
"Please check the following directories for unexpected files or " +
"inconsistent partition column names:\n" +
suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
QueryExecutionErrors.conflictingPartitionColumnNamesError(
distinctPartColLists, suspiciousPaths)
}

// scalastyle:off line.size.limit
FileIndexSuite.scala
@@ -112,7 +112,7 @@ class FileIndexSuite extends SharedSparkSession {
}

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val msg = intercept[AssertionError] {
val msg = intercept[SparkRuntimeException] {
val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
fileIndex.partitionSpec()
}.getMessage
ParquetPartitionDiscoverySuite.scala
@@ -27,7 +27,7 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkRuntimeException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
@@ -958,54 +958,58 @@ abstract class ParquetPartitionDiscoverySuite
}
}

test("listConflictingPartitionColumns") {
def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
s"\tPartition column name list #$index: $list"
}.mkString("\n", "\n", "\n")

// scalastyle:off
s"""Conflicting partition column names detected:
|$conflictingColNameLists
|For partitioned table directories, data files should only live in leaf directories.
|And directories at the same level should have the same partition column name.
|Please check the following directories for unexpected files or inconsistent partition column names:
|${paths.map("\t" + _).mkString("\n", "\n", "")}
""".stripMargin.trim
// scalastyle:on
}

assert(
listConflictingPartitionColumns(
test("conflictingPartitionColumnsError") {
checkError(
exception = conflictingPartitionColumnsError(
Seq(
(new Path("file:/tmp/foo/a=1"),
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
(new Path("file:/tmp/foo/b=1"),
PartitionValues(Seq("b"), Seq(TypedPartValue("1", IntegerType)))))).trim ===
makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1")))
PartitionValues(Seq("b"), Seq(TypedPartValue("1", IntegerType))))
)
),
errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
parameters = Map(
"distinctPartColLists" ->
"\n\tPartition column name list #0: a\n\tPartition column name list #1: b\n",
"suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/b=1"
)
)

assert(
listConflictingPartitionColumns(
checkError(
exception = conflictingPartitionColumnsError(
Seq(
(new Path("file:/tmp/foo/a=1/_temporary"),
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
(new Path("file:/tmp/foo/a=1"),
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))))).trim ===
makeExpectedMessage(
Seq("a"),
Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType))))
)
),
errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
parameters = Map(
"distinctPartColLists" ->
"\n\tPartition column name list #0: a\n",
"suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1/_temporary\n\tfile:/tmp/foo/a=1"
)
)

assert(
listConflictingPartitionColumns(
checkError(
exception = conflictingPartitionColumnsError(
Seq(
(new Path("file:/tmp/foo/a=1"),
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
(new Path("file:/tmp/foo/a=1/b=foo"),
PartitionValues(Seq("a", "b"),
Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo", StringType)))))).trim ===
makeExpectedMessage(
Seq("a", "a, b"),
Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo", StringType))))
)
),
errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
parameters = Map(
"distinctPartColLists" ->
"\n\tPartition column name list #0: a\n\tPartition column name list #1: a, b\n",
"suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/a=1/b=foo"
)
)
}

test("Parallel partition discovery") {
@@ -1145,6 +1149,44 @@
checkAnswer(res, Seq(Row(1, 2, 3, 4.0f)))
}
}

test("SPARK-49163: Attempt to create table based on broken parquet partition data") {
withTempDir { dir =>
val data = Seq[(String, String, String)](("a", "b", "c"))
data
.toDF("col1", "col2", "col3")
.write
.mode("overwrite")
.partitionBy("col1", "col2")
.parquet(dir.getCanonicalPath)

// Structure of parquet table in filesystem:
// <base>
// +- col1=a
// +- col2=b
// |- part-00000.parquet

val partition = new File(dir, "col1=a")
val dummyData = new File(partition, "dummy")
dummyData.createNewFile()

// Structure of parquet table in filesystem is now corrupt:
// <base>
// +- col1=a
// |- dummy
// +- col2=b
// |- part-00000.parquet

val exception = intercept[SparkRuntimeException] {
spark.read.parquet(dir.toString)
}
val msg = exception.getMessage
assert(exception.getErrorClass === "CONFLICTING_PARTITION_COLUMN_NAMES")
// Partitions inside the error message can be presented in any order
assert("Partition column name list #[0-1]: col1".r.findFirstIn(msg).isDefined)
assert("Partition column name list #[0-1]: col1, col2".r.findFirstIn(msg).isDefined)
}
}
}

class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {
