Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactors the managed index runner to work with extensions #262

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ integTest {
}

// TODO: Remove me after refactoring all actions
exclude 'org/opensearch/indexmanagement/indexstatemanagement/runner/*'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/resthandler/*'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import java.time.Instant

abstract class Action(
val type: String,
Expand Down Expand Up @@ -50,6 +53,21 @@ abstract class Action(
populateAction(out)
}

fun getUpdatedActionMetadata(managedIndexMetaData: ManagedIndexMetaData, stateName: String): ActionMetaData {
val stateMetaData = managedIndexMetaData.stateMetaData
val actionMetaData = managedIndexMetaData.actionMetaData

return when {
// start a new action
stateMetaData?.name != stateName ->
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
actionMetaData?.index != this.actionIndex ->
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
// RetryAPI will reset startTime to null for actionMetaData and we'll reset it to "now" here
else -> actionMetaData.copy(startTime = actionMetaData.startTime ?: Instant.now().toEpochMilli())
}
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when data is shared between nodes
Expand All @@ -72,6 +90,25 @@ abstract class Action(

final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName

/*
* Gets if the managedIndexMetaData reflects a state in which this action has completed successfully
*/
final fun isFinishedSuccessfully(managedIndexMetaData: ManagedIndexMetaData): Boolean {
val policyRetryInfo = managedIndexMetaData.policyRetryInfo
if (policyRetryInfo == null || policyRetryInfo.failed) return false
val actionMetaData = managedIndexMetaData.actionMetaData
if (actionMetaData == null || actionMetaData.failed || actionMetaData.name != this.type) return false
val stepMetaData = managedIndexMetaData.stepMetaData
if (stepMetaData == null || !isLastStep(stepMetaData.name) || stepMetaData.stepStatus != Step.StepStatus.COMPLETED) return false
return true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this new code or just existing code refactored to this place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a generalization of the isSuccessfulDelete function:
https://github.com/opensearch-project/index-management/blob/main/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt#L430-L433
The purpose is that if an extension creates an action to delete their off cluster index, then they can set deleteIndexMetadataAfterFinish to true in the custom action, and then after it isFinishedSuccessfully, the metadata will be deleted for the action.
Thinking about this a bit more now, I can add an additional validation here to make it so these extension actions which set deleteIndexMetadataAfterFinish = true can't transition or have actions after the delete

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do see a logical error in the code though, if (policyRetryInfo == null || policyRetryInfo.failed) return false would mean that the action would never be noted as isFinishedSuccessfully if the policyRetryInfo was null. It is initialized with the metadata so it shouldn't be a problem but I will change that as well


/*
* Denotes if the index metadata in the config index should be deleted for the index this action has just
* successfully finished running on.
*/
open fun deleteIndexMetadataAfterFinish(): Boolean = false

companion object {
const val DEFAULT_RETRIES = 3L
const val CUSTOM_ACTION_FIELD = "custom"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ data class ManagedIndexMetaData(
val policyPrimaryTerm: Long?,
val policyCompleted: Boolean?,
val rolledOver: Boolean?,
val indexCreationDate: Long?,
val transitionTo: String?,
val stateMetaData: StateMetaData?,
val actionMetaData: ActionMetaData?,
Expand Down Expand Up @@ -58,6 +59,7 @@ data class ManagedIndexMetaData(
if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString()
if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString()
if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString()
if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString()
if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo
if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString()
if (actionMetaData != null) resultMap[ActionMetaData.ACTION] = actionMetaData.getMapValueString()
Expand All @@ -82,6 +84,7 @@ data class ManagedIndexMetaData(
.field(POLICY_PRIMARY_TERM, policyPrimaryTerm)
.field(POLICY_COMPLETED, policyCompleted)
.field(ROLLED_OVER, rolledOver)
.field(INDEX_CREATION_DATE, indexCreationDate)
.field(TRANSITION_TO, transitionTo)
.addObject(StateMetaData.STATE, stateMetaData, params, true)
.addObject(ActionMetaData.ACTION, actionMetaData, params, true)
Expand Down Expand Up @@ -117,6 +120,8 @@ data class ManagedIndexMetaData(
builder.field(ROLLED_OVER, rolledOver)
}

if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate)

if (policyCompleted == true) {
builder.field(POLICY_COMPLETED, policyCompleted)
return builder
Expand Down Expand Up @@ -145,6 +150,7 @@ data class ManagedIndexMetaData(
streamOutput.writeOptionalLong(policyPrimaryTerm)
streamOutput.writeOptionalBoolean(policyCompleted)
streamOutput.writeOptionalBoolean(rolledOver)
streamOutput.writeOptionalLong(indexCreationDate)
streamOutput.writeOptionalString(transitionTo)

streamOutput.writeOptionalWriteable(stateMetaData)
Expand Down Expand Up @@ -174,6 +180,7 @@ data class ManagedIndexMetaData(
const val POLICY_PRIMARY_TERM = "policy_primary_term"
const val POLICY_COMPLETED = "policy_completed"
const val ROLLED_OVER = "rolled_over"
const val INDEX_CREATION_DATE = "index_creation_date"
const val TRANSITION_TO = "transition_to"
const val INFO = "info"
const val ENABLED = "enabled"
Expand All @@ -186,6 +193,7 @@ data class ManagedIndexMetaData(
val policyPrimaryTerm: Long? = si.readOptionalLong()
val policyCompleted: Boolean? = si.readOptionalBoolean()
val rolledOver: Boolean? = si.readOptionalBoolean()
val indexCreationDate: Long? = si.readOptionalLong()
val transitionTo: String? = si.readOptionalString()

val state: StateMetaData? = si.readOptionalWriteable { StateMetaData.fromStreamInput(it) }
Expand All @@ -207,6 +215,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
actionMetaData = action,
Expand All @@ -233,6 +242,7 @@ data class ManagedIndexMetaData(
var policyPrimaryTerm: Long? = null
var policyCompleted: Boolean? = null
var rolledOver: Boolean? = null
var indexCreationDate: Long? = null
var transitionTo: String? = null

var state: StateMetaData? = null
Expand All @@ -255,6 +265,7 @@ data class ManagedIndexMetaData(
POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
StateMetaData.STATE -> {
state = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else StateMetaData.parse(xcp)
Expand Down Expand Up @@ -282,6 +293,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm,
policyCompleted,
rolledOver,
indexCreationDate,
transitionTo,
state,
action,
Expand Down Expand Up @@ -320,6 +332,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(),
policyCompleted = map[POLICY_COMPLETED]?.toBoolean(),
rolledOver = map[ROLLED_OVER]?.toBoolean(),
indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(),
transitionTo = map[TRANSITION_TO],
stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map),
actionMetaData = ActionMetaData.fromManagedIndexMetaDataMap(map),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerSkipFlag(skipFlag)
.registerThreadPool(threadPool)
.registerExtensionChecker(extensionChecker)
.registerIndexMetadataProvider(indexMetadataProvider)

val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices)
val templateService = ISMTemplateService(client, clusterService, xContentRegistry, indexManagementIndices)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class IndexMetadataProvider(
return service.getMetadata(indexNames, client, clusterService)
}

/*
* Attempts to get the index metadata for of all indexNames for each of the index types designated in the types parameter.
* Returns a map of <index type to <index names to index metadata>>
*/
suspend fun getMultiTypeISMIndexMetadata(
types: List<String> = services.keys.toList(),
indexNames: List<String>
Expand Down
Loading