
[SPARK-20978][SQL] Set null for malformed column when the number of tokens is less than schema in CSV read/permissive mode #18200

Closed
wants to merge 1 commit

Conversation

HyukjinKwon (Member) commented Jun 5, 2017

What changes were proposed in this pull request?

This PR proposes to fix an NPE that occurs when the number of tokens is less than the schema size (namely, the input was fully parsed but the number of tokens does not match the number of fields in the schema).

Currently, tokenizer.getContext.currentParsedContent() can return null because the actual input was successfully parsed and, at this stage, parsing of the next record has not started yet (and therefore there is no current parsed content).

Before

scala> spark.read.schema("a string, b string, unparsed string").option("columnNameOfCorruptRecord", "unparsed").csv(Seq("a").toDS).show()
17/06/05 13:59:26 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
	at scala.collection.immutable.StringLike$class.stripLineEnd(StringLike.scala:89)
	at scala.collection.immutable.StringOps.stripLineEnd(StringOps.scala:29)
	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$getCurrentInput(UnivocityParser.scala:56)
	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
	at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:50)
	at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:43)
	at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:64)
	at org.apache.spark.sql.DataFrameReader$$anonfun$11$$anonfun$apply$4.apply(DataFrameReader.scala:471)
	at org.apache.spark.sql.DataFrameReader$$anonfun$11$$anonfun$apply$4.apply(DataFrameReader.scala:471)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

After

scala> spark.read.schema("a string, b string, unparsed string").option("columnNameOfCorruptRecord", "unparsed").csv(Seq("a").toDS).show()
+---+----+--------+
|  a|   b|unparsed|
+---+----+--------+
|  a|null|    null|
+---+----+--------+

It looks like we might not need to put the input in the malformed column, because in this case the input is parsed correctly and storing it there might not be very useful (whereas when the number of tokens exceeds the schema, we do need to know the truncated contents).
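
For illustration, here is a minimal, self-contained sketch of the null guard this PR applies (safeCurrentInput is a made-up helper name for this sketch; the actual one-line change to UnivocityParser.getCurrentInput is shown in the review diff further down):

    // Sketch only: the same Option-based null guard used in getCurrentInput.
    // rawRecord stands in for tokenizer.getContext.currentParsedContent(),
    // which may legitimately be null once the input has been fully consumed.
    def safeCurrentInput(rawRecord: String): String =
      Option(rawRecord).map(_.stripLineEnd).orNull

    assert(safeCurrentInput("a,b\n") == "a,b")  // trailing line end stripped as before
    assert(safeCurrentInput(null) == null)      // previously threw a NullPointerException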

How was this patch tested?

Unit tests added in CSVSuite.

SparkQA commented Jun 5, 2017

Test build #77731 has started for PR 18200 at commit 046566c.

HyukjinKwon (Member Author)

cc @cloud-fan and @gatorsmile, I think this needs a decision (whether to put the input in that column or not). Either looks better than an NPE. Please let me know.

HyukjinKwon (Member Author)

retest this please

SparkQA commented Jun 5, 2017

Test build #77743 has finished for PR 18200 at commit 046566c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

HyukjinKwon changed the title from "[SPARK-20978][SQL] Set null for malformed column when the number of tokens is less than schema" to "[SPARK-20978][SQL] Set null for malformed column when the number of tokens is less than schema in CSV read" on Jun 6, 2017
HyukjinKwon changed the title from "[SPARK-20978][SQL] Set null for malformed column when the number of tokens is less than schema in CSV read" to "[SPARK-20978][SQL] Set null for malformed column when the number of tokens is less than schema in CSV read/permissive mode" on Jun 6, 2017
maropu (Member) commented Jun 6, 2017

Sorry to interrupt, but is this a blocker for the v2.2 release? It seems v2.1 accepts this case:

#cat test.csv 
1,3
1

#./bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_31)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> spark.read.schema(new StructType().add("a", IntegerType).add("b", IntegerType)).csv("/Users/maropu/Desktop/test.csv").show
+---+----+
|  a|   b|
+---+----+
|  1|   3|
|  1|null|
+---+----+

HyukjinKwon (Member Author) commented Jun 6, 2017

@maropu, I think it is fine. It only happens when the malformed column is selected. For example,

The case below is fine:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> spark.read.schema(new StructType().add("a", IntegerType).add("b", IntegerType)).csv("test.csv").show
+---+----+
|  a|   b|
+---+----+
|  1|   3|
|  1|null|
+---+----+

but it only happens when the malformed column exists in the schema:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> spark.read.schema(new StructType().add("a", IntegerType).add("b", IntegerType).add("_corrupt_record", StringType)).csv("test.csv").show
17/06/07 05:54:55 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
	at scala.collection.immutable.StringLike$class.stripLineEnd(StringLike.scala:89)
	at scala.collection.immutable.StringOps.stripLineEnd(StringOps.scala:29)
	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$getCurrentInput(UnivocityParser.scala:56)
	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
	at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:50)
	at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:43)
	at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:64)

I think we introduced the columnNameOfCorruptRecord option in CSV in 2.2.0 - https://issues.apache.org/jira/browse/SPARK-18699. So, I guess this is not a regression (but it is a bug we should definitely fix).

maropu (Member) commented Jun 6, 2017

@HyukjinKwon oh, I see. Thanks for your kind explanation!

.schema("a string, b string, unparsed string")
.option("columnNameOfCorruptRecord", "unparsed")
.csv(Seq("a").toDS())
checkAnswer(df, Row("a", null, null))

Member

I have not read the code yet, but it looks like the result is wrong? We should output the a in the corrupted column.

In addition, this scenario works if we pass CSV files that contain fewer column values than the schema, right?

Member

Sorry for cutting in, but how to handle these longer/shorter cases is somewhat arguable, as @HyukjinKwon said in #16928 (comment). IMHO we currently just regard the shorter cases as not corrupted so as to keep the existing behaviour (previous Spark releases have regarded these cases as not corrupted, so other developers said that this behaviour change was not acceptable in earlier PRs).

Member

Without this PR, you can try this example.

    val corruptCVSRecords: Dataset[String] =
      spark.createDataset(spark.sparkContext.parallelize(
        """1997, "b"""" ::
          """2015""" :: Nil))(Encoders.STRING)

    val df1 = spark.read
      .schema("col1 int, col2 String, __corrupted_column_name string")
      .option("columnNameOfCorruptRecord", "__corrupted_column_name")
      .option("mode", PermissiveMode.name)
      .csv(corruptCVSRecords)
    df1.show()

The output shows that PERMISSIVE mode also treats records with fewer columns as corrupted.

+----+----+-----------------------+
|col1|col2|__corrupted_column_name|
+----+----+-----------------------+
|1997| "b"|                   null|
|2015|null|                   2015|
+----+----+-----------------------+

@@ -53,7 +53,8 @@ class UnivocityParser(
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
-    UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
+    UTF8String.fromString(
+      Option(tokenizer.getContext.currentParsedContent()).map(_.stripLineEnd).orNull)
HyukjinKwon (Member Author) Jun 7, 2017

BTW, this looks like it is only called in PERMISSIVE mode when the malformed column is defined.

Member

Looks so; is something wrong with this?

Member Author

(Sorry, I meant it just as a note for the review ...)

Member

(oh, ... sorry)

HyukjinKwon (Member Author)

@maropu and @gatorsmile, I will ask about and double-check this in the Univocity parser (it looks like I misunderstood). I will reopen this PR with more details soon. Thanks for your review.

HyukjinKwon closed this Jun 8, 2017
gatorsmile (Member) commented Jun 8, 2017

tokenizer.getContext.currentParsedContent() has a bug. It returns NULL in some cases, but in other cases it returns the unparsed string. We might need another way to pass the original contents, instead of relying on the third-party parser.

I tried to upgrade univocity-parsers to 2.2.3. It does not help.
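
As an illustration of what such another way could look like (purely a sketch with made-up names such as RawTrackingParser, not actual Spark code): keep the raw line on the Spark side before handing it to the tokenizer, so the corrupt-record column never depends on the parser's internal context.

    // Hypothetical sketch: remember the raw input line ourselves instead of
    // asking the parser for it afterwards via currentParsedContent().
    class RawTrackingParser(parseLine: String => Array[String]) {
      // Raw line most recently handed to the underlying tokenizer.
      private var currentRaw: String = _

      // Returns the tokens together with the raw line that produced them, so a
      // caller can put the raw line into columnNameOfCorruptRecord without
      // touching the parser's context (which may be null once the record has
      // been fully consumed).
      def parse(line: String): (Array[String], String) = {
        currentRaw = line
        (parseLine(line), currentRaw)
      }
    }

    // Usage with a naive comma splitter standing in for the real CSV tokenizer:
    val parser = new RawTrackingParser(_.split(",", -1))
    val (tokens, raw) = parser.parse("2")  // tokens = Array("2"), raw = "2"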

gatorsmile (Member)

Let me give an example here.
Case 1:

    val corruptCVSRecords: Dataset[String] =
      spark.createDataset(spark.sparkContext.parallelize(
        """1, "b"""" ::
          """2015""" :: Nil))(Encoders.STRING)

Case 2:

    val corruptCVSRecords: Dataset[String] =
      spark.createDataset(spark.sparkContext.parallelize(
        """1, "b"""" ::
          """2""" :: Nil))(Encoders.STRING)

When passing the corruptCVSRecords of Case 1, it works well; however, if we change it to Case 2, it fails.

    val df1 = spark.read
      .schema("col1 int, col2 String, __corrupted_column_name string")
      .option("columnNameOfCorruptRecord", "__corrupted_column_name")
      .option("mode", PermissiveMode.name)
      .csv(corruptCVSRecords)
    df1.show()

HyukjinKwon (Member Author)

Thanks for trying it out. I also opened an issue - https://github.com/uniVocity/univocity-parsers/issues/165. I will check those cases against it soon.

HyukjinKwon (Member Author)

@maropu and @gatorsmile, I double-checked with the following dependency change:

--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -38,7 +38,7 @@
     <dependency>
       <groupId>com.univocity</groupId>
       <artifactId>univocity-parsers</artifactId>
-      <version>2.2.1</version>
+      <version>2.5.0-SNAPSHOT</version>
       <type>jar</type>
     </dependency>
     <dependency>

It is fixed in 2.5.0-SNAPSHOT, and the other tests pass fine:

val df = spark.read
  .schema("a string, b string, unparsed string")
  .option("columnNameOfCorruptRecord", "unparsed")
  .csv(Seq("a").toDS())
df.show()
+---+----+--------+
|  a|   b|unparsed|
+---+----+--------+
|  a|null|       a|
+---+----+--------+
val corruptCVSRecords: Dataset[String] =
  spark.createDataset(spark.sparkContext.parallelize(
    """1, "b"""" ::
    """2""" :: Nil))(Encoders.STRING)

val df1 = spark.read
  .schema("col1 int, col2 String, __corrupted_column_name string")
  .option("columnNameOfCorruptRecord", "__corrupted_column_name")
  .option("mode", PermissiveMode.name)
  .csv(corruptCVSRecords)
df1.show()
+----+----+-----------------------+
|col1|col2|__corrupted_column_name|
+----+----+-----------------------+
|   1| "b"|                   null|
|   2|null|                      2|
+----+----+-----------------------+

Thank you for checking this. I will double-check and will propose bumping this library after the release.

maropu (Member) commented Jun 10, 2017

Looks great!

gatorsmile (Member)

It might be too early to bump to 2.5.x, which might introduce other bugs that have not been exposed yet.

We should still be able to resolve the issue without the fix in the parser.

HyukjinKwon (Member Author) commented Jun 10, 2017

Yea, I do understand the concern. I asked about this in the issue to be sure. I will check the other related changes before opening a PR.

maropu (Member) commented Jun 11, 2017

Instead of upgrading the parser, as @gatorsmile suggested above, we had better look for another way to keep the original text for this case?

HyukjinKwon (Member Author) commented Jun 11, 2017

I am not against fixing this within Spark for now. That would be a workaround anyway, and I guess we would want to remove it when upgrading Univocity.

If the fix is simple, I could work around it for now, but I am not sure it is that simple (uniVocity/univocity-parsers@08b67c3). I guess using existing functionality should be safe. Also, there is another (not urgent for now) related issue fixed in https://github.com/uniVocity/univocity-parsers/issues/143.

I will ask the author again whether it is safe when I am about to open a PR. I guess we will have some time to check this after upgrading (in the early stage of a release, Spark 2.3.0, assuming the 2.2.0 release is soon), and it should be a good opportunity to find/report other bugs in Univocity if there are any.

HyukjinKwon deleted the less-token-npe branch January 2, 2018 03:38