Skip to content

Commit

Permalink
merge with main
Browse files Browse the repository at this point in the history
Signed-off-by: Ravi Thaluru <[email protected]>
  • Loading branch information
thalurur committed Feb 22, 2022
1 parent 577603e commit 43d248f
Show file tree
Hide file tree
Showing 19 changed files with 184 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ data class ManagedIndexMetaData(
const val ROLLED_OVER = "rolled_over"
const val TRANSITION_TO = "transition_to"
const val INFO = "info"
const val ENABLED = "enabled"

fun fromStreamInput(si: StreamInput): ManagedIndexMetaData {
val index: String? = si.readString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
import org.opensearch.indexmanagement.opensearchapi.OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
Expand Down Expand Up @@ -113,8 +114,16 @@ class IndexStateManagementHistory(
}

private fun rolloverAndDeleteHistoryIndex() {
if (historyEnabled) rolloverHistoryIndex()
deleteOldHistoryIndex()
val ctx = threadPool.threadContext.stashContext()
try {
if (threadPool.threadContext.getTransient<String?>(OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST) == null) {
threadPool.threadContext.putTransient(OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST, "true")
}
if (historyEnabled) rolloverHistoryIndex()
deleteOldHistoryIndex()
} finally {
ctx.close()
}
}

private fun rolloverHistoryIndex() {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.opensearch.common.xcontent.ToXContentFragment
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.Index
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.settings.LegacyOpenDistroManagedIndexSettings
Expand Down Expand Up @@ -119,28 +118,28 @@ suspend fun IndexMetadata.getManagedIndexMetadata(client: Client): ManagedIndexM
*
* @return list of metadata
*/
suspend fun Client.mgetManagedIndexMetadata(indices: List<Index>): List<Pair<ManagedIndexMetaData?, Exception?>?> {
log.debug("trying to get back metadata for indices ${indices.map { it.name }}")

if (indices.isEmpty()) return emptyList()
suspend fun Client.mgetManagedIndexMetadata(indexUuids: List<String>): List<Pair<ManagedIndexMetaData?, Exception?>?> {
log.debug("trying to get back metadata for index [$indexUuids]")
if (indexUuids.isEmpty()) return emptyList()

val mgetRequest = MultiGetRequest()
indices.forEach {
indexUuids.forEach {
mgetRequest.add(
MultiGetRequest.Item(
INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(it.uuid)
).routing(it.uuid)
INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(it)
).routing(it)
)
}
var mgetMetadataList = listOf<Pair<ManagedIndexMetaData?, Exception?>?>()
try {
val response: MultiGetResponse = this.suspendUntil { multiGet(mgetRequest, it) }
mgetMetadataList = mgetResponseToList(response)
} catch (e: ActionRequestValidationException) {
log.info("No managed index metadata for indices [$indices], ${e.message}")
log.info("No managed index metadata for indices [$indexUuids], ${e.message}")
} catch (e: Exception) {
log.error("Failed to multi-get managed index metadata for indices [$indices]", e)
log.error("Failed to multi-get managed index metadata for indices [$indexUuids]", e)
}

return mgetMetadataList
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ class AttemptSetReadOnlyStep(private val action: ForceMergeAction) : Step(name)
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData =
currentMetadata.copy(stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), transitionTo = null, info = info)
currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)

override fun isIdempotent() = true

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.settings.LegacyOpenDistroManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.util.TOTAL_MANAGED_INDICES
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import java.io.IOException

Expand All @@ -22,40 +23,52 @@ open class ExplainResponse : ActionResponse, ToXContentObject {
val indexNames: List<String>
val indexPolicyIDs: List<String?>
val indexMetadatas: List<ManagedIndexMetaData?>
val totalManagedIndices: Int
val enabledState: Map<String, Boolean>

constructor(
indexNames: List<String>,
indexPolicyIDs: List<String?>,
indexMetadatas: List<ManagedIndexMetaData?>
indexMetadatas: List<ManagedIndexMetaData?>,
totalManagedIndices: Int,
enabledState: Map<String, Boolean>
) : super() {
this.indexNames = indexNames
this.indexPolicyIDs = indexPolicyIDs
this.indexMetadatas = indexMetadatas
this.totalManagedIndices = totalManagedIndices
this.enabledState = enabledState
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
indexNames = sin.readStringList(),
indexPolicyIDs = sin.readStringList(),
indexMetadatas = sin.readList { ManagedIndexMetaData.fromStreamInput(it) }
indexMetadatas = sin.readList { ManagedIndexMetaData.fromStreamInput(it) },
totalManagedIndices = sin.readInt(),
enabledState = sin.readMap() as Map<String, Boolean>
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeStringCollection(indexNames)
out.writeStringCollection(indexPolicyIDs)
out.writeCollection(indexMetadatas)
out.writeInt(totalManagedIndices)
out.writeMap(enabledState)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
indexNames.forEachIndexed { ind, name ->
builder.startObject(name)
builder.field(LegacyOpenDistroManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind])
builder.field(ManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind])
builder.field(LegacyOpenDistroManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind])
indexMetadatas[ind]?.toXContent(builder, ToXContent.EMPTY_PARAMS)
builder.field("enabled", enabledState[name])
builder.endObject()
}
builder.field(TOTAL_MANAGED_INDICES, totalManagedIndices)
return builder.endObject()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,7 @@ class TransportExplainAction @Inject constructor(
enabledStatus: Map<String, Boolean> = enabledState,
totalIndices: Int = totalManagedIndices
) {
if (explainAll) {
actionListener.onResponse(ExplainAllResponse(indices, policies, metadata, totalIndices, enabledStatus))
return
}
actionListener.onResponse(ExplainResponse(indices, policies, metadata))
actionListener.onResponse(ExplainResponse(indices, policies, metadata, totalIndices, enabledStatus))
}

private fun getMetadata(response: GetResponse?): ManagedIndexMetaData? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.util
import inet.ipaddr.IPAddressString
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.action.update.UpdateRequest
import org.opensearch.alerting.destination.message.BaseMessage
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.index.Index
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
Expand Down Expand Up @@ -163,53 +160,32 @@ fun updateManagedIndexRequest(sweptManagedIndexConfig: SweptManagedIndexConfig):
}

/**
* Creates DeleteRequests for [ManagedIndexConfig].
*
* Finds ManagedIndices that exist in [INDEX_MANAGEMENT_INDEX] that do not exist in the cluster state
* anymore which means we need to delete the [ManagedIndexConfig].
*
* @param currentIndices List of current [IndexMetadata] in cluster state.
* @param currentManagedIndexConfigs map of IndexUuid to [SweptManagedIndexConfig].
* @return list of [DocWriteRequest].
* @param currentIndexUuids List of current index uuids in cluster.
* @param currentManagedIndexUuids List of current managed index uuids in cluster.
* @return list of managedIndexUuids to delete.
*/
fun getDeleteManagedIndexRequests(
currentIndices: List<IndexMetadata>,
currentManagedIndexConfigs: Map<String, SweptManagedIndexConfig>
): List<DocWriteRequest<*>> {
return currentManagedIndexConfigs.filter { currentManagedIndex ->
!currentIndices.map { it.index.uuid }.contains(currentManagedIndex.key)
}.map { deleteManagedIndexRequest(it.value.uuid) }
}

// if managed index exist but the index is not existing any more
// then we should delete this managed index
fun getManagedIndicesToDelete(
currentIndices: List<IndexMetadata>,
currentManagedIndexConfigs: Map<String, SweptManagedIndexConfig>
): List<Index> {
val currentIndicesSet = currentIndices.map { it.index }.toSet()
val managedIndicesSet = currentManagedIndexConfigs.values.map { Index(it.index, it.uuid) }.toSet()
return (managedIndicesSet - currentIndicesSet).toList()
currentIndexUuids: List<String>,
currentManagedIndexUuids: List<String>
): List<String> {
return currentManagedIndexUuids.filter { currentManagedIndex ->
!currentIndexUuids.contains(currentManagedIndex)
}
}

fun getSweptManagedIndexSearchRequest(): SearchRequest {
val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.existsQuery(ManagedIndexConfig.MANAGED_INDEX_TYPE))
return SearchRequest()
.indices(INDEX_MANAGEMENT_INDEX)
.scroll(TimeValue.timeValueMinutes(1))
.source(
SearchSourceBuilder.searchSource()
// TODO: Get all ManagedIndices at once or split into searchAfter queries?
.size(ManagedIndexCoordinator.MAX_HITS)
.seqNoAndPrimaryTerm(true)
.fetchSource(
arrayOf(
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.INDEX_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.INDEX_UUID_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.POLICY_ID_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.CHANGE_POLICY_FIELD}"
),
emptyArray()
)
.fetchSource(emptyArray(), emptyArray())
.query(boolQueryBuilder)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ val XCONTENT_WITHOUT_TYPE_AND_USER = ToXContent.MapParams(mapOf(WITH_TYPE to "fa
const val FAILURES = "failures"
const val FAILED_INDICES = "failed_indices"
const val UPDATED_INDICES = "updated_indices"
const val TOTAL_MANAGED_INDICES = "total_managed_indices"

const val ISM_TEMPLATE_FIELD = "policy.ism_template"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

const val OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST = "_opendistro_security_protected_indices_conf_request"

fun contentParser(bytesReference: BytesReference): XContentParser {
return XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.expl
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.util.TOTAL_MANAGED_INDICES
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
Expand Down Expand Up @@ -272,11 +273,12 @@ abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() {
xcp.nextToken(),
xcp
)
var totalManagedIndices = 0
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
xcp.currentName()
xcp.nextToken()

metadata = ManagedIndexMetaData.parse(xcp)
if (xcp.currentName() == TOTAL_MANAGED_INDICES) totalManagedIndices = xcp.intValue()
else metadata = ManagedIndexMetaData.parse(xcp)
}
return metadata
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ class ActionRetryIT : IndexStateManagementRestTestCase() {
),
PolicyRetryInfoMetaData.RETRY_INFO to fun(retryInfoMetaDataMap: Any?): Boolean =
assertRetryInfoEquals(PolicyRetryInfoMetaData(false, 0), retryInfoMetaDataMap),
ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString()
ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString(),
ManagedIndexMetaData.ENABLED to true::equals
)
),
getExplainMap(indexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {
explainResponseOpendistroPolicyIdSetting to fun(policyID: Any?): Boolean =
policyID == null,
explainResponseOpenSearchPolicyIdSetting to fun(policyID: Any?): Boolean =
policyID == null
policyID == null,
ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null
)
),
getExplainMap(index),
Expand Down
Loading

0 comments on commit 43d248f

Please sign in to comment.