SPARK-1252. On YARN, use container-log4j.properties for executors #148
Conversation
Seems reasonable to me. You still working on this or is it good to go?
I am not sure what the intent of this PR is. Also, the hardcoding of the config file, root logger levels, etc. when we already ship a logging property file seems counterintuitive - I am probably missing something here.
@mridulm I think in YARN environments cluster operators can set a logging file on all of the machines to be shared across applications (e.g. Spark, MapReduce, etc). So the idea here is to point to that file. That said @sryza one thing here is that even if a user bundles a log4j file it will get ignored and replaced with the cluster default. That actually seems not ideal and I'm not sure there is a good way to detect whether the user has included their own file.
But that would be to debug yarn/hadoop APIs primarily - and no easy way to inject spark specific logging levels. I am curious why this was required actually. Currently, we have fairly fine grained control over logging from various packages/classes by redirecting logging output to stdout/stderr - which is actually quite heavily used (mute most of spark, enable user code; enable specific parts of spark for debug, etc) in user applications. Having said that, @tgravescs did the initial logging integration in yarn, so will defer to him.
To clarify, I am not saying we should not be configuring what is in container-log4j.properties - but we should be trying to do that while preserving the ability to configure (and override container-log4j.properties) via user specified logging config.
Currently, Spark doesn't ship a log4j.properties. It uses the log4j.properties that comes from Hadoop. This log4j.properties is meant for Hadoop services, not YARN containers. container-log4j.properties is a log4j configuration that Hadoop provides for containers to use. Without this change, is there a way to control executor logging without turning on the config option that puts the user classpath first and then including a log4j.properties in the app jar? That doesn't seem like a good way to configure logging to me. With the change, users can still override logging the same way, but by instead including a file named container-log4j.properties in their app jar. I would be in favor of providing better ways for users to configure logging for executors, but, if I understand correctly, this change isn't making things worse.
There is a user exposed option to configure log4j when run in yarn - which …
@sryza when a user builds an application assembly jar, they are allowed to bundle their own log4j.properties file in the jar. Is this not working for you on YARN? Spark's own build intentionally ignores the Hadoop log4j.properties file so that it won't be detected: https://github.com/apache/spark/blob/master/assembly/pom.xml#L130 If a user is building their own assembly jar they would need to similarly exclude other log4j.properties files.
Unless you are a spark developer, including at Yahoo, the person building the assembly jar is not the same as the person using spark: so depending on the assembled jar containing the logging config is not a practical solution.
@mridulm I'm referring to the application jar, not the Spark assembly. The spark assembly always excludes log4j.properties intentionally.
Ah ok!
@pwendell Spark's build excluding log4j.properties is not enough to keep it off the executor classpath. Executor classpaths include the Hadoop jars as installed locally on the cluster machines. And those include a log4j.properties. @mridulm My bad, I hadn't noticed that there was a way to pass a log4j.properties. In that case, I think the best thing would be to use container-log4j.properties by default, but, if the client gives one, to use that instead. What do you think?
Sure, or merge it with the user provided logging config even if provided …
It's unfortunate that Hadoop publishes the log4j.properties file directly in its jars. This is exactly why we've avoided doing this in Spark, because it creates a weird situation where you can't easily control logging preferences. @sryza @mridulm - I think what we need is a precedence order here:
Check for use of SPARK_LOG4J_CONF in yarn/
As pointed out, the SPARK_LOG4J_CONF env variable can be used to set the logging for spark on yarn. The one downside to making the yarn one the default is that we now get different looking logs if the user just uses the spark one. In the default most things go to the syslog file, and if I just put in the conf/log4j.properties by copying the template I won't get a syslog file and most things will be in stdout, right?
@tgravescs separating out log4j logs from stdout seems like a possible improvement to me, but if you think it will cause confusion, we can still use container-log4j.properties and use JVM props to configure it to log to stdout.
I agree it could be an improvement. I just don't want to cause confusion when their logs go to a different file when they turn debug on. I'm not sure I have any other good ideas on ways around it. Unless we just document (or put lines in the log4j.properties.template for yarn) that on yarn, to get the same behavior, they should use org.apache.hadoop.yarn.ContainerLogAppender. @pwendell thoughts?
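A rough sketch of the kind of user-supplied log4j configuration Tom is describing, routing output to the container's syslog file via org.apache.hadoop.yarn.ContainerLogAppender. The appender property names follow Hadoop's container-log4j.properties, and the sketch assumes the yarn.app.container.log.dir and yarn.app.container.log.filesize system properties are set for the container JVM; both assumptions can vary between Hadoop versions.

```properties
# Hypothetical log4j.properties mimicking YARN's default container logging.
# Property names follow Hadoop's container-log4j.properties and may differ
# across Hadoop versions; ${yarn.app.container.log.dir} and
# ${yarn.app.container.log.filesize} are assumed to be set for the container JVM.
log4j.rootLogger=INFO,CLA

log4j.appender.CLA=org.apache.hadoop.yarn.ContainerLogAppender
log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
log4j.appender.CLA.totalLogFileSize=${yarn.app.container.log.filesize}
log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
log4j.appender.CLA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```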
I uploaded a patch with a new approach. In it, we use neither Hadoop's log4j.properties nor Hadoop's container-log4j.properties. I added a log4j-spark-container.properties file (with the same conf as conf/log4j.properties.template) that gets included in the Spark yarn jar. This is used by default, but if the user specifies a SPARK_LOG4J_CONF, that will get used instead. I think this should address the issues that y'all have pointed out with the original approach, but let me know if I'm missing anything.
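For context, a minimal sketch of the console-based configuration that log4j-spark-container.properties mirrors; the review snippet below shows the first lines of the actual file, and the ConversionPattern here is an assumption based on Spark's conf/log4j.properties.template that may not match the committed file exactly.

```properties
# Minimal sketch mirroring conf/log4j.properties.template; everything goes to stderr.
# The ConversionPattern is an assumption and may differ from the committed file.
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```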
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
do we need to have 2 different log4j properties files or can they go in common?
I tried putting them in common initially and they weren't showing up in the YARN jar. I just tried mucking around with Maven a bit and was able to get this to work, so I'm uploading a new patch. I'm not sure how to do the same in sbt though.
Just to be clear, the current status of the patch seems to be: (b) seems to be different from the original intent of the PR, but I guess if we can't merge the container logging config with ours, it can't be helped.
Right, (b) is different from the original intent of the PR. The reason for copying Spark's log4j instead of Hadoop's was the concern brought up by @tgravescs earlier:
This applies to the master too.
Changes look good. Sandy, can you merge up to the latest master and I'll commit.
Note the other option would be to change the conf/log4j.properties.template to be more like Hadoop's.
Thanks Tom. I just rebased on master.
I don't have an opinion on this, but happy to make the change if you think it's the right thing.
container-log4j.properties is a file that YARN provides so that containers can have log4j.properties distinct from that of the NodeManagers. Logs now go to syslog, and stderr and stdout just have the process's standard err and standard out. I tested this on pseudo-distributed clusters for both yarn (Hadoop 2.2) and yarn-alpha (Hadoop 0.23.7).

Author: Sandy Ryza <[email protected]>

Closes #148 from sryza/sandy-spark-1252 and squashes the following commits:

c0043b8 [Sandy Ryza] Put log4j.properties file under common
55823da [Sandy Ryza] Add license headers to new files
10934b8 [Sandy Ryza] Add log4j-spark-container.properties and support SPARK_LOG4J_CONF
e74450b [Sandy Ryza] SPARK-1252. On YARN, use container-log4j.properties for executors