From de081f5d165f418fcf33bbc25843a8db58524c0d Mon Sep 17 00:00:00 2001 From: Drew Baugher <46505179+dbbaughe@users.noreply.github.com> Date: Tue, 7 Apr 2020 11:58:39 -0700 Subject: [PATCH] Backports bug fixes (#188) * Adds logs, fix for index creation date -1L, nullable checks (#170) * Index creation_date of -1L should evaluate to false, adds extra logs * Adds kotlin compiler check and fixes nullable values * Adds log * Delete and close failing during snapshot in progress (#172) * Fixes AttemptDeleteStep failing from a SnapshotInProgressException * Fixes close action failing on snapshot in progress exception, fixes imports * Fixes styling --- build.gradle | 8 ++ .../step/close/AttemptCloseStep.kt | 15 ++- .../step/delete/AttemptDeleteStep.kt | 5 + .../step/forcemerge/WaitForForceMergeStep.kt | 10 +- .../step/rollover/AttemptRolloverStep.kt | 16 ++- .../step/transition/AttemptTransitionStep.kt | 37 +++--- .../util/ManagedIndexUtils.kt | 11 +- .../step/AttemptCloseStepTests.kt | 110 ++++++++++++++++++ .../step/AttemptDeleteStepTests.kt | 110 ++++++++++++++++++ .../util/ManagedIndexUtilsTests.kt | 69 +++++++++++ 10 files changed, 364 insertions(+), 27 deletions(-) create mode 100644 src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt create mode 100644 src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptDeleteStepTests.kt diff --git a/build.gradle b/build.gradle index ed5bdd950..c16296c9a 100644 --- a/build.gradle +++ b/build.gradle @@ -37,6 +37,7 @@ buildscript { plugins { id 'nebula.ospackage' version "5.3.0" + id "com.dorongold.task-tree" version "1.5" } apply plugin: 'java' @@ -71,6 +72,10 @@ detekt { buildUponDefaultConfig = true } +configurations.testCompile { + exclude module: "securemock" +} + dependencies { compileOnly "org.elasticsearch:elasticsearch:${es_version}" compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.3.0.0" @@ -82,6 +87,7 @@ dependencies { testCompile "org.elasticsearch.test:framework:${es_version}" testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" + testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" ktlint "com.pinterest:ktlint:0.33.0" } @@ -189,4 +195,6 @@ task ktlintFormat(type: JavaExec, group: "formatting") { args "-F", "src/**/*.kt" } +compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] } + apply from: 'build-tools/pkgbuild.gradle' diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt index 407ae38f4..7be775edb 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest import org.elasticsearch.action.admin.indices.close.CloseIndexResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.snapshots.SnapshotInProgressException class AttemptCloseStep( val clusterService: ClusterService, @@ -39,21 +40,27 @@ class AttemptCloseStep( @Suppress("TooGenericExceptionCaught") override suspend fun execute() { + val index = managedIndexMetaData.index try { - logger.info("Executing close on ${managedIndexMetaData.index}") + logger.info("Executing close on $index") val closeIndexRequest = CloseIndexRequest() - .indices(managedIndexMetaData.index) + .indices(index) val response: CloseIndexResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) } + logger.info("Close index for $index was acknowledged=${response.isAcknowledged}") if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED info = mapOf("message" to "Successfully closed index") } else { stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to close index: ${managedIndexMetaData.index}") + info = mapOf("message" to "Failed to close index") } + } catch (e: SnapshotInProgressException) { + logger.warn("Failed to close index [index=$index] with snapshot in progress") + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to "Index had snapshot in progress, retrying closing") } catch (e: Exception) { - logger.error("Failed to set index to close [index=${managedIndexMetaData.index}]", e) + logger.error("Failed to set index to close [index=$index]", e) stepStatus = StepStatus.FAILED val mutableInfo = mutableMapOf("message" to "Failed to set index to close") val errorMessage = e.message diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt index 3f99a425d..507b74d2e 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.snapshots.SnapshotInProgressException import java.lang.Exception class AttemptDeleteStep( @@ -51,6 +52,10 @@ class AttemptDeleteStep( stepStatus = StepStatus.FAILED info = mapOf("message" to "Failed to delete index") } + } catch (e: SnapshotInProgressException) { + logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress") + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to "Index had snapshot in progress, retrying deletion") } catch (e: Exception) { logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e) stepStatus = StepStatus.FAILED diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt index 0a55983c0..a5849b9c6 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt @@ -112,7 +112,15 @@ class WaitForForceMergeStep( val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) } if (statsResponse.status == RestStatus.OK) { - return statsResponse.shards.count { it.stats.segments.count > maxNumSegments } + return statsResponse.shards.count { + val count = it.stats.segments?.count + if (count == null) { + logger.warn("$indexName wait for force merge had null segments") + false + } else { + count > maxNumSegments + } + } } logger.debug("Failed to get index stats for index: [$indexName], status response: [${statsResponse.status}]") diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index b995312b0..d79299319 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -47,8 +47,10 @@ class AttemptRolloverStep( @Suppress("TooGenericExceptionCaught") override suspend fun execute() { + val index = managedIndexMetaData.index // If we have already rolled over this index then fail as we only allow an index to be rolled over once if (managedIndexMetaData.rolledOver == true) { + logger.warn("$index was already rolled over, cannot execute rollover step") stepStatus = StepStatus.FAILED info = mapOf("message" to "This index has already been rolled over") return @@ -62,11 +64,17 @@ class AttemptRolloverStep( // If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early statsResponse ?: return - val indexCreationDate = Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate) - val numDocs = statsResponse.primaries.docs.count - val indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes) + val indexCreationDate = clusterService.state().metaData().index(index).creationDate + val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate) + if (indexCreationDate == -1L) { + logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison") + } + val numDocs = statsResponse.primaries.docs?.count ?: 0 + val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0) - if (config.evaluateConditions(indexCreationDate, numDocs, indexSize)) { + if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) { + logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," + + " numDocs=$numDocs, indexSize=${indexSize.bytes}]") executeRollover(alias) } else { stepStatus = StepStatus.CONDITION_NOT_MET diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt index 70d00cf35..a6701d328 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt @@ -53,24 +53,32 @@ class AttemptTransitionStep( @Suppress("TooGenericExceptionCaught") override suspend fun execute() { + val index = managedIndexMetaData.index try { if (config.transitions.isEmpty()) { + logger.info("$index transitions are empty, completing policy") policyCompleted = true stepStatus = StepStatus.COMPLETED return } + val indexCreationDate = clusterService.state().metaData().index(index).creationDate + val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate) + if (indexCreationDate == -1L) { + logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison") + } + val stepStartTime = getStepStartTime() var numDocs: Long? = null var indexSize: ByteSizeValue? = null if (config.transitions.any { it.hasStatsConditions() }) { val statsRequest = IndicesStatsRequest() - .indices(managedIndexMetaData.index).clear().docs(true) + .indices(index).clear().docs(true) val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) } if (statsResponse.status != RestStatus.OK) { logger.debug( - "Failed to get index stats for index: [${managedIndexMetaData.index}], status response: [${statsResponse.status}]" + "Failed to get index stats for index: [$index], status response: [${statsResponse.status}]" ) stepStatus = StepStatus.FAILED @@ -81,23 +89,25 @@ class AttemptTransitionStep( return } - numDocs = statsResponse.primaries.docs.count - indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes) - // Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true + numDocs = statsResponse.primaries.docs?.count ?: 0 + indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0) } // Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true - stateName = config.transitions.find { it.evaluateConditions(getIndexCreationDate(), numDocs, indexSize, getStepStartTime()) }?.stateName - val message = if (stateName == null) { - stepStatus = StepStatus.CONDITION_NOT_MET - "Attempting to transition" - } else { + stateName = config.transitions.find { it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime) }?.stateName + val message: String + if (stateName != null) { + logger.info("$index transition conditions evaluated to true [indexCreationDate=$indexCreationDate," + + " numDocs=$numDocs, indexSize=${indexSize?.bytes},stepStartTime=${stepStartTime.toEpochMilli()}]") stepStatus = StepStatus.COMPLETED - "Transitioning to $stateName" + message = "Transitioning to $stateName" + } else { + stepStatus = StepStatus.CONDITION_NOT_MET + message = "Attempting to transition" } info = mapOf("message" to message) } catch (e: Exception) { - logger.error("Failed to transition index [index=${managedIndexMetaData.index}]", e) + logger.error("Failed to transition index [index=$index]", e) stepStatus = StepStatus.FAILED val mutableInfo = mutableMapOf("message" to "Failed to transition index") val errorMessage = e.message @@ -106,9 +116,6 @@ class AttemptTransitionStep( } } - private fun getIndexCreationDate(): Instant = - Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate) - override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { return currentMetaData.copy( policyCompleted = policyCompleted, diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt index d98b7028f..f203b035d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt @@ -193,7 +193,9 @@ fun Transition.evaluateConditions( } if (this.conditions.indexAge != null) { - val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli() + val indexCreationDateMilli = indexCreationDate.toEpochMilli() + if (indexCreationDateMilli == -1L) return false // transitions cannot currently be ORd like rollover, so we must return here + val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli return this.conditions.indexAge.millis <= elapsedTime } @@ -230,8 +232,11 @@ fun RolloverActionConfig.evaluateConditions( } if (this.minAge != null) { - val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli() - if (this.minAge.millis <= elapsedTime) return true + val indexCreationDateMilli = indexCreationDate.toEpochMilli() + if (indexCreationDateMilli != -1L) { + val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli + if (this.minAge.millis <= elapsedTime) return true + } } if (this.minSize != null) { diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt new file mode 100644 index 000000000..2e49c9cfb --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt @@ -0,0 +1,110 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.CloseActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.close.AttemptCloseStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.snapshots.SnapshotInProgressException +import org.elasticsearch.test.ESTestCase +import kotlin.IllegalArgumentException + +class AttemptCloseStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test close step sets step status to completed when successful`() { + val closeIndexResponse = CloseIndexResponse(true, true, listOf()) + val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test close step sets step status to failed when not acknowledged`() { + val closeIndexResponse = CloseIndexResponse(false, false, listOf()) + val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test close step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + logger.info(updatedManagedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test close step sets step status to condition not met when snapshot in progress error thrown`() { + val exception = SnapshotInProgressException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(closeIndexResponse: CloseIndexResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (closeIndexResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (closeIndexResponse != null) listener.onResponse(closeIndexResponse) + else listener.onFailure(exception) + }.whenever(this.mock).close(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptDeleteStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptDeleteStepTests.kt new file mode 100644 index 000000000..e00a7f119 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptDeleteStepTests.kt @@ -0,0 +1,110 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.DeleteActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.delete.AttemptDeleteStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.support.master.AcknowledgedResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.snapshots.SnapshotInProgressException +import org.elasticsearch.test.ESTestCase +import kotlin.IllegalArgumentException + +class AttemptDeleteStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test delete step sets step status to completed when successful`() { + val acknowledgedResponse = AcknowledgedResponse(true) + val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) + + runBlocking { + val deleteActionConfig = DeleteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData) + attemptDeleteStep.execute() + val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test delete step sets step status to failed when not acknowledged`() { + val acknowledgedResponse = AcknowledgedResponse(false) + val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) + + runBlocking { + val deleteActionConfig = DeleteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData) + attemptDeleteStep.execute() + val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test delete step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val deleteActionConfig = DeleteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData) + attemptDeleteStep.execute() + val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + logger.info(updatedManagedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test delete step sets step status to condition not met when snapshot in progress error thrown`() { + val exception = SnapshotInProgressException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val deleteActionConfig = DeleteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData) + attemptDeleteStep.execute() + val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(acknowledgedResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (acknowledgedResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (acknowledgedResponse != null) listener.onResponse(acknowledgedResponse) + else listener.onFailure(exception) + }.whenever(this.mock).delete(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt index d0e819495..2ae64bc00 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -16,7 +16,10 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.util import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Conditions import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Transition +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomChangePolicy import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomClusterStateManagedIndexConfig @@ -24,11 +27,14 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomSweptMan import org.elasticsearch.action.delete.DeleteRequest import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.common.bytes.BytesReference +import org.elasticsearch.common.unit.ByteSizeValue +import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.common.xcontent.LoggingDeprecationHandler import org.elasticsearch.common.xcontent.XContentHelper import org.elasticsearch.common.xcontent.XContentParser import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.test.ESTestCase +import java.time.Instant class ManagedIndexUtilsTests : ESTestCase() { @@ -173,6 +179,69 @@ class ManagedIndexUtilsTests : ESTestCase() { assertEquals("Wrong index being searched", listOf(INDEX_STATE_MANAGEMENT_INDEX), indices) } + fun `test rollover action config evaluate conditions`() { + val noConditionsConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = null, index = 0) + assertTrue("No conditions should always pass", noConditionsConfig + .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue(0))) + assertTrue("No conditions should always pass", noConditionsConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue(5))) + assertTrue("No conditions should always pass", noConditionsConfig + .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5))) + assertTrue("No conditions should always pass", noConditionsConfig + .evaluateConditions(indexCreationDate = Instant.now().plusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5))) + + val minSizeConfig = RolloverActionConfig(minSize = ByteSizeValue(5), minDocs = null, minAge = null, index = 0) + assertFalse("Less bytes should not pass", minSizeConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertTrue("Equal bytes should pass", minSizeConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(5))) + assertTrue("More bytes should pass", minSizeConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(10))) + + val minDocsConfig = RolloverActionConfig(minSize = null, minDocs = 5, minAge = null, index = 0) + assertFalse("Less docs should not pass", minDocsConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertTrue("Equal docs should pass", minDocsConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue.ZERO)) + assertTrue("More docs should pass", minDocsConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 10, indexSize = ByteSizeValue.ZERO)) + + val minAgeConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = TimeValue.timeValueSeconds(5), index = 0) + assertFalse("Index age that is too young should not pass", minAgeConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertTrue("Index age that is older should pass", minAgeConfig + .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertFalse("Index age that is -1L should not pass", minAgeConfig + .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + + val multiConfig = RolloverActionConfig(minSize = ByteSizeValue(1), minDocs = 1, minAge = TimeValue.timeValueSeconds(5), index = 0) + assertFalse("No conditions met should not pass", multiConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertFalse("Multi condition, index age -1L should not pass", multiConfig + .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertTrue("Multi condition, age should pass", multiConfig + .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertTrue("Multi condition, docs should pass", multiConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 2, indexSize = ByteSizeValue.ZERO)) + assertTrue("Multi condition, size should pass", multiConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(2))) + } + + fun `test transition evaluate conditions`() { + val emptyTransition = Transition(stateName = "some_state", conditions = null) + assertTrue("No conditions should pass", emptyTransition + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = null, indexSize = null, transitionStartTime = Instant.now())) + + val timeTransition = Transition(stateName = "some_state", + conditions = Conditions(indexAge = TimeValue.timeValueSeconds(5), docCount = null, size = null, cron = null)) + assertFalse("Index age that is too young should not pass", timeTransition + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = null, indexSize = null, transitionStartTime = Instant.now())) + assertTrue("Index age that is older should pass", timeTransition + .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = null, indexSize = null, transitionStartTime = Instant.now())) + assertFalse("Index age that is -1L should not pass", timeTransition + .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = null, indexSize = null, transitionStartTime = Instant.now())) + } + private fun contentParser(bytesReference: BytesReference): XContentParser { return XContentHelper.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON)