diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt index a388ac570..4928b97e1 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt @@ -38,7 +38,8 @@ data class ManagedIndexMetaData( val info: Map?, val id: String = NO_ID, val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, - val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + val rolledOverIndexName: String? = null, ) : Writeable, ToXContentFragment { @Suppress("ComplexMethod") @@ -51,6 +52,7 @@ data class ManagedIndexMetaData( if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString() if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString() if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString() + if (rolledOverIndexName != null) resultMap[ROLLED_OVER_INDEX_NAME] = rolledOverIndexName if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString() if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString() @@ -76,6 +78,7 @@ data class ManagedIndexMetaData( .field(POLICY_PRIMARY_TERM, policyPrimaryTerm) .field(POLICY_COMPLETED, policyCompleted) .field(ROLLED_OVER, rolledOver) + .field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName) .field(INDEX_CREATION_DATE, indexCreationDate) .field(TRANSITION_TO, transitionTo) .addObject(StateMetaData.STATE, stateMetaData, params, true) @@ -110,6 +113,7 @@ data class ManagedIndexMetaData( // Only show rolled_over if we have rolled over or we are in the rollover action if (rolledOver == true || (actionMetaData != null && actionMetaData.name == "rollover")) { builder.field(ROLLED_OVER, rolledOver) + .field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName) } if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate) @@ -142,6 +146,7 @@ data class ManagedIndexMetaData( streamOutput.writeOptionalLong(policyPrimaryTerm) streamOutput.writeOptionalBoolean(policyCompleted) streamOutput.writeOptionalBoolean(rolledOver) + streamOutput.writeOptionalString(rolledOverIndexName) streamOutput.writeOptionalLong(indexCreationDate) streamOutput.writeOptionalString(transitionTo) @@ -172,6 +177,7 @@ data class ManagedIndexMetaData( const val POLICY_PRIMARY_TERM = "policy_primary_term" const val POLICY_COMPLETED = "policy_completed" const val ROLLED_OVER = "rolled_over" + const val ROLLED_OVER_INDEX_NAME = "rolled_over_index_name" const val INDEX_CREATION_DATE = "index_creation_date" const val TRANSITION_TO = "transition_to" const val INFO = "info" @@ -185,6 +191,7 @@ data class ManagedIndexMetaData( val policyPrimaryTerm: Long? = si.readOptionalLong() val policyCompleted: Boolean? = si.readOptionalBoolean() val rolledOver: Boolean? = si.readOptionalBoolean() + val rolledOverIndexName: String? = si.readOptionalString() val indexCreationDate: Long? = si.readOptionalLong() val transitionTo: String? = si.readOptionalString() @@ -207,6 +214,7 @@ data class ManagedIndexMetaData( policyPrimaryTerm = policyPrimaryTerm, policyCompleted = policyCompleted, rolledOver = rolledOver, + rolledOverIndexName = rolledOverIndexName, indexCreationDate = indexCreationDate, transitionTo = transitionTo, stateMetaData = state, @@ -234,6 +242,7 @@ data class ManagedIndexMetaData( var policyPrimaryTerm: Long? = null var policyCompleted: Boolean? = null var rolledOver: Boolean? = null + var rolledOverIndexName: String? = null var indexCreationDate: Long? = null var transitionTo: String? = null @@ -256,6 +265,7 @@ data class ManagedIndexMetaData( POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue() ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue() + ROLLED_OVER_INDEX_NAME -> rolledOverIndexName = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text() INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text() StateMetaData.STATE -> { @@ -277,23 +287,24 @@ data class ManagedIndexMetaData( } return ManagedIndexMetaData( - requireNotNull(index) { "$INDEX is null" }, - requireNotNull(indexUuid) { "$INDEX_UUID is null" }, - requireNotNull(policyID) { "$POLICY_ID is null" }, - policySeqNo, - policyPrimaryTerm, - policyCompleted, - rolledOver, - indexCreationDate, - transitionTo, - state, - action, - step, - retryInfo, - info, - id, - seqNo, - primaryTerm + index = requireNotNull(index) { "$INDEX is null" }, + indexUuid = requireNotNull(indexUuid) { "$INDEX_UUID is null" }, + policyID = requireNotNull(policyID) { "$POLICY_ID is null" }, + policySeqNo = policySeqNo, + policyPrimaryTerm = policyPrimaryTerm, + policyCompleted = policyCompleted, + rolledOver = rolledOver, + rolledOverIndexName = rolledOverIndexName, + indexCreationDate = indexCreationDate, + transitionTo = transitionTo, + stateMetaData = state, + actionMetaData = action, + stepMetaData = step, + policyRetryInfo = retryInfo, + info = info, + id = id, + seqNo = seqNo, + primaryTerm = primaryTerm, ) } @@ -323,6 +334,7 @@ data class ManagedIndexMetaData( policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(), policyCompleted = map[POLICY_COMPLETED]?.toBoolean(), rolledOver = map[ROLLED_OVER]?.toBoolean(), + rolledOverIndexName = map[ROLLED_OVER_INDEX_NAME], indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(), transitionTo = map[TRANSITION_TO], stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map), diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt index 48f892032..259afae4d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt @@ -20,6 +20,7 @@ class RolloverAction( val minDocs: Long?, val minAge: TimeValue?, val minPrimaryShardSize: ByteSizeValue?, + val copyAlias: Boolean = false, index: Int ) : Action(name, index) { @@ -47,6 +48,7 @@ class RolloverAction( if (minDocs != null) builder.field(MIN_DOC_COUNT_FIELD, minDocs) if (minAge != null) builder.field(MIN_INDEX_AGE_FIELD, minAge.stringRep) if (minPrimaryShardSize != null) builder.field(MIN_PRIMARY_SHARD_SIZE_FIELD, minPrimaryShardSize.stringRep) + builder.field(COPY_ALIAS_FIELD, copyAlias) builder.endObject() } @@ -55,6 +57,7 @@ class RolloverAction( out.writeOptionalLong(minDocs) out.writeOptionalTimeValue(minAge) out.writeOptionalWriteable(minPrimaryShardSize) + out.writeBoolean(copyAlias) out.writeInt(actionIndex) } @@ -64,5 +67,6 @@ class RolloverAction( const val MIN_DOC_COUNT_FIELD = "min_doc_count" const val MIN_INDEX_AGE_FIELD = "min_index_age" const val MIN_PRIMARY_SHARD_SIZE_FIELD = "min_primary_shard_size" + const val COPY_ALIAS_FIELD = "copy_alias" } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt index 7671754ad..5f9749f2a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt @@ -19,9 +19,10 @@ class RolloverActionParser : ActionParser() { val minDocs = sin.readOptionalLong() val minAge = sin.readOptionalTimeValue() val minPrimaryShardSize = sin.readOptionalWriteable(::ByteSizeValue) + val copyAlias = sin.readBoolean() val index = sin.readInt() - return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index) + return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index) } override fun fromXContent(xcp: XContentParser, index: Int): Action { @@ -29,6 +30,7 @@ class RolloverActionParser : ActionParser() { var minDocs: Long? = null var minAge: TimeValue? = null var minPrimaryShardSize: ByteSizeValue? = null + var copyAlias = false ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -44,11 +46,12 @@ class RolloverActionParser : ActionParser() { RolloverAction .MIN_PRIMARY_SHARD_SIZE_FIELD ) + RolloverAction.COPY_ALIAS_FIELD -> copyAlias = xcp.booleanValue() else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RolloverAction.") } } - return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index) + return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index) } override fun getActionType(): String { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index 6657e672a..009935128 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -7,11 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollover import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper +import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest +import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.common.unit.ByteSizeValue +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.unit.TimeValue import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias @@ -33,6 +38,7 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { private val logger = LogManager.getLogger(javaClass) private var stepStatus = StepStatus.STARTING private var info: Map? = null + private var newIndex: String? = null @Suppress("ComplexMethod", "LongMethod") override suspend fun execute(): Step { @@ -53,6 +59,9 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { if (clusterService.state().metadata.index(indexName).rolloverInfos.containsKey(rolloverTarget)) { stepStatus = StepStatus.COMPLETED info = mapOf("message" to getAlreadyRolledOverMessage(indexName, rolloverTarget)) + + // If already rolled over, alias may not get copied over yet + copyAlias(clusterService, indexName, context.client, rolloverTarget, context.metadata) return this } @@ -114,6 +123,7 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { " numDocs=$numDocs, indexSize=${indexSize.bytes}, primaryShardSize=${largestPrimaryShardSize.bytes}]" ) executeRollover(context, rolloverTarget, isDataStream, conditions) + copyAlias(clusterService, indexName, context.client, rolloverTarget, context.metadata) } else { stepStatus = StepStatus.CONDITION_NOT_MET info = mapOf("message" to getPendingMessage(indexName), "conditions" to conditions) @@ -158,14 +168,14 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { val indexName = context.metadata.index val metadata = context.clusterService.state().metadata val indexAlias = metadata.index(indexName)?.aliases?.get(alias) - logger.debug("Index $indexName has aliases $indexAlias") + logger.debug("Index {} has aliases {}", indexName, indexAlias) if (indexAlias == null) { return false } val isWriteIndex = indexAlias.writeIndex() // this could be null if (isWriteIndex != true) { val aliasIndices = metadata.indicesLookup[alias]?.indices?.map { it.index } - logger.debug("Alias $alias contains indices $aliasIndices") + logger.debug("Alias {} contains indices {}", alias, aliasIndices) if (aliasIndices != null && aliasIndices.size > 1) { return false } @@ -225,6 +235,9 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { else -> getSuccessMessage(indexName) } + // Save newIndex later to metadata to be reused in case of failures + newIndex = response.newIndex + stepStatus = StepStatus.COMPLETED info = listOfNotNull( "message" to message, @@ -251,21 +264,87 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { } } + /** + * This method should be called ASAP after rollover succeed + * + * Rollover currently only copy the alias being rolled over on to new index + * This method copy any remaining aliases to new index + * + * TODO This method can be deprecated once this issue finished + * https://github.com/opensearch-project/index-management/issues/849 finished + */ + private suspend fun copyAlias( + clusterService: ClusterService, + indexName: String, + client: Client, + rolloverTarget: String, + metadata: ManagedIndexMetaData + ) { + if (!action.copyAlias) return + val rolledOverIndexName = newIndex ?: metadata.rolledOverIndexName + if (rolledOverIndexName == null) { + logger.warn("Cannot find rolled over index to copy aliases to.") + return + } + + val aliasActions = mutableListOf() + val aliases = clusterService.state().metadata().index(indexName).aliases + for (alias in aliases) { + val aliasName = alias.key + // Skip the alias that has been rolled over on, since it's already copied + if (aliasName == rolloverTarget) continue + + val aliasMetadata = alias.value + val aliasAction = AliasActions(AliasActions.Type.ADD).index(rolledOverIndexName) + .alias(aliasMetadata.alias) + .filter(aliasMetadata.filter?.toString()) + .searchRouting(aliasMetadata.searchRouting) + .indexRouting(aliasMetadata.indexRouting) + .isHidden(aliasMetadata.isHidden) + aliasActions.add(aliasAction) + } + val aliasReq = IndicesAliasesRequest() + aliasActions.forEach { aliasReq.addAliasAction(it) } + + // Preserve the rolled over conditions + val conditions = info?.get("conditions") ?: context?.metadata?.info?.get("conditions") + try { + val aliasRes: AcknowledgedResponse = client.admin().indices().suspendUntil { aliases(aliasReq, it) } + if (aliasRes.isAcknowledged) { + stepStatus = StepStatus.COMPLETED + info = listOfNotNull( + "message" to getSuccessCopyAliasMessage(indexName, rolledOverIndexName), + if (conditions != null) "conditions" to conditions else null + ).toMap() + } else { + stepStatus = StepStatus.FAILED + info = listOfNotNull( + "message" to getFailedCopyAliasMessage(indexName, rolledOverIndexName), + if (conditions != null) "conditions" to conditions else null + ).toMap() + } + } catch (e: Exception) { + handleException(indexName, e, getFailedCopyAliasMessage(indexName, rolledOverIndexName), conditions) + } + } + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { return currentMetadata.copy( stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), rolledOver = if (currentMetadata.rolledOver == true) true else stepStatus == StepStatus.COMPLETED, + rolledOverIndexName = if (currentMetadata.rolledOverIndexName != null) currentMetadata.rolledOverIndexName else newIndex, transitionTo = null, info = info ) } - private fun handleException(indexName: String, e: Exception, message: String = getFailedMessage(indexName)) { + private fun handleException(indexName: String, e: Exception, message: String = getFailedMessage(indexName), conditions: Any? = null) { logger.error(message, e) stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to message) + val mutableInfo: MutableMap = mutableMapOf("message" to message) val errorMessage = e.message if (errorMessage != null) mutableInfo["cause"] = errorMessage + if (conditions != null) mutableInfo["conditions"] = conditions info = mutableInfo.toMap() } @@ -288,5 +367,9 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { fun getSkipRolloverMessage(index: String) = "Skipped rollover action for [index=$index]" fun getAlreadyRolledOverMessage(index: String, alias: String) = "This index has already been rolled over using this alias, treating as a success [index=$index, alias=$alias]" + fun getSuccessCopyAliasMessage(index: String, newIndex: String) = + "Successfully rolled over and copied alias from [index=$index] to [index=$newIndex]" + fun getFailedCopyAliasMessage(index: String, newIndex: String) = + "Successfully rolled over but failed to copied alias from [index=$index] to [index=$newIndex]" } } diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index e82a1937a..588e886b4 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 18 + "schema_version": 19 }, "dynamic": "strict", "properties": { @@ -229,6 +229,9 @@ }, "min_primary_shard_size": { "type": "keyword" + }, + "copy_alias": { + "type": "boolean" } } }, @@ -708,6 +711,9 @@ "rolled_over": { "type": "boolean" }, + "rolled_over_index_name": { + "type": "keyword" + }, "index_creation_date": { "type": "date", "format": "strict_date_time||epoch_millis" diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 9d312a193..20c27fb8a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -37,7 +37,7 @@ import kotlin.collections.HashSet abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 18 + val configSchemaVersion = 19 val historySchemaVersion = 5 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index ff6283f2a..505315f61 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -24,9 +24,9 @@ import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.core.xcontent.DeprecationHandler import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser.Token -import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent.jsonXContent import org.opensearch.index.seqno.SequenceNumbers @@ -67,6 +67,7 @@ import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestRequest import org.opensearch.rest.RestStatus +import org.opensearch.indexmanagement.rollup.randomTermQuery import org.opensearch.test.OpenSearchTestCase import java.io.IOException import java.time.Duration @@ -238,16 +239,28 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() index: String, alias: String, action: String = "remove", - isWriteIndex: Boolean = false + isWriteIndex: Boolean = false, + routing: Int? = null, + searchRouting: Int = randomInt(), + indexRouting: Int = randomInt(), + filter: String = randomTermQuery().toString(), + isHidden: Boolean = randomBoolean() ) { val isWriteIndexField = if (isWriteIndex) "\",\"is_write_index\": \"$isWriteIndex" else "" + val params = if (action == "add" && routing != null) """ + ,"routing": $routing, + "search_routing": $searchRouting, + "index_routing": $indexRouting, + "filter": $filter, + "is_hidden": $isHidden + """.trimIndent() else "" val body = """ { "actions": [ { "$action": { "index": "$index", - "alias": "$alias$isWriteIndexField" + "alias": "$alias$isWriteIndexField"$params } } ] diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt index b0eb1f86d..54ac68f5a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt @@ -36,13 +36,12 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - @Suppress("UNCHECKED_CAST") fun `test rollover no condition`() { val aliasName = "${testIndexName}_alias" val indexNameBase = "${testIndexName}_index" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = RolloverAction(null, null, null, null, 0) + val actionConfig = RolloverAction(null, null, null, null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -96,7 +95,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { ) val policyID = "${testIndexName}_bwc" - val actionConfig = RolloverAction(null, null, null, null, 0) + val actionConfig = RolloverAction(null, null, null, null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -133,7 +132,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val indexNameBase = "${testIndexName}_index_byte" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_byte_1" - val actionConfig = RolloverAction(ByteSizeValue(10, ByteSizeUnit.BYTES), 1000000, null, null, 0) + val actionConfig = RolloverAction(ByteSizeValue(10, ByteSizeUnit.BYTES), 1000000, null, null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -205,7 +204,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val indexNameBase = "${testIndexName}_index_primary_shard" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_primary_shard_1" - val actionConfig = RolloverAction(null, null, null, ByteSizeValue(100, ByteSizeUnit.KB), 0) + val actionConfig = RolloverAction(null, null, null, ByteSizeValue(100, ByteSizeUnit.KB), false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -315,7 +314,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val indexNameBase = "${testIndexName}_index_doc" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_doc_1" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -389,7 +388,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val index2 = "index-2" val alias1 = "x" val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) actionConfig.configRetry = ActionRetry(0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( @@ -448,7 +447,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val policyID = "${testIndexName}_rollover_policy" // Create the rollover policy - val rolloverAction = RolloverAction(null, null, null, null, 0) + val rolloverAction = RolloverAction(null, null, null, null, false, 0) val states = listOf(State(name = "default", actions = listOf(rolloverAction), transitions = listOf())) val policy = Policy( id = policyID, @@ -504,7 +503,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val policyID = "${testIndexName}_rollover_policy_multi" // Create the rollover policy - val rolloverAction = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val rolloverAction = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) val states = listOf(State(name = "default", actions = listOf(rolloverAction), transitions = listOf())) val policy = Policy( id = policyID, @@ -591,13 +590,12 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { Assert.assertTrue("New rollover index does not exist.", indexExists(secondIndexName)) } - @Suppress("UNCHECKED_CAST") fun `test rollover from outside ISM doesn't fail ISM job`() { val aliasName = "${testIndexName}_alias" val indexNameBase = "${testIndexName}_index" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = RolloverAction(null, null, null, null, 0) + val actionConfig = RolloverAction(null, null, null, null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -632,4 +630,100 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { } assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) } + + @Suppress("UNCHECKED_CAST") + fun `test rollover with copy alias`() { + val aliasName = "${testIndexName}_doc_alias" + val indexNameBase = "${testIndexName}_index_doc" + val firstIndex = "$indexNameBase-1" + val policyID = "${testIndexName}_testPolicyName_doc_1" + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, true, 0) + val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + // create index defaults + createIndex(firstIndex, policyID, aliasName) + + // Add a bunch of aliases + changeAlias( + firstIndex, "test_alias1", "add", routing = 0, searchRouting = 1, indexRouting = 2, + filter = """ + { "term": { "user.id": "kimchy" } } + """.trimIndent(), + isHidden = false + ) + changeAlias(firstIndex, "test_alias2", "add") + changeAlias(firstIndex, "test_alias3", "add") + + val managedIndexConfig = getExistingManagedIndexConfig(firstIndex) + + // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndex).policyID) } + + // Need to speed up to second execution where it will trigger the first execution of the action + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals( + "Index rollover before it met the condition.", + AttemptRolloverStep.getPendingMessage(firstIndex), info["message"] + ) + val conditions = info["conditions"] as Map + assertEquals( + "Did not have exclusively min age and min doc count conditions", + setOf(RolloverAction.MIN_INDEX_AGE_FIELD, RolloverAction.MIN_DOC_COUNT_FIELD), conditions.keys + ) + val minAge = conditions[RolloverAction.MIN_INDEX_AGE_FIELD] as Map + val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min age condition", "2d", minAge["condition"]) + assertTrue("Did not have min age current", minAge["current"] is String) + assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 0, minDocCount["current"]) + } + + insertSampleData(index = firstIndex, docCount = 5, delay = 0) + + // Need to speed up to second execution where it will trigger the first execution of the action + updateManagedIndexConfigStartTime(managedIndexConfig) + val newIndex = "$indexNameBase-000002" + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index did not rollover", AttemptRolloverStep.getSuccessCopyAliasMessage(firstIndex, newIndex), info["message"]) + val conditions = info["conditions"] as Map + assertEquals( + "Did not have exclusively min age and min doc count conditions", + setOf(RolloverAction.MIN_INDEX_AGE_FIELD, RolloverAction.MIN_DOC_COUNT_FIELD), conditions.keys + ) + val minAge = conditions[RolloverAction.MIN_INDEX_AGE_FIELD] as Map + val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min age condition", "2d", minAge["condition"]) + assertTrue("Did not have min age current", minAge["current"] is String) + assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 5, minDocCount["current"]) + } + Assert.assertTrue("New rollover index does not exist.", indexExists(newIndex)) + + // Check if new index has the aliases + val alias = getAlias(newIndex, "") + Assert.assertEquals(alias.containsKey("test_alias1"), true) + alias["test_alias1"]!!.let { + it as Map + assertEquals(it["is_hidden"], false) + assertEquals(it["search_routing"], "1") + assertEquals(it["index_routing"], "2") + assertEquals(it["filter"]!!.toString(), "{term={user.id=kimchy}}") + } + Assert.assertEquals(alias.containsKey("test_alias2"), true) + Assert.assertEquals(alias.containsKey("test_alias3"), true) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt index f1b20c14f..4127de728 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt @@ -32,7 +32,7 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val index1 = "index-1" val alias1 = "x" val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) actionConfig.configRetry = ActionRetry(0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( @@ -84,7 +84,7 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val indexNameBase = "${testIndexName}_index" val index1 = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = RolloverAction(null, null, null, null, 0) + val actionConfig = RolloverAction(null, null, null, null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -131,7 +131,7 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val index1 = "index-1" val index2 = "index-2" val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) actionConfig.configRetry = ActionRetry(0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( @@ -175,7 +175,7 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val index2 = "index-2" val alias1 = "x" val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) actionConfig.configRetry = ActionRetry(0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt index 805d3b045..367a38e05 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt @@ -40,7 +40,7 @@ class ValidateRolloverTests : OpenSearchTestCase() { ("rollover", 1, 0, false, 0, null, null), null, null, null ) - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) private val client: Client = mock() private val lockService: LockService = LockService(mock(), clusterService) private val validate = ValidateRollover(settings, clusterService, jvmService) diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index e82a1937a..588e886b4 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 18 + "schema_version": 19 }, "dynamic": "strict", "properties": { @@ -229,6 +229,9 @@ }, "min_primary_shard_size": { "type": "keyword" + }, + "copy_alias": { + "type": "boolean" } } }, @@ -708,6 +711,9 @@ "rolled_over": { "type": "boolean" }, + "rolled_over_index_name": { + "type": "keyword" + }, "index_creation_date": { "type": "date", "format": "strict_date_time||epoch_millis"