Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add result indices retention period #174

Merged
merged 3 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,15 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.REQUEST_TIMEOUT,
AnomalyDetectorSettings.DETECTION_INTERVAL,
AnomalyDetectorSettings.DETECTION_WINDOW_DELAY,
AnomalyDetectorSettings.AD_RESULT_HISTORY_INDEX_MAX_AGE,
AnomalyDetectorSettings.AD_RESULT_HISTORY_ROLLOVER_PERIOD,
AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS,
AnomalyDetectorSettings.AD_RESULT_ROLLOVER_PERIOD,
AnomalyDetectorSettings.MAX_RETRY_FOR_UNRESPONSIVE_NODE,
AnomalyDetectorSettings.COOLDOWN_MINUTES,
AnomalyDetectorSettings.BACKOFF_MINUTES,
AnomalyDetectorSettings.BACKOFF_INITIAL_DELAY,
AnomalyDetectorSettings.MAX_RETRY_FOR_BACKOFF
AnomalyDetectorSettings.MAX_RETRY_FOR_BACKOFF,
AnomalyDetectorSettings.AD_RESULT_HISTORY_RETENTION_PERIOD
);
return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,48 @@

package com.amazon.opendistroforelasticsearch.ad.indices;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_RESULT_HISTORY_INDEX_MAX_AGE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_RESULT_HISTORY_RETENTION_PERIOD;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_RESULT_HISTORY_ROLLOVER_PERIOD;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTORS_INDEX_MAPPING_FILE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTOR_JOBS_INDEX_MAPPING_FILE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.ANOMALY_RESULTS_INDEX_MAPPING_FILE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;

import java.io.IOException;
import java.net.URL;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;

Expand All @@ -71,10 +81,9 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener {
private final AdminClient adminClient;
private final ThreadPool threadPool;

private volatile TimeValue requestTimeout;
private volatile TimeValue historyMaxAge;
private volatile TimeValue historyRolloverPeriod;
private volatile Long historyMaxDocs;
private volatile TimeValue historyRetentionPeriod;

private Scheduler.Cancellable scheduledRollover = null;

Expand All @@ -93,17 +102,17 @@ public AnomalyDetectionIndices(Client client, ClusterService clusterService, Thr
this.clusterService = clusterService;
this.threadPool = threadPool;
this.clusterService.addLocalNodeMasterListener(this);
this.requestTimeout = REQUEST_TIMEOUT.get(settings);
this.historyMaxAge = AD_RESULT_HISTORY_INDEX_MAX_AGE.get(settings);
this.historyRolloverPeriod = AD_RESULT_HISTORY_ROLLOVER_PERIOD.get(settings);
this.historyMaxDocs = AD_RESULT_HISTORY_MAX_DOCS.get(settings);
this.historyRetentionPeriod = AD_RESULT_HISTORY_RETENTION_PERIOD.get(settings);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_RESULT_HISTORY_MAX_DOCS, it -> historyMaxDocs = it);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_RESULT_HISTORY_INDEX_MAX_AGE, it -> historyMaxAge = it);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_RESULT_HISTORY_ROLLOVER_PERIOD, it -> {
historyRolloverPeriod = it;
rescheduleRollover();
});
clusterService.getClusterSettings().addSettingsUpdateConsumer(REQUEST_TIMEOUT, it -> requestTimeout = it);
this.clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(AD_RESULT_HISTORY_RETENTION_PERIOD, it -> { historyRetentionPeriod = it; });
}

