Skip to content

Commit

Permalink
Adds isIdempotent method to each step and updates ManagedIndexRunner …
Browse files Browse the repository at this point in the history
  • Loading branch information
jinsoor-amzn and dbbaughe authored Apr 7, 2020
1 parent f420553 commit 03b0a2e
Show file tree
Hide file tree
Showing 15 changed files with 48 additions and 5 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ ext {
}

group = "com.amazon.opendistroforelasticsearch"
version = "${opendistroVersion}.2"
version = "${opendistroVersion}.3"

if (isSnapshot) {
version += "-SNAPSHOT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,14 @@ object ManagedIndexRunner : ScheduledJobRunner,
}

if (managedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.STARTING) {
val info = mapOf("message" to "Previous action was not able to update IndexMetaData.")
val updated = updateManagedIndexMetaData(managedIndexMetaData.copy(policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info))
if (updated) disableManagedIndexConfig(managedIndexConfig)
return
val isIdempotent = step?.isIdempotent()
logger.info("Previous execution failed to update step status, isIdempotent=$isIdempotent")
if (isIdempotent != true) {
val info = mapOf("message" to "Previous action was not able to update IndexMetaData.")
val updated = updateManagedIndexMetaData(managedIndexMetaData.copy(policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info))
if (updated) disableManagedIndexConfig(managedIndexConfig)
return
}
}

// If this action is not allowed and the step to be executed is the first step in the action then we will fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta

abstract fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData

/**
* Before every execution of a step, we first update the step_status in cluster state to [StepStatus.STARTING]
* to signal that work is about to be done for the managed index. The step then attempts to do work by
* calling execute, and finally updates the step_status with the results of that work ([StepStatus]).
*
* If we ever start an execution with a step_status of [StepStatus.STARTING] it means we failed to update the step_status
* after calling the execute function. Since we do not know if the execution was a noop, failed, or completed then
* we can't always assume it's safe to just retry it (e.g. calling force merge multiple times in a row). This means
* that final update is a failure point that can't be retried and when multiplied by # of executions it leads to a lot of
* chances over time for random network failures, timeouts, etc.
*
* To get around this every step should have an [isIdempotent] method to signal if it's safe to retry this step for such failures.
*/
abstract fun isIdempotent(): Boolean

fun getStartingStepMetaData(): StepMetaData {
return StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class AttemptCloseStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class AttemptDeleteStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class AttemptCallForceMergeStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = false

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class AttemptSetReadOnlyStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

override suspend fun execute() {
val indexName = managedIndexMetaData.index

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class WaitForForceMergeStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val indexName = managedIndexMetaData.index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class AttemptNotificationStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = false

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class AttemptOpenStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class SetReadOnlyStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class SetReadWriteStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class AttemptSetReplicaCountStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val numOfReplicas = config.numOfReplicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class AttemptRolloverStep(
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = false

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class AttemptTransitionStep(
private var policyCompleted: Boolean = false
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
Expand Down

0 comments on commit 03b0a2e

Please sign in to comment.