Skip to content

Commit

Permalink
Step Metadata Update on Index Rollover Timeout (opensearch-project#1174)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshita Kaushik <[email protected]>
  • Loading branch information
harshitakaushik-dev and Harshita Kaushik authored May 30, 2024
1 parent acbc930 commit d4ee795
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {
CONDITION_NOT_MET("condition_not_met"),
FAILED("failed"),
COMPLETED("completed"),
TIMED_OUT("timed_out"),
;

override fun toString(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.LockModel
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
Expand Down Expand Up @@ -330,14 +331,18 @@ object ManagedIndexRunner :
if (action?.hasTimedOut(currentActionMetaData) == true) {
val info = mapOf("message" to "Action timed out")
logger.error("Action=${action.type} has timed out")
val updated =
updateManagedIndexMetaData(
managedIndexMetaData
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info),
)

val updatedIndexMetaData = managedIndexMetaData.copy(
actionMetaData = currentActionMetaData?.copy(failed = true),
stepMetaData = step?.let { StepMetaData(it.name, System.currentTimeMillis(), Step.StepStatus.TIMED_OUT) },
info = info,
)

val updated = updateManagedIndexMetaData(updatedIndexMetaData)

if (updated.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
publishErrorNotification(policy, managedIndexMetaData)
publishErrorNotification(policy, updatedIndexMetaData)
}
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package org.opensearch.indexmanagement.indexstatemanagement.action
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.step.open.AttemptOpenStep
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.indexmanagement.waitFor
import java.time.Instant
import java.util.Locale
Expand All @@ -22,9 +24,9 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
val policyID = "${testIndexName}_testPolicyName_1"
val testPolicy =
"""
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()

createPolicyJson(testPolicy, policyID)
Expand Down Expand Up @@ -60,11 +62,24 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
fun(actionMetaDataMap: Any?): Boolean =
assertActionEquals(
ActionMetaData(
name = RolloverAction.name, startTime = Instant.now().toEpochMilli(), index = 0,
failed = true, consumedRetries = 0, lastRetryTime = null, actionProperties = null,
name = RolloverAction.name,
startTime = Instant.now().toEpochMilli(),
index = 0,
failed = true,
consumedRetries = 0,
lastRetryTime = null,
actionProperties = null,
),
actionMetaDataMap,
),
StepMetaData.STEP to
fun(stepMetaDataMap: Any?): Boolean =
assertStepEquals(
StepMetaData(
"attempt_rollover", Instant.now().toEpochMilli(), Step.StepStatus.TIMED_OUT,
),
stepMetaDataMap,
),
),
),
getExplainMap(indexName),
Expand Down

0 comments on commit d4ee795

Please sign in to comment.