Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Backport bug fixes #191

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ buildscript {

plugins {
id 'nebula.ospackage' version "5.3.0"
id "com.dorongold.task-tree" version "1.5"
}

apply plugin: 'java'
Expand Down Expand Up @@ -71,6 +72,10 @@ detekt {
buildUponDefaultConfig = true
}

configurations.testCompile {
exclude module: "securemock"
}

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.0.0.2"
Expand All @@ -82,6 +87,7 @@ dependencies {

testCompile "org.elasticsearch.test:framework:${es_version}"
testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"

ktlint "com.pinterest:ktlint:0.33.0"
}
Expand Down Expand Up @@ -189,4 +195,6 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
args "-F", "src/**/*.kt"
}

compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] }

apply from: 'build-tools/pkgbuild.gradle'
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException

class AttemptCloseStep(
val clusterService: ClusterService,
Expand All @@ -39,21 +40,27 @@ class AttemptCloseStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
try {
logger.info("Executing close on ${managedIndexMetaData.index}")
logger.info("Executing close on $index")
val closeIndexRequest = CloseIndexRequest()
.indices(managedIndexMetaData.index)
.indices(index)

val response: AcknowledgedResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) }
logger.info("Close index for $index was acknowledged=${response.isAcknowledged}")
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Successfully closed index")
} else {
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to close index: ${managedIndexMetaData.index}")
info = mapOf("message" to "Failed to close index")
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to close index [index=$index] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying closing")
} catch (e: Exception) {
logger.error("Failed to set index to close [index=${managedIndexMetaData.index}]", e)
logger.error("Failed to set index to close [index=$index]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to set index to close")
val errorMessage = e.message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import java.lang.Exception

class AttemptDeleteStep(
Expand All @@ -51,6 +52,10 @@ class AttemptDeleteStep(
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to delete index")
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying deletion")
} catch (e: Exception) {
logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e)
stepStatus = StepStatus.FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,15 @@ class WaitForForceMergeStep(
val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) }

if (statsResponse.status == RestStatus.OK) {
return statsResponse.shards.count { it.stats.segments.count > maxNumSegments }
return statsResponse.shards.count {
val count = it.stats.segments?.count
if (count == null) {
logger.warn("$indexName wait for force merge had null segments")
false
} else {
count > maxNumSegments
}
}
}

logger.debug("Failed to get index stats for index: [$indexName], status response: [${statsResponse.status}]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ class AttemptRolloverStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
// If we have already rolled over this index then fail as we only allow an index to be rolled over once
if (managedIndexMetaData.rolledOver == true) {
logger.warn("$index was already rolled over, cannot execute rollover step")
stepStatus = StepStatus.FAILED
info = mapOf("message" to "This index has already been rolled over")
return
Expand All @@ -62,11 +64,17 @@ class AttemptRolloverStep(
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
statsResponse ?: return

val indexCreationDate = Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate)
val numDocs = statsResponse.primaries.docs.count
val indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes)
val indexCreationDate = clusterService.state().metaData().index(index).creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
}
val numDocs = statsResponse.primaries.docs?.count ?: 0
val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)

if (config.evaluateConditions(indexCreationDate, numDocs, indexSize)) {
if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) {
logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," +
" numDocs=$numDocs, indexSize=${indexSize.bytes}]")
executeRollover(alias)
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,32 @@ class AttemptTransitionStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
try {
if (config.transitions.isEmpty()) {
logger.info("$index transitions are empty, completing policy")
policyCompleted = true
stepStatus = StepStatus.COMPLETED
return
}

val indexCreationDate = clusterService.state().metaData().index(index).creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
}
val stepStartTime = getStepStartTime()
var numDocs: Long? = null
var indexSize: ByteSizeValue? = null

if (config.transitions.any { it.hasStatsConditions() }) {
val statsRequest = IndicesStatsRequest()
.indices(managedIndexMetaData.index).clear().docs(true)
.indices(index).clear().docs(true)
val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) }

if (statsResponse.status != RestStatus.OK) {
logger.debug(
"Failed to get index stats for index: [${managedIndexMetaData.index}], status response: [${statsResponse.status}]"
"Failed to get index stats for index: [$index], status response: [${statsResponse.status}]"
)

stepStatus = StepStatus.FAILED
Expand All @@ -81,23 +89,25 @@ class AttemptTransitionStep(
return
}

numDocs = statsResponse.primaries.docs.count
indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes)
// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
numDocs = statsResponse.primaries.docs?.count ?: 0
indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)
}

// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
stateName = config.transitions.find { it.evaluateConditions(getIndexCreationDate(), numDocs, indexSize, getStepStartTime()) }?.stateName
val message = if (stateName == null) {
stepStatus = StepStatus.CONDITION_NOT_MET
"Attempting to transition"
} else {
stateName = config.transitions.find { it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime) }?.stateName
val message: String
if (stateName != null) {
logger.info("$index transition conditions evaluated to true [indexCreationDate=$indexCreationDate," +
" numDocs=$numDocs, indexSize=${indexSize?.bytes},stepStartTime=${stepStartTime.toEpochMilli()}]")
stepStatus = StepStatus.COMPLETED
"Transitioning to $stateName"
message = "Transitioning to $stateName"
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
message = "Attempting to transition"
}
info = mapOf("message" to message)
} catch (e: Exception) {
logger.error("Failed to transition index [index=${managedIndexMetaData.index}]", e)
logger.error("Failed to transition index [index=$index]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to transition index")
val errorMessage = e.message
Expand All @@ -106,9 +116,6 @@ class AttemptTransitionStep(
}
}

private fun getIndexCreationDate(): Instant =
Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate)

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
policyCompleted = policyCompleted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ fun Transition.evaluateConditions(
}

if (this.conditions.indexAge != null) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli()
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli == -1L) return false // transitions cannot currently be ORd like rollover, so we must return here
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
return this.conditions.indexAge.millis <= elapsedTime
}

Expand Down Expand Up @@ -230,8 +232,11 @@ fun RolloverActionConfig.evaluateConditions(
}

if (this.minAge != null) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli()
if (this.minAge.millis <= elapsedTime) return true
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli != -1L) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
if (this.minAge.millis <= elapsedTime) return true
}
}

if (this.minSize != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.step

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.CloseActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.close.AttemptCloseStep
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.AdminClient
import org.elasticsearch.client.Client
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import org.elasticsearch.test.ESTestCase
import kotlin.IllegalArgumentException

class AttemptCloseStepTests : ESTestCase() {

private val clusterService: ClusterService = mock()

fun `test close step sets step status to completed when successful`() {
val closeIndexResponse = AcknowledgedResponse(true)
val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test close step sets step status to failed when not acknowledged`() {
val closeIndexResponse = AcknowledgedResponse(false)
val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test close step sets step status to failed when error thrown`() {
val exception = IllegalArgumentException("example")
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
logger.info(updatedManagedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

fun `test close step sets step status to condition not met when snapshot in progress error thrown`() {
val exception = SnapshotInProgressException("example")
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}

private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient }
private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient }
private fun getIndicesAdminClient(closeIndexResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient {
assertTrue("Must provide one and only one response or exception", (closeIndexResponse != null).xor(exception != null))
return mock {
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<AcknowledgedResponse>>(1)
if (closeIndexResponse != null) listener.onResponse(closeIndexResponse)
else listener.onFailure(exception)
}.whenever(this.mock).close(any(), any())
}
}
}
Loading