Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adds logs, fix for index creation date -1L, nullable checks #170

Merged
merged 3 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,6 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
args "-F", "src/**/*.kt"
}

compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] }

apply from: 'build-tools/pkgbuild.gradle'
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,15 @@ class WaitForForceMergeStep(
val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) }

if (statsResponse.status == RestStatus.OK) {
return statsResponse.shards.count { it.stats.segments.count > maxNumSegments }
return statsResponse.shards.count {
val count = it.stats.segments?.count
if (count == null) {
logger.warn("$indexName wait for force merge had null segments")
false
} else {
count > maxNumSegments
}
}
}

logger.debug("Failed to get index stats for index: [$indexName], status response: [${statsResponse.status}]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ class AttemptRolloverStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
// If we have already rolled over this index then fail as we only allow an index to be rolled over once
if (managedIndexMetaData.rolledOver == true) {
logger.warn("$index was already rolled over, cannot execute rollover step")
stepStatus = StepStatus.FAILED
info = mapOf("message" to "This index has already been rolled over")
return
Expand All @@ -64,11 +66,17 @@ class AttemptRolloverStep(
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
statsResponse ?: return

val indexCreationDate = Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate)
val numDocs = statsResponse.primaries.docs.count
val indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes)
val indexCreationDate = clusterService.state().metaData().index(index).creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
}
val numDocs = statsResponse.primaries.docs?.count ?: 0
val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)

if (config.evaluateConditions(indexCreationDate, numDocs, indexSize)) {
if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) {
logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," +
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
" numDocs=$numDocs, indexSize=${indexSize.bytes}]")
executeRollover(alias)
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,32 @@ class AttemptTransitionStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
try {
if (config.transitions.isEmpty()) {
logger.info("$index transitions are empty, completing policy")
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
policyCompleted = true
stepStatus = StepStatus.COMPLETED
return
}

val indexCreationDate = clusterService.state().metaData().index(index).creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
}
val stepStartTime = getStepStartTime()
var numDocs: Long? = null
var indexSize: ByteSizeValue? = null

if (config.transitions.any { it.hasStatsConditions() }) {
val statsRequest = IndicesStatsRequest()
.indices(managedIndexMetaData.index).clear().docs(true)
.indices(index).clear().docs(true)
val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) }

if (statsResponse.status != RestStatus.OK) {
logger.debug(
"Failed to get index stats for index: [${managedIndexMetaData.index}], status response: [${statsResponse.status}]"
"Failed to get index stats for index: [$index], status response: [${statsResponse.status}]"
)

stepStatus = StepStatus.FAILED
Expand All @@ -83,23 +91,25 @@ class AttemptTransitionStep(
return
}

numDocs = statsResponse.primaries.docs.count
indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes)
// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
numDocs = statsResponse.primaries.docs?.count ?: 0
indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)
}

// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
stateName = config.transitions.find { it.evaluateConditions(getIndexCreationDate(), numDocs, indexSize, getStepStartTime()) }?.stateName
val message = if (stateName == null) {
stepStatus = StepStatus.CONDITION_NOT_MET
"Attempting to transition"
} else {
stateName = config.transitions.find { it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime) }?.stateName
val message: String
if (stateName != null) {
logger.info("$index transition conditions evaluated to true [indexCreationDate=$indexCreationDate," +
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
" numDocs=$numDocs, indexSize=${indexSize?.bytes},stepStartTime=${stepStartTime.toEpochMilli()}]")
stepStatus = StepStatus.COMPLETED
"Transitioning to $stateName"
message = "Transitioning to $stateName"
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
message = "Attempting to transition"
}
info = mapOf("message" to message)
} catch (e: Exception) {
logger.error("Failed to transition index [index=${managedIndexMetaData.index}]", e)
logger.error("Failed to transition index [index=$index]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to transition index")
val errorMessage = e.message
Expand All @@ -108,9 +118,6 @@ class AttemptTransitionStep(
}
}

private fun getIndexCreationDate(): Instant =
Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate)

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
policyCompleted = policyCompleted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ fun Transition.evaluateConditions(
}

