[SPARK-30069][CORE][YARN] Clean up non-shuffle disk block manager files following executor exits on YARN #29378
Conversation
Test build #127138 has finished for PR 29378 at commit
Test build #127183 has finished for PR 29378 at commit
retest this please
Test build #127185 has finished for PR 29378 at commit
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
/** Check if disk block manager has a block. */
def containsBlock(blockId: BlockId): Boolean = {
  getFile(blockId.name).exists()
}
Is this a performance optimization? I guess we can leave the original style because def getFile(blockId: BlockId): File still exists.
def getFile(blockId: BlockId) checks whether the blockId is a temp block; if it is, the block is stored under the container directory.
def getFile(filename: String) just stores a block under the local directory.
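For context, a minimal sketch of the temp-block distinction described above. In Spark, temp blocks are TempLocalBlockId ("temp_local_&lt;uuid&gt;") and TempShuffleBlockId ("temp_shuffle_&lt;uuid&gt;"); the standalone name-prefix helper below is an illustrative assumption, not the actual BlockId subtype check:

```scala
// Sketch only: Spark's real check uses BlockId subtypes
// (TempLocalBlockId / TempShuffleBlockId); this prefix check is an
// assumed simplification for illustration.
def isTempBlock(blockName: String): Boolean =
  blockName.startsWith("temp_local_") || blockName.startsWith("temp_shuffle_")
```

With a check like this, temp blocks can be routed to the container directory while everything else keeps going to the local directories.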
Looks like we can change def getFile(filename: String) to private, since this PR changes all invokers outside the class to def getFile(blockId: BlockId), such as:
- val targetFile = diskManager.getFile(targetBlockId.name)
+ val targetFile = diskManager.getFile(targetBlockId)
So def getFile(filename: String) is now only used for testing, so I added private[spark] to it. It would actually be better to remove it, but the test "SPARK-22227: non-block files are skipped" still uses this interface.
I updated the description for the interface change.
Now I have refactored the test case and removed def getFile(filename: String)!
Thank you for the explanation. It's a little confusing to me, but I'll take a closer look later.
@@ -81,17 +99,29 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean
    new File(subDir, filename)
  }

  def getFile(blockId: BlockId): File = getFile(blockId.name)
  /**
   * Used only for testing.
This is wrong, because the method is used in this class. Did you mean to write "Visible for testing"?
Removed this method. See my comments above.
private val shutdownHook = addShutdownHook()

/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile().
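For reference, the hashing scheme that this "keep in sync" comment refers to works roughly like the sketch below: the filename hashes to one of the local dirs, then to one of the subDirsPerLocalDir sub-directories inside it. This is a simplified approximation (the real code uses a non-negative hash helper and caches created sub-directories), not the exact DiskBlockManager/ExecutorDiskUtils implementation:

```scala
import java.io.File

// Hash the filename into one of the local dirs, then into one of the
// subDirsPerLocalDir sub-directories of that dir. Both DiskBlockManager
// and ExecutorDiskUtils must compute the same path for the same filename.
def hashToFile(localDirs: Array[File], subDirsPerLocalDir: Int, filename: String): File = {
  val hash = math.abs(filename.hashCode) // simplified non-negative hash
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  new File(new File(localDirs(dirId), "%02x".format(subDirId)), filename)
}
```

Because the external shuffle service resolves files with the same arithmetic, any change to this layout on one side silently breaks the other, which is why the sync comment matters.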
@LantaoJin. This comment reminds me that we need to handle ExecutorDiskUtils properly. Do we need to update ExecutorDiskUtils.getFile? Or do we need to remove this comment?
No, I didn't change any behaviour for non-temp files. ExecutorDiskUtils.getFile handles the non-temp files, such as those starting with "shuffle_" or "rdd_", so it should still be kept in sync with DiskBlockManager.getFile.
You can see that the method body is not changed; only the signature changed, from def getFile(filename: String): File to private def getFile(localDirs: Array[File], subDirs: Array[Array[File]], subDirsPerLocalDir: Int, filename: String): File.
Why does this function need to have unused parameters? Maybe I'm still confused by this refactoring.
def getFile(blockId: BlockId): File = {
  if (containerDirEnabled && blockId.isTemp) {
    getFile(containerDirs, subContainerDirs, subDirsPerLocalDir, blockId.name)
  } else {
    getFile(localDirs, subDirs, subDirsPerLocalDir, blockId.name)
  }
}
Now we just store temp files and non-temp files under different root paths. The algorithm for finding a file doesn't change.
Why does this function need to have unused parameters?
Which parameters?
Why does this function need to have unused parameters?
Ah, you must mean that getFile in DiskBlockManager has 4 parameters while getFile in ExecutorDiskUtils has 3. ExecutorDiskUtils is a utility, so 3 parameters are enough to define a file path structure. The subDirs parameter in DiskBlockManager is only a local variable that is also used in other parts; it does not affect the result of the lookup algorithm. I could use 3 parameters in DiskBlockManager.getFile as well, but then I would need to select a different array by a condition, like:
private def getFile(localDirs: Array[File], subDirsPerLocalDir: Int, filename: String): File = {
  ...
  val properSubDirs = if (someConditions) {
    subDirs
  } else {
    subContainerDirs
  }
  val old = properSubDirs(dirId)(subDirId)
  ...
But the someConditions is a little hard to determine in this method.
So the getFile in DiskBlockManager has 4 parameters:
private def getFile(localDirs: Array[File], subDirs: Array[Array[File]],
subDirsPerLocalDir: Int, filename: String): File
I don't think we need to pay much attention to the number of parameters in getFile. Before this PR, there was only 1 parameter (filename: String) in DiskBlockManager and 3 parameters in ExecutorDiskUtils.
Test build #127253 has finished for PR 29378 at commit
Test build #127256 has finished for PR 29378 at commit
Test build #127254 has finished for PR 29378 at commit
retest this please
1 similar comment
retest this please
Test build #127269 has finished for PR 29378 at commit
Test build #127271 has finished for PR 29378 at commit
So I haven't looked at this fully, but from skimming it, it seems like we are introducing YARN-specific concepts into the disk block manager. Maybe it's just the wording of calling it a container dir, for instance, but I would like to see as much YARN-specific stuff as possible kept out of there. At least put it in Utils, like we have getYarnLocalDirs. Also, perhaps name it something generic like temp file directories. Can this be reused by standalone mode and others, so we keep that logic similar between the cluster managers? The standalone fix looks like it recurses over all the files looking for those names; if it could just remove a single directory, that would seem more efficient, but I would have to look at it more in depth.
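To make the efficiency point at the end of that comment concrete, here is a hedged sketch contrasting the two cleanup strategies: scanning every local directory for files whose names match the temp prefixes, versus deleting one dedicated temp root. All names here (cleanByScanning, cleanByRoot, the prefixes) are illustrative assumptions, not the actual Spark or standalone-worker code:

```scala
import java.io.File

// Strategy 1 (scan-style): walk all files under every local dir and
// delete only those whose names look like temp blocks.
def cleanByScanning(localDirs: Array[File]): Unit = {
  def walk(f: File): Unit =
    if (f.isDirectory) Option(f.listFiles).getOrElse(Array.empty[File]).foreach(walk)
    else if (f.getName.startsWith("temp_local_") || f.getName.startsWith("temp_shuffle_")) f.delete()
  localDirs.foreach(walk)
}

// Strategy 2 (single-root): if all temp files live under one dedicated
// root, cleanup is just removing that directory tree, no name matching.
def cleanByRoot(tempRoot: File): Unit = {
  def rm(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles).getOrElse(Array.empty[File]).foreach(rm)
    f.delete()
  }
  rm(tempRoot)
}
```

Strategy 2 touches only the temp subtree, which is the advantage the reviewer is pointing at; the PR's YARN approach gets the same effect for free because the container dir is removed by YARN itself when the container exits.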
@tgravescs Yes, I see your concern. I am looking for a better refactor.
Test build #127441 has finished for PR 29378 at commit
Test build #127443 has finished for PR 29378 at commit
Gently ping @tgravescs @dongjoon-hyun
I'll try to take a look in the next few days
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
This is a reopening of PR #26711, which was closed as stale.
#21390 fixed the same problem in Standalone mode. On YARN, this issue still exists, especially in a long-running Thrift server, whose executors can die while the application never stops.
This patch is very straightforward:
We create these "temp_xxx" files under the YARN container dirs when the executor is running in a YARN container, so these temp_xxx files get cleaned up when the container exits.
Why are the changes needed?
Currently, we only clean up the local directories when an application is removed. However, when executors die and restart repeatedly, many temp files are left untouched in the local directories, which is undesired behavior and gradually uses up disk space. In particular, in a long-running service like the Spark Thrift server with dynamic resource allocation disabled, it is very easy to fill up the local disk.
From #21390 (comment), YARN only cleans a container's local dirs when the container (executor) exits. But these files are not in the container local dirs.
Does this PR introduce any user-facing change?
Remove def getFile(filename: String): File in DiskBlockManager. We should invoke def getFile(blockId: BlockId): File instead, since this method checks whether a block is temporary and stores it in the corresponding directory.
How was this patch tested?
Added a unit test and tested manually.