-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-7858] [SQL] Use output schema, not relation schema, for data source input conversion #6400
Conversation
The tests for the first commit should fail the assertion that it added, demonstrating the bug. My second commit should fix it. |
val converters = schemaFields.map { | ||
f => CatalystTypeConverters.createToCatalystConverter(f.dataType) | ||
} | ||
val mutableRow = new SpecificMutableRow(outputTypes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops, I copy-pasted a bit too eagerly here; this should be a GenericMutableRow
for consistency with the old code. Will fix now.
Test build #33488 has finished for PR 6400 at commit
|
I went ahead and removed the use of |
How about we add a test using |
Good idea; I can do this and remove the asserts, which might be expensive. EDIT: they're probably pretty cheap, actually, compared to the other costs, so I might leave them in. |
Test build #33489 has finished for PR 6400 at commit
|
Test build #33492 timed out for PR 6400 at commit |
I added a simple regression test in HadoopFsRelationTest. Projecting more columns than the input relation contains seems to trigger an ArrayIndexOutOfBoundsException when the wrong schema / types are used. |
Well, that wasn't the failure I was hoping for, but you should be able to check out e6d97fb and run |
Test build #33505 timed out for PR 6400 at commit |
@@ -76,6 +76,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { | |||
df.filter('a > 1 && 'p1 < 2).select('b, 'p1), | |||
for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JoshRosen Found that I made a mistake in this checkAnswer
test, which should have caught this bug... Values of p1
should be either "foo"
or "bar"
, but I put a 1
there. The correct version of this test should be:
// Simple projection and partition pruning
checkAnswer(
df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
for (i <- 2 to 3; j <- Seq("foo", "bar")) yield Row(s"val_$i", j))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure that p1
is string-typed? I think that p2
is, but there's other code which implies that p1
should be an int.
e.g. at the top of HadoopFsRelationTest
:
val partitionedTestDF1 = (for {
i <- 1 to 3
p2 <- Seq("foo", "bar")
} yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2")
val partitionedTestDF2 = (for {
i <- 1 to 3
p2 <- Seq("foo", "bar")
} yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think the type of p1
is int.
cc @liancheng
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, sorry my bad.
Test build #860 has finished for PR 6400 at commit
|
@JoshRosen Would you please also help fixing the minor bug in the test case mentioned in #6400 (comment)? Thanks! |
Test build #33543 has finished for PR 6400 at commit
|
val numColumns = outputTypes.length | ||
val mutableRow = new GenericMutableRow(numColumns) | ||
val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter) | ||
iterator.map { r => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a while loop will be better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually the return value of the mapPartitions
call, which must be a Scala iterator. Note that we do use a while loop to iterate over the columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I am fine with the current version. We are not slower than the previous version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably faster than the older version since it doesn't use an unnecessary bufferedIterator :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I also realized that after I made my comment.
LGTM. I am merging it to master and branch 1.4. |
…ource input conversion In `DataSourceStrategy.createPhysicalRDD`, we use the relation schema as the target schema for converting incoming rows into Catalyst rows. However, we should be using the output schema instead, since our scan might return a subset of the relation's columns. This patch incorporates #6414 by liancheng, which fixes an issue in `SimpleTestRelation` that prevented this bug from being caught by our old tests: > In `SimpleTextRelation`, we specified `needsConversion` to `true`, indicating that values produced by this testing relation should be of Scala types, and need to be converted to Catalyst types when necessary. However, we also used `Cast` to convert strings to expected data types. And `Cast` always produces values of Catalyst types, thus no conversion is done at all. This PR makes `SimpleTextRelation` produce Scala values so that data conversion code paths can be properly tested. Closes #5986. Author: Josh Rosen <[email protected]> Author: Cheng Lian <[email protected]> Author: Cheng Lian <[email protected]> Closes #6400 from JoshRosen/SPARK-7858 and squashes the following commits: e71c866 [Josh Rosen] Re-fix bug so that the tests pass again 56b13e5 [Josh Rosen] Add regression test to hadoopFsRelationSuites 2169a0f [Josh Rosen] Remove use of SpecificMutableRow and BufferedIterator 6cd7366 [Josh Rosen] Fix SPARK-7858 by using output types for conversion. 5a00e66 [Josh Rosen] Add assertions in order to reproduce SPARK-7858 8ba195c [Cheng Lian] Merge 9968fba into 6166473 9968fba [Cheng Lian] Tests the data type conversion code paths (cherry picked from commit 0c33c7b) Signed-off-by: Yin Huai <[email protected]>
…c row accessors This patch significantly refactors CatalystTypeConverters to both clean up the code and enable these conversions to work with future Project Tungsten features. At a high level, I've reorganized the code so that all functions dealing with the same type are grouped together into type-specific subclasses of `CatalystTypeConveter`. In addition, I've added new methods that allow the Catalyst Row -> Scala Row conversions to access the Catalyst row's fields through type-specific `getTYPE()` methods rather than the generic `get()` / `Row.apply` methods. This refactoring is a blocker to being able to unit test new operators that I'm developing as part of Project Tungsten, since those operators may output `UnsafeRow` instances which don't support the generic `get()`. The stricter type usage of types here has uncovered some bugs in other parts of Spark SQL: - #6217: DescribeCommand is assigned wrong output attributes in SparkStrategies - #6218: DataFrame.describe() should cast all aggregates to String - #6400: Use output schema, not relation schema, for data source input conversion Spark SQL current has undefined behavior for what happens when you try to create a DataFrame from user-specified rows whose values don't match the declared schema. According to the `createDataFrame()` Scaladoc: > It is important to make sure that the structure of every [[Row]] of the provided RDD matches the provided schema. Otherwise, there will be runtime exception. Given this, it sounds like it's technically not a break of our API contract to fail-fast when the data types don't match. However, there appear to be many cases where we don't fail even though the types don't match. For example, `JavaHashingTFSuite.hasingTF` passes a column of integers values for a "label" column which is supposed to contain floats. This column isn't actually read or modified as part of query processing, so its actual concrete type doesn't seem to matter. In other cases, there could be situations where we have generic numeric aggregates that tolerate being called with different numeric types than the schema specified, but this can be okay due to numeric conversions. In the long run, we will probably want to come up with precise semantics for implicit type conversions / widening when converting Java / Scala rows to Catalyst rows. Until then, though, I think that failing fast with a ClassCastException is a reasonable behavior; this is the approach taken in this patch. Note that certain optimizations in the inbound conversion functions for primitive types mean that we'll probably preserve the old undefined behavior in a majority of cases. Author: Josh Rosen <[email protected]> Closes #6222 from JoshRosen/catalyst-converters-refactoring and squashes the following commits: 740341b [Josh Rosen] Optimize method dispatch for primitive type conversions befc613 [Josh Rosen] Add tests to document Option-handling behavior. 5989593 [Josh Rosen] Use new SparkFunSuite base in CatalystTypeConvertersSuite 6edf7f8 [Josh Rosen] Re-add convertToScala(), since a Hive test still needs it 3f7b2d8 [Josh Rosen] Initialize converters lazily so that the attributes are resolved first 6ad0ebb [Josh Rosen] Fix JavaHashingTFSuite ClassCastException 677ff27 [Josh Rosen] Fix null handling bug; add tests. 8033d4c [Josh Rosen] Fix serialization error in UserDefinedGenerator. 85bba9d [Josh Rosen] Fix wrong input data in InMemoryColumnarQuerySuite 9c0e4e1 [Josh Rosen] Remove last use of convertToScala(). ae3278d [Josh Rosen] Throw ClassCastException errors during inbound conversions. 7ca7fcb [Josh Rosen] Comments and cleanup 1e87a45 [Josh Rosen] WIP refactoring of CatalystTypeConverters
…ource input conversion In `DataSourceStrategy.createPhysicalRDD`, we use the relation schema as the target schema for converting incoming rows into Catalyst rows. However, we should be using the output schema instead, since our scan might return a subset of the relation's columns. This patch incorporates apache#6414 by liancheng, which fixes an issue in `SimpleTestRelation` that prevented this bug from being caught by our old tests: > In `SimpleTextRelation`, we specified `needsConversion` to `true`, indicating that values produced by this testing relation should be of Scala types, and need to be converted to Catalyst types when necessary. However, we also used `Cast` to convert strings to expected data types. And `Cast` always produces values of Catalyst types, thus no conversion is done at all. This PR makes `SimpleTextRelation` produce Scala values so that data conversion code paths can be properly tested. Closes apache#5986. Author: Josh Rosen <[email protected]> Author: Cheng Lian <[email protected]> Author: Cheng Lian <[email protected]> Closes apache#6400 from JoshRosen/SPARK-7858 and squashes the following commits: e71c866 [Josh Rosen] Re-fix bug so that the tests pass again 56b13e5 [Josh Rosen] Add regression test to hadoopFsRelationSuites 2169a0f [Josh Rosen] Remove use of SpecificMutableRow and BufferedIterator 6cd7366 [Josh Rosen] Fix SPARK-7858 by using output types for conversion. 5a00e66 [Josh Rosen] Add assertions in order to reproduce SPARK-7858 8ba195c [Cheng Lian] Merge 9968fba into 6166473 9968fba [Cheng Lian] Tests the data type conversion code paths
…c row accessors This patch significantly refactors CatalystTypeConverters to both clean up the code and enable these conversions to work with future Project Tungsten features. At a high level, I've reorganized the code so that all functions dealing with the same type are grouped together into type-specific subclasses of `CatalystTypeConveter`. In addition, I've added new methods that allow the Catalyst Row -> Scala Row conversions to access the Catalyst row's fields through type-specific `getTYPE()` methods rather than the generic `get()` / `Row.apply` methods. This refactoring is a blocker to being able to unit test new operators that I'm developing as part of Project Tungsten, since those operators may output `UnsafeRow` instances which don't support the generic `get()`. The stricter type usage of types here has uncovered some bugs in other parts of Spark SQL: - apache#6217: DescribeCommand is assigned wrong output attributes in SparkStrategies - apache#6218: DataFrame.describe() should cast all aggregates to String - apache#6400: Use output schema, not relation schema, for data source input conversion Spark SQL current has undefined behavior for what happens when you try to create a DataFrame from user-specified rows whose values don't match the declared schema. According to the `createDataFrame()` Scaladoc: > It is important to make sure that the structure of every [[Row]] of the provided RDD matches the provided schema. Otherwise, there will be runtime exception. Given this, it sounds like it's technically not a break of our API contract to fail-fast when the data types don't match. However, there appear to be many cases where we don't fail even though the types don't match. For example, `JavaHashingTFSuite.hasingTF` passes a column of integers values for a "label" column which is supposed to contain floats. This column isn't actually read or modified as part of query processing, so its actual concrete type doesn't seem to matter. In other cases, there could be situations where we have generic numeric aggregates that tolerate being called with different numeric types than the schema specified, but this can be okay due to numeric conversions. In the long run, we will probably want to come up with precise semantics for implicit type conversions / widening when converting Java / Scala rows to Catalyst rows. Until then, though, I think that failing fast with a ClassCastException is a reasonable behavior; this is the approach taken in this patch. Note that certain optimizations in the inbound conversion functions for primitive types mean that we'll probably preserve the old undefined behavior in a majority of cases. Author: Josh Rosen <[email protected]> Closes apache#6222 from JoshRosen/catalyst-converters-refactoring and squashes the following commits: 740341b [Josh Rosen] Optimize method dispatch for primitive type conversions befc613 [Josh Rosen] Add tests to document Option-handling behavior. 5989593 [Josh Rosen] Use new SparkFunSuite base in CatalystTypeConvertersSuite 6edf7f8 [Josh Rosen] Re-add convertToScala(), since a Hive test still needs it 3f7b2d8 [Josh Rosen] Initialize converters lazily so that the attributes are resolved first 6ad0ebb [Josh Rosen] Fix JavaHashingTFSuite ClassCastException 677ff27 [Josh Rosen] Fix null handling bug; add tests. 8033d4c [Josh Rosen] Fix serialization error in UserDefinedGenerator. 85bba9d [Josh Rosen] Fix wrong input data in InMemoryColumnarQuerySuite 9c0e4e1 [Josh Rosen] Remove last use of convertToScala(). ae3278d [Josh Rosen] Throw ClassCastException errors during inbound conversions. 7ca7fcb [Josh Rosen] Comments and cleanup 1e87a45 [Josh Rosen] WIP refactoring of CatalystTypeConverters
…ource input conversion In `DataSourceStrategy.createPhysicalRDD`, we use the relation schema as the target schema for converting incoming rows into Catalyst rows. However, we should be using the output schema instead, since our scan might return a subset of the relation's columns. This patch incorporates apache#6414 by liancheng, which fixes an issue in `SimpleTestRelation` that prevented this bug from being caught by our old tests: > In `SimpleTextRelation`, we specified `needsConversion` to `true`, indicating that values produced by this testing relation should be of Scala types, and need to be converted to Catalyst types when necessary. However, we also used `Cast` to convert strings to expected data types. And `Cast` always produces values of Catalyst types, thus no conversion is done at all. This PR makes `SimpleTextRelation` produce Scala values so that data conversion code paths can be properly tested. Closes apache#5986. Author: Josh Rosen <[email protected]> Author: Cheng Lian <[email protected]> Author: Cheng Lian <[email protected]> Closes apache#6400 from JoshRosen/SPARK-7858 and squashes the following commits: e71c866 [Josh Rosen] Re-fix bug so that the tests pass again 56b13e5 [Josh Rosen] Add regression test to hadoopFsRelationSuites 2169a0f [Josh Rosen] Remove use of SpecificMutableRow and BufferedIterator 6cd7366 [Josh Rosen] Fix SPARK-7858 by using output types for conversion. 5a00e66 [Josh Rosen] Add assertions in order to reproduce SPARK-7858 8ba195c [Cheng Lian] Merge 9968fba into 6166473 9968fba [Cheng Lian] Tests the data type conversion code paths
…c row accessors This patch significantly refactors CatalystTypeConverters to both clean up the code and enable these conversions to work with future Project Tungsten features. At a high level, I've reorganized the code so that all functions dealing with the same type are grouped together into type-specific subclasses of `CatalystTypeConveter`. In addition, I've added new methods that allow the Catalyst Row -> Scala Row conversions to access the Catalyst row's fields through type-specific `getTYPE()` methods rather than the generic `get()` / `Row.apply` methods. This refactoring is a blocker to being able to unit test new operators that I'm developing as part of Project Tungsten, since those operators may output `UnsafeRow` instances which don't support the generic `get()`. The stricter type usage of types here has uncovered some bugs in other parts of Spark SQL: - apache#6217: DescribeCommand is assigned wrong output attributes in SparkStrategies - apache#6218: DataFrame.describe() should cast all aggregates to String - apache#6400: Use output schema, not relation schema, for data source input conversion Spark SQL current has undefined behavior for what happens when you try to create a DataFrame from user-specified rows whose values don't match the declared schema. According to the `createDataFrame()` Scaladoc: > It is important to make sure that the structure of every [[Row]] of the provided RDD matches the provided schema. Otherwise, there will be runtime exception. Given this, it sounds like it's technically not a break of our API contract to fail-fast when the data types don't match. However, there appear to be many cases where we don't fail even though the types don't match. For example, `JavaHashingTFSuite.hasingTF` passes a column of integers values for a "label" column which is supposed to contain floats. This column isn't actually read or modified as part of query processing, so its actual concrete type doesn't seem to matter. In other cases, there could be situations where we have generic numeric aggregates that tolerate being called with different numeric types than the schema specified, but this can be okay due to numeric conversions. In the long run, we will probably want to come up with precise semantics for implicit type conversions / widening when converting Java / Scala rows to Catalyst rows. Until then, though, I think that failing fast with a ClassCastException is a reasonable behavior; this is the approach taken in this patch. Note that certain optimizations in the inbound conversion functions for primitive types mean that we'll probably preserve the old undefined behavior in a majority of cases. Author: Josh Rosen <[email protected]> Closes apache#6222 from JoshRosen/catalyst-converters-refactoring and squashes the following commits: 740341b [Josh Rosen] Optimize method dispatch for primitive type conversions befc613 [Josh Rosen] Add tests to document Option-handling behavior. 5989593 [Josh Rosen] Use new SparkFunSuite base in CatalystTypeConvertersSuite 6edf7f8 [Josh Rosen] Re-add convertToScala(), since a Hive test still needs it 3f7b2d8 [Josh Rosen] Initialize converters lazily so that the attributes are resolved first 6ad0ebb [Josh Rosen] Fix JavaHashingTFSuite ClassCastException 677ff27 [Josh Rosen] Fix null handling bug; add tests. 8033d4c [Josh Rosen] Fix serialization error in UserDefinedGenerator. 85bba9d [Josh Rosen] Fix wrong input data in InMemoryColumnarQuerySuite 9c0e4e1 [Josh Rosen] Remove last use of convertToScala(). ae3278d [Josh Rosen] Throw ClassCastException errors during inbound conversions. 7ca7fcb [Josh Rosen] Comments and cleanup 1e87a45 [Josh Rosen] WIP refactoring of CatalystTypeConverters
In
DataSourceStrategy.createPhysicalRDD
, we use the relation schema as the target schema for converting incoming rows into Catalyst rows. However, we should be using the output schema instead, since our scan might return a subset of the relation's columns.This patch incorporates #6414 by @liancheng, which fixes an issue in
SimpleTestRelation
that prevented this bug from being caught by our old tests:Closes #5986.