forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-18364][YARN] Expose metrics for YarnShuffleService
## What changes were proposed in this pull request? This PR is follow-up of closed apache#17401 which only ended due to of inactivity, but its still nice feature to have. Given review by jerryshao taken in consideration and edited: - VisibleForTesting deleted because of dependency conflicts - removed unnecessary reflection for `MetricsSystemImpl` - added more available types for gauge ## How was this patch tested? Manual deploy of new yarn-shuffle jar into a Node Manager and verifying that the metrics appear in the Node Manager-standard location. This is JMX with an query endpoint running on `hostname:port` Resulting metrics look like this: ``` curl -sk -XGET hostname:port | grep -v '#' | grep 'shuffleService' hadoop_nodemanager_openblockrequestlatencymillis_rate15{name="shuffleService",} 0.31428910657834713 hadoop_nodemanager_blocktransferratebytes_rate15{name="shuffleService",} 566144.9983653595 hadoop_nodemanager_blocktransferratebytes_ratemean{name="shuffleService",} 2464409.9678099006 hadoop_nodemanager_openblockrequestlatencymillis_rate1{name="shuffleService",} 1.2893844732240272 hadoop_nodemanager_registeredexecutorssize{name="shuffleService",} 2.0 hadoop_nodemanager_openblockrequestlatencymillis_ratemean{name="shuffleService",} 1.255574678369966 hadoop_nodemanager_openblockrequestlatencymillis_count{name="shuffleService",} 315.0 hadoop_nodemanager_openblockrequestlatencymillis_rate5{name="shuffleService",} 0.7661929192569739 hadoop_nodemanager_registerexecutorrequestlatencymillis_ratemean{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_count{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate1{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate5{name="shuffleService",} 0.0 hadoop_nodemanager_blocktransferratebytes_count{name="shuffleService",} 6.18271213E8 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate15{name="shuffleService",} 0.0 hadoop_nodemanager_blocktransferratebytes_rate5{name="shuffleService",} 1154114.4881816586 hadoop_nodemanager_blocktransferratebytes_rate1{name="shuffleService",} 574745.0749848988 ``` Closes apache#22485 from mareksimunek/SPARK-18364. Lead-authored-by: marek.simunek <[email protected]> Co-authored-by: Andrew Ash <[email protected]> Signed-off-by: Thomas Graves <[email protected]> (cherry picked from commit a802c69)
- Loading branch information
1 parent
b71060f
commit 9ac4818
Showing
3 changed files
with
221 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
137 changes: 137 additions & 0 deletions
137
...n/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.network.yarn; | ||
|
||
import java.util.Map; | ||
|
||
import com.codahale.metrics.*; | ||
import org.apache.hadoop.metrics2.MetricsCollector; | ||
import org.apache.hadoop.metrics2.MetricsInfo; | ||
import org.apache.hadoop.metrics2.MetricsRecordBuilder; | ||
import org.apache.hadoop.metrics2.MetricsSource; | ||
|
||
/** | ||
* Forward {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics} | ||
* to hadoop metrics system. | ||
* NodeManager by default exposes JMX endpoint where can be collected. | ||
*/ | ||
class YarnShuffleServiceMetrics implements MetricsSource { | ||
|
||
private final MetricSet metricSet; | ||
|
||
YarnShuffleServiceMetrics(MetricSet metricSet) { | ||
this.metricSet = metricSet; | ||
} | ||
|
||
/** | ||
* Get metrics from the source | ||
* | ||
* @param collector to contain the resulting metrics snapshot | ||
* @param all if true, return all metrics even if unchanged. | ||
*/ | ||
@Override | ||
public void getMetrics(MetricsCollector collector, boolean all) { | ||
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService"); | ||
|
||
for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) { | ||
collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); | ||
} | ||
} | ||
|
||
/** | ||
* The metric types used in | ||
* {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}. | ||
* Visible for testing. | ||
*/ | ||
public static void collectMetric( | ||
MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { | ||
|
||
if (metric instanceof Timer) { | ||
Timer t = (Timer) metric; | ||
metricsRecordBuilder | ||
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), | ||
t.getCount()) | ||
.addGauge( | ||
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), | ||
t.getFifteenMinuteRate()) | ||
.addGauge( | ||
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), | ||
t.getFiveMinuteRate()) | ||
.addGauge( | ||
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), | ||
t.getOneMinuteRate()) | ||
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), | ||
t.getMeanRate()); | ||
} else if (metric instanceof Meter) { | ||
Meter m = (Meter) metric; | ||
metricsRecordBuilder | ||
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), | ||
m.getCount()) | ||
.addGauge( | ||
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), | ||
m.getFifteenMinuteRate()) | ||
.addGauge( | ||
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), | ||
m.getFiveMinuteRate()) | ||
.addGauge( | ||
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), | ||
m.getOneMinuteRate()) | ||
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), | ||
m.getMeanRate()); | ||
} else if (metric instanceof Gauge) { | ||
final Object gaugeValue = ((Gauge) metric).getValue(); | ||
if (gaugeValue instanceof Integer) { | ||
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Integer) gaugeValue); | ||
} else if (gaugeValue instanceof Long) { | ||
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Long) gaugeValue); | ||
} else if (gaugeValue instanceof Float) { | ||
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Float) gaugeValue); | ||
} else if (gaugeValue instanceof Double) { | ||
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Double) gaugeValue); | ||
} else { | ||
throw new IllegalStateException( | ||
"Not supported class type of metric[" + name + "] for value " + gaugeValue); | ||
} | ||
} | ||
} | ||
|
||
private static MetricsInfo getShuffleServiceMetricsInfo(String name) { | ||
return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name); | ||
} | ||
|
||
private static class ShuffleServiceMetricsInfo implements MetricsInfo { | ||
|
||
private final String name; | ||
private final String description; | ||
|
||
ShuffleServiceMetricsInfo(String name, String description) { | ||
this.name = name; | ||
this.description = description; | ||
} | ||
|
||
@Override | ||
public String name() { | ||
return name; | ||
} | ||
|
||
@Override | ||
public String description() { | ||
return description; | ||
} | ||
} | ||
} |
73 changes: 73 additions & 0 deletions
73
...rs/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.spark.network.yarn | ||
|
||
import scala.collection.JavaConverters._ | ||
|
||
import org.apache.hadoop.metrics2.MetricsRecordBuilder | ||
import org.mockito.Matchers._ | ||
import org.mockito.Mockito.{mock, times, verify, when} | ||
import org.scalatest.Matchers | ||
|
||
import org.apache.spark.SparkFunSuite | ||
import org.apache.spark.network.server.OneForOneStreamManager | ||
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver} | ||
|
||
class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { | ||
|
||
val streamManager = mock(classOf[OneForOneStreamManager]) | ||
val blockResolver = mock(classOf[ExternalShuffleBlockResolver]) | ||
when(blockResolver.getRegisteredExecutorsSize).thenReturn(42) | ||
|
||
val metrics = new ExternalShuffleBlockHandler(streamManager, blockResolver).getAllMetrics | ||
|
||
test("metrics named as expected") { | ||
val allMetrics = Set( | ||
"openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", | ||
"blockTransferRateBytes", "registeredExecutorsSize") | ||
|
||
metrics.getMetrics.keySet().asScala should be (allMetrics) | ||
} | ||
|
||
// these three metrics have the same effect on the collector | ||
for (testname <- Seq("openBlockRequestLatencyMillis", | ||
"registerExecutorRequestLatencyMillis", | ||
"blockTransferRateBytes")) { | ||
test(s"$testname - collector receives correct types") { | ||
val builder = mock(classOf[MetricsRecordBuilder]) | ||
when(builder.addCounter(any(), anyLong())).thenReturn(builder) | ||
when(builder.addGauge(any(), anyDouble())).thenReturn(builder) | ||
|
||
YarnShuffleServiceMetrics.collectMetric(builder, testname, | ||
metrics.getMetrics.get(testname)) | ||
|
||
verify(builder).addCounter(anyObject(), anyLong()) | ||
verify(builder, times(4)).addGauge(anyObject(), anyDouble()) | ||
} | ||
} | ||
|
||
// this metric writes only one gauge to the collector | ||
test("registeredExecutorsSize - collector receives correct types") { | ||
val builder = mock(classOf[MetricsRecordBuilder]) | ||
|
||
YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize", | ||
metrics.getMetrics.get("registeredExecutorsSize")) | ||
|
||
// only one | ||
verify(builder).addGauge(anyObject(), anyInt()) | ||
} | ||
} |