From dc05f52bd27b931730c328a68306e255eafbf134 Mon Sep 17 00:00:00 2001 From: Clay Downs <89109232+downsrob@users.noreply.github.com> Date: Sun, 6 Mar 2022 11:12:04 -0800 Subject: [PATCH] Refactors the managed index runner to work with extensions (#262) * Adds index creation date Signed-off-by: Robert Downs Adds NewClusterEventListeners and refactors Transition to work with custom actions Signed-off-by: Robert Downs Marks blocked actions list as deprecated Signed-off-by: Clay Downs <89109232+downsrob@users.noreply.github.com> Asserts the deprecation warning after adding to allow list in test Signed-off-by: Robert Downs Removes allow_list test Signed-off-by: Robert Downs * Fix additional rebase issues Signed-off-by: Robert Downs * Fixes failing tests Signed-off-by: Robert Downs * changes based on comments Signed-off-by: Robert Downs * Adds additional comments Signed-off-by: Clay Downs <89109232+downsrob@users.noreply.github.com> --- build.gradle | 1 - .../indexstatemanagement/Action.kt | 41 ++++++ .../indexstatemanagement/model/Decision.kt | 9 -- .../model/ManagedIndexMetaData.kt | 13 ++ .../indexmanagement/IndexManagementPlugin.kt | 1 + .../DefaultIndexMetadataService.kt | 20 ++- .../IndexMetadataProvider.kt | 4 + .../ManagedIndexRunner.kt | 135 +++++++++++++++--- .../action/TransitionsAction.kt | 2 + .../indexstatemanagement/model/State.kt | 14 +- .../opensearchapi/OpenSearchExtensions.kt | 19 ++- .../step/transition/AttemptTransitionStep.kt | 79 +++++++--- .../util/ManagedIndexUtils.kt | 67 ++++++--- .../mappings/opendistro-ism-config.json | 4 + .../mappings/opendistro-ism-history.json | 6 +- .../IndexManagementRestTestCase.kt | 2 +- .../IndexStateManagementIntegTestCase.kt | 1 + .../action/ActionRetryIT.kt | 1 + .../action/IndexStateManagementHistoryIT.kt | 6 + .../model/ManagedIndexMetaDataTests.kt | 5 + .../model/XContentTests.kt | 1 + .../runner/ManagedIndexRunnerIT.kt | 35 ++++- .../step/AttemptCloseStepTests.kt | 12 +- .../step/AttemptCreateRollupJobStepTests.kt | 2 +- .../step/AttemptDeleteStepTests.kt | 8 +- .../step/AttemptOpenStepTests.kt | 6 +- .../step/AttemptSetIndexPriorityStepTests.kt | 8 +- .../step/AttemptSetReplicaCountStepTests.kt | 6 +- .../step/AttemptSnapshotStepTests.kt | 2 +- .../step/AttemptTransitionStepTests.kt | 39 +++-- .../step/SetReadOnlyStepTests.kt | 6 +- .../step/SetReadWriteStepTests.kt | 6 +- .../step/WaitForRollupCompletionStepTests.kt | 2 +- .../step/WaitForSnapshotStepTests.kt | 20 +-- .../action/explain/ExplainResponseTests.kt | 1 + .../cached-opendistro-ism-config.json | 4 + .../cached-opendistro-ism-history.json | 6 +- 37 files changed, 454 insertions(+), 140 deletions(-) delete mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt diff --git a/build.gradle b/build.gradle index 06157b378..48447284c 100644 --- a/build.gradle +++ b/build.gradle @@ -308,7 +308,6 @@ integTest { } // TODO: Remove me after refactoring all actions - exclude 'org/opensearch/indexmanagement/indexstatemanagement/runner/*' exclude 'org/opensearch/indexmanagement/indexstatemanagement/resthandler/*' } diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt index 58780b6ca..7b127fd96 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt @@ -10,9 +10,12 @@ import org.opensearch.common.io.stream.Writeable import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import java.time.Instant abstract class Action( val type: String, @@ -50,6 +53,21 @@ abstract class Action( populateAction(out) } + fun getUpdatedActionMetadata(managedIndexMetaData: ManagedIndexMetaData, stateName: String): ActionMetaData { + val stateMetaData = managedIndexMetaData.stateMetaData + val actionMetaData = managedIndexMetaData.actionMetaData + + return when { + // start a new action + stateMetaData?.name != stateName -> + ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null) + actionMetaData?.index != this.actionIndex -> + ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null) + // RetryAPI will reset startTime to null for actionMetaData and we'll reset it to "now" here + else -> actionMetaData.copy(startTime = actionMetaData.startTime ?: Instant.now().toEpochMilli()) + } + } + /** * The implementer of Action can change this method to correctly serialize the internals of the action * when data is shared between nodes @@ -72,6 +90,29 @@ abstract class Action( final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName + /* + * Gets if the managedIndexMetaData reflects a state in which this action has completed successfully. Used in the + * runner when determining if the index metadata should be deleted. If the action isFinishedSuccessfully and + * deleteIndexMetadataAfterFinish is set to true, then we issue a request to delete the managedIndexConfig and its + * managedIndexMetadata. + */ + final fun isFinishedSuccessfully(managedIndexMetaData: ManagedIndexMetaData): Boolean { + val policyRetryInfo = managedIndexMetaData.policyRetryInfo + if (policyRetryInfo?.failed == true) return false + val actionMetaData = managedIndexMetaData.actionMetaData + if (actionMetaData == null || actionMetaData.failed || actionMetaData.name != this.type) return false + val stepMetaData = managedIndexMetaData.stepMetaData + if (stepMetaData == null || !isLastStep(stepMetaData.name) || stepMetaData.stepStatus != Step.StepStatus.COMPLETED) return false + return true + } + + /* + * Denotes if the index metadata in the config index should be deleted for the index this action has just + * successfully finished running on. This may be used by custom actions which delete some off-cluster index, + * and following the action's success, the managed index config and metadata need to be deleted. + */ + open fun deleteIndexMetadataAfterFinish(): Boolean = false + companion object { const val DEFAULT_RETRIES = 3L const val CUSTOM_ACTION_FIELD = "custom" diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt deleted file mode 100644 index 6fcaa98ad..000000000 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt +++ /dev/null @@ -1,9 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.spi.indexstatemanagement.model - -// TODO: Probably need an override or priority to clear clashes if there are multiple decisions with conflicting index metadata -data class Decision(val shouldProcess: Boolean = true) diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt index 6883be3d9..3832fa29c 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt @@ -31,6 +31,7 @@ data class ManagedIndexMetaData( val policyPrimaryTerm: Long?, val policyCompleted: Boolean?, val rolledOver: Boolean?, + val indexCreationDate: Long?, val transitionTo: String?, val stateMetaData: StateMetaData?, val actionMetaData: ActionMetaData?, @@ -58,6 +59,7 @@ data class ManagedIndexMetaData( if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString() if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString() if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString() + if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString() if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString() if (actionMetaData != null) resultMap[ActionMetaData.ACTION] = actionMetaData.getMapValueString() @@ -82,6 +84,7 @@ data class ManagedIndexMetaData( .field(POLICY_PRIMARY_TERM, policyPrimaryTerm) .field(POLICY_COMPLETED, policyCompleted) .field(ROLLED_OVER, rolledOver) + .field(INDEX_CREATION_DATE, indexCreationDate) .field(TRANSITION_TO, transitionTo) .addObject(StateMetaData.STATE, stateMetaData, params, true) .addObject(ActionMetaData.ACTION, actionMetaData, params, true) @@ -117,6 +120,8 @@ data class ManagedIndexMetaData( builder.field(ROLLED_OVER, rolledOver) } + if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate) + if (policyCompleted == true) { builder.field(POLICY_COMPLETED, policyCompleted) return builder @@ -145,6 +150,7 @@ data class ManagedIndexMetaData( streamOutput.writeOptionalLong(policyPrimaryTerm) streamOutput.writeOptionalBoolean(policyCompleted) streamOutput.writeOptionalBoolean(rolledOver) + streamOutput.writeOptionalLong(indexCreationDate) streamOutput.writeOptionalString(transitionTo) streamOutput.writeOptionalWriteable(stateMetaData) @@ -174,6 +180,7 @@ data class ManagedIndexMetaData( const val POLICY_PRIMARY_TERM = "policy_primary_term" const val POLICY_COMPLETED = "policy_completed" const val ROLLED_OVER = "rolled_over" + const val INDEX_CREATION_DATE = "index_creation_date" const val TRANSITION_TO = "transition_to" const val INFO = "info" const val ENABLED = "enabled" @@ -186,6 +193,7 @@ data class ManagedIndexMetaData( val policyPrimaryTerm: Long? = si.readOptionalLong() val policyCompleted: Boolean? = si.readOptionalBoolean() val rolledOver: Boolean? = si.readOptionalBoolean() + val indexCreationDate: Long? = si.readOptionalLong() val transitionTo: String? = si.readOptionalString() val state: StateMetaData? = si.readOptionalWriteable { StateMetaData.fromStreamInput(it) } @@ -207,6 +215,7 @@ data class ManagedIndexMetaData( policyPrimaryTerm = policyPrimaryTerm, policyCompleted = policyCompleted, rolledOver = rolledOver, + indexCreationDate = indexCreationDate, transitionTo = transitionTo, stateMetaData = state, actionMetaData = action, @@ -233,6 +242,7 @@ data class ManagedIndexMetaData( var policyPrimaryTerm: Long? = null var policyCompleted: Boolean? = null var rolledOver: Boolean? = null + var indexCreationDate: Long? = null var transitionTo: String? = null var state: StateMetaData? = null @@ -255,6 +265,7 @@ data class ManagedIndexMetaData( POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue() ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue() + INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text() StateMetaData.STATE -> { state = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else StateMetaData.parse(xcp) @@ -282,6 +293,7 @@ data class ManagedIndexMetaData( policyPrimaryTerm, policyCompleted, rolledOver, + indexCreationDate, transitionTo, state, action, @@ -320,6 +332,7 @@ data class ManagedIndexMetaData( policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(), policyCompleted = map[POLICY_COMPLETED]?.toBoolean(), rolledOver = map[ROLLED_OVER]?.toBoolean(), + indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(), transitionTo = map[TRANSITION_TO], stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map), actionMetaData = ActionMetaData.fromManagedIndexMetaDataMap(map), diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index b3b77407d..4b506f588 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -379,6 +379,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin .registerSkipFlag(skipFlag) .registerThreadPool(threadPool) .registerExtensionChecker(extensionChecker) + .registerIndexMetadataProvider(indexMetadataProvider) val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices) val templateService = ISMTemplateService(client, clusterService, xContentRegistry, indexManagementIndices) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt index 724e18f73..fb6fa43c8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt @@ -9,6 +9,7 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.cluster.state.ClusterStateResponse import org.opensearch.action.support.IndicesOptions import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.service.ClusterService import org.opensearch.common.unit.TimeValue import org.opensearch.indexmanagement.opensearchapi.suspendUntil @@ -38,11 +39,7 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index response.state.metadata.indices.forEach { // TODO waiting to add document count until it is definitely needed - val uuid = if (customUUIDSetting != null) { - it.value.settings.get(customUUIDSetting, it.value.indexUUID) - } else { - it.value.indexUUID - } + val uuid = getCustomIndexUUID(it.value) val indexMetadata = ISMIndexMetadata(uuid, it.value.creationDate, -1) indexNameToMetadata[it.key] = indexMetadata } @@ -50,6 +47,19 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index return indexNameToMetadata } + /* + * If an extension wants Index Management to determine cluster state indices UUID based on a custom index setting if + * present of cluster state, the extension will override this customUUID setting. This allows an index to migrate off + * cluster and back while using this persistent uuid. + */ + fun getCustomIndexUUID(indexMetadata: IndexMetadata): String { + return if (customUUIDSetting != null) { + indexMetadata.settings.get(customUUIDSetting, indexMetadata.indexUUID) + } else { + indexMetadata.indexUUID + } + } + override suspend fun getMetadataForAllIndices(client: Client, clusterService: ClusterService): Map { return getMetadata(listOf("*"), client, clusterService) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexMetadataProvider.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexMetadataProvider.kt index c7ecc358b..32639c303 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexMetadataProvider.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexMetadataProvider.kt @@ -45,6 +45,10 @@ class IndexMetadataProvider( return service.getMetadata(indexNames, client, clusterService) } + /* + * Attempts to get the index metadata for of all indexNames for each of the index types designated in the types parameter. + * Returns a map of > + */ suspend fun getMultiTypeISMIndexMetadata( types: List = services.keys.toList(), indexNames: List diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 1e4e70868..1e53c1657 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -16,6 +16,8 @@ import org.apache.logging.log4j.LogManager import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.cluster.state.ClusterStateResponse import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexResponse @@ -51,15 +53,18 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL +import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE +import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck +import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata +import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest +import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest import org.opensearch.indexmanagement.indexstatemanagement.util.getCompletedManagedIndexMetaData import org.opensearch.indexmanagement.indexstatemanagement.util.getStartingManagedIndexMetaData -import org.opensearch.indexmanagement.indexstatemanagement.util.getUpdatedActionMetaData import org.opensearch.indexmanagement.indexstatemanagement.util.hasDifferentJobInterval import org.opensearch.indexmanagement.indexstatemanagement.util.hasTimedOut import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict import org.opensearch.indexmanagement.indexstatemanagement.util.isAllowed import org.opensearch.indexmanagement.indexstatemanagement.util.isFailed -import org.opensearch.indexmanagement.indexstatemanagement.util.isMetadataMoved import org.opensearch.indexmanagement.indexstatemanagement.util.isSafeToChange import org.opensearch.indexmanagement.indexstatemanagement.util.isSuccessfulDelete import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest @@ -94,7 +99,7 @@ import org.opensearch.threadpool.ThreadPool import java.time.Instant import java.time.temporal.ChronoUnit -@Suppress("TooManyFunctions") +@Suppress("TooManyFunctions", "LargeClass") object ManagedIndexRunner : ScheduledJobRunner, CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexRunner")) { @@ -111,6 +116,7 @@ object ManagedIndexRunner : private lateinit var skipExecFlag: SkipExecution private lateinit var threadPool: ThreadPool private lateinit var extensionStatusChecker: ExtensionStatusChecker + private lateinit var indexMetadataProvider: IndexMetadataProvider private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED @Suppress("MagicNumber") private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3) @@ -188,6 +194,11 @@ object ManagedIndexRunner : return this } + fun registerIndexMetadataProvider(indexMetadataProvider: IndexMetadataProvider): ManagedIndexRunner { + this.indexMetadataProvider = indexMetadataProvider + return this + } + fun registerExtensionChecker(extensionStatusChecker: ExtensionStatusChecker): ManagedIndexRunner { this.extensionStatusChecker = extensionStatusChecker return this @@ -219,7 +230,7 @@ object ManagedIndexRunner : } } - @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition") + @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth") private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig) { logger.debug("Run job for index ${managedIndexConfig.index}") // doing a check of local cluster health as we do not want to overload master node with potentially a lot of calls @@ -228,18 +239,40 @@ object ManagedIndexRunner : return } - // Get current IndexMetaData and ManagedIndexMetaData - val indexMetaData = getIndexMetadata(managedIndexConfig.index) - if (indexMetaData == null) { - logger.warn("Failed to retrieve IndexMetadata.") + val (managedIndexMetaData, getMetadataSuccess) = client.getManagedIndexMetadata(managedIndexConfig.indexUuid) + if (!getMetadataSuccess) { + logger.info("Failed to retrieve managed index metadata of index [${managedIndexConfig.index}] from config index, abort this run.") return } - val managedIndexMetaData = indexMetaData.getManagedIndexMetadata(client) - val clusterStateMetadata = indexMetaData.getManagedIndexMetadata() - if (!isMetadataMoved(clusterStateMetadata, managedIndexMetaData, logger)) { - logger.info("Skipping execution while pending migration of metadata for ${managedIndexConfig.jobName}") - return + // Check the cluster state for the index metadata + var clusterStateIndexMetadata = getIndexMetadata(managedIndexConfig.index) + val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService + val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getCustomIndexUUID(it) } + // If the index metadata is null, the index is not in the cluster state. If the index metadata is not null, but + // the cluster state index uuid differs from the one in the managed index config then the config is referring + // to a different index which does not exist in the cluster. We need to check all of the extensions to confirm an index exists + if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) { + clusterStateIndexMetadata = null + // If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types + val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE } + val multiTypeIndexNameToMetaData = + indexMetadataProvider.getMultiTypeISMIndexMetadata(nonDefaultIndexTypes, listOf(managedIndexConfig.index)) + val someTypeMatchedUuid = multiTypeIndexNameToMetaData.values.any { + it[managedIndexConfig.index]?.indexUuid == managedIndexConfig.indexUuid + } + // If no index types had an index with a matching name and uuid combination, return + if (!someTypeMatchedUuid) { + logger.warn("Failed to find IndexMetadata for ${managedIndexConfig.index}.") + return + } + } else { + val clusterStateMetadata = clusterStateIndexMetadata.getManagedIndexMetadata() + val metadataCheck = checkMetadata(clusterStateMetadata, managedIndexMetaData, managedIndexConfig.indexUuid, logger) + if (metadataCheck != MetadataCheck.SUCCESS) { + logger.info("Skipping execution while metadata status is $metadataCheck") + return + } } // If policy or managedIndexMetaData is null then initialize @@ -271,10 +304,13 @@ object ManagedIndexRunner : } val state = policy.getStateToExecute(managedIndexMetaData) - val action: Action? = state?.getActionToExecute(managedIndexMetaData.copy(user = policy.user, threadContext = threadPool.threadContext)) + val action: Action? = state?.getActionToExecute( + managedIndexMetaData.copy(user = policy.user, threadContext = threadPool.threadContext), + indexMetadataProvider + ) val stepContext = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) val step: Step? = action?.getStepToExecute(stepContext) - val currentActionMetaData = action?.getUpdatedActionMetaData(managedIndexMetaData, state) + val currentActionMetaData = action?.getUpdatedActionMetadata(managedIndexMetaData, state.name) // If Index State Management is disabled and the current step is not null and safe to disable on // then disable the job and return early @@ -383,6 +419,16 @@ object ManagedIndexRunner : return } + // If a custom action deletes some off-cluster index and has deleteIndexMetadataAfterFinish set to true, + // then when the action successfully finishes, we will delete the managed index config and metadata. We do not + // need to do this for the standard delete action as the coordinator picks up the index deletion and removes the config + if (action.isFinishedSuccessfully(executedManagedIndexMetaData)) { + if (action.deleteIndexMetadataAfterFinish()) { + deleteFromManagedIndex(managedIndexConfig, action.type) + return + } + } + if (!updateManagedIndexMetaData(executedManagedIndexMetaData, updateResult).metadataSaved) { logger.error("Failed to update ManagedIndexMetaData after executing the Step : ${step.name}") } @@ -418,7 +464,7 @@ object ManagedIndexRunner : getInitializedManagedIndexMetaData(managedIndexMetaData, managedIndexConfig, policy) } - updateManagedIndexMetaData(updatedManagedIndexMetaData) + updateManagedIndexMetaData(updatedManagedIndexMetaData, create = managedIndexMetaData == null) } @Suppress("ReturnCount", "BlockingMethodInNonBlockingContext") @@ -497,7 +543,7 @@ object ManagedIndexRunner : } } - private fun getFailedInitializedManagedIndexMetaData( + private suspend fun getFailedInitializedManagedIndexMetaData( managedIndexMetaData: ManagedIndexMetaData?, managedIndexConfig: ManagedIndexConfig, policyID: String @@ -514,6 +560,7 @@ object ManagedIndexRunner : policyPrimaryTerm = null, policyCompleted = false, rolledOver = false, + indexCreationDate = getIndexCreationDate(managedIndexConfig), transitionTo = null, stateMetaData = null, actionMetaData = null, @@ -541,6 +588,7 @@ object ManagedIndexRunner : policyPrimaryTerm = policy.primaryTerm, policyCompleted = false, rolledOver = false, + indexCreationDate = getIndexCreationDate(managedIndexConfig), transitionTo = null, stateMetaData = stateMetaData, actionMetaData = null, @@ -589,7 +637,8 @@ object ManagedIndexRunner : */ private suspend fun updateManagedIndexMetaData( managedIndexMetaData: ManagedIndexMetaData, - lastUpdateResult: UpdateMetadataResult? = null + lastUpdateResult: UpdateMetadataResult? = null, + create: Boolean = false ): UpdateMetadataResult { var result = UpdateMetadataResult() if (!imIndices.attemptUpdateConfigIndexMapping()) { @@ -602,7 +651,7 @@ object ManagedIndexRunner : metadata = managedIndexMetaData.copy(seqNo = lastUpdateResult.seqNo, primaryTerm = lastUpdateResult.primaryTerm) } - val indexRequest = managedIndexMetadataIndexRequest(metadata) + val indexRequest = managedIndexMetadataIndexRequest(metadata, create = create) try { updateMetaDataRetryPolicy.retry(logger) { val indexResponse: IndexResponse = client.suspendUntil { index(indexRequest, it) } @@ -764,9 +813,6 @@ object ManagedIndexRunner : val response: ClusterStateResponse = client.admin().cluster().suspendUntil { state(clusterStateRequest, it) } indexMetaData = response.state.metadata.indices.firstOrNull()?.value - if (indexMetaData == null) { - logger.error("Could not find IndexMetaData in master cluster state for $index") - } } catch (e: Exception) { logger.error("Failed to get IndexMetaData from master cluster state for index=$index", e) } @@ -795,4 +841,49 @@ object ManagedIndexRunner : Instant.ofEpochMilli(requireNotNull(startTime)) } } + + /** + * Get the index creation date for the first time to cache it on the ManagedIndexMetadata + */ + @Suppress("ReturnCount") + private suspend fun getIndexCreationDate(managedIndexConfig: ManagedIndexConfig): Long? { + try { + val multiTypeIndexNameToMetaData = indexMetadataProvider.getMultiTypeISMIndexMetadata(indexNames = listOf(managedIndexConfig.index)) + // the managedIndexConfig.indexUuid should be unique across all index types + val indexCreationDate = multiTypeIndexNameToMetaData.values.firstOrNull { + it[managedIndexConfig.index]?.indexUuid == managedIndexConfig.indexUuid + }?.get(managedIndexConfig.index)?.indexCreationDate + return indexCreationDate + } catch (e: Exception) { + logger.error("Failed to get the index creation date", e) + } + return null + } + + /** + * Deletes a managedIndexConfig and its managedIndexMetadata. Used after an action has successfully completely + * and the action has deleteIndexMetadataAfterFinish set to true. This should only be set to true for custom + * actions added by extensions, as when indices in the cluster are deleted,the deletion should get picked up + * and the ism config metadata for the index will be removed, but if the index is not in the cluster, as it + * might be for some other index type, we have to do this manually + */ + private suspend fun deleteFromManagedIndex(managedIndexConfig: ManagedIndexConfig, actionType: String) { + try { + val bulkRequest = BulkRequest() + .add(deleteManagedIndexRequest(managedIndexConfig.indexUuid)) + .add(deleteManagedIndexMetadataRequest(managedIndexConfig.indexUuid)) + + val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) } + for (bulkItemResponse in bulkResponse) { + if (bulkItemResponse.isFailed) { + logger.warn( + "Failed to delete managed index job/metadata [id=${bulkItemResponse.id}] for ${managedIndexConfig.index}" + + " after a successful $actionType [result=${bulkItemResponse.failureMessage}]" + ) + } + } + } catch (e: Exception) { + logger.warn("Failed to delete managed index job for for ${managedIndexConfig.index} after a successful $actionType", e) + } + } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionsAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionsAction.kt index c78b1af14..60f3a7929 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionsAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionsAction.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action +import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Action @@ -13,6 +14,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext class TransitionsAction( val transitions: List, + val indexMetadataProvider: IndexMetadataProvider ) : Action(name, -1) { private val attemptTransitionStep = AttemptTransitionStep(this) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt index cb70964e2..d20169daa 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt @@ -15,6 +15,7 @@ import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser +import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction import org.opensearch.indexmanagement.spi.indexstatemanagement.Action @@ -31,10 +32,10 @@ data class State( init { require(name.isNotBlank()) { "State must contain a valid name" } var hasDelete = false - actions.forEach { actionConfig -> + actions.forEach { action -> // dont allow actions after delete as they will never happen require(!hasDelete) { "State=$name must not contain an action after a delete action" } - hasDelete = actionConfig.type == DeleteAction.name + hasDelete = action.type == DeleteAction.name || action.deleteIndexMetadataAfterFinish() } // dont allow transitions if state contains delete @@ -68,17 +69,18 @@ data class State( } fun getActionToExecute( - managedIndexMetaData: ManagedIndexMetaData + managedIndexMetaData: ManagedIndexMetaData, + indexMetadataProvider: IndexMetadataProvider ): Action? { var actionConfig: Action? val actionMetaData = managedIndexMetaData.actionMetaData // If we are transitioning to this state get the first action in the state // If the action/actionIndex are null it means we just initialized and should get the first action from the state if (managedIndexMetaData.transitionTo != null || actionMetaData == null) { - actionConfig = this.actions.firstOrNull() ?: TransitionsAction(this.transitions) + actionConfig = this.actions.firstOrNull() ?: TransitionsAction(this.transitions, indexMetadataProvider) } else if (actionMetaData.name == TransitionsAction.name) { // If the current action is transition and we do not have a transitionTo set then we should be in Transition - actionConfig = TransitionsAction(this.transitions) + actionConfig = TransitionsAction(this.transitions, indexMetadataProvider) } else { // Get the current actionConfig that is in the ManagedIndexMetaData actionConfig = this.actions.filterIndexed { index, config -> @@ -92,7 +94,7 @@ data class State( if (stepMetaData != null && stepMetaData.stepStatus == Step.StepStatus.COMPLETED) { val action = actionConfig if (action.isLastStep(stepMetaData.name)) { - actionConfig = this.actions.getOrNull(actionMetaData.index + 1) ?: TransitionsAction(this.transitions) + actionConfig = this.actions.getOrNull(actionMetaData.index + 1) ?: TransitionsAction(this.transitions, indexMetadataProvider) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt index 77ed24c97..d049d9848 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt @@ -82,18 +82,22 @@ fun getUuidsForClosedIndices(state: ClusterState): MutableList { fun Map.filterNotNullValues(): Map = filterValues { it != null } as Map -// get metadata from config index using doc id +/** + * Get metadata from config index + * + * @return metadata object and get call successful or not + */ @Suppress("ReturnCount") -suspend fun IndexMetadata.getManagedIndexMetadata(client: Client): ManagedIndexMetaData? { +suspend fun Client.getManagedIndexMetadata(indexUUID: String): Pair { try { val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(indexUUID)) - .routing(this.indexUUID) - val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } + .routing(indexUUID) + val getResponse: GetResponse = this.suspendUntil { get(getRequest, it) } if (!getResponse.isExists || getResponse.isSourceEmpty) { - return null + return Pair(null, true) } - return withContext(Dispatchers.IO) { + val metadata = withContext(Dispatchers.IO) { val xcp = XContentHelper.createParser( NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, @@ -101,6 +105,7 @@ suspend fun IndexMetadata.getManagedIndexMetadata(client: Client): ManagedIndexM ) ManagedIndexMetaData.parseWithType(xcp, getResponse.id, getResponse.seqNo, getResponse.primaryTerm) } + return Pair(metadata, true) } catch (e: Exception) { when (e) { is IndexNotFoundException, is NoShardAvailableActionException -> { @@ -109,7 +114,7 @@ suspend fun IndexMetadata.getManagedIndexMetadata(client: Client): ManagedIndexM else -> log.error("Failed to get metadata", e) } - return null + return Pair(null, false) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt index 3c11acf43..843987b4e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt @@ -9,9 +9,12 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.unit.ByteSizeValue +import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getOldestRolloverTime +import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE import org.opensearch.indexmanagement.indexstatemanagement.util.evaluateConditions import org.opensearch.indexmanagement.indexstatemanagement.util.hasStatsConditions import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString @@ -31,12 +34,13 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) private var policyCompleted: Boolean = false private var info: Map? = null - @Suppress("ReturnCount", "ComplexMethod", "LongMethod") + @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "NestedBlockDepth") override suspend fun execute(): Step { val context = this.context ?: return this val indexName = context.metadata.index val clusterService = context.clusterService val transitions = action.transitions + val indexMetadataProvider = action.indexMetadataProvider try { if (transitions.isEmpty()) { logger.info("$indexName transitions are empty, completing policy") @@ -45,8 +49,10 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) return this } - val indexMetaData = clusterService.state().metadata().index(indexName) - val indexCreationDate = indexMetaData.creationDate + val indexMetadata = clusterService.state().metadata().index(indexName) + val inCluster = clusterService.state().metadata().hasIndex(indexName) && indexMetadata?.indexUUID == context.metadata.indexUuid + + val indexCreationDate = getIndexCreationDate(context.metadata, indexMetadataProvider, clusterService, indexName, inCluster) val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate) if (indexCreationDate == -1L) { logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") @@ -54,8 +60,8 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) val stepStartTime = getStepStartTime(context.metadata) var numDocs: Long? = null var indexSize: ByteSizeValue? = null - val rolloverDate: Instant? = indexMetaData.getOldestRolloverTime() + val rolloverDate: Instant? = if (inCluster) indexMetadata.getOldestRolloverTime() else null if (transitions.any { it.conditions?.rolloverAge !== null }) { // if we have a transition with rollover age condition, then we must have a rollover date // otherwise fail this transition @@ -69,22 +75,27 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) } if (transitions.any { it.hasStatsConditions() }) { - val statsRequest = IndicesStatsRequest() - .indices(indexName).clear().docs(true) - val statsResponse: IndicesStatsResponse = context.client.admin().indices().suspendUntil { stats(statsRequest, it) } + if (inCluster) { + val statsRequest = IndicesStatsRequest() + .indices(indexName).clear().docs(true) + val statsResponse: IndicesStatsResponse = + context.client.admin().indices().suspendUntil { stats(statsRequest, it) } - if (statsResponse.status != RestStatus.OK) { - val message = getFailedStatsMessage(indexName) - logger.warn("$message - ${statsResponse.status}") - stepStatus = StepStatus.FAILED - info = mapOf( - "message" to message, - "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } - ) - return this + if (statsResponse.status != RestStatus.OK) { + val message = getFailedStatsMessage(indexName) + logger.warn("$message - ${statsResponse.status}") + stepStatus = StepStatus.FAILED + info = mapOf( + "message" to message, + "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } + ) + return this + } + numDocs = statsResponse.primaries.getDocs()?.count ?: 0 + indexSize = ByteSizeValue(statsResponse.primaries.getDocs()?.totalSizeInBytes ?: 0) + } else { + logger.warn("Cannot use index size/doc count transition conditions for index [$indexName] that does not exist in cluster") } - numDocs = statsResponse.primaries.getDocs()?.count ?: 0 - indexSize = ByteSizeValue(statsResponse.primaries.getDocs()?.totalSizeInBytes ?: 0) } // Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true @@ -133,6 +144,38 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) ) } + @Suppress("ReturnCount") + private suspend fun getIndexCreationDate( + metadata: ManagedIndexMetaData, + indexMetadataProvider: IndexMetadataProvider, + clusterService: ClusterService, + indexName: String, + inCluster: Boolean + ): Long { + try { + // If we do have an index creation date cached already then use that + metadata.indexCreationDate?.let { return it } + // Otherwise, check if this index is in cluster state first + return if (inCluster) { + val clusterStateMetadata = clusterService.state().metadata() + if (clusterStateMetadata.hasIndex(indexName)) clusterStateMetadata.index(indexName).creationDate else -1L + } else { + // And then finally check all other index types which may not be in the cluster + val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE } + val multiTypeIndexNameToMetaData = indexMetadataProvider.getMultiTypeISMIndexMetadata(nonDefaultIndexTypes, listOf(indexName)) + // the managedIndexConfig.indexUuid should be unique across all index types + val indexCreationDate = multiTypeIndexNameToMetaData.values.firstOrNull { + it[indexName]?.indexUuid == metadata.indexUuid + }?.get(indexName)?.indexCreationDate + indexCreationDate ?: -1 + } + } catch (e: Exception) { + logger.error("Failed to get index creation date for $indexName", e) + } + // -1L index age is ignored during condition checks + return -1L + } + override fun isIdempotent() = true companion object { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index 251689566..b23227083 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -281,21 +281,6 @@ fun State.getUpdatedStateMetaData(managedIndexMetaData: ManagedIndexMetaData): S } } -fun Action.getUpdatedActionMetaData(managedIndexMetaData: ManagedIndexMetaData, state: State): ActionMetaData { - val stateMetaData = managedIndexMetaData.stateMetaData - val actionMetaData = managedIndexMetaData.actionMetaData - - return when { - // start a new action - stateMetaData?.name != state.name -> - ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null) - actionMetaData?.index != this.actionIndex -> - ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null) - // RetryAPI will reset startTime to null for actionMetaData and we'll reset it to "now" here - else -> actionMetaData.copy(startTime = actionMetaData.startTime ?: Instant.now().toEpochMilli()) - } -} - fun Action.shouldBackoff(actionMetaData: ActionMetaData?, actionRetry: ActionRetry?): Pair? { return this.configRetry?.backoff?.shouldBackoff(actionMetaData, actionRetry) } @@ -332,7 +317,7 @@ fun ManagedIndexMetaData.getStartingManagedIndexMetaData( } val updatedStateMetaData = state.getUpdatedStateMetaData(this) - val updatedActionMetaData = action.getUpdatedActionMetaData(this, state) + val updatedActionMetaData = action.getUpdatedActionMetadata(this, state.name) val updatedStepMetaData = step.getStartingStepMetaData(this) return this.copy( @@ -517,6 +502,56 @@ fun isMetadataMoved( return true } +/** + * Check if cluster state metadata has been moved to config index + * + * log warning if remaining cluster state metadata has newer last_updated_time + */ +@Suppress("ReturnCount", "ComplexCondition", "ComplexMethod") +fun checkMetadata( + clusterStateMetadata: ManagedIndexMetaData?, + configIndexMetadata: Any?, + currentIndexUuid: String?, + logger: Logger +): MetadataCheck { + // indexUuid saved in ISM metadata may be outdated + // if an index restored from snapshot + val indexUuid1 = clusterStateMetadata?.indexUuid + val indexUuid2 = when (configIndexMetadata) { + is ManagedIndexMetaData -> configIndexMetadata.indexUuid + is Map<*, *> -> configIndexMetadata["index_uuid"] + else -> null + } as String? + if ((indexUuid1 != null && indexUuid1 != currentIndexUuid) || + (indexUuid2 != null && indexUuid2 != currentIndexUuid) + ) { + return MetadataCheck.CORRUPT + } + + if (clusterStateMetadata != null) { + if (configIndexMetadata == null) return MetadataCheck.PENDING + + // compare last updated time between 2 metadatas + val t1 = clusterStateMetadata.stepMetaData?.startTime + val t2 = when (configIndexMetadata) { + is ManagedIndexMetaData -> configIndexMetadata.stepMetaData?.startTime + is Map<*, *> -> { + val stepMetadata = configIndexMetadata["step"] as Map? + stepMetadata?.get("start_time") + } + else -> null + } as Long? + if (t1 != null && t2 != null && t1 > t2) { + logger.warn("Cluster state metadata get updates after moved for [${clusterStateMetadata.index}]") + } + } + return MetadataCheck.SUCCESS +} + +enum class MetadataCheck { + PENDING, CORRUPT, SUCCESS +} + private val baseMessageLogger = LogManager.getLogger(BaseMessage::class.java) fun BaseMessage.isHostInDenylist(networks: List): Boolean { diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 5c93e39c5..efb14714b 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -662,6 +662,10 @@ "rolled_over": { "type": "boolean" }, + "index_creation_date": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, "transition_to": { "type": "text", "fields": { diff --git a/src/main/resources/mappings/opendistro-ism-history.json b/src/main/resources/mappings/opendistro-ism-history.json index 04d3c46d8..44c7ab896 100644 --- a/src/main/resources/mappings/opendistro-ism-history.json +++ b/src/main/resources/mappings/opendistro-ism-history.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 3 + "schema_version": 4 }, "dynamic": "strict", "properties": { @@ -37,6 +37,10 @@ "rolled_over": { "type": "boolean" }, + "index_creation_date": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, "transition_to": { "type": "keyword" }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 3ea0b44a2..874936330 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -28,7 +28,7 @@ import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { val configSchemaVersion = 13 - val historySchemaVersion = 3 + val historySchemaVersion = 4 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as // they do not go through the pending task queue. Ideally this should probably be written in a way to wait for the diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt index 0829a00b0..df3e87d5d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt @@ -71,6 +71,7 @@ abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() { policyPrimaryTerm = 1, policyCompleted = false, rolledOver = false, + indexCreationDate = null, transitionTo = null, stateMetaData = StateMetaData("ReplicaCountState", 1234), actionMetaData = null, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt index 02e14fc11..cf05dd34d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt @@ -145,6 +145,7 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { ManagedIndexMetaData.POLICY_SEQ_NO to policySeq::equals, ManagedIndexMetaData.POLICY_PRIMARY_TERM to policyPrimaryTerm::equals, ManagedIndexMetaData.ROLLED_OVER to false::equals, + ManagedIndexMetaData.INDEX_CREATION_DATE to fun(indexCreationDate: Any?): Boolean = (indexCreationDate as Long) > 1L, StateMetaData.STATE to fun(stateMetaDataMap: Any?): Boolean = assertStateEquals(StateMetaData("Ingest", Instant.now().toEpochMilli()), stateMetaDataMap), ActionMetaData.ACTION to fun(actionMetaDataMap: Any?): Boolean = diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt index 149176e44..1f80bb245 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt @@ -76,6 +76,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData("ReadOnlyState", actualHistory.stateMetaData!!.startTime), actionMetaData = ActionMetaData(ReadOnlyAction.name, actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), @@ -142,6 +143,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData("ReadOnlyState", actualHistory.stateMetaData!!.startTime), actionMetaData = ActionMetaData(ReadOnlyAction.name, actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), @@ -208,6 +210,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData("ReadOnlyState", actualHistory.stateMetaData!!.startTime), actionMetaData = ActionMetaData(ReadOnlyAction.name, actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), @@ -268,6 +271,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData("ReadOnlyState", actualHistory.stateMetaData!!.startTime), actionMetaData = null, @@ -298,6 +302,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory1.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory1.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData(states[0].name, actualHistory1.stateMetaData!!.startTime), actionMetaData = ActionMetaData(ReadOnlyAction.name, actualHistory1.actionMetaData!!.startTime, 0, false, 0, 0, null), @@ -366,6 +371,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData(name = states[0].name, startTime = actualHistory.stateMetaData!!.startTime), actionMetaData = null, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaDataTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaDataTests.kt index da55bf768..e16f91319 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaDataTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaDataTests.kt @@ -17,6 +17,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaDat import org.opensearch.test.OpenSearchTestCase import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream +import java.time.Instant class ManagedIndexMetaDataTests : OpenSearchTestCase() { @@ -29,6 +30,7 @@ class ManagedIndexMetaDataTests : OpenSearchTestCase() { policyPrimaryTerm = 1, policyCompleted = null, rolledOver = null, + indexCreationDate = Instant.now().toEpochMilli(), transitionTo = null, stateMetaData = StateMetaData("close-index", 1234), actionMetaData = null, @@ -49,6 +51,7 @@ class ManagedIndexMetaDataTests : OpenSearchTestCase() { policyPrimaryTerm = 1, policyCompleted = null, rolledOver = null, + indexCreationDate = null, transitionTo = null, stateMetaData = StateMetaData("close-index", 1234), actionMetaData = ActionMetaData("close", 4321, 0, false, 0, 0, null), @@ -69,6 +72,7 @@ class ManagedIndexMetaDataTests : OpenSearchTestCase() { policyPrimaryTerm = 1, policyCompleted = null, rolledOver = null, + indexCreationDate = null, transitionTo = null, stateMetaData = StateMetaData("close-index", 1234), actionMetaData = ActionMetaData("close", 4321, 0, false, 0, 0, ActionProperties(3)), @@ -89,6 +93,7 @@ class ManagedIndexMetaDataTests : OpenSearchTestCase() { policyPrimaryTerm = 1, policyCompleted = null, rolledOver = false, + indexCreationDate = null, transitionTo = null, stateMetaData = StateMetaData("rollover-index", 1234), actionMetaData = ActionMetaData("rollover", 4321, 0, false, 0, 0, null), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt index e255ec23c..7397d4d40 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt @@ -232,6 +232,7 @@ class XContentTests : OpenSearchTestCase() { policyPrimaryTerm = randomNonNegativeLong(), policyCompleted = null, rolledOver = null, + indexCreationDate = null, transitionTo = randomAlphaOfLength(10), stateMetaData = null, actionMetaData = null, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt index 370342476..38d013232 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt @@ -5,11 +5,32 @@ package org.opensearch.indexmanagement.indexstatemanagement.runner +import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase +import org.opensearch.indexmanagement.indexstatemanagement.action.OpenAction +import org.opensearch.indexmanagement.indexstatemanagement.action.ReadOnlyAction +import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.indexstatemanagement.model.State +import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification +import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy +import org.opensearch.indexmanagement.indexstatemanagement.randomReadOnlyActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomReadWriteActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomState +import org.opensearch.indexmanagement.indexstatemanagement.randomTransition +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import org.opensearch.indexmanagement.indexstatemanagement.step.readonly.SetReadOnlyStep +import org.opensearch.indexmanagement.indexstatemanagement.step.readwrite.SetReadWriteStep +import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData +import org.opensearch.indexmanagement.waitFor +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import java.time.Instant +import java.time.temporal.ChronoUnit class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { - /*fun `test version conflict fails job`() { + fun `test version conflict fails job`() { val indexName = "version_conflict_index" val policyID = "version_conflict_policy" val actionConfig = OpenAction(0) @@ -72,6 +93,12 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { // init policy updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(createdPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID) } + // change cluster job interval setting to 2 (minutes) + updateClusterSetting(ManagedIndexSettings.JOB_INTERVAL.key, "2") + // fast forward to next execution where at the end we should change the job interval time + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { (getManagedIndexConfigByDocId(managedIndexConfig.id)?.jobSchedule as? IntervalSchedule)?.interval == 2 } waitFor { assertEquals(createdPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID) val currInterval = (getManagedIndexConfigByDocId(managedIndexConfig.id)?.jobSchedule as? IntervalSchedule)?.interval @@ -142,8 +169,8 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, firstState.name), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // remove read_only from the allowlist - val allowedActions = ActionConfig.ActionType.values().toList() - .filter { actionType -> actionType != ActionConfig.ActionType.READ_ONLY } + val allowedActions = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList() + .filter { actionType -> actionType != ReadOnlyAction.name } .joinToString(prefix = "[", postfix = "]") { string -> "\"$string\"" } updateClusterSetting(ManagedIndexSettings.ALLOW_LIST.key, allowedActions, escapeValue = false) @@ -188,5 +215,5 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { val currJitter = getManagedIndexConfigByDocId(newManagedIndexConfig.id)?.jitter assertEquals("Failed to update ManagedIndexConfig jitter", newJitter, currJitter) } - }*/ + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt index 1acbed656..11e01c6a5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt @@ -39,7 +39,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() @@ -53,7 +53,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() @@ -67,7 +67,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() @@ -81,7 +81,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() @@ -95,7 +95,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() @@ -109,7 +109,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateRollupJobStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateRollupJobStepTests.kt index ac78f550a..f41eca7c3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateRollupJobStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateRollupJobStepTests.kt @@ -19,7 +19,7 @@ class AttemptCreateRollupJobStepTests : OpenSearchTestCase() { private val indexName = "test" private val rollupId: String = rollupAction.ismRollup.toRollup(indexName).id private val metadata = ManagedIndexMetaData( - indexName, "indexUuid", "policy_id", null, null, null, null, null, null, + indexName, "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(AttemptCreateRollupJobStep.name, 1, 0, false, 0, null, ActionProperties(rollupId = rollupId)), null, null, null ) private val step = AttemptCreateRollupJobStep(rollupAction) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt index 1dff88450..7b6963dba 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt @@ -37,7 +37,7 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptDeleteStep.preExecute(logger, context).execute() @@ -51,7 +51,7 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptDeleteStep.preExecute(logger, context).execute() @@ -65,7 +65,7 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptDeleteStep.preExecute(logger, context).execute() @@ -80,7 +80,7 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptDeleteStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt index ddfedcf34..2ed8669e2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt @@ -37,7 +37,7 @@ class AttemptOpenStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(openIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptOpenStep.preExecute(logger, context).execute() @@ -51,7 +51,7 @@ class AttemptOpenStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptOpenStep.preExecute(logger, context).execute() @@ -65,7 +65,7 @@ class AttemptOpenStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptOpenStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt index 0781f2aae..796baf8ab 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt @@ -39,7 +39,7 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptSetPriorityStep.preExecute(logger, context).execute() @@ -54,7 +54,7 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptSetPriorityStep.preExecute(logger, context).execute() @@ -69,7 +69,7 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptSetPriorityStep.preExecute(logger, context).execute() @@ -85,7 +85,7 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptSetPriorityStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt index 6de0c4052..c5e589dd9 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt @@ -39,7 +39,7 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) replicaCountStep.preExecute(logger, context).execute() @@ -54,7 +54,7 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) replicaCountStep.preExecute(logger, context).execute() @@ -69,7 +69,7 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) replicaCountStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt index e0eadd331..bea3c41ae 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt @@ -43,7 +43,7 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val snapshotAction = randomSnapshotActionConfig("repo", "snapshot-name") - private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) @Before fun settings() { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt index 065366721..e30ce2e33 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt @@ -11,6 +11,7 @@ import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.junit.Before import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.rollover.RolloverInfo import org.opensearch.action.admin.indices.stats.CommonStats @@ -23,11 +24,14 @@ import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.Metadata import org.opensearch.cluster.service.ClusterService import org.opensearch.common.collect.ImmutableOpenMap +import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.settings.Settings import org.opensearch.index.shard.DocsStats +import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions import org.opensearch.indexmanagement.indexstatemanagement.model.Transition +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData @@ -41,11 +45,17 @@ import java.time.Instant class AttemptTransitionStepTests : OpenSearchTestCase() { + private val indexName: String = "test" + private val indexUUID: String = "indexUuid" @Suppress("UNCHECKED_CAST") private val indexMetadata: IndexMetadata = mock { on { rolloverInfos } doReturn ImmutableOpenMap.builder().build() + on { indexUUID } doReturn indexUUID + } + private val metadata: Metadata = mock { + on { index(any()) } doReturn indexMetadata + on { hasIndex(indexName) } doReturn true } - private val metadata: Metadata = mock { on { index(any()) } doReturn indexMetadata } private val clusterState: ClusterState = mock { on { metadata() } doReturn metadata } private val clusterService: ClusterService = mock { on { state() } doReturn clusterState } private val scriptService: ScriptService = mock() @@ -55,6 +65,11 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { private val primaries: CommonStats = mock { on { getDocs() } doReturn docsStats } private val statsResponse: IndicesStatsResponse = mock { on { primaries } doReturn primaries } + @Before + fun `setup settings`() { + whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(ManagedIndexSettings.RESTRICTED_INDEX_PATTERN))) + } + fun `test stats response not OK`() { whenever(indexMetadata.creationDate).doReturn(5L) whenever(statsResponse.status).doReturn(RestStatus.INTERNAL_SERVER_ERROR) @@ -62,16 +77,17 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { whenever(docsStats.count).doReturn(6L) whenever(docsStats.totalSizeInBytes).doReturn(2) val client = getClient(getAdminClient(getIndicesAdminClient(statsResponse, null))) + val indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) - val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings) attemptTransitionStep.preExecute(logger, context).execute() val updatedManagedIndexMetaData = attemptTransitionStep.getUpdatedManagedIndexMetadata(managedIndexMetadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) - assertEquals("Did not get correct failed message", AttemptTransitionStep.getFailedStatsMessage("test"), updatedManagedIndexMetaData.info!!["message"]) + assertEquals("Did not get correct failed message", AttemptTransitionStep.getFailedStatsMessage(indexName), updatedManagedIndexMetaData.info!!["message"]) } } @@ -79,10 +95,11 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { whenever(indexMetadata.creationDate).doReturn(5L) val exception = IllegalArgumentException("example") val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + val indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) - val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings) attemptTransitionStep.preExecute(logger, context).execute() @@ -96,10 +113,11 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { whenever(indexMetadata.creationDate).doReturn(5L) val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + val indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) - val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings) attemptTransitionStep.preExecute(logger, context).execute() @@ -110,10 +128,11 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { } fun `test step start time resetting between two transitions`() { + val indexMetadataProvider = IndexMetadataProvider(settings, mock(), clusterService, mutableMapOf()) runBlocking { val completedStartTime = Instant.now() - val managedIndexMetadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, StepMetaData("attempt_transition", completedStartTime.toEpochMilli(), Step.StepStatus.COMPLETED), null, null) - val transitionsAction = TransitionsAction(listOf(Transition("some_state", null))) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, StepMetaData("attempt_transition", completedStartTime.toEpochMilli(), Step.StepStatus.COMPLETED), null, null) + val transitionsAction = TransitionsAction(listOf(Transition("some_state", null)), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) Thread.sleep(50) // Make sure we give enough time for the instants to be different val startTime = attemptTransitionStep.getStepStartTime(managedIndexMetadata) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt index b4bb35dd5..cfcc73142 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt @@ -37,7 +37,7 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(setReadOnlyResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadOnlyStep.preExecute(logger, context).execute() @@ -51,7 +51,7 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadOnlyStep.preExecute(logger, context).execute() @@ -65,7 +65,7 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadOnlyStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt index bcd2ca38e..c332a0c90 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt @@ -37,7 +37,7 @@ class SetReadWriteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(setReadWriteResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadWriteStep = SetReadWriteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadWriteStep.preExecute(logger, context).execute() @@ -51,7 +51,7 @@ class SetReadWriteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadWriteStep = SetReadWriteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadWriteStep.preExecute(logger, context).execute() @@ -65,7 +65,7 @@ class SetReadWriteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadWriteStep = SetReadWriteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadWriteStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRollupCompletionStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRollupCompletionStepTests.kt index 597d35fd7..83fd61145 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRollupCompletionStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRollupCompletionStepTests.kt @@ -30,7 +30,7 @@ class WaitForRollupCompletionStepTests : OpenSearchTestCase() { private val rollupId: String = "dummy-id" private val indexName: String = "test" private val metadata = ManagedIndexMetaData( - indexName, "indexUuid", "policy_id", null, null, null, null, null, null, + indexName, "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData (WaitForRollupCompletionStep.name, 1, 0, false, 0, null, ActionProperties(rollupId = rollupId)), null, null, null diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt index b1b8e0ef7..8af94805f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt @@ -46,7 +46,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { runBlocking { val emptyActionProperties = ActionProperties() val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, emptyActionProperties), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, emptyActionProperties), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -58,7 +58,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { runBlocking { val nullActionProperties = null val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, nullActionProperties), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, nullActionProperties), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -78,7 +78,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.INIT) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -90,7 +90,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.STARTED) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -102,7 +102,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.SUCCESS) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -114,7 +114,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.ABORTED) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -126,7 +126,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.FAILED) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -145,7 +145,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -160,7 +160,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getClusterAdminClient(null, exception))) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -175,7 +175,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getClusterAdminClient(null, exception))) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt index 47f017288..4df22f608 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt @@ -23,6 +23,7 @@ class ExplainResponseTests : OpenSearchTestCase() { policyPrimaryTerm = randomNonNegativeLong(), policyCompleted = null, rolledOver = null, + indexCreationDate = null, transitionTo = randomAlphaOfLength(10), stateMetaData = null, actionMetaData = null, diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 5c93e39c5..efb14714b 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -662,6 +662,10 @@ "rolled_over": { "type": "boolean" }, + "index_creation_date": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, "transition_to": { "type": "text", "fields": { diff --git a/src/test/resources/mappings/cached-opendistro-ism-history.json b/src/test/resources/mappings/cached-opendistro-ism-history.json index 04d3c46d8..44c7ab896 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-history.json +++ b/src/test/resources/mappings/cached-opendistro-ism-history.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 3 + "schema_version": 4 }, "dynamic": "strict", "properties": { @@ -37,6 +37,10 @@ "rolled_over": { "type": "boolean" }, + "index_creation_date": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, "transition_to": { "type": "keyword" },