/**
Expand Down Expand Up @@ -233,9 +242,10 @@ public void initAnomalyDetectorJobIndex(ActionListener<CreateIndexResponse> acti
public void onMaster() {
try {
// try to rollover immediately as we might be restarting the cluster
rolloverHistoryIndex();
rolloverAndDeleteHistoryIndex();
// schedule the next rollover for approx MAX_AGE later
scheduledRollover = threadPool.scheduleWithFixedDelay(() -> rolloverHistoryIndex(), historyRolloverPeriod, executorName());
scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
} catch (Exception e) {
// This should be run on cluster startup
logger.error("Error rollover AD result indices. " + "Can't rollover AD result until master node is restarted.", e);
Expand All @@ -259,13 +269,14 @@ private void rescheduleRollover() {
if (scheduledRollover != null) {
scheduledRollover.cancel();
}
scheduledRollover = threadPool.scheduleWithFixedDelay(() -> rolloverHistoryIndex(), historyRolloverPeriod, executorName());
scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
}
}

private boolean rolloverHistoryIndex() {
private void rolloverAndDeleteHistoryIndex() {
if (!doesAnomalyResultIndexExist()) {
return false;
return;
}

// We have to pass null for newIndexName in order to get Elastic to increment the index count.
Expand All @@ -275,15 +286,82 @@ private boolean rolloverHistoryIndex() {
adResultMapping = getAnomalyResultMappings();
} catch (IOException e) {
logger.error("Fail to roll over AD result index, as can't get AD result index mapping");
return false;
return;
}
request.getCreateIndexRequest().index(AD_RESULT_HISTORY_INDEX_PATTERN).mapping(MAPPING_TYPE, adResultMapping, XContentType.JSON);
request.addMaxIndexDocsCondition(historyMaxDocs);
request.addMaxIndexAgeCondition(historyMaxAge);
RolloverResponse response = adminClient.indices().rolloverIndex(request).actionGet(requestTimeout);
if (!response.isRolledOver()) {
logger.warn("{} not rolled over. Conditions were: {}", AD_RESULT_HISTORY_WRITE_INDEX_ALIAS, response.getConditionStatus());
adminClient.indices().rolloverIndex(request, ActionListener.wrap(response -> {
if (!response.isRolledOver()) {
logger.warn("{} not rolled over. Conditions were: {}", AD_RESULT_HISTORY_WRITE_INDEX_ALIAS, response.getConditionStatus());
} else {
logger.info("{} rolled over. Conditions were: {}", AD_RESULT_HISTORY_WRITE_INDEX_ALIAS, response.getConditionStatus());
deleteOldHistoryIndices();
}
}, exception -> { logger.error("Fail to roll over result index", exception); }));
}

private void deleteOldHistoryIndices() {
Set<String> candidates = new HashSet<String>();

ClusterStateRequest clusterStateRequest = new ClusterStateRequest()
.clear()
.indices(AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN)
.metadata(true)
.local(true)
.indicesOptions(IndicesOptions.strictExpand());

adminClient.cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
String latestToDelete = null;
long latest = Long.MIN_VALUE;
for (ObjectCursor<IndexMetadata> cursor : clusterStateResponse.getState().metadata().indices().values()) {
IndexMetadata indexMetaData = cursor.value;
long creationTime = indexMetaData.getCreationDate();

if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis()) {
String indexName = indexMetaData.getIndex().getName();
candidates.add(indexName);
if (latest < creationTime) {
latest = creationTime;
latestToDelete = indexName;
}
}
}

if (candidates.size() > 1) {
// delete all indices except the last one because the last one may contain docs newer than the retention period
candidates.remove(latestToDelete);
Copy link
Contributor

@ylwu-amzn ylwu-amzn Jun 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we just don't add latestToDelete to candidates in the for loop above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we do that? We don't know latestToDelete before looping through all indices.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we iterate indices reversely and ignore the first index which meet this condition (Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis() ?

This is a minor suggestion. Ignore it if it's hard/impossible to do so.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indices has no order. So we cannot do that.

String[] toDelete = candidates.toArray(Strings.EMPTY_ARRAY);
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(toDelete);
adminClient.indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexResponse -> {
if (!deleteIndexResponse.isAcknowledged()) {
logger
.error(
"Could not delete one or more Anomaly result indices: {}. Retrying one by one.",
Arrays.toString(toDelete)
);
deleteIndexIteration(toDelete);
Copy link
Contributor

@ylwu-amzn ylwu-amzn Jun 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to use two rounds of deletion? Roll over will be triggered periodically, so next run can re-delete the failed indices.
If fail to delete all indices due to some error, will it help we re-delete them immediately? Error may be still there

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, we need to wait for 12 hours before re-deleting. Adding a retry can mitigate some disk/memory issue without waiting for too long. This is best effort and we cannot guarantee this would help. We don't retry endlessly. Just once.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get which index failed to delete? Can we delete these failed indices in one call?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how to. Delete response is of AcknowledgedResponse type that contains only isAcknowledged field.

} else {
logger.error("Succeeded in deleting expired anomaly result indices: {}.", Arrays.toString(toDelete));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.info

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. Fixed.

}
}, exception -> { deleteIndexIteration(toDelete); }));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can log the exception before retrying

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

}
}, exception -> { logger.error("Fail to delete result indices", exception); }));
}

private void deleteIndexIteration(String[] toDelete) {
for (String index : toDelete) {
DeleteIndexRequest singleDeleteRequest = new DeleteIndexRequest(index);
adminClient.indices().delete(singleDeleteRequest, ActionListener.wrap(singleDeleteResponse -> {
if (!singleDeleteResponse.isAcknowledged()) {
logger.error("Retrying deleting {} does not succeed.", index);
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
logger.info("{} was already deleted.", index);
} else {
logger.error(new ParameterizedMessage("Retrying deleting {} does not succeed.", index), exception);
}
}));
}
return response.isRolledOver();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ private AnomalyDetectorSettings() {}
Setting.Property.Dynamic
);

public static final Setting<TimeValue> AD_RESULT_HISTORY_INDEX_MAX_AGE = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.ad_result_history_max_age",
TimeValue.timeValueHours(24 * 30),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<Long> AD_RESULT_HISTORY_MAX_DOCS = Setting
.longSetting(
"opendistro.anomaly_detection.ad_result_history_max_docs",
Expand All @@ -94,6 +86,14 @@ private AnomalyDetectorSettings() {}
Setting.Property.Dynamic
);

public static final Setting<TimeValue> AD_RESULT_HISTORY_RETENTION_PERIOD = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.ad_result_history_retention_period",
TimeValue.timeValueDays(90),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<Integer> MAX_RETRY_FOR_UNRESPONSIVE_NODE = Setting
.intSetting(
"opendistro.anomaly_detection.max_retry_for_unresponsive_node",
Expand Down