Skip to content

Commit

Permalink
Resource usage propagator changes
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Dec 25, 2023
1 parent 327b1dc commit e920b1f
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.tasks.ResourceUsageStatsTCPropagator;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskThreadContextStatePropagator;

Expand Down Expand Up @@ -126,7 +127,8 @@ public ThreadContext(Settings settings) {
this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT);
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator()));
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator(),
new ResourceUsageStatsTCPropagator()));
}

public void registerThreadContextStatePropagator(final ThreadContextStatePropagator propagator) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.opensearch.common.util.concurrent.ThreadContextStatePropagator;

import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID;


public class ResourceUsageStatsTCPropagator implements ThreadContextStatePropagator {
public static final String NODE_RESOURCE_STATS = "PERF_STATS";
@Override
public Map<String, Object> transients(Map<String, Object> source) {
final Map<String, Object> transients = new HashMap<>();
for(Map.Entry<String, Object> entry : source.entrySet()) {
if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) {
// key starts with prefix
transients.put(entry.getKey(), entry.getValue());
}
}
return transients;
}

@Override
public Map<String, String> headers(Map<String, Object> source) {
final Map<String, String> headers = new HashMap<>();
for(Map.Entry<String, Object> entry : source.entrySet()) {
if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) {
// key starts with prefix
headers.put(entry.getKey(), entry.getValue().toString());
}
}
return headers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

public class ResourceUsageStatsReference {
private String resourceUsageStats;

public ResourceUsageStatsReference(String stats) {
this.resourceUsageStats = stats;
}

public String getResourceUsageStats() {
return resourceUsageStats;
}

public void setResourceUsageStats(String stats) {
this.resourceUsageStats = new String(stats);
}

@Override
public String toString() {
return this.resourceUsageStats;
}

}
174 changes: 109 additions & 65 deletions server/src/main/java/org/opensearch/transport/TransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,53 @@ public TransportService(
);
}

public TransportService(
Settings settings,
Transport transport,
ThreadPool threadPool,
TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders,
Tracer tracer) {
this(
settings,
transport,
threadPool,
transportInterceptor,
localNodeFactory,
clusterSettings,
taskHeaders,
new ClusterConnectionManager(settings, transport),
tracer,
null
);

}
public TransportService(
Settings settings,
Transport transport,
ThreadPool threadPool,
TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders,
ConnectionManager connectionManager,
Tracer tracer) {
this(
settings,
transport,
threadPool,
transportInterceptor,
localNodeFactory,
clusterSettings,
taskHeaders,
new ClusterConnectionManager(settings, transport),
tracer,
null
);
}

