Skip to content

Commit

Permalink
Backport bug fixes (opendistro-for-elasticsearch#190)
Browse files Browse the repository at this point in the history
* Adds logs, fix for index creation date -1L, nullable checks (opendistro-for-elasticsearch#170)

* Index creation_date of -1L should evaluate to false, adds extra logs

* Adds kotlin compiler check and fixes nullable values

* Adds log

* Delete and close failing during snapshot in progress (opendistro-for-elasticsearch#172)

* Fixes AttemptDeleteStep failing from a SnapshotInProgressException

* Fixes close action failing on snapshot in progress exception, fixes imports

* Fixes styling

* Fixes CloseIndexResponse init

* Fixes missed merge conflict

* Fixes tests
  • Loading branch information
dbbaughe authored Apr 7, 2020
1 parent e96b02f commit f420553
Show file tree
Hide file tree
Showing 10 changed files with 364 additions and 27 deletions.
8 changes: 8 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ buildscript {

plugins {
id 'nebula.ospackage' version "5.3.0"
id "com.dorongold.task-tree" version "1.5"
}

apply plugin: 'java'
Expand Down Expand Up @@ -71,6 +72,10 @@ detekt {
buildUponDefaultConfig = true
}

configurations.testCompile {
exclude module: "securemock"
}

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.1.0.1"
Expand All @@ -82,6 +87,7 @@ dependencies {

testCompile "org.elasticsearch.test:framework:${es_version}"
testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"

ktlint "com.pinterest:ktlint:0.33.0"
}
Expand Down Expand Up @@ -189,4 +195,6 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
args "-F", "src/**/*.kt"
}

compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] }

apply from: 'build-tools/pkgbuild.gradle'
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException

class AttemptCloseStep(
val clusterService: ClusterService,
Expand All @@ -39,21 +40,27 @@ class AttemptCloseStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
try {
logger.info("Executing close on ${managedIndexMetaData.index}")
logger.info("Executing close on $index")
val closeIndexRequest = CloseIndexRequest()
.indices(managedIndexMetaData.index)
.indices(index)

val response: AcknowledgedResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) }
logger.info("Close index for $index was acknowledged=${response.isAcknowledged}")
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Successfully closed index")
} else {
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to close index: ${managedIndexMetaData.index}")
info = mapOf("message" to "Failed to close index")
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to close index [index=$index] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying closing")
} catch (e: Exception) {
logger.error("Failed to set index to close [index=${managedIndexMetaData.index}]", e)
logger.error("Failed to set index to close [index=$index]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to set index to close")
val errorMessage = e.message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import java.lang.Exception

class AttemptDeleteStep(
Expand All @@ -51,6 +52,10 @@ class AttemptDeleteStep(
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to delete index")
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying deletion")
} catch (e: Exception) {
logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e)
stepStatus = StepStatus.FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,15 @@ class WaitForForceMergeStep(
val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) }

if (statsResponse.status == RestStatus.OK) {
return statsResponse.shards.count { it.stats.segments.count > maxNumSegments }
return statsResponse.shards.count {
val count = it.stats.segments?.count
if (count == null) {
logger.warn("$indexName wait for force merge had null segments")
false
} else {
count > maxNumSegments
}
}
}

logger.debug("Failed to get index stats for index: [$indexName], status response: [${statsResponse.status}]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ class AttemptRolloverStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
// If we have already rolled over this index then fail as we only allow an index to be rolled over once
if (managedIndexMetaData.rolledOver == true) {
logger.warn("$index was already rolled over, cannot execute rollover step")
stepStatus = StepStatus.FAILED
info = mapOf("message" to "This index has already been rolled over")
return
Expand All @@ -62,11 +64,17 @@ class AttemptRolloverStep(
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
statsResponse ?: return

val indexCreationDate = Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate)
val numDocs = statsResponse.primaries.docs.count
val indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes)
val indexCreationDate = clusterService.state().metaData().index(index).creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
}
val numDocs = statsResponse.primaries.docs?.count ?: 0
val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)

