-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove broken/unused Connection.getChunkFIFO method. #69
Closed
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This method appears to be broken -- since it never removes anything from messages, and it adds new messages to it, the while loop is an infinite loop. The method also does not appear to have ever been used since the code was added in 2012, so this commit removes it.
Merged build triggered. |
Merged build started. |
@tdas should take a look at this actually, I think it was his code. But yes there's no reason to keep FIFO. |
Merged build finished. |
All automated tests passed. |
I've merged this, thanks. |
jhartlaub
referenced
this pull request
in jhartlaub/spark
May 27, 2014
Fix for issue SPARK-627. Implementing --config argument in the scripts. This code fix is for issue SPARK-627. I added code to consider --config arguments in the scripts. In case the <conf-dir> is not a directory the scripts exit. I removed the --hosts argument. It can be achieved by giving a different config directory. Let me know if an explicit --hosts argument is required. (cherry picked from commit fc26e5b) Signed-off-by: Reynold Xin <[email protected]>
JasonMWhite
pushed a commit
to JasonMWhite/spark
that referenced
this pull request
Dec 2, 2015
Upload a tar of our spark to s3
ash211
pushed a commit
to ash211/spark
that referenced
this pull request
Feb 2, 2017
* Retry the submit-application request to multiple nodes. * Fix doc style comment * Check node unschedulable, log retry failures
lins05
pushed a commit
to lins05/spark
that referenced
this pull request
Apr 23, 2017
* Retry the submit-application request to multiple nodes. * Fix doc style comment * Check node unschedulable, log retry failures
erikerlandson
pushed a commit
to erikerlandson/spark
that referenced
this pull request
Jul 28, 2017
* Retry the submit-application request to multiple nodes. * Fix doc style comment * Check node unschedulable, log retry failures
jlopezmalla
pushed a commit
to jlopezmalla/spark
that referenced
this pull request
Oct 23, 2017
* added AT for dispatcher installation * added installation services and parameters * fixed docker version as parameter * added history server deployment
bzhaoopenstack
pushed a commit
to bzhaoopenstack/spark
that referenced
this pull request
Sep 11, 2019
* Add job for check oepnlab-zuul-jobs changes For apache#68 * modify the lint regular expression to support ".yml" format
cloud-fan
pushed a commit
that referenced
this pull request
Oct 8, 2020
… more scenarios such as PartitioningCollection ### What changes were proposed in this pull request? This PR proposes to improve `EnsureRquirement.reorderJoinKeys` to handle the following scenarios: 1. If the keys cannot be reordered to match the left-side `HashPartitioning`, consider the right-side `HashPartitioning`. 2. Handle `PartitioningCollection`, which may contain `HashPartitioning` ### Why are the changes needed? 1. For the scenario 1), the current behavior matches either the left-side `HashPartitioning` or the right-side `HashPartitioning`. This means that if both sides are `HashPartitioning`, it will try to match only the left side. The following will not consider the right-side `HashPartitioning`: ``` val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1") val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2") df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2") val t1 = spark.table("t1") val t2 = spark.table("t2") val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2")) join.explain == Physical Plan == *(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner :- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69] : +- *(1) Project [i1#26, j1#27] : +- *(1) Filter isnotnull(i1#26) : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4 +- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0. +- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79]. <===== This can be removed +- *(3) Project [i2#30, j2#31] +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30)) +- *(3) ColumnarToRow +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4 ``` 2. For the scenario 2), the current behavior does not handle `PartitioningCollection`: ``` val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1") val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2") val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3") val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3")) join2.explain == Physical Plan == *(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner :- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0. <===== This can be removed : +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58] <===== This can be removed : +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner : :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45] : : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8] : : +- *(1) LocalTableScan [_1#2, _2#3] : +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51] : +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19] : +- *(3) LocalTableScan [_1#13, _2#14] +- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64] +- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30] +- *(7) LocalTableScan [_1#24, _2#25] ``` ### Does this PR introduce _any_ user-facing change? Yes, now from the above examples, the shuffle/sort nodes pointed by `This can be removed` are now removed: 1. Senario 1): ``` == Physical Plan == *(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner :- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67] : +- *(1) Project [i1#26, j1#27] : +- *(1) Filter isnotnull(i1#26) : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4 +- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0 +- *(3) Project [i2#30, j2#31] +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30)) +- *(3) ColumnarToRow +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4 ``` 2. Scenario 2): ``` == Physical Plan == *(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner :- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner : :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43] : : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8] : : +- *(1) LocalTableScan [_1#2, _2#3] : +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49] : +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19] : +- *(3) LocalTableScan [_1#13, _2#14] +- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58] +- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30] +- *(6) LocalTableScan [_1#24, _2#25] ``` ### How was this patch tested? Added tests. Closes #29074 from imback82/reorder_keys. Authored-by: Terry Kim <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This method appears to be broken -- since it never removes
anything from messages, and it adds new messages to it,
the while loop is an infinite loop. The method also does not appear
to have ever been used since the code was added in 2012, so
this commit removes it.
cc @mateiz who originally added this method in case there's a reason it should be here! (63051dd)