SPARK-2565. Update ShuffleReadMetrics as blocks are fetched #1507
Conversation
QA tests have started for PR 1507. This patch merges cleanly.
QA results for PR 1507:
    }
    shuffleReadMetrics = Some(merged)
Why is this outside the synchronized block?
At a high level, this depends on one of your other patches (#1056?) to incrementally send updates, right? Is the idea that mergeShuffleReadMetrics will get called incrementally as the task runs, before sending partial results back to the driver?
Exactly. The idea is to call mergeShuffleReadMetrics when we're about to send the metrics update.
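To make the pattern under discussion concrete, here is a minimal sketch (simplified, hypothetical stand-ins rather than the actual Spark classes): each shuffle dependency gets its own ShuffleReadMetrics, and the per-dependency objects are merged into a single aggregate just before a metrics update is sent to the driver.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for Spark's ShuffleReadMetrics.
class ShuffleReadMetrics {
  var localBlocksFetched: Int = 0
  var remoteBlocksFetched: Int = 0
  var remoteBytesRead: Long = 0L
}

// Hypothetical, simplified stand-in for Spark's TaskMetrics.
class TaskMetrics {
  private val depsShuffleReadMetrics = ArrayBuffer[ShuffleReadMetrics]()
  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

  def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics

  // Called once per shuffle dependency; registers the object for later merging.
  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
    val readMetrics = new ShuffleReadMetrics
    depsShuffleReadMetrics += readMetrics
    readMetrics
  }

  // Called each time a metrics update is about to be sent to the driver.
  def mergeShuffleReadMetrics(): Unit = synchronized {
    if (depsShuffleReadMetrics.nonEmpty) {
      val merged = new ShuffleReadMetrics
      for (dep <- depsShuffleReadMetrics) {
        merged.localBlocksFetched += dep.localBlocksFetched
        merged.remoteBlocksFetched += dep.remoteBlocksFetched
        merged.remoteBytesRead += dep.remoteBytesRead
      }
      _shuffleReadMetrics = Some(merged)
    }
  }
}
```

In this sketch the assignment to `_shuffleReadMetrics` sits inside the synchronized block, which addresses the question raised above about whether the assignment should be protected along with the iteration.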
QA tests have started for PR 1507. This patch merges cleanly.
if (address == blockManagerId) {
  numLocal = blockInfos.size
  readMetrics.localBlocksFetched += blockInfos.size
Maybe we don't care about this...but this results in the metrics reporting that local blocks have been fetched before they're actually read from disk. Is it too annoying to move this to where the blocks actually get read?
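The alternative being suggested can be sketched as follows (hypothetical helper names, not the actual BlockFetcherIterator code): increment the counter lazily, as each local block is actually read, instead of all at once when the fetch requests are first enumerated, so in-progress tasks report accurate counts.

```scala
// Hypothetical, simplified stand-in for Spark's ShuffleReadMetrics.
class ShuffleReadMetrics {
  var localBlocksFetched: Int = 0
}

// Hypothetical sketch: count each local block when it is actually read,
// not when the list of blocks to fetch is built.
def getLocalBlocks(
    blockIds: Seq[String],
    readBlock: String => Array[Byte],
    readMetrics: ShuffleReadMetrics): Iterator[Array[Byte]] = {
  blockIds.iterator.map { id =>
    val data = readBlock(id)            // the block is read from disk here...
    readMetrics.localBlocksFetched += 1 // ...so it is counted here
    data
  }
}
```

Because the iterator is lazy, the counter stays at zero until the consumer actually pulls blocks, which matches the behavior the comment asks for.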
Thanks Sandy!! Just a few more small things.
QA results for PR 1507:
Jenkins, retest this please
QA tests have started for PR 1507. This patch merges cleanly.
QA results for PR 1507:
Just tested this and observed the shuffle bytes read going up for in-progress tasks.
@@ -191,7 +184,7 @@ object BlockFetcherIterator {
       }
     }
     logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
-      (numLocal + numRemote) + " blocks")
+      totalBlocks + " blocks")
Is this ever used other than for logging?
Naw
QA tests have started for PR 1507. This patch DID NOT merge cleanly!
 * dependency, and merge these metrics before reporting them to the driver. This method returns
 * a ShuffleReadMetrics for a dependency and registers it for merging later.
 */
def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
can this be private[spark]?
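For context on the suggestion, Scala's package-qualified access modifier restricts a member to a given enclosing package, so `private[spark]` would keep the method callable anywhere inside the `spark` package but hidden from user code. A minimal illustration with hypothetical package names:

```scala
package org.example.spark {
  class Metrics {
    // Visible anywhere under the `spark` package, hidden from outside callers.
    private[spark] def internalCounter: Int = 42
  }

  object SamePackageUser {
    // Compiles: this object lives in the same qualifying package.
    def read(m: Metrics): Int = m.internalCounter
  }
}
// From code outside org.example.spark, `m.internalCounter` would not compile.
```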
QA tests have started for PR 1507. This patch merges cleanly.
QA results for PR 1507:
QA results for PR 1507:
QA tests have started for PR 1507. This patch merges cleanly.
QA results for PR 1507:
It's failing the 3 flaky tests that have been failing many PRs lately... test this please
QA tests have started for PR 1507. This patch merges cleanly.
QA results for PR 1507:
  */
 private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

-  def shuffleReadMetrics = _shuffleReadMetrics
+  def shuffleReadMetrics() = _shuffleReadMetrics
nit: since this doesn't mutate internal state, the original lack of parentheses is correct style.
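The convention behind this nit: in Scala, accessors with no side effects are conventionally declared without parentheses, while empty parentheses signal that a call mutates state or performs I/O. A small illustration with a hypothetical class:

```scala
// Hypothetical illustration of the convention, not Spark's actual class.
class TaskState {
  private var _metrics: Option[String] = None

  // No parentheses: a pure accessor with no side effects.
  def metrics: Option[String] = _metrics

  // Empty parentheses signal that the call mutates state.
  def updateMetrics(): Unit = { _metrics = Some("updated") }
}
```

This is why the review asks to revert `def shuffleReadMetrics()` back to the parameterless `def shuffleReadMetrics` form.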
Okay I merged this with the minor style change. Thanks Sandy!
Author: Sandy Ryza <[email protected]>

Closes #1507 from sryza/sandy-spark-2565 and squashes the following commits:

74dad41 [Sandy Ryza] SPARK-2565. Update ShuffleReadMetrics as blocks are fetched

(cherry picked from commit 4c51098)
Signed-off-by: Patrick Wendell <[email protected]>
Author: Sandy Ryza <[email protected]>

Closes apache#1507 from sryza/sandy-spark-2565 and squashes the following commits:

74dad41 [Sandy Ryza] SPARK-2565. Update ShuffleReadMetrics as blocks are fetched
Enable `spark.dynamicAllocation.shuffleTracking.enabled` by default (apache#1507)

### What changes were proposed in this pull request?

This is a backport from Spark 3.3 (d762205) to 3.2. rdar://99066157 (SPARK-39846][CORE] Enable spark.dynamicAllocation.shuffleTracking.enabled by default)

This PR aims to enable `spark.dynamicAllocation.shuffleTracking.enabled` by default in Apache Spark 3.4 when `spark.dynamicAllocation.enabled=true` and `spark.shuffle.service.enabled=false`.

### Why are the changes needed?

Here is a brief history around `spark.dynamicAllocation.shuffleTracking.enabled`:

- Apache Spark 3.0.0 added it via SPARK-27963 for the K8s environment.
  > One immediate use case is the ability to use dynamic allocation on Kubernetes, which doesn't yet have that service.
- Apache Spark 3.1.1 made K8s GA via SPARK-33005 and started to use it widely in K8s.
- Apache Spark 3.2.0 started to support shuffle data recovery on reused PVCs via SPARK-35593.
- Apache Spark 3.3.0 removed the `Experimental` tag from it via SPARK-39322.
- Apache Spark 3.4.0 will enable it by default via SPARK-39846 (this PR) to help Spark K8s users use dynamic allocation more easily.

### Does this PR introduce _any_ user-facing change?

The `Core` migration guide is updated.

### How was this patch tested?

Pass the CIs including the K8s IT GitHub Action job.