Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactors the managed index runner to work with extensions #262

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ integTest {
}

// TODO: Remove me after refactoring all actions
exclude 'org/opensearch/indexmanagement/indexstatemanagement/runner/*'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/resthandler/*'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import java.time.Instant

abstract class Action(
val type: String,
Expand Down Expand Up @@ -50,6 +53,21 @@ abstract class Action(
populateAction(out)
}

fun getUpdatedActionMetadata(managedIndexMetaData: ManagedIndexMetaData, stateName: String): ActionMetaData {
val stateMetaData = managedIndexMetaData.stateMetaData
val actionMetaData = managedIndexMetaData.actionMetaData

return when {
// start a new action
stateMetaData?.name != stateName ->
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
actionMetaData?.index != this.actionIndex ->
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
// RetryAPI will reset startTime to null for actionMetaData and we'll reset it to "now" here
else -> actionMetaData.copy(startTime = actionMetaData.startTime ?: Instant.now().toEpochMilli())
}
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when data is shared between nodes
Expand All @@ -72,6 +90,29 @@ abstract class Action(

final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName

/*
* Gets if the managedIndexMetaData reflects a state in which this action has completed successfully. Used in the
* runner when determining if the index metadata should be deleted. If the action isFinishedSuccessfully and
* deleteIndexMetadataAfterFinish is set to true, then we issue a request to delete the managedIndexConfig and its
* managedIndexMetadata.
*/
final fun isFinishedSuccessfully(managedIndexMetaData: ManagedIndexMetaData): Boolean {
val policyRetryInfo = managedIndexMetaData.policyRetryInfo
if (policyRetryInfo?.failed == true) return false
val actionMetaData = managedIndexMetaData.actionMetaData
if (actionMetaData == null || actionMetaData.failed || actionMetaData.name != this.type) return false
val stepMetaData = managedIndexMetaData.stepMetaData
if (stepMetaData == null || !isLastStep(stepMetaData.name) || stepMetaData.stepStatus != Step.StepStatus.COMPLETED) return false
return true
}

/*
* Denotes if the index metadata in the config index should be deleted for the index this action has just
* successfully finished running on. This may be used by custom actions which delete some off-cluster index,
* and following the action's success, the managed index config and metadata need to be deleted.
*/
open fun deleteIndexMetadataAfterFinish(): Boolean = false

companion object {
const val DEFAULT_RETRIES = 3L
const val CUSTOM_ACTION_FIELD = "custom"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ data class ManagedIndexMetaData(
val policyPrimaryTerm: Long?,
val policyCompleted: Boolean?,
val rolledOver: Boolean?,
val indexCreationDate: Long?,
val transitionTo: String?,
val stateMetaData: StateMetaData?,
val actionMetaData: ActionMetaData?,
Expand Down Expand Up @@ -58,6 +59,7 @@ data class ManagedIndexMetaData(
if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString()
if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString()
if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString()
if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString()
if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo
if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString()
if (actionMetaData != null) resultMap[ActionMetaData.ACTION] = actionMetaData.getMapValueString()
Expand All @@ -82,6 +84,7 @@ data class ManagedIndexMetaData(
.field(POLICY_PRIMARY_TERM, policyPrimaryTerm)
.field(POLICY_COMPLETED, policyCompleted)
.field(ROLLED_OVER, rolledOver)
.field(INDEX_CREATION_DATE, indexCreationDate)
.field(TRANSITION_TO, transitionTo)
.addObject(StateMetaData.STATE, stateMetaData, params, true)
.addObject(ActionMetaData.ACTION, actionMetaData, params, true)
Expand Down Expand Up @@ -117,6 +120,8 @@ data class ManagedIndexMetaData(
builder.field(ROLLED_OVER, rolledOver)
}

if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate)

if (policyCompleted == true) {
builder.field(POLICY_COMPLETED, policyCompleted)
return builder
Expand Down Expand Up @@ -145,6 +150,7 @@ data class ManagedIndexMetaData(
streamOutput.writeOptionalLong(policyPrimaryTerm)
streamOutput.writeOptionalBoolean(policyCompleted)
streamOutput.writeOptionalBoolean(rolledOver)
streamOutput.writeOptionalLong(indexCreationDate)
streamOutput.writeOptionalString(transitionTo)

streamOutput.writeOptionalWriteable(stateMetaData)
Expand Down Expand Up @@ -174,6 +180,7 @@ data class ManagedIndexMetaData(
const val POLICY_PRIMARY_TERM = "policy_primary_term"
const val POLICY_COMPLETED = "policy_completed"
const val ROLLED_OVER = "rolled_over"
const val INDEX_CREATION_DATE = "index_creation_date"
const val TRANSITION_TO = "transition_to"
const val INFO = "info"
const val ENABLED = "enabled"
Expand All @@ -186,6 +193,7 @@ data class ManagedIndexMetaData(
val policyPrimaryTerm: Long? = si.readOptionalLong()
val policyCompleted: Boolean? = si.readOptionalBoolean()
val rolledOver: Boolean? = si.readOptionalBoolean()
val indexCreationDate: Long? = si.readOptionalLong()
val transitionTo: String? = si.readOptionalString()

val state: StateMetaData? = si.readOptionalWriteable { StateMetaData.fromStreamInput(it) }
Expand All @@ -207,6 +215,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
actionMetaData = action,
Expand All @@ -233,6 +242,7 @@ data class ManagedIndexMetaData(
var policyPrimaryTerm: Long? = null
var policyCompleted: Boolean? = null
var rolledOver: Boolean? = null
var indexCreationDate: Long? = null
var transitionTo: String? = null

var state: StateMetaData? = null
Expand All @@ -255,6 +265,7 @@ data class ManagedIndexMetaData(
POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
StateMetaData.STATE -> {
state = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else StateMetaData.parse(xcp)
Expand Down Expand Up @@ -282,6 +293,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm,
policyCompleted,
rolledOver,
indexCreationDate,
transitionTo,
state,
action,
Expand Down Expand Up @@ -320,6 +332,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(),
policyCompleted = map[POLICY_COMPLETED]?.toBoolean(),
rolledOver = map[ROLLED_OVER]?.toBoolean(),
indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(),
transitionTo = map[TRANSITION_TO],
stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map),
actionMetaData = ActionMetaData.fromManagedIndexMetaDataMap(map),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerSkipFlag(skipFlag)
.registerThreadPool(threadPool)
.registerExtensionChecker(extensionChecker)
.registerIndexMetadataProvider(indexMetadataProvider)

val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices)
val templateService = ISMTemplateService(client, clusterService, xContentRegistry, indexManagementIndices)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class IndexMetadataProvider(
return service.getMetadata(indexNames, client, clusterService)
}

/*
* Attempts to get the index metadata for of all indexNames for each of the index types designated in the types parameter.
* Returns a map of <index type to <index names to index metadata>>
*/
suspend fun getMultiTypeISMIndexMetadata(
types: List<String> = services.keys.toList(),
indexNames: List<String>
Expand Down
Loading