diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java
index b52a34b6ad4..0914ca64671 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java
@@ -55,7 +55,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.jmx.JMXJsonServlet;
 import org.apache.hadoop.log.LogLevel;
-import org.apache.hadoop.metrics.MetricsServlet;
+import org.apache.hadoop.metrics2.lib.MetricsServlet2;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
@@ -98,6 +98,7 @@
  */
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "HBase"})
 @InterfaceStability.Evolving
+@SuppressWarnings("deprecation")
 public class HttpServer implements FilterContainer {
   public static final Log LOG = LogFactory.getLog(HttpServer.class);
@@ -497,11 +498,16 @@ private void setContextAttributes(Context context, Configuration conf) {
   /**
    * Add default servlets.
    */
+  @SuppressWarnings("deprecation")
   protected void addDefaultServlets() {
     // set up default servlets
     addServlet("stacks", "/stacks", StackServlet.class);
     addServlet("logLevel", "/logLevel", LogLevel.Servlet.class);
-    addServlet("metrics", "/metrics", MetricsServlet.class);
+    // legacy metrics servlet to show old "o.a.h.metrics" metrics data:
+    addServlet("metrics", "/metrics",
+        org.apache.hadoop.metrics.MetricsServlet.class);
+    // new metrics servlet to show new "o.a.h.metrics2" metrics data:
+    addServlet("metrics2", "/metrics2", MetricsServlet2.class);
     addServlet("jmx", "/jmx", JMXJsonServlet.class);
     addServlet("conf", "/conf", ConfServlet.class);
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java
index edfdc10c7c8..2335e07397f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.metrics.spi.OutputRecord;
 import org.apache.hadoop.metrics.spi.AbstractMetricsContext.MetricMap;
 import org.apache.hadoop.metrics.spi.AbstractMetricsContext.TagMap;
+import org.apache.hadoop.metrics2.lib.MetricsServlet2;
 import org.mortbay.util.ajax.JSON;
 import org.mortbay.util.ajax.JSON.Output;
 
@@ -43,9 +44,12 @@
  * A servlet to print out metrics data. By default, the servlet returns a
  * textual representation (no promises are made for parseability), and
  * users can use "?format=json" for parseable output.
+ *
+ * @deprecated Consider using {@link MetricsServlet2} instead.
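+ * (The replacement servlet is registered under the "/metrics2" path
+ * by HttpServer#addDefaultServlets().)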
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
+@Deprecated
 public class MetricsServlet extends HttpServlet {
 
   /**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/NoEmitMetricsContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/NoEmitMetricsContext.java
index 7d992ab314d..320e53fb47d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/NoEmitMetricsContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/NoEmitMetricsContext.java
@@ -17,20 +17,22 @@
  */
 package org.apache.hadoop.metrics.spi;
 
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics.ContextFactory;
-import org.apache.hadoop.metrics.MetricsServlet;
 
 /**
  * A MetricsContext that does not emit data, but, unlike NullContextWithUpdate,
  * does save it for retrieval with getAllRecords().
  *
- * This is useful if you want to support {@link MetricsServlet}, but
+ * This is useful if you want to support
+ * {@link org.apache.hadoop.metrics.MetricsServlet}, but
  * not emit metrics in any other way.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
+@SuppressWarnings("deprecation")
 public class NoEmitMetricsContext extends AbstractMetricsContext {
 
     private static final String PERIOD_PROPERTY = "period";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/OutputRecord.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/OutputRecord.java
index d94c8ab46e1..e22a682ad1c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/OutputRecord.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/OutputRecord.java
@@ -34,8 +34,8 @@
 @InterfaceStability.Evolving
 public class OutputRecord {
 
-  private TagMap tagMap;
-  private MetricMap metricMap;
+  private final TagMap tagMap;
+  private final MetricMap metricMap;
 
   /** Creates a new instance of OutputRecord */
   OutputRecord(TagMap tagMap, MetricMap metricMap) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsServlet2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsServlet2.java
new file mode 100644
index 00000000000..3d2e96505fe
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsServlet2.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.lib;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+import org.mortbay.util.ajax.JSON;
+import org.mortbay.util.ajax.JSON.Output;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A servlet to print out metrics data. By default, the servlet returns a
+ * textual representation (no promises are made for parseability), and
+ * users can use "?format=json" for parseable output.
+ *
+ * This implementation is a re-implemented version of
+ * org.apache.hadoop.metrics.MetricsServlet that uses
+ * the new metrics API (org.apache.hadoop.metrics2).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MetricsServlet2 extends HttpServlet {
+
+  private static final MetricsSystem metricsSystem
+    = DefaultMetricsSystem.initialize("MetricsServlet2");
+
+  private final ServletSink servletSink;
+
+  public MetricsServlet2() {
+    servletSink = createServletSink();
+    final String sinkName = servletSink.getSinkName();
+    metricsSystem.register(sinkName, sinkName, servletSink);
+  }
+
+  /*
+   * This method is visible for testing, but may also be overridden
+   * when different sink behavior is needed.
+   */
+  @VisibleForTesting
+  protected ServletSink createServletSink() {
+    return new ServletSink(true);
+  }
+
+  protected static class ServletSink implements MetricsSink, Serializable {
+
+    protected final boolean filterOutMetricsSystemTags;
+
+    private static int numInstances = 0;
+
+    private final String sinkName;
+
+    // map to accumulate the metrics data:
+    private Map<String, Map<String, List<TagsMetricsPair>>> metricsMap;
+
+    public ServletSink(boolean filterOutMsTags) {
+      numInstances++;
+      filterOutMetricsSystemTags = filterOutMsTags;
+      // NB: register sinks with unique names to avoid collisions,
+      // because several instances of this servlet *may* in principle
+      // be created:
+      sinkName = "MetricsServlet2-Sink-" + numInstances;
+      clear();
+    }
+
+    protected final String getSinkName() {
+      return sinkName;
+    }
+
+    @Override
+    public void init(SubsetConfiguration conf) {
+      // noop
+    }
+
+    /*
+     * Accumulates the metric data into a map:
+     * contextName -> recordName -> [ (tag->tagValue), (metric->metricValue) ].
+     * The values are either String or Number. The final value is implemented
+     * as a list of TagsMetricsPair.
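+     * For example (illustrative values only, mirroring this patch's own
+     * tests), one accumulated snapshot may serialize as:
+     *   {"test1":{"testRecord":[[{"testTag1":"testTagValue1"},
+     *                            {"testMetric1":1}]]}}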
+     */
+    @Override
+    public synchronized void putMetrics(final MetricsRecord record) {
+      final String recordContext = record.context();
+      final String recordName = record.name();
+
+      Map<String, List<TagsMetricsPair>> records
+        = metricsMap.get(recordContext);
+      if (records == null) {
+        records = new TreeMap<String, List<TagsMetricsPair>>();
+        metricsMap.put(recordContext, records);
+      }
+
+      List<TagsMetricsPair> tagsMetricsPairList = records.get(recordName);
+      if (tagsMetricsPairList == null) {
+        final TreeMap<String, String> tagMap = new TreeMap<String, String>();
+        final TreeMap<String, Number> metricMap
+          = new TreeMap<String, Number>();
+        // NB: an ordinary List would grow without bound, so we use a
+        // singleton list here. However, the List interface is still needed
+        // to provide the expected JSON serialization:
+        tagsMetricsPairList = Collections.singletonList(
+            new TagsMetricsPair(tagMap, metricMap));
+        records.put(recordName, tagsMetricsPairList);
+      }
+
+      for (final MetricsTag metricsTag: record.tags()) {
+        // NB: the pre-defined tags (like "Context" or "Hostname") may be
+        // ignored to provide backwards compatibility with the
+        // legacy servlet:
+        if (!filterOutMetricsSystemTags
+            || metricsTag.info().getClass() != MsInfo.class) {
+          String tagValue = metricsTag.value();
+          if (tagValue == null) {
+            tagValue = "";
+          }
+          tagsMetricsPairList.get(0).tagMap.put(metricsTag.name(), tagValue);
+        }
+      }
+
+      for (AbstractMetric metric: record.metrics()) {
+        tagsMetricsPairList.get(0).metricMap.put(metric.name(), metric.value());
+      }
+    }
+
+    protected synchronized final Map<String, Map<String, List<TagsMetricsPair>>>
+        getMetricsMap() {
+      final Map<String, Map<String, List<TagsMetricsPair>>> result = metricsMap;
+      clear();
+      return result;
+    }
+
+    @Override
+    public void flush() {
+      // noop
+    }
+
+    /*
+     * Clears the data stored in the sink.
+     */
+    protected synchronized void clear() {
+      // re-create the metrics accumulator:
+      metricsMap = new TreeMap<String, Map<String, List<TagsMetricsPair>>>();
+    }
+  }
+
+  /**
+   * A helper class to hold a TagMap and MetricMap.
+   */
+  static class TagsMetricsPair implements JSON.Convertible {
+    final TreeMap<String, String> tagMap;
+    final TreeMap<String, Number> metricMap;
+
+    public TagsMetricsPair(TreeMap<String, String> tagMap,
+        TreeMap<String, Number> metricMap) {
+      this.tagMap = tagMap;
+      this.metricMap = metricMap;
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void fromJSON(Map map) {
+      throw new UnsupportedOperationException();
+    }
+
+    /** Converts to JSON by providing an array. */
+    @Override
+    public void toJSON(Output out) {
+      out.add(new Object[] { tagMap, metricMap });
+    }
+  }
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+
+    // Do the authorization
+    if (!HttpServer.isInstrumentationAccessAllowed(getServletContext(),
+        request, response)) {
+      return;
+    }
+
+    final Map<String, Map<String, List<TagsMetricsPair>>> metricsMap =
+        makeMap();
+
+    final String format = request.getParameter("format");
+    final PrintWriter out = response.getWriter();
+    if ("json".equals(format)) {
+      response.setContentType("application/json; charset=utf-8");
+      try {
+        printJson(out, metricsMap);
+      } finally {
+        out.close();
+      }
+    } else {
+      try {
+        printMap(out, metricsMap);
+      } finally {
+        out.close();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  Map<String, Map<String, List<TagsMetricsPair>>> makeMap() {
+    // push the accumulated metrics to the sinks:
+    metricsSystem.publishMetricsNow();
+    // take the collected metrics data:
+    final Map<String, Map<String, List<TagsMetricsPair>>> metricsMap
+      = servletSink.getMetricsMap();
+    return metricsMap;
+  }
+
+  @VisibleForTesting
+  void printJson(PrintWriter out,
+      Map<String, Map<String, List<TagsMetricsPair>>> metricsMap) {
+    // Uses Jetty's built-in JSON support to convert the map into JSON.
+    out.print(new JSON().toJSON(metricsMap));
+  }
+
+  /**
+   * Prints metrics data in a multi-line text form.
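+   * For example (illustrative, matching the layout asserted in
+   * TestMetricsServlet2):
+   * <pre>
+   * test1
+   *   testRecord
+   *     {testTag1=testTagValue1,testTag2=testTagValue2}:
+   *       testMetric1=1
+   *       testMetric2=33
+   * </pre>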
+   */
+  @VisibleForTesting
+  void printMap(PrintWriter out,
+      Map<String, Map<String, List<TagsMetricsPair>>> map) {
+    for (Map.Entry<String, Map<String, List<TagsMetricsPair>>> context :
+        map.entrySet()) {
+      out.println(context.getKey());
+      for (Map.Entry<String, List<TagsMetricsPair>> record :
+          context.getValue().entrySet()) {
+        indent(out, 1);
+        out.println(record.getKey());
+        for (TagsMetricsPair pair : record.getValue()) {
+          indent(out, 2);
+          // Prints tag values in the form "{key=value,key=value}:"
+          out.print("{");
+          boolean first = true;
+          for (Map.Entry<String, String> tagValue : pair.tagMap.entrySet()) {
+            if (first) {
+              first = false;
+            } else {
+              out.print(",");
+            }
+            out.print(tagValue.getKey());
+            out.print("=");
+            out.print(tagValue.getValue().toString());
+          }
+          out.println("}:");
+
+          // Now print metric values, one per line
+          for (Map.Entry<String, Number> metricValue :
+              pair.metricMap.entrySet()) {
+            indent(out, 3);
+            out.print(metricValue.getKey());
+            out.print("=");
+            out.println(metricValue.getValue().toString());
+          }
+        }
+      }
+    }
+  }
+
+  private void indent(PrintWriter out, int indent) {
+    for (int i = 0; i < indent; ++i) {
+      out.append("  ");
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/TestMetricsServlet.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/TestMetricsServlet.java
index ec54f596869..a15d94f76a0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/TestMetricsServlet.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/TestMetricsServlet.java
@@ -27,11 +27,11 @@
 
 import junit.framework.TestCase;
 
-import org.apache.hadoop.metrics.MetricsServlet.TagsMetricsPair;
 import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
 import org.apache.hadoop.metrics.spi.OutputRecord;
 import org.mortbay.util.ajax.JSON;
 
+@SuppressWarnings("deprecation")
 public class TestMetricsServlet extends TestCase {
   MetricsContext nc1;
   MetricsContext nc2;
@@ -68,11 +68,12 @@ public void setUp() throws IOException {
     assertEquals(1, outputRecords.size());
     outputRecord = outputRecords.iterator().next();
   }
-
-
+
+  @SuppressWarnings("deprecation")
   public void testTagsMetricsPair() throws IOException {
-    TagsMetricsPair pair = new TagsMetricsPair(outputRecord.getTagsCopy(),
+    org.apache.hadoop.metrics.MetricsServlet.TagsMetricsPair pair
+      = new org.apache.hadoop.metrics.MetricsServlet
+          .TagsMetricsPair(outputRecord.getTagsCopy(),
       outputRecord.getMetricsCopy());
     String s = JSON.toString(pair);
     assertEquals(
@@ -80,23 +81,31 @@ public void testTagsMetricsPair() throws IOException {
         "{\"testMetric1\":1,\"testMetric2\":33}]", s);
   }
 
+  @SuppressWarnings("deprecation")
   public void testGetMap() throws IOException {
-    MetricsServlet servlet = new MetricsServlet();
-    Map<String, Map<String, List<TagsMetricsPair>>> m = servlet.makeMap(contexts);
+    org.apache.hadoop.metrics.MetricsServlet servlet
+      = new org.apache.hadoop.metrics.MetricsServlet();
+    Map<String, Map<String, List<MetricsServlet.TagsMetricsPair>>> m
+      = servlet.makeMap(contexts);
     assertEquals("Map missing contexts", 2, m.size());
     assertTrue(m.containsKey("test1"));
 
-    Map<String, List<TagsMetricsPair>> m2 = m.get("test1");
+    Map<String, List<MetricsServlet.TagsMetricsPair>> m2
+      = m.get("test1");
 
     assertEquals("Missing records", 1, m2.size());
     assertTrue(m2.containsKey("testRecord"));
     assertEquals("Wrong number of tags-values pairs.", 1, m2.get("testRecord").size());
   }
 
+  @SuppressWarnings("deprecation")
  public void testPrintMap() throws IOException {
     StringWriter sw = new StringWriter();
     PrintWriter out = new PrintWriter(sw);
-    MetricsServlet servlet = new MetricsServlet();
+    org.apache.hadoop.metrics.MetricsServlet servlet
+      = new org.apache.hadoop.metrics.MetricsServlet();
     servlet.printMap(out,
         servlet.makeMap(contexts));
 
     String EXPECTED = "" +
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMetricsServlet2.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMetricsServlet2.java
new file mode 100644
index 00000000000..896429f501a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMetricsServlet2.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.lib;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.annotation.Metric.Type;
+import org.apache.hadoop.metrics2.lib.MetricsServlet2.TagsMetricsPair;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestMetricsServlet2 {
+
+  // The 2 sample metric classes:
+  @Metrics(name="testRecord", context="test1")
+  static class MyMetrics1 {
+
+    @Metric(value={"testTag1", ""}, type=Type.TAG)
+    String testTag1() { return "testTagValue1"; }
+
+    @Metric(value={"testTag2", ""}, type=Type.TAG)
+    String gettestTag2() { return "testTagValue2"; }
+
+    @Metric(value={"testMetric1", "An integer gauge"}, always=true)
+    MutableGaugeInt testMetric1;
+
+    @Metric(value={"testMetric2", "An integer gauge"}, always=true)
+    MutableGaugeInt testMetric2;
+
+    public MyMetrics1 registerWith(MetricsSystem ms) {
+      return ms.register("m1", null, this);
+    }
+  }
+
+  @Metrics(name="testRecord", context="test2")
+  static class MyMetrics2 {
+    // NB: the new metrics system does not allow @Metrics classes
+    // without any tag or metric inside, so let's put
+    // a tag there:
+    @Metric(value={"testTag22", ""}, type=Type.TAG)
+    String testTag1() { return "testTagValue22"; }
+
+    public MyMetrics2 registerWith(MetricsSystem ms) {
+      return ms.register("m2", null, this);
+    }
+  }
+
+  private static class MetricsServlet2NoMetricsSystemContext
+      extends MetricsServlet2 {
+    @Override
+    protected ServletSink createServletSink() {
+      return new ServletSink(true) {
+        @Override
+        public void putMetrics(MetricsRecord record) {
+          // NB: ignore the default "metricssystem" context metrics
+          // for the test:
+          if (!"metricssystem".equals(record.context())) {
+            super.putMetrics(record);
+          }
+        }
+      };
+    }
+  }
+
+  private static MyMetrics1 myMetrics1;
+  private static MyMetrics2 myMetrics2;
+
+  private final MetricsServlet2 metricsServlet2
+    = new MetricsServlet2NoMetricsSystemContext();
+
+  /**
+   * Registers, for testing, the two sample metrics sources with a test
+   * metrics system, and sets initial metric values on the first of them.
+   */
+  @BeforeClass
+  public static void beforeClass() {
+    final MetricsSystem ms = DefaultMetricsSystem.initialize("servlettest");
+
+    myMetrics1 = new MyMetrics1().registerWith(ms);
+    myMetrics1.testMetric1.set(1);
+    myMetrics1.testMetric2.set(33);
+
+    myMetrics2 = new MyMetrics2().registerWith(ms);
+  }
+
+  private void changeMetricsImpl() {
+    myMetrics1.testMetric1.incr();
+    myMetrics1.testMetric1.decr();
+
+    myMetrics1.testMetric2.incr();
+    myMetrics1.testMetric2.decr();
+  }
+
+  @Before
+  public void before() {
+    // NB: need to change the metrics, see HADOOP-9269:
+    changeMetricsImpl();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    DefaultMetricsSystem.shutdown();
+  }
+
+  @Test
+  public void testGetMap() {
+    Map<String, Map<String, List<TagsMetricsPair>>> m
+      = metricsServlet2.makeMap();
+    assertEquals("Map missing contexts", 2, m.size());
+    assertTrue(m.containsKey("test1"));
+
+    Map<String, List<TagsMetricsPair>> m2 = m.get("test1");
+
+    assertEquals("Missing records", 1, m2.size());
+    assertTrue(m2.containsKey("testRecord"));
+    assertEquals("Wrong number of tags-values pairs.", 1,
+        m2.get("testRecord").size());
+
+    TagsMetricsPair pair = m2.get("testRecord").get(0);
+    assertEquals(2, pair.metricMap.size());
+  }
+
+  @Test
+  public void testPrintMap() {
+    testPrintMapImpl(metricsServlet2);
+  }
+
+  private void testPrintMapImpl(final MetricsServlet2 servlet) {
+    StringWriter sw = new StringWriter();
+    PrintWriter out = new PrintWriter(sw);
+    servlet.printMap(out, servlet.makeMap());
+    out.close();
+    final String actual = sw.toString();
+    String EXPECTED = "" +
+      "test1\n" +
+      "  testRecord\n" +
+      "    {testTag1=testTagValue1,testTag2=testTagValue2}:\n" +
+      "      testMetric1=1\n" +
+      "      testMetric2=33\n" +
+      "test2\n" +
+      "  testRecord\n" +
+      "    {testTag22=testTagValue22}:\n";
+    assertEquals(EXPECTED, actual);
+  }
+
+  @Test
+  public void testPrintJson() {
+    Map<String, Map<String, List<TagsMetricsPair>>> m
+      = metricsServlet2.makeMap();
+    StringWriter sw = new StringWriter();
+    PrintWriter out = new PrintWriter(sw);
+    metricsServlet2.printJson(out, m);
+    out.close();
+    final String actualJson = sw.toString();
+    assertEquals(
+        "{\"test1\":{\"testRecord\":[[{\"testTag1\":\"testTagValue1\"," +
+        "\"testTag2\":\"testTagValue2\"},{\"testMetric1\":1," +
+        "\"testMetric2\":33}]]}," +
+        "\"test2\":{\"testRecord\":" +
+        "[[{\"testTag22\":\"testTagValue22\"},{}]]}}", actualJson);
+  }
+
+  /*
+   * See if we can create another instance of the servlet;
+   * both instances should work correctly.
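+   * (Each servlet instance registers its own uniquely named ServletSink
+   * with the shared metrics system, so a publishMetricsNow() triggered
+   * by either servlet feeds both sinks.)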
+   */
+  @Test
+  public void testSeveralMetricsServlet2Instances() {
+    MetricsServlet2 metricsServlet22
+      = new MetricsServlet2NoMetricsSystemContext();
+    // NB: need to change the metrics (see HADOOP-9269)
+    before();
+    testPrintMapImpl(metricsServlet2);
+    testPrintMapImpl(metricsServlet22);
+
+    // change again:
+    before();
+    // (this time the sinks are queried in reversed order):
+    testPrintMapImpl(metricsServlet22);
+    testPrintMapImpl(metricsServlet2);
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 2bb7dc83655..714de936502 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -94,6 +94,7 @@ public class LocalJobRunner implements ClientProtocol {
 
   private static final String jobDir = "localRunner/";
 
+  @Override
   public long getProtocolVersion(String protocol, long clientVersion) {
     return ClientProtocol.versionID;
   }
@@ -135,6 +136,7 @@ private class Job extends Thread implements TaskUmbilicalProtocol {
 
     private LocalDistributedCacheManager localDistributedCacheManager;
 
+    @Override
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
     }
@@ -217,6 +219,7 @@ public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
         this.localConf = new JobConf(job);
       }
 
+      @Override
      public void run() {
         try {
           TaskAttemptID mapId = new TaskAttemptID(new TaskID(
@@ -575,8 +578,10 @@ public void run() {
 
     // TaskUmbilicalProtocol methods
 
+    @Override
     public JvmTask getTask(JvmContext context) { return null; }
 
+    @Override
     public synchronized boolean statusUpdate(TaskAttemptID taskId,
         TaskStatus taskStatus) throws IOException, InterruptedException {
       // Serialize as we would if distributed in order to make deep copy
@@ -648,30 +653,36 @@ public synchronized Counters getCurrentCounters() {
      * Task is reporting that it is in commit_pending
      * and it is waiting for the commit Response
      */
+    @Override
     public void commitPending(TaskAttemptID taskid,
                               TaskStatus taskStatus)
     throws IOException, InterruptedException {
       statusUpdate(taskid, taskStatus);
     }
 
+    @Override
     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
       // Ignore for now
     }
 
+    @Override
     public void reportNextRecordRange(TaskAttemptID taskid,
         SortedRanges.Range range) throws IOException {
       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
     }
 
+    @Override
     public boolean ping(TaskAttemptID taskid) throws IOException {
       return true;
     }
 
+    @Override
     public boolean canCommit(TaskAttemptID taskid) throws IOException {
       return true;
     }
 
+    @Override
     public void done(TaskAttemptID taskId) throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
@@ -681,20 +692,24 @@ public void done(TaskAttemptID taskId) throws IOException {
       }
     }
 
+    @Override
     public synchronized void fsError(TaskAttemptID taskId, String message)
     throws IOException {
       LOG.fatal("FSError: "+ message + "from task: " + taskId);
     }
 
+    @Override
     public void shuffleError(TaskAttemptID taskId, String message)
         throws IOException {
       LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
     }
 
+    @Override
     public synchronized void fatalError(TaskAttemptID taskId, String msg)
     throws IOException {
       LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
     }
 
+    @Override
     public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
         int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
       return new MapTaskCompletionEventsUpdate(
@@ -725,6 +740,7 @@ public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID() {
     return new org.apache.hadoop.mapreduce.JobID("local" + randid, ++jobid);
   }
 
+  @Override
   public org.apache.hadoop.mapreduce.JobStatus submitJob(
       org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
       Credentials credentials) throws IOException {
@@ -734,11 +750,13 @@ public org.apache.hadoop.mapreduce.JobStatus submitJob(
 
   }
 
+  @Override
   public void killJob(org.apache.hadoop.mapreduce.JobID id) {
     jobs.get(JobID.downgrade(id)).killed = true;
     jobs.get(JobID.downgrade(id)).interrupt();
   }
 
+  @Override
   public void setJobPriority(org.apache.hadoop.mapreduce.JobID id, String jp)
       throws IOException {
     throw new UnsupportedOperationException("Changing job priority " +
@@ -746,17 +764,20 @@ public void setJobPriority(org.apache.hadoop.mapreduce.JobID id,
   }
 
   /** Throws {@link UnsupportedOperationException} */
+  @Override
   public boolean killTask(org.apache.hadoop.mapreduce.TaskAttemptID taskId,
       boolean shouldFail) throws IOException {
     throw new UnsupportedOperationException("Killing tasks in " +
     "LocalJobRunner is not supported");
   }
 
+  @Override
   public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
       org.apache.hadoop.mapreduce.JobID id, TaskType type) {
     return new org.apache.hadoop.mapreduce.TaskReport[0];
   }
 
+  @Override
   public org.apache.hadoop.mapreduce.JobStatus getJobStatus(
       org.apache.hadoop.mapreduce.JobID id) {
     Job job = jobs.get(JobID.downgrade(id));
@@ -766,6 +787,7 @@ public org.apache.hadoop.mapreduce.JobStatus getJobStatus(
     return null;
   }
 
+  @Override
   public org.apache.hadoop.mapreduce.Counters getJobCounters(
       org.apache.hadoop.mapreduce.JobID id) {
     Job job = jobs.get(JobID.downgrade(id));
@@ -773,10 +795,12 @@ public org.apache.hadoop.mapreduce.Counters getJobCounters(
     return new org.apache.hadoop.mapreduce.Counters(job.getCurrentCounters());
   }
 
+  @Override
   public String getFilesystemName() throws IOException {
     return fs.getUri().toString();
   }
 
+  @Override
   public ClusterMetrics getClusterMetrics() {
     int numMapTasks = map_tasks.get();
     int numReduceTasks = reduce_tasks.get();
@@ -784,10 +808,12 @@
         numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
   }
 
+  @Override
   public JobTrackerStatus getJobTrackerStatus() {
     return JobTrackerStatus.RUNNING;
   }
 
+  @Override
   public long getTaskTrackerExpiryInterval()
       throws IOException, InterruptedException {
     return 0;
   }
@@ -796,6 +822,7 @@ public long getTaskTrackerExpiryInterval() throws IOException, InterruptedExcept
    * Get all active trackers in cluster.
    * @return array of TaskTrackerInfo
    */
+  @Override
   public TaskTrackerInfo[] getActiveTrackers()
       throws IOException, InterruptedException {
     return new TaskTrackerInfo[0];
@@ -805,17 +832,20 @@
    * Get all blacklisted trackers in cluster.
    * @return array of TaskTrackerInfo
    */
+  @Override
   public TaskTrackerInfo[] getBlacklistedTrackers()
       throws IOException, InterruptedException {
     return new TaskTrackerInfo[0];
   }
 
+  @Override
   public TaskCompletionEvent[] getTaskCompletionEvents(
       org.apache.hadoop.mapreduce.JobID jobid
       , int fromEventId, int maxEvents) throws IOException {
     return TaskCompletionEvent.EMPTY_ARRAY;
   }
 
+  @Override
   public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() {return null;}
 
@@ -823,6 +853,7 @@
    * Returns the diagnostic information for a particular task in the given job.
    * To be implemented
    */
+  @Override
   public String[] getTaskDiagnostics(
       org.apache.hadoop.mapreduce.TaskAttemptID taskid) throws IOException{
     return new String [0];
@@ -831,6 +862,7 @@ public String[] getTaskDiagnostics(
   /**
    * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir()
    */
+  @Override
   public String getSystemDir() {
     Path sysDir = new Path(
       conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));
@@ -840,6 +872,7 @@ public String getSystemDir() {
 
   /**
    * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
    */
+  @Override
   public AccessControlList getQueueAdmins(String queueName) throws IOException {
     return new AccessControlList(" ");// no queue admins for local job runner
   }
 
@@ -847,6 +880,7 @@ public AccessControlList getQueueAdmins(String queueName) throws IOException {
   /**
    * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
    */
+  @Override
   public String getStagingAreaDir() throws IOException {
     Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT,
         "/tmp/hadoop/mapred/staging"));
@@ -861,6 +895,7 @@ public String getStagingAreaDir() throws IOException {
     return fs.makeQualified(new Path(stagingRootDir,
       user+"/.staging")).toString();
   }
 
+  @Override
   public String getJobHistoryDir() {
     return null;
   }
@@ -988,5 +1023,4 @@ static String getLocalTaskDir(String user, String jobid, String taskid,
     return taskDir;
   }
 
-
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
index aec70edefc2..8a4799b6439 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
@@ -17,15 +17,37 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
 
-@SuppressWarnings("deprecation")
-class LocalJobRunnerMetrics implements Updater {
-  private final MetricsRecord metricsRecord;
+class LocalJobRunnerMetrics implements MetricsSource {
+
+  private static final String CONTEXT = "mapred";
+  private static final String RECORD_NAME = "jobtracker";
+
+  private static final MetricsInfo mapsLaunchedCountMI
+    = Interns.info("maps_launched", "Maps launched count.");
+  private static final MetricsInfo mapsCompletedCountMI
+    = Interns.info("maps_completed", "Maps completed count.");
+
+  private static final MetricsInfo reducesLaunchedCountMI
+    = Interns.info("reduces_launched", "Reduces launched count.");
+  private static final MetricsInfo reducesCompletedCountMI
+    = Interns.info("reduces_completed", "Reduces completed count.");
+
+  private static final MetricsInfo waitingMapsGaugeMI
+    = Interns.info("waiting_maps", "Waiting maps gauge.");
+  private static final MetricsInfo waitingReducesGaugeMI
+    = Interns.info("waiting_reduces", "Waiting reduces gauge.");
+
+  private static int instanceCount = 0;
+
+  private final String sessionId;
 
   private int numMapTasksLaunched = 0;
   private int numMapTasksCompleted = 0;
@@ -34,31 +56,35 @@ class LocalJobRunnerMetrics implements Updater {
   private int numWaitingMaps = 0;
   private int numWaitingReduces = 0;
 
+  @SuppressWarnings("deprecation")
   public LocalJobRunnerMetrics(JobConf conf) {
-    String sessionId = conf.getSessionId();
-    // Initiate JVM Metrics
-    JvmMetrics.init("JobTracker", sessionId);
-    // Create a record for map-reduce metrics
-    MetricsContext context = MetricsUtil.getContext("mapred");
-    // record name is jobtracker for compatibility
-    metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
-    metricsRecord.setTag("sessionId", sessionId);
-    context.registerUpdater(this);
+    instanceCount++;
+    sessionId = conf.getSessionId();
+    MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
+    // NB: the instance count is added to the class name to avoid a
+    // collision if the defaultMetricsSystem is not in mini-cluster mode
+    // and several instances of this class are created:
+    metricsSystem.register(getClass().getName() + "-" + instanceCount,
+        "Metrics source for LocalJobRunner", this);
   }
-
-  /**
-   * Since this object is a registered updater, this method will be called
-   * periodically, e.g. every 5 seconds.
-   */
-  public void doUpdates(MetricsContext unused) {
+
+  @Override
+  public void getMetrics(final MetricsCollector collector, boolean all) {
+    final MetricsRecordBuilder mrb = collector.addRecord(RECORD_NAME);
+    mrb.setContext(CONTEXT);
+    mrb.tag(MsInfo.SessionId, sessionId);
     synchronized (this) {
-      metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
-      metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
-      metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
-      metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
-      metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
-      metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
-
+      mrb.addGauge(mapsLaunchedCountMI, numMapTasksLaunched);
+      mrb.addGauge(mapsCompletedCountMI, numMapTasksCompleted);
+
+      mrb.addGauge(reducesLaunchedCountMI, numReduceTasksLaunched);
+      mrb.addGauge(reducesCompletedCountMI, numReduceTasksCompleted);
+
+      mrb.addGauge(waitingMapsGaugeMI, numWaitingMaps);
+      mrb.addGauge(waitingReducesGaugeMI, numWaitingReduces);
+
+      mrb.endRecord();
+
       numMapTasksLaunched = 0;
       numMapTasksCompleted = 0;
       numReduceTasksLaunched = 0;
@@ -66,7 +92,6 @@ public void doUpdates(MetricsContext unused) {
       numWaitingMaps = 0;
       numWaitingReduces = 0;
     }
-    metricsRecord.update();
   }
 
   public synchronized void launchMap(TaskAttemptID taskAttemptID) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
index c1391fa6feb..9a0cb1ccdcb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
@@ -66,6 +66,7 @@ public class ReduceTask extends Task {
     WritableFactories.setFactory
       (ReduceTask.class,
        new WritableFactory() {
+         @Override
          public Writable newInstance() { return new ReduceTask(); }
        });
   }
@@ -259,11 +260,13 @@ public SkippingReduceValuesIterator(RawKeyValueIterator in,
       mayBeSkip();
     }
 
+    @Override
     public void nextKey() throws IOException {
       super.nextKey();
       mayBeSkip();
     }
 
+    @Override
     public boolean more() {
       return super.more() && hasNext;
     }
@@ -300,7 +303,6 @@ private void mayBeSkip() throws IOException {
       reportNextRecordRange(umbilical, grpIndex);
     }
 
-    @SuppressWarnings("unchecked")
     private void writeSkippedRec(KEY key, VALUE value) throws IOException{
       if(skipWriter==null) {
         Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
index 92c69a60a5a..00d82922138 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
@@ -23,67 +23,113 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
 
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
-public class ShuffleClientMetrics implements Updater {
+public class ShuffleClientMetrics implements MetricsSource {
 
-  private MetricsRecord shuffleMetrics = null;
+  private static final String CONTEXT = "mapred";
+  private static final String RECORD_NAME = "shuffleInput";
+
+  private static final MetricsInfo tagUser = Interns.info("user", "");
+  private static final MetricsInfo tagJobName = Interns.info("jobName", "");
+  private static final MetricsInfo tagJobId = Interns.info("jobId", "");
+  private static final MetricsInfo tagTaskId = Interns.info("taskId", "");
+
+  private static final MetricsInfo shuffleInputBytesMI
+    = Interns.info("shuffle_input_bytes", "Shuffle input bytes.");
+  private static final MetricsInfo shuffleFailedFetchesMI
+    = Interns.info("shuffle_failed_fetches", "Shuffle failed fetches.");
+  private static final MetricsInfo shuffleSuccessFetchesMI
+    = Interns.info("shuffle_success_fetches", "Shuffle success fetches.");
+  private static final MetricsInfo shuffleFetchersBusyPercentMI
+    = Interns.info("shuffle_fetchers_busy_percent",
+        "Shuffle fetchers busy percent.");
+
+  private static int instanceCount = 0;
+
+  // metrics:
   private int numFailedFetches = 0;
   private int numSuccessFetches = 0;
   private long numBytes = 0;
   private int numThreadsBusy = 0;
+
   private final int numCopiers;
 
-  ShuffleClientMetrics(TaskAttemptID reduceId, JobConf jobConf) {
-    this.numCopiers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
-
-    MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-    this.shuffleMetrics =
-      MetricsUtil.createRecord(metricsContext, "shuffleInput");
-    this.shuffleMetrics.setTag("user", jobConf.getUser());
-    this.shuffleMetrics.setTag("jobName", jobConf.getJobName());
-    this.shuffleMetrics.setTag("jobId", reduceId.getJobID().toString());
-    this.shuffleMetrics.setTag("taskId", reduceId.toString());
-    this.shuffleMetrics.setTag("sessionId", jobConf.getSessionId());
-    metricsContext.registerUpdater(this);
+  private final TaskAttemptID reduceId;
+  private final JobConf jobConf;
+
+  ShuffleClientMetrics(TaskAttemptID reduceId0, JobConf jobConf0) {
+    instanceCount++;
+
+    reduceId = reduceId0;
+    jobConf = jobConf0;
+
+    numCopiers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
+
+    final MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
+    // NB: the instance count is added to the class name to avoid a
+    // collision if the defaultMetricsSystem is not in mini-cluster mode
+    // and several instances of this class are created:
+    metricsSystem.register(getClass().getName() + "-" + instanceCount,
+        "Metrics source for the shuffle client.", this);
   }
+
   public synchronized void inputBytes(long numBytes) {
     this.numBytes += numBytes;
   }
+
   public synchronized void failedFetch() {
     ++numFailedFetches;
   }
+
   public synchronized void successFetch() {
     ++numSuccessFetches;
   }
+
   public synchronized void threadBusy() {
     ++numThreadsBusy;
   }
+
   public synchronized void threadFree() {
     --numThreadsBusy;
   }
-  public void doUpdates(MetricsContext unused) {
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public void getMetrics(final MetricsCollector collector, boolean all) {
+    final MetricsRecordBuilder mrb = collector.addRecord(RECORD_NAME);
+    mrb.setContext(CONTEXT);
+
+    mrb.tag(tagUser, jobConf.getUser());
+    mrb.tag(tagJobName, jobConf.getJobName());
+    mrb.tag(tagJobId, reduceId.getJobID().toString());
+    mrb.tag(tagTaskId, reduceId.toString());
+    mrb.tag(MsInfo.SessionId, jobConf.getSessionId());
+
     synchronized (this) {
-      shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
-      shuffleMetrics.incrMetric("shuffle_failed_fetches",
-                                numFailedFetches);
-      shuffleMetrics.incrMetric("shuffle_success_fetches",
-                                numSuccessFetches);
+      mrb.addGauge(shuffleInputBytesMI, numBytes);
+      mrb.addGauge(shuffleFailedFetchesMI, numFailedFetches);
+      mrb.addGauge(shuffleSuccessFetchesMI, numSuccessFetches);
+
+      final float busyPercent;
       if (numCopiers != 0) {
-        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
-            100*((float)numThreadsBusy/numCopiers));
+        busyPercent = 100 * ((float) numThreadsBusy / numCopiers);
       } else {
-        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
+        busyPercent = 0f;
       }
+      mrb.addGauge(shuffleFetchersBusyPercentMI, busyPercent);
+
+      mrb.endRecord();
+
       numBytes = 0;
       numSuccessFetches = 0;
       numFailedFetches = 0;
     }
-    shuffleMetrics.update();
   }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
index 29640c8854b..c8afa848b03 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
@@ -40,13 +40,14 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import static org.junit.Assert.*;
+
 import org.junit.Test;
-import junit.framework.TestCase;
 
 /**
  * Stress tests for the LocalJobRunner
  */
-public class TestLocalRunner extends TestCase {
+public class TestLocalRunner {
 
   private static final Log LOG = LogFactory.getLog(TestLocalRunner.class);
 
@@ -68,6 +69,7 @@ private static class StressMapper
     // some code.
     public long exposedState;
 
+    @Override
     protected void setup(Context context) {
       // Get the thread num from the file number.
       FileSplit split = (FileSplit) context.getInputSplit();
@@ -80,6 +82,7 @@ protected void setup(Context context) {
     }
 
     /** Map method with different behavior based on the thread id */
+    @Override
     public void map(LongWritable key, Text val, Context c)
         throws IOException, InterruptedException {
 
@@ -92,6 +95,7 @@ public void map(LongWritable key, Text val, Context c)
       }
     }
 
+    @Override
     protected void cleanup(Context context) {
       // Output this here, to ensure that the incrementing done in map()
       // cannot be optimized away.
@@ -102,6 +106,7 @@ protected void cleanup(Context context) {
 
   private static class CountingReducer
       extends Reducer {
 
+    @Override
     public void reduce(LongWritable key, Iterable vals, Context context)
         throws IOException, InterruptedException {
       long out = 0;
@@ -115,6 +120,7 @@ public void reduce(LongWritable key, Iterable vals, Context context)
 
   private static class GCMapper
       extends Mapper {
 
+    @Override
     public void map(LongWritable key, Text val, Context c)
         throws IOException, InterruptedException {
 
@@ -233,7 +239,6 @@ private void verifyOutput(Path outputPath) throws IOException {
     assertEquals("Incorrect count generated!", TOTAL_RECORDS, count);
 
     r.close();
-
   }
 
   /**
@@ -309,6 +314,7 @@ public void testMultiMaps() throws Exception {
     final Thread toInterrupt = Thread.currentThread();
     Thread interrupter = new Thread() {
+      @Override
       public void run() {
         try {
           Thread.sleep(120*1000); // 2m
@@ -375,10 +381,12 @@ public void testInvalidMultiMapParallelism() throws Exception {
 
   /** An IF that creates no splits */
   private static class EmptyInputFormat extends InputFormat<Object, Object> {
+    @Override
     public List<InputSplit> getSplits(JobContext context) {
       return new ArrayList<InputSplit>();
     }
 
+    @Override
     public RecordReader<Object, Object> createRecordReader(InputSplit split,
         TaskAttemptContext context) {
       return new EmptyRecordReader();
@@ -386,24 +394,30 @@ public RecordReader createRecordReader(InputSplit split,
   }
 
   private static class EmptyRecordReader extends RecordReader<Object, Object> {
+    @Override
     public void initialize(InputSplit split, TaskAttemptContext context) {
     }
 
+    @Override
     public Object getCurrentKey() {
       return new Object();
     }
 
+    @Override
     public Object getCurrentValue() {
       return new Object();
     }
 
+    @Override
     public float getProgress() {
       return 0.0f;
     }
 
+    @Override
     public void close() {
     }
 
+    @Override
     public boolean nextKeyValue() {
       return false;
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 8a030952504..e2c24581eda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -209,6 +209,7 @@ private void updateRunningTime() {
     }
   }
 
+  @Override
   public void getMetrics(MetricsCollector collector, boolean all) {
     updateRunningTime();
     registry.snapshot(collector.addRecord(registry.info()), all);
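
For reference, a minimal client-side sketch of how the new endpoint can be exercised once the patch is applied. The host and port are illustrative assumptions (any daemon embedding HttpServer works), not part of the patch:

    // Fetches the JSON form of the metrics2 data from a running daemon.
    // Assumes a Hadoop HttpServer listening on localhost:50070 (illustrative).
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;

    public class MetricsServlet2Probe {
      public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:50070/metrics2?format=json");
        BufferedReader in = new BufferedReader(
            new InputStreamReader(url.openStream(), "UTF-8"));
        try {
          String line;
          while ((line = in.readLine()) != null) {
            // e.g. {"test1":{"testRecord":[[{...tags...},{...metrics...}]]}}
            System.out.println(line);
          }
        } finally {
          in.close();
        }
      }
    }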