[SPARK-23049][SQL] spark.sql.files.ignoreCorruptFiles should work for ORC files #20240

Closed · wants to merge 5 commits · Changes from all commits
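For context, a minimal usage sketch of the behavior this patch enables. It assumes a running SparkSession named `spark`; the /tmp paths are hypothetical, and the layout mirrors the tests added below (valid ORC output plus a JSON file standing in for a corrupt ORC file).

// Sketch only: `spark` is an existing SparkSession; paths are made up.
spark.range(2).toDF("a").write.orc("/tmp/orc_demo/good")
spark.range(2, 3).toDF("a").write.json("/tmp/orc_demo/bad")  // not a valid ORC file

// With the flag enabled, the non-ORC input is skipped during both schema
// inference and scanning, so the query returns only the valid rows.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.read.orc("/tmp/orc_demo/good", "/tmp/orc_demo/bad").show()

// With the flag disabled, the same read fails during schema inference with
// org.apache.spark.SparkException: Could not read footer for file: ...
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")

The tests added in this PR exercise exactly this pattern, for both the schema-inference and the user-supplied-schema paths.
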
File: OrcUtils.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.orc.{OrcFile, Reader, TypeDescription}

import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
@@ -50,23 +51,35 @@ object OrcUtils extends Logging {
paths
}

def readSchema(file: Path, conf: Configuration): Option[TypeDescription] = {
def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean)
: Option[TypeDescription] = {
val fs = file.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
val reader = OrcFile.createReader(file, readerOptions)
val schema = reader.getSchema
if (schema.getFieldNames.size == 0) {
None
} else {
Some(schema)
try {
val reader = OrcFile.createReader(file, readerOptions)
val schema = reader.getSchema
if (schema.getFieldNames.size == 0) {
None
} else {
Some(schema)
}
} catch {
case e: org.apache.orc.FileFormatException =>
if (ignoreCorruptFiles) {
logWarning(s"Skipped the footer in the corrupted file: $file", e)
None
} else {
throw new SparkException(s"Could not read footer for file: $file", e)
}
}
}

def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
: Option[StructType] = {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
val conf = sparkSession.sessionState.newHadoopConf()
// TODO: We need to support merge schema. Please see SPARK-11412.
files.map(_.getPath).flatMap(readSchema(_, conf)).headOption.map { schema =>
files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
}
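For readers who have not used the ORC core API directly, here is a minimal standalone sketch of the footer read that the new try/catch above guards. The file path is hypothetical and the orc-core dependency is assumed to be on the classpath.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.{FileFormatException, OrcFile}

val conf = new Configuration()
val file = new Path("/tmp/not_an_orc_file")  // hypothetical non-ORC input
val options = OrcFile.readerOptions(conf).filesystem(file.getFileSystem(conf))
try {
  // Opening the reader parses the ORC footer; a non-ORC file fails here.
  val schema = OrcFile.createReader(file, options).getSchema
  println(s"ORC schema: $schema")
} catch {
  case e: FileFormatException =>
    // ORC reports corrupt input as "Malformed ORC file ..."; this is the
    // exception the patch turns into a warning when ignoreCorruptFiles is set.
    println(s"Skipping corrupt file: ${e.getMessage}")
}
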
File: OrcQuerySuite.scala
@@ -31,6 +31,7 @@ import org.apache.orc.OrcConf.COMPRESS
import org.apache.orc.mapred.OrcStruct
import org.apache.orc.mapreduce.OrcInputFormat

import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
@@ -531,6 +532,52 @@ abstract class OrcQueryTest extends OrcTest {
val df = spark.read.orc(path1.getCanonicalPath, path2.getCanonicalPath)
assert(df.count() == 20)
}

test("Enabling/disabling ignoreCorruptFiles") {
def testIgnoreCorruptFiles(): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(1).toDF("a").write.orc(new Path(basePath, "first").toString)
spark.range(1, 2).toDF("a").write.orc(new Path(basePath, "second").toString)
spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
val df = spark.read.orc(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString)
checkAnswer(df, Seq(Row(0), Row(1)))
}
}

def testIgnoreCorruptFilesWithoutSchemaInfer(): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(1).toDF("a").write.orc(new Path(basePath, "first").toString)
spark.range(1, 2).toDF("a").write.orc(new Path(basePath, "second").toString)
spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
val df = spark.read.schema("a long").orc(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString)
checkAnswer(df, Seq(Row(0), Row(1)))
}
}

withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
testIgnoreCorruptFiles()
testIgnoreCorruptFilesWithoutSchemaInfer()
}

withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
val m1 = intercept[SparkException] {
testIgnoreCorruptFiles()
}.getMessage
assert(m1.contains("Could not read footer for file"))
val m2 = intercept[SparkException] {
testIgnoreCorruptFilesWithoutSchemaInfer()
}.getMessage
assert(m2.contains("Malformed ORC file"))
}
}
}

class OrcQuerySuite extends OrcQueryTest with SharedSQLContext {
File: ParquetQuerySuite.scala
@@ -320,21 +320,38 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString)
checkAnswer(
df,
Seq(Row(0), Row(1)))
checkAnswer(df, Seq(Row(0), Row(1)))
}
}

def testIgnoreCorruptFilesWithoutSchemaInfer(): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
val df = spark.read.schema("a long").parquet(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString)
checkAnswer(df, Seq(Row(0), Row(1)))
}
}

withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
testIgnoreCorruptFiles()
testIgnoreCorruptFilesWithoutSchemaInfer()
}

withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
val exception = intercept[SparkException] {
testIgnoreCorruptFiles()
}
assert(exception.getMessage().contains("is not a Parquet file"))
val exception2 = intercept[SparkException] {
testIgnoreCorruptFilesWithoutSchemaInfer()
}
assert(exception2.getMessage().contains("is not a Parquet file"))
}
}

File: OrcFileFormat.scala (Hive ORC)
@@ -59,9 +59,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
OrcFileOperator.readSchema(
files.map(_.getPath.toString),
Some(sparkSession.sessionState.newHadoopConf())
Some(sparkSession.sessionState.newHadoopConf()),
ignoreCorruptFiles
)
}

@@ -129,6 +131,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable

val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

(file: PartitionedFile) => {
val conf = broadcastedHadoopConf.value.value
@@ -138,7 +141,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
// SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
// case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
// using the given physical schema. Instead, we simply return an empty iterator.
val isEmptyFile = OrcFileOperator.readSchema(Seq(filePath.toString), Some(conf)).isEmpty
val isEmptyFile =
OrcFileOperator.readSchema(Seq(filePath.toString), Some(conf), ignoreCorruptFiles).isEmpty
if (isEmptyFile) {
Iterator.empty
} else {
File: OrcFileOperator.scala
@@ -17,11 +17,14 @@

package org.apache.spark.sql.hive.orc

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -46,7 +49,10 @@ private[hive] object OrcFileOperator extends Logging {
* create the result reader from that file. If no such file is found, it returns `None`.
* @todo Needs to consider all files when schema evolution is taken into account.
*/
def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
def getFileReader(basePath: String,
config: Option[Configuration] = None,
ignoreCorruptFiles: Boolean = false)
: Option[Reader] = {
def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
reader.getObjectInspector match {
case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
@@ -65,16 +71,28 @@
}

listOrcFiles(basePath, conf).iterator.map { path =>
path -> OrcFile.createReader(fs, path)
val reader = try {
Some(OrcFile.createReader(fs, path))
} catch {
case e: IOException =>
if (ignoreCorruptFiles) {
logWarning(s"Skipped the footer in the corrupted file: $path", e)
None
} else {
throw new SparkException(s"Could not read footer for file: $path", e)
}
}
path -> reader
}.collectFirst {
case (path, reader) if isWithNonEmptySchema(path, reader) => reader
case (path, Some(reader)) if isWithNonEmptySchema(path, reader) => reader
}
}

def readSchema(paths: Seq[String], conf: Option[Configuration]): Option[StructType] = {
def readSchema(paths: Seq[String], conf: Option[Configuration], ignoreCorruptFiles: Boolean)
: Option[StructType] = {
// Take the first file where we can open a valid reader if we can find one. Otherwise just
// return None to indicate we can't infer the schema.
paths.flatMap(getFileReader(_, conf)).headOption.map { reader =>
paths.flatMap(getFileReader(_, conf, ignoreCorruptFiles)).headOption.map { reader =>
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
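Both ORC code paths are touched by this change: the native reader (OrcUtils, first file above) and the Hive-based reader (OrcFileOperator, this file). A hedged sketch of exercising both, assuming the spark.sql.orc.impl switch available in this Spark version and reusing the hypothetical /tmp/orc_demo layout from the first sketch:

// Sketch only: session-level SQL configs; "native" and "hive" select the two
// ORC implementations modified by this patch.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

spark.conf.set("spark.sql.orc.impl", "native")  // native reader (OrcUtils above)
spark.read.orc("/tmp/orc_demo/good", "/tmp/orc_demo/bad").count()

spark.conf.set("spark.sql.orc.impl", "hive")    // Hive reader (OrcFileOperator above)
spark.read.orc("/tmp/orc_demo/good", "/tmp/orc_demo/bad").count()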