Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Adding the FieldDataCacheRca and ShardRequestCacheRca
Browse files Browse the repository at this point in the history
  • Loading branch information
khushbr committed Jul 6, 2020
1 parent b6f1a51 commit 4238148
Show file tree
Hide file tree
Showing 4 changed files with 417 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ public class ResourceUtil {
.setResourceEnum(ResourceEnum.SEARCH_THREADPOOL)
.setMetricEnum(MetricEnum.QUEUE_REJECTION).build();

// cache
// Each constant below is a prebuilt Resource message that pairs a cache ResourceEnum with the
// MetricEnum being reported for it.
/** Field data cache resource paired with the cache eviction metric. */
public static final Resource FIELD_DATA_CACHE_EVICTION = Resource.newBuilder()
.setResourceEnum(ResourceEnum.FIELD_DATA_CACHE)
.setMetricEnum(MetricEnum.CACHE_EVICTION).build();
/**
 * Field data cache resource paired with the cache hit metric.
 * NOTE(review): FieldDataCacheRca's Javadoc states that hit/miss metrics are not available for
 * the field data cache - confirm where this descriptor is expected to be used.
 */
public static final Resource FIELD_DATA_CACHE_HIT = Resource.newBuilder()
.setResourceEnum(ResourceEnum.FIELD_DATA_CACHE)
.setMetricEnum(MetricEnum.CACHE_HIT).build();
/** Shard request cache resource paired with the cache eviction metric. */
public static final Resource SHARD_REQUEST_CACHE_EVICTION = Resource.newBuilder()
.setResourceEnum(ResourceEnum.SHARD_REQUEST_CACHE)
.setMetricEnum(MetricEnum.CACHE_EVICTION).build();
/** Shard request cache resource paired with the cache hit metric. */
public static final Resource SHARD_REQUEST_CACHE_HIT = Resource.newBuilder()
.setResourceEnum(ResourceEnum.SHARD_REQUEST_CACHE)
.setMetricEnum(MetricEnum.CACHE_HIT).build();

/**
* Read the resourceType name from the ResourceType object
* @param resource grpc Resource object
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.FIELD_DATA_CACHE_EVICTION;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.Record;

import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Field Data Cache RCA identifies when the cache is unhealthy (thrashing) and when it is healthy.
 * The dimension used for this analysis is the cache eviction count.
 * Note : For Field Data Cache, Hit and Miss metrics aren't available.
 *
 * <p>Cache eviction within Elasticsearch happens in the following scenarios :
 * 1. Mutation to Cache (Entry Insertion/Promotion and Manual Invalidation)
 * 2. Explicit call to refresh()
 *
 * <p>The Cache Eviction requires that either the cache weight exceeds OR the entry TTL is expired.
 * For Field Data Cache, no expire setting is present, so only in case of cache_weight exceeding the
 * max_cache_weight, eviction (removal from Cache Map and LRU linked List, entry updated to EVICTED)
 * happens.
 *
 * <p>The Entry Invalidation is performed manually on cache clear() and index close() invocation,
 * with removalReason as INVALIDATED and a force eviction is performed to ensure cleanup.
 *
 * <p>This RCA reads 'fieldDataCacheEvictions' from upstream metrics and maintains a collector which
 * keeps track of the time window period (tp) during which evictions have been seen repeatedly.
 * This RCA is marked as unhealthy if tp is at or above the threshold (300 seconds).
 */
public class FieldDataCacheRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
  private static final Logger LOG = LogManager.getLogger(FieldDataCacheRca.class);

  /** Length, in seconds, of an uninterrupted eviction streak that marks the cache unhealthy. */
  private static final int EVICTION_TIME_PERIOD_IN_SECONDS = 300;

  private final Metric fieldDataCacheEvictions;
  /** Number of {@link #operate()} invocations between two evaluated (non-empty) flow units. */
  private final int rcaPeriod;
  /** Invocations of {@link #operate()} since the last evaluation. */
  private int counter;
  protected Clock clock;
  private final CacheEvictionCollector cacheEvictionCollector;

  /**
   * Creates the RCA.
   *
   * @param rcaPeriod number of {@link #operate()} calls after which a health verdict is emitted
   * @param fieldDataCacheEvictions upstream metric supplying field data cache eviction counts
   * @param <M> concrete metric type
   */
  public <M extends Metric> FieldDataCacheRca(final int rcaPeriod, final M fieldDataCacheEvictions) {
    // NOTE(review): 5 is presumably the evaluation interval (in seconds) expected by Rca - confirm.
    super(5);
    this.rcaPeriod = rcaPeriod;
    this.fieldDataCacheEvictions = fieldDataCacheEvictions;
    this.counter = 0;
    this.clock = Clock.systemUTC();
    this.cacheEvictionCollector = new CacheEvictionCollector(FIELD_DATA_CACHE_EVICTION,
        fieldDataCacheEvictions, EVICTION_TIME_PERIOD_IN_SECONDS);
  }

  /**
   * Replaces the clock used to timestamp samples and flow units.
   *
   * @param clock clock to read the current time from
   */
  @VisibleForTesting
  public void setClock(Clock clock) {
    this.clock = clock;
  }

  /**
   * Feeds the latest eviction metrics to the collector and, on every {@code rcaPeriod}-th
   * invocation, emits a flow unit carrying the health verdict.
   *
   * @return an UNHEALTHY flow unit with a node summary when evictions have persisted for at least
   *     the threshold period, a HEALTHY flow unit without a summary otherwise, or a flow unit
   *     carrying only the timestamp on invocations that fall between evaluations
   */
  @Override
  public ResourceFlowUnit<HotNodeSummary> operate() {
    counter += 1;
    long currTimestamp = clock.millis();

    cacheEvictionCollector.collect(currTimestamp);
    if (counter != rcaPeriod) {
      return new ResourceFlowUnit<>(currTimestamp);
    }

    counter = 0;
    ResourceContext context;
    HotNodeSummary nodeSummary;
    ClusterDetailsEventProcessor.NodeDetails currentNode = ClusterDetailsEventProcessor
        .getCurrentNodeDetails();
    if (cacheEvictionCollector.isUnhealthy(currTimestamp)) {
      context = new ResourceContext(Resources.State.UNHEALTHY);
      nodeSummary = new HotNodeSummary(currentNode.getId(), currentNode.getHostAddress());
      nodeSummary.appendNestedSummary(cacheEvictionCollector.generateSummary(currTimestamp));
    } else {
      context = new ResourceContext(Resources.State.HEALTHY);
      nodeSummary = null;
    }
    // NOTE(review): the last argument is false on master nodes; presumably it controls whether the
    // flow unit is persisted - confirm against ResourceFlowUnit.
    return new ResourceFlowUnit<>(currTimestamp, context, nodeSummary, !currentNode.getIsMasterNode());
  }

  /**
   * Rebuilds this RCA's flow units from the messages read off the wire for the node in
   * {@code args} and installs them via {@code setFlowUnits}.
   *
   * @param args wrapper providing the wire hopper and the node to read for
   */
  @Override
  public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
    final List<FlowUnitMessage> flowUnitMessages =
        args.getWireHopper().readFromWire(args.getNode());
    List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
    LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName());
    for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
      flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
    }
    setFlowUnits(flowUnitList);
  }

  /**
   * A collector class to collect eviction metrics. It remembers when the current uninterrupted
   * streak of evictions started so the streak's duration can be compared with the threshold.
   */
  private static class CacheEvictionCollector {
    private final Resource cache;
    private final Metric cacheEvictionMetrics;
    /** Threshold in milliseconds, the same unit as the timestamps handed to this collector. */
    private final long evictionTimePeriodThresholdInMillis;
    /** True while the most recently parsed sample reported evictions. */
    private boolean hasEvictions;
    /** Timestamp (epoch millis) at which the current eviction streak started. */
    private long evictionTimestamp;

    /**
     * @param cache resource descriptor reported in generated summaries and log messages
     * @param cacheMetrics metric whose flow units carry the eviction counts
     * @param thresholdInSeconds streak duration, in seconds, at which the cache is unhealthy
     */
    private CacheEvictionCollector(final Resource cache, final Metric cacheMetrics,
        final long thresholdInSeconds) {
      this.cache = cache;
      this.cacheEvictionMetrics = cacheMetrics;
      this.hasEvictions = false;
      this.evictionTimestamp = 0;
      // Timestamps come from Clock#millis(), so the threshold must be in milliseconds as well.
      // Storing the raw seconds value made the RCA trip after 300 ms instead of 300 s.
      this.evictionTimePeriodThresholdInMillis = TimeUnit.SECONDS.toMillis(thresholdInSeconds);
    }

    /**
     * Reads every record of every non-empty flow unit and updates the eviction streak.
     *
     * @param currTimestamp current time in epoch milliseconds
     */
    public void collect(final long currTimestamp) {
      for (MetricFlowUnit flowUnit : cacheEvictionMetrics.getFlowUnits()) {
        if (flowUnit.isEmpty()) {
          continue;
        }

        for (Record record : flowUnit.getData()) {
          final Double evictionCount = record.getValue(MetricsDB.SUM, Double.class);
          // A missing value is treated like an unparsable one instead of failing on unboxing.
          if (evictionCount == null || evictionCount.isNaN()) {
            LOG.error("Failed to parse metric from cache {}", cache.toString());
            continue;
          }

          if (evictionCount > 0) {
            if (!hasEvictions) {
              // First eviction after a quiet sample: a new streak starts now.
              evictionTimestamp = currTimestamp;
            }
            hasEvictions = true;
          } else {
            // A sample without evictions ends the streak.
            hasEvictions = false;
          }
        }
      }
    }

    /**
     * @param currTimestamp current time in epoch milliseconds
     * @return true when evictions are ongoing and the streak has lasted at least the threshold
     */
    public boolean isUnhealthy(final long currTimestamp) {
      return hasEvictions
          && (currTimestamp - evictionTimestamp) >= evictionTimePeriodThresholdInMillis;
    }

    /**
     * Builds the summary for an unhealthy cache, passing the threshold and the observed streak
     * duration converted to seconds.
     *
     * @param currTimestamp current time in epoch milliseconds
     * @return the summary, or null when the cache is not unhealthy at {@code currTimestamp}
     */
    private HotResourceSummary generateSummary(final long currTimestamp) {
      if (!isUnhealthy(currTimestamp)) {
        return null;
      }
      // NOTE(review): arguments are presumably (resource, threshold, value, timePeriod) - confirm
      // against HotResourceSummary.
      return new HotResourceSummary(cache,
          TimeUnit.MILLISECONDS.toSeconds(evictionTimePeriodThresholdInMillis),
          TimeUnit.MILLISECONDS.toSeconds(currTimestamp - evictionTimestamp),
          0);
    }
  }
}
Loading

0 comments on commit 4238148

Please sign in to comment.