Skip to content

Commit

Permalink
Adds logs, fix for index creation date -1L, nullable checks (opendist…
Browse files Browse the repository at this point in the history
…ro-for-elasticsearch#170)

* Index creation_date of -1L should evaluate to false, adds extra logs

* Adds kotlin compiler check and fixes nullable values

* Adds log
  • Loading branch information
dbbaughe committed Apr 6, 2020
1 parent 697389d commit d44ac63
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 23 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,6 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
args "-F", "src/**/*.kt"
}

compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] }

apply from: 'build-tools/pkgbuild.gradle'
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,15 @@ class WaitForForceMergeStep(
val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) }

if (statsResponse.status == RestStatus.OK) {
return statsResponse.shards.count { it.stats.segments.count > maxNumSegments }
return statsResponse.shards.count {
val count = it.stats.segments?.count
if (count == null) {
logger.warn("$indexName wait for force merge had null segments")
false
} else {
count > maxNumSegments
}
}
}

logger.debug("Failed to get index stats for index: [$indexName], status response: [${statsResponse.status}]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ class AttemptRolloverStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
// If we have already rolled over this index then fail as we only allow an index to be rolled over once
if (managedIndexMetaData.rolledOver == true) {
logger.warn("$index was already rolled over, cannot execute rollover step")
stepStatus = StepStatus.FAILED
info = mapOf("message" to "This index has already been rolled over")
return
Expand All @@ -64,11 +66,17 @@ class AttemptRolloverStep(
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
statsResponse ?: return

val indexCreationDate = Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate)
val numDocs = statsResponse.primaries.docs.count
val indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes)
val indexCreationDate = clusterService.state().metaData().index(index).creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
}
val numDocs = statsResponse.primaries.docs?.count ?: 0
val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)

if (config.evaluateConditions(indexCreationDate, numDocs, indexSize)) {
if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) {
logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," +
" numDocs=$numDocs, indexSize=${indexSize.bytes}]")
executeRollover(alias)
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,32 @@ class AttemptTransitionStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
try {
if (config.transitions.isEmpty()) {
logger.info("$index transitions are empty, completing policy")
policyCompleted = true
stepStatus = StepStatus.COMPLETED
return
}

val indexCreationDate = clusterService.state().metaData().index(index).creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
}
val stepStartTime = getStepStartTime()
var numDocs: Long? = null
var indexSize: ByteSizeValue? = null

if (config.transitions.any { it.hasStatsConditions() }) {
val statsRequest = IndicesStatsRequest()
.indices(managedIndexMetaData.index).clear().docs(true)
.indices(index).clear().docs(true)
val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) }

if (statsResponse.status != RestStatus.OK) {
logger.debug(
"Failed to get index stats for index: [${managedIndexMetaData.index}], status response: [${statsResponse.status}]"
"Failed to get index stats for index: [$index], status response: [${statsResponse.status}]"
)

stepStatus = StepStatus.FAILED
Expand All @@ -83,23 +91,25 @@ class AttemptTransitionStep(
return
}

numDocs = statsResponse.primaries.docs.count
indexSize = ByteSizeValue(statsResponse.primaries.docs.totalSizeInBytes)
// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
numDocs = statsResponse.primaries.docs?.count ?: 0
indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)
}

// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
stateName = config.transitions.find { it.evaluateConditions(getIndexCreationDate(), numDocs, indexSize, getStepStartTime()) }?.stateName
val message = if (stateName == null) {
stepStatus = StepStatus.CONDITION_NOT_MET
"Attempting to transition"
} else {
stateName = config.transitions.find { it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime) }?.stateName
val message: String
if (stateName != null) {
logger.info("$index transition conditions evaluated to true [indexCreationDate=$indexCreationDate," +
" numDocs=$numDocs, indexSize=${indexSize?.bytes},stepStartTime=${stepStartTime.toEpochMilli()}]")
stepStatus = StepStatus.COMPLETED
"Transitioning to $stateName"
message = "Transitioning to $stateName"
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
message = "Attempting to transition"
}
info = mapOf("message" to message)
} catch (e: Exception) {
logger.error("Failed to transition index [index=${managedIndexMetaData.index}]", e)
logger.error("Failed to transition index [index=$index]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to transition index")
val errorMessage = e.message
Expand All @@ -108,9 +118,6 @@ class AttemptTransitionStep(
}
}

