Skip to content

Commit

Permalink
Delete and close failing during snapshot in progress (opendistro-for-…
Browse files Browse the repository at this point in the history
…elasticsearch#172)

* Fixes AttemptDeleteStep failing from a SnapshotInProgressException

* Fixes close action failing on snapshot in progress exception, fixes imports

* Fixes styling
  • Loading branch information
dbbaughe committed Apr 6, 2020
1 parent d44ac63 commit fb0b9e5
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 4 deletions.
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ buildscript {

plugins {
id 'nebula.ospackage' version "8.2.0"
id "com.dorongold.task-tree" version "1.5"
}

apply plugin: 'java'
Expand Down Expand Up @@ -72,6 +73,10 @@ detekt {
buildUponDefaultConfig = true
}

configurations.testCompile {
exclude module: "securemock"
}

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.6.0.0"
Expand All @@ -83,6 +88,7 @@ dependencies {

testCompile "org.elasticsearch.test:framework:${es_version}"
testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"

ktlint "com.pinterest:ktlint:0.33.0"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException

class AttemptCloseStep(
val clusterService: ClusterService,
Expand All @@ -41,21 +42,27 @@ class AttemptCloseStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
try {
logger.info("Executing close on ${managedIndexMetaData.index}")
logger.info("Executing close on $index")
val closeIndexRequest = CloseIndexRequest()
.indices(managedIndexMetaData.index)
.indices(index)

val response: CloseIndexResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) }
logger.info("Close index for $index was acknowledged=${response.isAcknowledged}")
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Successfully closed index")
} else {
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to close index: ${managedIndexMetaData.index}")
info = mapOf("message" to "Failed to close index")
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to close index [index=$index] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying closing")
} catch (e: Exception) {
logger.error("Failed to set index to close [index=${managedIndexMetaData.index}]", e)
logger.error("Failed to set index to close [index=$index]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to set index to close")
val errorMessage = e.message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import java.lang.Exception

class AttemptDeleteStep(
Expand Down Expand Up @@ -53,6 +54,10 @@ class AttemptDeleteStep(
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to delete index")
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying deletion")
} catch (e: Exception) {
logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e)
stepStatus = StepStatus.FAILED
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.step

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.CloseActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.close.AttemptCloseStep
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse
import org.elasticsearch.client.AdminClient
import org.elasticsearch.client.Client
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import org.elasticsearch.test.ESTestCase
import kotlin.IllegalArgumentException

class AttemptCloseStepTests : ESTestCase() {

private val clusterService: ClusterService = mock()

fun `test close step sets step status to completed when successful`() {
val closeIndexResponse = CloseIndexResponse(true, true, listOf())
val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test close step sets step status to failed when not acknowledged`() {
val closeIndexResponse = CloseIndexResponse(false, false, listOf())
val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test close step sets step status to failed when error thrown`() {
val exception = IllegalArgumentException("example")
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
logger.info(updatedManagedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test close step sets step status to condition not met when snapshot in progress error thrown`() {
val exception = SnapshotInProgressException("example")
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient }
private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient }
private fun getIndicesAdminClient(closeIndexResponse: CloseIndexResponse?, exception: Exception?): IndicesAdminClient {
assertTrue("Must provide one and only one response or exception", (closeIndexResponse != null).xor(exception != null))
return mock {
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<CloseIndexResponse>>(1)
if (closeIndexResponse != null) listener.onResponse(closeIndexResponse)
else listener.onFailure(exception)
}.whenever(this.mock).close(any(), any())
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.step

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.DeleteActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.delete.AttemptDeleteStep
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.AdminClient
import org.elasticsearch.client.Client
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import org.elasticsearch.test.ESTestCase
import kotlin.IllegalArgumentException

class AttemptDeleteStepTests : ESTestCase() {

private val clusterService: ClusterService = mock()

fun `test delete step sets step status to completed when successful`() {
val acknowledgedResponse = AcknowledgedResponse(true)
val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null)))

runBlocking {
val deleteActionConfig = DeleteActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData)
attemptDeleteStep.execute()
val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test delete step sets step status to failed when not acknowledged`() {
val acknowledgedResponse = AcknowledgedResponse(false)
val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null)))

runBlocking {
val deleteActionConfig = DeleteActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData)
attemptDeleteStep.execute()
val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test delete step sets step status to failed when error thrown`() {
val exception = IllegalArgumentException("example")
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val deleteActionConfig = DeleteActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData)
attemptDeleteStep.execute()
val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
logger.info(updatedManagedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test delete step sets step status to condition not met when snapshot in progress error thrown`() {
val exception = SnapshotInProgressException("example")
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val deleteActionConfig = DeleteActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptDeleteStep = AttemptDeleteStep(clusterService, client, deleteActionConfig, managedIndexMetaData)
attemptDeleteStep.execute()
val updatedManagedIndexMetaData = attemptDeleteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient }
private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient }
private fun getIndicesAdminClient(acknowledgedResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient {
assertTrue("Must provide one and only one response or exception", (acknowledgedResponse != null).xor(exception != null))
return mock {
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<AcknowledgedResponse>>(1)
if (acknowledgedResponse != null) listener.onResponse(acknowledgedResponse)
else listener.onFailure(exception)
}.whenever(this.mock).delete(any(), any())
}
}
}

0 comments on commit fb0b9e5

Please sign in to comment.