From ce1a92416433c751f4bc5f61a4ee3c4485cc2819 Mon Sep 17 00:00:00 2001 From: Drew Baugher <46505179+dbbaughe@users.noreply.github.com> Date: Mon, 6 Apr 2020 11:27:37 -0700 Subject: [PATCH] Delete and close failing during snapshot in progress (#172) * Fixes AttemptDeleteStep failing from a SnapshotInProgressException * Fixes close action failing on snapshot in progress exception, fixes imports * Fixes styling --- build.gradle | 6 + .../step/close/AttemptCloseStep.kt | 19 ++- .../step/delete/AttemptDeleteStep.kt | 5 + .../step/AttemptCloseStepTests.kt | 110 ++++++++++++++++++ .../step/AttemptDeleteStepTests.kt | 110 ++++++++++++++++++ 5 files changed, 246 insertions(+), 4 deletions(-) create mode 100644 src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt create mode 100644 src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptDeleteStepTests.kt diff --git a/build.gradle b/build.gradle index 6c051c4f2..2f3b78c00 100644 --- a/build.gradle +++ b/build.gradle @@ -37,6 +37,7 @@ buildscript { plugins { id 'nebula.ospackage' version "5.3.0" + id "com.dorongold.task-tree" version "1.5" } apply plugin: 'java' @@ -71,6 +72,10 @@ detekt { buildUponDefaultConfig = true } +configurations.testCompile { + exclude module: "securemock" +} + dependencies { compileOnly "org.elasticsearch:elasticsearch:${es_version}" compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.1.0.1" @@ -82,6 +87,7 @@ dependencies { testCompile "org.elasticsearch.test:framework:${es_version}" testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" + testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" ktlint "com.pinterest:ktlint:0.33.0" } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt index 497065351..dd8ab4225 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.snapshots.SnapshotInProgressException class AttemptCloseStep( val clusterService: ClusterService, @@ -39,21 +40,31 @@ class AttemptCloseStep( @Suppress("TooGenericExceptionCaught") override suspend fun execute() { + val index = managedIndexMetaData.index try { - logger.info("Executing close on ${managedIndexMetaData.index}") + logger.info("Executing close on $index") val closeIndexRequest = CloseIndexRequest() - .indices(managedIndexMetaData.index) + .indices(index) +<<<<<<< HEAD val response: AcknowledgedResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) } +======= + val response: CloseIndexResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) } + logger.info("Close index for $index was acknowledged=${response.isAcknowledged}") +>>>>>>> 5ec003b... Delete and close failing during snapshot in progress (#172) if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED info = mapOf("message" to "Successfully closed index") } else { stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to close index: ${managedIndexMetaData.index}") + info = mapOf("message" to "Failed to close index") } + } catch (e: SnapshotInProgressException) { + logger.warn("Failed to close index [index=$index] with snapshot in progress") + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to "Index had snapshot in progress, retrying closing") } catch (e: Exception) { - logger.error("Failed to set index to close [index=${managedIndexMetaData.index}]", e) + logger.error("Failed to set index to close [index=$index]", e) stepStatus = StepStatus.FAILED val mutableInfo = mutableMapOf("message" to "Failed to set index to close") val errorMessage = e.message diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt index 3f99a425d..507b74d2e 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.snapshots.SnapshotInProgressException import java.lang.Exception class AttemptDeleteStep( @@ -51,6 +52,10 @@ class AttemptDeleteStep( stepStatus = StepStatus.FAILED info = mapOf("message" to "Failed to delete index") } + } catch (e: SnapshotInProgressException) { + logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress") + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to "Index had snapshot in progress, retrying deletion") } catch (e: Exception) { logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e) stepStatus = StepStatus.FAILED diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt new file mode 100644 index 000000000..2e49c9cfb --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt @@ -0,0 +1,110 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.CloseActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.close.AttemptCloseStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.snapshots.SnapshotInProgressException +import org.elasticsearch.test.ESTestCase +import kotlin.IllegalArgumentException + +class AttemptCloseStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test close step sets step status to completed when successful`() { + val closeIndexResponse = CloseIndexResponse(true, true, listOf()) + val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test close step sets step status to failed when not acknowledged`() { + val closeIndexResponse = CloseIndexResponse(false, false, listOf()) + val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test close step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + logger.info(updatedManagedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test close step sets step status to condition not met when snapshot in progress error thrown`() { + val exception = SnapshotInProgressException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(closeIndexResponse: CloseIndexResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (closeIndexResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (closeIndexResponse != null) listener.onResponse(closeIndexResponse) + else listener.onFailure(exception) + }.whenever(this.mock).close(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptDeleteStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptDeleteStepTests.kt new file mode 100644 index 000000000..e00a7f119 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptDeleteStepTests.kt @@ -0,0 +1,110 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.DeleteActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.delete.AttemptDeleteStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.support.master.AcknowledgedResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.snapshots.SnapshotInProgressException +import org.elasticsearch.test.ESTestCase +import kotlin.IllegalArgumentException + +class AttemptDeleteStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test delete step sets step status to completed when successful`() { + val acknowledgedResponse = AcknowledgedResponse(true) + val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) + + runBlocking { + val deleteActionConfig = DeleteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData) + attemptDeleteStep.execute() + val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test delete step sets step status to failed when not acknowledged`() { + val acknowledgedResponse = AcknowledgedResponse(false) + val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) + + runBlocking { + val deleteActionConfig = DeleteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData) + attemptDeleteStep.execute() + val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test delete step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val deleteActionConfig = DeleteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData) + attemptDeleteStep.execute() + val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + logger.info(updatedManagedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test delete step sets step status to condition not met when snapshot in progress error thrown`() { + val exception = SnapshotInProgressException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val deleteActionConfig = DeleteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData) + attemptDeleteStep.execute() + val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(acknowledgedResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (acknowledgedResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (acknowledgedResponse != null) listener.onResponse(acknowledgedResponse) + else listener.onFailure(exception) + }.whenever(this.mock).delete(any(), any()) + } + } +} \ No newline at end of file