Skip to content

Commit

Permalink
Apply review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
maropu committed Feb 23, 2017
1 parent 80c3775 commit c86febe
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 8 deletions.
2 changes: 1 addition & 1 deletion python/pyspark/sql/readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
If None is set, it uses the default value, session local timezone.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
If users set a string-type field named ``columnNameOfCorruptRecord`` in a
If users set a string type field named ``columnNameOfCorruptRecord`` in a
user-specified ``schema``, it puts the malformed string into the field. When
a ``schema`` is set by user, it sets ``null`` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
Expand Down
16 changes: 13 additions & 3 deletions python/pyspark/sql/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
columnNameOfCorruptRecord=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
Expand Down Expand Up @@ -619,10 +620,18 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
If None is set, it uses the default value, session local timezone.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
When a schema is set by user, it sets ``null`` for extra fields.
If users set a string type field named ``columnNameOfCorruptRecord`` in a
user-specified ``schema``, it puts the malformed string into the field. When
a ``schema`` is set by user, it sets ``null`` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
:param columnNameOfCorruptRecord: defines the name of the field that holds malformed strings
created in ``PERMISSIVE`` mode. If a user-specified ``schema``
has a field with this name, Spark puts malformed strings
in that field. This overrides
``spark.sql.columnNameOfCorruptRecord``.
>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
True
Expand All @@ -636,7 +645,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone,
columnNameOfCorruptRecord=columnNameOfCorruptRecord)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* during parsing.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. If users
* set a string-type field named `columnNameOfCorruptRecord` in a user-specified `schema`,
* set a string type field named `columnNameOfCorruptRecord` in a user-specified `schema`,
* it puts the malformed string into the field. When a `schema` is set by user, it sets `null`
* for extra fields.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private[csv] class UnivocityParser(
}

private def convertWithParseMode(
input: String)(convert: Array[String] => InternalRow): Option[InternalRow] = {
input: String)(convert: Array[String] => InternalRow): Option[InternalRow] = {
val tokens = parser.parseLine(input)
if (options.dropMalformed && inputSchema.length != tokens.length) {
if (numMalformedRecords < options.maxMalformedLogPerPartition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* during parsing.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. If users
* set a string-type field named `columnNameOfCorruptRecord` in a user-specified `schema`,
* set a string type field named `columnNameOfCorruptRecord` in a user-specified `schema`,
* it puts the malformed string into the field. When a `schema` is set by user, it sets `null`
* for extra fields.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
Nil)

// We put a `columnNameOfCorruptRecord` field in the middle of a schema
new StructType
val schemaWithCorrField2 = new StructType()
.add("a", IntegerType)
.add(columnNameOfCorruptRecord, StringType)
Expand Down

0 comments on commit c86febe

Please sign in to comment.