Skip to content

Commit

Permalink
Refactors the managed index runner to work with extensions (#262)
Browse files Browse the repository at this point in the history
* Adds index creation date

Signed-off-by: Robert Downs <[email protected]>

Adds NewClusterEventListeners and refactors Transition to work with custom actions

Signed-off-by: Robert Downs <[email protected]>

Marks blocked actions list as deprecated

Signed-off-by: Clay Downs <[email protected]>

Asserts the deprecation warning after adding to allow list in test

Signed-off-by: Robert Downs <[email protected]>

Removes allow_list test

Signed-off-by: Robert Downs <[email protected]>

* Fix additional rebase issues

Signed-off-by: Robert Downs <[email protected]>

* Fixes failing tests

Signed-off-by: Robert Downs <[email protected]>

* changes based on comments

Signed-off-by: Robert Downs <[email protected]>

* Adds additional comments

Signed-off-by: Clay Downs <[email protected]>
  • Loading branch information
downsrob authored Mar 6, 2022
1 parent 40544b8 commit dc05f52
Show file tree
Hide file tree
Showing 37 changed files with 454 additions and 140 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ integTest {
}

// TODO: Remove me after refactoring all actions
exclude 'org/opensearch/indexmanagement/indexstatemanagement/runner/*'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/resthandler/*'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import java.time.Instant

abstract class Action(
val type: String,
Expand Down Expand Up @@ -50,6 +53,21 @@ abstract class Action(
populateAction(out)
}

fun getUpdatedActionMetadata(managedIndexMetaData: ManagedIndexMetaData, stateName: String): ActionMetaData {
val stateMetaData = managedIndexMetaData.stateMetaData
val actionMetaData = managedIndexMetaData.actionMetaData

return when {
// start a new action
stateMetaData?.name != stateName ->
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
actionMetaData?.index != this.actionIndex ->
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
// RetryAPI will reset startTime to null for actionMetaData and we'll reset it to "now" here
else -> actionMetaData.copy(startTime = actionMetaData.startTime ?: Instant.now().toEpochMilli())
}
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when data is shared between nodes
Expand All @@ -72,6 +90,29 @@ abstract class Action(

final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName

/*
* Gets if the managedIndexMetaData reflects a state in which this action has completed successfully. Used in the
* runner when determining if the index metadata should be deleted. If the action isFinishedSuccessfully and
* deleteIndexMetadataAfterFinish is set to true, then we issue a request to delete the managedIndexConfig and its
* managedIndexMetadata.
*/
final fun isFinishedSuccessfully(managedIndexMetaData: ManagedIndexMetaData): Boolean {
val policyRetryInfo = managedIndexMetaData.policyRetryInfo
if (policyRetryInfo?.failed == true) return false
val actionMetaData = managedIndexMetaData.actionMetaData
if (actionMetaData == null || actionMetaData.failed || actionMetaData.name != this.type) return false
val stepMetaData = managedIndexMetaData.stepMetaData
if (stepMetaData == null || !isLastStep(stepMetaData.name) || stepMetaData.stepStatus != Step.StepStatus.COMPLETED) return false
return true
}

/*
* Denotes if the index metadata in the config index should be deleted for the index this action has just
* successfully finished running on. This may be used by custom actions which delete some off-cluster index,
* and following the action's success, the managed index config and metadata need to be deleted.
*/
open fun deleteIndexMetadataAfterFinish(): Boolean = false

companion object {
const val DEFAULT_RETRIES = 3L
const val CUSTOM_ACTION_FIELD = "custom"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ data class ManagedIndexMetaData(
val policyPrimaryTerm: Long?,
val policyCompleted: Boolean?,
val rolledOver: Boolean?,
val indexCreationDate: Long?,
val transitionTo: String?,
val stateMetaData: StateMetaData?,
val actionMetaData: ActionMetaData?,
Expand Down Expand Up @@ -58,6 +59,7 @@ data class ManagedIndexMetaData(
if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString()
if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString()
if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString()
if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString()
if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo
if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString()
if (actionMetaData != null) resultMap[ActionMetaData.ACTION] = actionMetaData.getMapValueString()
Expand All @@ -82,6 +84,7 @@ data class ManagedIndexMetaData(
.field(POLICY_PRIMARY_TERM, policyPrimaryTerm)
.field(POLICY_COMPLETED, policyCompleted)
.field(ROLLED_OVER, rolledOver)
.field(INDEX_CREATION_DATE, indexCreationDate)
.field(TRANSITION_TO, transitionTo)
.addObject(StateMetaData.STATE, stateMetaData, params, true)
.addObject(ActionMetaData.ACTION, actionMetaData, params, true)
Expand Down Expand Up @@ -117,6 +120,8 @@ data class ManagedIndexMetaData(
builder.field(ROLLED_OVER, rolledOver)
}

if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate)

if (policyCompleted == true) {
builder.field(POLICY_COMPLETED, policyCompleted)
return builder
Expand Down Expand Up @@ -145,6 +150,7 @@ data class ManagedIndexMetaData(
streamOutput.writeOptionalLong(policyPrimaryTerm)
streamOutput.writeOptionalBoolean(policyCompleted)
streamOutput.writeOptionalBoolean(rolledOver)
streamOutput.writeOptionalLong(indexCreationDate)
streamOutput.writeOptionalString(transitionTo)

streamOutput.writeOptionalWriteable(stateMetaData)
Expand Down Expand Up @@ -174,6 +180,7 @@ data class ManagedIndexMetaData(
const val POLICY_PRIMARY_TERM = "policy_primary_term"
const val POLICY_COMPLETED = "policy_completed"
const val ROLLED_OVER = "rolled_over"
const val INDEX_CREATION_DATE = "index_creation_date"
const val TRANSITION_TO = "transition_to"
const val INFO = "info"
const val ENABLED = "enabled"
Expand All @@ -186,6 +193,7 @@ data class ManagedIndexMetaData(
val policyPrimaryTerm: Long? = si.readOptionalLong()
val policyCompleted: Boolean? = si.readOptionalBoolean()
val rolledOver: Boolean? = si.readOptionalBoolean()
val indexCreationDate: Long? = si.readOptionalLong()
val transitionTo: String? = si.readOptionalString()

val state: StateMetaData? = si.readOptionalWriteable { StateMetaData.fromStreamInput(it) }
Expand All @@ -207,6 +215,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
actionMetaData = action,
Expand All @@ -233,6 +242,7 @@ data class ManagedIndexMetaData(
var policyPrimaryTerm: Long? = null
var policyCompleted: Boolean? = null
var rolledOver: Boolean? = null
var indexCreationDate: Long? = null
var transitionTo: String? = null

var state: StateMetaData? = null
Expand All @@ -255,6 +265,7 @@ data class ManagedIndexMetaData(
POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
StateMetaData.STATE -> {
state = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else StateMetaData.parse(xcp)
Expand Down Expand Up @@ -282,6 +293,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm,
policyCompleted,
rolledOver,
indexCreationDate,
transitionTo,
state,
action,
Expand Down Expand Up @@ -320,6 +332,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(),
policyCompleted = map[POLICY_COMPLETED]?.toBoolean(),
rolledOver = map[ROLLED_OVER]?.toBoolean(),
indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(),
transitionTo = map[TRANSITION_TO],
stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map),
actionMetaData = ActionMetaData.fromManagedIndexMetaDataMap(map),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerSkipFlag(skipFlag)
.registerThreadPool(threadPool)
.registerExtensionChecker(extensionChecker)
.registerIndexMetadataProvider(indexMetadataProvider)

val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices)
val templateService = ISMTemplateService(client, clusterService, xContentRegistry, indexManagementIndices)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.support.IndicesOptions
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
Expand Down Expand Up @@ -38,18 +39,27 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index

response.state.metadata.indices.forEach {
// TODO waiting to add document count until it is definitely needed
val uuid = if (customUUIDSetting != null) {
it.value.settings.get(customUUIDSetting, it.value.indexUUID)
} else {
it.value.indexUUID
}
val uuid = getCustomIndexUUID(it.value)
val indexMetadata = ISMIndexMetadata(uuid, it.value.creationDate, -1)
indexNameToMetadata[it.key] = indexMetadata
}

return indexNameToMetadata
}

/*
* If an extension wants Index Management to determine cluster state indices UUID based on a custom index setting if
* present of cluster state, the extension will override this customUUID setting. This allows an index to migrate off
* cluster and back while using this persistent uuid.
*/
fun getCustomIndexUUID(indexMetadata: IndexMetadata): String {
return if (customUUIDSetting != null) {
indexMetadata.settings.get(customUUIDSetting, indexMetadata.indexUUID)
} else {
indexMetadata.indexUUID
}
}

override suspend fun getMetadataForAllIndices(client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata> {
return getMetadata(listOf("*"), client, clusterService)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class IndexMetadataProvider(
return service.getMetadata(indexNames, client, clusterService)
}

/*
* Attempts to get the index metadata for of all indexNames for each of the index types designated in the types parameter.
* Returns a map of <index type to <index names to index metadata>>
*/
suspend fun getMultiTypeISMIndexMetadata(
types: List<String> = services.keys.toList(),
indexNames: List<String>
Expand Down
Loading

0 comments on commit dc05f52

Please sign in to comment.