This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

FieldData and Shard Request Cache RCA #265

Merged: 3 commits, Jul 23, 2020
Changes from 1 commit
@@ -59,6 +59,17 @@ public class ResourceUtil {
.setResourceEnum(ResourceEnum.SEARCH_THREADPOOL)
.setMetricEnum(MetricEnum.QUEUE_REJECTION).build();

// cache
public static final Resource FIELD_DATA_CACHE_EVICTION = Resource.newBuilder()
.setResourceEnum(ResourceEnum.FIELD_DATA_CACHE)
.setMetricEnum(MetricEnum.CACHE_EVICTION).build();
public static final Resource SHARD_REQUEST_CACHE_EVICTION = Resource.newBuilder()
.setResourceEnum(ResourceEnum.SHARD_REQUEST_CACHE)
.setMetricEnum(MetricEnum.CACHE_EVICTION).build();
public static final Resource SHARD_REQUEST_CACHE_HIT = Resource.newBuilder()
.setResourceEnum(ResourceEnum.SHARD_REQUEST_CACHE)
.setMetricEnum(MetricEnum.CACHE_HIT).build();

/**
* Read the resourceType name from the ResourceType object
* @param resource grpc Resource object
@@ -0,0 +1,48 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.Record;

public class CacheUtil {
private static final Logger LOG = LogManager.getLogger(CacheUtil.class);
private static final double CONVERT_BYTES_TO_MEGABYTES = Math.pow(1024, 2); // 1 MB = 1024^2 bytes

public static Double getTotalSizeInMB(final Metric sizeMetric) {
double sizeTotalInMB = 0;

// we expect the Metric to have a single flow unit since it is consumed locally
MetricFlowUnit flowUnit = sizeMetric.getFlowUnits().get(0);
if (flowUnit.isEmpty() || flowUnit.getData() == null) {
return sizeTotalInMB;
}

for (Record record : flowUnit.getData()) {
double size = record.getValue(MetricsDB.MAX, Double.class);
if (Double.isNaN(size)) {
LOG.error("Failed to parse metric in FlowUnit from {}", sizeMetric.getClass().getName());
} else {
sizeTotalInMB += size / CONVERT_BYTES_TO_MEGABYTES;
Contributor:
Floating point comparisons have rounding errors which can affect the places where this value is used. Should we just convert it to KB and use long values so that we tolerate a rounding error of 1 KB?

Contributor (author):
Thank you, Vigya, for the suggestion. I agree we can go with KB and tolerate the 1 KB rounding error; I have updated the code accordingly.

}
}
return sizeTotalInMB;
}
}
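
Following the review thread above, here is a minimal sketch of what the KB-based variant could look like (the method and constant names are hypothetical, not necessarily what the updated commit uses):

    // Hypothetical KB-based variant, per the review thread above: summing in
    // whole kilobytes as a long tolerates a rounding error of at most 1 KB.
    private static final long KILOBYTE_IN_BYTES = 1024;

    public static long getTotalSizeInKB(final Metric sizeMetric) {
        long sizeTotalInKB = 0;
        // same single-flow-unit assumption as getTotalSizeInMB above
        MetricFlowUnit flowUnit = sizeMetric.getFlowUnits().get(0);
        if (flowUnit.isEmpty() || flowUnit.getData() == null) {
            return sizeTotalInKB;
        }
        for (Record record : flowUnit.getData()) {
            double sizeInBytes = record.getValue(MetricsDB.MAX, Double.class);
            if (!Double.isNaN(sizeInBytes)) {
                sizeTotalInKB += (long) (sizeInBytes / KILOBYTE_IN_BYTES);
            }
        }
        return sizeTotalInKB;
    }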
@@ -0,0 +1,204 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.FIELD_DATA_CACHE_EVICTION;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache.CacheUtil.getTotalSizeInMB;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * The Field Data Cache RCA identifies when the cache is unhealthy (thrashing); otherwise it is
 * considered healthy. The dimensions used for this analysis are the cache eviction count, the
 * current cache weight (size), and the configured maximum cache weight (size).
 * Note: For the Field Data Cache, hit and miss metrics aren't available.
 *
 * <p>Cache eviction within Elasticsearch happens in the following scenarios:
 * <ol>
 * <li>Mutation to the cache (entry insertion/promotion and manual invalidation)
 * <li>Explicit call to refresh()
 * </ol>
 *
 * <p>Cache eviction requires that either the cache weight exceeds the maximum weight OR the
 * entry TTL has expired. For the Field Data Cache, no expire setting is present, so eviction
 * (removal from the cache map and LRU linked list, with the entry updated to EVICTED) happens
 * only when cache_weight exceeds max_cache_weight.
 *
 * <p>In contrast, cache invalidation is performed manually on cache clear() and index close()
 * invocation, with removalReason set to INVALIDATED, and a forced eviction is performed to
 * ensure cleanup.
 *
 * <p>This RCA reads 'fieldDataCacheEvictions', 'fieldDataCacheSize' and 'fieldDataCacheMaxSize'
 * from upstream metrics and maintains a collector that tracks the time window period (tp) over
 * which we have repeatedly seen evictions. This RCA is marked unhealthy if tp is above the
 * threshold (300 seconds) and the cache size exceeds the configured maximum cache size.
 */
public class FieldDataCacheRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
private static final Logger LOG = LogManager.getLogger(FieldDataCacheRca.class);
private static final long EVICTION_THRESHOLD_TIME_PERIOD_IN_MILLISECOND = TimeUnit.SECONDS.toMillis(300);

private final Metric fieldDataCacheEvictions;
private final Metric fieldDataCacheSize;
private final Metric fieldDataCacheMaxSize;
private final int rcaPeriod;
private int counter;
private boolean exceedsSize;
protected Clock clock;
private final CacheEvictionCollector cacheEvictionCollector;

public <M extends Metric> FieldDataCacheRca(final int rcaPeriod, final M fieldDataCacheEvictions,
final M fieldDataCacheSize, final M fieldDataCacheMaxSize) {
super(5);
this.rcaPeriod = rcaPeriod;
this.fieldDataCacheEvictions = fieldDataCacheEvictions;
this.fieldDataCacheSize = fieldDataCacheSize;
this.fieldDataCacheMaxSize = fieldDataCacheMaxSize;
this.counter = 0;
this.exceedsSize = false;
this.clock = Clock.systemUTC();
this.cacheEvictionCollector = new CacheEvictionCollector(FIELD_DATA_CACHE_EVICTION,
fieldDataCacheEvictions, EVICTION_THRESHOLD_TIME_PERIOD_IN_MILLISECOND);
}

@VisibleForTesting
public void setClock(Clock clock) {
this.clock = clock;
}

@Override
public ResourceFlowUnit<HotNodeSummary> operate() {
counter += 1;
long currTimestamp = clock.millis();

cacheEvictionCollector.collect(currTimestamp);
if (counter >= rcaPeriod) {
ResourceContext context;
HotNodeSummary nodeSummary;

ClusterDetailsEventProcessor.NodeDetails currentNode = ClusterDetailsEventProcessor.getCurrentNodeDetails();
double cacheSize = getTotalSizeInMB(fieldDataCacheSize);
double cacheMaxSize = getTotalSizeInMB(fieldDataCacheMaxSize);
exceedsSize = cacheMaxSize != 0 && cacheSize > cacheMaxSize;
if (cacheEvictionCollector.isUnhealthy(currTimestamp) && exceedsSize) {
Contributor:
Do we need the exceedsSize check for unhealthy? It's not clear to me why.

@rguo-aws (Contributor), Jul 10, 2020:
Agree, and I don't think we need exceedsSize here. exceedsSize will immediately jump to true when a scaling-down action is applied, so it is not a good indicator of cache healthiness.

Contributor (author):
The Field Data Cache eviction metric is present for cases other than size-based eviction, for example when the cache is invalidated via the clear() API. We can go with the proposed cacheMaxSize - cacheSize > threshold check instead of exceedsSize.

As for "exceedsSize will immediately jump to true when a scaling-down action is applied" -- we will have to handle that case separately in the decider anyway, since we will see evictions after the cache scales down.
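A minimal sketch of the proposed alternative, read literally from the comment above (the threshold name and value are hypothetical, not from this PR):

    // Hypothetical: require headroom between the max size and the current size,
    // per the author's "cacheMaxSize - cacheSize > threshold" proposal; the
    // unhealthy branch could then use !hasHeadroom in place of exceedsSize.
    double headroomThresholdInMB = 10.0; // illustrative value only
    boolean hasHeadroom = (cacheMaxSize - cacheSize) > headroomThresholdInMB;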

context = new ResourceContext(Resources.State.UNHEALTHY);
nodeSummary = new HotNodeSummary(currentNode.getId(), currentNode.getHostAddress());
nodeSummary.appendNestedSummary(cacheEvictionCollector.generateSummary(currTimestamp));
}
else {
context = new ResourceContext(Resources.State.HEALTHY);
nodeSummary = null;
}

counter = 0;
exceedsSize = false;
return new ResourceFlowUnit<>(currTimestamp, context, nodeSummary, !currentNode.getIsMasterNode());
}
else {
return new ResourceFlowUnit<>(currTimestamp);
}
}

@Override
public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
final List<FlowUnitMessage> flowUnitMessages =
args.getWireHopper().readFromWire(args.getNode());
List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName());
for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
}
setFlowUnits(flowUnitList);
}

/**
 * A collector class that tracks eviction metrics and how long evictions have persisted.
 */
private static class CacheEvictionCollector {
private final Resource cache;
private final Metric cacheEvictionMetrics;
private boolean hasEvictions;
private long evictionTimestamp;
private long evictionTimePeriodThreshold;

private CacheEvictionCollector(final Resource cache, final Metric cacheEvictionMetrics,
final long threshold) {
this.cache = cache;
this.cacheEvictionMetrics = cacheEvictionMetrics;
this.hasEvictions = false;
this.evictionTimestamp = 0;
this.evictionTimePeriodThreshold = threshold;
}

public void collect(final long currTimestamp) {
for (MetricFlowUnit flowUnit : cacheEvictionMetrics.getFlowUnits()) {
if (flowUnit.isEmpty() || flowUnit.getData() == null) {
continue;
}

double evictionCount = flowUnit.getData().stream().mapToDouble(
Contributor:
flowUnit.getData().stream() -- Can each flowUnit here have multiple data points?

Contributor (author):
Yes, this is how flowUnit.getData() looks:

+------------+-------+-----+-----+-----+-----+
|IndexName   |ShardID|  sum|  avg|  min|  max|
+------------+-------+-----+-----+-----+-----+
|.kibana_1   |0      |  0.0|  0.0|  0.0|  0.0|
|osmgeoshapes|1      |  0.0|  0.0|  0.0|  0.0|
|osmgeoshapes|3      |  0.0|  0.0|  0.0|  0.0|
|sonested    |0      |243.0|243.0|243.0|243.0|
+------------+-------+-----+-----+-----+-----+

Contributor:
Got it, so we need to sum the stream because this is a shard-level metric?

record -> record.getValue(MetricsDB.MAX, Double.class)).sum();
if (!Double.isNaN(evictionCount)) {
if (evictionCount > 0) {
if (!hasEvictions) {
evictionTimestamp = currTimestamp;
}
hasEvictions = true;
}
else {
hasEvictions = false;
}
}
else {
LOG.error("Failed to parse metric from cache {}", cache.toString());
}
}
}

public boolean isUnhealthy(final long currTimestamp) {
return hasEvictions && (currTimestamp - evictionTimestamp) >= evictionTimePeriodThreshold;
}

private HotResourceSummary generateSummary(final long currTimestamp) {
HotResourceSummary resourceSummary = null;
if (isUnhealthy(currTimestamp)) {
resourceSummary = new HotResourceSummary(cache,
TimeUnit.MILLISECONDS.toSeconds(evictionTimePeriodThreshold),
TimeUnit.MILLISECONDS.toSeconds(currTimestamp - evictionTimestamp),
0);
}
return resourceSummary;
}
}
}
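
For context, a minimal sketch of how this RCA might be wired into an analysis graph (the metric class names, window lengths, and rcaPeriod below are illustrative assumptions, not taken from this PR):

    // Hypothetical wiring inside an AnalysisGraph construct() method; metric
    // class names and interval values are assumptions for illustration.
    Metric fieldDataCacheEvictions = new Cache_FieldData_Eviction(5);
    Metric fieldDataCacheSize = new Cache_FieldData_Size(5);
    Metric fieldDataCacheMaxSize = new Cache_Max_Size(5);

    FieldDataCacheRca fieldDataCacheRca = new FieldDataCacheRca(12,
        fieldDataCacheEvictions, fieldDataCacheSize, fieldDataCacheMaxSize);
    fieldDataCacheRca.addAllUpstreams(Arrays.asList(
        fieldDataCacheEvictions, fieldDataCacheSize, fieldDataCacheMaxSize));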