[SPARK-18699][SQL] Put malformed tokens into a new field when parsing CSV data #16928
Conversation
Test build #72877 has finished for PR 16928 at commit
@HyukjinKwon Could you check this and give me any insights before the committers do?
@maropu, I just ran some similar tests with the JSON datasource. What do you think about matching it to JSON's behaviour by introducing columnNameOfCorruptRecord? I ran with the data and schema as below:

Seq("""{"a": "a", "b" : 1}""").toDF().write.text("/tmp/path")
val schema = StructType(StructField("a", IntegerType, true) :: StructField("b", StringType, true) :: StructField("_corrupt_record", StringType, true) :: Nil)

scala> spark.read.schema(schema).option("mode", "FAILFAST").json("/tmp/path").show()
org.apache.spark.sql.catalyst.json.SparkSQLJsonProcessingException: Malformed line in FAILFAST mode: {"a": "a", "b" : 1}

scala> spark.read.schema(schema).option("mode", "DROPMALFORMED").json("/tmp/path").show()
+---+---+---------------+
| a| b|_corrupt_record|
+---+---+---------------+
+---+---+---------------+

scala> spark.read.schema(schema).option("mode", "PERMISSIVE").json("/tmp/path").show()
+----+----+-------------------+
| a| b| _corrupt_record|
+----+----+-------------------+
|null|null|{"a": "a", "b" : 1}|
+----+----+-------------------+
Aha, looks good to me. Just a sec, and I'll modify the code.
(force-pushed from 5486f5d to f09a899)
cc @cloud-fan, could I ask if you think matching it seems reasonable? I wrote some details below for you to track this issue easily.

Currently, JSON produces

Seq("""{"a": "a", "b" : 1}""").toDF().write.text("/tmp/path")
val schema = StructType(
  StructField("a", IntegerType, true) ::
  StructField("b", StringType, true) ::
  StructField("_corrupt_record", StringType, true) :: Nil)
spark.read.schema(schema)
  .option("mode", "PERMISSIVE")
  .json("/tmp/path").show()

+----+----+-------------------+
| a| b| _corrupt_record|
+----+----+-------------------+
|null|null|{"a": "a", "b" : 1}|
+----+----+-------------------+

whereas CSV produces

Seq("""a,1""").toDF().write.text("/tmp/path")
val schema = StructType(
  StructField("a", IntegerType, true) ::
  StructField("b", StringType, true) :: Nil)
spark.read.schema(schema)
  .option("mode", "PERMISSIVE")
  .csv("/tmp/path").show()

java.lang.NumberFormatException: For input string: "a"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Integer.parseInt(Integer.java:580)

To cut it short, the problem is not parsing itself but the case when a value cannot be converted. In the case of JSON, it fills the value in the column specified in columnNameOfCorruptRecord.

It seems there are two ways to fix this. One is what @maropu initially suggested - permissively filling it as null. The other way, which I suggested here, is matching it to JSON's behaviour: storing the value in the column specified in columnNameOfCorruptRecord. JSON schema inference produces this column when the JSON is malformed, as below:

Seq("""{"a": "a", "b" :""").toDF().write.text("/tmp/test123")
spark.read.json("/tmp/test123").printSchema

root
 |-- _corrupt_record: string (nullable = true)
Let me try my best to review further once it is decided.
I also keep considering other ways to fix this...
Test build #72971 has finished for PR 16928 at commit
Test build #72969 has finished for PR 16928 at commit
(force-pushed from f09a899 to df39e39)
Test build #72991 has finished for PR 16928 at commit
Jenkins, retest this please.
Test build #72993 has finished for PR 16928 at commit
Definitely, we should match the behavior of JSON.
Thanks, let me review further by tomorrow.
@maropu, I left my opinions on the code. I hope they make sense.
@@ -101,6 +101,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
    val broadcastedHadoopConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

    val columnNameOfCorruptRecord = csvOptions.columnNameOfCorruptRecord.getOrElse(sparkSession
      .sessionState.conf.columnNameOfCorruptRecord)
Maybe, we could put columnNameOfCorruptRecord as an argument in CSVOptions, consistently with JSONOptions. I guess then it won't require the changes between 129L-133L as well.
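For reference, a rough sketch of what that suggestion could look like; the exact parameter list and defaults here are assumptions, not the final code:
```
// Hypothetical shape only: pass the session-level default into CSVOptions,
// mirroring JSONOptions, so call sites don't have to look it up separately.
private[csv] class CSVOptions(
    parameters: Map[String, String],
    defaultTimeZoneId: String,
    defaultColumnNameOfCorruptRecord: String) {

  // A per-read option overrides the session-level default.
  val columnNameOfCorruptRecord: String =
    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
}
```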
@@ -222,12 +250,6 @@ private[csv] class UnivocityParser(
        logWarning("Parse exception. " +
          s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}")
      }
      if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) {
Maybe I missed something, but do you mind elaborating on why this was removed?
Oh, my bad. I'll revert this part.
          row(i) = value
        }
      } catch {
        case _: NumberFormatException | _: IllegalArgumentException
Could we maybe decouple this parse-mode handling logic into convertWithParseMode?
  private val parser = new CsvParser(options.asParserSettings)

  private var numMalformedRecords = 0

  private val row = new GenericInternalRow(requiredSchema.length)

  private val indexArr: Array[Int] = {
  private val shouldHandleCorruptRecord = columnNameOfCorruptRecord.isDefined
  private val corruptIndex = columnNameOfCorruptRecord.flatMap { fn =>
It is trivial, but maybe

private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)

if possible.
    fields.map(schema.indexOf(_: StructField)).toArray
    fields.zipWithIndex.filter { case (_, i) => i != corruptIndex }.map { case (f, i) =>
      (inputSchema.indexOf(f), i)
    }.toArray
Let me check out this PR soon and see if this bit can be cleaner, but it would be even nicer if you could double-check, please.
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}

private[csv] class CSVOptions(
private[sql] class CSVOptions(
The execution package itself is meant to be private, according to 511f52f. We could just remove this if it should be accessible outside the csv package.
@HyukjinKwon okay, thanks! I'll check soon
(force-pushed from df39e39 to 873a383)
Test build #73109 has finished for PR 16928 at commit
@maropu, I left some more comments that might be helpful. Let me take a further look in the following few days.
    val parsedOptions = new CSVOptions(
      options,
      sparkSession.sessionState.conf.sessionLocalTimeZone,
      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
(It seems CSVOptions is created twice above :)).
Fixed
@@ -95,6 +104,9 @@ private[csv] class CSVOptions(
  val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
  val permissive = ParseModes.isPermissiveMode(parseMode)

  val columnNameOfCorruptRecord =
    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
Maybe, we should add this in readwriter.py too and document it in readwriter.py, DataFrameReader and DataStreamReader.
Added doc descriptions in readwriter.py, DataFrameReader, and DataStreamReader.
  private val parser = new CsvParser(options.asParserSettings)

  private var numMalformedRecords = 0

  private val row = new GenericInternalRow(requiredSchema.length)

  private val indexArr: Array[Int] = {
  private val corruptIndex =
    requiredSchema.getFieldIndex(options.columnNameOfCorruptRecord).getOrElse(-1)
Could we make this an Option and use it?

private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
corruptFieldIndex.foreach { idx =>
  require(schema(idx).dataType == StringType, "blabla")
  require(schema(idx).nullable, "blabla")
}
        // (e.g., SparkException about unsupported types).
        case _: NumberFormatException | _: IllegalArgumentException
          if options.permissive && shouldHandleCorruptRecord =>
          row(corruptIndex) = UTF8String.fromString(tokens.mkString(options.delimiter.toString))
Ah, so it had to be manually concatenated. I think it is okay to rename convertWithParseMode to parseWithMode and make it take input: String as an argument. I worry that the original input might be different from what is stored in the corrupt column.
Then, we would be able to do something as below:

case NonFatal(e) if options.permissive =>
  val row = new GenericInternalRow(requiredSchema.length)
  corruptFieldIndex.foreach { idx =>
    row(idx) = UTF8String.fromString(input)
  }
I think catching all NonFatal seems consistent, but could you check if the behaviour is consistent with JSON for unsupported types in PERMISSIVE mode? If we should throw an exception, we could utilize CSVUtils.verifySchema to throw one ahead of time.
Aha, it seems reasonable to me and I'll re-check again.
I added CSVUtils.verifySchema at the head of buildReader, then modified the code above to handle all the NonFatal exceptions in permissive mode in parseWithMode.
      var i: Int = 0
      while (i < indexArr.length) {
        val pos = indexArr(i)
      indexArr.foreach { case (pos, i) =>
(Oh, BTW, we should just use while in the critical path if we are not sure the byte code is virtually the same or more efficient. This is also written as "Use while loops instead of for loops or functional transformations (e.g. map, foreach)" in scala-style-guide#traversal-and-zipwithindex.)
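(For reference, a minimal sketch of the while-loop form, assuming indexArr holds (position-in-tokens, position-in-row) pairs as in the diff above; convertTo is a hypothetical stand-in for the per-field converter, not an actual method name.)
```
// Hot-path iteration with a plain while loop instead of foreach, so no closure
// is allocated per row. `convertTo` is a placeholder for the real conversion.
var i = 0
while (i < indexArr.length) {
  val (pos, rowIdx) = indexArr(i)
  row(rowIdx) = convertTo(tokens(pos), requiredSchema(rowIdx).dataType)
  i += 1
}
```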
(off-topic: oh, I found the style-guide has great Chinese and Korean versions!)
fixed
I'll update in a day, thanks!
Test build #73166 has finished for PR 16928 at commit
(force-pushed from bf286c7 to 4df4bc6)
Test build #73167 has finished for PR 16928 at commit
…` field violates requirements in json formats

## What changes were proposed in this pull request?
This pr comes from #16928 and fixed a json behaviour along with the CSV one.

## How was this patch tested?
Added tests in `JsonSuite`.

Author: Takeshi Yamamuro <[email protected]>

Closes #17023 from maropu/SPARK-19695.
python/pyspark/sql/readwriter.py
Outdated
@@ -367,10 +368,18 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
        If None is set, it uses the default value, session local timezone.

        * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
            When a schema is set by user, it sets ``null`` for extra fields.
            If users set a string type field named ``columnNameOfCorruptRecord`` in a
Can we just copy-paste the doc from json? Or you can make some changes, but please make them consistent.
okay
    require(schema(corrFieldIndex).nullable)
  }

  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
JSON doesn't do this; why the difference? cc @HyukjinKwon
It is because parsing CSV depends on the order and length of the schema and tokens. In the case of JSON, fields can simply be mapped by key, but for CSV it depends on the order and length of the schema. So, this filters the corrupt field out in order to match the data schema with the parsed tokens.
(For example, this code path is dependent on the length of the data schema. It drops/adds tokens after comparing the length of the data schema with the number of tokens. If we kept the corrupt column, the schema length would differ from the token count.)
      } else {
        tokens
      }

      try {
        Some(convert(checkedTokens))
      } catch {
        case NonFatal(e) if options.permissive =>
          val row = new GenericInternalRow(requiredSchema.length)
          corruptFieldIndex.map(row(_) = UTF8String.fromString(input))
map -> foreach
BTW, I would like to note that this is actually an important pattern to avoid; it could cause bugs that are hard to figure out (see SPARK-16694).
Aha, fixed. Thanks!
      val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
        tokens ++ new Array[String](inputSchema.length - tokens.length)
      } else if (options.permissive && inputSchema.length < tokens.length) {
        tokens.take(inputSchema.length)
Not related to this PR, but can you add some comments for this code block? It's kind of hard to follow the logic here.
okay, I'll update soon
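For reference, a commented sketch of the block quoted above; inputSchema is assumed to be the data schema without the corrupt column, and tokens the raw strings from the parser:
```
// Normalize the token count to the data schema length in PERMISSIVE mode:
// - if the row has fewer tokens than expected, pad the tail with nulls so the
//   missing columns end up as null after conversion;
// - if it has more tokens than expected, drop the extra trailing tokens.
val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
  tokens ++ new Array[String](inputSchema.length - tokens.length)
} else if (options.permissive && inputSchema.length < tokens.length) {
  tokens.take(inputSchema.length)
} else {
  tokens
}
```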
Test build #73322 has finished for PR 16928 at commit
Test build #73328 has started for PR 16928 at commit
Test build #73331 has started for PR 16928 at commit
        tokens
      }

      // If we need to handle corrupt fields, it adds an extra token to skip a field for malformed
@HyukjinKwon Does this fix satisfy your intention? I slightly modified the code based on yours.
python/pyspark/sql/readwriter.py
Outdated
            ``null`` for extra fields.
            ``columnNameOfCorruptRecord``. An user-defined schema can include \
            a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \
            When a schema is set by user, it sets ``null`` for extra fields.
what about the other 2 modes? do they also set null for extra fields?
No. The two modes do not set null. In failFast mode, it fails the job. In dropMalformed mode, it drops the malformed lines whose length is shorter or longer than the schema.
does json have similar behavior?
Ah..., a bit different, I think. As @HyukjinKwon said above (#16928 (comment)), CSV formats depend on the length of parsed tokens (if the length is shorter, it fills null, and if longer, it drops the extra tokens in permissive mode). On the other hand, in JSON formats, fields in a required schema are mapped by key. In case of missing keys, JSON just sets null in those fields in all three modes. cc: @HyukjinKwon
e.g.)
import org.apache.spark.sql.types._
scala> Seq("""{"a": "a", "b" : 1}""", """{"a": "a"}""").toDF().write.text("/Users/maropu/Desktop/data")
scala> val dataSchema = StructType(StructField("a", StringType, true) :: StructField("b", IntegerType, true) :: Nil)
scala> spark.read.schema(dataSchema).option("mode", "PERMISSIVE").json("/Users/maropu/Desktop/data").show()
+---+----+
| a| b|
+---+----+
| a| 1|
| a|null|
+---+----+
scala> spark.read.schema(dataSchema).option("mode", "FAILFAST").json("/Users/maropu/Desktop/data").show()
+---+----+
| a| b|
+---+----+
| a| 1|
| a|null|
+---+----+
scala> spark.read.schema(dataSchema).option("mode", "DROPMALFORMED").json("/Users/maropu/Desktop/data").show()
+---+----+
| a| b|
+---+----+
| a| 1|
| a|null|
+---+----+
      val checkedTokens = if (options.permissive) {
        // If a length of parsed tokens is not equal to expected one, it makes the length the same
        // with the expected. If the length is shorter, it adds extra tokens in the tail.
        // If longer, it drops extra tokens.
we should revisit this in the future. If the token length doesn't match the expected schema, we should treat it as a malformed record. cc @HyukjinKwon
Should we also put that malformed record (shorter or longer) into a corrupt field?
Yup, I agree in a way, but I guess "it is pretty common that CSV is malformed in this way" (said by the analysis team in my company). Could we leave it as is for now? Let me try to raise a different JIRA after checking R's read.csv or other libraries.
Jenkins, retest this please.
      // strings when loading parsed tokens into a resulting `row`.
      corruptFieldIndex.map { corrFieldIndex =>
        val (front, back) = lengthSafeTokens.splitAt(corrFieldIndex)
        front ++ new Array[String](1) ++ back
This will introduce a lot of extra object allocation; I think the previous version is better.
We have two options: (1) just revert this part, or (2) modify this part to avoid the allocation, based on the code below. cc: @HyukjinKwon
e.g.) This is just an example and it may be a little hard to understand:
val parsedTokens = new Array[String](schema.length)
...
// If we need to handle corrupt fields, it adds an extra token to skip a field for malformed
// strings when loading parsed tokens into a resulting `row`.
corruptFieldIndex.map { corrFieldIndex =>
lengthSafeTokens.splitAt(corrFieldIndex) match { case (front, back) =>
front.zipWithIndex.foreach { case (s, i) =>
parsedTokens(i) = s
}
back.zipWithIndex.foreach { case (s, i) =>
parsedTokens(schema.length - back.length + i) = s
}
}
parsedTokens
We should go with the committer's opinion. I am fine if we revert. I personally prefer (1), reverting this change, if this does not sound good.
Probably, it'd be better to leave comments here as TODO.
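For reference, a minimal sketch of what option (2) could look like without the intermediate arrays from splitAt; schema, corruptFieldIndex, and lengthSafeTokens are assumed to be defined as in the snippets above:
```
// Copy tokens into one pre-sized array, leaving an empty slot at the corrupt-field
// position, instead of allocating the splitAt halves and the concatenated result.
val parsedTokens = new Array[String](schema.length)
corruptFieldIndex match {
  case Some(corrIdx) =>
    var i = 0
    while (i < lengthSafeTokens.length) {
      // Tokens at or after the corrupt column shift one slot to the right.
      val target = if (i < corrIdx) i else i + 1
      parsedTokens(target) = lengthSafeTokens(i)
      i += 1
    }
  case None =>
    System.arraycopy(lengthSafeTokens, 0, parsedTokens, 0, lengthSafeTokens.length)
}
```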
python/pyspark/sql/readwriter.py
Outdated
@@ -193,8 +193,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,

        * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
            record and puts the malformed string into a new field configured by \
            ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
            ``null`` for extra fields.
            ``columnNameOfCorruptRecord``. An user-defined schema can include \
Please rephrase this documentation a little bit to make it clearer.
okay, I'll brush it up
Updated; could you check this again? Thanks!
    require(schema(corrFieldIndex).nullable)
  }

  private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
I just realised we only use the length of dataSchema now. Could we just use the length if more commits should be pushed?
ok, I'll update
I reverted some parts of the code, and now dataSchema is used for more than just its length (https://github.com/apache/spark/pull/16928/files#diff-d19881aceddcaa5c60620fdcda99b4c4R57). So, I kept this variable as it is.
Test build #73335 has finished for PR 16928 at commit
Test build #73348 has finished for PR 16928 at commit
Test build #73353 has finished for PR 16928 at commit
thanks, merging to master!
… CSV data

## What changes were proposed in this pull request?
This pr added a logic to put malformed tokens into a new field when parsing CSV data in case of permissive modes. In the current master, if the CSV parser hits these malformed ones, it throws an exception below (and then a job fails);
```
Caused by: java.lang.IllegalArgumentException
  at java.sql.Date.valueOf(Date.java:143)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
  at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:272)
  at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
  at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
  at scala.util.Try.getOrElse(Try.scala:79)
  at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:269)
  at
```
In case that users load large CSV-formatted data, the job failure makes users get some confused. So, this fix set NULL for original columns and put malformed tokens in a new field.

## How was this patch tested?
Added tests in `CSVSuite`.

Author: Takeshi Yamamuro <[email protected]>

Closes apache#16928 from maropu/SPARK-18699-2.
            a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
            schema. If a schema does not have the field, it drops corrupt records during \
            parsing. When inferring a schema, it implicitly adds a \
            ``columnNameOfCorruptRecord`` field in an output schema.
@maropu For JSON, we implicitly add the columnNameOfCorruptRecord field during schema inference when the mode is PERMISSIVE. What is the reason we are not doing the same thing for CSV schema inference?
(Sorry for interrupting) Yea, it should be consistent and we probably should change it. We should probably also treat records with fewer or more tokens than the schema as malformed records in PERMISSIVE mode, rather than filling some of them in. @cloud-fan raised this issue before and I had a talk with some data analysts; it looked like some agreed and others did not, so I decided not to change the current behaviour for now.
To cut it short, the reason (I assume) is that I could not imagine a simple, common case that fails to parse CSV (not during conversion) with the current implementation. If there is one, we should match the behaviour.
I am currently outside and this is my phone. I will double-check this when I get to my computer, but this should be correct unless I have missed some changes in this code path.
In CSV, records with fewer or more tokens than the schema are already viewed as malformed records in (at least) 2.2. I did not check the previous versions. I think we need to implicitly add the columnNameOfCorruptRecord column during schema inference too.
There is more than one issue here. The default of columnNameOfCorruptRecord does not respect the session conf spark.sql.columnNameOfCorruptRecord.
Will submit a PR soon for fixing both issues.
Now, users have to manually add the columnNameOfCorruptRecord column to see these malformed records.
@gatorsmile, I just got to my laptop.
I checked that when the length of tokens is more than the schema, it fills the malformed column, with the data below:

a,a

(BTW, it looks like it respects spark.sql.columnNameOfCorruptRecord?)
scala> spark.read.schema("a string, _corrupt_record string").csv("test.csv").show()
+---+---------------+
| a|_corrupt_record|
+---+---------------+
| a| a,a|
+---+---------------+
scala> spark.conf.set("spark.sql.columnNameOfCorruptRecord", "abc")
scala> spark.read.schema("a string, abc string").csv("test.csv").show()
+---+---+
| a|abc|
+---+---+
| a|a,a|
+---+---+
And, I found another bug (when the length is less than the schema), with this data:
a
a
a
a
a
scala> spark.read.schema("a string, b string, _corrupt_record string").csv("test.csv").show()
prints ...
17/06/05 09:45:26 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.NullPointerException
at scala.collection.immutable.StringLike$class.stripLineEnd(StringLike.scala:89)
at scala.collection.immutable.StringOps.stripLineEnd(StringOps.scala:29)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$getCurrentInput(UnivocityParser.scala:56)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:50)
at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:43)
at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:64)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$parseIterator$1.apply(UnivocityParser.scala:312)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$parseIterator$1.apply(UnivocityParser.scala:312)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:236)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:230)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
It looks like getCurrentInput produces null as the input is all parsed.
Another thing I would like to note (just so we don't forget the difference): JSON produces null in the columns and puts the contents in the malformed column. With the input:
{"a": 1, "b": "a"}
scala> spark.read.json("test.json").show()
+---+---+
| a| b|
+---+---+
| 1| a|
+---+---+
scala> spark.read.schema("a string, b int, _corrupt_record string").json("test.json").show()
+----+----+------------------+
| a| b| _corrupt_record|
+----+----+------------------+
|null|null|{"a": 1, "b": "a"}|
+----+----+------------------+
Oh... I was writing my comments before seeing yours. Yes, I agree with your comments.
Let me take a shot at fixing the bug I found above (NullPointerException). I think this can be easily fixed (though I am pretty sure the behaviour could be arguable). I will open a PR and cc you to show what it looks like.
Sorry for my late response. Yea, I also think these behaviours should be the same. I tried in this PR, but I couldn't do it because (as you both already noticed) we can't easily add a new column in the CSV code path. So, I think we probably need some refactoring of DataSource to make this behaviour consistent.
What changes were proposed in this pull request?
This PR added logic to put malformed tokens into a new field when parsing CSV data in permissive mode. In the current master, if the CSV parser hits such a malformed token, it throws an exception (for example, an IllegalArgumentException from java.sql.Date.valueOf) and the job fails.
When users load large CSV-formatted data, such a job failure can be confusing. So, this fix sets NULL for the original columns and puts the malformed tokens into a new field.
How was this patch tested?
Added tests in CSVSuite.
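A minimal usage sketch of the behaviour described above, mirroring the examples earlier in this thread; the paths, data, and the output shown in comments are illustrative:
```
import org.apache.spark.sql.types._

// A CSV file where the first row has a token that cannot be cast to IntegerType.
Seq("a,1", "2,b").toDF().write.text("/tmp/csv-malformed")

val schema = StructType(
  StructField("a", IntegerType, true) ::
  StructField("b", StringType, true) ::
  StructField("_corrupt_record", StringType, true) :: Nil)

// In PERMISSIVE mode the bad row no longer fails the job: the typed columns are
// set to null and the raw line is kept in the user-provided corrupt-record column.
// Expected output, roughly:
// +----+----+---------------+
// |   a|   b|_corrupt_record|
// +----+----+---------------+
// |null|null|            a,1|
// |   2|   b|           null|
// +----+----+---------------+
spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .csv("/tmp/csv-malformed")
  .show()
```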