Skip to content

Commit

Permalink
introducing custom uuid setting in SPI (#278)
Browse files Browse the repository at this point in the history
* introducing custom uuid setting in SPI

Signed-off-by: Ravi Thaluru <[email protected]>
  • Loading branch information
thalurur authored Mar 1, 2022
1 parent 2634d96 commit 40544b8
Show file tree
Hide file tree
Showing 20 changed files with 173 additions and 88 deletions.
6 changes: 0 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,8 @@ integTest {
}

// TODO: Remove me after refactoring all actions
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/runner/*'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/resthandler/*'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/coordinator/*'
exclude 'org/opensearch/indexmanagement/IndexManagementIndicesIT.class'
}

String bwcVersion = "1.13.2.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
package org.opensearch.indexmanagement.spi

import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.ClusterEventHandler
import org.opensearch.indexmanagement.spi.indexstatemanagement.DefaultStatusChecker
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker

/**
* SPI for IndexManagement
Expand All @@ -21,27 +22,36 @@ interface IndexManagementExtension {
fun getISMActionParsers(): List<ActionParser>

/**
* Not Required to override but if extension is introducing a new index type and special handling is needed to handle this type
* use this to provide the metadata service for the new index types
* Status checker is used by IndexManagement to check the status of the extension before executing the actions registered by the extension.
* Actions registered by the plugin can only be executed if in enabled, otherwise the action fails without retries. The status returned
* should represent if the extension is enabled or disabled, and should not represent extension health or the availability of some extension
* dependency.
*/
fun getIndexMetadataService(): Map<String, IndexMetadataService> {
return mapOf()
fun statusChecker(): StatusChecker {
return DefaultStatusChecker()
}

/**
* Not required to override but if extension wants to evaluate the cluster events before deciding whether to auto manage indices
* on index creation or should/not clean up managed indices when indices are deleted - add new handlers for the sepcific event type
* Name of the extension
*/
fun getExtensionName(): String

/**
* Not Required to override but if extension moves the index metadata outside of cluster state and requires IndexManagement to manage these
* indices provide the metadata service that can provide the index metadata for these indices. An extension need to label the metadata service
* with a type string which is used to distinguish indices in IndexManagement plugin
*/
fun getClusterEventHandlers(): Map<ClusterEventType, ClusterEventHandler> {
fun getIndexMetadataService(): Map<String, IndexMetadataService> {
return mapOf()
}
}

enum class ClusterEventType(val type: String) {
CREATE("create"),
DELETE("delete");

override fun toString(): String {
return type
/**
* Caution: Experimental and can be removed in future
*
* If extension wants IndexManagement to determine cluster state indices UUID based on custom index setting if
* present of cluster state override this method.
*/
fun overrideClusterStateIndexUuidSetting(): String? {
return null
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.cluster.ClusterState

interface StatusChecker {

/**
* checks and returns the status of the extension
*/
fun check(clusterState: ClusterState): Status {
return Status.ENABLED
}
}

enum class Status(private val value: String) {
ENABLED("enabled"),
DISABLED("disabled");

override fun toString(): String {
return value
}
}

class DefaultStatusChecker : StatusChecker
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,9 @@ data class ManagedIndexMetaData(
if (policyPrimaryTerm != null) builder.field(POLICY_PRIMARY_TERM, policyPrimaryTerm)

// Only show rolled_over if we have rolled over or we are in the rollover action
// TODO: Fix this
/*if (rolledOver == true || (actionMetaData != null && actionMetaData.name == ActionConfig.ActionType.ROLLOVER.type)) {
if (rolledOver == true || (actionMetaData != null && actionMetaData.name == "rollover")) {
builder.field(ROLLED_OVER, rolledOver)
}*/
}

if (policyCompleted == true) {
builder.field(POLICY_COMPLETED, policyCompleted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,8 @@ class StepContext(
val user: User?,
val scriptService: ScriptService,
val settings: Settings,
)
) {
fun getUpdatedContext(metadata: ManagedIndexMetaData): StepContext {
return StepContext(metadata, this.clusterService, this.client, this.threadContext, this.user, this.scriptService, this.settings)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.indexmanagement.indexstatemanagement.DefaultIndexMetadataService
import org.opensearch.indexmanagement.indexstatemanagement.ExtensionStatusChecker
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementHistory
Expand Down Expand Up @@ -112,6 +113,7 @@ import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.spi.IndexManagementExtension
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.transform.TransformRunner
import org.opensearch.indexmanagement.transform.action.delete.DeleteTransformsAction
Expand Down Expand Up @@ -170,6 +172,9 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
lateinit var fieldCapsFilter: FieldCapsFilter
lateinit var indexMetadataProvider: IndexMetadataProvider
private val indexMetadataServices: MutableList<Map<String, IndexMetadataService>> = mutableListOf()
private var customIndexUUIDSetting: String? = null
private val extensions = mutableSetOf<String>()
private val extensionCheckerMap = mutableMapOf<String, StatusChecker>()

companion object {
const val PLUGINS_BASE_URI = "/_plugins"
Expand Down Expand Up @@ -244,10 +249,23 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
val indexManagementExtensions = loader.loadExtensions(IndexManagementExtension::class.java)

indexManagementExtensions.forEach { extension ->
val extensionName = extension.getExtensionName()
if (extensionName in extensions) {
throw IllegalStateException("Mutliple extensions of IndexManagement have same name $extensionName - not supported")
}
extension.getISMActionParsers().forEach { parser ->
ISMActionsParser.instance.addParser(parser)
ISMActionsParser.instance.addParser(parser, extensionName)
}
indexMetadataServices.add(extension.getIndexMetadataService())
extension.overrideClusterStateIndexUuidSetting()?.let {
if (customIndexUUIDSetting != null) {
throw IllegalStateException(
"Multiple extensions of IndexManagement plugin overriding ClusterStateIndexUUIDSetting - not supported"
)
}
customIndexUUIDSetting = extension.overrideClusterStateIndexUuidSetting()
}
extensionCheckerMap[extensionName] = extension.statusChecker()
}
}

Expand Down Expand Up @@ -343,11 +361,12 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
indexMetadataProvider = IndexMetadataProvider(
settings, client, clusterService,
hashMapOf(
DEFAULT_INDEX_TYPE to DefaultIndexMetadataService()
DEFAULT_INDEX_TYPE to DefaultIndexMetadataService(customIndexUUIDSetting)
)
)
indexMetadataServices.forEach { indexMetadataProvider.addMetadataServices(it) }

val extensionChecker = ExtensionStatusChecker(extensionCheckerMap, clusterService)
val managedIndexRunner = ManagedIndexRunner
.registerClient(client)
.registerClusterService(clusterService)
Expand All @@ -359,6 +378,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerHistoryIndex(indexStateManagementHistory)
.registerSkipFlag(skipFlag)
.registerThreadPool(threadPool)
.registerExtensionChecker(extensionChecker)

val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices)
val templateService = ISMTemplateService(client, clusterService, xContentRegistry, indexManagementIndices)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata

class DefaultIndexMetadataService : IndexMetadataService {
class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : IndexMetadataService {

/**
* Returns the default index metadata needed for ISM
Expand All @@ -24,21 +24,25 @@ class DefaultIndexMetadataService : IndexMetadataService {
override suspend fun getMetadata(indices: List<String>, client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata> {
val indexNameToMetadata: MutableMap<String, ISMIndexMetadata> = HashMap()

val strictExpandOptions = IndicesOptions.strictExpand()
// We want to go through all cluster indices - open/closed/hidden
val lenientExpandOptions = IndicesOptions.lenientExpandHidden()
val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(*indices.toTypedArray())
.metadata(true)
.local(false)
.waitForTimeout(TimeValue.timeValueMillis(DEFAULT_GET_METADATA_TIMEOUT_IN_MILLIS))
.indicesOptions(strictExpandOptions)
.indicesOptions(lenientExpandOptions)

val response: ClusterStateResponse = client.suspendUntil { client.admin().cluster().state(clusterStateRequest, it) }

response.state.metadata.indices.forEach {
// TODO waiting to add document count until it is definitely needed
// TODO find a way to avoid this managed service code difference
val uuid = it.value.settings.get(COLD_UUID_SETTING, it.value.indexUUID) // use the cold uuid if it exists
val uuid = if (customUUIDSetting != null) {
it.value.settings.get(customUUIDSetting, it.value.indexUUID)
} else {
it.value.indexUUID
}
val indexMetadata = ISMIndexMetadata(uuid, it.value.creationDate, -1)
indexNameToMetadata[it.key] = indexMetadata
}
Expand All @@ -52,6 +56,5 @@ class DefaultIndexMetadataService : IndexMetadataService {

companion object {
const val DEFAULT_GET_METADATA_TIMEOUT_IN_MILLIS = 30000L
const val COLD_UUID_SETTING = "index.cold.uuid"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement

import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.spi.indexstatemanagement.Status
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker

/**
* Check the extension status check. The extension status should be used to represent if the extension is turned on or off,
* not as a health check denoting availability.
*/
class ExtensionStatusChecker(private val extensionCheckers: Map<String, StatusChecker>, val clusterService: ClusterService) {

fun isEnabled(extensionName: String?): Boolean {
val checker = extensionCheckers[extensionName] ?: return true
val clusterState = clusterService.state()
return checker.check(clusterState) == Status.ENABLED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ISMActionsParser private constructor() {
val instance = ISMActionsParser()
}

val parsers = mutableListOf<ActionParser>(
val parsers = mutableListOf(
AllocationActionParser(),
CloseActionParser(),
DeleteActionParser(),
Expand All @@ -48,11 +48,14 @@ class ISMActionsParser private constructor() {
SnapshotActionParser()
)

fun addParser(parser: ActionParser) {
val customActionExtensionMap = mutableMapOf<String, String>()

fun addParser(parser: ActionParser, extensionName: String) {
if (parsers.map { it.getActionType() }.contains(parser.getActionType())) {
throw IllegalArgumentException(getDuplicateActionTypesMessage(parser.getActionType()))
}
parsers.add(parser)
customActionExtensionMap[parser.getActionType()] = extensionName
}

fun fromStreamInput(sin: StreamInput): Action {
Expand Down
Loading

0 comments on commit 40544b8

Please sign in to comment.