[SPARK-23049][SQL] spark.sql.files.ignoreCorruptFiles should work for ORC files
dongjoon-hyun committed Jan 11, 2018
1 parent 186bf8f commit 96a2196
Showing 2 changed files with 50 additions and 8 deletions.
OrcUtils.scala

@@ -50,23 +50,35 @@ object OrcUtils extends Logging {
     paths
   }
 
-  def readSchema(file: Path, conf: Configuration): Option[TypeDescription] = {
+  def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean)
+      : Option[TypeDescription] = {
     val fs = file.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val reader = OrcFile.createReader(file, readerOptions)
-    val schema = reader.getSchema
-    if (schema.getFieldNames.size == 0) {
-      None
-    } else {
-      Some(schema)
+    try {
+      val reader = OrcFile.createReader(file, readerOptions)
+      val schema = reader.getSchema
+      if (schema.getFieldNames.size == 0) {
+        None
+      } else {
+        Some(schema)
+      }
+    } catch {
+      case e: org.apache.orc.FileFormatException =>
+        if (ignoreCorruptFiles) {
+          logWarning(s"Skipped the footer in the corrupted file: $file", e)
+          None
+        } else {
+          throw new java.io.IOException(s"Could not read footer for file: $file", e)
+        }
     }
   }
 
   def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
       : Option[StructType] = {
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf)).headOption.map { schema =>
+    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
    }
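For context, a minimal sketch of the failure mode the new try/catch handles (the file path below is hypothetical, not from this commit): asking the ORC reader for the footer of a non-ORC file throws org.apache.orc.FileFormatException, which previously escaped readSchema and failed the whole query.

// Sketch only; "/tmp/not-actually-orc.json" is a hypothetical non-ORC file,
// like the "third" directory written by the test below.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

val conf = new Configuration()
val badFile = new Path("/tmp/not-actually-orc.json")
try {
  val reader = OrcFile.createReader(badFile, OrcFile.readerOptions(conf))
  println(reader.getSchema)
} catch {
  case e: org.apache.orc.FileFormatException =>
    // The message contains "Malformed ORC file", per the test assertion below.
    println(s"Corrupt footer: ${e.getMessage}")
}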
OrcQuerySuite.scala

@@ -31,6 +31,7 @@ import org.apache.orc.OrcConf.COMPRESS
 import org.apache.orc.mapred.OrcStruct
 import org.apache.orc.mapreduce.OrcInputFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}

@@ -608,4 +609,33 @@ class OrcQuerySuite extends OrcQueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("Enabling/disabling ignoreCorruptFiles") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.orc(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.orc(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      testIgnoreCorruptFiles()
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      val exception = intercept[SparkException] {
+        testIgnoreCorruptFiles()
+      }
+      assert(exception.getMessage().contains("Malformed ORC file"))
+    }
+  }
 }
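A hedged usage sketch of the behavior this commit enables (the input directory is hypothetical): with the flag on, ORC schema inference logs and skips files whose footers cannot be read instead of failing the query.

// Usage sketch; "/data/mixed" is a hypothetical directory that may hold
// corrupt or non-ORC files alongside valid ORC files.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val df = spark.read.orc("/data/mixed")  // corrupt footers are logged and skipped
df.show()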
