-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1151: Update dev merge script to use spark.git instead of incubator-spark #47
Closed
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Merged build triggered. |
Merged build started. |
Merged build triggered. |
Merged build finished. |
All automated tests passed. |
Thanks tom - merged. |
jhartlaub
referenced
this pull request
in jhartlaub/spark
May 27, 2014
add Function3 and WrappedFunction3 I need these two to add more API to spark streaming
JasonMWhite
pushed a commit
to JasonMWhite/spark
that referenced
this pull request
Dec 2, 2015
Orens problematic bump
JasonMWhite
pushed a commit
to JasonMWhite/spark
that referenced
this pull request
Dec 2, 2015
…ematic_bump"" This reverts commit 03aa82b.
JasonMWhite
pushed a commit
to JasonMWhite/spark
that referenced
this pull request
Dec 2, 2015
…ns_problematic_bump""" This reverts commit f37de6c.
asfgit
pushed a commit
that referenced
this pull request
Apr 27, 2016
…w queries ## What changes were proposed in this pull request? This PR aims to implement decimal aggregation optimization for window queries by improving existing `DecimalAggregates`. Historically, `DecimalAggregates` optimizer is designed to transform general `sum/avg(decimal)`, but it breaks recently added windows queries like the followings. The following queries work well without the current `DecimalAggregates` optimizer. **Sum** ```scala scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").head java.lang.RuntimeException: Unsupported window function: MakeDecimal((sum(UnscaledValue(a#31)),mode=Complete,isDistinct=false),12,1) scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").explain() == Physical Plan == WholeStageCodegen : +- Project [sum(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#23] : +- INPUT +- Window [MakeDecimal((sum(UnscaledValue(a#21)),mode=Complete,isDistinct=false),12,1) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#23] +- Exchange SinglePartition, None +- Generate explode([1.0,2.0]), false, false, [a#21] +- Scan OneRowRelation[] ``` **Average** ```scala scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").head java.lang.RuntimeException: Unsupported window function: cast(((avg(UnscaledValue(a#40)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5)) scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").explain() == Physical Plan == WholeStageCodegen : +- Project [avg(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#44] : +- INPUT +- Window [cast(((avg(UnscaledValue(a#42)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5)) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS avg(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#44] +- Exchange SinglePartition, None +- Generate explode([1.0,2.0]), false, false, [a#42] +- Scan OneRowRelation[] ``` After this PR, those queries work fine and new optimized physical plans look like the followings. **Sum** ```scala scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").explain() == Physical Plan == WholeStageCodegen : +- Project [sum(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#35] : +- INPUT +- Window [MakeDecimal((sum(UnscaledValue(a#33)),mode=Complete,isDistinct=false) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),12,1) AS sum(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#35] +- Exchange SinglePartition, None +- Generate explode([1.0,2.0]), false, false, [a#33] +- Scan OneRowRelation[] ``` **Average** ```scala scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").explain() == Physical Plan == WholeStageCodegen : +- Project [avg(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#47] : +- INPUT +- Window [cast(((avg(UnscaledValue(a#45)),mode=Complete,isDistinct=false) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) / 10.0) as decimal(6,5)) AS avg(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#47] +- Exchange SinglePartition, None +- Generate explode([1.0,2.0]), false, false, [a#45] +- Scan OneRowRelation[] ``` In this PR, *SUM over window* pattern matching is based on the code of hvanhovell ; he should be credited for the work he did. ## How was this patch tested? Pass the Jenkins tests (with newly added testcases) Author: Dongjoon Hyun <[email protected]> Closes #12421 from dongjoon-hyun/SPARK-14664.
vlad17
pushed a commit
to vlad17/spark
that referenced
this pull request
Aug 23, 2016
## What changes were proposed in this pull request? This patch was originally authored by davies. The patch implements dynamic partition pruning for branch-2.0. ## How was this patch tested? An end-to-end test was added in SQLQuerySuite. Author: Davies Liu <[email protected]> Closes apache#47 from rxin/rxin-dynamic-partition-pruning.
jzhuge
added a commit
to jzhuge/spark
that referenced
this pull request
Mar 7, 2019
…dv2table to netflix/2.3.2-unstable Squashed commit of the following: commit fec21fd4abab2ef27ae1f4199e3c75217b6a3e8e Author: Vinitha Gankidi <[email protected]> Date: Tue Feb 26 14:58:13 2019 -0800 NETFLIX-BUILD: loadV2Table should throw NoSuchTableException instead of returning None
hn5092
pushed a commit
to hn5092/spark
that referenced
this pull request
Nov 4, 2019
hn5092
pushed a commit
to hn5092/spark
that referenced
this pull request
Nov 4, 2019
apache#47 ShuffleMapTask ignore spark.driver.maxResultSize
cloud-fan
pushed a commit
that referenced
this pull request
Nov 10, 2020
### What changes were proposed in this pull request? Push down filter through expand. For case below: ``` create table t1(pid int, uid int, sid int, dt date, suid int) using parquet; create table t2(pid int, vs int, uid int, csid int) using parquet; SELECT years, appversion, SUM(uusers) AS users FROM (SELECT Date_trunc('year', dt) AS years, CASE WHEN h.pid = 3 THEN 'iOS' WHEN h.pid = 4 THEN 'Android' ELSE 'Other' END AS viewport, h.vs AS appversion, Count(DISTINCT u.uid) AS uusers ,Count(DISTINCT u.suid) AS srcusers FROM t1 u join t2 h ON h.uid = u.uid GROUP BY 1, 2, 3) AS a WHERE viewport = 'iOS' GROUP BY 1, 2 ``` Plan. before this pr: ``` == Physical Plan == *(5) HashAggregate(keys=[years#30, appversion#32], functions=[sum(uusers#33L)]) +- Exchange hashpartitioning(years#30, appversion#32, 200), true, [id=#251] +- *(4) HashAggregate(keys=[years#30, appversion#32], functions=[partial_sum(uusers#33L)]) +- *(4) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[count(if ((gid#44 = 1)) u.`uid`#47 else null)]) +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, 200), true, [id=#246] +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[partial_count(if ((gid#44 = 1)) u.`uid`#47 else null)]) +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[]) +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44, 200), true, [id=#241] +- *(2) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[]) +- *(2) Filter (CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46 = iOS) +- *(2) Expand [ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, uid#7, null, 1), ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, null, suid#10, 2)], [date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44] +- *(2) Project [uid#7, dt#9, suid#10, pid#11, vs#12] +- *(2) BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight :- *(2) Project [uid#7, dt#9, suid#10] : +- *(2) Filter isnotnull(uid#7) : +- *(2) ColumnarToRow : +- FileScan parquet default.t1[uid#7,dt#9,suid#10] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date,suid:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, true] as bigint))), [id=#233] +- *(1) Project [pid#11, vs#12, uid#13] +- *(1) Filter isnotnull(uid#13) +- *(1) ColumnarToRow +- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [isnotnull(uid#13)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int> ``` Plan. after. this pr. : ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[years#0, appversion#2], functions=[sum(uusers#3L)], output=[years#0, appversion#2, users#5L]) +- Exchange hashpartitioning(years#0, appversion#2, 5), true, [id=#71] +- HashAggregate(keys=[years#0, appversion#2], functions=[partial_sum(uusers#3L)], output=[years#0, appversion#2, sum#22L]) +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[count(distinct uid#7)], output=[years#0, appversion#2, uusers#3L]) +- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, 5), true, [id=#67] +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[partial_count(distinct uid#7)], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, count#27L]) +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7]) +- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7, 5), true, [id=#63] +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles)) AS date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END AS CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7]) +- Project [uid#7, dt#9, pid#11, vs#12] +- BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight, false :- Filter isnotnull(uid#7) : +- FileScan parquet default.t1[uid#7,dt#9] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=#58] +- Filter ((CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS) AND isnotnull(uid#13)) +- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [(CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS), isnotnull..., Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int> ``` ### Why are the changes needed? Improve performance, filter more data. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT Closes #30278 from AngersZhuuuu/SPARK-33302. Authored-by: angerszhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
No description provided.