Skip to content

Commit

Permalink
Fixes issue where action timeout was using start_time from previous a…
Browse files Browse the repository at this point in the history
  • Loading branch information
dbbaughe committed Jan 16, 2020
1 parent 750f043 commit 05b180c
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getStartingManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getStateToExecute
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getCompletedManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getUpdatedActionMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasDifferentJobInterval
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasTimedOut
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasVersionConflict
Expand Down Expand Up @@ -233,6 +234,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
val state = policy.getStateToExecute(managedIndexMetaData)
val action: Action? = state?.getActionToExecute(clusterService, scriptService, client, managedIndexMetaData)
val step: Step? = action?.getStepToExecute()
val currentActionMetaData = action?.getUpdatedActionMetaData(managedIndexMetaData, state)

// If Index State Management is disabled and the current step is not null and safe to disable on
// then disable the job and return early
Expand All @@ -241,11 +243,11 @@ object ManagedIndexRunner : ScheduledJobRunner,
return
}

if (action?.hasTimedOut(managedIndexMetaData.actionMetaData) == true) {
if (action?.hasTimedOut(currentActionMetaData) == true) {
val info = mapOf("message" to "Action timed out")
logger.error("Action=${action.type.type} has timed out")
val updated = updateManagedIndexMetaData(managedIndexMetaData
.copy(actionMetaData = managedIndexMetaData.actionMetaData?.copy(failed = true), info = info))
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info))
if (updated) disableManagedIndexConfig(managedIndexConfig)
return
}
Expand All @@ -255,7 +257,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
return
}

val shouldBackOff = action?.shouldBackoff(managedIndexMetaData.actionMetaData, action.config.configRetry)
val shouldBackOff = action?.shouldBackoff(currentActionMetaData, action.config.configRetry)
if (shouldBackOff?.first == true) {
// If we should back off then exit early.
logger.info("Backoff for retrying. Remaining time ${shouldBackOff.second}")
Expand All @@ -282,9 +284,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
val startingManagedIndexMetaData = managedIndexMetaData.getStartingManagedIndexMetaData(state, action, step)
val updateResult = updateManagedIndexMetaData(startingManagedIndexMetaData)

val actionMetaData = startingManagedIndexMetaData.actionMetaData

if (updateResult && state != null && action != null && step != null && actionMetaData != null) {
if (updateResult && state != null && action != null && step != null && currentActionMetaData != null) {
// Step null check is done in getStartingManagedIndexMetaData
step.execute()
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,60 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
)
}
}

// https://github.com/opendistro-for-elasticsearch/index-management/issues/130
fun `test action timeout doesn't bleed over into next action`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()

createPolicyJson(testPolicy, policyID)

createIndex(indexName, policyID, "some_alias")

val managedIndexConfig = getExistingManagedIndexConfig(indexName)
// Change the start time so the job will trigger in 2 seconds.
// First execution. We need to initialize the policy.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.POLICY_ID to policyID::equals)),
getExplainMap(indexName),
strict = false
)
}

// the second execution we move into open action, we won't hit the timeout as this is the execution that sets the startTime
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedOpenInfoString = mapOf("message" to "Successfully opened index").toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedOpenInfoString == info.toString())),
getExplainMap(indexName),
strict = false
)
}

// wait 5 seconds for the timeout from the first action to have passed
Thread.sleep(5000L)

// the third execution we move into rollover action, we should not hit the timeout yet because its the first execution of rollover
// but there was a bug before where it would use the startTime from the previous actions metadata and immediately fail
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedRolloverInfoString = mapOf("message" to "Attempting to rollover").toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedRolloverInfoString == info.toString())),
getExplainMap(indexName),
strict = false
)
}
}
}

0 comments on commit 05b180c

Please sign in to comment.