if (config.evaluateConditions(indexCreationDate, numDocs, indexSize)) {
if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) {
logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," +
" numDocs=$numDocs, indexSize=${indexSize.bytes}]")
executeRollover(alias)
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,32 @@ class AttemptTransitionStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
try {
if (config.transitions.isEmpty()) {
logger.info("$index transitions are empty, completing policy")
policyCompleted = true
stepStatus = StepStatus.COMPLETED
return
}

val indexCreationDate = clusterService.state().metaData().index(index).creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
}
val stepStartTime = getStepStartTime()
var numDocs: Long? = null
var indexSize: ByteSizeValue? = null

if (config.transitions.any { it.hasStatsConditions() }) {
val statsRequest = IndicesStatsRequest()
.indices(managedIndexMetaData.index).clear().docs(true)
.indices(index).clear().docs(true)
val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) }

if (statsResponse.status != RestStatus.OK) {
logger.debug(
"Failed to get index stats for index: [${managedIndexMetaData.index}], status response: [${statsResponse.status}]"
"Failed to get index stats for index: [$index], status response: [${statsResponse.status}]"
)

stepStatus = StepStatus.FAILED
Expand All @@ -81,23 +89,25 @@ class AttemptTransitionStep(
return
}

numDocs = statsResponse.primaries.docs.count
indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes)
// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
numDocs = statsResponse.primaries.docs?.count ?: 0
indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)
}

// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
stateName = config.transitions.find { it.evaluateConditions(getIndexCreationDate(), numDocs, indexSize, getStepStartTime()) }?.stateName
val message = if (stateName == null) {
stepStatus = StepStatus.CONDITION_NOT_MET
"Attempting to transition"
} else {
stateName = config.transitions.find { it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime) }?.stateName
val message: String
if (stateName != null) {
logger.info("$index transition conditions evaluated to true [indexCreationDate=$indexCreationDate," +
" numDocs=$numDocs, indexSize=${indexSize?.bytes},stepStartTime=${stepStartTime.toEpochMilli()}]")
stepStatus = StepStatus.COMPLETED
"Transitioning to $stateName"
message = "Transitioning to $stateName"
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
message = "Attempting to transition"
}
info = mapOf("message" to message)
} catch (e: Exception) {
logger.error("Failed to transition index [index=${managedIndexMetaData.index}]", e)
logger.error("Failed to transition index [index=$index]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to transition index")
val errorMessage = e.message
Expand All @@ -106,9 +116,6 @@ class AttemptTransitionStep(
}
}

private fun getIndexCreationDate(): Instant =
Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate)

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
policyCompleted = policyCompleted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ fun Transition.evaluateConditions(
}

if (this.conditions.indexAge != null) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli()
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli == -1L) return false // transitions cannot currently be ORd like rollover, so we must return here
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
return this.conditions.indexAge.millis <= elapsedTime
}

Expand Down Expand Up @@ -230,8 +232,11 @@ fun RolloverActionConfig.evaluateConditions(
}

if (this.minAge != null) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli()
if (this.minAge.millis <= elapsedTime) return true
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli != -1L) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
if (this.minAge.millis <= elapsedTime) return true
}
}

if (this.minSize != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.step

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.CloseActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.close.AttemptCloseStep
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.AdminClient
import org.elasticsearch.client.Client
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import org.elasticsearch.test.ESTestCase
import kotlin.IllegalArgumentException

class AttemptCloseStepTests : ESTestCase() {

private val clusterService: ClusterService = mock()

fun `test close step sets step status to completed when successful`() {
val closeIndexResponse = AcknowledgedResponse(true)
val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test close step sets step status to failed when not acknowledged`() {
val closeIndexResponse = AcknowledgedResponse(false)
val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test close step sets step status to failed when error thrown`() {
val exception = IllegalArgumentException("example")
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
logger.info(updatedManagedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test close step sets step status to condition not met when snapshot in progress error thrown`() {
val exception = SnapshotInProgressException("example")
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient }
private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient }
private fun getIndicesAdminClient(closeIndexResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient {
assertTrue("Must provide one and only one response or exception", (closeIndexResponse != null).xor(exception != null))
return mock {
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<AcknowledgedResponse>>(1)
if (closeIndexResponse != null) listener.onResponse(closeIndexResponse)
else listener.onFailure(exception)
}.whenever(this.mock).close(any(), any())
}
}
}
Loading

0 comments on commit f420553

Please sign in to comment.