diff --git a/build.gradle b/build.gradle index fa13aca43..db285a781 100644 --- a/build.gradle +++ b/build.gradle @@ -101,7 +101,7 @@ task ktlint(type: JavaExec, group: "verification") { description = "Check Kotlin code style." main = "com.pinterest.ktlint.Main" classpath = configurations.ktlint - args "src/**/*.kt" + args "src/**/*.kt", "spi/src/main/**/*.kt" } check.dependsOn ktlint @@ -110,7 +110,7 @@ task ktlintFormat(type: JavaExec, group: "formatting") { description = "Fix Kotlin code style deviations." main = "com.pinterest.ktlint.Main" classpath = configurations.ktlint - args "-F", "src/**/*.kt" + args "-F", "src/**/*.kt", "spi/src/main/**/*.kt" } detekt { @@ -145,6 +145,7 @@ dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}" compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9' compile "org.jetbrains:annotations:13.0" + compile project(path: ":${rootProject.name}-spi", configuration: 'shadow') compile "org.opensearch:notification:${notification_version}" compile "org.opensearch:common-utils:${common_utils_version}" compile "com.github.seancfoley:ipaddress:5.3.3" diff --git a/settings.gradle b/settings.gradle index 4a4d049c1..be34bde5c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,3 +4,6 @@ */ rootProject.name = 'opensearch-index-management' + +include "spi" +project(":spi").name = rootProject.name + "-spi" diff --git a/spi/build.gradle b/spi/build.gradle new file mode 100644 index 000000000..f85d9d0c2 --- /dev/null +++ b/spi/build.gradle @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +import org.opensearch.gradle.test.RestIntegTestTask + +plugins { + id 'com.github.johnrengelman.shadow' + id 'jacoco' +} + +apply plugin: 'opensearch.java' +apply plugin: 'opensearch.testclusters' +apply plugin: 'opensearch.java-rest-test' +apply plugin: 'kotlin' +apply plugin: 'org.jetbrains.kotlin.jvm' +apply plugin: 'org.jetbrains.kotlin.plugin.allopen' + +ext { + projectSubstitutions = [:] + licenseFile = rootProject.file('LICENSE.txt') + noticeFile = rootProject.file('NOTICE') +} + +jacoco { + toolVersion = '0.8.5' + reportsDir = file("$buildDir/JacocoReport") +} + +jacocoTestReport { + reports { + xml.enabled false + csv.enabled false + html.destination file("${buildDir}/jacoco/") + } +} +check.dependsOn jacocoTestReport + +repositories { + mavenLocal() + mavenCentral() + maven { url "https://plugins.gradle.org/m2/" } + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } +} + +configurations.all { + if (it.state != Configuration.State.UNRESOLVED) return + resolutionStrategy { + force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" + force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}" + } +} + +dependencies { + compileOnly "org.opensearch:opensearch:${opensearch_version}" + compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" + compileOnly "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}" + compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}" + compileOnly "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9" + compileOnly "org.opensearch:common-utils:${common_utils_version}" + + testImplementation "org.opensearch.test:framework:${opensearch_version}" + testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}" +} + +test { + doFirst { + test.classpath -= project.files(project.tasks.named('shadowJar')) + test.classpath -= project.configurations.getByName(ShadowBasePlugin.CONFIGURATION_NAME) + test.classpath += project.extensions.getByType(SourceSetContainer).getByName(SourceSet.MAIN_SOURCE_SET_NAME).runtimeClasspath + } + systemProperty 'tests.security.manager', 'false' +} + +task integTest(type: RestIntegTestTask) { + description 'Run integ test with opensearch test framework' + group 'verification' + systemProperty 'tests.security.manager', 'false' + dependsOn test +} +check.dependsOn integTest + +testClusters.javaRestTest { + testDistribution = 'INTEG_TEST' +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/IndexManagementExtension.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/IndexManagementExtension.kt new file mode 100644 index 000000000..d16ce9d16 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/IndexManagementExtension.kt @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi + +import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser +import org.opensearch.indexmanagement.spi.indexstatemanagement.ClusterEventHandler +import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService + +/** + * SPI for IndexManagement + */ +interface IndexManagementExtension { + + /** + * List of action parsers that are supported by the extension, each of the action parser will parse the policy action into the defined action. + * The ActionParser provides the ability to parse the action + */ + fun getISMActionParsers(): List + + /** + * Not Required to override but if extension is introducing a new index type and special handling is needed to handle this type + * use this to provide the metadata service for the new index types + */ + fun getIndexMetadataService(): Map { + return mapOf() + } + + /** + * Not required to override but if extension wants to evaluate the cluster events before deciding whether to auto manage indices + * on index creation or should/not clean up managed indices when indices are deleted - add new handlers for the sepcific event type + */ + fun getClusterEventHandlers(): Map { + return mapOf() + } +} + +enum class ClusterEventType(val type: String) { + CREATE("create"), + DELETE("delete"); + + override fun toString(): String { + return type + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt new file mode 100644 index 000000000..6a95b1534 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Action.kt @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement + +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext + +abstract class Action( + val type: String, + val actionIndex: Int +) : ToXContentObject, Writeable { + + var configTimeout: ActionTimeout? = null + var configRetry: ActionRetry? = null + + final override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + configTimeout?.toXContent(builder, params) + configRetry?.toXContent(builder, params) + // TODO: We should add "custom" object wrapper based on the params + populateAction(builder, params) + return builder.endObject() + } + + /** + * The implementer of Action can change this method to correctly serialize the internals of the action + * when stored internally or returned as response + */ + open fun populateAction(builder: XContentBuilder, params: ToXContent.Params) { + builder.startObject(type).endObject() + } + + final override fun writeTo(out: StreamOutput) { + out.writeString(type) + out.writeOptionalWriteable(configTimeout) + out.writeOptionalWriteable(configRetry) + populateAction(out) + } + + /** + * The implementer of Action can change this method to correctly serialize the internals of the action + * when data is shared between nodes + */ + open fun populateAction(out: StreamOutput) { + out.writeInt(actionIndex) + } + + /** + * Get all the steps associated with the action + */ + abstract fun getSteps(): List + + /** + * Get the current step to execute in the action + */ + abstract fun getStepToExecute(context: StepContext): Step + + final fun isLastStep(stepName: String): Boolean = getSteps().last().name == stepName + + final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/ActionParser.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/ActionParser.kt new file mode 100644 index 000000000..292d7dcb6 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/ActionParser.kt @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.xcontent.XContentParser + +abstract class ActionParser(var customAction: Boolean = false) { + + /** + * The action type parser will parse + */ + abstract fun getActionType(): String + + /** + * Deserialize Action from stream input + */ + abstract fun fromStreamInput(sin: StreamInput): Action + + /** + * Deserialize Action from xContent + */ + abstract fun fromXContent(xcp: XContentParser, index: Int): Action +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/ClusterEventHandler.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/ClusterEventHandler.kt new file mode 100644 index 000000000..19b80f1b2 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/ClusterEventHandler.kt @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement + +import org.opensearch.client.Client +import org.opensearch.cluster.ClusterChangedEvent +import org.opensearch.cluster.service.ClusterService +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.Decision + +abstract class ClusterEventHandler { + + abstract fun processEvent(client: Client, clusterService: ClusterService, event: ClusterChangedEvent): Decision +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/IndexMetadataService.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/IndexMetadataService.kt new file mode 100644 index 000000000..a64c97f05 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/IndexMetadataService.kt @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement + +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata + +interface IndexMetadataService { + + /** + * Returns the index metadata needed for ISM + */ + fun getMetadata(indices: List, client: Client, clusterService: ClusterService): Map +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt new file mode 100644 index 000000000..c4494887e --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement + +import org.apache.logging.log4j.Logger +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import java.time.Instant +import java.util.Locale + +abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) { + + var context: StepContext? = null + private set + + fun preExecute(logger: Logger, context: StepContext): Step { + logger.info("Executing $name for ${context.metadata.index}") + this.context = context + return this + } + + abstract suspend fun execute(): Step + + fun postExecute(logger: Logger): Step { + logger.info("Finished executing $name for ${context?.metadata?.index}") + this.context = null + return this + } + + abstract fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData + + abstract fun isIdempotent(): Boolean + + final fun getStepStartTime(metadata: ManagedIndexMetaData): Instant { + return when { + metadata.stepMetaData == null -> Instant.now() + metadata.stepMetaData.name != this.name -> Instant.now() + // The managed index metadata is a historical snapshot of the metadata and refers to what has happened from the previous + // execution, so if we ever see it as COMPLETED it means we are always going to be in a new step, this specifically + // helps with the Transition -> Transition (empty state) sequence which the above do not capture + metadata.stepMetaData.stepStatus == StepStatus.COMPLETED -> Instant.now() + else -> Instant.ofEpochMilli(metadata.stepMetaData.startTime) + } + } + + final fun getStartingStepMetaData(metadata: ManagedIndexMetaData): StepMetaData = StepMetaData(name, getStepStartTime(metadata).toEpochMilli(), StepStatus.STARTING) + + enum class StepStatus(val status: String) : Writeable { + STARTING("starting"), + CONDITION_NOT_MET("condition_not_met"), + FAILED("failed"), + COMPLETED("completed"); + + override fun toString(): String { + return status + } + + override fun writeTo(out: StreamOutput) { + out.writeString(status) + } + + companion object { + fun read(streamInput: StreamInput): StepStatus { + return valueOf(streamInput.readString().toUpperCase(Locale.ROOT)) + } + } + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Utils.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Utils.kt new file mode 100644 index 000000000..6854973c7 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Utils.kt @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement + +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentFragment +import org.opensearch.common.xcontent.XContentBuilder + +// forIndex means saving to config index, distinguish from Explain and History, +// which only show meaningful partial metadata +@Suppress("ReturnCount") +fun XContentBuilder.addObject(name: String, metadata: ToXContentFragment?, params: ToXContent.Params, forIndex: Boolean = false): XContentBuilder { + if (metadata != null) return this.buildMetadata(name, metadata, params) + return if (forIndex) nullField(name) else this +} + +fun XContentBuilder.buildMetadata(name: String, metadata: ToXContentFragment, params: ToXContent.Params): XContentBuilder { + this.startObject(name) + metadata.toXContent(this, params) + this.endObject() + return this +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionMetaData.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionMetaData.kt new file mode 100644 index 000000000..9e82ba412 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionMetaData.kt @@ -0,0 +1,146 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.opensearch.common.Strings +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentFragment +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData.Companion.NAME +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData.Companion.START_TIME +import java.io.ByteArrayInputStream +import java.nio.charset.StandardCharsets + +data class ActionMetaData( + val name: String, + val startTime: Long?, + val index: Int, + val failed: Boolean, + val consumedRetries: Int, + val lastRetryTime: Long?, + val actionProperties: ActionProperties? +) : Writeable, ToXContentFragment { + + override fun writeTo(out: StreamOutput) { + out.writeString(name) + out.writeOptionalLong(startTime) + out.writeInt(index) + out.writeBoolean(failed) + out.writeInt(consumedRetries) + out.writeOptionalLong(lastRetryTime) + + out.writeOptionalWriteable(actionProperties) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder + .field(NAME, name) + .field(START_TIME, startTime) + .field(INDEX, index) + .field(FAILED, failed) + .field(CONSUMED_RETRIES, consumedRetries) + .field(LAST_RETRY_TIME, lastRetryTime) + + if (actionProperties != null) { + builder.startObject(ActionProperties.ACTION_PROPERTIES) + actionProperties.toXContent(builder, params) + builder.endObject() + } + + return builder + } + + fun getMapValueString(): String { + return Strings.toString(this, false, false) + } + + companion object { + const val ACTION = "action" + const val INDEX = "index" + const val FAILED = "failed" + const val CONSUMED_RETRIES = "consumed_retries" + const val LAST_RETRY_TIME = "last_retry_time" + + fun fromStreamInput(si: StreamInput): ActionMetaData { + val name: String? = si.readString() + val startTime: Long? = si.readOptionalLong() + val index: Int? = si.readInt() + val failed: Boolean? = si.readBoolean() + val consumedRetries: Int? = si.readInt() + val lastRetryTime: Long? = si.readOptionalLong() + + val actionProperties: ActionProperties? = si.readOptionalWriteable { ActionProperties.fromStreamInput(it) } + + return ActionMetaData( + requireNotNull(name) { "$NAME is null" }, + startTime, + requireNotNull(index) { "$INDEX is null" }, + requireNotNull(failed) { "$FAILED is null" }, + requireNotNull(consumedRetries) { "$CONSUMED_RETRIES is null" }, + lastRetryTime, + actionProperties + ) + } + + fun fromManagedIndexMetaDataMap(map: Map): ActionMetaData? { + val actionJsonString = map[ACTION] + return if (actionJsonString != null) { + val inputStream = ByteArrayInputStream(actionJsonString.toByteArray(StandardCharsets.UTF_8)) + val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, inputStream) + parser.nextToken() + parse(parser) + } else { + null + } + } + + @Suppress("ComplexMethod") + fun parse(xcp: XContentParser): ActionMetaData { + var name: String? = null + var startTime: Long? = null + var index: Int? = null + var failed: Boolean? = null + var consumedRetries: Int? = null + var lastRetryTime: Long? = null + var actionProperties: ActionProperties? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + NAME -> name = xcp.text() + START_TIME -> startTime = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() + INDEX -> index = xcp.intValue() + FAILED -> failed = xcp.booleanValue() + CONSUMED_RETRIES -> consumedRetries = xcp.intValue() + LAST_RETRY_TIME -> lastRetryTime = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() + ActionProperties.ACTION_PROPERTIES -> + actionProperties = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else ActionProperties.parse(xcp) + } + } + + return ActionMetaData( + requireNotNull(name) { "$NAME is null" }, + startTime, + requireNotNull(index) { "$INDEX is null" }, + requireNotNull(failed) { "$FAILED is null" }, + requireNotNull(consumedRetries) { "$CONSUMED_RETRIES is null" }, + lastRetryTime, + actionProperties + ) + } + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt new file mode 100644 index 000000000..175dc447d --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentFragment +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils + +/** Properties that will persist across steps of a single Action. Will be stored in the [ActionMetaData]. */ +// TODO: Create namespaces to group properties together +data class ActionProperties( + val maxNumSegments: Int? = null, + val snapshotName: String? = null, + val rollupId: String? = null, + val hasRollupFailed: Boolean? = null +) : Writeable, ToXContentFragment { + + override fun writeTo(out: StreamOutput) { + out.writeOptionalInt(maxNumSegments) + out.writeOptionalString(snapshotName) + out.writeOptionalString(rollupId) + out.writeOptionalBoolean(hasRollupFailed) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (maxNumSegments != null) builder.field(Properties.MAX_NUM_SEGMENTS.key, maxNumSegments) + if (snapshotName != null) builder.field(Properties.SNAPSHOT_NAME.key, snapshotName) + if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId) + if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed) + return builder + } + + companion object { + const val ACTION_PROPERTIES = "action_properties" + + fun fromStreamInput(si: StreamInput): ActionProperties { + val maxNumSegments: Int? = si.readOptionalInt() + val snapshotName: String? = si.readOptionalString() + val rollupId: String? = si.readOptionalString() + val hasRollupFailed: Boolean? = si.readOptionalBoolean() + + return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed) + } + + fun parse(xcp: XContentParser): ActionProperties { + var maxNumSegments: Int? = null + var snapshotName: String? = null + var rollupId: String? = null + var hasRollupFailed: Boolean? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + Properties.MAX_NUM_SEGMENTS.key -> maxNumSegments = xcp.intValue() + Properties.SNAPSHOT_NAME.key -> snapshotName = xcp.text() + Properties.ROLLUP_ID.key -> rollupId = xcp.text() + Properties.HAS_ROLLUP_FAILED.key -> hasRollupFailed = xcp.booleanValue() + } + } + + return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed) + } + } + + enum class Properties(val key: String) { + MAX_NUM_SEGMENTS("max_num_segments"), + SNAPSHOT_NAME("snapshot_name"), + ROLLUP_ID("rollup_id"), + HAS_ROLLUP_FAILED("has_rollup_failed") + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt new file mode 100644 index 000000000..5cbaf75ec --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt @@ -0,0 +1,133 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.apache.logging.log4j.LogManager +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentFragment +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import java.io.IOException +import java.time.Instant +import java.util.Locale +import kotlin.math.pow + +data class ActionRetry( + val count: Long, + val backoff: Backoff = Backoff.EXPONENTIAL, + val delay: TimeValue = TimeValue.timeValueMinutes(1) +) : ToXContentFragment, Writeable { + + init { require(count > 0) { "Count for ActionRetry must be greater than 0" } } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder + .startObject(RETRY_FIELD) + .field(COUNT_FIELD, count) + .field(BACKOFF_FIELD, backoff) + .field(DELAY_FIELD, delay.stringRep) + .endObject() + return builder + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + count = sin.readLong(), + backoff = sin.readEnum(Backoff::class.java), + delay = sin.readTimeValue() + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeLong(count) + out.writeEnum(backoff) + out.writeTimeValue(delay) + } + + companion object { + const val RETRY_FIELD = "retry" + const val COUNT_FIELD = "count" + const val BACKOFF_FIELD = "backoff" + const val DELAY_FIELD = "delay" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): ActionRetry { + var count: Long? = null + var backoff: Backoff = Backoff.EXPONENTIAL + var delay: TimeValue = TimeValue.timeValueMinutes(1) + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + COUNT_FIELD -> count = xcp.longValue() + BACKOFF_FIELD -> backoff = Backoff.valueOf(xcp.text().toUpperCase(Locale.ROOT)) + DELAY_FIELD -> delay = TimeValue.parseTimeValue(xcp.text(), DELAY_FIELD) + } + } + + return ActionRetry( + count = requireNotNull(count) { "ActionRetry count is null" }, + backoff = backoff, + delay = delay + ) + } + } + + enum class Backoff(val type: String, val getNextRetryTime: (consumedRetries: Int, timeValue: TimeValue) -> Long) { + EXPONENTIAL( + "exponential", + { consumedRetries, timeValue -> + (2.0.pow(consumedRetries - 1)).toLong() * timeValue.millis + } + ), + CONSTANT( + "constant", + { _, timeValue -> + timeValue.millis + } + ), + LINEAR( + "linear", + { consumedRetries, timeValue -> + consumedRetries * timeValue.millis + } + ); + + private val logger = LogManager.getLogger(javaClass) + + override fun toString(): String { + return type + } + + @Suppress("ReturnCount") + fun shouldBackoff(actionMetaData: ActionMetaData?, actionRetry: ActionRetry?): Pair { + if (actionMetaData == null || actionRetry == null) { + logger.debug("There is no actionMetaData and ActionRetry we don't need to backoff") + return Pair(false, null) + } + + if (actionMetaData.consumedRetries > 0) { + if (actionMetaData.lastRetryTime != null) { + val remainingTime = getNextRetryTime(actionMetaData.consumedRetries, actionRetry.delay) - + (Instant.now().toEpochMilli() - actionMetaData.lastRetryTime) + + return Pair(remainingTime > 0, remainingTime) + } + } + + return Pair(false, null) + } + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionTimeout.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionTimeout.kt new file mode 100644 index 000000000..a21055402 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionTimeout.kt @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentFragment +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import java.io.IOException + +data class ActionTimeout(val timeout: TimeValue) : ToXContentFragment, Writeable { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.field(TIMEOUT_FIELD, timeout.stringRep) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + timeout = sin.readTimeValue() + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeTimeValue(timeout) + } + + companion object { + const val TIMEOUT_FIELD = "timeout" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): ActionTimeout { + if (xcp.currentToken() == XContentParser.Token.VALUE_STRING) { + return ActionTimeout(TimeValue.parseTimeValue(xcp.text(), TIMEOUT_FIELD)) + } else { + throw IllegalArgumentException("Invalid token: [${xcp.currentToken()}] for ActionTimeout") + } + } + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt new file mode 100644 index 000000000..6fcaa98ad --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/Decision.kt @@ -0,0 +1,9 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +// TODO: Probably need an override or priority to clear clashes if there are multiple decisions with conflicting index metadata +data class Decision(val shouldProcess: Boolean = true) diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ISMIndexMetadata.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ISMIndexMetadata.kt new file mode 100644 index 000000000..1e8bcfc24 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ISMIndexMetadata.kt @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +// TODO need to have more information here +data class ISMIndexMetadata( + val indexUuid: String, + val indexCreationDate: Long, + val documentCount: Long, +) diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt new file mode 100644 index 000000000..f490ca359 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt @@ -0,0 +1,332 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.opensearch.common.Strings +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentFragment +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.json.JsonXContent +import org.opensearch.commons.authuser.User +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.spi.indexstatemanagement.addObject +import java.io.IOException + +data class ManagedIndexMetaData( + val index: String, + val indexUuid: String, + val policyID: String, + val policySeqNo: Long?, + val policyPrimaryTerm: Long?, + val policyCompleted: Boolean?, + val rolledOver: Boolean?, + val transitionTo: String?, + val stateMetaData: StateMetaData?, + val actionMetaData: ActionMetaData?, + val stepMetaData: StepMetaData?, + val policyRetryInfo: PolicyRetryInfoMetaData?, + val info: Map?, + val id: String = NO_ID, + val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + // TODO: Remove this once the step interface is updated to pass in user information. + // The user information is not being stored/written anywhere, this is only intended to be used during the step execution. + val user: User? = null, + // TODO: Remove this once the step interface is updated to pass in thread context information. + // This information is not being stored/written anywhere, this is only intended to be used during the step execution. + val threadContext: ThreadContext? = null +) : Writeable, ToXContentFragment { + + @Suppress("ComplexMethod") + fun toMap(): Map { + val resultMap = mutableMapOf () + resultMap[INDEX] = index + resultMap[INDEX_UUID] = indexUuid + resultMap[POLICY_ID] = policyID + if (policySeqNo != null) resultMap[POLICY_SEQ_NO] = policySeqNo.toString() + if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString() + if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString() + if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString() + if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo + if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString() + if (actionMetaData != null) resultMap[ActionMetaData.ACTION] = actionMetaData.getMapValueString() + if (stepMetaData != null) resultMap[StepMetaData.STEP] = stepMetaData.getMapValueString() + if (policyRetryInfo != null) resultMap[PolicyRetryInfoMetaData.RETRY_INFO] = policyRetryInfo.getMapValueString() + if (info != null) resultMap[INFO] = Strings.toString(XContentFactory.jsonBuilder().map(info)) + + return resultMap + } + + fun toXContent(builder: XContentBuilder, params: ToXContent.Params, forIndex: Boolean): XContentBuilder { + // forIndex means saving to config index, distinguish from Explain and History, which only save meaningful partial metadata + if (!forIndex) return toXContent(builder, params) + + builder + .startObject() + .startObject(MANAGED_INDEX_METADATA_TYPE) + .field(INDEX, index) + .field(INDEX_UUID, indexUuid) + .field(POLICY_ID, policyID) + .field(POLICY_SEQ_NO, policySeqNo) + .field(POLICY_PRIMARY_TERM, policyPrimaryTerm) + .field(POLICY_COMPLETED, policyCompleted) + .field(ROLLED_OVER, rolledOver) + .field(TRANSITION_TO, transitionTo) + .addObject(StateMetaData.STATE, stateMetaData, params, true) + .addObject(ActionMetaData.ACTION, actionMetaData, params, true) + .addObject(StepMetaData.STEP, stepMetaData, params, true) + .addObject(PolicyRetryInfoMetaData.RETRY_INFO, policyRetryInfo, params, true) + .field(INFO, info) + .endObject() + .endObject() + return builder + } + + fun isFailed(): Boolean { + // If PolicyRetryInfo is failed then the ManagedIndex has failed. + if (this.policyRetryInfo?.failed == true) return true + // If ActionMetaData is not null and some action is failed. Then the ManagedIndex has failed. + if (this.actionMetaData?.failed == true) return true + return false + } + + @Suppress("ComplexMethod") + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + // The order we check values matters here as we are only trying to show what is needed for the customer + // and can return early on certain checks like policyCompleted + builder + .field(INDEX, index) + .field(INDEX_UUID, indexUuid) + .field(POLICY_ID, policyID) + if (policySeqNo != null) builder.field(POLICY_SEQ_NO, policySeqNo) + if (policyPrimaryTerm != null) builder.field(POLICY_PRIMARY_TERM, policyPrimaryTerm) + + // Only show rolled_over if we have rolled over or we are in the rollover action + // TODO: Fix this + /*if (rolledOver == true || (actionMetaData != null && actionMetaData.name == ActionConfig.ActionType.ROLLOVER.type)) { + builder.field(ROLLED_OVER, rolledOver) + }*/ + + if (policyCompleted == true) { + builder.field(POLICY_COMPLETED, policyCompleted) + return builder + } + + val transitionToExists = transitionTo != null + if (transitionToExists) { + builder.field(TRANSITION_TO, transitionTo) + } else { + builder.addObject(StateMetaData.STATE, stateMetaData, params) + .addObject(ActionMetaData.ACTION, actionMetaData, params) + .addObject(StepMetaData.STEP, stepMetaData, params) + } + builder.addObject(PolicyRetryInfoMetaData.RETRY_INFO, policyRetryInfo, params) + + if (info != null) builder.field(INFO, info) + + return builder + } + + override fun writeTo(streamOutput: StreamOutput) { + streamOutput.writeString(index) + streamOutput.writeString(indexUuid) + streamOutput.writeString(policyID) + streamOutput.writeOptionalLong(policySeqNo) + streamOutput.writeOptionalLong(policyPrimaryTerm) + streamOutput.writeOptionalBoolean(policyCompleted) + streamOutput.writeOptionalBoolean(rolledOver) + streamOutput.writeOptionalString(transitionTo) + + streamOutput.writeOptionalWriteable(stateMetaData) + streamOutput.writeOptionalWriteable(actionMetaData) + streamOutput.writeOptionalWriteable(stepMetaData) + streamOutput.writeOptionalWriteable(policyRetryInfo) + + if (info == null) { + streamOutput.writeBoolean(false) + } else { + streamOutput.writeBoolean(true) + streamOutput.writeMap(info) + } + } + + companion object { + const val NO_ID = "" + const val MANAGED_INDEX_METADATA_TYPE = "managed_index_metadata" + + const val NAME = "name" + const val START_TIME = "start_time" + + const val INDEX = "index" + const val INDEX_UUID = "index_uuid" + const val POLICY_ID = "policy_id" + const val POLICY_SEQ_NO = "policy_seq_no" + const val POLICY_PRIMARY_TERM = "policy_primary_term" + const val POLICY_COMPLETED = "policy_completed" + const val ROLLED_OVER = "rolled_over" + const val TRANSITION_TO = "transition_to" + const val INFO = "info" + + fun fromStreamInput(si: StreamInput): ManagedIndexMetaData { + val index: String? = si.readString() + val indexUuid: String? = si.readString() + val policyID: String? = si.readString() + val policySeqNo: Long? = si.readOptionalLong() + val policyPrimaryTerm: Long? = si.readOptionalLong() + val policyCompleted: Boolean? = si.readOptionalBoolean() + val rolledOver: Boolean? = si.readOptionalBoolean() + val transitionTo: String? = si.readOptionalString() + + val state: StateMetaData? = si.readOptionalWriteable { StateMetaData.fromStreamInput(it) } + val action: ActionMetaData? = si.readOptionalWriteable { ActionMetaData.fromStreamInput(it) } + val step: StepMetaData? = si.readOptionalWriteable { StepMetaData.fromStreamInput(it) } + val retryInfo: PolicyRetryInfoMetaData? = si.readOptionalWriteable { PolicyRetryInfoMetaData.fromStreamInput(it) } + + val info = if (si.readBoolean()) { + si.readMap() + } else { + null + } + + return ManagedIndexMetaData( + index = requireNotNull(index) { "$INDEX is null" }, + indexUuid = requireNotNull(indexUuid) { "$INDEX_UUID is null" }, + policyID = requireNotNull(policyID) { "$POLICY_ID is null" }, + policySeqNo = policySeqNo, + policyPrimaryTerm = policyPrimaryTerm, + policyCompleted = policyCompleted, + rolledOver = rolledOver, + transitionTo = transitionTo, + stateMetaData = state, + actionMetaData = action, + stepMetaData = step, + policyRetryInfo = retryInfo, + info = info + ) + } + + @Suppress("ComplexMethod", "LongMethod") + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse( + xcp: XContentParser, + id: String = NO_ID, + seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ): ManagedIndexMetaData { + var index: String? = null + var indexUuid: String? = null + var policyID: String? = null + var policySeqNo: Long? = null + var policyPrimaryTerm: Long? = null + var policyCompleted: Boolean? = null + var rolledOver: Boolean? = null + var transitionTo: String? = null + + var state: StateMetaData? = null + var action: ActionMetaData? = null + var step: StepMetaData? = null + var retryInfo: PolicyRetryInfoMetaData? = null + + var info: Map? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + INDEX -> index = xcp.text() + INDEX_UUID -> indexUuid = xcp.text() + POLICY_ID -> policyID = xcp.text() + POLICY_SEQ_NO -> policySeqNo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() + POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() + POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue() + ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue() + TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text() + StateMetaData.STATE -> { + state = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else StateMetaData.parse(xcp) + } + ActionMetaData.ACTION -> { + action = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else ActionMetaData.parse(xcp) + } + StepMetaData.STEP -> { + step = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else StepMetaData.parse(xcp) + } + PolicyRetryInfoMetaData.RETRY_INFO -> { + retryInfo = PolicyRetryInfoMetaData.parse(xcp) + } + INFO -> info = xcp.map() + // below line will break when getting metadata for explain or history + // else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ManagedIndexMetaData.") + } + } + + return ManagedIndexMetaData( + requireNotNull(index) { "$INDEX is null" }, + requireNotNull(indexUuid) { "$INDEX_UUID is null" }, + requireNotNull(policyID) { "$POLICY_ID is null" }, + policySeqNo, + policyPrimaryTerm, + policyCompleted, + rolledOver, + transitionTo, + state, + action, + step, + retryInfo, + info, + id, + seqNo, + primaryTerm + ) + } + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parseWithType( + xcp: XContentParser, + id: String = NO_ID, + seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ): ManagedIndexMetaData { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val managedIndexMetaData = parse(xcp, id, seqNo, primaryTerm) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) + return managedIndexMetaData + } + + fun fromMap(map: Map): ManagedIndexMetaData { + return ManagedIndexMetaData( + index = requireNotNull(map[INDEX]) { "$INDEX is null" }, + indexUuid = requireNotNull(map[INDEX_UUID]) { "$INDEX_UUID is null" }, + policyID = requireNotNull(map[POLICY_ID]) { "$POLICY_ID is null" }, + policySeqNo = map[POLICY_SEQ_NO]?.toLong(), + policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(), + policyCompleted = map[POLICY_COMPLETED]?.toBoolean(), + rolledOver = map[ROLLED_OVER]?.toBoolean(), + transitionTo = map[TRANSITION_TO], + stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map), + actionMetaData = ActionMetaData.fromManagedIndexMetaDataMap(map), + stepMetaData = StepMetaData.fromManagedIndexMetaDataMap(map), + policyRetryInfo = PolicyRetryInfoMetaData.fromManagedIndexMetaDataMap(map), + info = map[INFO]?.let { XContentHelper.convertToMap(JsonXContent.jsonXContent, it, false) } + ) + } + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/PolicyRetryInfoMetaData.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/PolicyRetryInfoMetaData.kt new file mode 100644 index 000000000..79b5dbe89 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/PolicyRetryInfoMetaData.kt @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.opensearch.common.Strings +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentFragment +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType +import java.io.ByteArrayInputStream +import java.nio.charset.StandardCharsets + +data class PolicyRetryInfoMetaData( + val failed: Boolean, + val consumedRetries: Int +) : Writeable, ToXContentFragment { + + override fun writeTo(out: StreamOutput) { + out.writeBoolean(failed) + out.writeInt(consumedRetries) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder + .field(FAILED, failed) + .field(CONSUMED_RETRIES, consumedRetries) + } + + fun getMapValueString(): String = Strings.toString(this, false, false) + + companion object { + const val RETRY_INFO = "retry_info" + const val FAILED = "failed" + const val CONSUMED_RETRIES = "consumed_retries" + + fun fromStreamInput(si: StreamInput): PolicyRetryInfoMetaData { + val failed: Boolean? = si.readBoolean() + val consumedRetries: Int? = si.readInt() + + return PolicyRetryInfoMetaData( + requireNotNull(failed) { "$FAILED is null" }, + requireNotNull(consumedRetries) { "$CONSUMED_RETRIES is null" } + ) + } + + fun fromManagedIndexMetaDataMap(map: Map): PolicyRetryInfoMetaData? { + val stateJsonString = map[RETRY_INFO] + return if (stateJsonString != null) { + val inputStream = ByteArrayInputStream(stateJsonString.toByteArray(StandardCharsets.UTF_8)) + val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, inputStream) + parser.nextToken() + parse(parser) + } else { + null + } + } + + fun parse(xcp: XContentParser): PolicyRetryInfoMetaData { + var failed: Boolean? = null + var consumedRetries: Int? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + FAILED -> failed = xcp.booleanValue() + CONSUMED_RETRIES -> consumedRetries = xcp.intValue() + } + } + + return PolicyRetryInfoMetaData( + requireNotNull(failed) { "$FAILED is null" }, + requireNotNull(consumedRetries) { "$CONSUMED_RETRIES is null" } + ) + } + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StateMetaData.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StateMetaData.kt new file mode 100644 index 000000000..614f9e25a --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StateMetaData.kt @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.opensearch.common.Strings +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentFragment +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData.Companion.NAME +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData.Companion.START_TIME +import java.io.ByteArrayInputStream +import java.nio.charset.StandardCharsets + +data class StateMetaData( + val name: String, + val startTime: Long +) : Writeable, ToXContentFragment { + + override fun writeTo(out: StreamOutput) { + out.writeString(name) + out.writeLong(startTime) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder + .field(NAME, name) + .field(START_TIME, startTime) + } + + fun getMapValueString(): String = Strings.toString(this, false, false) + + companion object { + const val STATE = "state" + + fun fromStreamInput(si: StreamInput): StateMetaData { + val name: String? = si.readString() + val startTime: Long? = si.readLong() + + return StateMetaData( + requireNotNull(name) { "$NAME is null" }, + requireNotNull(startTime) { "$START_TIME is null" } + ) + } + + fun fromManagedIndexMetaDataMap(map: Map): StateMetaData? { + val stateJsonString = map[STATE] + return if (stateJsonString != null) { + val inputStream = ByteArrayInputStream(stateJsonString.toByteArray(StandardCharsets.UTF_8)) + val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, inputStream) + parser.nextToken() + parse(parser) + } else { + null + } + } + + fun parse(xcp: XContentParser): StateMetaData { + var name: String? = null + var startTime: Long? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + NAME -> name = xcp.text() + START_TIME -> startTime = xcp.longValue() + } + } + + return StateMetaData( + requireNotNull(name) { "$NAME is null" }, + requireNotNull(startTime) { "$START_TIME is null" } + ) + } + } +} diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StepContext.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StepContext.kt new file mode 100644 index 000000000..fb434a438 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StepContext.kt @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.commons.authuser.User + +// TODO: Add more attributes here if needed +class StepContext( + val metadata: ManagedIndexMetaData, + val clusterService: ClusterService, + val client: Client, + val threadContext: ThreadContext?, + val user: User? +) diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StepMetaData.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StepMetaData.kt new file mode 100644 index 000000000..a83d9c1b5 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StepMetaData.kt @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.opensearch.common.Strings +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentFragment +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData.Companion.NAME +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData.Companion.START_TIME +import java.io.ByteArrayInputStream +import java.nio.charset.StandardCharsets +import java.util.Locale + +data class StepMetaData( + val name: String, + val startTime: Long, + val stepStatus: Step.StepStatus +) : Writeable, ToXContentFragment { + + override fun writeTo(out: StreamOutput) { + out.writeString(name) + out.writeLong(startTime) + stepStatus.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder + .field(NAME, name) + .field(START_TIME, startTime) + .field(STEP_STATUS, stepStatus.toString()) + + return builder + } + + fun getMapValueString(): String { + return Strings.toString(this, false, false) + } + + companion object { + const val STEP = "step" + const val STEP_STATUS = "step_status" + + fun fromStreamInput(si: StreamInput): StepMetaData { + val name: String? = si.readString() + val startTime: Long? = si.readLong() + val stepStatus: Step.StepStatus? = Step.StepStatus.read(si) + + return StepMetaData( + requireNotNull(name) { "$NAME is null" }, + requireNotNull(startTime) { "$START_TIME is null" }, + requireNotNull(stepStatus) { "$STEP_STATUS is null" } + ) + } + + fun fromManagedIndexMetaDataMap(map: Map): StepMetaData? { + val stepJsonString = map[STEP] + return if (stepJsonString != null) { + val inputStream = ByteArrayInputStream(stepJsonString.toByteArray(StandardCharsets.UTF_8)) + val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, inputStream) + parser.nextToken() + parse(parser) + } else { + null + } + } + + fun parse(xcp: XContentParser): StepMetaData { + var name: String? = null + var startTime: Long? = null + var stepStatus: Step.StepStatus? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + NAME -> name = xcp.text() + START_TIME -> startTime = xcp.longValue() + STEP_STATUS -> stepStatus = Step.StepStatus.valueOf(xcp.text().toUpperCase(Locale.ROOT)) + } + } + + return StepMetaData( + requireNotNull(name) { "$NAME is null" }, + requireNotNull(startTime) { "$START_TIME is null" }, + requireNotNull(stepStatus) { "$STEP_STATUS is null" } + ) + } + } +}