From 8973d4792445a040ac203a1772010ed408e805e9 Mon Sep 17 00:00:00 2001 From: patsonluk Date: Fri, 11 Oct 2024 09:18:58 -0700 Subject: [PATCH] SAI-5163 : ConcurrentModificationException from metrics callers (#227) * Fixed ConcurrentModificationException triggered by concurrent write from AggregateMetricsApiCaller and read from CoresMetricsApiCaller on the missingCoreMetrics field Instead we will pass a req specific ResultContext, which will not be shared among threads * Added extra javadoc * Tweak unit test case testConcurrentCallers * ./gradlew tidy * Fixed weird indentation * Fixed javadoc --- .../servlet/PrometheusMetricsServlet.java | 99 ++++++++------ .../servlet/PrometheusMetricsServletTest.java | 121 +++++++++++++++--- 2 files changed, 167 insertions(+), 53 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/servlet/PrometheusMetricsServlet.java b/solr/core/src/java/org/apache/solr/servlet/PrometheusMetricsServlet.java index ee9628dd52f..406c78d1e4a 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PrometheusMetricsServlet.java +++ b/solr/core/src/java/org/apache/solr/servlet/PrometheusMetricsServlet.java @@ -72,16 +72,14 @@ public final class PrometheusMetricsServlet extends BaseSolrServlet { private final List callers = getCallers(); private List getCallers() { - AggregateMetricsApiCaller aggregateMetricsApiCaller = new AggregateMetricsApiCaller(); return List.of( new GarbageCollectorMetricsApiCaller(), new MemoryMetricsApiCaller(), new OsMetricsApiCaller(), new ThreadMetricsApiCaller(), new StatusCodeMetricsApiCaller(), - aggregateMetricsApiCaller, - new CoresMetricsApiCaller( - Collections.unmodifiableList(aggregateMetricsApiCaller.missingCoreMetrics))); + new AggregateMetricsApiCaller(), + new CoresMetricsApiCaller()); } private final Map cacheMetricTypes = @@ -96,9 +94,13 @@ private List getCallers() { public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException, UnavailableException { List metrics = new ArrayList<>(); + ResultContext resultContext = new ResultContext(metrics); + AtomicInteger qTime = new AtomicInteger(); + // callers should be invoked sequentially in the same thread there could be dependencies among + // them for (MetricsApiCaller caller : callers) { - caller.call(qTime, metrics, request); + caller.call(qTime, resultContext, request); } getCompressingDirectoryPoolMetrics(metrics); getCircuitBreakerMetrics(metrics); @@ -289,7 +291,8 @@ static class GarbageCollectorMetricsApiCaller extends MetricsByPrefixApiCaller { "memory.pools.G1-Survivor-Space.used-after-gc":20971520}}} */ @Override - protected void handle(List results, JsonNode metrics) throws IOException { + protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException { + List results = resultContext.resultMetrics; JsonNode parent = metrics.path("solr.jvm"); results.add( new PrometheusMetric( @@ -375,7 +378,8 @@ static class MemoryMetricsApiCaller extends MetricsByPrefixApiCaller { "memory.non-heap.used":93135560}}} */ @Override - protected void handle(List results, JsonNode metrics) throws IOException { + protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException { + List results = resultContext.resultMetrics; JsonNode parent = metrics.path("solr.jvm"); results.add( new PrometheusMetric( @@ -430,7 +434,8 @@ static class OsMetricsApiCaller extends MetricsByPrefixApiCaller { "os.version":"10.15.7"}}} */ @Override - protected void handle(List results, JsonNode metrics) throws IOException { + protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException { + List results = resultContext.resultMetrics; JsonNode parent = metrics.path("solr.jvm"); results.add( new PrometheusMetric( @@ -468,7 +473,8 @@ static class ThreadMetricsApiCaller extends MetricsByPrefixApiCaller { "threads.waiting.count":1756}}} */ @Override - protected void handle(List results, JsonNode metrics) throws IOException { + protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException { + List results = resultContext.resultMetrics; JsonNode parent = metrics.path("solr.jvm"); results.add( new PrometheusMetric( @@ -534,7 +540,8 @@ static class StatusCodeMetricsApiCaller extends MetricsByPrefixApiCaller { ... */ @Override - protected void handle(List results, JsonNode metrics) throws IOException { + protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException { + List results = resultContext.resultMetrics; JsonNode parent = metrics.path("solr.jetty"); results.add( new PrometheusMetric( @@ -826,7 +833,6 @@ static class AggregateMetricsApiCaller extends MetricsByPrefixApiCaller { }, ... */ - List missingCoreMetrics = new ArrayList<>(); AggregateMetricsApiCaller() { super("solr.node", buildPrefix(), buildProperty()); @@ -847,11 +853,12 @@ private static String buildProperty() { } @Override - protected void handle(List results, JsonNode metricsNode) throws IOException { - missingCoreMetrics.clear(); + protected void handle(ResultContext resultContext, JsonNode metricsNode) throws IOException { + List results = resultContext.resultMetrics; JsonNode nodeMetricNode = metricsNode.get("solr.node"); if (nodeMetricNode != null) { + resultContext.missingCoreMetrics = new ArrayList<>(); // explicitly set missing core metrics for (CoreMetric metric : CoreMetric.values()) { Number value = metric.property != null @@ -860,13 +867,12 @@ protected void handle(List results, JsonNode metricsNode) thro if (!INVALID_NUMBER.equals(value)) { results.add(metric.createPrometheusMetric(value, "[node aggregated]")); } else { - missingCoreMetrics.add(metric); + resultContext.missingCoreMetrics.add(metric); } } } else { log.warn( "Cannot find the solr.node metrics, going to fall back to getting metrics from all cores"); - missingCoreMetrics.addAll(Arrays.asList(CoreMetric.values())); } } } @@ -881,20 +887,15 @@ protected void handle(List results, JsonNode metricsNode) thro * concurrently with it. */ static class CoresMetricsApiCaller extends MetricsApiCaller { - private final List missingCoreMetricsView; - - CoresMetricsApiCaller(List missingCoreMetricsView) { - this.missingCoreMetricsView = missingCoreMetricsView; - } - @Override - protected String buildQueryString() { + protected String buildQueryString(ResultContext resultContext) { List prefixes = new ArrayList<>(); List properties = new ArrayList<>(); - for (CoreMetric missingMetric : missingCoreMetricsView) { - prefixes.add(missingMetric.key); - if (missingMetric.property != null) { - properties.add(missingMetric.property); + + for (CoreMetric targetMetric : getTargetCoreMetrics(resultContext)) { + prefixes.add(targetMetric.key); + if (targetMetric.property != null) { + properties.add(targetMetric.property); } } @@ -912,6 +913,15 @@ protected String buildQueryString() { propertyClause); } + private List getTargetCoreMetrics(ResultContext resultContext) { + List targetCoreMetrics = resultContext.missingCoreMetrics; + // if not explicitly defined by other callers, then just get everything + if (targetCoreMetrics == null) { + targetCoreMetrics = Arrays.asList(CoreMetric.values()); + } + return targetCoreMetrics; + } + /* "metrics":{ "solr.core.loadtest.shard1_1.replica_n8":{ @@ -939,9 +949,10 @@ protected String buildQueryString() { */ @Override - protected void handle(List results, JsonNode metrics) throws IOException { + protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException { + List results = resultContext.resultMetrics; Map accumulative = new LinkedHashMap<>(); - for (CoreMetric missingCoreMetric : missingCoreMetricsView) { + for (CoreMetric missingCoreMetric : getTargetCoreMetrics(resultContext)) { for (JsonNode coreMetricNode : metrics) { Number val = missingCoreMetric.property != null @@ -1047,12 +1058,12 @@ static SolrDispatchFilter getSolrDispatchFilter(HttpServletRequest request) thro abstract static class MetricsApiCaller { // use HttpSolrCall to simulate a call to the metrics api. - void call( - AtomicInteger qTime, List results, HttpServletRequest originalRequest) + void call(AtomicInteger qTime, ResultContext resultContext, HttpServletRequest originalRequest) throws IOException, UnavailableException { SolrDispatchFilter filter = getSolrDispatchFilter(originalRequest); CoreContainer cores = filter.getCores(); - HttpServletRequest request = new MetricsApiRequest(originalRequest, buildQueryString()); + HttpServletRequest request = + new MetricsApiRequest(originalRequest, buildQueryString(resultContext)); MetricsApiResponse response = new MetricsApiResponse(); SolrDispatchFilter.Action action = new HttpSolrCall(filter, cores, request, response, false).call(); @@ -1064,10 +1075,10 @@ void call( action, SolrDispatchFilter.Action.RETURN)); } - handleResponse(qTime, results, response.getJsonNode()); + handleResponse(qTime, resultContext, response.getJsonNode()); } - void handleResponse(AtomicInteger qTime, List results, JsonNode response) + void handleResponse(AtomicInteger qTime, ResultContext resultContext, JsonNode response) throws IOException { JsonNode header = response.path("responseHeader"); int status = getNumber(header, "status").intValue(); @@ -1076,13 +1087,12 @@ void handleResponse(AtomicInteger qTime, List results, JsonNod String.format(Locale.ROOT, "metrics api response status is %d; expected 0.", status)); } qTime.addAndGet(getNumber(header, "QTime").intValue()); - handle(results, response.path("metrics")); + handle(resultContext, response.path("metrics")); } - protected abstract void handle(List results, JsonNode metrics) - throws IOException; + abstract void handle(ResultContext resultContext, JsonNode metrics) throws IOException; - protected abstract String buildQueryString(); + abstract String buildQueryString(ResultContext resultContext); } private abstract static class MetricsByPrefixApiCaller extends MetricsApiCaller { @@ -1099,7 +1109,7 @@ private abstract static class MetricsByPrefixApiCaller extends MetricsApiCaller } @Override - protected String buildQueryString() { + protected String buildQueryString(ResultContext resultContext) { String propertyClause = String.join( "&property=", @@ -1359,4 +1369,17 @@ public Locale getLocale() { return Locale.ROOT; } } + + /** + * Context that carries the metrics results as well as information that needs to be propagated in + * the MetricsApiCaller call chain + */ + static class ResultContext { + final List resultMetrics; + List missingCoreMetrics; + + ResultContext(List resultMetrics) { + this.resultMetrics = resultMetrics; + } + } } diff --git a/solr/core/src/test/org/apache/solr/servlet/PrometheusMetricsServletTest.java b/solr/core/src/test/org/apache/solr/servlet/PrometheusMetricsServletTest.java index 5f44cd655d9..b8c761e0cae 100644 --- a/solr/core/src/test/org/apache/solr/servlet/PrometheusMetricsServletTest.java +++ b/solr/core/src/test/org/apache/solr/servlet/PrometheusMetricsServletTest.java @@ -20,9 +20,12 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrNamedThreadFactory; import org.junit.Assert; import org.junit.Test; @@ -36,9 +39,26 @@ static void assertMetricsApiCaller( int expectedQTime, String expectedOutput) throws Exception { + assertMetricsApiCaller(caller, null, json, expectedQTime, expectedOutput); + } + + static void assertMetricsApiCaller( + PrometheusMetricsServlet.MetricsApiCaller caller, + PrometheusMetricsServlet.ResultContext resultContext, + String json, + int expectedQTime, + String expectedOutput) + throws Exception { AtomicInteger qTime = new AtomicInteger(); - List metrics = new ArrayList<>(); - caller.handleResponse(qTime, metrics, OBJECT_MAPPER.readTree(json)); + + List metrics; + if (resultContext == null) { + metrics = new ArrayList<>(); + resultContext = new PrometheusMetricsServlet.ResultContext(metrics); + } else { + metrics = resultContext.resultMetrics; + } + caller.handleResponse(qTime, resultContext, OBJECT_MAPPER.readTree(json)); Assert.assertEquals(expectedQTime, qTime.get()); StringWriter writer = new StringWriter(); PrintWriter printWriter = new PrintWriter(writer); @@ -386,12 +406,7 @@ public void testCoresMetricsApiCaller() throws Exception { + "# HELP update_errors cumulative number of errors during updates across cores\n" + "# TYPE update_errors counter\n" + "update_errors 4\n"; - assertMetricsApiCaller( - new PrometheusMetricsServlet.CoresMetricsApiCaller( - Arrays.asList(PrometheusMetricsServlet.CoreMetric.values())), - json, - 14, - output); + assertMetricsApiCaller(new PrometheusMetricsServlet.CoresMetricsApiCaller(), json, 14, output); } @Test @@ -491,11 +506,87 @@ public void testCoresMetricsApiCallerMissingIndex() throws Exception { + "# HELP update_errors cumulative number of errors during updates across cores\n" + "# TYPE update_errors counter\n" + "update_errors 0\n"; - assertMetricsApiCaller( - new PrometheusMetricsServlet.CoresMetricsApiCaller( - Arrays.asList(PrometheusMetricsServlet.CoreMetric.values())), - json, - 25, - output); + assertMetricsApiCaller(new PrometheusMetricsServlet.CoresMetricsApiCaller(), json, 25, output); + } + + @Test + public void testConcurrentCallers() throws Exception { + String coreJson = + "{\n" + + " \"responseHeader\":{\n" + + " \"status\":0,\n" + + " \"QTime\":25},\n" + + " \"metrics\":{\n" + + " \"solr.core.loadtest.shard1_1.replica_n8\":{\n" + + " \"QUERY./get.requestTimes\":{\"count\":29},\n" + + " \"QUERY./get[shard].requestTimes\":{\"count\":1},\n" + + " \"UPDATE./update.requestTimes\":{\"count\":2},\n" + + " \"UPDATE./update[local].requestTimes\":{\"count\":1}}}}"; + String nodeJson = + "{\n" + + " \"responseHeader\":{\n" + + " \"status\":0,\n" + + " \"QTime\":25},\n" + + " \"metrics\":{\n" + + "\"solr.node\":{\n" + + " \"UPDATE./update.requestTimes\":{\"count\":20},\n" + + " \"UPDATE./update[local].requestTimes\":{\"count\":10}}}}"; + + String nodeOutput = + "# HELP distributed_requests_update cumulative number of distributed updates across cores[node aggregated]\n" + + "# TYPE distributed_requests_update counter\n" + + "distributed_requests_update 20\n" + + "# HELP local_requests_update cumulative number of local updates across cores[node aggregated]\n" + + "# TYPE local_requests_update counter\n" + + "local_requests_update 10\n"; + + // core CoresMetricsApiCaller output, it should contain both the nodeOutput and the new metrics + // added by the core + String coreOutput = + "# HELP distributed_requests_update cumulative number of distributed updates across cores[node aggregated]\n" + + "# TYPE distributed_requests_update counter\n" + + "distributed_requests_update 20\n" + + "# HELP local_requests_update cumulative number of local updates across cores[node aggregated]\n" + + "# TYPE local_requests_update counter\n" + + "local_requests_update 10\n" + + "# HELP top_level_requests_get cumulative number of top-level gets across cores\n" + + "# TYPE top_level_requests_get counter\n" + + "top_level_requests_get 29\n" + + "# HELP sub_shard_requests_get cumulative number of sub (spawned by re-distributing a top-level req) gets across cores\n" + + "# TYPE sub_shard_requests_get counter\n" + + "sub_shard_requests_get 1\n"; + + final int THREAD_COUNT = 100; + ExecutorService executorService = + ExecutorUtil.newMDCAwareFixedThreadPool( + THREAD_COUNT, new SolrNamedThreadFactory("test-concurrent-metric-callers")); + + List> futures = new ArrayList<>(); + for (int i = 0; i < THREAD_COUNT; i++) { + futures.add( + executorService.submit( + () -> { + PrometheusMetricsServlet.ResultContext resultContext = + new PrometheusMetricsServlet.ResultContext(new ArrayList<>()); + assertMetricsApiCaller( + new PrometheusMetricsServlet.AggregateMetricsApiCaller(), + resultContext, + nodeJson, + 25, + nodeOutput); + + assertMetricsApiCaller( + new PrometheusMetricsServlet.CoresMetricsApiCaller(), + resultContext, + coreJson, + 25, + coreOutput); + return null; + })); + } + + for (Future future : futures) { + future.get(); // this should not throw any concurrent exceptions or ComparisonFailure + } } }