if (this.conditions.indexAge != null) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli()
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli == -1L) return false // transitions cannot currently be ORd like rollover, so we must return here
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
return this.conditions.indexAge.millis <= elapsedTime
}

Expand Down Expand Up @@ -230,8 +232,11 @@ fun RolloverActionConfig.evaluateConditions(
}

if (this.minAge != null) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli()
if (this.minAge.millis <= elapsedTime) return true
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli != -1L) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
if (this.minAge.millis <= elapsedTime) return true
}
}

if (this.minSize != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement.util

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Conditions
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Transition
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.SweptManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomChangePolicy
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomSweptManagedIndexConfig
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.common.bytes.BytesReference
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.test.ESTestCase
import java.time.Instant

class ManagedIndexUtilsTests : ESTestCase() {

Expand Down Expand Up @@ -173,6 +179,69 @@ class ManagedIndexUtilsTests : ESTestCase() {
assertEquals("Wrong index being searched", listOf(INDEX_STATE_MANAGEMENT_INDEX), indices)
}

fun `test rollover action config evaluate conditions`() {
val noConditionsConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = null, index = 0)
assertTrue("No conditions should always pass", noConditionsConfig
.evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue(0)))
assertTrue("No conditions should always pass", noConditionsConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue(5)))
assertTrue("No conditions should always pass", noConditionsConfig
.evaluateConditions(indexCreationDate = Instant.now().minusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5)))
assertTrue("No conditions should always pass", noConditionsConfig
.evaluateConditions(indexCreationDate = Instant.now().plusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5)))

val minSizeConfig = RolloverActionConfig(minSize = ByteSizeValue(5), minDocs = null, minAge = null, index = 0)
assertFalse("Less bytes should not pass", minSizeConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertTrue("Equal bytes should pass", minSizeConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(5)))
assertTrue("More bytes should pass", minSizeConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(10)))

val minDocsConfig = RolloverActionConfig(minSize = null, minDocs = 5, minAge = null, index = 0)
assertFalse("Less docs should not pass", minDocsConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertTrue("Equal docs should pass", minDocsConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue.ZERO))
assertTrue("More docs should pass", minDocsConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 10, indexSize = ByteSizeValue.ZERO))

val minAgeConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = TimeValue.timeValueSeconds(5), index = 0)
assertFalse("Index age that is too young should not pass", minAgeConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertTrue("Index age that is older should pass", minAgeConfig
.evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertFalse("Index age that is -1L should not pass", minAgeConfig
.evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO))

val multiConfig = RolloverActionConfig(minSize = ByteSizeValue(1), minDocs = 1, minAge = TimeValue.timeValueSeconds(5), index = 0)
assertFalse("No conditions met should not pass", multiConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertFalse("Multi condition, index age -1L should not pass", multiConfig
.evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertTrue("Multi condition, age should pass", multiConfig
.evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertTrue("Multi condition, docs should pass", multiConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 2, indexSize = ByteSizeValue.ZERO))
assertTrue("Multi condition, size should pass", multiConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(2)))
}

fun `test transition evaluate conditions`() {
val emptyTransition = Transition(stateName = "some_state", conditions = null)
assertTrue("No conditions should pass", emptyTransition
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = null, indexSize = null, transitionStartTime = Instant.now()))

val timeTransition = Transition(stateName = "some_state",
conditions = Conditions(indexAge = TimeValue.timeValueSeconds(5), docCount = null, size = null, cron = null))
assertFalse("Index age that is too young should not pass", timeTransition
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = null, indexSize = null, transitionStartTime = Instant.now()))
assertTrue("Index age that is older should pass", timeTransition
.evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = null, indexSize = null, transitionStartTime = Instant.now()))
assertFalse("Index age that is -1L should not pass", timeTransition
.evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = null, indexSize = null, transitionStartTime = Instant.now()))
}

private fun contentParser(bytesReference: BytesReference): XContentParser {
return XContentHelper.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON)
Expand Down