From cd5299b9e75f14a37ee5f908784287ab7348c0e7 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Fri, 6 Oct 2023 11:10:04 +0530 Subject: [PATCH] Addressing comments Signed-off-by: Bharathwaj G --- .../admin/cluster/node/stats/NodeStats.java | 22 ++++----- .../cluster/node/stats/NodesStatsRequest.java | 2 +- .../node/stats/TransportNodesStatsAction.java | 2 +- .../main/java/org/opensearch/node/Node.java | 14 +++--- ...tistics.java => NodePerformanceStats.java} | 14 +++--- .../java/org/opensearch/node/NodeService.java | 8 ++-- ...eStats.java => NodesPerformanceStats.java} | 14 +++--- ....java => PerformanceCollectorService.java} | 45 ++++++++++--------- .../tracker/AbstractAverageUsageTracker.java | 3 +- .../tracker/NodePerformanceTracker.java | 28 +++++++++--- .../tracker/PerformanceTrackerSettings.java | 6 +-- .../cluster/node/stats/NodeStatsTests.java | 30 ++++++------- ... => PerformanceCollectorServiceTests.java} | 27 +++++------ .../tracker/NodePerformanceTrackerTests.java | 42 ++++++++++++++++- 14 files changed, 153 insertions(+), 104 deletions(-) rename server/src/main/java/org/opensearch/node/{NodePerformanceStatistics.java => NodePerformanceStats.java} (79%) rename server/src/main/java/org/opensearch/node/{GlobalPerformanceStats.java => NodesPerformanceStats.java} (80%) rename server/src/main/java/org/opensearch/node/{PerfStatsCollectorService.java => PerformanceCollectorService.java} (73%) rename server/src/test/java/org/opensearch/node/{PerfStatsCollectorServiceTests.java => PerformanceCollectorServiceTests.java} (84%) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 497836611c7ca..62b32af1f1b75 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -56,7 +56,7 @@ import org.opensearch.monitor.os.OsStats; import org.opensearch.monitor.process.ProcessStats; import org.opensearch.node.AdaptiveSelectionStats; -import org.opensearch.node.GlobalPerformanceStats; +import org.opensearch.node.NodesPerformanceStats; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; import org.opensearch.search.backpressure.stats.SearchBackpressureStats; @@ -144,7 +144,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { private SearchPipelineStats searchPipelineStats; @Nullable - private GlobalPerformanceStats globalPerformanceStats; + private NodesPerformanceStats nodesPerformanceStats; public NodeStats(StreamInput in) throws IOException { super(in); @@ -202,10 +202,10 @@ public NodeStats(StreamInput in) throws IOException { } else { searchPipelineStats = null; } - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.11 when we backport - globalPerformanceStats = in.readOptionalWriteable(GlobalPerformanceStats::new); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport + nodesPerformanceStats = in.readOptionalWriteable(NodesPerformanceStats::new); } else { - globalPerformanceStats = null; + nodesPerformanceStats = null; } } @@ -225,7 +225,7 @@ public NodeStats( @Nullable DiscoveryStats discoveryStats, @Nullable IngestStats ingestStats, @Nullable AdaptiveSelectionStats adaptiveSelectionStats, - @Nullable GlobalPerformanceStats globalPerformanceStats, + @Nullable NodesPerformanceStats nodesPerformanceStats, @Nullable ScriptCacheStats scriptCacheStats, @Nullable IndexingPressureStats indexingPressureStats, @Nullable ShardIndexingPressureStats shardIndexingPressureStats, @@ -251,7 +251,7 @@ public NodeStats( this.discoveryStats = discoveryStats; this.ingestStats = ingestStats; this.adaptiveSelectionStats = adaptiveSelectionStats; - this.globalPerformanceStats = globalPerformanceStats; + this.nodesPerformanceStats = nodesPerformanceStats; this.scriptCacheStats = scriptCacheStats; this.indexingPressureStats = indexingPressureStats; this.shardIndexingPressureStats = shardIndexingPressureStats; @@ -356,8 +356,8 @@ public AdaptiveSelectionStats getAdaptiveSelectionStats() { } @Nullable - public GlobalPerformanceStats getNodesPerformanceStats() { - return globalPerformanceStats; + public NodesPerformanceStats getNodesPerformanceStats() { + return nodesPerformanceStats; } @Nullable @@ -446,8 +446,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalWriteable(searchPipelineStats); } - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO : make it 2.11 when we backport - out.writeOptionalWriteable(globalPerformanceStats); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport + out.writeOptionalWriteable(nodesPerformanceStats); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 8ddf843f94217..9700b2001cc97 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -214,7 +214,7 @@ public enum Metric { FILE_CACHE_STATS("file_cache"), TASK_CANCELLATION("task_cancellation"), SEARCH_PIPELINE("search_pipeline"), - GLOBAL_PERFORMANCE_STATS("performance_stats"); + PERFORMANCE_STATS("performance_stats"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 098ae85114e1c..5a2b916625ed7 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -125,7 +125,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics), NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics), NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics), - NodesStatsRequest.Metric.GLOBAL_PERFORMANCE_STATS.containedIn(metrics) + NodesStatsRequest.Metric.PERFORMANCE_STATS.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 52c049f4d6a69..0277fdbc74c05 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1075,7 +1075,7 @@ protected Node( settings, clusterService.getClusterSettings() ); - final PerfStatsCollectorService perfStatsCollectorService = new PerfStatsCollectorService( + final PerformanceCollectorService performanceCollectorService = new PerformanceCollectorService( nodePerformanceTracker, clusterService, threadPool @@ -1102,7 +1102,7 @@ protected Node( searchPipelineService, fileCache, taskCancellationMonitoringService, - perfStatsCollectorService + performanceCollectorService ); final SearchService searchService = newSearchService( @@ -1223,8 +1223,8 @@ protected Node( b.bind(RerouteService.class).toInstance(rerouteService); b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); - b.bind(PerfStatsCollectorService.class).toInstance(perfStatsCollectorService); b.bind(NodePerformanceTracker.class).toInstance(nodePerformanceTracker); + b.bind(PerformanceCollectorService.class).toInstance(performanceCollectorService); b.bind(SystemIndices.class).toInstance(systemIndices); b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); @@ -1342,7 +1342,7 @@ public Node start() throws NodeValidationException { injector.getInstance(SearchService.class).start(); injector.getInstance(FsHealthService.class).start(); injector.getInstance(NodePerformanceTracker.class).start(); - injector.getInstance(PerfStatsCollectorService.class).start(); + injector.getInstance(PerformanceCollectorService.class).start(); nodeService.getMonitorService().start(); nodeService.getSearchBackpressureService().start(); nodeService.getTaskCancellationMonitoringService().start(); @@ -1506,7 +1506,7 @@ private Node stop() { injector.getInstance(NodeConnectionsService.class).stop(); injector.getInstance(FsHealthService.class).stop(); injector.getInstance(NodePerformanceTracker.class).stop(); - injector.getInstance(PerfStatsCollectorService.class).stop(); + injector.getInstance(PerformanceCollectorService.class).stop(); nodeService.getMonitorService().stop(); nodeService.getSearchBackpressureService().stop(); injector.getInstance(GatewayService.class).stop(); @@ -1572,8 +1572,8 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(FsHealthService.class)); toClose.add(() -> stopWatch.stop().start("node_performance_tracker")); toClose.add(injector.getInstance(NodePerformanceTracker.class)); - toClose.add(() -> stopWatch.stop().start("perf_stats_collector")); - toClose.add(injector.getInstance(PerfStatsCollectorService.class)); + toClose.add(() -> stopWatch.stop().start("performance_collector")); + toClose.add(injector.getInstance(PerformanceCollectorService.class)); toClose.add(() -> stopWatch.stop().start("gateway")); toClose.add(injector.getInstance(GatewayService.class)); toClose.add(() -> stopWatch.stop().start("search")); diff --git a/server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java b/server/src/main/java/org/opensearch/node/NodePerformanceStats.java similarity index 79% rename from server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java rename to server/src/main/java/org/opensearch/node/NodePerformanceStats.java index 95d1dbd6e9b09..85493fbbf3a4b 100644 --- a/server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java +++ b/server/src/main/java/org/opensearch/node/NodePerformanceStats.java @@ -19,20 +19,20 @@ * This represents the performance stats of a node along with the timestamp at which the stats object was created * in the respective node */ -public class NodePerformanceStatistics implements Writeable { +public class NodePerformanceStats implements Writeable { final String nodeId; long timestamp; double cpuUtilizationPercent; double memoryUtilizationPercent; - public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) { + public NodePerformanceStats(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) { this.nodeId = nodeId; this.cpuUtilizationPercent = cpuUtilizationPercent; this.memoryUtilizationPercent = memoryUtilizationPercent; this.timestamp = timestamp; } - public NodePerformanceStatistics(StreamInput in) throws IOException { + public NodePerformanceStats(StreamInput in) throws IOException { this.nodeId = in.readString(); this.timestamp = in.readLong(); this.cpuUtilizationPercent = in.readDouble(); @@ -58,12 +58,10 @@ public String toString() { return sb.toString(); } - NodePerformanceStatistics(NodePerformanceStatistics nodePerformanceStatistics) { + NodePerformanceStats(NodePerformanceStats nodePerformanceStats) { this( - nodePerformanceStatistics.nodeId, - nodePerformanceStatistics.cpuUtilizationPercent, - nodePerformanceStatistics.memoryUtilizationPercent, - nodePerformanceStatistics.timestamp + nodePerformanceStats.nodeId, + nodePerformanceStats.timestamp, nodePerformanceStats.memoryUtilizationPercent, nodePerformanceStats.cpuUtilizationPercent ); } diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 98124a602a42c..ec3e985db1f1d 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -83,7 +83,7 @@ public class NodeService implements Closeable { private final ScriptService scriptService; private final HttpServerTransport httpServerTransport; private final ResponseCollectorService responseCollectorService; - private final PerfStatsCollectorService perfStatsCollectorService; + private final PerformanceCollectorService performanceCollectorService; private final SearchTransportService searchTransportService; private final IndexingPressureService indexingPressureService; private final AggregationUsageService aggregationUsageService; @@ -116,7 +116,7 @@ public class NodeService implements Closeable { SearchPipelineService searchPipelineService, FileCache fileCache, TaskCancellationMonitoringService taskCancellationMonitoringService, - PerfStatsCollectorService perfStatsCollectorService + PerformanceCollectorService performanceCollectorService ) { this.settings = settings; this.threadPool = threadPool; @@ -139,7 +139,7 @@ public class NodeService implements Closeable { this.clusterService = clusterService; this.fileCache = fileCache; this.taskCancellationMonitoringService = taskCancellationMonitoringService; - this.perfStatsCollectorService = perfStatsCollectorService; + this.performanceCollectorService = performanceCollectorService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); } @@ -241,7 +241,7 @@ public NodeStats stats( discoveryStats ? discovery.stats() : null, ingest ? ingestService.stats() : null, adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null, - nodesPerfStats ? perfStatsCollectorService.stats() : null, + nodesPerfStats ? performanceCollectorService.stats() : null, scriptCache ? scriptService.cacheStats() : null, indexingPressure ? this.indexingPressureService.nodeStats() : null, shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null, diff --git a/server/src/main/java/org/opensearch/node/GlobalPerformanceStats.java b/server/src/main/java/org/opensearch/node/NodesPerformanceStats.java similarity index 80% rename from server/src/main/java/org/opensearch/node/GlobalPerformanceStats.java rename to server/src/main/java/org/opensearch/node/NodesPerformanceStats.java index 1617b34802c5d..de196d698ed2f 100644 --- a/server/src/main/java/org/opensearch/node/GlobalPerformanceStats.java +++ b/server/src/main/java/org/opensearch/node/NodesPerformanceStats.java @@ -24,17 +24,17 @@ * This class represents performance stats such as CPU, Memory and IO resource usage of each node along with the time * elapsed from when the stats were recorded. */ -public class GlobalPerformanceStats implements Writeable, ToXContentFragment { +public class NodesPerformanceStats implements Writeable, ToXContentFragment { // Map of node id to perf stats of the corresponding node. - private final Map nodeIdToPerfStatsMap; + private final Map nodeIdToPerfStatsMap; - public GlobalPerformanceStats(Map nodeIdToPerfStatsMap) { + public NodesPerformanceStats(Map nodeIdToPerfStatsMap) { this.nodeIdToPerfStatsMap = nodeIdToPerfStatsMap; } - public GlobalPerformanceStats(StreamInput in) throws IOException { - this.nodeIdToPerfStatsMap = in.readMap(StreamInput::readString, NodePerformanceStatistics::new); + public NodesPerformanceStats(StreamInput in) throws IOException { + this.nodeIdToPerfStatsMap = in.readMap(StreamInput::readString, NodePerformanceStats::new); } @Override @@ -45,7 +45,7 @@ public void writeTo(StreamOutput out) throws IOException { /** * Returns map of node id to perf stats of the corresponding node. */ - public Map getNodeIdToNodePerfStatsMap() { + public Map getNodeIdToNodePerfStatsMap() { return nodeIdToPerfStatsMap; } @@ -54,7 +54,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject("performance_stats"); for (String nodeId : nodeIdToPerfStatsMap.keySet()) { builder.startObject(nodeId); - NodePerformanceStatistics perfStats = nodeIdToPerfStatsMap.get(nodeId); + NodePerformanceStats perfStats = nodeIdToPerfStatsMap.get(nodeId); if (perfStats != null) { builder.field( "elapsed_time", diff --git a/server/src/main/java/org/opensearch/node/PerfStatsCollectorService.java b/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java similarity index 73% rename from server/src/main/java/org/opensearch/node/PerfStatsCollectorService.java rename to server/src/main/java/org/opensearch/node/PerformanceCollectorService.java index 1e7ec793d96d5..23d34701b1ef7 100644 --- a/server/src/main/java/org/opensearch/node/PerfStatsCollectorService.java +++ b/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java @@ -21,7 +21,6 @@ import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -31,16 +30,16 @@ * This collects node level performance statistics such as cpu, memory, IO of each node and makes it available for * coordinator node to aid in throttling, ranking etc */ -public class PerfStatsCollectorService extends AbstractLifecycleComponent implements ClusterStateListener { +public class PerformanceCollectorService extends AbstractLifecycleComponent implements ClusterStateListener { /** - * This refresh interval denotes the polling interval of PerfStatsCollectorService to refresh the performance stats + * This refresh interval denotes the polling interval of PerformanceCollectorService to refresh the performance stats * from local node */ private static long REFRESH_INTERVAL_IN_MILLIS = 1000; - private static final Logger logger = LogManager.getLogger(PerfStatsCollectorService.class); - private final ConcurrentMap nodeIdToPerfStats = ConcurrentCollections.newConcurrentMap(); + private static final Logger logger = LogManager.getLogger(PerformanceCollectorService.class); + private final ConcurrentMap nodeIdToPerfStats = ConcurrentCollections.newConcurrentMap(); private ThreadPool threadPool; private volatile Scheduler.Cancellable scheduledFuture; @@ -48,7 +47,11 @@ public class PerfStatsCollectorService extends AbstractLifecycleComponent implem private NodePerformanceTracker nodePerformanceTracker; private ClusterService clusterService; - public PerfStatsCollectorService(NodePerformanceTracker nodePerformanceTracker, ClusterService clusterService, ThreadPool threadPool) { + public PerformanceCollectorService( + NodePerformanceTracker nodePerformanceTracker, + ClusterService clusterService, + ThreadPool threadPool + ) { this.threadPool = threadPool; this.nodePerformanceTracker = nodePerformanceTracker; this.clusterService = clusterService; @@ -71,10 +74,10 @@ void removeNodePerfStatistics(String nodeId) { /** * Collect node performance statistics along with the timestamp */ - public void collectNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) { + public void collectNodePerfStatistics(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) { nodeIdToPerfStats.compute(nodeId, (id, nodePerfStats) -> { if (nodePerfStats == null) { - return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent, timestamp); + return new NodePerformanceStats(nodeId, timestamp, memoryUtilizationPercent, cpuUtilizationPercent); } else { nodePerfStats.cpuUtilizationPercent = cpuUtilizationPercent; nodePerfStats.memoryUtilizationPercent = memoryUtilizationPercent; @@ -87,9 +90,9 @@ public void collectNodePerfStatistics(String nodeId, double cpuUtilizationPercen /** * Get all node statistics which will be used for node stats */ - public Map getAllNodeStatistics() { - Map nodeStats = new HashMap<>(nodeIdToPerfStats.size()); - nodeIdToPerfStats.forEach((nodeId, nodePerfStats) -> { nodeStats.put(nodeId, new NodePerformanceStatistics(nodePerfStats)); }); + public Map getAllNodeStatistics() { + Map nodeStats = new HashMap<>(nodeIdToPerfStats.size()); + nodeIdToPerfStats.forEach((nodeId, nodePerfStats) -> { nodeStats.put(nodeId, new NodePerformanceStats(nodePerfStats)); }); return nodeStats; } @@ -98,27 +101,27 @@ public Map getAllNodeStatistics() { * performance stats information exists for the given node. Returns an empty * {@code Optional} if the node was not found. */ - public Optional getNodeStatistics(final String nodeId) { - return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(perfStats -> new NodePerformanceStatistics(perfStats)); + public Optional getNodeStatistics(final String nodeId) { + return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(perfStats -> new NodePerformanceStats(perfStats)); } /** * Returns collected performance statistics of all nodes */ - public GlobalPerformanceStats stats() { - return new GlobalPerformanceStats(getAllNodeStatistics()); + public NodesPerformanceStats stats() { + return new NodesPerformanceStats(getAllNodeStatistics()); } /** * Fetch local node performance statistics and add it to store along with the current timestamp */ - private void getLocalNodePerformanceStats() { + private void collectLocalNodePerformanceStats() { if (nodePerformanceTracker.isReady() && clusterService.state() != null) { collectNodePerfStatistics( clusterService.state().nodes().getLocalNodeId(), - nodePerformanceTracker.getCpuUtilizationPercent(), + System.currentTimeMillis(), nodePerformanceTracker.getMemoryUtilizationPercent(), - System.currentTimeMillis() + nodePerformanceTracker.getCpuUtilizationPercent() ); } } @@ -130,9 +133,9 @@ protected void doStart() { */ scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { try { - getLocalNodePerformanceStats(); + collectLocalNodePerformanceStats(); } catch (Exception e) { - logger.warn("failure in PerfStatsCollectorService", e); + logger.warn("failure in PerformanceCollectorService", e); } }, new TimeValue(REFRESH_INTERVAL_IN_MILLIS), ThreadPool.Names.GENERIC); } @@ -145,5 +148,5 @@ protected void doStop() { } @Override - protected void doClose() throws IOException {} + protected void doClose() {} } diff --git a/server/src/main/java/org/opensearch/ratelimiting/tracker/AbstractAverageUsageTracker.java b/server/src/main/java/org/opensearch/ratelimiting/tracker/AbstractAverageUsageTracker.java index 0ed53e424b3a5..40917f3c38894 100644 --- a/server/src/main/java/org/opensearch/ratelimiting/tracker/AbstractAverageUsageTracker.java +++ b/server/src/main/java/org/opensearch/ratelimiting/tracker/AbstractAverageUsageTracker.java @@ -16,7 +16,6 @@ import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; /** @@ -91,5 +90,5 @@ protected void doStop() { } @Override - protected void doClose() throws IOException {} + protected void doClose() {} } diff --git a/server/src/main/java/org/opensearch/ratelimiting/tracker/NodePerformanceTracker.java b/server/src/main/java/org/opensearch/ratelimiting/tracker/NodePerformanceTracker.java index 5f4445c4d1abe..5776571925ab8 100644 --- a/server/src/main/java/org/opensearch/ratelimiting/tracker/NodePerformanceTracker.java +++ b/server/src/main/java/org/opensearch/ratelimiting/tracker/NodePerformanceTracker.java @@ -11,10 +11,9 @@ import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; - /** * This tracks the performance of node resources such as CPU, IO and memory */ @@ -29,7 +28,7 @@ public class NodePerformanceTracker extends AbstractLifecycleComponent { public NodePerformanceTracker(ThreadPool threadPool, Settings settings, ClusterSettings clusterSettings) { this.threadPool = threadPool; this.clusterSettings = clusterSettings; - this.performanceTrackerSettings = new PerformanceTrackerSettings(settings, clusterSettings); + this.performanceTrackerSettings = new PerformanceTrackerSettings(settings); initialize(); } @@ -68,7 +67,7 @@ void initialize() { ); clusterSettings.addSettingsUpdateConsumer( PerformanceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, - cpuUsageTracker::setWindowSize + this::setCpuWindowDuration ); memoryUsageTracker = new AverageMemoryUsageTracker( @@ -78,10 +77,27 @@ void initialize() { ); clusterSettings.addSettingsUpdateConsumer( PerformanceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, - memoryUsageTracker::setWindowSize + this::setMemoryWindowDuration ); } + private void setMemoryWindowDuration(TimeValue windowDuration) { + memoryUsageTracker.setWindowSize(windowDuration); + performanceTrackerSettings.setMemoryWindowDuration(windowDuration); + } + + private void setCpuWindowDuration(TimeValue windowDuration) { + cpuUsageTracker.setWindowSize(windowDuration); + performanceTrackerSettings.setCpuWindowDuration(windowDuration); + } + + /** + * Visible for testing + */ + public PerformanceTrackerSettings getPerformanceTrackerSettings() { + return performanceTrackerSettings; + } + @Override protected void doStart() { cpuUsageTracker.doStart(); @@ -95,7 +111,7 @@ protected void doStop() { } @Override - protected void doClose() throws IOException { + protected void doClose() { cpuUsageTracker.doClose(); memoryUsageTracker.doClose(); } diff --git a/server/src/main/java/org/opensearch/ratelimiting/tracker/PerformanceTrackerSettings.java b/server/src/main/java/org/opensearch/ratelimiting/tracker/PerformanceTrackerSettings.java index bb5babe77bd74..23e8652b7c233 100644 --- a/server/src/main/java/org/opensearch/ratelimiting/tracker/PerformanceTrackerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimiting/tracker/PerformanceTrackerSettings.java @@ -8,7 +8,6 @@ package org.opensearch.ratelimiting.tracker; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -58,14 +57,11 @@ private static class Defaults { private volatile TimeValue memoryWindowDuration; private volatile TimeValue memoryPollingInterval; - public PerformanceTrackerSettings(Settings settings, ClusterSettings clusterSettings) { + public PerformanceTrackerSettings(Settings settings) { this.cpuPollingInterval = GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings); this.cpuWindowDuration = GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.get(settings); this.memoryPollingInterval = GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings); this.memoryWindowDuration = GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.get(settings); - - clusterSettings.addSettingsUpdateConsumer(GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, this::setCpuWindowDuration); - clusterSettings.addSettingsUpdateConsumer(GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, this::setMemoryWindowDuration); } public TimeValue getCpuWindowDuration() { diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 8a784f2407128..aed2c3a72ef0f 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -59,8 +59,8 @@ import org.opensearch.monitor.os.OsStats; import org.opensearch.monitor.process.ProcessStats; import org.opensearch.node.AdaptiveSelectionStats; -import org.opensearch.node.GlobalPerformanceStats; -import org.opensearch.node.NodePerformanceStatistics; +import org.opensearch.node.NodePerformanceStats; +import org.opensearch.node.NodesPerformanceStats; import org.opensearch.node.ResponseCollectorService; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; @@ -395,14 +395,14 @@ public void testSerialization() throws IOException { assertEquals(aStats.responseTime, bStats.responseTime, 0.01); }); } - GlobalPerformanceStats globalPerformanceStats = nodeStats.getNodesPerformanceStats(); - GlobalPerformanceStats deserializedNodePerfStats = deserializedNodeStats.getNodesPerformanceStats(); - if (globalPerformanceStats == null) { + NodesPerformanceStats nodesPerformanceStats = nodeStats.getNodesPerformanceStats(); + NodesPerformanceStats deserializedNodePerfStats = deserializedNodeStats.getNodesPerformanceStats(); + if (nodesPerformanceStats == null) { assertNull(deserializedNodePerfStats); } else { - globalPerformanceStats.getNodeIdToNodePerfStatsMap().forEach((k, v) -> { - NodePerformanceStatistics aPerfStats = globalPerformanceStats.getNodeIdToNodePerfStatsMap().get(k); - NodePerformanceStatistics bPerfStats = globalPerformanceStats.getNodeIdToNodePerfStatsMap().get(k); + nodesPerformanceStats.getNodeIdToNodePerfStatsMap().forEach((k, v) -> { + NodePerformanceStats aPerfStats = nodesPerformanceStats.getNodeIdToNodePerfStatsMap().get(k); + NodePerformanceStats bPerfStats = nodesPerformanceStats.getNodeIdToNodePerfStatsMap().get(k); assertEquals(aPerfStats.getMemoryUtilizationPercent(), bPerfStats.getMemoryUtilizationPercent(), 0.0); assertEquals(aPerfStats.getCpuUtilizationPercent(), bPerfStats.getCpuUtilizationPercent(), 0.0); assertEquals(aPerfStats.getTimestamp(), bPerfStats.getTimestamp()); @@ -771,11 +771,11 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { } adaptiveSelectionStats = new AdaptiveSelectionStats(nodeConnections, nodeStats); } - GlobalPerformanceStats globalPerformanceStats = null; + NodesPerformanceStats nodesPerformanceStats = null; if (frequently()) { int numNodes = randomIntBetween(0, 10); Map nodeConnections = new HashMap<>(); - Map nodePerfStats = new HashMap<>(); + Map nodePerfStats = new HashMap<>(); for (int i = 0; i < numNodes; i++) { String nodeId = randomAlphaOfLengthBetween(3, 10); // add outgoing connection info @@ -784,16 +784,14 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { } // add node calculations if (frequently()) { - NodePerformanceStatistics stats = new NodePerformanceStatistics( + NodePerformanceStats stats = new NodePerformanceStats( nodeId, - randomDoubleBetween(1.0, 100.0, true), - randomDoubleBetween(1.0, 100.0, true), - System.currentTimeMillis() + System.currentTimeMillis(), randomDoubleBetween(1.0, 100.0, true), randomDoubleBetween(1.0, 100.0, true) ); nodePerfStats.put(nodeId, stats); } } - globalPerformanceStats = new GlobalPerformanceStats(nodePerfStats); + nodesPerformanceStats = new NodesPerformanceStats(nodePerfStats); } ClusterManagerThrottlingStats clusterManagerThrottlingStats = null; if (frequently()) { @@ -826,7 +824,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { discoveryStats, ingestStats, adaptiveSelectionStats, - globalPerformanceStats, + nodesPerformanceStats, scriptCacheStats, null, null, diff --git a/server/src/test/java/org/opensearch/node/PerfStatsCollectorServiceTests.java b/server/src/test/java/org/opensearch/node/PerformanceCollectorServiceTests.java similarity index 84% rename from server/src/test/java/org/opensearch/node/PerfStatsCollectorServiceTests.java rename to server/src/test/java/org/opensearch/node/PerformanceCollectorServiceTests.java index a819c69ee5d5b..0b3ab783286c9 100644 --- a/server/src/test/java/org/opensearch/node/PerfStatsCollectorServiceTests.java +++ b/server/src/test/java/org/opensearch/node/PerformanceCollectorServiceTests.java @@ -35,12 +35,13 @@ import static org.hamcrest.Matchers.greaterThan; /** - * Tests for PerfStatsCollectorService where we test collect, get and schedulers are working as expected + * Tests for PerformanceCollectorService where we test collect method, get method and whether schedulers + * are working as expected */ -public class PerfStatsCollectorServiceTests extends OpenSearchTestCase { +public class PerformanceCollectorServiceTests extends OpenSearchTestCase { private ClusterService clusterService; - private PerfStatsCollectorService collector; + private PerformanceCollectorService collector; private ThreadPool threadpool; NodePerformanceTracker tracker; @@ -60,7 +61,7 @@ public void setUp() throws Exception { settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - collector = new PerfStatsCollectorService(tracker, clusterService, threadpool); + collector = new PerformanceCollectorService(tracker, clusterService, threadpool); tracker.start(); collector.start(); } @@ -77,13 +78,13 @@ public void tearDown() throws Exception { } public void testNodePerformanceStats() { - collector.collectNodePerfStatistics("node1", 99, 97, System.currentTimeMillis()); - Map nodeStats = collector.getAllNodeStatistics(); + collector.collectNodePerfStatistics("node1", System.currentTimeMillis(), 97, 99); + Map nodeStats = collector.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); assertEquals(99.0, nodeStats.get("node1").cpuUtilizationPercent, 0.0); assertEquals(97.0, nodeStats.get("node1").memoryUtilizationPercent, 0.0); - Optional nodePerformanceStatistics = collector.getNodeStatistics("node1"); + Optional nodePerformanceStatistics = collector.getNodeStatistics("node1"); assertNotNull(nodePerformanceStatistics.get()); assertEquals(99.0, nodePerformanceStatistics.get().cpuUtilizationPercent, 0.0); @@ -134,9 +135,9 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { } collector.collectNodePerfStatistics( randomFrom(nodes), + System.currentTimeMillis(), randomIntBetween(1, 100), - randomIntBetween(1, 100), - System.currentTimeMillis() + randomIntBetween(1, 100) ); } }; @@ -156,7 +157,7 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { t3.join(); t4.join(); - final Map nodeStats = collector.getAllNodeStatistics(); + final Map nodeStats = collector.getAllNodeStatistics(); logger.info("--> got stats: {}", nodeStats); for (String nodeId : nodes) { if (nodeStats.containsKey(nodeId)) { @@ -167,8 +168,8 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { } public void testNodeRemoval() { - collector.collectNodePerfStatistics("node1", randomIntBetween(1, 100), randomIntBetween(1, 100), System.currentTimeMillis()); - collector.collectNodePerfStatistics("node2", randomIntBetween(1, 100), randomIntBetween(1, 100), System.currentTimeMillis()); + collector.collectNodePerfStatistics("node1", System.currentTimeMillis(), randomIntBetween(1, 100), randomIntBetween(1, 100)); + collector.collectNodePerfStatistics("node2", System.currentTimeMillis(), randomIntBetween(1, 100), randomIntBetween(1, 100)); ClusterState previousState = ClusterState.builder(new ClusterName("cluster")) .nodes( @@ -183,7 +184,7 @@ public void testNodeRemoval() { ClusterChangedEvent event = new ClusterChangedEvent("test", newState, previousState); collector.clusterChanged(event); - final Map nodeStats = collector.getAllNodeStatistics(); + final Map nodeStats = collector.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); assertFalse(nodeStats.containsKey("node2")); } diff --git a/server/src/test/java/org/opensearch/ratelimiting/tracker/NodePerformanceTrackerTests.java b/server/src/test/java/org/opensearch/ratelimiting/tracker/NodePerformanceTrackerTests.java index f19fc5fcc402f..40d59dd7f7207 100644 --- a/server/src/test/java/org/opensearch/ratelimiting/tracker/NodePerformanceTrackerTests.java +++ b/server/src/test/java/org/opensearch/ratelimiting/tracker/NodePerformanceTrackerTests.java @@ -8,10 +8,11 @@ package org.opensearch.ratelimiting.tracker; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.After; @@ -19,12 +20,13 @@ import java.util.concurrent.TimeUnit; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.greaterThan; /** * Tests to assert node performance trackers retrieving resource utilization averages */ -public class NodePerformanceTrackerTests extends OpenSearchTestCase { +public class NodePerformanceTrackerTests extends OpenSearchSingleNodeTestCase { ThreadPool threadPool; @Before @@ -35,6 +37,13 @@ public void setup() { @After public void cleanup() { ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("*")) + .setTransientSettings(Settings.builder().putNull("*")) + ); } public void testStats() throws Exception { @@ -55,4 +64,33 @@ public void testStats() throws Exception { tracker.stop(); tracker.close(); } + + public void testUpdateSettings() { + NodePerformanceTracker tracker = new NodePerformanceTracker( + threadPool, + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + assertEquals(tracker.getPerformanceTrackerSettings().getCpuWindowDuration().getSeconds(), 30); + assertEquals(tracker.getPerformanceTrackerSettings().getMemoryWindowDuration().getSeconds(), 30); + + Settings settings = Settings.builder() + .put(PerformanceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), "10s") + .build(); + ClusterUpdateSettingsResponse response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings).get(); + assertEquals( + "10s", + response.getPersistentSettings().get(PerformanceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey()) + ); + + Settings jvmsettings = Settings.builder() + .put(PerformanceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), "5s") + .build(); + response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(jvmsettings).get(); + assertEquals( + "5s", + response.getPersistentSettings().get(PerformanceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey()) + ); + } }