Revert clean-up code for a CSV parser
maropu committed Feb 23, 2017
1 parent 512fb42 commit 3d514e5
Showing 1 changed file with 28 additions and 22 deletions.
@@ -54,23 +54,35 @@ private[csv] class UnivocityParser(
   private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))

   private val valueConverters =
-    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+    dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray

   private val parser = new CsvParser(options.asParserSettings)

   private var numMalformedRecords = 0

   private val row = new GenericInternalRow(requiredSchema.length)

-  private val indexArr: Array[Int] = {
+  // This parser loads an `indexArr._1`-th position value in input tokens,
+  // then puts the value in `row(indexArr._2)`.
+  private val indexArr: Array[(Int, Int)] = {
     val fields = if (options.dropMalformed) {
       // If `dropMalformed` is enabled, then it needs to parse all the values
       // so that we can decide which row is malformed.
       requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
     } else {
       requiredSchema
     }
-    fields.map(schema.indexOf(_: StructField)).toArray
+    // TODO: Revisit this; we need to clean up the code here for readability.
+    // See the URL below for related discussions:
+    // https://github.com/apache/spark/pull/16928#discussion_r102636720
+    val fieldsWithIndexes = fields.zipWithIndex
+    corruptFieldIndex.map { case corrFieldIndex =>
+      fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
+    }.getOrElse {
+      fieldsWithIndexes
+    }.map { case (f, i) =>
+      (dataSchema.indexOf(f), i)
+    }.toArray
   }

   /**
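
For readers following the change: each `indexArr` entry now pairs where a field's token sits in the parsed input with the slot its value should occupy in the output row. A minimal, self-contained sketch of that pairing, assuming made-up field names and plain Scala collections in place of Spark's `StructType`:

    // Sketch only: illustrates the (token position, row index) pairing that
    // `indexArr` encodes; the schema and field names here are hypothetical.
    object IndexArrSketch {
      case class Field(name: String)

      def main(args: Array[String]): Unit = {
        val dataSchema = Seq(Field("a"), Field("b"), Field("c"))
        // Suppose only `c` and `a` are required, in that projection order.
        val requiredSchema = Seq(Field("c"), Field("a"))

        // Pair each required field with its output-row slot, then look up
        // where that field's token sits in the parsed input.
        val indexArr: Array[(Int, Int)] =
          requiredSchema.zipWithIndex.map { case (f, rowIdx) =>
            (dataSchema.indexOf(f), rowIdx)
          }.toArray

        val tokens = Array("1", "2", "3") // parsed tokens for a, b, c
        val row = new Array[String](requiredSchema.length)
        for ((pos, rowIdx) <- indexArr) row(rowIdx) = tokens(pos)
        println(row.mkString(",")) // prints: 3,1
      }
    }

With the previous plain `Array[Int]`, the loop counter doubled as the row index; carrying the pair keeps the destination explicit once the corrupt-record column is filtered out.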
@@ -184,13 +196,13 @@ private[csv] class UnivocityParser(
     convertWithParseMode(input) { tokens =>
       var i: Int = 0
       while (i < indexArr.length) {
-        val pos = indexArr(i)
+        val (pos, rowIdx) = indexArr(i)
         // The conversion needs to be attempted anyway, because in `DROPMALFORMED`
         // mode whether this row is malformed is decided only after trying the
         // cast, even if the casted value is not stored in the row.
         val value = valueConverters(pos).apply(tokens(pos))
         if (i < requiredSchema.length) {
-          row(i) = value
+          row(rowIdx) = value
         }
         i += 1
       }
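
The context comment above is the subtle part of this loop: every indexed token is converted even when its value is never stored, since a failed conversion anywhere is what flags the row in `DROPMALFORMED` mode. A hedged sketch of that control flow, with toy converters standing in for Spark's `valueConverters`:

    // Sketch only: toy converters; a conversion failure anywhere means the
    // whole row is treated as malformed, so every token must be attempted.
    val converters: Array[String => Any] =
      Array(_.toInt, _.toDouble, identity[String])

    def tryConvertAll(tokens: Array[String]): Option[Array[Any]] =
      try Some(converters.zip(tokens).map { case (c, t) => c(t) })
      catch { case _: NumberFormatException => None } // drop this row

    // tryConvertAll(Array("1", "2.5", "x"))    => Some(Array(1, 2.5, x))
    // tryConvertAll(Array("oops", "2.5", "x")) => None, i.e. the row is dropped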
@@ -216,25 +228,19 @@
       throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
         s"${tokens.mkString(options.delimiter.toString)}")
     } else {
-      val checkedTokens = if (options.permissive) {
-        // If the number of parsed tokens does not equal the expected one, it makes the length
-        // the same as the expected one: if shorter, it adds extra tokens at the tail;
-        // if longer, it drops the extra tokens.
-        val lengthSafeTokens = if (dataSchema.length > tokens.length) {
+      // If the number of parsed tokens does not equal the expected one, it makes the length
+      // the same as the expected one: if shorter, it adds extra tokens at the tail;
+      // if longer, it drops the extra tokens.
+      //
+      // TODO: Revisit this; if the number of tokens does not match the expected length of the
+      // schema, we probably need to treat it as a malformed record.
+      // See the URL below for related discussions:
+      // https://github.com/apache/spark/pull/16928#discussion_r102657214
+      val checkedTokens = if (options.permissive && dataSchema.length != tokens.length) {
+        if (dataSchema.length > tokens.length) {
           tokens ++ new Array[String](dataSchema.length - tokens.length)
-        } else if (dataSchema.length < tokens.length) {
-          tokens.take(dataSchema.length)
         } else {
-          tokens
-        }
-
-        // If we need to handle corrupt fields, it adds an extra token to skip a field for
-        // malformed strings when loading parsed tokens into the resulting `row`.
-        corruptFieldIndex.map { corrFieldIndex =>
-          val (front, back) = lengthSafeTokens.splitAt(corrFieldIndex)
-          front ++ new Array[String](1) ++ back
-        }.getOrElse {
-          lengthSafeTokens
+          tokens.take(dataSchema.length)
         }
       } else {
         tokens
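
To summarize the revert for skimmers: the permissive branch now pads or truncates only when the token count and schema length actually differ, and the extra null slot that was spliced in at `corruptFieldIndex` is gone. A small standalone sketch of the pad-or-truncate step itself (the helper name is ours, not Spark's):

    // Sketch only: normalize a token array to the schema length, as the
    // permissive branch does; short rows get null padding, long rows are cut.
    def lengthSafe(tokens: Array[String], schemaLen: Int): Array[String] =
      if (schemaLen > tokens.length) {
        tokens ++ new Array[String](schemaLen - tokens.length) // nulls at tail
      } else {
        tokens.take(schemaLen) // no-op when the lengths already match
      }

    // lengthSafe(Array("a", "b"), 4)           => Array(a, b, null, null)
    // lengthSafe(Array("a", "b", "c", "d"), 2) => Array(a, b)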
