diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index fc2e4217bae79..0bffed2fd68ed 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -44,6 +44,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.http.HttpTransportSettings; +import org.opensearch.tasks.ResourceUsageStatsTCPropagator; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskThreadContextStatePropagator; @@ -126,7 +127,8 @@ public ThreadContext(Settings settings) { this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT); this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings); this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes(); - this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator())); + this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator(), + new ResourceUsageStatsTCPropagator())); } public void registerThreadContextStatePropagator(final ThreadContextStatePropagator propagator) { diff --git a/server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java b/server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java new file mode 100644 index 0000000000000..bd19d75425d67 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.opensearch.common.util.concurrent.ThreadContextStatePropagator; + +import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID; + + +public class ResourceUsageStatsTCPropagator implements ThreadContextStatePropagator { + public static final String NODE_RESOURCE_STATS = "PERF_STATS"; + @Override + public Map transients(Map source) { + final Map transients = new HashMap<>(); + for(Map.Entry entry : source.entrySet()) { + if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) { + // key starts with prefix + transients.put(entry.getKey(), entry.getValue()); + } + } + return transients; + } + + @Override + public Map headers(Map source) { + final Map headers = new HashMap<>(); + for(Map.Entry entry : source.entrySet()) { + if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) { + // key starts with prefix + headers.put(entry.getKey(), entry.getValue().toString()); + } + } + return headers; + } +} diff --git a/server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java b/server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java new file mode 100644 index 0000000000000..5db9b5053c7b7 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +public class ResourceUsageStatsReference { + private String resourceUsageStats; + + public ResourceUsageStatsReference(String stats) { + this.resourceUsageStats = stats; + } + + public String getResourceUsageStats() { + return resourceUsageStats; + } + + public void setResourceUsageStats(String stats) { + this.resourceUsageStats = new String(stats); + } + + @Override + public String toString() { + return this.resourceUsageStats; + } + +} diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 84f7eedd49838..b59395441c56e 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -211,6 +211,53 @@ public TransportService( ); } + public TransportService( + Settings settings, + Transport transport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + Set taskHeaders, + Tracer tracer) { + this( + settings, + transport, + threadPool, + transportInterceptor, + localNodeFactory, + clusterSettings, + taskHeaders, + new ClusterConnectionManager(settings, transport), + tracer, + null + ); + + } + public TransportService( + Settings settings, + Transport transport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + Set taskHeaders, + ConnectionManager connectionManager, + Tracer tracer) { + this( + settings, + transport, + threadPool, + transportInterceptor, + localNodeFactory, + clusterSettings, + taskHeaders, + new ClusterConnectionManager(settings, transport), + tracer, + null + ); + } + public TransportService( Settings settings, Transport transport, @@ -932,62 +979,23 @@ public String toString() { } private void addResourceUsageStatsToThreadContext(String action) { - //if(action.startsWith("indices:data/read/search")) { - if (resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { - threadPool.getThreadContext().addResponseHeader("PERF_STATS", - resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); - } - //} - } - - public final void sendRequest( - final Transport.Connection connection, - final String action, - final TransportRequest request, - final TransportRequestOptions options, - final TransportResponseHandler handler, - final boolean shouldAddResourceUsageStats - ) { - final TransportResponseHandler delegate; - if(shouldAddResourceUsageStats) { - delegate = new TransportResponseHandler() { - @Override - public void handleResponse(T response) { - if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { - threadPool.getThreadContext().addResponseHeader("PERF_STATS", - resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); - } - handler.handleResponse(response); - } - - @Override - public void handleException(TransportException exp) { - if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { - threadPool.getThreadContext().addResponseHeader("PERF_STATS", - resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); - } - handler.handleException(exp); - } - - @Override - public String executor() { - return handler.executor(); - } - - @Override - public T read(StreamInput in) throws IOException { - return handler.read(in); - } - - @Override - public String toString() { - return getClass().getName() + "/[" + action + "]:" + handler.toString(); + if (resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { + try { + ResourceUsageStatsReference statsReference = threadPool.getThreadContext() + .getTransient("PERF_STATS" + localNode.getId()); + if(statsReference != null) { + statsReference.setResourceUsageStats(resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); + } else { + threadPool.getThreadContext().putTransient("PERF_STATS" + localNode.getId(), + new ResourceUsageStatsReference(resourceUsageCollectorService.getLocalNodeStatistics().get().toString())); } - }; - } else { - delegate = handler; + } catch (Exception e) { + logger.info("===EXCEPTION=== {} ===action=== {}", e.getMessage(), action); + } + // Todo : remove this , added this for asserting response equaling request + threadPool.getThreadContext().addResponseHeader("PERF_STATS" + localNode.getId(), + resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); } - sendRequest(connection, action, request, options, delegate); } /** @@ -1319,24 +1327,27 @@ public void onRequestReceived(long requestId, String action) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received request", requestId, action); } - addStatsToResourceUsageCollectorService(); + addStatsToResourceUsageCollectorServiceFromRequestHeaders(); messageListener.onRequestReceived(requestId, action); } private void addStatsToResourceUsageCollectorService() { try { Map> responseHeaders = threadPool.getThreadContext().getResponseHeaders(); + if (responseHeaders.size() > 0) { List perfStats = responseHeaders.get("PERF_STATS"); + if(perfStats.size() == 0) return; // nodeid:111113131313,11.0,11.0 // NodeResourceUsageStats[aaxnzZb7R3KdRqjqXfv8SQ](Timestamp: 1699253278365, CPU utilization percent: 3.1, Memory utilization percent: 25.0) StringBuilder sb = new StringBuilder(); - String nodeId = perfStats.get(0).substring(0, perfStats.get(0).indexOf(':') + 1); + String nodeId = perfStats.get(0).substring(0, perfStats.get(0).indexOf(':')); + if(nodeId.length() == 0) if (resourceUsageCollectorService.getNodeStatistics(nodeId).isPresent()) { long timestamp = resourceUsageCollectorService.getNodeStatistics(nodeId).get().getTimestamp(); if (System.currentTimeMillis() - timestamp < 1000) { - logger.warn("Node resource usage stats is updated recently - so skipping"); + logger.info("Node resource usage stats is updated recently - so skipping"); } else { String[] parse = perfStats.get(0).split(":"); String[] parse1 = parse[1].split(","); @@ -1344,7 +1355,7 @@ private void addStatsToResourceUsageCollectorService() { String cpu = parse1[1]; String memory = parse1[2]; resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory)); - //logger.warn("Updates stats"); + logger.info("Updates stats"); } } else { String[] parse = perfStats.get(0).split(":"); @@ -1353,15 +1364,48 @@ private void addStatsToResourceUsageCollectorService() { String cpu = parse1[1]; String memory = parse1[2]; resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory)); - logger.warn("added stats"); + logger.info("added stats"); } - // String[] parse = responseHeaders.get("PERF_STATS").get(0).split(":")[1].split("NodeResourceUsageStats\\[")[1].split("]"); - // String nodeId = parse[0]; - // String[] parse1 = parse[1].split("\\(")[1].split(": "); - // String timestamp = parse1[1].split(",")[0]; - // String cpu = parse1[2].split(",")[0]; - // String memory = parse1[3].split("\\)")[0]; + } + } catch(Exception e){ + logger.warn("Adding stats failed : ", e); + } + } + + private void addStatsToResourceUsageCollectorServiceFromRequestHeaders() { + try { + for(Map.Entry entry : threadPool.getThreadContext().getHeaders().entrySet()) { + if(entry.getKey().contains("PERF_STATS")) { + String perfStats = entry.getValue(); + assert(threadPool.getThreadContext().getResponseHeaders().get(entry.getKey()).contains(entry.getValue())); + String nodeId = perfStats.substring(0, perfStats.indexOf(':')); + if (resourceUsageCollectorService.getNodeStatistics(nodeId).isPresent()) { + long timestamp = resourceUsageCollectorService.getNodeStatistics(nodeId).get().getTimestamp(); + if (System.currentTimeMillis() - timestamp < 1000) { + logger.info("Node resource usage stats is updated recently - so skipping"); + } else { + String[] parse = perfStats.split(":"); + String[] parse1 = parse[1].split(","); + String datatimestamp = parse1[0]; + String cpu = parse1[1]; + String memory = parse1[2]; + resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, + Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory)); + logger.info("Updated stats"); + } + } else { + String[] parse = perfStats.split(":"); + String[] parse1 = parse[1].split(","); + String datatimestamp = parse1[0]; + String cpu = parse1[1]; + String memory = parse1[2]; + resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, + Long.valueOf(datatimestamp), + Double.valueOf(cpu), Double.valueOf(memory)); + logger.info("added stats"); + } + } } } catch(Exception e){ logger.warn("Adding stats failed : ", e); diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java index a0531c76bf897..9196251ae5950 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java @@ -60,9 +60,18 @@ public void testStashContext() { assertEquals("bar", threadContext.getHeader("foo")); assertEquals(Integer.valueOf(1), threadContext.getTransient("ctx.foo")); assertEquals("1", threadContext.getHeader("default")); + threadContext.addResponseHeader("resp", "val"); + threadContext.putTransient("PERF_STATS_NODE1", "abc"); + threadContext.addResponseHeader("resp", "val1"); + threadContext.putHeader("PERF", "1"); + assertEquals("val1", threadContext.getResponseHeaders().get("resp").get(1)); + //threadContext.putTransient("PERF_STATS_NODE1", "cde"); try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getHeader("PERF")); assertNull(threadContext.getTransient("ctx.foo")); + assertNotNull(threadContext.getTransient("PERF_STATS_NODE1")); + assertNull(threadContext.getResponseHeaders().get("resp")); assertEquals("1", threadContext.getHeader("default")); } @@ -463,9 +472,13 @@ public void testSerializeInDifferentContextNoDefaults() throws IOException { threadContext.putHeader("foo", "bar"); threadContext.putTransient("ctx.foo", 1); + // This is part of propagators + threadContext.putTransient("PERF_STATS", "abc"); + assertEquals("bar", threadContext.getHeader("foo")); assertNotNull(threadContext.getTransient("ctx.foo")); assertNull(threadContext.getHeader("default")); + //threadContext.stashContext(); threadContext.writeTo(out); } { @@ -475,10 +488,63 @@ public void testSerializeInDifferentContextNoDefaults() throws IOException { assertEquals("bar", otherhreadContext.getHeader("foo")); assertNull(otherhreadContext.getTransient("ctx.foo")); + assertNotNull(otherhreadContext.getHeader("PERF_STATS")); assertEquals("5", otherhreadContext.getHeader("default")); } } + public void testSerializeInDifferentContextNoDefaults1() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + + // This is part of propagators + threadContext.putTransient("PERF_STATS", "abc"); + assertNotNull(threadContext.getTransient("PERF_STATS")); + threadContext.writeTo(out); + } + { + Settings otherSettings = Settings.builder().put("request.headers.default", "5").build(); + ThreadContext otherhreadContext = new ThreadContext(otherSettings); + otherhreadContext.readHeaders(out.bytes().streamInput()); + + // Here its not null - as the transient headers get propagated as part of request headers + // during serialization + assertNotNull(otherhreadContext.getHeader("PERF_STATS")); + } + } + + public void testSerializeInDifferentContextNoDefaultsStash() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + + // This is part of propagators + threadContext.putTransient("PERF_STATS", "abc"); + threadContext.putTransient("random", "cde"); + assertNotNull(threadContext.getTransient("PERF_STATS")); + threadContext.stashContext(); + + // After stash perf stats is not cleared up since its part of propagators + assertNotNull(threadContext.getTransient("PERF_STATS")); + + // This is cleared since its not part of propagators + assertNull(threadContext.getTransient("random")); + + // serializing the threadcontext + threadContext.writeTo(out); + } + { + Settings otherSettings = Settings.builder().put("request.headers.default", "5").build(); + ThreadContext otherhreadContext = new ThreadContext(otherSettings); + otherhreadContext.readHeaders(out.bytes().streamInput()); + + // Here its not null - as the transient headers get propagated as part of request headers + // during serialization + assertNotNull(otherhreadContext.getHeader("PERF_STATS")); + } + } + public void testCanResetDefault() { Settings build = Settings.builder().put("request.headers.default", "1").build(); ThreadContext threadContext = new ThreadContext(build);