Skip to content

Commit

Permalink
Adding IM SPI
Browse files Browse the repository at this point in the history
Signed-off-by: Ravi Thaluru <[email protected]>
  • Loading branch information
thalurur committed Dec 7, 2021
1 parent d718d7c commit c82793f
Show file tree
Hide file tree
Showing 21 changed files with 1,433 additions and 2 deletions.
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ task ktlint(type: JavaExec, group: "verification") {
description = "Check Kotlin code style."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
args "src/**/*.kt"
args "src/**/*.kt", "spi/src/main/**/*.kt"
}

check.dependsOn ktlint
Expand All @@ -110,7 +110,7 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
description = "Fix Kotlin code style deviations."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
args "-F", "src/**/*.kt"
args "-F", "src/**/*.kt", "spi/src/main/**/*.kt"
}

detekt {
Expand Down Expand Up @@ -145,6 +145,7 @@ dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
compile "org.jetbrains:annotations:13.0"
compile project(path: ":${rootProject.name}-spi", configuration: 'shadow')
compile "org.opensearch:notification:${notification_version}"
compile "org.opensearch:common-utils:${common_utils_version}"
compile "com.github.seancfoley:ipaddress:5.3.3"
Expand Down
3 changes: 3 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
*/

rootProject.name = 'opensearch-index-management'

include "spi"
project(":spi").name = rootProject.name + "-spi"
85 changes: 85 additions & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
import org.opensearch.gradle.test.RestIntegTestTask

plugins {
id 'com.github.johnrengelman.shadow'
id 'jacoco'
}

apply plugin: 'opensearch.java'
apply plugin: 'opensearch.testclusters'
apply plugin: 'opensearch.java-rest-test'
apply plugin: 'kotlin'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'

ext {
projectSubstitutions = [:]
licenseFile = rootProject.file('LICENSE.txt')
noticeFile = rootProject.file('NOTICE')
}

jacoco {
toolVersion = '0.8.5'
reportsDir = file("$buildDir/JacocoReport")
}

jacocoTestReport {
reports {
xml.enabled false
csv.enabled false
html.destination file("${buildDir}/jacoco/")
}
}
check.dependsOn jacocoTestReport

repositories {
mavenLocal()
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
}

configurations.all {
if (it.state != Configuration.State.UNRESOLVED) return
resolutionStrategy {
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
}
}

dependencies {
compileOnly "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
compileOnly "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
compileOnly "org.opensearch:common-utils:${common_utils_version}"

testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}

test {
doFirst {
test.classpath -= project.files(project.tasks.named('shadowJar'))
test.classpath -= project.configurations.getByName(ShadowBasePlugin.CONFIGURATION_NAME)
test.classpath += project.extensions.getByType(SourceSetContainer).getByName(SourceSet.MAIN_SOURCE_SET_NAME).runtimeClasspath
}
systemProperty 'tests.security.manager', 'false'
}

task integTest(type: RestIntegTestTask) {
description 'Run integ test with opensearch test framework'
group 'verification'
systemProperty 'tests.security.manager', 'false'
dependsOn test
}
check.dependsOn integTest

testClusters.javaRestTest {
testDistribution = 'INTEG_TEST'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi

import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.ClusterEventHandler
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService

/**
* SPI for IndexManagement
*/
interface IndexManagementExtension {

/**
* List of action parsers that are supported by the extension, each of the action parser will parse the policy action into the defined action.
* The ActionParser provides the ability to parse the action
*/
fun getISMActionParsers(): List<ActionParser>

/**
* Not Required to override but if extension is introducing a new index type and special handling is needed to handle this type
* use this to provide the metadata service for the new index types
*/
fun getIndexMetadataService(): Map<String, IndexMetadataService> {
return mapOf()
}

/**
* Not required to override but if extension wants to evaluate the cluster events before deciding whether to auto manage indices
* on index creation or should/not clean up managed indices when indices are deleted - add new handlers for the sepcific event type
*/
fun getClusterEventHandlers(): Map<ClusterEventType, ClusterEventHandler> {
return mapOf()
}
}

enum class ClusterEventType(val type: String) {
CREATE("create"),
DELETE("delete");

override fun toString(): String {
return type
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext

abstract class Action(
val type: String,
val actionIndex: Int
) : ToXContentObject, Writeable {

var configTimeout: ActionTimeout? = null
var configRetry: ActionRetry? = null

final override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
configTimeout?.toXContent(builder, params)
configRetry?.toXContent(builder, params)
// TODO: We should add "custom" object wrapper based on the params
populateAction(builder, params)
return builder.endObject()
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when stored internally or returned as response
*/
open fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type).endObject()
}

final override fun writeTo(out: StreamOutput) {
out.writeString(type)
out.writeOptionalWriteable(configTimeout)
out.writeOptionalWriteable(configRetry)
populateAction(out)
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when data is shared between nodes
*/
open fun populateAction(out: StreamOutput) {
out.writeInt(actionIndex)
}

/**
* Get all the steps associated with the action
*/
abstract fun getSteps(): List<Step>

/**
* Get the current step to execute in the action
*/
abstract fun getStepToExecute(context: StepContext): Step

final fun isLastStep(stepName: String): Boolean = getSteps().last().name == stepName

final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser

abstract class ActionParser(var customAction: Boolean = false) {

/**
* The action type parser will parse
*/
abstract fun getActionType(): String

/**
* Deserialize Action from stream input
*/
abstract fun fromStreamInput(sin: StreamInput): Action

/**
* Deserialize Action from xContent
*/
abstract fun fromXContent(xcp: XContentParser, index: Int): Action
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.Decision

abstract class ClusterEventHandler {

abstract fun processEvent(client: Client, clusterService: ClusterService, event: ClusterChangedEvent): Decision
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata

interface IndexMetadataService {

/**
* Returns the index metadata needed for ISM
*/
fun getMetadata(indices: List<String>, client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.apache.logging.log4j.Logger
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import java.time.Instant
import java.util.Locale

abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {

var context: StepContext? = null
private set

fun preExecute(logger: Logger, context: StepContext): Step {
logger.info("Executing $name for ${context.metadata.index}")
this.context = context
return this
}

abstract suspend fun execute(): Step

fun postExecute(logger: Logger): Step {
logger.info("Finished executing $name for ${context?.metadata?.index}")
this.context = null
return this
}

abstract fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData

abstract fun isIdempotent(): Boolean

final fun getStepStartTime(metadata: ManagedIndexMetaData): Instant {
return when {
metadata.stepMetaData == null -> Instant.now()
metadata.stepMetaData.name != this.name -> Instant.now()
// The managed index metadata is a historical snapshot of the metadata and refers to what has happened from the previous
// execution, so if we ever see it as COMPLETED it means we are always going to be in a new step, this specifically
// helps with the Transition -> Transition (empty state) sequence which the above do not capture
metadata.stepMetaData.stepStatus == StepStatus.COMPLETED -> Instant.now()
else -> Instant.ofEpochMilli(metadata.stepMetaData.startTime)
}
}

final fun getStartingStepMetaData(metadata: ManagedIndexMetaData): StepMetaData = StepMetaData(name, getStepStartTime(metadata).toEpochMilli(), StepStatus.STARTING)

enum class StepStatus(val status: String) : Writeable {
STARTING("starting"),
CONDITION_NOT_MET("condition_not_met"),
FAILED("failed"),
COMPLETED("completed");

override fun toString(): String {
return status
}

override fun writeTo(out: StreamOutput) {
out.writeString(status)
}

companion object {
fun read(streamInput: StreamInput): StepStatus {
return valueOf(streamInput.readString().toUpperCase(Locale.ROOT))
}
}
}
}
Loading

0 comments on commit c82793f

Please sign in to comment.