Skip to content

Commit

Permalink
Refactor IndexManagement to support custom actions (#288)
Browse files Browse the repository at this point in the history
* Adding IM SPI (#216)

Signed-off-by: Robert Downs <[email protected]>

Co-authored-by: Ravi <[email protected]>
  • Loading branch information
downsrob and thalurur authored Mar 9, 2022
1 parent 20ff74e commit 2d3fd97
Show file tree
Hide file tree
Showing 182 changed files with 4,494 additions and 3,493 deletions.
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ task ktlint(type: JavaExec, group: "verification") {
description = "Check Kotlin code style."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
args "src/**/*.kt"
args "src/**/*.kt", "spi/src/main/**/*.kt"
}

check.dependsOn ktlint
Expand All @@ -113,7 +113,7 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
description = "Fix Kotlin code style deviations."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
args "-F", "src/**/*.kt"
args "-F", "src/**/*.kt", "spi/src/main/**/*.kt"
}

detekt {
Expand Down Expand Up @@ -148,6 +148,7 @@ dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
compile "org.jetbrains:annotations:13.0"
compile project(path: ":${rootProject.name}-spi", configuration: 'shadow')
compile "org.opensearch:notification:${notification_version}"
compile "org.opensearch:common-utils:${common_utils_version}"
compile "com.github.seancfoley:ipaddress:5.3.3"
Expand Down
3 changes: 3 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
*/

rootProject.name = 'opensearch-index-management'

include "spi"
project(":spi").name = rootProject.name + "-spi"
85 changes: 85 additions & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
import org.opensearch.gradle.test.RestIntegTestTask

plugins {
id 'com.github.johnrengelman.shadow'
id 'jacoco'
}

apply plugin: 'opensearch.java'
apply plugin: 'opensearch.testclusters'
apply plugin: 'opensearch.java-rest-test'
apply plugin: 'kotlin'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'

ext {
projectSubstitutions = [:]
licenseFile = rootProject.file('LICENSE.txt')
noticeFile = rootProject.file('NOTICE')
}

jacoco {
toolVersion = '0.8.5'
reportsDir = file("$buildDir/JacocoReport")
}

jacocoTestReport {
reports {
xml.enabled false
csv.enabled false
html.destination file("${buildDir}/jacoco/")
}
}
check.dependsOn jacocoTestReport

repositories {
mavenLocal()
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
}

configurations.all {
if (it.state != Configuration.State.UNRESOLVED) return
resolutionStrategy {
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
}
}

dependencies {
compileOnly "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
compileOnly "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
compileOnly "org.opensearch:common-utils:${common_utils_version}"

testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}

test {
doFirst {
test.classpath -= project.files(project.tasks.named('shadowJar'))
test.classpath -= project.configurations.getByName(ShadowBasePlugin.CONFIGURATION_NAME)
test.classpath += project.extensions.getByType(SourceSetContainer).getByName(SourceSet.MAIN_SOURCE_SET_NAME).runtimeClasspath
}
systemProperty 'tests.security.manager', 'false'
}

task integTest(type: RestIntegTestTask) {
description 'Run integ test with opensearch test framework'
group 'verification'
systemProperty 'tests.security.manager', 'false'
dependsOn test
}
check.dependsOn integTest

testClusters.javaRestTest {
testDistribution = 'INTEG_TEST'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi

import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.DefaultStatusChecker
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker

/**
* SPI for IndexManagement
*/
interface IndexManagementExtension {

/**
* List of action parsers that are supported by the extension, each of the action parser will parse the policy action into the defined action.
* The ActionParser provides the ability to parse the action
*/
fun getISMActionParsers(): List<ActionParser>

/**
* Status checker is used by IndexManagement to check the status of the extension before executing the actions registered by the extension.
* Actions registered by the plugin can only be executed if in enabled, otherwise the action fails without retries. The status returned
* should represent if the extension is enabled or disabled, and should not represent extension health or the availability of some extension
* dependency.
*/
fun statusChecker(): StatusChecker {
return DefaultStatusChecker()
}

/**
* Name of the extension
*/
fun getExtensionName(): String

/**
* Not Required to override but if extension moves the index metadata outside of cluster state and requires IndexManagement to manage these
* indices provide the metadata service that can provide the index metadata for these indices. An extension need to label the metadata service
* with a type string which is used to distinguish indices in IndexManagement plugin
*/
fun getIndexMetadataService(): Map<String, IndexMetadataService> {
return mapOf()
}

/**
* Caution: Experimental and can be removed in future
*
* If extension wants IndexManagement to determine cluster state indices UUID based on custom index setting if
* present of cluster state override this method.
*/
fun overrideClusterStateIndexUuidSetting(): String? {
return null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import java.time.Instant

abstract class Action(
val type: String,
val actionIndex: Int
) : ToXContentObject, Writeable {

var configTimeout: ActionTimeout? = null
var configRetry: ActionRetry? = ActionRetry(DEFAULT_RETRIES)
var customAction: Boolean = false

final override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
configTimeout?.toXContent(builder, params)
configRetry?.toXContent(builder, params)
// Include a "custom" object wrapper for custom actions to allow extensions to put arbitrary action configs in the config
// index. The EXCLUDE_CUSTOM_FIELD_PARAM is used to not include this wrapper in api responses
if (customAction && !params.paramAsBoolean(EXCLUDE_CUSTOM_FIELD_PARAM, false)) builder.startObject(CUSTOM_ACTION_FIELD)
populateAction(builder, params)
if (customAction && !params.paramAsBoolean(EXCLUDE_CUSTOM_FIELD_PARAM, false)) builder.endObject()
return builder.endObject()
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when stored internally or returned as response
*/
open fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type).endObject()
}

final override fun writeTo(out: StreamOutput) {
out.writeString(type)
out.writeOptionalWriteable(configTimeout)
out.writeOptionalWriteable(configRetry)
populateAction(out)
}

fun getUpdatedActionMetadata(managedIndexMetaData: ManagedIndexMetaData, stateName: String): ActionMetaData {
val stateMetaData = managedIndexMetaData.stateMetaData
val actionMetaData = managedIndexMetaData.actionMetaData

return when {
// start a new action
stateMetaData?.name != stateName ->
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
actionMetaData?.index != this.actionIndex ->
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
// RetryAPI will reset startTime to null for actionMetaData and we'll reset it to "now" here
else -> actionMetaData.copy(startTime = actionMetaData.startTime ?: Instant.now().toEpochMilli())
}
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when data is shared between nodes
*/
open fun populateAction(out: StreamOutput) {
out.writeInt(actionIndex)
}

/**
* Get all the steps associated with the action
*/
abstract fun getSteps(): List<Step>

/**
* Get the current step to execute in the action
*/
abstract fun getStepToExecute(context: StepContext): Step

final fun isLastStep(stepName: String): Boolean = getSteps().last().name == stepName

final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName

/*
* Gets if the managedIndexMetaData reflects a state in which this action has completed successfully. Used in the
* runner when determining if the index metadata should be deleted. If the action isFinishedSuccessfully and
* deleteIndexMetadataAfterFinish is set to true, then we issue a request to delete the managedIndexConfig and its
* managedIndexMetadata.
*/
final fun isFinishedSuccessfully(managedIndexMetaData: ManagedIndexMetaData): Boolean {
val policyRetryInfo = managedIndexMetaData.policyRetryInfo
if (policyRetryInfo?.failed == true) return false
val actionMetaData = managedIndexMetaData.actionMetaData
if (actionMetaData == null || actionMetaData.failed || actionMetaData.name != this.type) return false
val stepMetaData = managedIndexMetaData.stepMetaData
if (stepMetaData == null || !isLastStep(stepMetaData.name) || stepMetaData.stepStatus != Step.StepStatus.COMPLETED) return false
return true
}

/*
* Denotes if the index metadata in the config index should be deleted for the index this action has just
* successfully finished running on. This may be used by custom actions which delete some off-cluster index,
* and following the action's success, the managed index config and metadata need to be deleted.
*/
open fun deleteIndexMetadataAfterFinish(): Boolean = false

companion object {
const val DEFAULT_RETRIES = 3L
const val CUSTOM_ACTION_FIELD = "custom"
const val EXCLUDE_CUSTOM_FIELD_PARAM = "exclude_custom"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser

abstract class ActionParser(var customAction: Boolean = false) {

/**
* The action type parser will parse
*/
abstract fun getActionType(): String

/**
* Deserialize Action from stream input
*/
abstract fun fromStreamInput(sin: StreamInput): Action

/**
* Deserialize Action from xContent
*/
abstract fun fromXContent(xcp: XContentParser, index: Int): Action
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata

/**
* ISM by default considers all the index metadata to be part of the cluster state,
* if that doesn't hold true and indices metadata is present in some other place and
* ISM still need to manage these indices the following interface provides a mechanism
* for ISM extensions to register the metadata service for the type so ISM can get the
* index metadata for these special type of indices.
*
* ISM Rest APIs allows support for type param which determines the type of index, if there
* is a registered metadata service for the type - ISM will use the service to get the metadata
* else uses the default i.e cluster state
*/
interface IndexMetadataService {

/**
* Returns the index metadata needed for ISM
*/
suspend fun getMetadata(indices: List<String>, client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata>

/**
* Returns all the indices metadata
*/
suspend fun getMetadataForAllIndices(client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata>

/**
* Returns an optional setting path which, when set to true in the index settings, overrides a cluster level metadata write block.
*/
fun getIndexMetadataWriteOverrideSetting(): String? = null
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.cluster.ClusterState

interface StatusChecker {

/**
* checks and returns the status of the extension
*/
fun check(clusterState: ClusterState): Status {
return Status.ENABLED
}
}

enum class Status(private val value: String) {
ENABLED("enabled"),
DISABLED("disabled");

override fun toString(): String {
return value
}
}

class DefaultStatusChecker : StatusChecker
Loading

0 comments on commit 2d3fd97

Please sign in to comment.