diff --git a/build.gradle b/build.gradle index 698d2be56..839ff8af4 100644 --- a/build.gradle +++ b/build.gradle @@ -308,7 +308,6 @@ integTest { exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionIT.class' exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/OpenActionIT.class' exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/ReplicaCountActionIT.class' - exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.class' exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.class' exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.class' exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/TransitionActionIT.class' diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt index 6a95b1534..d3d68d58d 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt @@ -20,7 +20,7 @@ abstract class Action( ) : ToXContentObject, Writeable { var configTimeout: ActionTimeout? = null - var configRetry: ActionRetry? = null + var configRetry: ActionRetry? = ActionRetry(DEFAULT_RETRIES) final override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder.startObject() @@ -67,4 +67,8 @@ abstract class Action( final fun isLastStep(stepName: String): Boolean = getSteps().last().name == stepName final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName + + companion object { + const val DEFAULT_RETRIES = 3L + } } diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt index 5cbaf75ec..c60b88b99 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt @@ -26,7 +26,7 @@ data class ActionRetry( val delay: TimeValue = TimeValue.timeValueMinutes(1) ) : ToXContentFragment, Writeable { - init { require(count > 0) { "Count for ActionRetry must be greater than 0" } } + init { require(count >= 0) { "Count for ActionRetry must be a non-negative number" } } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt index 8d5be5f7a..204374968 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt @@ -12,6 +12,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionPar import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.ReadOnlyActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.ReadWriteActionParser +import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry @@ -28,7 +29,8 @@ class ISMActionsParser private constructor() { CloseActionParser(), DeleteActionParser(), ReadOnlyActionParser(), - ReadWriteActionParser() + ReadWriteActionParser(), + RolloverActionParser() ) fun addParser(parser: ActionParser) { @@ -51,7 +53,7 @@ class ISMActionsParser private constructor() { fun parse(xcp: XContentParser, totalActions: Int): Action { var action: Action? = null var timeout: ActionTimeout? = null - var retry: ActionRetry? = null + var retry: ActionRetry? = ActionRetry(Action.DEFAULT_RETRIES) XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { val type = xcp.currentName() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt index 671046aa7..fbfd61228 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt @@ -5,8 +5,12 @@ package org.opensearch.indexmanagement.indexstatemanagement.action +import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.unit.ByteSizeValue import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext @@ -18,12 +22,34 @@ class RolloverAction( index: Int ) : Action(name, index) { + init { + if (minSize != null) require(minSize.bytes > 0) { "RolloverAction minSize value must be greater than 0" } + + if (minDocs != null) require(minDocs > 0) { "RolloverAction minDocs value must be greater than 0" } + } + + private val attemptRolloverStep = AttemptRolloverStep(this) + private val steps = listOf(attemptRolloverStep) + override fun getStepToExecute(context: StepContext): Step { - TODO("Not yet implemented") + return attemptRolloverStep + } + + override fun getSteps(): List = steps + + override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) { + builder.startObject(type) + if (minSize != null) builder.field(MIN_SIZE_FIELD, minSize.stringRep) + if (minDocs != null) builder.field(MIN_DOC_COUNT_FIELD, minDocs) + if (minAge != null) builder.field(MIN_INDEX_AGE_FIELD, minAge.stringRep) + builder.endObject() } - override fun getSteps(): List { - TODO("Not yet implemented") + override fun populateAction(out: StreamOutput) { + out.writeOptionalWriteable(minSize) + out.writeOptionalLong(minDocs) + out.writeOptionalTimeValue(minAge) + out.writeInt(actionIndex) } companion object { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt index be2dd20f7..6a9618a08 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt @@ -6,20 +6,45 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.unit.ByteSizeValue +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser class RolloverActionParser : ActionParser() { override fun fromStreamInput(sin: StreamInput): Action { - TODO("Not yet implemented") + val minSize = sin.readOptionalWriteable(::ByteSizeValue) + val minDocs = sin.readOptionalLong() + val minAge = sin.readOptionalTimeValue() + val index = sin.readInt() + + return RolloverAction(minSize, minDocs, minAge, index) } override fun fromXContent(xcp: XContentParser, index: Int): Action { - TODO("Not yet implemented") + var minSize: ByteSizeValue? = null + var minDocs: Long? = null + var minAge: TimeValue? = null + + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + RolloverAction.MIN_SIZE_FIELD -> minSize = ByteSizeValue.parseBytesSizeValue(xcp.text(), RolloverAction.MIN_SIZE_FIELD) + RolloverAction.MIN_DOC_COUNT_FIELD -> minDocs = xcp.longValue() + RolloverAction.MIN_INDEX_AGE_FIELD -> minAge = TimeValue.parseTimeValue(xcp.text(), RolloverAction.MIN_INDEX_AGE_FIELD) + else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RolloverAction.") + } + } + + return RolloverAction(minSize, minDocs, minAge, index) } override fun getActionType(): String { - TODO("Not yet implemented") + return RolloverAction.name } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index 6ff2d0797..f9637fd21 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -5,32 +5,279 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollover +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.action.admin.indices.rollover.RolloverRequest +import org.opensearch.action.admin.indices.rollover.RolloverResponse +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse +import org.opensearch.common.unit.ByteSizeValue +import org.opensearch.common.unit.TimeValue +import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction +import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias +import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverSkip +import org.opensearch.indexmanagement.indexstatemanagement.util.evaluateConditions +import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString +import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.rest.RestStatus +import org.opensearch.transport.RemoteTransportException +import java.time.Instant -class AttemptRolloverStep : Step(name) { +@Suppress("ReturnCount") +class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + + @Suppress("ComplexMethod", "LongMethod") override suspend fun execute(): Step { - TODO("Not yet implemented") + val context = this.context ?: return this + val indexName = context.metadata.index + val clusterService = context.clusterService + val skipRollover = clusterService.state().metadata.index(indexName).getRolloverSkip() + if (skipRollover) { + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getSkipRolloverMessage(indexName)) + return this + } + + val (rolloverTarget, isDataStream) = getRolloverTargetOrUpdateInfo(context) + // If the rolloverTarget is null, we would've already updated the failed info from getRolloverTargetOrUpdateInfo and can return early + rolloverTarget ?: return this + + if (clusterService.state().metadata.index(indexName).rolloverInfos.containsKey(rolloverTarget)) { + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getAlreadyRolledOverMessage(indexName, rolloverTarget)) + return this + } + + if (!isDataStream && !preCheckIndexAlias(context, rolloverTarget)) { + stepStatus = StepStatus.FAILED + info = mapOf("message" to getFailedPreCheckMessage(indexName)) + return this + } + + val statsResponse = getIndexStatsOrUpdateInfo(context) + // If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early + statsResponse ?: return this + + val indexCreationDate = clusterService.state().metadata().index(indexName).creationDate + val indexAgeTimeValue = if (indexCreationDate == -1L) { + logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") + // since we cannot use for comparison, we can set it to 0 as minAge will never be <= 0 + TimeValue.timeValueMillis(0) + } else { + TimeValue.timeValueMillis(Instant.now().toEpochMilli() - indexCreationDate) + } + val numDocs = statsResponse.primaries.docs?.count ?: 0 + val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0) + + val conditions = listOfNotNull( + action.minAge?.let { + RolloverAction.MIN_INDEX_AGE_FIELD to mapOf( + "condition" to it.toString(), + "current" to indexAgeTimeValue.toString(), + "creationDate" to indexCreationDate + ) + }, + action.minDocs?.let { + RolloverAction.MIN_DOC_COUNT_FIELD to mapOf( + "condition" to it, + "current" to numDocs + ) + }, + action.minSize?.let { + RolloverAction.MIN_SIZE_FIELD to mapOf( + "condition" to it.toString(), + "current" to indexSize.toString() + ) + } + ).toMap() + + if (action.evaluateConditions(indexAgeTimeValue, numDocs, indexSize)) { + logger.info( + "$indexName rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," + + " numDocs=$numDocs, indexSize=${indexSize.bytes}]" + ) + executeRollover(context, rolloverTarget, isDataStream, conditions) + } else { + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to getPendingMessage(indexName), "conditions" to conditions) + } + + return this + } + + private fun getRolloverTargetOrUpdateInfo(context: StepContext): Pair { + val indexName = context.metadata.index + val metadata = context.clusterService.state().metadata() + val indexAbstraction = metadata.indicesLookup[indexName] + val isDataStreamIndex = indexAbstraction?.parentDataStream != null + + val rolloverTarget = when { + isDataStreamIndex -> indexAbstraction?.parentDataStream?.name + else -> metadata.index(indexName).getRolloverAlias() + } + + if (rolloverTarget == null) { + val message = getFailedNoValidAliasMessage(indexName) + logger.warn(message) + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + } + + return rolloverTarget to isDataStreamIndex + } + + /** + * pre-condition check on managed-index's alias before rollover + * + * This will block + * when managed index doesn't have alias + * when managed index has alias but not the write index, + * and this alias contains more than one index + * User can use skip rollover setting to bypass this + * + * @param alias user defined ISM rollover alias + */ + private fun preCheckIndexAlias(context: StepContext, alias: String): Boolean { + val indexName = context.metadata.index + val metadata = context.clusterService.state().metadata + val indexAlias = metadata.index(indexName)?.aliases?.get(alias) + logger.debug("Index $indexName has aliases $indexAlias") + if (indexAlias == null) { + return false + } + val isWriteIndex = indexAlias.writeIndex() // this could be null + if (isWriteIndex != true) { + val aliasIndices = metadata.indicesLookup[alias]?.indices?.map { it.index } + logger.debug("Alias $alias contains indices $aliasIndices") + if (aliasIndices != null && aliasIndices.size > 1) { + return false + } + } + + return true + } + + private suspend fun getIndexStatsOrUpdateInfo(context: StepContext): IndicesStatsResponse? { + val indexName = context.metadata.index + try { + val statsRequest = IndicesStatsRequest() + .indices(indexName).clear().docs(true) + val statsResponse: IndicesStatsResponse = context.client.admin().indices().suspendUntil { stats(statsRequest, it) } + + if (statsResponse.status == RestStatus.OK) { + return statsResponse + } + + val message = getFailedEvaluateMessage(indexName) + logger.warn("$message - ${statsResponse.status}") + stepStatus = StepStatus.FAILED + info = mapOf( + "message" to message, + "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } + ) + } catch (e: RemoteTransportException) { + handleException(indexName, ExceptionsHelper.unwrapCause(e) as Exception) + } catch (e: Exception) { + handleException(indexName, e, getFailedEvaluateMessage(indexName)) + } + + return null + } + + @Suppress("ComplexMethod") + private suspend fun executeRollover( + context: StepContext, + rolloverTarget: String, + isDataStream: Boolean, + conditions: Map> + ) { + val indexName = context.metadata.index + try { + val request = RolloverRequest(rolloverTarget, null) + val response: RolloverResponse = context.client.admin().indices().suspendUntil { rolloverIndex(request, it) } + + // Do not need to check for isRolledOver as we are not passing any conditions or dryrun=true + // which are the only two ways it comes back as false + + // If the response is acknowledged, then the new index is created and added to one of the following index abstractions: + // 1. IndexAbstraction.Type.DATA_STREAM - the new index is added to the data stream indicated by the 'rolloverTarget' + // 2. IndexAbstraction.Type.ALIAS - the new index is added to the alias indicated by the 'rolloverTarget' + if (response.isAcknowledged) { + val message = when { + isDataStream -> getSuccessDataStreamRolloverMessage(rolloverTarget, indexName) + else -> getSuccessMessage(indexName) + } + + stepStatus = StepStatus.COMPLETED + info = listOfNotNull( + "message" to message, + if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified + ).toMap() + } else { + val message = when { + isDataStream -> getFailedDataStreamRolloverMessage(rolloverTarget) + + // If the alias update response was NOT acknowledged, then the new index was created but we failed to swap the alias + else -> getFailedAliasUpdateMessage(indexName, response.newIndex) + } + logger.warn(message) + stepStatus = StepStatus.FAILED + info = listOfNotNull( + "message" to message, + if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified + ).toMap() + } + } catch (e: RemoteTransportException) { + handleException(indexName, ExceptionsHelper.unwrapCause(e) as Exception) + } catch (e: Exception) { + handleException(indexName, e) + } } override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { - TODO("Not yet implemented") + return currentMetadata.copy( + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + rolledOver = if (currentMetadata.rolledOver == true) true else stepStatus == StepStatus.COMPLETED, + transitionTo = null, + info = info + ) } - override fun isIdempotent(): Boolean { - TODO("Not yet implemented") + private fun handleException(indexName: String, e: Exception, message: String = getFailedMessage(indexName)) { + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } + override fun isIdempotent(): Boolean = false + + @Suppress("TooManyFunctions") companion object { const val name = "attempt_rollover" - // TODO: fixme - fun getFailedNoValidAliasMessage(indexName: String) = "" - fun getPendingMessage(indexName: String) = "" - fun getAlreadyRolledOverMessage(indexName: String, alias: String) = "" - fun getSuccessDataStreamRolloverMessage(dataStreamName: String, indexName: String) = "" - fun getSuccessMessage(indexName: String) = "" - fun getFailedPreCheckMessage(indexName: String) = "" - fun getSkipRolloverMessage(indexName: String) = "" + fun getFailedMessage(index: String) = "Failed to rollover index [index=$index]" + fun getFailedAliasUpdateMessage(index: String, newIndex: String) = + "New index created, but failed to update alias [index=$index, newIndex=$newIndex]" + fun getFailedDataStreamRolloverMessage(dataStream: String) = "Failed to rollover data stream [data_stream=$dataStream]" + fun getFailedNoValidAliasMessage(index: String) = "Missing rollover_alias index setting [index=$index]" + fun getFailedEvaluateMessage(index: String) = "Failed to evaluate conditions for rollover [index=$index]" + fun getPendingMessage(index: String) = "Pending rollover of index [index=$index]" + fun getSuccessMessage(index: String) = "Successfully rolled over index [index=$index]" + fun getSuccessDataStreamRolloverMessage(dataStream: String, index: String) = + "Successfully rolled over data stream [data_stream=$dataStream index=$index]" + fun getFailedPreCheckMessage(index: String) = "Missing alias or not the write index when rollover [index=$index]" + fun getSkipRolloverMessage(index: String) = "Skipped rollover action for [index=$index]" + fun getAlreadyRolledOverMessage(index: String, alias: String) = + "This index has already been rolled over using this alias, treating as a success [index=$index, alias=$alias]" } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index a46b8ab8c..e19769ce1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -19,6 +19,7 @@ import org.opensearch.action.update.UpdateRequest import org.opensearch.alerting.destination.message.BaseMessage import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.unit.ByteSizeValue +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentFactory import org.opensearch.index.Index @@ -26,6 +27,7 @@ import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.QueryBuilders import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator +import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.model.ChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.Policy @@ -204,7 +206,8 @@ fun Transition.evaluateConditions( indexCreationDate: Instant, numDocs: Long?, indexSize: ByteSizeValue?, - transitionStartTime: Instant + transitionStartTime: Instant, + rolloverDate: Instant?, ): Boolean { // If there are no conditions, treat as always true if (this.conditions == null) return true @@ -229,15 +232,20 @@ fun Transition.evaluateConditions( return this.conditions.cron.getNextExecutionTime(transitionStartTime) <= Instant.now() } + if (this.conditions.rolloverAge != null) { + val rolloverDateMilli = rolloverDate?.toEpochMilli() ?: return false + val elapsedTime = Instant.now().toEpochMilli() - rolloverDateMilli + return this.conditions.rolloverAge.millis <= elapsedTime + } + // We should never reach this return false } fun Transition.hasStatsConditions(): Boolean = this.conditions?.docCount != null || this.conditions?.size != null -// TODO: Uncomment after the rollover action config -// @Suppress("ReturnCount") -/*fun RolloverActionConfig.evaluateConditions( +@Suppress("ReturnCount") +fun RolloverAction.evaluateConditions( indexAgeTimeValue: TimeValue, numDocs: Long, indexSize: ByteSizeValue @@ -264,7 +272,7 @@ fun Transition.hasStatsConditions(): Boolean = this.conditions?.docCount != null // return false if non of the conditions were true. return false -}*/ +} fun State.getUpdatedStateMetaData(managedIndexMetaData: ManagedIndexMetaData): StateMetaData { // If the current ManagedIndexMetaData state does not match this state, it means we transitioned and need to update the startStartTime diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt index fa13fe3b4..2ecfcb3a8 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt @@ -48,15 +48,13 @@ class ActionTests : OpenSearchTestCase() { } } - // TODO: fixme - enable the test - private fun `test rollover action minimum size of zero fails`() { + fun `test rollover action minimum size of zero fails`() { assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException for minSize less than 1") { randomRolloverActionConfig(minSize = ByteSizeValue.parseBytesSizeValue("0", "min_size_test")) } } - // TODO: fixme - enable the test - private fun `test rollover action minimum doc count of zero fails`() { + fun `test rollover action minimum doc count of zero fails`() { assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException for minDoc less than 1") { randomRolloverActionConfig(minDocs = 0) } @@ -84,8 +82,7 @@ class ActionTests : OpenSearchTestCase() { roundTripAction(randomReadOnlyActionConfig()) } - // TODO: fixme - enable the test - private fun `test rollover action round trip`() { + fun `test rollover action round trip`() { roundTripAction(randomRolloverActionConfig()) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt index 0495ae27a..69b87e951 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt @@ -87,15 +87,15 @@ class XContentTests : OpenSearchTestCase() { assertEquals("Round tripping DeleteAction doesn't work", deleteAction.convertToMap(), parsedDeleteAction.convertToMap()) } - private fun `test rollover action config parsing`() { - val rolloverActionConfig = randomRolloverActionConfig() + fun `test rollover action parsing`() { + val rolloverAction = randomRolloverActionConfig() - val rolloverActionConfigString = rolloverActionConfig.toJsonString() - val parsedRolloverActionConfig = ISMActionsParser.instance.parse(parser(rolloverActionConfigString), 0) - assertEquals("Round tripping RolloverActionConfig doesn't work", rolloverActionConfig, parsedRolloverActionConfig) + val rolloverActionString = rolloverAction.toJsonString() + val parsedRolloverAction = ISMActionsParser.instance.parse(parser(rolloverActionString), 0) + assertEquals("Round tripping RolloverAction doesn't work", rolloverAction.convertToMap(), parsedRolloverAction.convertToMap()) } - fun `test read_only action config parsing`() { + fun `test read_only action parsing`() { val readOnlyAction = randomReadOnlyActionConfig() val readOnlyActionString = readOnlyAction.toJsonString() @@ -103,7 +103,7 @@ class XContentTests : OpenSearchTestCase() { assertEquals("Round tripping ReadOnlyAction doesn't work", readOnlyAction.convertToMap(), parsedReadOnlyAction.convertToMap()) } - fun `test read_write action config parsing`() { + fun `test read_write action parsing`() { val readWriteAction = randomReadWriteActionConfig() val readWriteActionString = readWriteAction.toJsonString() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 266ee8724..dbc1a5fd7 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -13,21 +13,26 @@ import org.opensearch.alerting.destination.message.BaseMessage import org.opensearch.alerting.destination.message.CustomWebhookMessage import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.unit.ByteSizeValue +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentType import org.opensearch.index.Index import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction +import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig +import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.randomChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.randomClusterStateManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.randomSweptManagedIndexConfig import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.test.OpenSearchTestCase +import java.time.Instant -// TODO: Fix tests after refactor class ManagedIndexUtilsTests : OpenSearchTestCase() { fun `test create managed index request`() { @@ -137,8 +142,8 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { assertEquals("Wrong index being searched", listOf(INDEX_MANAGEMENT_INDEX), indices) } - /*fun `test rollover action config evaluate conditions`() { - val noConditionsConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = null, index = 0) + fun `test rollover action config evaluate conditions`() { + val noConditionsConfig = RolloverAction(minSize = null, minDocs = null, minAge = null, index = 0) assertTrue( "No conditions should always pass", noConditionsConfig @@ -155,7 +160,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(6000), numDocs = 5, indexSize = ByteSizeValue(5)) ) - val minSizeConfig = RolloverActionConfig(minSize = ByteSizeValue(5), minDocs = null, minAge = null, index = 0) + val minSizeConfig = RolloverAction(minSize = ByteSizeValue(5), minDocs = null, minAge = null, index = 0) assertFalse( "Less bytes should not pass", minSizeConfig @@ -172,7 +177,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue(10)) ) - val minDocsConfig = RolloverActionConfig(minSize = null, minDocs = 5, minAge = null, index = 0) + val minDocsConfig = RolloverAction(minSize = null, minDocs = 5, minAge = null, index = 0) assertFalse( "Less docs should not pass", minDocsConfig @@ -189,7 +194,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 10, indexSize = ByteSizeValue.ZERO) ) - val minAgeConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = TimeValue.timeValueSeconds(5), index = 0) + val minAgeConfig = RolloverAction(minSize = null, minDocs = null, minAge = TimeValue.timeValueSeconds(5), index = 0) assertFalse( "Index age that is too young should not pass", minAgeConfig @@ -201,7 +206,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(10000), numDocs = 0, indexSize = ByteSizeValue.ZERO) ) - val multiConfig = RolloverActionConfig(minSize = ByteSizeValue(1), minDocs = 1, minAge = TimeValue.timeValueSeconds(5), index = 0) + val multiConfig = RolloverAction(minSize = ByteSizeValue(1), minDocs = 1, minAge = TimeValue.timeValueSeconds(5), index = 0) assertFalse( "No conditions met should not pass", multiConfig @@ -222,9 +227,9 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { multiConfig .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(0), numDocs = 0, indexSize = ByteSizeValue(2)) ) - }*/ + } - /*fun `test transition evaluate conditions`() { + fun `test transition evaluate conditions`() { val emptyTransition = Transition(stateName = "some_state", conditions = null) assertTrue( "No conditions should pass", @@ -271,7 +276,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { rolloverTimeTransition .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = null, indexSize = null, transitionStartTime = Instant.now(), rolloverDate = null) ) - }*/ + } fun `test ips in denylist`() { val ips = listOf(