Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Sep 15, 2023
1 parent 20f5d1a commit bf75f28
Show file tree
Hide file tree
Showing 12 changed files with 455 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

/**
* MovingAverage is used to calculate the moving average of last 'n' observations of double type.
*
* @opensearch.internal
*/
public class DoubleMovingAverage {
private final int windowSize;
private final double[] observations;

private volatile long count = 0;
private volatile double sum = 0.0;
private volatile double average = 0.0;

public DoubleMovingAverage(int windowSize) {
checkWindowSize(windowSize);
this.windowSize = windowSize;
this.observations = new double[windowSize];
}

/**
* Used for changing the window size of {@code MovingAverage}.
*
* @param newWindowSize new window size.
* @return copy of original object with updated size.
*/
public DoubleMovingAverage copyWithSize(int newWindowSize) {
DoubleMovingAverage copy = new DoubleMovingAverage(newWindowSize);
// Start is inclusive, but end is exclusive
long start, end = count;
if (isReady() == false) {
start = 0;
} else {
start = end - windowSize;
}
// If the newWindow Size is smaller than the elements eligible to be copied over, then we adjust the start value
if (end - start > newWindowSize) {
start = end - newWindowSize;
}
for (int i = (int) start; i < end; i++) {
copy.record(observations[i % observations.length]);
}
return copy;
}

private void checkWindowSize(int size) {
if (size <= 0) {
throw new IllegalArgumentException("window size must be greater than zero");
}
}

/**
* Records a new observation and evicts the n-th last observation.
*/
public synchronized double record(double value) {
double delta = value - observations[(int) (count % observations.length)];
observations[(int) (count % observations.length)] = value;

count++;
sum += delta;
average = sum / (double) Math.min(count, observations.length);
return average;
}

public double getAverage() {
return average;
}

public long getCount() {
return count;
}

public boolean isReady() {
return count >= windowSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class MovingAverage {
private final long[] observations;

private volatile long count = 0;
private volatile long sum = 0;
private volatile double sum = 0;
private volatile double average = 0;

public MovingAverage(int windowSize) {
Expand Down Expand Up @@ -67,7 +67,7 @@ public synchronized double record(long value) {

count++;
sum += delta;
average = (double) sum / Math.min(count, observations.length);
average = sum / Math.min(count, observations.length);
return average;
}

Expand Down
15 changes: 15 additions & 0 deletions server/src/main/java/org/opensearch/monitor/fs/FsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.monitor.fs;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -41,6 +43,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.throttling.tracker.AverageCpuUsageTracker;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -223,6 +226,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @opensearch.internal]
*/
public static class DeviceStats implements Writeable, ToXContentFragment {
private static final Logger logger = LogManager.getLogger(DeviceStats.class);

final int majorDeviceNumber;
final int minorDeviceNumber;
Expand Down Expand Up @@ -389,11 +393,14 @@ public long operations() {
public long readOperations() {
if (previousReadsCompleted == -1) return -1;

//logger.info("Current reads : {} , Previous reads : {}", currentReadsCompleted, previousReadsCompleted);

return (currentReadsCompleted - previousReadsCompleted);
}

public long writeOperations() {
if (previousWritesCompleted == -1) return -1;
//logger.info("Current writes : {} , Previous writes : {}", currentWritesCompleted, previousWritesCompleted);

return (currentWritesCompleted - previousWritesCompleted);
}
Expand All @@ -412,6 +419,14 @@ public long readKilobytes() {
return (currentSectorsRead - previousSectorsRead) / 2;
}

public long getCurrentReadKilobytes() {
return currentSectorsRead / 2;
}

public long getCurrentWriteKilobytes() {
return currentSectorsWritten / 2;
}

public long writeKilobytes() {
if (previousSectorsWritten == -1) return -1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.throttling.tracker.AverageDiskStats;

import java.io.IOException;
import java.util.Locale;
Expand All @@ -25,17 +26,22 @@ public class NodePerformanceStatistics implements Writeable {
double cpuUtilizationPercent;
double memoryUtilizationPercent;

public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) {
AverageDiskStats averageDiskStats;

public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent,
AverageDiskStats averageDiskStats, long timestamp) {
this.nodeId = nodeId;
this.cpuUtilizationPercent = cpuUtilizationPercent;
this.memoryUtilizationPercent = memoryUtilizationPercent;
this.averageDiskStats = averageDiskStats;
this.timestamp = timestamp;
}

public NodePerformanceStatistics(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.cpuUtilizationPercent = in.readDouble();
this.memoryUtilizationPercent = in.readDouble();
this.averageDiskStats = new AverageDiskStats(in);
this.timestamp = in.readLong();
}

Expand All @@ -44,6 +50,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.nodeId);
out.writeDouble(this.cpuUtilizationPercent);
out.writeDouble(this.memoryUtilizationPercent);
this.averageDiskStats.writeTo(out);
out.writeLong(this.timestamp);
}

Expand All @@ -63,6 +70,7 @@ public String toString() {
nodePerformanceStatistics.nodeId,
nodePerformanceStatistics.cpuUtilizationPercent,
nodePerformanceStatistics.memoryUtilizationPercent,
nodePerformanceStatistics.averageDiskStats,
nodePerformanceStatistics.timestamp
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
"elapsed_time",
new TimeValue(System.currentTimeMillis() - perfStats.timestamp, TimeUnit.MILLISECONDS).toString()
);
perfStats.averageDiskStats.toXContent(builder, params);
}
builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.throttling.tracker.AverageDiskStats;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -43,13 +44,17 @@ void removeNode(String nodeId) {
nodeIdToPerfStats.remove(nodeId);
}

public void addNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) {
public void addNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent,
AverageDiskStats averageDiskStats,
long timestamp) {
nodeIdToPerfStats.compute(nodeId, (id, nodePerfStats) -> {
if (nodePerfStats == null) {
return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent, timestamp);
return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent,
averageDiskStats, timestamp);
} else {
nodePerfStats.cpuUtilizationPercent = cpuUtilizationPercent;
nodePerfStats.memoryUtilizationPercent = memoryUtilizationPercent;
nodePerfStats.averageDiskStats = averageDiskStats;
nodePerfStats.timestamp = timestamp;
return nodePerfStats;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling.tracker;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

public class AverageDiskStats implements Writeable {
private final double readIopsAverage;
private final double writeIopsAverage;
private final double readKbAverage;
private final double writeKbAverage;
private final double readLatencyAverage;
private final double writeLatencyAverage;
private final double ioUtilizationPercent;

public AverageDiskStats(double readIopsAverage, double writeIopsAverage, double readKbAverage, double writeKbAverage,
double readLatencyAverage, double writeLatencyAverage, double ioUtilizationPercent) {
this.readIopsAverage = readIopsAverage;
this.writeIopsAverage = writeIopsAverage;
this.readKbAverage = readKbAverage;
this.writeKbAverage = writeKbAverage;
this.readLatencyAverage = readLatencyAverage;
this.writeLatencyAverage = writeLatencyAverage;
this.ioUtilizationPercent = ioUtilizationPercent;
}

public AverageDiskStats(StreamInput in) throws IOException {
this.readIopsAverage = in.readDouble();
this.readKbAverage = in.readDouble();
this.readLatencyAverage = in.readDouble();
this.writeIopsAverage = in.readDouble();
this.writeKbAverage = in.readDouble();
this.writeLatencyAverage = in.readDouble();
this.ioUtilizationPercent = in.readDouble();
}

public double getIoUtilizationPercent() {
return ioUtilizationPercent;
}

public double getReadIopsAverage() {
return readIopsAverage;
}

public double getReadKbAverage() {
return readKbAverage;
}

public double getReadLatencyAverage() {
return readLatencyAverage;
}

public double getWriteIopsAverage() {
return writeIopsAverage;
}

public double getWriteKbAverage() {
return writeKbAverage;
}

public double getWriteLatencyAverage() {
return writeLatencyAverage;
}

@Override
public void writeTo(StreamOutput out) throws IOException {

}

public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("io_stats");
builder.field("read_iops_average", String.format(Locale.ROOT, "%.1f", readIopsAverage ));
builder.field("write_iops_average", String.format(Locale.ROOT, "%.1f", writeIopsAverage));
builder.field("read_throughput_average", String.format(Locale.ROOT, "%.1f", readKbAverage));
builder.field("write_throughput_average", String.format(Locale.ROOT, "%.1f", writeKbAverage));
builder.field("read_latency_average", String.format(Locale.ROOT, "%.8f", readLatencyAverage));
builder.field("write_latency_average", String.format(Locale.ROOT, "%.8f", writeLatencyAverage));
builder.field("io_utilization_percent", String.format(Locale.ROOT, "%.3f", ioUtilizationPercent));
builder.endObject();
return builder;
}
}
Loading

0 comments on commit bf75f28

Please sign in to comment.