public TransportService(
Settings settings,
Transport transport,
Expand Down Expand Up @@ -932,62 +979,23 @@ public String toString() {
}

private void addResourceUsageStatsToThreadContext(String action) {
//if(action.startsWith("indices:data/read/search")) {
if (resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) {
threadPool.getThreadContext().addResponseHeader("PERF_STATS",
resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
}
//}
}

public final <T extends TransportResponse> void sendRequest(
final Transport.Connection connection,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler,
final boolean shouldAddResourceUsageStats
) {
final TransportResponseHandler<T> delegate;
if(shouldAddResourceUsageStats) {
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) {
threadPool.getThreadContext().addResponseHeader("PERF_STATS",
resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
}
handler.handleResponse(response);
}

@Override
public void handleException(TransportException exp) {
if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) {
threadPool.getThreadContext().addResponseHeader("PERF_STATS",
resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
}
handler.handleException(exp);
}

@Override
public String executor() {
return handler.executor();
}

@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}

@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
if (resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) {
try {
ResourceUsageStatsReference statsReference = threadPool.getThreadContext()
.getTransient("PERF_STATS" + localNode.getId());
if(statsReference != null) {
statsReference.setResourceUsageStats(resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
} else {
threadPool.getThreadContext().putTransient("PERF_STATS" + localNode.getId(),
new ResourceUsageStatsReference(resourceUsageCollectorService.getLocalNodeStatistics().get().toString()));
}
};
} else {
delegate = handler;
} catch (Exception e) {
logger.info("===EXCEPTION=== {} ===action=== {}", e.getMessage(), action);
}
// Todo : remove this , added this for asserting response equaling request
threadPool.getThreadContext().addResponseHeader("PERF_STATS" + localNode.getId(),
resourceUsageCollectorService.getLocalNodeStatistics().get().toString());
}
sendRequest(connection, action, request, options, delegate);
}

/**
Expand Down Expand Up @@ -1319,32 +1327,35 @@ public void onRequestReceived(long requestId, String action) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] received request", requestId, action);
}
addStatsToResourceUsageCollectorService();
addStatsToResourceUsageCollectorServiceFromRequestHeaders();
messageListener.onRequestReceived(requestId, action);
}

private void addStatsToResourceUsageCollectorService() {
try {
Map<String, List<String>> responseHeaders = threadPool.getThreadContext().getResponseHeaders();

if (responseHeaders.size() > 0) {
List<String> perfStats = responseHeaders.get("PERF_STATS");
if(perfStats.size() == 0) return;
// nodeid:111113131313,11.0,11.0
// NodeResourceUsageStats[aaxnzZb7R3KdRqjqXfv8SQ](Timestamp: 1699253278365, CPU utilization percent: 3.1, Memory utilization percent: 25.0)

StringBuilder sb = new StringBuilder();
String nodeId = perfStats.get(0).substring(0, perfStats.get(0).indexOf(':') + 1);
String nodeId = perfStats.get(0).substring(0, perfStats.get(0).indexOf(':'));
if(nodeId.length() == 0)
if (resourceUsageCollectorService.getNodeStatistics(nodeId).isPresent()) {
long timestamp = resourceUsageCollectorService.getNodeStatistics(nodeId).get().getTimestamp();
if (System.currentTimeMillis() - timestamp < 1000) {
logger.warn("Node resource usage stats is updated recently - so skipping");
logger.info("Node resource usage stats is updated recently - so skipping");
} else {
String[] parse = perfStats.get(0).split(":");
String[] parse1 = parse[1].split(",");
String datatimestamp = parse1[0];
String cpu = parse1[1];
String memory = parse1[2];
resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory));
//logger.warn("Updates stats");
logger.info("Updates stats");
}
} else {
String[] parse = perfStats.get(0).split(":");
Expand All @@ -1353,15 +1364,48 @@ private void addStatsToResourceUsageCollectorService() {
String cpu = parse1[1];
String memory = parse1[2];
resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory));
logger.warn("added stats");
logger.info("added stats");
}
// String[] parse = responseHeaders.get("PERF_STATS").get(0).split(":")[1].split("NodeResourceUsageStats\\[")[1].split("]");
// String nodeId = parse[0];
// String[] parse1 = parse[1].split("\\(")[1].split(": ");
// String timestamp = parse1[1].split(",")[0];
// String cpu = parse1[2].split(",")[0];
// String memory = parse1[3].split("\\)")[0];
}
} catch(Exception e){
logger.warn("Adding stats failed : ", e);
}
}

private void addStatsToResourceUsageCollectorServiceFromRequestHeaders() {
try {

for(Map.Entry<String,String> entry : threadPool.getThreadContext().getHeaders().entrySet()) {
if(entry.getKey().contains("PERF_STATS")) {
String perfStats = entry.getValue();
assert(threadPool.getThreadContext().getResponseHeaders().get(entry.getKey()).contains(entry.getValue()));
String nodeId = perfStats.substring(0, perfStats.indexOf(':'));
if (resourceUsageCollectorService.getNodeStatistics(nodeId).isPresent()) {
long timestamp = resourceUsageCollectorService.getNodeStatistics(nodeId).get().getTimestamp();
if (System.currentTimeMillis() - timestamp < 1000) {
logger.info("Node resource usage stats is updated recently - so skipping");
} else {
String[] parse = perfStats.split(":");
String[] parse1 = parse[1].split(",");
String datatimestamp = parse1[0];
String cpu = parse1[1];
String memory = parse1[2];
resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId,
Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory));
logger.info("Updated stats");
}
} else {
String[] parse = perfStats.split(":");
String[] parse1 = parse[1].split(",");
String datatimestamp = parse1[0];
String cpu = parse1[1];
String memory = parse1[2];
resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId,
Long.valueOf(datatimestamp),
Double.valueOf(cpu), Double.valueOf(memory));
logger.info("added stats");
}
}
}
} catch(Exception e){
logger.warn("Adding stats failed : ", e);
Expand Down
Loading

0 comments on commit e920b1f

Please sign in to comment.