[SPARK-22566][PYTHON] Better error message for _merge_type in Pandas to Spark DF conversion #19792

Closed
wants to merge 15 commits
Changes from 6 commits
7 changes: 4 additions & 3 deletions python/pyspark/sql/session.py
@@ -324,11 +324,12 @@ def range(self, start, end=None, step=1, numPartitions=None):

return DataFrame(jdf, self._wrapped)

def _inferSchemaFromList(self, data):
def _inferSchemaFromList(self, data, names=None):
Member:
Can we set the names for _createFromRDD -> _inferSchema too?:

>>> spark.createDataFrame(spark.sparkContext.parallelize([[None, 1], ["a", None], [1, 1]]), schema=["a", "b"], samplingRatio=0.99)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 644, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/.../spark/python/pyspark/sql/session.py", line 383, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/.../spark/python/pyspark/sql/session.py", line 375, in _inferSchema
    schema = rdd.map(_infer_schema).reduce(_merge_type)
  File "/.../spark/python/pyspark/rdd.py", line 852, in reduce
    return reduce(f, vals)
  File "/.../spark/python/pyspark/sql/types.py", line 1133, in _merge_type
    for f in a.fields]
  File "/.../spark/python/pyspark/sql/types.py", line 1126, in _merge_type
    raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
TypeError: field _1: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.LongType'>
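
For illustration, the suggestion amounts to something like this minimal sketch, using this PR's internal helpers; the exact error rendering is inferred from the diff, not verified output:

# Sketch: with names threaded through to _infer_schema on the RDD path too,
# the merge error would name the real column instead of a positional "_1".
from functools import reduce
from pyspark.sql.types import _infer_schema, _merge_type

rows = [[None, 1], ["a", None], [1, 1]]
names = ["a", "b"]
reduce(_merge_type, (_infer_schema(row, names) for row in rows))
# Expected (inferred from this PR's _merge_type):
# TypeError: .structField("a"): Can not merge type
# <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.LongType'>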

"""
Infer schema from list of Row or tuple.

:param data: list of Row or tuple
:param names: list of column names
:return: :class:`pyspark.sql.types.StructType`
"""
if not data:
@@ -337,7 +338,7 @@ def _inferSchemaFromList(self, data):
if type(first) is dict:
warnings.warn("inferring schema from dict is deprecated,"
"please use pyspark.sql.Row instead")
schema = reduce(_merge_type, map(_infer_schema, data))
schema = reduce(_merge_type, [_infer_schema(row, names) for row in data])
Member:
Not a big deal, but let's use a generator expression -> (_infer_schema(row, names) for row in data)

if _has_nulltype(schema):
raise ValueError("Some of types cannot be determined after inferring")
return schema
@@ -405,7 +406,7 @@ def _createFromLocal(self, data, schema):
data = list(data)

if schema is None or isinstance(schema, (list, tuple)):
struct = self._inferSchemaFromList(data)
struct = self._inferSchemaFromList(data, names=schema)
converter = _create_converter(struct)
data = map(converter, data)
if isinstance(schema, (list, tuple)):
Member:
@gberger, could we remove this branch too?

Author:
yes

Author:
removed with commit 5131db2

Member:
>>> spark.createDataFrame([{'a': 1}], ["b"])
DataFrame[a: bigint]

Hm.. sorry, actually I think we should not remove this one and L385, because we should primarily respect the user's input, and it should be DataFrame[b: bigint].

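For illustration, a minimal sketch of what the names parameter enables on the local path; note _inferSchemaFromList is an internal API, and the printed shape is inferred, not verified output:

# Sketch: user-supplied names now reach schema inference on the local path.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
struct = spark._inferSchemaFromList([("a", 1), ("b", 2)], names=["col1", "col2"])
print(struct)
# Inferred fields are named "col1"/"col2" instead of "_1"/"_2", e.g.:
# StructType(List(StructField(col1,StringType,true),StructField(col2,LongType,true)))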
78 changes: 78 additions & 0 deletions python/pyspark/sql/tests.py
@@ -62,6 +62,7 @@
from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
from pyspark.sql.types import _array_signed_int_typecode_ctype_mappings, _array_type_mappings
from pyspark.sql.types import _array_unsigned_int_typecode_ctype_mappings
from pyspark.sql.types import _merge_type
from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests
from pyspark.sql.functions import UserDefinedFunction, sha2, lit
from pyspark.sql.window import Window
@@ -1722,6 +1723,83 @@ def test_infer_long_type(self):
self.assertEqual(_infer_type(2**61), LongType())
self.assertEqual(_infer_type(2**71), LongType())

def test_merge_type(self):
self.assertEqual(_merge_type(LongType(), NullType()), LongType())
self.assertEqual(_merge_type(NullType(), LongType()), LongType())

self.assertEqual(_merge_type(LongType(), LongType()), LongType())

self.assertEqual(_merge_type(
ArrayType(LongType()),
ArrayType(LongType())
), ArrayType(LongType()))
with self.assertRaisesRegexp(TypeError, 'arrayElement'):
_merge_type(ArrayType(LongType()), ArrayType(DoubleType()))

self.assertEqual(_merge_type(
MapType(StringType(), LongType()),
MapType(StringType(), LongType())
), MapType(StringType(), LongType()))
with self.assertRaisesRegexp(TypeError, 'mapKey'):
_merge_type(
MapType(StringType(), LongType()),
MapType(DoubleType(), LongType()))
with self.assertRaisesRegexp(TypeError, 'mapValue'):
_merge_type(
MapType(StringType(), LongType()),
MapType(StringType(), DoubleType()))

self.assertEqual(_merge_type(
StructType([StructField("f1", LongType()), StructField("f2", StringType())]),
StructType([StructField("f1", LongType()), StructField("f2", StringType())])
), StructType([StructField("f1", LongType()), StructField("f2", StringType())]))
with self.assertRaisesRegexp(TypeError, r'structField\("f1"\)'):
Member:
nit: We don't need r prefix for each regex for assertRaisesRegexp.

Author:
But then double backslashes? :(

Member:
Hmm, I don't think we need double backslashes here.

Author:
Oh, you are right. Fixed

_merge_type(
StructType([StructField("f1", LongType()), StructField("f2", StringType())]),
StructType([StructField("f1", DoubleType()), StructField("f2", StringType())]))

self.assertEqual(_merge_type(
StructType([StructField("f1", ArrayType(LongType())), StructField("f2", StringType())]),
StructType([StructField("f1", ArrayType(LongType())), StructField("f2", StringType())])
), StructType([StructField("f1", ArrayType(LongType())), StructField("f2", StringType())]))
with self.assertRaisesRegexp(TypeError, r'structField\("f1"\)'):
_merge_type(
StructType([
StructField("f1", ArrayType(LongType())),
StructField("f2", StringType())]),
StructType([
StructField("f1", ArrayType(DoubleType())),
StructField("f2", StringType())]))

self.assertEqual(_merge_type(
StructType([
StructField("f1", MapType(StringType(), LongType())),
StructField("f2", StringType())]),
StructType([
StructField("f1", MapType(StringType(), LongType())),
StructField("f2", StringType())])
), StructType([
StructField("f1", MapType(StringType(), LongType())),
StructField("f2", StringType())]))
with self.assertRaisesRegexp(TypeError, r'structField\("f1"\)'):
_merge_type(
StructType([
StructField("f1", MapType(StringType(), LongType())),
StructField("f2", StringType())]),
StructType([
StructField("f1", MapType(StringType(), DoubleType())),
StructField("f2", StringType())]))

self.assertEqual(_merge_type(
StructType([StructField("f1", ArrayType(MapType(StringType(), LongType())))]),
StructType([StructField("f1", ArrayType(MapType(StringType(), LongType())))])
), StructType([StructField("f1", ArrayType(MapType(StringType(), LongType())))]))
with self.assertRaisesRegexp(TypeError, r'structField\("f1"\)\.arrayElement\.mapKey'):
_merge_type(
StructType([StructField("f1", ArrayType(MapType(StringType(), LongType())))]),
StructType([StructField("f1", ArrayType(MapType(DoubleType(), LongType())))])
)

def test_filter_with_datetime(self):
time = datetime.datetime(2015, 4, 17, 23, 1, 2, 3000)
date = time.date()
22 changes: 14 additions & 8 deletions python/pyspark/sql/types.py
@@ -1072,7 +1072,7 @@ def _infer_type(obj):
raise TypeError("not supported type: %s" % type(obj))


def _infer_schema(row):
def _infer_schema(row, names=None):
"""Infer the schema from dict/namedtuple/object"""
if isinstance(row, dict):
items = sorted(row.items())
@@ -1083,7 +1083,8 @@ def _infer_schema(row):
elif hasattr(row, "_fields"): # namedtuple
items = zip(row._fields, tuple(row))
else:
names = ['_%d' % i for i in range(1, len(row) + 1)]
if names is None:
names = ['_%d' % i for i in range(1, len(row) + 1)]
Member (@HyukjinKwon, Dec 12, 2017):
@gberger, Let's revert this change too. Seems it's going to introduce a behaviour change:

Before

>>> spark.createDataFrame([["a", "b"]], ["col1"]).show()
+----+---+
|col1| _2|
+----+---+
|   a|  b|
+----+---+

After

>>> spark.createDataFrame([["a", "b"]], ["col1"]).show()
...
java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 1 fields are required while 2 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:148)
        ...

Author (@gberger, Dec 12, 2017):
You're right, but by reverting we lose the nice message. Notice that in the "Reverted" output below it says field _1, where it could have said field col1.

Instead, I am adding a new elif branch where we check len(names) vs len(row). If we have fewer names than we have columns, we extend the names list, completing it with entries such as "_2" (see the sketch after this comment).
I have included a test for this as well.

Reverted

>>> spark.createDataFrame([["a", "b"], [1, 2]], ["col1"]).show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/gberger/Projects/spark/python/pyspark/sql/session.py", line 646, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/Users/gberger/Projects/spark/python/pyspark/sql/session.py", line 409, in _createFromLocal
    struct = self._inferSchemaFromList(data, names=schema)
  File "/Users/gberger/Projects/spark/python/pyspark/sql/session.py", line 341, in _inferSchemaFromList
    schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
  File "/Users/gberger/Projects/spark/python/pyspark/sql/types.py", line 1132, in _merge_type
    for f in a.fields]
  File "/Users/gberger/Projects/spark/python/pyspark/sql/types.py", line 1125, in _merge_type
    raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
TypeError: field _1: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.LongType'>

Modified

>>> spark.createDataFrame([["a", "b"]], ["col1"])
DataFrame[col1: string, _2: string]
>>> spark.createDataFrame([["a", "b"], [1, 2]], ["col1"]).show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/gberger/Projects/spark/python/pyspark/sql/session.py", line 646, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/Users/gberger/Projects/spark/python/pyspark/sql/session.py", line 409, in _createFromLocal
    struct = self._inferSchemaFromList(data, names=schema)
  File "/Users/gberger/Projects/spark/python/pyspark/sql/session.py", line 341, in _inferSchemaFromList
    schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
  File "/Users/gberger/Projects/spark/python/pyspark/sql/types.py", line 1132, in _merge_type
    for f in a.fields]
  File "/Users/gberger/Projects/spark/python/pyspark/sql/types.py", line 1125, in _merge_type
    raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
TypeError: field col1: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.LongType'>
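
A minimal sketch of the padding logic described above (standalone illustration; in the PR this is an elif branch inside _infer_schema, and the example values are made up):

# Sketch: pad user-supplied names with positional placeholders.
row = ["a", "b"]
names = ["col1"]
if names is None:
    names = ['_%d' % i for i in range(1, len(row) + 1)]
elif len(names) < len(row):
    # Fewer names than columns: complete with placeholders like "_2".
    names = names + ['_%d' % i for i in range(len(names) + 1, len(row) + 1)]
print(list(zip(names, row)))  # [('col1', 'a'), ('_2', 'b')]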

Member:
Ah, yup. I noticed it too but I think the same thing applies to other cases, for example:

>>> spark.createDataFrame([{"a": 1}, {"a": []}], ["col1"])
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 646, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 409, in _createFromLocal
    struct = self._inferSchemaFromList(data, names=schema)
  File "/.../spark/python/pyspark/sql/session.py", line 341, in _inferSchemaFromList
    schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
  File "/.../spark/python/pyspark/sql/types.py", line 1133, in _merge_type
    for f in a.fields]
  File "/.../spark/python/pyspark/sql/types.py", line 1126, in _merge_type
    raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
TypeError: field a: Can not merge type <class 'pyspark.sql.types.LongType'> and <class 'pyspark.sql.types.ArrayType'>

So, let's revert this change here for now. There are some subtleties here but I think it's fine.

Author:
If we revert it, then the original purpose of the PR is lost:

>>> df = pd.DataFrame(data={'a':[1,2,3], 'b': [4, 5, 'hello']})
>>> spark.createDataFrame(df)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/gberger/Projects/spark/python/pyspark/sql/session.py", line 646, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/Users/gberger/Projects/spark/python/pyspark/sql/session.py", line 409, in _createFromLocal
    struct = self._inferSchemaFromList(data, names=schema)
  File "/Users/gberger/Projects/spark/python/pyspark/sql/session.py", line 341, in _inferSchemaFromList
    schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
  File "/Users/gberger/Projects/spark/python/pyspark/sql/types.py", line 1132, in _merge_type
    for f in a.fields]
  File "/Users/gberger/Projects/spark/python/pyspark/sql/types.py", line 1125, in _merge_type
    raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
TypeError: field _2: Can not merge type <class 'pyspark.sql.types.LongType'> and <class 'pyspark.sql.types.StringType'>

Author:
@HyukjinKwon just pushed the elif branch change that I talked about above, please see if it is suitable

Member:
Yup, will take another look soon anyway.

items = zip(names, row)

elif hasattr(row, "__dict__"): # object
@@ -1108,19 +1109,23 @@ def _has_nulltype(dt):
return isinstance(dt, NullType)


def _merge_type(a, b):
def _merge_type(a, b, path=''):
Member:
Should we have a default path name for the case where we don't have names?
Otherwise, the error message would be like TypeError: .arrayElement: Can not merge type <class 'pyspark.sql.types.LongType'> and <class 'pyspark.sql.types.DoubleType'>. WDYT? @HyukjinKwon

Member:
Yup, I agree with it for the current status ..

It's kind of difficult to come up with a good message format in such cases. To be honest, I kind of gave up on a pretty format and just chose prose before (in PR #18521).

I still don't have a good idea of how to show the nested structure in the error message. I was thinking of one of: prose, a piece of code (like schema['f1'].dataType.elementType.keyType instead of #19792 (comment)), or something pretty like printSchema() ... ?

Maybe I will look at other formats in Spark, or somewhere like Pandas, or code-style paths for now.

So, to cut it short, here is what is on my mind for the example of #19792 (comment):

  1. Prose
TypeError: key in map field in array element in field f1: Can not blabla
  2. A piece of code with StructType
TypeError: schema(?)['f1'].dataType.elementType.keyType: Can not blabla
TypeError: struct(?)['f1'].dataType.elementType.keyType: Can not blabla
  3. printSchema()
TypeError: 
root
 |-- f1: array (nullable = false)
 |    |-- element: map (containsNull = false)
 |    |    |-- key: integer*
 |    |    |-- value: long (valueContainsNull = false)
: *Can not blabla

Member:
I really don't have a strong opinion on this. @ueshin do you maybe have a preference or another option, or are you okay with the current format in general?

Member:
Maybe just printing the type string in SQL form could be an option too, to be consistent .. it's a bit messy for a deeply nested schema though:

TypeError: struct<f1:array<map<int,string>>> and struct<f1:array<map<int,int>>> Can not blabla

Member:
I prefer a format where we can see the path to the error, and I guess we don't need a pretty one like printSchema().
Maybe following the #18521 format is good enough, and the current one is good as well, but I wanted something in front of the path string instead of starting with a dot.

Member:
Yup, looks like we need something in front of the path anyway. To me, I am okay with any format.

I think we could fix it again later. I just wondered if you had a preference.

Author:
Hi folks,

Great options proposed by @HyukjinKwon - though I'm not sure what the conclusion of the discussion with @ueshin was :)

Which format should we employ? And do you want me to use this format right now or is it something you'll fix later?

Thanks!

Member:
Well, can you follow the #18521 format for now? Thanks!

if isinstance(a, NullType):
return b
elif isinstance(b, NullType):
return a
elif type(a) is not type(b):
# TODO: type cast (such as int -> long)
raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
if path == '':
raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
else:
raise TypeError("%s: Can not merge type %s and %s" % (path, type(a), type(b)))

# same type
if isinstance(a, StructType):
nfs = dict((f.name, f.dataType) for f in b.fields)
fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType())))
fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()),
path='%s.structField("%s")' % (path, f.name)))
for f in a.fields]
names = set([f.name for f in fields])
for n in nfs:
@@ -1129,11 +1134,12 @@ def _merge_type(a, b):
return StructType(fields)

elif isinstance(a, ArrayType):
return ArrayType(_merge_type(a.elementType, b.elementType), True)
return ArrayType(_merge_type(a.elementType, b.elementType,
path=path + '.arrayElement'), True)

elif isinstance(a, MapType):
return MapType(_merge_type(a.keyType, b.keyType),
_merge_type(a.valueType, b.valueType),
return MapType(_merge_type(a.keyType, b.keyType, path=path + '.mapKey'),
_merge_type(a.valueType, b.valueType, path=path + '.mapValue'),
True)
else:
return a
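
For illustration, a minimal sketch of how the path argument added above surfaces in an error; the message rendering is inferred from the diff, not verified output:

# Sketch: exercising the new path-aware error message.
from pyspark.sql.types import (_merge_type, StructType, StructField, ArrayType,
                               MapType, StringType, LongType, DoubleType)

_merge_type(
    StructType([StructField("f1", ArrayType(MapType(StringType(), LongType())))]),
    StructType([StructField("f1", ArrayType(MapType(DoubleType(), LongType())))]))
# Raises, per the diff above:
# TypeError: .structField("f1").arrayElement.mapKey: Can not merge type
# <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>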