Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding SPI to support extensions of IndexManagement plugin #216

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ task ktlint(type: JavaExec, group: "verification") {
description = "Check Kotlin code style."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
args "src/**/*.kt"
args "src/**/*.kt", "spi/src/main/**/*.kt"
}

check.dependsOn ktlint
Expand All @@ -110,7 +110,7 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
description = "Fix Kotlin code style deviations."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
args "-F", "src/**/*.kt"
args "-F", "src/**/*.kt", "spi/src/main/**/*.kt"
}

detekt {
Expand Down Expand Up @@ -145,6 +145,7 @@ dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
compile "org.jetbrains:annotations:13.0"
compile project(path: ":${rootProject.name}-spi", configuration: 'shadow')
compile "org.opensearch:notification:${notification_version}"
compile "org.opensearch:common-utils:${common_utils_version}"
compile "com.github.seancfoley:ipaddress:5.3.3"
Expand Down
3 changes: 3 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
*/

rootProject.name = 'opensearch-index-management'

include "spi"
project(":spi").name = rootProject.name + "-spi"
85 changes: 85 additions & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
import org.opensearch.gradle.test.RestIntegTestTask

plugins {
id 'com.github.johnrengelman.shadow'
id 'jacoco'
}

apply plugin: 'opensearch.java'
apply plugin: 'opensearch.testclusters'
apply plugin: 'opensearch.java-rest-test'
apply plugin: 'kotlin'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'

ext {
projectSubstitutions = [:]
licenseFile = rootProject.file('LICENSE.txt')
noticeFile = rootProject.file('NOTICE')
}

jacoco {
toolVersion = '0.8.5'
reportsDir = file("$buildDir/JacocoReport")
}

jacocoTestReport {
reports {
xml.enabled false
csv.enabled false
html.destination file("${buildDir}/jacoco/")
}
}
check.dependsOn jacocoTestReport

repositories {
mavenLocal()
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
}

configurations.all {
if (it.state != Configuration.State.UNRESOLVED) return
resolutionStrategy {
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
}
}

dependencies {
compileOnly "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
compileOnly "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
compileOnly "org.opensearch:common-utils:${common_utils_version}"

testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}

test {
doFirst {
test.classpath -= project.files(project.tasks.named('shadowJar'))
test.classpath -= project.configurations.getByName(ShadowBasePlugin.CONFIGURATION_NAME)
test.classpath += project.extensions.getByType(SourceSetContainer).getByName(SourceSet.MAIN_SOURCE_SET_NAME).runtimeClasspath
}
systemProperty 'tests.security.manager', 'false'
}

task integTest(type: RestIntegTestTask) {
description 'Run integ test with opensearch test framework'
group 'verification'
systemProperty 'tests.security.manager', 'false'
dependsOn test
}
check.dependsOn integTest

testClusters.javaRestTest {
testDistribution = 'INTEG_TEST'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi

import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.ClusterEventHandler
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService

/**
* SPI for IndexManagement
*/
interface IndexManagementExtension {

/**
* List of action parsers that are supported by the extension, each of the action parser will parse the policy action into the defined action.
* The ActionParser provides the ability to parse the action
*/
fun getISMActionParsers(): List<ActionParser>

/**
* Not Required to override but if extension is introducing a new index type and special handling is needed to handle this type
* use this to provide the metadata service for the new index types
*/
fun getIndexMetadataService(): Map<String, IndexMetadataService> {
return mapOf()
}

/**
* Not required to override but if extension wants to evaluate the cluster events before deciding whether to auto manage indices
* on index creation or should/not clean up managed indices when indices are deleted - add new handlers for the sepcific event type
*/
fun getClusterEventHandlers(): Map<ClusterEventType, ClusterEventHandler> {
return mapOf()
}
}

enum class ClusterEventType(val type: String) {
CREATE("create"),
DELETE("delete");

override fun toString(): String {
return type
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext

abstract class Action(
val type: String,
val actionIndex: Int
) : ToXContentObject, Writeable {

var configTimeout: ActionTimeout? = null
var configRetry: ActionRetry? = null

final override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
configTimeout?.toXContent(builder, params)
configRetry?.toXContent(builder, params)
// TODO: We should add "custom" object wrapper based on the params
populateAction(builder, params)
return builder.endObject()
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when stored internally or returned as response
*/
open fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type).endObject()
}

final override fun writeTo(out: StreamOutput) {
out.writeString(type)
out.writeOptionalWriteable(configTimeout)
out.writeOptionalWriteable(configRetry)
populateAction(out)
}

/**
* The implementer of Action can change this method to correctly serialize the internals of the action
* when data is shared between nodes
*/
open fun populateAction(out: StreamOutput) {
out.writeInt(actionIndex)
}

/**
* Get all the steps associated with the action
*/
abstract fun getSteps(): List<Step>

/**
* Get the current step to execute in the action
*/
abstract fun getStepToExecute(context: StepContext): Step

final fun isLastStep(stepName: String): Boolean = getSteps().last().name == stepName

final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser

abstract class ActionParser(var customAction: Boolean = false) {

/**
* The action type parser will parse
*/
abstract fun getActionType(): String

/**
* Deserialize Action from stream input
*/
abstract fun fromStreamInput(sin: StreamInput): Action

/**
* Deserialize Action from xContent
*/
abstract fun fromXContent(xcp: XContentParser, index: Int): Action
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.Decision

abstract class ClusterEventHandler {

abstract fun processEvent(client: Client, clusterService: ClusterService, event: ClusterChangedEvent): Decision
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata

interface IndexMetadataService {

/**
* Returns the index metadata needed for ISM
*/
fun getMetadata(indices: List<String>, client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.apache.logging.log4j.Logger
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import java.time.Instant
import java.util.Locale

abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {

var context: StepContext? = null
private set

fun preExecute(logger: Logger, context: StepContext): Step {
logger.info("Executing $name for ${context.metadata.index}")
this.context = context
return this
}

abstract suspend fun execute(): Step

fun postExecute(logger: Logger): Step {
logger.info("Finished executing $name for ${context?.metadata?.index}")
this.context = null
return this
}

abstract fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData

abstract fun isIdempotent(): Boolean

final fun getStepStartTime(metadata: ManagedIndexMetaData): Instant {
return when {
metadata.stepMetaData == null -> Instant.now()
metadata.stepMetaData.name != this.name -> Instant.now()
// The managed index metadata is a historical snapshot of the metadata and refers to what has happened from the previous
// execution, so if we ever see it as COMPLETED it means we are always going to be in a new step, this specifically
// helps with the Transition -> Transition (empty state) sequence which the above do not capture
metadata.stepMetaData.stepStatus == StepStatus.COMPLETED -> Instant.now()
else -> Instant.ofEpochMilli(metadata.stepMetaData.startTime)
}
}

final fun getStartingStepMetaData(metadata: ManagedIndexMetaData): StepMetaData = StepMetaData(name, getStepStartTime(metadata).toEpochMilli(), StepStatus.STARTING)

enum class StepStatus(val status: String) : Writeable {
STARTING("starting"),
CONDITION_NOT_MET("condition_not_met"),
FAILED("failed"),
COMPLETED("completed");

override fun toString(): String {
return status
}

override fun writeTo(out: StreamOutput) {
out.writeString(status)
}

companion object {
fun read(streamInput: StreamInput): StepStatus {
return valueOf(streamInput.readString().toUpperCase(Locale.ROOT))
}
}
}
}
Loading