private fun getIndexCreationDate(): Instant =
Instant.ofEpochMilli(clusterService.state().metaData().index(managedIndexMetaData.index).creationDate)

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
policyCompleted = policyCompleted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ fun Transition.evaluateConditions(
}

if (this.conditions.indexAge != null) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli()
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli == -1L) return false // transitions cannot currently be ORd like rollover, so we must return here
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
return this.conditions.indexAge.millis <= elapsedTime
}

Expand Down Expand Up @@ -230,8 +232,11 @@ fun RolloverActionConfig.evaluateConditions(
}

if (this.minAge != null) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli()
if (this.minAge.millis <= elapsedTime) return true
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli != -1L) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
if (this.minAge.millis <= elapsedTime) return true
}
}

if (this.minSize != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement.util

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Conditions
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Transition
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.SweptManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomChangePolicy
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomSweptManagedIndexConfig
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.common.bytes.BytesReference
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.test.ESTestCase
import java.time.Instant

class ManagedIndexUtilsTests : ESTestCase() {

Expand Down Expand Up @@ -173,6 +179,69 @@ class ManagedIndexUtilsTests : ESTestCase() {
assertEquals("Wrong index being searched", listOf(INDEX_STATE_MANAGEMENT_INDEX), indices)
}

fun `test rollover action config evaluate conditions`() {
val noConditionsConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = null, index = 0)
assertTrue("No conditions should always pass", noConditionsConfig
.evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue(0)))
assertTrue("No conditions should always pass", noConditionsConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue(5)))
assertTrue("No conditions should always pass", noConditionsConfig
.evaluateConditions(indexCreationDate = Instant.now().minusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5)))
assertTrue("No conditions should always pass", noConditionsConfig
.evaluateConditions(indexCreationDate = Instant.now().plusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5)))

val minSizeConfig = RolloverActionConfig(minSize = ByteSizeValue(5), minDocs = null, minAge = null, index = 0)
assertFalse("Less bytes should not pass", minSizeConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertTrue("Equal bytes should pass", minSizeConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(5)))
assertTrue("More bytes should pass", minSizeConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(10)))

val minDocsConfig = RolloverActionConfig(minSize = null, minDocs = 5, minAge = null, index = 0)
assertFalse("Less docs should not pass", minDocsConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertTrue("Equal docs should pass", minDocsConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue.ZERO))
assertTrue("More docs should pass", minDocsConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 10, indexSize = ByteSizeValue.ZERO))

val minAgeConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = TimeValue.timeValueSeconds(5), index = 0)
assertFalse("Index age that is too young should not pass", minAgeConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertTrue("Index age that is older should pass", minAgeConfig
.evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertFalse("Index age that is -1L should not pass", minAgeConfig
.evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO))

val multiConfig = RolloverActionConfig(minSize = ByteSizeValue(1), minDocs = 1, minAge = TimeValue.timeValueSeconds(5), index = 0)
assertFalse("No conditions met should not pass", multiConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertFalse("Multi condition, index age -1L should not pass", multiConfig
.evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertTrue("Multi condition, age should pass", multiConfig
.evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO))
assertTrue("Multi condition, docs should pass", multiConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 2, indexSize = ByteSizeValue.ZERO))
assertTrue("Multi condition, size should pass", multiConfig
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(2)))
}

fun `test transition evaluate conditions`() {
val emptyTransition = Transition(stateName = "some_state", conditions = null)
assertTrue("No conditions should pass", emptyTransition
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = null, indexSize = null, transitionStartTime = Instant.now()))

val timeTransition = Transition(stateName = "some_state",
conditions = Conditions(indexAge = TimeValue.timeValueSeconds(5), docCount = null, size = null, cron = null))
assertFalse("Index age that is too young should not pass", timeTransition
.evaluateConditions(indexCreationDate = Instant.now(), numDocs = null, indexSize = null, transitionStartTime = Instant.now()))
assertTrue("Index age that is older should pass", timeTransition
.evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = null, indexSize = null, transitionStartTime = Instant.now()))
assertFalse("Index age that is -1L should not pass", timeTransition
.evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = null, indexSize = null, transitionStartTime = Instant.now()))
}

private fun contentParser(bytesReference: BytesReference): XContentParser {
return XContentHelper.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON)
Expand Down

0 comments on commit d44ac63

Please sign in to comment.