Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Backport bug fixes/improvements to opendistro-1.3 (#138)
Browse files Browse the repository at this point in the history
* Fixes history index iteration breaking instead of continuing (#134)

* Fixes issue where action timeout was using start_time from previous action (#133)

* Updates plugin version and release notes
  • Loading branch information
dbbaughe authored Jan 17, 2020
1 parent 486f833 commit b7dddc5
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 9 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ ext {
}

group = "com.amazon.opendistroforelasticsearch"
version = "${opendistroVersion}.1"
version = "${opendistroVersion}.2"

if (isSnapshot) {
version += "-SNAPSHOT"
Expand Down
8 changes: 7 additions & 1 deletion opendistro-elasticsearch-index-management.release-notes.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
## Version 1.3.0.1 (Current)
## Version 1.3.0.2 (Current)

### Bug Fixes
* Fixes issue where action timeout was using start_time from previous action (#133)
* Fixes history index iteration breaking instead of continuing (#134)

## 2019-12-17, Version 1.3.0.1

### Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class IndexStateManagementHistory(
val alias = indexMetaData.aliases.firstOrNull { IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS == it.value.alias }
if (alias != null && historyEnabled) {
// If index has write alias and history is enable, don't delete the index.
break
continue
}

indexToDelete.add(indexMetaData.index.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getStartingManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getStateToExecute
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getCompletedManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getUpdatedActionMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasDifferentJobInterval
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasTimedOut
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasVersionConflict
Expand Down Expand Up @@ -233,6 +234,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
val state = policy.getStateToExecute(managedIndexMetaData)
val action: Action? = state?.getActionToExecute(clusterService, scriptService, client, managedIndexMetaData)
val step: Step? = action?.getStepToExecute()
val currentActionMetaData = action?.getUpdatedActionMetaData(managedIndexMetaData, state)

// If Index State Management is disabled and the current step is not null and safe to disable on
// then disable the job and return early
Expand All @@ -241,11 +243,11 @@ object ManagedIndexRunner : ScheduledJobRunner,
return
}

if (action?.hasTimedOut(managedIndexMetaData.actionMetaData) == true) {
if (action?.hasTimedOut(currentActionMetaData) == true) {
val info = mapOf("message" to "Action timed out")
logger.error("Action=${action.type.type} has timed out")
val updated = updateManagedIndexMetaData(managedIndexMetaData
.copy(actionMetaData = managedIndexMetaData.actionMetaData?.copy(failed = true), info = info))
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info))
if (updated) disableManagedIndexConfig(managedIndexConfig)
return
}
Expand All @@ -255,7 +257,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
return
}

val shouldBackOff = action?.shouldBackoff(managedIndexMetaData.actionMetaData, action.config.configRetry)
val shouldBackOff = action?.shouldBackoff(currentActionMetaData, action.config.configRetry)
if (shouldBackOff?.first == true) {
// If we should back off then exit early.
logger.info("Backoff for retrying. Remaining time ${shouldBackOff.second}")
Expand All @@ -282,9 +284,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
val startingManagedIndexMetaData = managedIndexMetaData.getStartingManagedIndexMetaData(state, action, step)
val updateResult = updateManagedIndexMetaData(startingManagedIndexMetaData)

val actionMetaData = startingManagedIndexMetaData.actionMetaData

if (updateResult && state != null && action != null && step != null && actionMetaData != null) {
if (updateResult && state != null && action != null && step != null && currentActionMetaData != null) {
// Step null check is done in getStartingManagedIndexMetaData
step.execute()
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,60 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
)
}
}

// https://github.com/opendistro-for-elasticsearch/index-management/issues/130
fun `test action timeout doesn't bleed over into next action`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()

createPolicyJson(testPolicy, policyID)

createIndex(indexName, policyID, "some_alias")

val managedIndexConfig = getExistingManagedIndexConfig(indexName)
// Change the start time so the job will trigger in 2 seconds.
// First execution. We need to initialize the policy.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.POLICY_ID to policyID::equals)),
getExplainMap(indexName),
strict = false
)
}

// the second execution we move into open action, we won't hit the timeout as this is the execution that sets the startTime
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedOpenInfoString = mapOf("message" to "Successfully opened index").toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedOpenInfoString == info.toString())),
getExplainMap(indexName),
strict = false
)
}

// wait 5 seconds for the timeout from the first action to have passed
Thread.sleep(5000L)

// the third execution we move into rollover action, we should not hit the timeout yet because its the first execution of rollover
// but there was a bug before where it would use the startTime from the previous actions metadata and immediately fail
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedRolloverInfoString = mapOf("message" to "Attempting to rollover").toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedRolloverInfoString == info.toString())),
getExplainMap(indexName),
strict = false
)
}
}
}

0 comments on commit b7dddc5

Please sign in to comment.