[SPARK-2974] [SPARK-2975] Fix two bugs related to spark.local.dirs #2002
Conversation
Now, the logic for determining the precedence of the different configuration options is in Utils.getOrCreateLocalRootDirs(). DiskBlockManager now accepts a SparkConf rather than a list of root directories, and I've updated the other tests to reflect this.
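For reference, a minimal sketch of the precedence handling described above, reconstructed from the snippets quoted later in this thread (getYarnLocalDirs and isRunningInYarnContainer are the helpers discussed below; treat the exact ordering as an assumption):

// Inside a YARN container, the YARN-provided directories win; otherwise fall
// back through SPARK_LOCAL_DIRS, then spark.local.dir, then java.io.tmpdir.
val confValue = if (isRunningInYarnContainer(conf)) {
  getYarnLocalDirs(conf)
} else {
  Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
    conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
}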
QA tests have started for PR 2002 at commit
 */
private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
  val isYarn = java.lang.Boolean.valueOf(
    System.getProperty("SPARK_YARN_MODE", conf.getenv("SPARK_YARN_MODE")))
@tgravescs, it would be great if you could take a look at the YARN logic below to see whether I've gotten it right. Most of the code is a straight copy-paste from the old code in Executor, but I want to make sure that I'm not overlooking any corner-cases related to SPARK_YARN_MODE being set on the driver.
The changes look fine for YARN. This approach does have more logic to look up the directories and check for their existence every time it's called, but as long as it isn't called very many times it shouldn't be a problem. I think it's only called three times right now (DiskBlockManager, Broadcast, and potentially on fetchFile). I guess if you are fetching a lot of files and have a lot of disks (you can easily have systems with 12+ disks), it could be excessive to keep looking them up and checking.

There is also logic in the ApplicationMaster and ExecutorLauncher (for both stable and alpha) for YARN that sets spark.local.dir. I think we can also remove that code.
Sorry, actually there is a problem with this in yarn-client mode. In yarn-client mode the Spark driver runs on the launcher/gateway box (not in the YARN cluster), and with these changes the driver tries to look up YARN directories even though it's not running on YARN:
14/08/18 16:21:59 INFO SparkEnv: Registering BlockManagerMaster
14/08/18 16:21:59 DEBUG BlockManagerMasterActor: [actor] received message ExpireDeadHosts from Actor[akka://spark/user/BlockManagerMaster#-836784892]
14/08/18 16:21:59 DEBUG BlockManagerMasterActor: [actor] handled message (2.386093 ms) ExpireDeadHosts from Actor[akka://spark/user/BlockManagerMaster#-836784892]
Exception in thread "main" java.lang.Exception: Yarn Local dirs can't be empty
at org.apache.spark.util.Utils$.getYarnLocalDirs(Utils.scala:501)
at org.apache.spark.util.Utils$.getOrCreateLocalRootDirs(Utils.scala:471)
at org.apache.spark.storage.DiskBlockManager.createLocalDirs(DiskBlockManager.scala:137)
If this becomes really expensive (e.g. many calls to fetchFile), you could cache the result in a field of SparkConf (or just make this into a private instance method of SparkConf). I could make those changes now, or we could wait to see if this is an actual performance problem.
I guess the correct YARN logic is "if this process is running inside of a YARN container, then use the YARN directories". If so, I need a way to determine isRunningInYarnContainer. Is SPARK_YARN_MODE the right way to do this, or should I be using some other signal (or does SPARK_YARN_MODE need to be set differently so that it's not set on the driver when running in client mode)?
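A minimal sketch of the caching idea mentioned above (hypothetical; the names are made up, and the patch as merged leaves the method uncached):

// Hypothetical: memoize the validated root dirs so repeated calls are cheap.
@volatile private var cachedLocalRootDirs: Array[String] = null

private[spark] def localRootDirsCached(conf: SparkConf): Array[String] = {
  if (cachedLocalRootDirs == null) {
    cachedLocalRootDirs = getOrCreateLocalRootDirs(conf)
  }
  cachedLocalRootDirs
}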
Since this commit passed Jenkins despite this bug, that's an indicator that we're not testing this yarn-client code path. I'll open a JIRA for this.
/cc @andrewor14 (since I think you're our resident YARN expert).
SPARK_YARN_USER_ENV is optional. We could make sure some other env variable is always set when we launch a container in YARN, though.

I was looking for something else we could use that would be reliable. For instance, you could use LOCAL_DIRS or YARN_LOCAL_DIRS as an indicator, since YARN always sets those. But LOCAL_DIRS is generic enough that a user could have it set for something else. There are other variables that YARN sets for you. I think it might be a good idea to check whether either CONTAINER_ID or NM_HOST is set. The reason to check for both is that Hadoop 2.x sets CONTAINER_ID, which I prefer to use, but Hadoop 0.23 doesn't, and NM_HOST is set on both.
> We could make sure some other env variable is always set when we launch a container in YARN, though.
Yeah, that was my suggestion: set a new variable, say, IN_YARN_CONTAINER, using the same code path that we use to expose SPARK_YARN_USER_ENV to the container. Will this work, though? Does every container that we launch go through this code path, including workers and the Spark driver (if running in yarn-cluster mode)?
> I think it might be a good idea to check whether either CONTAINER_ID or NM_HOST is set.
I considered an approach like this too (though I didn't know which variables to use), but it might be safer to set our own variable and not have to worry about compatibility across YARN versions.
Yes, the same code paths as SPARK_YARN_USER_ENV would work.

I consider all the env variables set by Hadoop 2.x to be API, so I don't expect them to change going forward; too many applications rely on them. I would actually prefer checking for the YARN ones to maintaining our own, since that's adding one more env variable that we then have to maintain. If you want to use just one, then I say use NM_HOST.
You're more familiar with YARN than I am, so I'll trust your judgment and check for both NM_HOST and CONTAINER_ID. Commit coming soon...

/cc @YanTangZhai, who has a similar PR to address one of these issues (#1281) and who added the patch that ignores invalid local directories (#1274).
QA tests have finished for PR 2002 at commit
}

/**
 * Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
Might be worth adding a note about YARN here.
Alright, I've updated this to check whether we're running inside of a YARN container (rather than checking SPARK_YARN_MODE).
QA tests have started for PR 2002 at commit
QA tests have finished for PR 2002 at commit
Hmm, looks like a test in CheckpointSuite failed, but I think it's unrelated.
Jenkins, retest this please.
QA tests have started for PR 2002 at commit
QA tests have finished for PR 2002 at commit
@@ -62,7 +62,7 @@ private[spark] class PythonRDD(
     val env = SparkEnv.get
     val localdir = env.blockManager.diskBlockManager.localDirs.map(
       f => f.getPath()).mkString(",")
-    envVars += ("SPARK_LOCAL_DIR" -> localdir)   // it's also used in monitor thread
+    envVars += ("SPARK_LOCAL_DIRS" -> localdir)  // it's also used in monitor thread
This was simply a typo before?
I think so. It looks like it was innocuous because both the Java and Python sides had the same typo.
I suppose it might be cleaner here to read the local directories using the methods in Utils instead of reading them out of diskBlockManager, but I think this code is correct as written.
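For illustration, a sketch of that alternative (hypothetical; it assumes SparkEnv exposes its SparkConf as env.conf, and the PR kept the diskBlockManager version):

// Hypothetical: derive the directory list from the new Utils method instead
// of reaching into the DiskBlockManager.
val localdir = Utils.getOrCreateLocalRootDirs(env.conf).mkString(",")
envVars += ("SPARK_LOCAL_DIRS" -> localdir)  // it's also used in monitor thread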
Hey Josh - this LGTM with some minor comments.
QA tests have started for PR 2002 at commit
QA tests have finished for PR 2002 at commit
Just tested this with YARN on HDP-2.1 (Hadoop 2.4) and it worked fine in both yarn-client and yarn-cluster modes.
}

private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = {
  conf.getenv("NM_HOST") != null || conf.getenv("CONTAINER_ID") != null
}
Can you reverse the checks and put CONTAINER_ID first? Also, it might be nice to add a small comment saying that those env variables are set by YARN.
Sorry for changing this after the fact, but having thought about it some more: could you change NM_HOST to YARN_LOCAL_DIRS (for Hadoop 0.23)? We are already relying on that below, so it's one less dependency. Then for Hadoop 2.x we use CONTAINER_ID, which I think is better to rely on.

I tested that out (using YARN_LOCAL_DIRS) on Hadoop 0.23 and it works fine.

On testing YARN: did you just verify that it ran, or did you verify that it is using the correct YARN directories? It's fairly simple to confirm the latter by looking at the logs. Of course, this assumes you have YARN set up to use something other than /tmp. I'll try it out on 0.23.
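A sketch of the check after incorporating both review comments (assumed to match the updated patch):

private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = {
  // These environment variables are set by YARN: CONTAINER_ID on Hadoop 2.x
  // (preferred), YARN_LOCAL_DIRS on Hadoop 0.23.
  conf.getenv("CONTAINER_ID") != null || conf.getenv("YARN_LOCAL_DIRS") != null
}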
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) | ||
} | ||
val rootDirs = confValue.split(',') | ||
logDebug(s"Getting/creating local root dirs at '$rootDirs'") |
This debug statement doesn't really print anything useful:
14/08/19 13:46:47 DEBUG Utils: Getting/creating local root dirs at '[Ljava.lang.String;@1278ae3'
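(The culprit is that the Scala array is logged with the JVM's default toString. A minimal fix is to join the entries instead:)

// Join the entries so the log shows the actual paths rather than
// Array's default toString output.
logDebug(s"Getting/creating local root dirs at '${rootDirs.mkString(",")}'")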
- check for different env. vars.
- fix logging message
QA tests have started for PR 2002 at commit
@tgravescs Thanks for the review comments; I've incorporated your suggestions in my latest patch. Good point about verifying that YARN uses the right directories rather than simply not crashing; I'll check this on an actual YARN cluster in a little bit.
QA tests have finished for PR 2002 at commit
I tried this out on 0.23 and Hadoop 2.4 and it is working fine for me. @pwendell, were all your comments addressed, or did you have further comments?
My comments are all set. Jenkins, test this please.
QA tests have started for PR 2002 at commit
QA tests have finished for PR 2002 at commit
Jenkins, test this please.
QA tests have started for PR 2002 at commit
QA tests have finished for PR 2002 at commit
Jenkins, test this please.
QA tests have started for PR 2002 at commit
QA tests have finished for PR 2002 at commit
Okay, I think we're good on this one - going to pull it in. Thanks @JoshRosen and @tgravescs.
This PR fixes two bugs related to `spark.local.dirs` and `SPARK_LOCAL_DIRS`: one where `Utils.getLocalDir()` might return an invalid directory (SPARK-2974), and another where the `SPARK_LOCAL_DIRS` override didn't affect the driver, which could cause problems when running tasks in local mode (SPARK-2975).

This patch fixes both issues: the new `Utils.getOrCreateLocalRootDirs(conf: SparkConf)` utility method manages the creation of local directories and handles the precedence among the different configuration options, so we should see the same behavior whether we're running in local mode or on a worker.

It's kind of a pain to mock out environment variables in tests (there's no easy way to mock System.getenv), so I added a `private[spark]` method to SparkConf for accessing environment variables (by default, it just delegates to System.getenv). By subclassing SparkConf and overriding this method, we can mock out SPARK_LOCAL_DIRS in tests.

I also fixed a typo in PySpark where we used `SPARK_LOCAL_DIR` instead of `SPARK_LOCAL_DIRS` (I think this was technically innocuous, but it seemed worth fixing).

Author: Josh Rosen <[email protected]>

Closes #2002 from JoshRosen/local-dirs and squashes the following commits:

efad8c6 [Josh Rosen] Address review comments:
1dec709 [Josh Rosen] Minor updates to Javadocs.
7f36999 [Josh Rosen] Use env vars to detect if running in YARN container.
399ac25 [Josh Rosen] Update getLocalDir() documentation.
bb3ad89 [Josh Rosen] Remove duplicated YARN getLocalDirs() code.
3e92d44 [Josh Rosen] Move local dirs override logic into Utils; fix bugs:
b2c4736 [Josh Rosen] Add failing tests for SPARK-2974 and SPARK-2975.
007298b [Josh Rosen] Allow environment variables to be mocked in tests.
6d9259b [Josh Rosen] Fix typo in PySpark: SPARK_LOCAL_DIR should be SPARK_LOCAL_DIRS

(cherry picked from commit ebcb94f)
Signed-off-by: Patrick Wendell <[email protected]>
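As a minimal sketch of the env-var mocking approach described in the summary (class name and values are hypothetical; it assumes the subclass lives in the org.apache.spark package so it can override the `private[spark]` method):

// Hypothetical test helper: override SparkConf's getenv hook so tests can
// control which "environment variables" the code under test sees.
class MockEnvSparkConf(env: Map[String, String])
    extends SparkConf(loadDefaults = false) {
  override def getenv(name: String): String = env.getOrElse(name, null)
}

val conf = new MockEnvSparkConf(Map("SPARK_LOCAL_DIRS" -> "/tmp/a,/tmp/b"))
// Code paths that call conf.getenv("SPARK_LOCAL_DIRS") now see the mocked value.
assert(conf.getenv("SPARK_LOCAL_DIRS") == "/tmp/a,/tmp/b")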