-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1335. Also increase perm gen / code cache for scalatest when invoked via Maven build #253
Conversation
…s elsewhere, which allows tests to complete in at least one environment where they are failing. (Also removed a duplicate -Xms setting elsewhere.)
…warnings, from some recent doc changes. We apparently can't generate links outside the module.
Merged build triggered. One or more automated tests failed |
Merged build started. One or more automated tests failed |
Merged build finished. One or more automated tests failed |
One or more automated tests failed |
Merged build triggered. One or more automated tests failed |
Merged build started. One or more automated tests failed |
Merged build finished. All automated tests passed. |
All automated tests passed. |
Thanks for looking at this, we were also having problems with our maven Jenkins build. I'm a little torn on removing the scaladoc links, as I believe it is possible to have cross project links using a new features of SBT (which is probably what we will continue to use to publish the docs). However, false positives for warnings make you ignore important warnings and I don't know when I'm going to have time to configure the aforementioned feature. |
@srowen do you mind dropping the build warning stuff for now? I agree we should either fix it or change the doc script within the next few days. Just want to get this other fix merged in ASAP... |
I just went ahead and merged this but removed the scaladoc changes. We can consider those separately - wanted to get the jenkins maven build back online. |
…voked via Maven build I am observing build failures when the Maven build reaches tests in the new SQL components. (I'm on Java 7 / OSX 10.9). The failure is the usual complaint from scala, that it's out of permgen space, or that JIT out of code cache space. I see that various build scripts increase these both for SBT. This change simply adds these settings to scalatest's arguments. Works for me and seems a bit more consistent. (I also snuck in cures for new build warnings from new scaladoc. Felt too trivial for a new PR, although it's separate. Just something I also saw while examining the build output.) Author: Sean Owen <[email protected]> Closes apache#253 from srowen/SPARK-1335 and squashes the following commits: c0f2d31 [Sean Owen] Appease scalastyle with a newline at the end of the file a02679c [Sean Owen] Fix scaladoc errors due to missing links, which are generating build warnings, from some recent doc changes. We apparently can't generate links outside the module. b2c6a09 [Sean Owen] Add perm gen, code cache settings to scalatest, mirroring SBT settings elsewhere, which allows tests to complete in at least one environment where they are failing. (Also removed a duplicate -Xms setting elsewhere.)
This is a backport of apache@8f0511e. ## What changes were proposed in this pull request? Spark executes SQL commands eagerly. It does this by creating an RDD which contains the command's results. The downside to this is that any action on this RDD triggers a Spark job which is expensive and is unnecessary. This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; it just materializes the result and puts them in a `LocalRelation`. ## How was this patch tested? Added a regression test to `SQLQuerySuite`. Author: Herman van Hovell <[email protected]> Closes apache#253 from hvanhovell/SPARK-19650.
* added quota docs * small revisions * formatting and more revisions * add max cores section * use scala 2.11 in examples
…is reused ## What changes were proposed in this pull request? With this change, we can easily identify the plan difference when subquery is reused. When the reuse is enabled, the plan looks like ``` == Physical Plan == CollectLimit 1 +- *(1) Project [(Subquery subquery240 + ReusedSubquery Subquery subquery240) AS (scalarsubquery() + scalarsubquery())#253] : :- Subquery subquery240 : : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)#250]) : : +- Exchange SinglePartition : : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#256, count#257L]) : : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : : +- Scan[obj#12] : +- ReusedSubquery Subquery subquery240 +- *(1) SerializeFromObject +- Scan[obj#12] ``` When the reuse is disabled, the plan looks like ``` == Physical Plan == CollectLimit 1 +- *(1) Project [(Subquery subquery286 + Subquery subquery287) AS (scalarsubquery() + scalarsubquery())#299] : :- Subquery subquery286 : : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)#296]) : : +- Exchange SinglePartition : : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#302, count#303L]) : : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : : +- Scan[obj#12] : +- Subquery subquery287 : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)#298]) : +- Exchange SinglePartition : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#306, count#307L]) : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : +- Scan[obj#12] +- *(1) SerializeFromObject +- Scan[obj#12] ``` ## How was this patch tested? Modified the existing test. Closes #24258 from gatorsmile/followupSPARK-27279. Authored-by: gatorsmile <[email protected]> Signed-off-by: gatorsmile <[email protected]>
Export env variables before setting -x and -e
…is reused With this change, we can easily identify the plan difference when subquery is reused. When the reuse is enabled, the plan looks like ``` == Physical Plan == CollectLimit 1 +- *(1) Project [(Subquery subquery240 + ReusedSubquery Subquery subquery240) AS (scalarsubquery() + scalarsubquery())apache#253] : :- Subquery subquery240 : : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)apache#250]) : : +- Exchange SinglePartition : : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#256, count#257L]) : : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : : +- Scan[obj#12] : +- ReusedSubquery Subquery subquery240 +- *(1) SerializeFromObject +- Scan[obj#12] ``` When the reuse is disabled, the plan looks like ``` == Physical Plan == CollectLimit 1 +- *(1) Project [(Subquery subquery286 + Subquery subquery287) AS (scalarsubquery() + scalarsubquery())apache#299] : :- Subquery subquery286 : : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)apache#296]) : : +- Exchange SinglePartition : : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#302, count#303L]) : : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : : +- Scan[obj#12] : +- Subquery subquery287 : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)apache#298]) : +- Exchange SinglePartition : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#306, count#307L]) : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : +- Scan[obj#12] +- *(1) SerializeFromObject +- Scan[obj#12] ``` Modified the existing test. Closes apache#24258 from gatorsmile/followupSPARK-27279. Authored-by: gatorsmile <[email protected]> Signed-off-by: gatorsmile <[email protected]>
* KE-26764 fix loophole in spark * upgrade thriftserver to 0.14 * upgrade spark version to r21
### What changes were proposed in this pull request? This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once. E.g. the following query: ``` SELECT (SELECT avg(a) FROM t), (SELECT sum(b) FROM t) ``` is optimized from: ``` == Optimized Logical Plan == Project [scalar-subquery#242 [] AS scalarsubquery()#253, scalar-subquery#243 [] AS scalarsubquery()#254L] : :- Aggregate [avg(a#244) AS avg(a)#247] : : +- Project [a#244] : : +- Relation default.t[a#244,b#245] parquet : +- Aggregate [sum(a#251) AS sum(a)#250L] : +- Project [a#251] : +- Relation default.t[a#251,b#252] parquet +- OneRowRelation ``` to: ``` == Optimized Logical Plan == Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L] : :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] : : +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L] : : +- Project [a#244] : : +- Relation default.t[a#244,b#245] parquet : +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] : +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L] : +- Project [a#244] : +- Relation default.t[a#244,b#245] parquet +- OneRowRelation ``` and in the physical plan subqueries are reused: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(1) Project [Subquery subquery#242, [id=#113].avg(a) AS scalarsubquery()#253, ReusedSubquery Subquery subquery#242, [id=#113].sum(a) AS scalarsubquery()#254L] : :- Subquery subquery#242, [id=#113] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#158] +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L]) +- *(1) ColumnarToRow +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int> +- == Initial Plan == Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#110] +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L]) +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int> : +- ReusedSubquery Subquery subquery#242, [id=#113] +- *(1) Scan OneRowRelation[] +- == Initial Plan == ... ``` Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well. ### Why are the changes needed? Performance improvement. ``` [info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] q9 - MergeScalarSubqueries off 50798 52521 1423 0.0 Infinity 1.0X [info] q9 - MergeScalarSubqueries on 19484 19675 226 0.0 Infinity 2.6X [info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] q9b - MergeScalarSubqueries off 15430 17803 NaN 0.0 Infinity 1.0X [info] q9b - MergeScalarSubqueries on 3862 4002 196 0.0 Infinity 4.0X ``` Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE. The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1. ### Does this PR introduce _any_ user-facing change? No. But this optimization can be disabled with `spark.sql.optimizer.excludedRules` config. ### How was this patch tested? Existing and new UTs. Closes #32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery. Lead-authored-by: Peter Toth <[email protected]> Co-authored-by: attilapiros <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once. E.g. the following query: ``` SELECT (SELECT avg(a) FROM t), (SELECT sum(b) FROM t) ``` is optimized from: ``` == Optimized Logical Plan == Project [scalar-subquery#242 [] AS scalarsubquery()#253, scalar-subquery#243 [] AS scalarsubquery()#254L] : :- Aggregate [avg(a#244) AS avg(a)#247] : : +- Project [a#244] : : +- Relation default.t[a#244,b#245] parquet : +- Aggregate [sum(a#251) AS sum(a)#250L] : +- Project [a#251] : +- Relation default.t[a#251,b#252] parquet +- OneRowRelation ``` to: ``` == Optimized Logical Plan == Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L] : :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] : : +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L] : : +- Project [a#244] : : +- Relation default.t[a#244,b#245] parquet : +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] : +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L] : +- Project [a#244] : +- Relation default.t[a#244,b#245] parquet +- OneRowRelation ``` and in the physical plan subqueries are reused: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(1) Project [Subquery subquery#242, [id=#113].avg(a) AS scalarsubquery()#253, ReusedSubquery Subquery subquery#242, [id=#113].sum(a) AS scalarsubquery()#254L] : :- Subquery subquery#242, [id=#113] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#158] +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L]) +- *(1) ColumnarToRow +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int> +- == Initial Plan == Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#110] +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L]) +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int> : +- ReusedSubquery Subquery subquery#242, [id=#113] +- *(1) Scan OneRowRelation[] +- == Initial Plan == ... ``` Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well. ### Why are the changes needed? Performance improvement. ``` [info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] q9 - MergeScalarSubqueries off 50798 52521 1423 0.0 Infinity 1.0X [info] q9 - MergeScalarSubqueries on 19484 19675 226 0.0 Infinity 2.6X [info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] q9b - MergeScalarSubqueries off 15430 17803 NaN 0.0 Infinity 1.0X [info] q9b - MergeScalarSubqueries on 3862 4002 196 0.0 Infinity 4.0X ``` Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE. The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1. ### Does this PR introduce _any_ user-facing change? No. But this optimization can be disabled with `spark.sql.optimizer.excludedRules` config. ### How was this patch tested? Existing and new UTs. Closes #32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery. Lead-authored-by: Peter Toth <[email protected]> Co-authored-by: attilapiros <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit e00b81e) Signed-off-by: Wenchen Fan <[email protected]>
…ource is incorrect ### What changes were proposed in this pull request? #36726 supports TimestampNTZ type in JDBC data source. But the implement is incorrect. This PR just modify a test case and it will be failed ! The test case show below. ``` test("SPARK-39339: TimestampNTZType with different local time zones") { val tableName = "timestamp_ntz_diff_tz_support_table" DateTimeTestUtils.outstandingZoneIds.foreach { zoneId => DateTimeTestUtils.withDefaultTimeZone(zoneId) { Seq( "1972-07-04 03:30:00", "2019-01-20 12:00:00.502", "2019-01-20T00:00:00.123456", "1500-01-20T00:00:00.123456" ).foreach { case datetime => val df = spark.sql(s"select timestamp_ntz '$datetime'") df.write.format("jdbc") .mode("overwrite") .option("url", urlWithUserAndPass) .option("dbtable", tableName) .save() DateTimeTestUtils.outstandingZoneIds.foreach { zoneId => DateTimeTestUtils.withDefaultTimeZone(zoneId) { val res = spark.read.format("jdbc") .option("inferTimestampNTZType", "true") .option("url", urlWithUserAndPass) .option("dbtable", tableName) .load() checkAnswer(res, df) } } } } } } ``` The test case output failure show below. ``` Results do not match for query: Timezone: sun.util.calendar.ZoneInfo[id="Africa/Dakar",offset=0,dstSavings=0,useDaylight=false,transitions=3,lastRule=null] Timezone Env: == Parsed Logical Plan == Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1] == Analyzed Logical Plan == TIMESTAMP_NTZ '1500-01-20 00:00:00.123456': timestamp_ntz Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1] == Optimized Logical Plan == Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1] == Physical Plan == *(1) Scan JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1] [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] PushedFilters: [], ReadSchema: struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz> == Results == == Results == !== Correct Answer - 1 == == Spark Answer - 1 == struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz> struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz> ![1500-01-20T00:00:00.123456] [1500-01-20T00:16:08.123456] ScalaTestFailureLocation: org.apache.spark.sql.QueryTest$ at (QueryTest.scala:243) org.scalatest.exceptions.TestFailedException: ``` ### Why are the changes needed? Fix an implement bug. The reason of the bug is use `toJavaTimestamp` and `fromJavaTimestamp`. `toJavaTimestamp` and `fromJavaTimestamp` lead to the timestamp with JVM system time zone. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New test case. Closes #37013 from beliefer/SPARK-39339_followup. Authored-by: Jiaan Geng <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
I am observing build failures when the Maven build reaches tests in the new SQL components. (I'm on Java 7 / OSX 10.9). The failure is the usual complaint from scala, that it's out of permgen space, or that JIT out of code cache space.
I see that various build scripts increase these both for SBT. This change simply adds these settings to scalatest's arguments. Works for me and seems a bit more consistent.
(I also snuck in cures for new build warnings from new scaladoc. Felt too trivial for a new PR, although it's separate. Just something I also saw while examining the build output.)