From 7c254e17f5c09ff7d7830d3b43186b59d6b37b14 Mon Sep 17 00:00:00 2001 From: Robert Downs Date: Tue, 1 Feb 2022 00:10:08 +0000 Subject: [PATCH] Adds index creation date Signed-off-by: Robert Downs Adds NewClusterEventListeners and refactors Transition to work with custom actions Signed-off-by: Robert Downs Marks blocked actions list as deprecated Signed-off-by: Clay Downs <89109232+downsrob@users.noreply.github.com> Asserts the deprecation warning after adding to allow list in test Signed-off-by: Robert Downs Removes allow_list test Signed-off-by: Robert Downs --- build.gradle | 1 - .../indexstatemanagement/Action.kt | 37 +++++ .../NewClusterEventHandler.kt | 0 .../indexstatemanagement/model/Decision.kt | 9 -- .../model/ManagedIndexMetaData.kt | 13 ++ .../IndexMetadataProvider.kt | 4 + .../ManagedIndexRunner.kt | 126 +++++++++++++++--- .../NewClusterEventListener.kt | 35 +++++ .../action/TransitionsAction.kt | 2 + .../indexstatemanagement/model/Policy.kt | 13 +- .../indexstatemanagement/model/State.kt | 10 +- .../opensearchapi/OpenSearchExtensions.kt | 19 ++- .../resthandler/RestIndexPolicyAction.kt | 5 +- .../settings/ManagedIndexSettings.kt | 10 ++ .../step/transition/AttemptTransitionStep.kt | 79 ++++++++--- .../util/ManagedIndexUtils.kt | 74 +++++++--- .../mappings/opendistro-ism-config.json | 4 + .../mappings/opendistro-ism-history.json | 6 +- .../IndexManagementSettingsTests.kt | 1 + .../IndexStateManagementIntegTestCase.kt | 1 + .../action/IndexStateManagementHistoryIT.kt | 6 + .../model/ManagedIndexMetaDataTests.kt | 5 + .../model/XContentTests.kt | 1 + .../runner/ManagedIndexRunnerIT.kt | 41 ++++-- .../runner/ManagedIndexRunnerTests.kt | 1 + .../step/AttemptCloseStepTests.kt | 12 +- .../step/AttemptCreateRollupJobStepTests.kt | 2 +- .../step/AttemptDeleteStepTests.kt | 8 +- .../step/AttemptOpenStepTests.kt | 6 +- .../step/AttemptSetIndexPriorityStepTests.kt | 8 +- .../step/AttemptSetReplicaCountStepTests.kt | 6 +- .../step/AttemptSnapshotStepTests.kt | 2 +- .../step/AttemptTransitionStepTests.kt | 31 +++-- .../step/SetReadOnlyStepTests.kt | 6 +- .../step/SetReadWriteStepTests.kt | 6 +- .../step/WaitForRollupCompletionStepTests.kt | 2 +- .../step/WaitForSnapshotStepTests.kt | 20 +-- .../action/explain/ExplainResponseTests.kt | 36 ++++- .../cached-opendistro-ism-config.json | 4 + .../cached-opendistro-ism-history.json | 6 +- 40 files changed, 511 insertions(+), 147 deletions(-) create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/NewClusterEventHandler.kt delete mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/NewClusterEventListener.kt diff --git a/build.gradle b/build.gradle index 06157b378..48447284c 100644 --- a/build.gradle +++ b/build.gradle @@ -308,7 +308,6 @@ integTest { } // TODO: Remove me after refactoring all actions - exclude 'org/opensearch/indexmanagement/indexstatemanagement/runner/*' exclude 'org/opensearch/indexmanagement/indexstatemanagement/resthandler/*' } diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt index 58780b6ca..761d8b6e9 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt @@ -10,9 +10,12 @@ import org.opensearch.common.io.stream.Writeable import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import java.time.Instant abstract class Action( val type: String, @@ -50,6 +53,21 @@ abstract class Action( populateAction(out) } + fun getUpdatedActionMetadata(managedIndexMetaData: ManagedIndexMetaData, stateName: String): ActionMetaData { + val stateMetaData = managedIndexMetaData.stateMetaData + val actionMetaData = managedIndexMetaData.actionMetaData + + return when { + // start a new action + stateMetaData?.name != stateName -> + ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null) + actionMetaData?.index != this.actionIndex -> + ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null) + // RetryAPI will reset startTime to null for actionMetaData and we'll reset it to "now" here + else -> actionMetaData.copy(startTime = actionMetaData.startTime ?: Instant.now().toEpochMilli()) + } + } + /** * The implementer of Action can change this method to correctly serialize the internals of the action * when data is shared between nodes @@ -72,6 +90,25 @@ abstract class Action( final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName + /* + * Gets if the managedIndexMetaData reflects a state in which this action has completed successfully + */ + final fun isFinishedSuccessfully(managedIndexMetaData: ManagedIndexMetaData): Boolean { + val policyRetryInfo = managedIndexMetaData.policyRetryInfo + if (policyRetryInfo == null || policyRetryInfo.failed) return false + val actionMetaData = managedIndexMetaData.actionMetaData + if (actionMetaData == null || actionMetaData.failed || actionMetaData.name != this.type) return false + val stepMetaData = managedIndexMetaData.stepMetaData + if (stepMetaData == null || !isLastStep(stepMetaData.name) || stepMetaData.stepStatus != Step.StepStatus.COMPLETED) return false + return true + } + + /* + * Denotes if the index metadata in the config index should be deleted for the index this action has just + * successfully finished running on. + */ + open fun deleteIndexMetadataAfterFinish(): Boolean = false + companion object { const val DEFAULT_RETRIES = 3L const val CUSTOM_ACTION_FIELD = "custom" diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/NewClusterEventHandler.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/NewClusterEventHandler.kt new file mode 100644 index 000000000..e69de29bb diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt deleted file mode 100644 index 6fcaa98ad..000000000 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt +++ /dev/null @@ -1,9 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.spi.indexstatemanagement.model - -// TODO: Probably need an override or priority to clear clashes if there are multiple decisions with conflicting index metadata -data class Decision(val shouldProcess: Boolean = true) diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt index 6883be3d9..3832fa29c 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt @@ -31,6 +31,7 @@ data class ManagedIndexMetaData( val policyPrimaryTerm: Long?, val policyCompleted: Boolean?, val rolledOver: Boolean?, + val indexCreationDate: Long?, val transitionTo: String?, val stateMetaData: StateMetaData?, val actionMetaData: ActionMetaData?, @@ -58,6 +59,7 @@ data class ManagedIndexMetaData( if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString() if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString() if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString() + if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString() if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString() if (actionMetaData != null) resultMap[ActionMetaData.ACTION] = actionMetaData.getMapValueString() @@ -82,6 +84,7 @@ data class ManagedIndexMetaData( .field(POLICY_PRIMARY_TERM, policyPrimaryTerm) .field(POLICY_COMPLETED, policyCompleted) .field(ROLLED_OVER, rolledOver) + .field(INDEX_CREATION_DATE, indexCreationDate) .field(TRANSITION_TO, transitionTo) .addObject(StateMetaData.STATE, stateMetaData, params, true) .addObject(ActionMetaData.ACTION, actionMetaData, params, true) @@ -117,6 +120,8 @@ data class ManagedIndexMetaData( builder.field(ROLLED_OVER, rolledOver) } + if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate) + if (policyCompleted == true) { builder.field(POLICY_COMPLETED, policyCompleted) return builder @@ -145,6 +150,7 @@ data class ManagedIndexMetaData( streamOutput.writeOptionalLong(policyPrimaryTerm) streamOutput.writeOptionalBoolean(policyCompleted) streamOutput.writeOptionalBoolean(rolledOver) + streamOutput.writeOptionalLong(indexCreationDate) streamOutput.writeOptionalString(transitionTo) streamOutput.writeOptionalWriteable(stateMetaData) @@ -174,6 +180,7 @@ data class ManagedIndexMetaData( const val POLICY_PRIMARY_TERM = "policy_primary_term" const val POLICY_COMPLETED = "policy_completed" const val ROLLED_OVER = "rolled_over" + const val INDEX_CREATION_DATE = "index_creation_date" const val TRANSITION_TO = "transition_to" const val INFO = "info" const val ENABLED = "enabled" @@ -186,6 +193,7 @@ data class ManagedIndexMetaData( val policyPrimaryTerm: Long? = si.readOptionalLong() val policyCompleted: Boolean? = si.readOptionalBoolean() val rolledOver: Boolean? = si.readOptionalBoolean() + val indexCreationDate: Long? = si.readOptionalLong() val transitionTo: String? = si.readOptionalString() val state: StateMetaData? = si.readOptionalWriteable { StateMetaData.fromStreamInput(it) } @@ -207,6 +215,7 @@ data class ManagedIndexMetaData( policyPrimaryTerm = policyPrimaryTerm, policyCompleted = policyCompleted, rolledOver = rolledOver, + indexCreationDate = indexCreationDate, transitionTo = transitionTo, stateMetaData = state, actionMetaData = action, @@ -233,6 +242,7 @@ data class ManagedIndexMetaData( var policyPrimaryTerm: Long? = null var policyCompleted: Boolean? = null var rolledOver: Boolean? = null + var indexCreationDate: Long? = null var transitionTo: String? = null var state: StateMetaData? = null @@ -255,6 +265,7 @@ data class ManagedIndexMetaData( POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue() ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue() + INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text() StateMetaData.STATE -> { state = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else StateMetaData.parse(xcp) @@ -282,6 +293,7 @@ data class ManagedIndexMetaData( policyPrimaryTerm, policyCompleted, rolledOver, + indexCreationDate, transitionTo, state, action, @@ -320,6 +332,7 @@ data class ManagedIndexMetaData( policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(), policyCompleted = map[POLICY_COMPLETED]?.toBoolean(), rolledOver = map[ROLLED_OVER]?.toBoolean(), + indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(), transitionTo = map[TRANSITION_TO], stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map), actionMetaData = ActionMetaData.fromManagedIndexMetaDataMap(map), diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexMetadataProvider.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexMetadataProvider.kt index c7ecc358b..32639c303 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexMetadataProvider.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexMetadataProvider.kt @@ -45,6 +45,10 @@ class IndexMetadataProvider( return service.getMetadata(indexNames, client, clusterService) } + /* + * Attempts to get the index metadata for of all indexNames for each of the index types designated in the types parameter. + * Returns a map of > + */ suspend fun getMultiTypeISMIndexMetadata( types: List = services.keys.toList(), indexNames: List diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 1e4e70868..90cef4db3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -16,6 +16,8 @@ import org.apache.logging.log4j.LogManager import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.cluster.state.ClusterStateResponse import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexResponse @@ -51,9 +53,13 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL +import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE +import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck +import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata +import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest +import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest import org.opensearch.indexmanagement.indexstatemanagement.util.getCompletedManagedIndexMetaData import org.opensearch.indexmanagement.indexstatemanagement.util.getStartingManagedIndexMetaData -import org.opensearch.indexmanagement.indexstatemanagement.util.getUpdatedActionMetaData import org.opensearch.indexmanagement.indexstatemanagement.util.hasDifferentJobInterval import org.opensearch.indexmanagement.indexstatemanagement.util.hasTimedOut import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict @@ -94,7 +100,7 @@ import org.opensearch.threadpool.ThreadPool import java.time.Instant import java.time.temporal.ChronoUnit -@Suppress("TooManyFunctions") +@Suppress("TooManyFunctions", "LargeClass") object ManagedIndexRunner : ScheduledJobRunner, CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexRunner")) { @@ -111,6 +117,7 @@ object ManagedIndexRunner : private lateinit var skipExecFlag: SkipExecution private lateinit var threadPool: ThreadPool private lateinit var extensionStatusChecker: ExtensionStatusChecker + private lateinit var indexMetadataProvider: IndexMetadataProvider private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED @Suppress("MagicNumber") private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3) @@ -188,6 +195,11 @@ object ManagedIndexRunner : return this } + fun registerIndexMetadataProvider(indexMetadataProvider: IndexMetadataProvider): ManagedIndexRunner { + this.indexMetadataProvider = indexMetadataProvider + return this + } + fun registerExtensionChecker(extensionStatusChecker: ExtensionStatusChecker): ManagedIndexRunner { this.extensionStatusChecker = extensionStatusChecker return this @@ -219,7 +231,7 @@ object ManagedIndexRunner : } } - @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition") + @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth") private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig) { logger.debug("Run job for index ${managedIndexConfig.index}") // doing a check of local cluster health as we do not want to overload master node with potentially a lot of calls @@ -228,18 +240,34 @@ object ManagedIndexRunner : return } - // Get current IndexMetaData and ManagedIndexMetaData - val indexMetaData = getIndexMetadata(managedIndexConfig.index) - if (indexMetaData == null) { - logger.warn("Failed to retrieve IndexMetadata.") + val (managedIndexMetaData, getMetadataSuccess) = client.getManagedIndexMetadata(managedIndexConfig.indexUuid) + if (!getMetadataSuccess) { + logger.info("Failed to retrieve managed index metadata of index [${managedIndexConfig.index}] from config index, abort this run.") return } - val managedIndexMetaData = indexMetaData.getManagedIndexMetadata(client) - val clusterStateMetadata = indexMetaData.getManagedIndexMetadata() - if (!isMetadataMoved(clusterStateMetadata, managedIndexMetaData, logger)) { - logger.info("Skipping execution while pending migration of metadata for ${managedIndexConfig.jobName}") - return + var indexMetadata = getIndexMetadata(managedIndexConfig.index) + // The index uuids not matching can happen when two jobs share a name but have different index types + if (indexMetadata == null || indexMetadata.indexUUID != managedIndexConfig.indexUuid) { + indexMetadata = null + // If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types + val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE } + val multiTypeIndexNameToMetaData = indexMetadataProvider.getMultiTypeIndexMetadata(nonDefaultIndexTypes, listOf(managedIndexConfig.index)) + val someTypeMatchedUuid = multiTypeIndexNameToMetaData.values.any { + it[managedIndexConfig.index]?.indexUuid == managedIndexConfig.indexUuid + } + // If no index types had an index with a matching name and uuid combination, return + if (!someTypeMatchedUuid) { + logger.warn("Failed to find IndexMetadata for ${managedIndexConfig.index}.") + return + } + } else { + val clusterStateMetadata = indexMetadata.getManagedIndexMetadata() + val metadataCheck = checkMetadata(clusterStateMetadata, managedIndexMetaData, managedIndexConfig.indexUuid, logger) + if (metadataCheck != MetadataCheck.SUCCESS) { + logger.info("Skipping execution while metadata status is $metadataCheck") + return + } } // If policy or managedIndexMetaData is null then initialize @@ -271,10 +299,13 @@ object ManagedIndexRunner : } val state = policy.getStateToExecute(managedIndexMetaData) - val action: Action? = state?.getActionToExecute(managedIndexMetaData.copy(user = policy.user, threadContext = threadPool.threadContext)) + val action: Action? = state?.getActionToExecute( + managedIndexMetaData.copy(user = policy.user, threadContext = threadPool.threadContext), + indexMetadataProvider + ) val stepContext = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) val step: Step? = action?.getStepToExecute(stepContext) - val currentActionMetaData = action?.getUpdatedActionMetaData(managedIndexMetaData, state) + val currentActionMetaData = action?.getUpdatedActionMetadata(managedIndexMetaData, state.name) // If Index State Management is disabled and the current step is not null and safe to disable on // then disable the job and return early @@ -383,6 +414,14 @@ object ManagedIndexRunner : return } + if (action.isFinishedSuccessfully(executedManagedIndexMetaData)) { + // Custom actions added by extensions may require a manual deletion of the managed index metadata after a successful execution + if (action.deleteIndexMetadataAfterFinish()) { + deleteFromManagedIndex(managedIndexConfig, action.type) + return + } + } + if (!updateManagedIndexMetaData(executedManagedIndexMetaData, updateResult).metadataSaved) { logger.error("Failed to update ManagedIndexMetaData after executing the Step : ${step.name}") } @@ -418,7 +457,7 @@ object ManagedIndexRunner : getInitializedManagedIndexMetaData(managedIndexMetaData, managedIndexConfig, policy) } - updateManagedIndexMetaData(updatedManagedIndexMetaData) + updateManagedIndexMetaData(updatedManagedIndexMetaData, create = managedIndexMetaData == null) } @Suppress("ReturnCount", "BlockingMethodInNonBlockingContext") @@ -497,7 +536,7 @@ object ManagedIndexRunner : } } - private fun getFailedInitializedManagedIndexMetaData( + private suspend fun getFailedInitializedManagedIndexMetaData( managedIndexMetaData: ManagedIndexMetaData?, managedIndexConfig: ManagedIndexConfig, policyID: String @@ -514,6 +553,7 @@ object ManagedIndexRunner : policyPrimaryTerm = null, policyCompleted = false, rolledOver = false, + indexCreationDate = getIndexCreationDate(managedIndexConfig), transitionTo = null, stateMetaData = null, actionMetaData = null, @@ -541,6 +581,7 @@ object ManagedIndexRunner : policyPrimaryTerm = policy.primaryTerm, policyCompleted = false, rolledOver = false, + indexCreationDate = getIndexCreationDate(managedIndexConfig), transitionTo = null, stateMetaData = stateMetaData, actionMetaData = null, @@ -589,7 +630,8 @@ object ManagedIndexRunner : */ private suspend fun updateManagedIndexMetaData( managedIndexMetaData: ManagedIndexMetaData, - lastUpdateResult: UpdateMetadataResult? = null + lastUpdateResult: UpdateMetadataResult? = null, + create: Boolean = false ): UpdateMetadataResult { var result = UpdateMetadataResult() if (!imIndices.attemptUpdateConfigIndexMapping()) { @@ -602,7 +644,7 @@ object ManagedIndexRunner : metadata = managedIndexMetaData.copy(seqNo = lastUpdateResult.seqNo, primaryTerm = lastUpdateResult.primaryTerm) } - val indexRequest = managedIndexMetadataIndexRequest(metadata) + val indexRequest = managedIndexMetadataIndexRequest(metadata, create = create) try { updateMetaDataRetryPolicy.retry(logger) { val indexResponse: IndexResponse = client.suspendUntil { index(indexRequest, it) } @@ -764,9 +806,6 @@ object ManagedIndexRunner : val response: ClusterStateResponse = client.admin().cluster().suspendUntil { state(clusterStateRequest, it) } indexMetaData = response.state.metadata.indices.firstOrNull()?.value - if (indexMetaData == null) { - logger.error("Could not find IndexMetaData in master cluster state for $index") - } } catch (e: Exception) { logger.error("Failed to get IndexMetaData from master cluster state for index=$index", e) } @@ -795,4 +834,49 @@ object ManagedIndexRunner : Instant.ofEpochMilli(requireNotNull(startTime)) } } + + /** + * Get the index creation date for the first time to cache it on the ManagedIndexMetadata + */ + @Suppress("ReturnCount") + private suspend fun getIndexCreationDate(managedIndexConfig: ManagedIndexConfig): Long? { + try { + val multiTypeIndexNameToMetaData = indexMetadataProvider.getMultiTypeIndexMetadata(indexNames = listOf(managedIndexConfig.index)) + // the managedIndexConfig.indexUuid should be unique across all index types + val indexCreationDate = multiTypeIndexNameToMetaData.values.firstOrNull { + it[managedIndexConfig.index]?.indexUuid == managedIndexConfig.indexUuid + }?.get(managedIndexConfig.index)?.indexCreationDate + return indexCreationDate + } catch (e: Exception) { + logger.error("Failed to get the index creation date", e) + } + return null + } + + /** + * Deletes a managedIndexConfig and its managedIndexMetadata. Used after an action has successfully completely + * and the action has deleteIndexMetadataAfterFinish set to true. This should only be set to true for custom + * actions added by extensions, as when indices in the cluster are deleted,the deletion should get picked up + * and the ism config metadata for the index will be removed, but if the index is not in the cluster, as it + * might be for some other index type, we have to do this manually + */ + private suspend fun deleteFromManagedIndex(managedIndexConfig: ManagedIndexConfig, actionType: String) { + try { + val bulkRequest = BulkRequest() + .add(deleteManagedIndexRequest(managedIndexConfig.indexUuid)) + .add(deleteManagedIndexMetadataRequest(managedIndexConfig.indexUuid)) + + val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) } + for (bulkItemResponse in bulkResponse) { + if (bulkItemResponse.isFailed) { + logger.warn( + "Failed to delete managed index job/metadata [id=${bulkItemResponse.id}] for ${managedIndexConfig.index}" + + " after a successful $actionType [result=${bulkItemResponse.failureMessage}]" + ) + } + } + } catch (e: Exception) { + logger.warn("Failed to delete managed index job for for ${managedIndexConfig.index} after a successful $actionType", e) + } + } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/NewClusterEventListener.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/NewClusterEventListener.kt new file mode 100644 index 000000000..ac00386e6 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/NewClusterEventListener.kt @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement + +import org.opensearch.client.Client +import org.opensearch.cluster.ClusterChangedEvent +import org.opensearch.cluster.ClusterStateListener +import org.opensearch.cluster.service.ClusterService +import org.opensearch.indexmanagement.spi.indexstatemanagement.NewClusterEventHandler + +/** + * Notifies the NewClusterEventHandlers from all extensions whenever a ClusterChangedEvent of the `isNewCluster` type occurs, + * enabling extensions to react to `new cluster` typed events. + */ +class NewClusterEventListener( + val client: Client, + val clusterService: ClusterService, + private val newClusterEventHandlers: List +) : ClusterStateListener { + + init { + clusterService.addListener(this) + } + + override fun clusterChanged(event: ClusterChangedEvent) { + if (event.isNewCluster) { + newClusterEventHandlers.forEach { eventHandler -> + eventHandler.processEvent(client, clusterService, event) + } + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionsAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionsAction.kt index c78b1af14..60f3a7929 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionsAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionsAction.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action +import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Action @@ -13,6 +14,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext class TransitionsAction( val transitions: List, + val indexMetadataProvider: IndexMetadataProvider ) : Action(name, -1) { private val attemptTransitionStep = AttemptTransitionStep(this) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt index 432fa8e0d..d4b25f4ce 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt @@ -118,15 +118,16 @@ data class Policy( } /** - * Disallowed actions are ones that are not specified in the [ManagedIndexSettings.ALLOW_LIST] setting. + * Disallowed actions are specified in the [ManagedIndexSettings.BLOCKED_ACTIONS_LIST], or are not specified in the + * deprecated [ManagedIndexSettings.ALLOW_LIST] setting. */ - fun getDisallowedActions(allowList: List): List { - val allowListSet = allowList.toSet() + fun getDisallowedActions(allowList: List, blockedActionsList: List): List { + val allowedActionsSet = allowList.toSet() - blockedActionsList.toSet() val disallowedActions = mutableListOf() this.states.forEach { state -> - state.actions.forEach { actionConfig -> - if (!allowListSet.contains(actionConfig.type)) { - disallowedActions.add(actionConfig.type) + state.actions.forEach { action -> + if (!allowedActionsSet.contains(action.type)) { + disallowedActions.add(action.type) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt index cb70964e2..f675e6c2c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt @@ -15,6 +15,7 @@ import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser +import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction import org.opensearch.indexmanagement.spi.indexstatemanagement.Action @@ -68,17 +69,18 @@ data class State( } fun getActionToExecute( - managedIndexMetaData: ManagedIndexMetaData + managedIndexMetaData: ManagedIndexMetaData, + indexMetadataProvider: IndexMetadataProvider ): Action? { var actionConfig: Action? val actionMetaData = managedIndexMetaData.actionMetaData // If we are transitioning to this state get the first action in the state // If the action/actionIndex are null it means we just initialized and should get the first action from the state if (managedIndexMetaData.transitionTo != null || actionMetaData == null) { - actionConfig = this.actions.firstOrNull() ?: TransitionsAction(this.transitions) + actionConfig = this.actions.firstOrNull() ?: TransitionsAction(this.transitions, indexMetadataProvider) } else if (actionMetaData.name == TransitionsAction.name) { // If the current action is transition and we do not have a transitionTo set then we should be in Transition - actionConfig = TransitionsAction(this.transitions) + actionConfig = TransitionsAction(this.transitions, indexMetadataProvider) } else { // Get the current actionConfig that is in the ManagedIndexMetaData actionConfig = this.actions.filterIndexed { index, config -> @@ -92,7 +94,7 @@ data class State( if (stepMetaData != null && stepMetaData.stepStatus == Step.StepStatus.COMPLETED) { val action = actionConfig if (action.isLastStep(stepMetaData.name)) { - actionConfig = this.actions.getOrNull(actionMetaData.index + 1) ?: TransitionsAction(this.transitions) + actionConfig = this.actions.getOrNull(actionMetaData.index + 1) ?: TransitionsAction(this.transitions, indexMetadataProvider) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt index 77ed24c97..d049d9848 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt @@ -82,18 +82,22 @@ fun getUuidsForClosedIndices(state: ClusterState): MutableList { fun Map.filterNotNullValues(): Map = filterValues { it != null } as Map -// get metadata from config index using doc id +/** + * Get metadata from config index + * + * @return metadata object and get call successful or not + */ @Suppress("ReturnCount") -suspend fun IndexMetadata.getManagedIndexMetadata(client: Client): ManagedIndexMetaData? { +suspend fun Client.getManagedIndexMetadata(indexUUID: String): Pair { try { val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(indexUUID)) - .routing(this.indexUUID) - val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } + .routing(indexUUID) + val getResponse: GetResponse = this.suspendUntil { get(getRequest, it) } if (!getResponse.isExists || getResponse.isSourceEmpty) { - return null + return Pair(null, true) } - return withContext(Dispatchers.IO) { + val metadata = withContext(Dispatchers.IO) { val xcp = XContentHelper.createParser( NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, @@ -101,6 +105,7 @@ suspend fun IndexMetadata.getManagedIndexMetadata(client: Client): ManagedIndexM ) ManagedIndexMetaData.parseWithType(xcp, getResponse.id, getResponse.seqNo, getResponse.primaryTerm) } + return Pair(metadata, true) } catch (e: Exception) { when (e) { is IndexNotFoundException, is NoShardAvailableActionException -> { @@ -109,7 +114,7 @@ suspend fun IndexMetadata.getManagedIndexMetadata(client: Client): ManagedIndexM else -> log.error("Failed to get metadata", e) } - return null + return Pair(null, false) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt index dd98093a0..d4957eda9 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt @@ -15,6 +15,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_POL import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.POLICY_BASE_URI import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.BLOCKED_ACTIONS_LIST import org.opensearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.IndexPolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.IndexPolicyRequest import org.opensearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.IndexPolicyResponse @@ -41,9 +42,11 @@ class RestIndexPolicyAction( ) : BaseRestHandler() { @Volatile private var allowList = ALLOW_LIST.get(settings) + @Volatile private var blockedActionsList = BLOCKED_ACTIONS_LIST.get(settings) init { clusterService.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) { allowList = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(BLOCKED_ACTIONS_LIST) { blockedActionsList = it } } override fun routes(): List { @@ -85,7 +88,7 @@ class RestIndexPolicyAction( WriteRequest.RefreshPolicy.IMMEDIATE } - val disallowedActions = policy.getDisallowedActions(allowList) + val disallowedActions = policy.getDisallowedActions(allowList, blockedActionsList) if (disallowedActions.isNotEmpty()) { return RestChannelConsumer { channel -> channel.sendResponse( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt index 359c52da8..219c07ff1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt @@ -18,6 +18,7 @@ class ManagedIndexSettings { const val DEFAULT_JOB_INTERVAL = 5 const val DEFAULT_JITTER = 0.6 const val DEFAULT_RESTRICTED_PATTERN = "\\.opendistro_security|\\.kibana.*|\\$INDEX_MANAGEMENT_INDEX" + val DEFAULT_BLOCKED_ACTIONS = emptyList() val ALLOW_LIST_NONE = emptyList() val SNAPSHOT_DENY_LIST_NONE = emptyList() const val HOST_DENY_LIST = "opendistro.destination.host.deny_list" @@ -172,6 +173,15 @@ class ManagedIndexSettings { LegacyOpenDistroManagedIndexSettings.ALLOW_LIST, Function.identity(), Setting.Property.NodeScope, + Setting.Property.Dynamic, + Setting.Property.Deprecated + ) + + val BLOCKED_ACTIONS_LIST: Setting> = Setting.listSetting( + "plugins.index_state_management.blocked_actions_list", + DEFAULT_BLOCKED_ACTIONS, + Function.identity(), + Setting.Property.NodeScope, Setting.Property.Dynamic ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt index 3c11acf43..5e6218c37 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt @@ -9,9 +9,12 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.unit.ByteSizeValue +import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getOldestRolloverTime +import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE import org.opensearch.indexmanagement.indexstatemanagement.util.evaluateConditions import org.opensearch.indexmanagement.indexstatemanagement.util.hasStatsConditions import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString @@ -31,12 +34,13 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) private var policyCompleted: Boolean = false private var info: Map? = null - @Suppress("ReturnCount", "ComplexMethod", "LongMethod") + @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "NestedBlockDepth") override suspend fun execute(): Step { val context = this.context ?: return this val indexName = context.metadata.index val clusterService = context.clusterService val transitions = action.transitions + val indexMetadataProvider = action.indexMetadataProvider try { if (transitions.isEmpty()) { logger.info("$indexName transitions are empty, completing policy") @@ -45,8 +49,10 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) return this } - val indexMetaData = clusterService.state().metadata().index(indexName) - val indexCreationDate = indexMetaData.creationDate + val indexMetadata = clusterService.state().metadata().index(indexName) + val inCluster = clusterService.state().metadata().hasIndex(indexName) && indexMetadata?.indexUUID == context.metadata.indexUuid + + val indexCreationDate = getIndexCreationDate(context.metadata, indexMetadataProvider, clusterService, indexName, inCluster) val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate) if (indexCreationDate == -1L) { logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") @@ -54,8 +60,8 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) val stepStartTime = getStepStartTime(context.metadata) var numDocs: Long? = null var indexSize: ByteSizeValue? = null - val rolloverDate: Instant? = indexMetaData.getOldestRolloverTime() + val rolloverDate: Instant? = if (inCluster) indexMetadata.getOldestRolloverTime() else null if (transitions.any { it.conditions?.rolloverAge !== null }) { // if we have a transition with rollover age condition, then we must have a rollover date // otherwise fail this transition @@ -69,22 +75,27 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) } if (transitions.any { it.hasStatsConditions() }) { - val statsRequest = IndicesStatsRequest() - .indices(indexName).clear().docs(true) - val statsResponse: IndicesStatsResponse = context.client.admin().indices().suspendUntil { stats(statsRequest, it) } + if (inCluster) { + val statsRequest = IndicesStatsRequest() + .indices(indexName).clear().docs(true) + val statsResponse: IndicesStatsResponse = + context.client.admin().indices().suspendUntil { stats(statsRequest, it) } - if (statsResponse.status != RestStatus.OK) { - val message = getFailedStatsMessage(indexName) - logger.warn("$message - ${statsResponse.status}") - stepStatus = StepStatus.FAILED - info = mapOf( - "message" to message, - "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } - ) - return this + if (statsResponse.status != RestStatus.OK) { + val message = getFailedStatsMessage(indexName) + logger.warn("$message - ${statsResponse.status}") + stepStatus = StepStatus.FAILED + info = mapOf( + "message" to message, + "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } + ) + return this + } + numDocs = statsResponse.primaries.getDocs()?.count ?: 0 + indexSize = ByteSizeValue(statsResponse.primaries.getDocs()?.totalSizeInBytes ?: 0) + } else { + logger.warn("Cannot use index size/doc count transition conditions for index [$indexName] that does not exist in cluster") } - numDocs = statsResponse.primaries.getDocs()?.count ?: 0 - indexSize = ByteSizeValue(statsResponse.primaries.getDocs()?.totalSizeInBytes ?: 0) } // Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true @@ -133,6 +144,38 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) ) } + @Suppress("ReturnCount") + private suspend fun getIndexCreationDate( + metadata: ManagedIndexMetaData, + indexMetadataProvider: IndexMetadataProvider, + clusterService: ClusterService, + indexName: String, + inCluster: Boolean + ): Long { + try { + // If we do have an index creation date cached already then use that + metadata.indexCreationDate?.let { return it } + // Otherwise, check if this index is in cluster state first + return if (inCluster) { + val clusterStateMetadata = clusterService.state().metadata() + clusterStateMetadata.index(indexName).creationDate + } else { + // And then finally check all other index types which may not be in the cluster + val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE } + val multiTypeIndexNameToMetaData = indexMetadataProvider.getMultiTypeIndexMetadata(nonDefaultIndexTypes, listOf(indexName)) + // the managedIndexConfig.indexUuid should be unique across all index types + val indexCreationDate = multiTypeIndexNameToMetaData.values.firstOrNull { + it[indexName]?.indexUuid == metadata.indexUuid + }?.get(indexName)?.indexCreationDate + indexCreationDate ?: -1 + } + } catch (e: Exception) { + logger.error("Failed to get index creation date for $indexName", e) + } + // -1L index age is ignored during condition checks + return -1L + } + override fun isIdempotent() = true companion object { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index 251689566..6bfcd6fde 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -281,21 +281,6 @@ fun State.getUpdatedStateMetaData(managedIndexMetaData: ManagedIndexMetaData): S } } -fun Action.getUpdatedActionMetaData(managedIndexMetaData: ManagedIndexMetaData, state: State): ActionMetaData { - val stateMetaData = managedIndexMetaData.stateMetaData - val actionMetaData = managedIndexMetaData.actionMetaData - - return when { - // start a new action - stateMetaData?.name != state.name -> - ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null) - actionMetaData?.index != this.actionIndex -> - ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null) - // RetryAPI will reset startTime to null for actionMetaData and we'll reset it to "now" here - else -> actionMetaData.copy(startTime = actionMetaData.startTime ?: Instant.now().toEpochMilli()) - } -} - fun Action.shouldBackoff(actionMetaData: ActionMetaData?, actionRetry: ActionRetry?): Pair? { return this.configRetry?.backoff?.shouldBackoff(actionMetaData, actionRetry) } @@ -332,7 +317,7 @@ fun ManagedIndexMetaData.getStartingManagedIndexMetaData( } val updatedStateMetaData = state.getUpdatedStateMetaData(this) - val updatedActionMetaData = action.getUpdatedActionMetaData(this, state) + val updatedActionMetaData = action.getUpdatedActionMetadata(this, state.name) val updatedStepMetaData = step.getStartingStepMetaData(this) return this.copy( @@ -483,9 +468,12 @@ fun Policy.isSafeToChange(stateName: String?, newPolicy: Policy, changePolicy: C } /** - * Allowed actions are ones that are specified in the [ManagedIndexSettings.ALLOW_LIST] setting. + * Allowed actions are specified in the [ManagedIndexSettings.ALLOW_LIST] setting, and are not specified in the + * [ManagedIndexSettings.BLOCKED_ACTIONS_LIST] setting. */ -fun Action.isAllowed(allowList: List): Boolean = allowList.contains(this.type) +fun Action.isAllowed(blockedActionsList: List, allowList: List): Boolean { + return allowList.contains(this.type) && !blockedActionsList.contains(this.type) +} /** * Check if cluster state metadata has been moved to config index @@ -517,6 +505,56 @@ fun isMetadataMoved( return true } +/** + * Check if cluster state metadata has been moved to config index + * + * log warning if remaining cluster state metadata has newer last_updated_time + */ +@Suppress("ReturnCount", "ComplexCondition", "ComplexMethod") +fun checkMetadata( + clusterStateMetadata: ManagedIndexMetaData?, + configIndexMetadata: Any?, + currentIndexUuid: String?, + logger: Logger +): MetadataCheck { + // indexUuid saved in ISM metadata may be outdated + // if an index restored from snapshot + val indexUuid1 = clusterStateMetadata?.indexUuid + val indexUuid2 = when (configIndexMetadata) { + is ManagedIndexMetaData -> configIndexMetadata.indexUuid + is Map<*, *> -> configIndexMetadata["index_uuid"] + else -> null + } as String? + if ((indexUuid1 != null && indexUuid1 != currentIndexUuid) || + (indexUuid2 != null && indexUuid2 != currentIndexUuid) + ) { + return MetadataCheck.CORRUPT + } + + if (clusterStateMetadata != null) { + if (configIndexMetadata == null) return MetadataCheck.PENDING + + // compare last updated time between 2 metadatas + val t1 = clusterStateMetadata.stepMetaData?.startTime + val t2 = when (configIndexMetadata) { + is ManagedIndexMetaData -> configIndexMetadata.stepMetaData?.startTime + is Map<*, *> -> { + val stepMetadata = configIndexMetadata["step"] as Map? + stepMetadata?.get("start_time") + } + else -> null + } as Long? + if (t1 != null && t2 != null && t1 > t2) { + logger.warn("Cluster state metadata get updates after moved for [${clusterStateMetadata.index}]") + } + } + return MetadataCheck.SUCCESS +} + +enum class MetadataCheck { + PENDING, CORRUPT, SUCCESS +} + private val baseMessageLogger = LogManager.getLogger(BaseMessage::class.java) fun BaseMessage.isHostInDenylist(networks: List): Boolean { diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 5c93e39c5..efb14714b 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -662,6 +662,10 @@ "rolled_over": { "type": "boolean" }, + "index_creation_date": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, "transition_to": { "type": "text", "fields": { diff --git a/src/main/resources/mappings/opendistro-ism-history.json b/src/main/resources/mappings/opendistro-ism-history.json index 04d3c46d8..44c7ab896 100644 --- a/src/main/resources/mappings/opendistro-ism-history.json +++ b/src/main/resources/mappings/opendistro-ism-history.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 3 + "schema_version": 4 }, "dynamic": "strict", "properties": { @@ -37,6 +37,10 @@ "rolled_over": { "type": "boolean" }, + "index_creation_date": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, "transition_to": { "type": "keyword" }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt index 0176c90a2..e9ed17b9a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt @@ -81,6 +81,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT, ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS, ManagedIndexSettings.ALLOW_LIST, + ManagedIndexSettings.BLOCKED_ACTIONS_LIST, ManagedIndexSettings.SNAPSHOT_DENY_LIST, ManagedIndexSettings.JITTER, RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt index 0829a00b0..df3e87d5d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt @@ -71,6 +71,7 @@ abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() { policyPrimaryTerm = 1, policyCompleted = false, rolledOver = false, + indexCreationDate = null, transitionTo = null, stateMetaData = StateMetaData("ReplicaCountState", 1234), actionMetaData = null, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt index 149176e44..1f80bb245 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt @@ -76,6 +76,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData("ReadOnlyState", actualHistory.stateMetaData!!.startTime), actionMetaData = ActionMetaData(ReadOnlyAction.name, actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), @@ -142,6 +143,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData("ReadOnlyState", actualHistory.stateMetaData!!.startTime), actionMetaData = ActionMetaData(ReadOnlyAction.name, actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), @@ -208,6 +210,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData("ReadOnlyState", actualHistory.stateMetaData!!.startTime), actionMetaData = ActionMetaData(ReadOnlyAction.name, actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), @@ -268,6 +271,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData("ReadOnlyState", actualHistory.stateMetaData!!.startTime), actionMetaData = null, @@ -298,6 +302,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory1.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory1.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData(states[0].name, actualHistory1.stateMetaData!!.startTime), actionMetaData = ActionMetaData(ReadOnlyAction.name, actualHistory1.actionMetaData!!.startTime, 0, false, 0, 0, null), @@ -366,6 +371,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyPrimaryTerm = actualHistory.policyPrimaryTerm, policyCompleted = null, rolledOver = null, + indexCreationDate = actualHistory.indexCreationDate, transitionTo = null, stateMetaData = StateMetaData(name = states[0].name, startTime = actualHistory.stateMetaData!!.startTime), actionMetaData = null, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaDataTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaDataTests.kt index da55bf768..e16f91319 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaDataTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaDataTests.kt @@ -17,6 +17,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaDat import org.opensearch.test.OpenSearchTestCase import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream +import java.time.Instant class ManagedIndexMetaDataTests : OpenSearchTestCase() { @@ -29,6 +30,7 @@ class ManagedIndexMetaDataTests : OpenSearchTestCase() { policyPrimaryTerm = 1, policyCompleted = null, rolledOver = null, + indexCreationDate = Instant.now().toEpochMilli(), transitionTo = null, stateMetaData = StateMetaData("close-index", 1234), actionMetaData = null, @@ -49,6 +51,7 @@ class ManagedIndexMetaDataTests : OpenSearchTestCase() { policyPrimaryTerm = 1, policyCompleted = null, rolledOver = null, + indexCreationDate = null, transitionTo = null, stateMetaData = StateMetaData("close-index", 1234), actionMetaData = ActionMetaData("close", 4321, 0, false, 0, 0, null), @@ -69,6 +72,7 @@ class ManagedIndexMetaDataTests : OpenSearchTestCase() { policyPrimaryTerm = 1, policyCompleted = null, rolledOver = null, + indexCreationDate = null, transitionTo = null, stateMetaData = StateMetaData("close-index", 1234), actionMetaData = ActionMetaData("close", 4321, 0, false, 0, 0, ActionProperties(3)), @@ -89,6 +93,7 @@ class ManagedIndexMetaDataTests : OpenSearchTestCase() { policyPrimaryTerm = 1, policyCompleted = null, rolledOver = false, + indexCreationDate = null, transitionTo = null, stateMetaData = StateMetaData("rollover-index", 1234), actionMetaData = ActionMetaData("rollover", 4321, 0, false, 0, 0, null), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt index e255ec23c..7397d4d40 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt @@ -232,6 +232,7 @@ class XContentTests : OpenSearchTestCase() { policyPrimaryTerm = randomNonNegativeLong(), policyCompleted = null, rolledOver = null, + indexCreationDate = null, transitionTo = randomAlphaOfLength(10), stateMetaData = null, actionMetaData = null, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt index 370342476..d227c4926 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt @@ -6,10 +6,30 @@ package org.opensearch.indexmanagement.indexstatemanagement.runner import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase +import org.opensearch.indexmanagement.indexstatemanagement.action.OpenAction +import org.opensearch.indexmanagement.indexstatemanagement.action.ReadOnlyAction +import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.indexstatemanagement.model.State +import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification +import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy +import org.opensearch.indexmanagement.indexstatemanagement.randomReadOnlyActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomReadWriteActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomState +import org.opensearch.indexmanagement.indexstatemanagement.randomTransition +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import org.opensearch.indexmanagement.indexstatemanagement.step.readonly.SetReadOnlyStep +import org.opensearch.indexmanagement.indexstatemanagement.step.readwrite.SetReadWriteStep +import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData +import org.opensearch.indexmanagement.waitFor +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import java.time.Instant +import java.time.temporal.ChronoUnit class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { - /*fun `test version conflict fails job`() { + fun `test version conflict fails job`() { val indexName = "version_conflict_index" val policyID = "version_conflict_policy" val actionConfig = OpenAction(0) @@ -72,6 +92,12 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { // init policy updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(createdPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID) } + // change cluster job interval setting to 2 (minutes) + updateClusterSetting(ManagedIndexSettings.JOB_INTERVAL.key, "2") + // fast forward to next execution where at the end we should change the job interval time + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { (getManagedIndexConfigByDocId(managedIndexConfig.id)?.jobSchedule as? IntervalSchedule)?.interval == 2 } waitFor { assertEquals(createdPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID) val currInterval = (getManagedIndexConfigByDocId(managedIndexConfig.id)?.jobSchedule as? IntervalSchedule)?.interval @@ -103,8 +129,8 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { } } - fun `test allow list fails execution`() { - val indexName = "allow_list_index" + fun `test blocked action fails execution`() { + val indexName = "blocked_action_index" val firstState = randomState( name = "first_state", actions = listOf(randomReadOnlyActionConfig()), @@ -141,11 +167,8 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, firstState.name), getExplainManagedIndexMetaData(indexName).info?.get("message")) } - // remove read_only from the allowlist - val allowedActions = ActionConfig.ActionType.values().toList() - .filter { actionType -> actionType != ActionConfig.ActionType.READ_ONLY } - .joinToString(prefix = "[", postfix = "]") { string -> "\"$string\"" } - updateClusterSetting(ManagedIndexSettings.ALLOW_LIST.key, allowedActions, escapeValue = false) + // block the read_only action + updateClusterSetting(ManagedIndexSettings.BLOCKED_ACTIONS_LIST.key, "[\"${ReadOnlyAction.name}\"]", escapeValue = false) // speed up to fifth execution that should try to set index to read only and fail because the action is not allowed updateManagedIndexConfigStartTime(managedIndexConfig) @@ -188,5 +211,5 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { val currJitter = getManagedIndexConfigByDocId(newManagedIndexConfig.id)?.jitter assertEquals("Failed to update ManagedIndexConfig jitter", newJitter, currJitter) } - }*/ + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerTests.kt index 17a1a4ef5..eaff8a507 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerTests.kt @@ -62,6 +62,7 @@ class ManagedIndexRunnerTests : OpenSearchTestCase() { settingSet.add(ManagedIndexSettings.JOB_INTERVAL) settingSet.add(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED) settingSet.add(ManagedIndexSettings.ALLOW_LIST) + settingSet.add(ManagedIndexSettings.BLOCKED_ACTIONS_LIST) val clusterSettings = ClusterSettings(settings, settingSet) val originClusterService: ClusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings) clusterService = Mockito.spy(originClusterService) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt index 1acbed656..11e01c6a5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCloseStepTests.kt @@ -39,7 +39,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() @@ -53,7 +53,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() @@ -67,7 +67,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() @@ -81,7 +81,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() @@ -95,7 +95,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() @@ -109,7 +109,7 @@ class AttemptCloseStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptCloseStep = AttemptCloseStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptCloseStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateRollupJobStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateRollupJobStepTests.kt index ac78f550a..f41eca7c3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateRollupJobStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateRollupJobStepTests.kt @@ -19,7 +19,7 @@ class AttemptCreateRollupJobStepTests : OpenSearchTestCase() { private val indexName = "test" private val rollupId: String = rollupAction.ismRollup.toRollup(indexName).id private val metadata = ManagedIndexMetaData( - indexName, "indexUuid", "policy_id", null, null, null, null, null, null, + indexName, "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(AttemptCreateRollupJobStep.name, 1, 0, false, 0, null, ActionProperties(rollupId = rollupId)), null, null, null ) private val step = AttemptCreateRollupJobStep(rollupAction) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt index 1dff88450..7b6963dba 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptDeleteStepTests.kt @@ -37,7 +37,7 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptDeleteStep.preExecute(logger, context).execute() @@ -51,7 +51,7 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(acknowledgedResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptDeleteStep.preExecute(logger, context).execute() @@ -65,7 +65,7 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptDeleteStep.preExecute(logger, context).execute() @@ -80,7 +80,7 @@ class AttemptDeleteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptDeleteStep = AttemptDeleteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptDeleteStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt index ddfedcf34..2ed8669e2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptOpenStepTests.kt @@ -37,7 +37,7 @@ class AttemptOpenStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(openIndexResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptOpenStep.preExecute(logger, context).execute() @@ -51,7 +51,7 @@ class AttemptOpenStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptOpenStep.preExecute(logger, context).execute() @@ -65,7 +65,7 @@ class AttemptOpenStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptOpenStep = AttemptOpenStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptOpenStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt index 0781f2aae..796baf8ab 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetIndexPriorityStepTests.kt @@ -39,7 +39,7 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptSetPriorityStep.preExecute(logger, context).execute() @@ -54,7 +54,7 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptSetPriorityStep.preExecute(logger, context).execute() @@ -69,7 +69,7 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptSetPriorityStep.preExecute(logger, context).execute() @@ -85,7 +85,7 @@ class AttemptSetIndexPriorityStepTests : OpenSearchTestCase() { runBlocking { val indexPriorityAction = IndexPriorityAction(50, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val attemptSetPriorityStep = AttemptSetIndexPriorityStep(indexPriorityAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) attemptSetPriorityStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt index 6de0c4052..c5e589dd9 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt @@ -39,7 +39,7 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) replicaCountStep.preExecute(logger, context).execute() @@ -54,7 +54,7 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) replicaCountStep.preExecute(logger, context).execute() @@ -69,7 +69,7 @@ class AttemptSetReplicaCountStepTests : OpenSearchTestCase() { runBlocking { val replicaCountAction = ReplicaCountAction(2, 0) - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val replicaCountStep = AttemptReplicaCountStep(replicaCountAction) val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) replicaCountStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt index e0eadd331..bea3c41ae 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptSnapshotStepTests.kt @@ -43,7 +43,7 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY private val snapshotAction = randomSnapshotActionConfig("repo", "snapshot-name") - private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) @Before fun settings() { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt index 065366721..398489f2f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt @@ -25,6 +25,7 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.collect.ImmutableOpenMap import org.opensearch.common.settings.Settings import org.opensearch.index.shard.DocsStats +import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions import org.opensearch.indexmanagement.indexstatemanagement.model.Transition @@ -41,11 +42,17 @@ import java.time.Instant class AttemptTransitionStepTests : OpenSearchTestCase() { + private val indexName: String = "test" + private val indexUUID: String = "indexUuid" @Suppress("UNCHECKED_CAST") private val indexMetadata: IndexMetadata = mock { on { rolloverInfos } doReturn ImmutableOpenMap.builder().build() + on { indexUUID } doReturn indexUUID + } + private val metadata: Metadata = mock { + on { index(any()) } doReturn indexMetadata + on { hasIndex(indexName) } doReturn true } - private val metadata: Metadata = mock { on { index(any()) } doReturn indexMetadata } private val clusterState: ClusterState = mock { on { metadata() } doReturn metadata } private val clusterService: ClusterService = mock { on { state() } doReturn clusterState } private val scriptService: ScriptService = mock() @@ -62,16 +69,17 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { whenever(docsStats.count).doReturn(6L) whenever(docsStats.totalSizeInBytes).doReturn(2) val client = getClient(getAdminClient(getIndicesAdminClient(statsResponse, null))) + val indexMetadataProvider = IndexMetadataProvider(client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) - val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings) attemptTransitionStep.preExecute(logger, context).execute() val updatedManagedIndexMetaData = attemptTransitionStep.getUpdatedManagedIndexMetadata(managedIndexMetadata) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) - assertEquals("Did not get correct failed message", AttemptTransitionStep.getFailedStatsMessage("test"), updatedManagedIndexMetaData.info!!["message"]) + assertEquals("Did not get correct failed message", AttemptTransitionStep.getFailedStatsMessage(indexName), updatedManagedIndexMetaData.info!!["message"]) } } @@ -79,10 +87,11 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { whenever(indexMetadata.creationDate).doReturn(5L) val exception = IllegalArgumentException("example") val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + val indexMetadataProvider = IndexMetadataProvider(client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) - val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings) attemptTransitionStep.preExecute(logger, context).execute() @@ -96,10 +105,11 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { whenever(indexMetadata.creationDate).doReturn(5L) val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + val indexMetadataProvider = IndexMetadataProvider(client, clusterService, mutableMapOf()) runBlocking { - val managedIndexMetadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) - val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, null, null, null) + val transitionsAction = TransitionsAction(listOf(Transition("some_state", Conditions(docCount = 5L))), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) val context = StepContext(managedIndexMetadata, clusterService, client, null, null, scriptService, settings) attemptTransitionStep.preExecute(logger, context).execute() @@ -110,10 +120,11 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { } fun `test step start time resetting between two transitions`() { + val indexMetadataProvider = IndexMetadataProvider(mock(), clusterService, mutableMapOf()) runBlocking { val completedStartTime = Instant.now() - val managedIndexMetadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, StepMetaData("attempt_transition", completedStartTime.toEpochMilli(), Step.StepStatus.COMPLETED), null, null) - val transitionsAction = TransitionsAction(listOf(Transition("some_state", null))) + val managedIndexMetadata = ManagedIndexMetaData(indexName, indexUUID, "policy_id", null, null, null, null, null, null, null, null, StepMetaData("attempt_transition", completedStartTime.toEpochMilli(), Step.StepStatus.COMPLETED), null, null) + val transitionsAction = TransitionsAction(listOf(Transition("some_state", null)), indexMetadataProvider) val attemptTransitionStep = AttemptTransitionStep(transitionsAction) Thread.sleep(50) // Make sure we give enough time for the instants to be different val startTime = attemptTransitionStep.getStepStartTime(managedIndexMetadata) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt index b4bb35dd5..cfcc73142 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadOnlyStepTests.kt @@ -37,7 +37,7 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(setReadOnlyResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadOnlyStep.preExecute(logger, context).execute() @@ -51,7 +51,7 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadOnlyStep.preExecute(logger, context).execute() @@ -65,7 +65,7 @@ class SetReadOnlyStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadOnlyStep = SetReadOnlyStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadOnlyStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt index bcd2ca38e..c332a0c90 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt @@ -37,7 +37,7 @@ class SetReadWriteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(setReadWriteResponse, null))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadWriteStep = SetReadWriteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadWriteStep.preExecute(logger, context).execute() @@ -51,7 +51,7 @@ class SetReadWriteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadWriteStep = SetReadWriteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadWriteStep.preExecute(logger, context).execute() @@ -65,7 +65,7 @@ class SetReadWriteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null, null) val setReadWriteStep = SetReadWriteStep() val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings) setReadWriteStep.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRollupCompletionStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRollupCompletionStepTests.kt index 597d35fd7..83fd61145 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRollupCompletionStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRollupCompletionStepTests.kt @@ -30,7 +30,7 @@ class WaitForRollupCompletionStepTests : OpenSearchTestCase() { private val rollupId: String = "dummy-id" private val indexName: String = "test" private val metadata = ManagedIndexMetaData( - indexName, "indexUuid", "policy_id", null, null, null, null, null, null, + indexName, "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData (WaitForRollupCompletionStep.name, 1, 0, false, 0, null, ActionProperties(rollupId = rollupId)), null, null, null diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt index b1b8e0ef7..8af94805f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt @@ -46,7 +46,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { runBlocking { val emptyActionProperties = ActionProperties() val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, emptyActionProperties), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, emptyActionProperties), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -58,7 +58,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { runBlocking { val nullActionProperties = null val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, nullActionProperties), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, nullActionProperties), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -78,7 +78,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.INIT) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -90,7 +90,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.STARTED) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -102,7 +102,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.SUCCESS) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -114,7 +114,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.ABORTED) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -126,7 +126,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.FAILED) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -145,7 +145,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -160,7 +160,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getClusterAdminClient(null, exception))) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() @@ -175,7 +175,7 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getClusterAdminClient(null, exception))) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) - val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) + val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) val step = WaitForSnapshotStep(snapshotAction) val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings) step.preExecute(logger, context).execute() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt index 47f017288..03618e9f0 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt @@ -23,6 +23,38 @@ class ExplainResponseTests : OpenSearchTestCase() { policyPrimaryTerm = randomNonNegativeLong(), policyCompleted = null, rolledOver = null, + indexCreationDate = null, + transitionTo = randomAlphaOfLength(10), + stateMetaData = null, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = null, + info = null + ) + val indexMetadatas = listOf(metadata) + val res = ExplainResponse(indexNames, indexPolicyIDs, indexMetadatas) + + val out = BytesStreamOutput() + res.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRes = ExplainResponse(sin) + assertEquals(indexNames, newRes.indexNames) + assertEquals(indexPolicyIDs, newRes.indexPolicyIDs) + assertEquals(indexMetadatas, newRes.indexMetadatas) + } + + fun `test explain all response`() { + val indexNames = listOf("index1") + val indexPolicyIDs = listOf("policyID1") + val metadata = ManagedIndexMetaData( + index = "index1", + indexUuid = randomAlphaOfLength(10), + policyID = "policyID1", + policySeqNo = randomNonNegativeLong(), + policyPrimaryTerm = randomNonNegativeLong(), + policyCompleted = null, + rolledOver = null, + indexCreationDate = null, transitionTo = randomAlphaOfLength(10), stateMetaData = null, actionMetaData = null, @@ -33,12 +65,12 @@ class ExplainResponseTests : OpenSearchTestCase() { val indexMetadatas = listOf(metadata) val totalManagedIndices = 1 val enabledState = mapOf("index1" to true) - val res = ExplainResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState) + val res = ExplainAllResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState) val out = BytesStreamOutput() res.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) - val newRes = ExplainResponse(sin) + val newRes = ExplainAllResponse(sin) assertEquals(indexNames, newRes.indexNames) assertEquals(indexPolicyIDs, newRes.indexPolicyIDs) assertEquals(indexMetadatas, newRes.indexMetadatas) diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 5c93e39c5..efb14714b 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -662,6 +662,10 @@ "rolled_over": { "type": "boolean" }, + "index_creation_date": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, "transition_to": { "type": "text", "fields": { diff --git a/src/test/resources/mappings/cached-opendistro-ism-history.json b/src/test/resources/mappings/cached-opendistro-ism-history.json index 04d3c46d8..44c7ab896 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-history.json +++ b/src/test/resources/mappings/cached-opendistro-ism-history.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 3 + "schema_version": 4 }, "dynamic": "strict", "properties": { @@ -37,6 +37,10 @@ "rolled_over": { "type": "boolean" }, + "index_creation_date": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, "transition_to": { "type": "keyword" },