diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index c22e7da8..d9c0dc13 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -396,7 +396,6 @@ public List> getSettings() { AnomalyDetectorSettings.REQUEST_TIMEOUT, AnomalyDetectorSettings.DETECTION_INTERVAL, AnomalyDetectorSettings.DETECTION_WINDOW_DELAY, - AnomalyDetectorSettings.AD_RESULT_HISTORY_INDEX_MAX_AGE, AnomalyDetectorSettings.AD_RESULT_HISTORY_ROLLOVER_PERIOD, AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS, AnomalyDetectorSettings.AD_RESULT_ROLLOVER_PERIOD, @@ -404,7 +403,8 @@ public List> getSettings() { AnomalyDetectorSettings.COOLDOWN_MINUTES, AnomalyDetectorSettings.BACKOFF_MINUTES, AnomalyDetectorSettings.BACKOFF_INITIAL_DELAY, - AnomalyDetectorSettings.MAX_RETRY_FOR_BACKOFF + AnomalyDetectorSettings.MAX_RETRY_FOR_BACKOFF, + AnomalyDetectorSettings.AD_RESULT_HISTORY_RETENTION_PERIOD ); return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList())); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java index ff80bd47..61e13825 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java @@ -15,38 +15,48 @@ package com.amazon.opendistroforelasticsearch.ad.indices; -import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_RESULT_HISTORY_INDEX_MAX_AGE; import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS; +import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_RESULT_HISTORY_RETENTION_PERIOD; import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_RESULT_HISTORY_ROLLOVER_PERIOD; import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTORS_INDEX_MAPPING_FILE; import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTOR_JOBS_INDEX_MAPPING_FILE; import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.ANOMALY_RESULTS_INDEX_MAPPING_FILE; -import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT; import java.io.IOException; import java.net.URL; +import java.time.Instant; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; -import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; +import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.base.Charsets; import com.google.common.io.Resources; @@ -65,16 +75,15 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener { public static final String ALL_AD_RESULTS_INDEX_PATTERN = ".opendistro-anomaly-results*"; // Elastic mapping type - private static final String MAPPING_TYPE = "_doc"; + static final String MAPPING_TYPE = "_doc"; private ClusterService clusterService; private final AdminClient adminClient; private final ThreadPool threadPool; - private volatile TimeValue requestTimeout; - private volatile TimeValue historyMaxAge; private volatile TimeValue historyRolloverPeriod; private volatile Long historyMaxDocs; + private volatile TimeValue historyRetentionPeriod; private Scheduler.Cancellable scheduledRollover = null; @@ -93,17 +102,17 @@ public AnomalyDetectionIndices(Client client, ClusterService clusterService, Thr this.clusterService = clusterService; this.threadPool = threadPool; this.clusterService.addLocalNodeMasterListener(this); - this.requestTimeout = REQUEST_TIMEOUT.get(settings); - this.historyMaxAge = AD_RESULT_HISTORY_INDEX_MAX_AGE.get(settings); this.historyRolloverPeriod = AD_RESULT_HISTORY_ROLLOVER_PERIOD.get(settings); this.historyMaxDocs = AD_RESULT_HISTORY_MAX_DOCS.get(settings); + this.historyRetentionPeriod = AD_RESULT_HISTORY_RETENTION_PERIOD.get(settings); this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_RESULT_HISTORY_MAX_DOCS, it -> historyMaxDocs = it); - this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_RESULT_HISTORY_INDEX_MAX_AGE, it -> historyMaxAge = it); this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_RESULT_HISTORY_ROLLOVER_PERIOD, it -> { historyRolloverPeriod = it; rescheduleRollover(); }); - clusterService.getClusterSettings().addSettingsUpdateConsumer(REQUEST_TIMEOUT, it -> requestTimeout = it); + this.clusterService + .getClusterSettings() + .addSettingsUpdateConsumer(AD_RESULT_HISTORY_RETENTION_PERIOD, it -> { historyRetentionPeriod = it; }); } /** @@ -233,9 +242,10 @@ public void initAnomalyDetectorJobIndex(ActionListener acti public void onMaster() { try { // try to rollover immediately as we might be restarting the cluster - rolloverHistoryIndex(); + rolloverAndDeleteHistoryIndex(); // schedule the next rollover for approx MAX_AGE later - scheduledRollover = threadPool.scheduleWithFixedDelay(() -> rolloverHistoryIndex(), historyRolloverPeriod, executorName()); + scheduledRollover = threadPool + .scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName()); } catch (Exception e) { // This should be run on cluster startup logger.error("Error rollover AD result indices. " + "Can't rollover AD result until master node is restarted.", e); @@ -259,13 +269,14 @@ private void rescheduleRollover() { if (scheduledRollover != null) { scheduledRollover.cancel(); } - scheduledRollover = threadPool.scheduleWithFixedDelay(() -> rolloverHistoryIndex(), historyRolloverPeriod, executorName()); + scheduledRollover = threadPool + .scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName()); } } - private boolean rolloverHistoryIndex() { + void rolloverAndDeleteHistoryIndex() { if (!doesAnomalyResultIndexExist()) { - return false; + return; } // We have to pass null for newIndexName in order to get Elastic to increment the index count. @@ -275,15 +286,85 @@ private boolean rolloverHistoryIndex() { adResultMapping = getAnomalyResultMappings(); } catch (IOException e) { logger.error("Fail to roll over AD result index, as can't get AD result index mapping"); - return false; + return; } request.getCreateIndexRequest().index(AD_RESULT_HISTORY_INDEX_PATTERN).mapping(MAPPING_TYPE, adResultMapping, XContentType.JSON); request.addMaxIndexDocsCondition(historyMaxDocs); - request.addMaxIndexAgeCondition(historyMaxAge); - RolloverResponse response = adminClient.indices().rolloverIndex(request).actionGet(requestTimeout); - if (!response.isRolledOver()) { - logger.warn("{} not rolled over. Conditions were: {}", AD_RESULT_HISTORY_WRITE_INDEX_ALIAS, response.getConditionStatus()); + adminClient.indices().rolloverIndex(request, ActionListener.wrap(response -> { + if (!response.isRolledOver()) { + logger.warn("{} not rolled over. Conditions were: {}", AD_RESULT_HISTORY_WRITE_INDEX_ALIAS, response.getConditionStatus()); + } else { + logger.info("{} rolled over. Conditions were: {}", AD_RESULT_HISTORY_WRITE_INDEX_ALIAS, response.getConditionStatus()); + deleteOldHistoryIndices(); + } + }, exception -> { logger.error("Fail to roll over result index", exception); })); + } + + void deleteOldHistoryIndices() { + Set candidates = new HashSet(); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest() + .clear() + .indices(AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN) + .metadata(true) + .local(true) + .indicesOptions(IndicesOptions.strictExpand()); + + adminClient.cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + String latestToDelete = null; + long latest = Long.MIN_VALUE; + for (ObjectCursor cursor : clusterStateResponse.getState().metadata().indices().values()) { + IndexMetadata indexMetaData = cursor.value; + long creationTime = indexMetaData.getCreationDate(); + + if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis()) { + String indexName = indexMetaData.getIndex().getName(); + candidates.add(indexName); + if (latest < creationTime) { + latest = creationTime; + latestToDelete = indexName; + } + } + } + + if (candidates.size() > 1) { + // delete all indices except the last one because the last one may contain docs newer than the retention period + candidates.remove(latestToDelete); + String[] toDelete = candidates.toArray(Strings.EMPTY_ARRAY); + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(toDelete); + adminClient.indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexResponse -> { + if (!deleteIndexResponse.isAcknowledged()) { + logger + .error( + "Could not delete one or more Anomaly result indices: {}. Retrying one by one.", + Arrays.toString(toDelete) + ); + deleteIndexIteration(toDelete); + } else { + logger.info("Succeeded in deleting expired anomaly result indices: {}.", Arrays.toString(toDelete)); + } + }, exception -> { + logger.error("Failed to delete expired anomaly result indices: {}.", Arrays.toString(toDelete)); + deleteIndexIteration(toDelete); + })); + } + }, exception -> { logger.error("Fail to delete result indices", exception); })); + } + + private void deleteIndexIteration(String[] toDelete) { + for (String index : toDelete) { + DeleteIndexRequest singleDeleteRequest = new DeleteIndexRequest(index); + adminClient.indices().delete(singleDeleteRequest, ActionListener.wrap(singleDeleteResponse -> { + if (!singleDeleteResponse.isAcknowledged()) { + logger.error("Retrying deleting {} does not succeed.", index); + } + }, exception -> { + if (exception instanceof IndexNotFoundException) { + logger.info("{} was already deleted.", index); + } else { + logger.error(new ParameterizedMessage("Retrying deleting {} does not succeed.", index), exception); + } + })); } - return response.isRolledOver(); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java index 59ba31dd..985ea54b 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java @@ -74,14 +74,6 @@ private AnomalyDetectorSettings() {} Setting.Property.Dynamic ); - public static final Setting AD_RESULT_HISTORY_INDEX_MAX_AGE = Setting - .positiveTimeSetting( - "opendistro.anomaly_detection.ad_result_history_max_age", - TimeValue.timeValueHours(24 * 30), - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - public static final Setting AD_RESULT_HISTORY_MAX_DOCS = Setting .longSetting( "opendistro.anomaly_detection.ad_result_history_max_docs", @@ -94,6 +86,14 @@ private AnomalyDetectorSettings() {} Setting.Property.Dynamic ); + public static final Setting AD_RESULT_HISTORY_RETENTION_PERIOD = Setting + .positiveTimeSetting( + "opendistro.anomaly_detection.ad_result_history_retention_period", + TimeValue.timeValueDays(90), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + public static final Setting MAX_RETRY_FOR_UNRESPONSIVE_NODE = Setting .intSetting( "opendistro.anomaly_detection.max_retry_for_unresponsive_node", diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/RolloverTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/RolloverTests.java new file mode 100644 index 00000000..a08e4853 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/RolloverTests.java @@ -0,0 +1,243 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.indices; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.rollover.Condition; +import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ClusterAdminClient; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; +import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; + +public class RolloverTests extends ESTestCase { + private AnomalyDetectionIndices adIndices; + private IndicesAdminClient indicesClient; + private ClusterAdminClient clusterAdminClient; + private ClusterName clusterName; + private ClusterState clusterState; + private ClusterService clusterService; + + @Override + public void setUp() throws Exception { + super.setUp(); + Client client = mock(Client.class); + indicesClient = mock(IndicesAdminClient.class); + AdminClient adminClient = mock(AdminClient.class); + clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings( + Settings.EMPTY, + Collections + .unmodifiableSet( + new HashSet<>( + Arrays + .asList( + AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS, + AnomalyDetectorSettings.AD_RESULT_HISTORY_ROLLOVER_PERIOD, + AnomalyDetectorSettings.AD_RESULT_HISTORY_RETENTION_PERIOD + ) + ) + ) + ); + + clusterName = new ClusterName("test"); + + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + ThreadPool threadPool = mock(ThreadPool.class); + Settings settings = Settings.EMPTY; + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesClient); + + adIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings); + + clusterAdminClient = mock(ClusterAdminClient.class); + when(adminClient.cluster()).thenReturn(clusterAdminClient); + + doAnswer(invocation -> { + ClusterStateRequest clusterStateRequest = invocation.getArgument(0); + assertEquals(AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN, clusterStateRequest.indices()[0]); + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArgument(1); + listener.onResponse(new ClusterStateResponse(clusterName, clusterState, true)); + return null; + }).when(clusterAdminClient).state(any(), any()); + } + + private IndexMetadata indexMeta(String name, long creationDate, String... aliases) { + IndexMetadata.Builder builder = IndexMetadata + .builder(name) + .settings( + Settings + .builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.version.created", Version.CURRENT.id) + ); + builder.creationDate(creationDate); + for (String alias : aliases) { + builder.putAlias(AliasMetadata.builder(alias).build()); + } + return builder.build(); + } + + private void assertRolloverRequest(RolloverRequest request) { + assertEquals(AnomalyResult.ANOMALY_RESULT_INDEX, request.indices()[0]); + + Map> conditions = request.getConditions(); + assertEquals(1, conditions.size()); + assertEquals(new MaxDocsCondition(9000000L), conditions.get(MaxDocsCondition.NAME)); + + CreateIndexRequest createIndexRequest = request.getCreateIndexRequest(); + assertEquals(AnomalyDetectionIndices.AD_RESULT_HISTORY_INDEX_PATTERN, createIndexRequest.index()); + assertTrue(createIndexRequest.mappings().get(AnomalyDetectionIndices.MAPPING_TYPE).contains("data_start_time")); + } + + public void testNotRolledOver() { + doAnswer(invocation -> { + RolloverRequest request = invocation.getArgument(0); + assertRolloverRequest(request); + + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArgument(1); + + listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), false, true, true)); + return null; + }).when(indicesClient).rolloverIndex(any(), any()); + + Metadata.Builder metaBuilder = Metadata + .builder() + .put(indexMeta(".opendistro-anomaly-results-history-2020.06.24-000003", 1L, AnomalyResult.ANOMALY_RESULT_INDEX), true); + clusterState = ClusterState.builder(clusterName).metadata(metaBuilder.build()).build(); + when(clusterService.state()).thenReturn(clusterState); + + adIndices.rolloverAndDeleteHistoryIndex(); + verify(clusterAdminClient, never()).state(any(), any()); + verify(indicesClient, times(1)).rolloverIndex(any(), any()); + } + + public void testRolledOverButNotDeleted() { + doAnswer(invocation -> { + RolloverRequest request = invocation.getArgument(0); + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArgument(1); + + assertEquals(AnomalyResult.ANOMALY_RESULT_INDEX, request.indices()[0]); + + Map> conditions = request.getConditions(); + assertEquals(1, conditions.size()); + assertEquals(new MaxDocsCondition(9000000L), conditions.get(MaxDocsCondition.NAME)); + + CreateIndexRequest createIndexRequest = request.getCreateIndexRequest(); + assertEquals(AnomalyDetectionIndices.AD_RESULT_HISTORY_INDEX_PATTERN, createIndexRequest.index()); + assertTrue(createIndexRequest.mappings().get(AnomalyDetectionIndices.MAPPING_TYPE).contains("data_start_time")); + listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true)); + return null; + }).when(indicesClient).rolloverIndex(any(), any()); + + Metadata.Builder metaBuilder = Metadata + .builder() + .put(indexMeta(".opendistro-anomaly-results-history-2020.06.24-000003", 1L, AnomalyResult.ANOMALY_RESULT_INDEX), true) + .put( + indexMeta( + ".opendistro-anomaly-results-history-2020.06.24-000004", + Instant.now().toEpochMilli(), + AnomalyResult.ANOMALY_RESULT_INDEX + ), + true + ); + clusterState = ClusterState.builder(clusterName).metadata(metaBuilder.build()).build(); + when(clusterService.state()).thenReturn(clusterState); + + adIndices.rolloverAndDeleteHistoryIndex(); + verify(clusterAdminClient, times(1)).state(any(), any()); + verify(indicesClient, times(1)).rolloverIndex(any(), any()); + verify(indicesClient, never()).delete(any(), any()); + } + + public void testRolledOverDeleted() { + doAnswer(invocation -> { + RolloverRequest request = invocation.getArgument(0); + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArgument(1); + + assertEquals(AnomalyResult.ANOMALY_RESULT_INDEX, request.indices()[0]); + + Map> conditions = request.getConditions(); + assertEquals(1, conditions.size()); + assertEquals(new MaxDocsCondition(9000000L), conditions.get(MaxDocsCondition.NAME)); + + CreateIndexRequest createIndexRequest = request.getCreateIndexRequest(); + assertEquals(AnomalyDetectionIndices.AD_RESULT_HISTORY_INDEX_PATTERN, createIndexRequest.index()); + assertTrue(createIndexRequest.mappings().get(AnomalyDetectionIndices.MAPPING_TYPE).contains("data_start_time")); + listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true)); + return null; + }).when(indicesClient).rolloverIndex(any(), any()); + + Metadata.Builder metaBuilder = Metadata + .builder() + .put(indexMeta(".opendistro-anomaly-results-history-2020.06.24-000002", 1L, AnomalyResult.ANOMALY_RESULT_INDEX), true) + .put(indexMeta(".opendistro-anomaly-results-history-2020.06.24-000003", 2L, AnomalyResult.ANOMALY_RESULT_INDEX), true) + .put( + indexMeta( + ".opendistro-anomaly-results-history-2020.06.24-000004", + Instant.now().toEpochMilli(), + AnomalyResult.ANOMALY_RESULT_INDEX + ), + true + ); + clusterState = ClusterState.builder(clusterName).metadata(metaBuilder.build()).build(); + when(clusterService.state()).thenReturn(clusterState); + + adIndices.rolloverAndDeleteHistoryIndex(); + verify(clusterAdminClient, times(1)).state(any(), any()); + verify(indicesClient, times(1)).rolloverIndex(any(), any()); + verify(indicesClient, times(1)).delete(any(), any()); + } +}