diff --git a/build.gradle b/build.gradle index 5489c5c35..ce72e0775 100644 --- a/build.gradle +++ b/build.gradle @@ -169,4 +169,6 @@ task ktlintFormat(type: JavaExec, group: "formatting") { args "-F", "src/**/*.kt" } +compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] } + apply from: 'build-tools/pkgbuild.gradle' diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt index 1656a3046..04d3989f9 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt @@ -114,7 +114,15 @@ class WaitForForceMergeStep( val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) } if (statsResponse.status == RestStatus.OK) { - return statsResponse.shards.count { it.stats.segments.count > maxNumSegments } + return statsResponse.shards.count { + val count = it.stats.segments?.count + if (count == null) { + logger.warn("$indexName wait for force merge had null segments") + false + } else { + count > maxNumSegments + } + } } logger.debug("Failed to get index stats for index: [$indexName], status response: [${statsResponse.status}]") diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index f22b20145..56410dfa9 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -49,8 +49,10 @@ class AttemptRolloverStep( @Suppress("TooGenericExceptionCaught") override suspend fun execute() { + val index = managedIndexMetaData.index // If we have already rolled over this index then fail as we only allow an index to be rolled over once if (managedIndexMetaData.rolledOver == true) { + logger.warn("$index was already rolled over, cannot execute rollover step") stepStatus = StepStatus.FAILED info = mapOf("message" to "This index has already been rolled over") return @@ -64,11 +66,17 @@ class AttemptRolloverStep( // If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early statsResponse ?: return - val indexCreationDate = Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate) - val numDocs = statsResponse.primaries.docs.count - val indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes) + val indexCreationDate = clusterService.state().metaData().index(index).creationDate + val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate) + if (indexCreationDate == -1L) { + logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison") + } + val numDocs = statsResponse.primaries.docs?.count ?: 0 + val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0) - if (config.evaluateConditions(indexCreationDate, numDocs, indexSize)) { + if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) { + logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," + + " numDocs=$numDocs, indexSize=${indexSize.bytes}]") executeRollover(alias) } else { stepStatus = StepStatus.CONDITION_NOT_MET diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt index 4f7bd34fa..17da79078 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt @@ -55,24 +55,32 @@ class AttemptTransitionStep( @Suppress("TooGenericExceptionCaught") override suspend fun execute() { + val index = managedIndexMetaData.index try { if (config.transitions.isEmpty()) { + logger.info("$index transitions are empty, completing policy") policyCompleted = true stepStatus = StepStatus.COMPLETED return } + val indexCreationDate = clusterService.state().metaData().index(index).creationDate + val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate) + if (indexCreationDate == -1L) { + logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison") + } + val stepStartTime = getStepStartTime() var numDocs: Long? = null var indexSize: ByteSizeValue? = null if (config.transitions.any { it.hasStatsConditions() }) { val statsRequest = IndicesStatsRequest() - .indices(managedIndexMetaData.index).clear().docs(true) + .indices(index).clear().docs(true) val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) } if (statsResponse.status != RestStatus.OK) { logger.debug( - "Failed to get index stats for index: [${managedIndexMetaData.index}], status response: [${statsResponse.status}]" + "Failed to get index stats for index: [$index], status response: [${statsResponse.status}]" ) stepStatus = StepStatus.FAILED @@ -83,23 +91,25 @@ class AttemptTransitionStep( return } - numDocs = statsResponse.primaries.docs.count - indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes) - // Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true + numDocs = statsResponse.primaries.docs?.count ?: 0 + indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0) } // Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true - stateName = config.transitions.find { it.evaluateConditions(getIndexCreationDate(), numDocs, indexSize, getStepStartTime()) }?.stateName - val message = if (stateName == null) { - stepStatus = StepStatus.CONDITION_NOT_MET - "Attempting to transition" - } else { + stateName = config.transitions.find { it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime) }?.stateName + val message: String + if (stateName != null) { + logger.info("$index transition conditions evaluated to true [indexCreationDate=$indexCreationDate," + + " numDocs=$numDocs, indexSize=${indexSize?.bytes},stepStartTime=${stepStartTime.toEpochMilli()}]") stepStatus = StepStatus.COMPLETED - "Transitioning to $stateName" + message = "Transitioning to $stateName" + } else { + stepStatus = StepStatus.CONDITION_NOT_MET + message = "Attempting to transition" } info = mapOf("message" to message) } catch (e: Exception) { - logger.error("Failed to transition index [index=${managedIndexMetaData.index}]", e) + logger.error("Failed to transition index [index=$index]", e) stepStatus = StepStatus.FAILED val mutableInfo = mutableMapOf("message" to "Failed to transition index") val errorMessage = e.message @@ -108,9 +118,6 @@ class AttemptTransitionStep( } } - private fun getIndexCreationDate(): Instant = - Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate) - override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { return currentMetaData.copy( policyCompleted = policyCompleted, diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt index d98b7028f..f203b035d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt @@ -193,7 +193,9 @@ fun Transition.evaluateConditions( } if (this.conditions.indexAge != null) { - val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli() + val indexCreationDateMilli = indexCreationDate.toEpochMilli() + if (indexCreationDateMilli == -1L) return false // transitions cannot currently be ORd like rollover, so we must return here + val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli return this.conditions.indexAge.millis <= elapsedTime } @@ -230,8 +232,11 @@ fun RolloverActionConfig.evaluateConditions( } if (this.minAge != null) { - val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli() - if (this.minAge.millis <= elapsedTime) return true + val indexCreationDateMilli = indexCreationDate.toEpochMilli() + if (indexCreationDateMilli != -1L) { + val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli + if (this.minAge.millis <= elapsedTime) return true + } } if (this.minSize != null) { diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt index d0e819495..2ae64bc00 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -16,7 +16,10 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.util import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Conditions import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Transition +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomChangePolicy import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomClusterStateManagedIndexConfig @@ -24,11 +27,14 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomSweptMan import org.elasticsearch.action.delete.DeleteRequest import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.common.bytes.BytesReference +import org.elasticsearch.common.unit.ByteSizeValue +import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.common.xcontent.LoggingDeprecationHandler import org.elasticsearch.common.xcontent.XContentHelper import org.elasticsearch.common.xcontent.XContentParser import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.test.ESTestCase +import java.time.Instant class ManagedIndexUtilsTests : ESTestCase() { @@ -173,6 +179,69 @@ class ManagedIndexUtilsTests : ESTestCase() { assertEquals("Wrong index being searched", listOf(INDEX_STATE_MANAGEMENT_INDEX), indices) } + fun `test rollover action config evaluate conditions`() { + val noConditionsConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = null, index = 0) + assertTrue("No conditions should always pass", noConditionsConfig + .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue(0))) + assertTrue("No conditions should always pass", noConditionsConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue(5))) + assertTrue("No conditions should always pass", noConditionsConfig + .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5))) + assertTrue("No conditions should always pass", noConditionsConfig + .evaluateConditions(indexCreationDate = Instant.now().plusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5))) + + val minSizeConfig = RolloverActionConfig(minSize = ByteSizeValue(5), minDocs = null, minAge = null, index = 0) + assertFalse("Less bytes should not pass", minSizeConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertTrue("Equal bytes should pass", minSizeConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(5))) + assertTrue("More bytes should pass", minSizeConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(10))) + + val minDocsConfig = RolloverActionConfig(minSize = null, minDocs = 5, minAge = null, index = 0) + assertFalse("Less docs should not pass", minDocsConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertTrue("Equal docs should pass", minDocsConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue.ZERO)) + assertTrue("More docs should pass", minDocsConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 10, indexSize = ByteSizeValue.ZERO)) + + val minAgeConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = TimeValue.timeValueSeconds(5), index = 0) + assertFalse("Index age that is too young should not pass", minAgeConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertTrue("Index age that is older should pass", minAgeConfig + .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertFalse("Index age that is -1L should not pass", minAgeConfig + .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + + val multiConfig = RolloverActionConfig(minSize = ByteSizeValue(1), minDocs = 1, minAge = TimeValue.timeValueSeconds(5), index = 0) + assertFalse("No conditions met should not pass", multiConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertFalse("Multi condition, index age -1L should not pass", multiConfig + .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertTrue("Multi condition, age should pass", multiConfig + .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + assertTrue("Multi condition, docs should pass", multiConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 2, indexSize = ByteSizeValue.ZERO)) + assertTrue("Multi condition, size should pass", multiConfig + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(2))) + } + + fun `test transition evaluate conditions`() { + val emptyTransition = Transition(stateName = "some_state", conditions = null) + assertTrue("No conditions should pass", emptyTransition + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = null, indexSize = null, transitionStartTime = Instant.now())) + + val timeTransition = Transition(stateName = "some_state", + conditions = Conditions(indexAge = TimeValue.timeValueSeconds(5), docCount = null, size = null, cron = null)) + assertFalse("Index age that is too young should not pass", timeTransition + .evaluateConditions(indexCreationDate = Instant.now(), numDocs = null, indexSize = null, transitionStartTime = Instant.now())) + assertTrue("Index age that is older should pass", timeTransition + .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = null, indexSize = null, transitionStartTime = Instant.now())) + assertFalse("Index age that is -1L should not pass", timeTransition + .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = null, indexSize = null, transitionStartTime = Instant.now())) + } + private fun contentParser(bytesReference: BytesReference): XContentParser { return XContentHelper.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON)