diff --git a/build.gradle b/build.gradle index 2f3b78c00..b6f3ad4e7 100644 --- a/build.gradle +++ b/build.gradle @@ -106,7 +106,7 @@ ext { } group = "com.amazon.opendistroforelasticsearch" -version = "${opendistroVersion}.2" +version = "${opendistroVersion}.3" if (isSnapshot) { version += "-SNAPSHOT" diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt index c3421bac7..98d4a1b1c 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt @@ -265,10 +265,14 @@ object ManagedIndexRunner : ScheduledJobRunner, } if (managedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.STARTING) { - val info = mapOf("message" to "Previous action was not able to update IndexMetaData.") - val updated = updateManagedIndexMetaData(managedIndexMetaData.copy(policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info)) - if (updated) disableManagedIndexConfig(managedIndexConfig) - return + val isIdempotent = step?.isIdempotent() + logger.info("Previous execution failed to update step status, isIdempotent=$isIdempotent") + if (isIdempotent != true) { + val info = mapOf("message" to "Previous action was not able to update IndexMetaData.") + val updated = updateManagedIndexMetaData(managedIndexMetaData.copy(policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info)) + if (updated) disableManagedIndexConfig(managedIndexConfig) + return + } } // If this action is not allowed and the step to be executed is the first step in the action then we will fail diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt index 82d81697e..35cd30051 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt @@ -29,6 +29,21 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta abstract fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData + /** + * Before every execution of a step, we first update the step_status in cluster state to [StepStatus.STARTING] + * to signal that work is about to be done for the managed index. The step then attempts to do work by + * calling execute, and finally updates the step_status with the results of that work ([StepStatus]). + * + * If we ever start an execution with a step_status of [StepStatus.STARTING] it means we failed to update the step_status + * after calling the execute function. Since we do not know if the execution was a noop, failed, or completed then + * we can't always assume it's safe to just retry it (e.g. calling force merge multiple times in a row). This means + * that final update is a failure point that can't be retried and when multiplied by # of executions it leads to a lot of + * chances over time for random network failures, timeouts, etc. + * + * To get around this every step should have an [isIdempotent] method to signal if it's safe to retry this step for such failures. + */ + abstract fun isIdempotent(): Boolean + fun getStartingStepMetaData(): StepMetaData { return StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING) } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt index 3742974a0..d7192f122 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt @@ -38,6 +38,8 @@ class AttemptCloseStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { val index = managedIndexMetaData.index diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt index 507b74d2e..a7ad4dba3 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt @@ -39,6 +39,8 @@ class AttemptDeleteStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt index d10920679..66e4baae1 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt @@ -39,6 +39,8 @@ class AttemptCallForceMergeStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = false + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt index e9576103e..5894d2fca 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt @@ -39,6 +39,8 @@ class AttemptSetReadOnlyStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + override suspend fun execute() { val indexName = managedIndexMetaData.index diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt index a5849b9c6..04d3989f9 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt @@ -41,6 +41,8 @@ class WaitForForceMergeStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { val indexName = managedIndexMetaData.index diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt index 2bdeb4bd3..75b8c7bd7 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt @@ -41,6 +41,8 @@ class AttemptNotificationStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = false + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt index 545918b9c..950357ad1 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt @@ -37,6 +37,8 @@ class AttemptOpenStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt index 70276e88f..9d217515c 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt @@ -38,6 +38,8 @@ class SetReadOnlyStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt index 90dd465ba..f38d6ce3f 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt @@ -38,6 +38,8 @@ class SetReadWriteStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt index 9bed79c38..e452736b0 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt @@ -38,6 +38,8 @@ class AttemptSetReplicaCountStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { val numOfReplicas = config.numOfReplicas diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index d79299319..56410dfa9 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -45,6 +45,8 @@ class AttemptRolloverStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = false + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { val index = managedIndexMetaData.index diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt index a6701d328..17da79078 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt @@ -51,6 +51,8 @@ class AttemptTransitionStep( private var policyCompleted: Boolean = false private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { val index = managedIndexMetaData.index