Adding final set of changes for Cache RCAs
khushbr committed Jul 7, 2020
1 parent 09537c2 commit 63ddab2
Showing 5 changed files with 153 additions and 78 deletions.
@@ -0,0 +1,34 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.Record;

public class CacheUtil {
  private static final Logger LOG = LogManager.getLogger(CacheUtil.class);
  private static final double CONVERT_BYTES_TO_MEGABYTES = Math.pow(1024, 2); // 1 MB = 1024^2 bytes

  public static Double getTotalSizeInMB(final Metric sizeMetric) {
    double sizeTotalInMB = 0;

    // we expect the Metric to have a single flow unit
    // TODO: Depending on Ruizhen's comment
    MetricFlowUnit flowUnit = sizeMetric.getFlowUnits().get(0);
    if (flowUnit.isEmpty() || flowUnit.getData() == null) {
      return sizeTotalInMB;
    }

    for (Record record : flowUnit.getData()) {
      double size = record.getValue(MetricsDB.MAX, Double.class);
      if (Double.isNaN(size)) {
        LOG.error("Failed to parse metric in FlowUnit from {}", sizeMetric.getClass().getName());
      } else {
        sizeTotalInMB += size / CONVERT_BYTES_TO_MEGABYTES;
      }
    }
    return sizeTotalInMB;
  }
}
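As a quick illustration of the conversion above: the helper sums each record's MAX value and divides by 1024^2 (bytes per megabyte). A minimal, self-contained sketch of that arithmetic (input values are hypothetical; only the conversion factor comes from CacheUtil):

public class ConversionSketch {
  // 1 MB = 1024 * 1024 = 1,048,576 bytes
  private static final double BYTES_PER_MB = Math.pow(1024, 2);

  public static void main(String[] args) {
    // hypothetical per-record cache sizes in bytes: 3 MB and 5 MB
    double[] sizesInBytes = {3_145_728, 5_242_880};
    double totalInMB = 0;
    for (double size : sizesInBytes) {
      totalInMB += size / BYTES_PER_MB;
    }
    System.out.println(totalInMB); // prints 8.0
  }
}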
@@ -16,6 +16,7 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.FIELD_DATA_CACHE_EVICTION;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache.CacheUtil.getTotalSizeInMB;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource;
@@ -37,13 +38,11 @@
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Field Data Cache RCA identifies whether the cache is unhealthy (thrashing) or healthy.
 * The dimensions used for this analysis are the cache eviction count, the cache current weight (size),
 * and the configured cache max weight (size).
 * Note: For the Field Data Cache, Hit and Miss metrics aren't available.
*
 * <p>Cache eviction within Elasticsearch happens in the following scenarios:
@@ -60,49 +59,44 @@
 * <p>In contrast, Cache Invalidation is performed manually on cache clear() and index close()
 * invocation, with removalReason as INVALIDATED, and a force eviction is performed to ensure cleanup.
 *
 * <p>This RCA reads 'fieldDataCacheEvictions', 'fieldDataCacheSize' and 'fieldDataCacheMaxSize'
 * from upstream metrics and maintains a collector which keeps track of the time window period (tp)
 * where we repeatedly see evictions for the last tp duration. This RCA is marked as unhealthy if
 * tp is above the threshold (300 seconds) and the cache size exceeds the configured max cache size.
 */
public class FieldDataCacheRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
  private static final Logger LOG = LogManager.getLogger(FieldDataCacheRca.class);
  private static final long EVICTION_THRESHOLD_TIME_PERIOD_IN_MILLISECOND = TimeUnit.SECONDS.toMillis(300);

  private final Metric fieldDataCacheEvictions;
  private final Metric fieldDataCacheSize;
  private final Metric fieldDataCacheMaxSize;
  private final int rcaPeriod;
  private int counter;
  private boolean exceedsSize;
  protected Clock clock;
  private final CacheEvictionCollector cacheEvictionCollector;

  public <M extends Metric> FieldDataCacheRca(final int rcaPeriod, final M fieldDataCacheEvictions,
      final M fieldDataCacheSize, final M fieldDataCacheMaxSize) {
    super(5);
    this.rcaPeriod = rcaPeriod;
    this.fieldDataCacheEvictions = fieldDataCacheEvictions;
    this.fieldDataCacheSize = fieldDataCacheSize;
    this.fieldDataCacheMaxSize = fieldDataCacheMaxSize;
    this.counter = 0;
    this.exceedsSize = Boolean.FALSE;
    this.clock = Clock.systemUTC();
    this.cacheEvictionCollector = new CacheEvictionCollector(FIELD_DATA_CACHE_EVICTION,
        fieldDataCacheEvictions, EVICTION_THRESHOLD_TIME_PERIOD_IN_MILLISECOND);
  }

  @VisibleForTesting
  public void setClock(Clock clock) {
    this.clock = clock;
  }

  @Override
  public ResourceFlowUnit<HotNodeSummary> operate() {
    counter += 1;
@@ -114,10 +108,10 @@ public ResourceFlowUnit<HotNodeSummary> operate() {
      HotNodeSummary nodeSummary;

      ClusterDetailsEventProcessor.NodeDetails currentNode = ClusterDetailsEventProcessor.getCurrentNodeDetails();
      double cacheSize = getTotalSizeInMB(fieldDataCacheSize);
      double cacheMaxSize = getTotalSizeInMB(fieldDataCacheMaxSize);
      exceedsSize = cacheMaxSize != 0 && cacheSize > cacheMaxSize;
      if (cacheEvictionCollector.isUnhealthy(currTimestamp) && exceedsSize) {
        context = new ResourceContext(Resources.State.UNHEALTHY);
        nodeSummary = new HotNodeSummary(currentNode.getId(), currentNode.getHostAddress());
        nodeSummary.appendNestedSummary(cacheEvictionCollector.generateSummary(currTimestamp));
@@ -128,23 +122,14 @@ public ResourceFlowUnit<HotNodeSummary> operate() {
      }

      counter = 0;
      exceedsSize = Boolean.FALSE;
      return new ResourceFlowUnit<>(currTimestamp, context, nodeSummary, !currentNode.getIsMasterNode());
    }
    else {
      return new ResourceFlowUnit<>(currTimestamp);
    }
  }

  @Override
  public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
    final List<FlowUnitMessage> flowUnitMessages =
@@ -178,12 +163,11 @@ private CacheEvictionCollector(final Resource cache, final Metric cacheEvictionM

    public void collect(final long currTimestamp) {
      for (MetricFlowUnit flowUnit : cacheEvictionMetrics.getFlowUnits()) {
        if (flowUnit.isEmpty() || flowUnit.getData() == null) {
          continue;
        }

        double evictionCount = flowUnit.getData().stream().mapToDouble(
            record -> record.getValue(MetricsDB.MAX, Double.class)).sum();
        if (!Double.isNaN(evictionCount)) {
          if (evictionCount > 0) {
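The collector's bookkeeping described in the javadoc can be condensed into a standalone sketch: it records when evictions were first seen, resets on an eviction-free sample, and reports unhealthy once evictions have persisted for the full 300-second window. Class and field names below are illustrative assumptions, since the collector's fields are not visible in this hunk:

class EvictionWindowSketch {
  private static final long THRESHOLD_MILLIS = 300_000; // 300-second window
  private boolean hasEvictions = false;
  private long firstEvictionTimestamp = 0;

  void collect(long currTimestamp, double evictionCount) {
    if (evictionCount > 0) {
      if (!hasEvictions) {
        firstEvictionTimestamp = currTimestamp; // window opens on the first eviction seen
      }
      hasEvictions = true;
    } else {
      hasEvictions = false; // an eviction-free sample closes the window
    }
  }

  boolean isUnhealthy(long currTimestamp) {
    // unhealthy only after evictions persist for the whole threshold period
    return hasEvictions && (currTimestamp - firstEvictionTimestamp) >= THRESHOLD_MILLIS;
  }
}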
@@ -17,6 +17,7 @@

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SHARD_REQUEST_CACHE_EVICTION;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SHARD_REQUEST_CACHE_HIT;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache.CacheUtil.getTotalSizeInMB;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource;
@@ -43,8 +44,9 @@
import org.jooq.Result;

/**
 * Shard Request Cache RCA identifies whether the cache is unhealthy (thrashing) or healthy.
 * The dimensions used for this analysis are the cache eviction count, the hit count, the cache current
 * weight (size), and the configured cache max weight (size).
*
 * <p>Cache eviction within Elasticsearch happens in the following scenarios:
* <ol>
@@ -54,40 +56,49 @@
*
 * <p>The Cache Eviction requires that either the cache weight exceeds the max weight OR the entry TTL is expired.
 * For Shard Request Cache, TTL is defined via the `indices.requests.cache.expire` setting, which is never used
 * in production clusters and only provided for backward compatibility, thus we ignore time-based evictions.
 * The weight-based evictions (removal from the Cache Map and LRU linked list, with the entry updated to
 * EVICTED) occur when the cache_weight exceeds the max_cache_weight.
*
 * <p>The Entry Invalidation is performed manually on cache clear(), index close() and for cached results from
 * timed-out requests. A scheduled runnable, running every 10 minutes, cleans up all the invalidated entries
 * which have not been read/written to since invalidation.
*
 * <p>The presence of Cache Hit and Eviction metrics implies the cache is undergoing frequent load and
 * eviction, or undergoing scheduled cleanup for entries which had timed out during execution.
*
 * <p>This RCA reads 'shardRequestCacheEvictions', 'shardRequestCacheHits', 'shardRequestCacheSize' and
 * 'shardRequestCacheMaxSize' from upstream metrics and maintains collectors which keep track of the time
 * window period (tp) where we repeatedly see evictions and hits for the last tp duration. This RCA is
 * marked as unhealthy if we find tp is above the threshold (300 seconds) and the cache size exceeds the
 * configured max cache size.
*/
public class ShardRequestCacheRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
  private static final Logger LOG = LogManager.getLogger(ShardRequestCacheRca.class);
  private static final long THRESHOLD_TIME_PERIOD_IN_MILLISECOND = TimeUnit.SECONDS.toMillis(300);

  private final Metric shardRequestCacheEvictions;
  private final Metric shardRequestCacheHits;
  private final Metric shardRequestCacheSize;
  private final Metric shardRequestCacheMaxSize;
  private final int rcaPeriod;
  private int counter;
  private boolean exceedsSize;
  protected Clock clock;
  private final CacheCollector cacheEvictionCollector;
  private final CacheCollector cacheHitCollector;

  public <M extends Metric> ShardRequestCacheRca(final int rcaPeriod, final M shardRequestCacheEvictions,
      final M shardRequestCacheHits, final M shardRequestCacheSize,
      final M shardRequestCacheMaxSize) {
    super(5);
    this.rcaPeriod = rcaPeriod;
    this.shardRequestCacheEvictions = shardRequestCacheEvictions;
    this.shardRequestCacheHits = shardRequestCacheHits;
    this.shardRequestCacheSize = shardRequestCacheSize;
    this.shardRequestCacheMaxSize = shardRequestCacheMaxSize;
    this.counter = 0;
    this.exceedsSize = Boolean.FALSE;
    this.clock = Clock.systemUTC();
    this.cacheEvictionCollector = new CacheCollector(SHARD_REQUEST_CACHE_EVICTION,
        shardRequestCacheEvictions, THRESHOLD_TIME_PERIOD_IN_MILLISECOND);
@@ -107,15 +118,20 @@ public ResourceFlowUnit operate() {

    cacheEvictionCollector.collect(currTimestamp);
    cacheHitCollector.collect(currTimestamp);
    if (counter >= rcaPeriod) {
      ResourceContext context;
      HotNodeSummary nodeSummary;

      ClusterDetailsEventProcessor.NodeDetails currentNode = ClusterDetailsEventProcessor.getCurrentNodeDetails();
      double cacheSize = getTotalSizeInMB(shardRequestCacheSize);
      double cacheMaxSize = getTotalSizeInMB(shardRequestCacheMaxSize);
      exceedsSize = cacheMaxSize != 0 && cacheSize > cacheMaxSize;

      // if eviction and hit counts persist for the last 5 minutes and the cache size exceeds the
      // configured max cache size, the cache is considered unhealthy
      if (cacheEvictionCollector.isMetricPresentForThresholdTime(currTimestamp)
          && cacheHitCollector.isMetricPresentForThresholdTime(currTimestamp)
          && exceedsSize) {
        context = new ResourceContext(Resources.State.UNHEALTHY);
        nodeSummary = new HotNodeSummary(currentNode.getId(), currentNode.getHostAddress());
        nodeSummary.appendNestedSummary(cacheEvictionCollector.generateSummary(currTimestamp));
@@ -124,6 +140,8 @@ public ResourceFlowUnit operate() {
        nodeSummary = null;
      }

      counter = 0;
      exceedsSize = Boolean.FALSE;
      return new ResourceFlowUnit<>(currTimestamp, context, nodeSummary, !currentNode.getIsMasterNode());
    }
    else {
@@ -191,7 +209,7 @@ public boolean isMetricPresentForThresholdTime(final long currTimestamp) {
      return hasMetric && (currTimestamp - metricTimestamp) >= metricTimePeriodThreshold;
    }

    private HotResourceSummary generateSummary(final long currTimestamp) {
      HotResourceSummary resourceSummary = null;
      if (isMetricPresentForThresholdTime(currTimestamp)) {
        resourceSummary = new HotResourceSummary(cache,
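Both RCAs gate the unhealthy verdict on the same size guard: a configured max of zero (metric unavailable, or cache effectively disabled) must never flag the node. A small sketch with example values (standalone Java; the class and method names are illustrative, only the condition comes from the code above):

public class SizeGuardSketch {
  static boolean exceedsSize(double cacheSizeMB, double cacheMaxSizeMB) {
    return cacheMaxSizeMB != 0 && cacheSizeMB > cacheMaxSizeMB;
  }

  public static void main(String[] args) {
    System.out.println(exceedsSize(7.0, 5.0)); // true: over the configured max
    System.out.println(exceedsSize(1.0, 5.0)); // false: within budget
    System.out.println(exceedsSize(7.0, 0.0)); // false: no max configured, never unhealthy
  }
}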
@@ -67,7 +67,7 @@ public void init() throws Exception {
* | .kibana_1 | 0 | 15.0 | 8.0 | 2.0 | 9.0 |
*
*/
  private void mockFlowUnits(int cacheEvictionCnt, double cacheWeight, double cacheMaxWeight) {
    fieldDataCacheEvictions.createTestFlowUnits(columnName,
        Arrays.asList("index_1", "0", String.valueOf(cacheEvictionCnt)));
    fieldDataCacheWeight.createTestFlowUnits(columnName,
@@ -81,32 +81,35 @@ public void testFieldDataCache() {
    ResourceFlowUnit<HotNodeSummary> flowUnit;
    Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());

    // TimeWindow 1 of size 300sec
    mockFlowUnits(0, 1.0, 5.0);
    fieldDataCacheRca.setClock(constantClock);
    flowUnit = fieldDataCacheRca.operate();
    Assert.assertFalse(flowUnit.getResourceContext().isUnhealthy());

    mockFlowUnits(0, 1.0, 5.0);
    fieldDataCacheRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(3)));
    flowUnit = fieldDataCacheRca.operate();
    Assert.assertFalse(flowUnit.getResourceContext().isUnhealthy());

    mockFlowUnits(1, 1.0, 5.0);
    fieldDataCacheRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(4)));
    flowUnit = fieldDataCacheRca.operate();
    Assert.assertFalse(flowUnit.getResourceContext().isUnhealthy());

    // TimeWindow 2 of size 300sec
    mockFlowUnits(1, 7.0, 5.0);
    fieldDataCacheRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(7)));
    flowUnit = fieldDataCacheRca.operate();
    Assert.assertFalse(flowUnit.getResourceContext().isUnhealthy());

    mockFlowUnits(1, 1.0, 5.0);
    fieldDataCacheRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(10)));
    flowUnit = fieldDataCacheRca.operate();
    Assert.assertFalse(flowUnit.getResourceContext().isUnhealthy());

    // TimeWindow 3 of size 300sec
    mockFlowUnits(1, 7.0, 5.0);
    fieldDataCacheRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(12)));
    flowUnit = fieldDataCacheRca.operate();
    Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy());
@@ -119,12 +122,13 @@ public void testFieldDataCache() {
    Assert.assertEquals(ResourceUtil.FIELD_DATA_CACHE_EVICTION, resourceSummary.getResource());
    Assert.assertEquals(6.0, resourceSummary.getValue(), 0.01);

    mockFlowUnits(0, 1.0, 5.0);
    fieldDataCacheRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(14)));
    flowUnit = fieldDataCacheRca.operate();
    Assert.assertFalse(flowUnit.getResourceContext().isUnhealthy());

    // TimeWindow 4 of size 300sec
    mockFlowUnits(0, 7.0, 5.0);
    fieldDataCacheRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(17)));
    flowUnit = fieldDataCacheRca.operate();
    Assert.assertFalse(flowUnit.getResourceContext().isUnhealthy());
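The test drives the RCA across its 300-second windows by offsetting a fixed epoch clock rather than sleeping. A condensed sketch of just the clock mechanics used above (standalone Java; only the fixed/offset pattern comes from the test):

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;

public class ClockOffsetSketch {
  public static void main(String[] args) {
    Clock base = Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault());
    Clock atMinute12 = Clock.offset(base, Duration.ofMinutes(12));
    // Evictions first mocked at minute 4 have persisted for 8 minutes (480s)
    // by minute 12, exceeding the 300s threshold, so the RCA turns unhealthy.
    System.out.println(atMinute12.millis()); // prints 720000
  }
}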