Skip to content

Commit

Permalink
Ports over show-applied-policies logic (#287)
Browse files Browse the repository at this point in the history
Signed-off-by: Clay Downs <[email protected]>
  • Loading branch information
downsrob authored Mar 7, 2022
1 parent 9c8028f commit 57e3cce
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ISM
import org.opensearch.indexmanagement.indexstatemanagement.model.SearchParams
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_EXPLAIN_SHOW_POLICY
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_JOB_SORT_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_PAGINATION_FROM
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_PAGINATION_SIZE
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_QUERY_STRING
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_SORT_ORDER
import org.opensearch.indexmanagement.indexstatemanagement.util.SHOW_POLICY_QUERY_PARAM
import org.opensearch.indexmanagement.indexstatemanagement.util.TYPE_PARAM_KEY
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
Expand Down Expand Up @@ -77,6 +79,7 @@ class RestExplainAction : BaseRestHandler() {
request.paramAsBoolean("local", false),
request.paramAsTime("master_timeout", MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT),
SearchParams(size, from, sortField, sortOrder, queryString),
request.paramAsBoolean(SHOW_POLICY_QUERY_PARAM, DEFAULT_EXPLAIN_SHOW_POLICY),
indexType
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ class ExplainRequest : ActionRequest {
val local: Boolean
val masterTimeout: TimeValue
val searchParams: SearchParams
val showPolicy: Boolean
val indexType: String

constructor(
indices: List<String>,
local: Boolean,
masterTimeout: TimeValue,
searchParams: SearchParams,
showPolicy: Boolean,
indexType: String
) : super() {
this.indices = indices
this.local = local
this.masterTimeout = masterTimeout
this.searchParams = searchParams
this.showPolicy = showPolicy
this.indexType = indexType
}

Expand All @@ -43,6 +46,7 @@ class ExplainRequest : ActionRequest {
local = sin.readBoolean(),
masterTimeout = sin.readTimeValue(),
searchParams = SearchParams(sin),
showPolicy = sin.readBoolean(),
indexType = sin.readString()
)

Expand All @@ -63,6 +67,7 @@ class ExplainRequest : ActionRequest {
out.writeBoolean(local)
out.writeTimeValue(masterTimeout)
searchParams.writeTo(out)
out.writeBoolean(showPolicy)
out.writeString(indexType)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.settings.LegacyOpenDistroManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.util.TOTAL_MANAGED_INDICES
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE_AND_USER
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import java.io.IOException

Expand All @@ -25,19 +27,22 @@ open class ExplainResponse : ActionResponse, ToXContentObject {
val indexMetadatas: List<ManagedIndexMetaData?>
val totalManagedIndices: Int
val enabledState: Map<String, Boolean>
val policies: Map<String, Policy>

constructor(
indexNames: List<String>,
indexPolicyIDs: List<String?>,
indexMetadatas: List<ManagedIndexMetaData?>,
totalManagedIndices: Int,
enabledState: Map<String, Boolean>
enabledState: Map<String, Boolean>,
policies: Map<String, Policy>
) : super() {
this.indexNames = indexNames
this.indexPolicyIDs = indexPolicyIDs
this.indexMetadatas = indexMetadatas
this.totalManagedIndices = totalManagedIndices
this.enabledState = enabledState
this.policies = policies
}

@Throws(IOException::class)
Expand All @@ -46,7 +51,8 @@ open class ExplainResponse : ActionResponse, ToXContentObject {
indexPolicyIDs = sin.readStringList(),
indexMetadatas = sin.readList { ManagedIndexMetaData.fromStreamInput(it) },
totalManagedIndices = sin.readInt(),
enabledState = sin.readMap() as Map<String, Boolean>
enabledState = sin.readMap() as Map<String, Boolean>,
policies = sin.readMap(StreamInput::readString, ::Policy)
)

@Throws(IOException::class)
Expand All @@ -56,6 +62,11 @@ open class ExplainResponse : ActionResponse, ToXContentObject {
out.writeCollection(indexMetadatas)
out.writeInt(totalManagedIndices)
out.writeMap(enabledState)
out.writeMap(
policies,
{ _out, key -> _out.writeString(key) },
{ _out, policy -> policy.writeTo(_out) }
)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -66,6 +77,7 @@ open class ExplainResponse : ActionResponse, ToXContentObject {
builder.field(LegacyOpenDistroManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind])
indexMetadatas[ind]?.toXContent(builder, ToXContent.EMPTY_PARAMS)
builder.field("enabled", enabledState[name])
policies[name]?.let { builder.field(Policy.POLICY_TYPE, it, XCONTENT_WITHOUT_TYPE_AND_USER) }
builder.endObject()
}
builder.field(TOTAL_MANAGED_INDICES, totalManagedIndices)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,23 @@ import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator.Companion.MAX_HITS
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.SearchParams
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.MANAGED_INDEX_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.util.MANAGED_INDEX_INDEX_UUID_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.util.MANAGED_INDEX_NAME_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck
import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataID
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser
import org.opensearch.search.SearchHit
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext.FETCH_SOURCE
import org.opensearch.search.sort.SortBuilders
Expand Down Expand Up @@ -100,6 +103,7 @@ class TransportExplainAction @Inject constructor(
) {
private val indices: List<String> = request.indices
private val explainAll: Boolean = indices.isEmpty()
private val showPolicy: Boolean = request.showPolicy

// Map of indexName to index metadata got from config index job which is fake/not a real full metadata document
private val managedIndicesMetaDataMap: MutableMap<IndexName, ManagedIndexMetadataMap> = mutableMapOf()
Expand All @@ -113,6 +117,7 @@ class TransportExplainAction @Inject constructor(
private val indexPolicyIDs = mutableListOf<PolicyID?>()
private val indexMetadatas = mutableListOf<ManagedIndexMetaData?>()
private var totalManagedIndices = 0
private val appliedPolicies: MutableMap<String, Policy> = mutableMapOf()

@Suppress("SpreadOperator", "NestedBlockDepth")
fun start() {
Expand Down Expand Up @@ -182,17 +187,18 @@ class TransportExplainAction @Inject constructor(
totalManagedIndices = totalHits.value.toInt()
}

response.hits.hits.map {
val hitMap = it.sourceAsMap[MANAGED_INDEX_FIELD] as Map<String, Any>
val managedIndex = hitMap["index"] as String
managedIndices.add(managedIndex)
enabledState[managedIndex] = hitMap["enabled"] as Boolean
managedIndicesMetaDataMap[managedIndex] = mapOf(
"index" to hitMap["index"] as String?,
"index_uuid" to hitMap["index_uuid"] as String?,
"policy_id" to hitMap["policy_id"] as String?,
ManagedIndexMetaData.ENABLED to hitMap["enabled"]?.toString()
parseSearchHits(response.hits.hits).forEach { managedIndex ->
managedIndices.add(managedIndex.index)
enabledState[managedIndex.index] = managedIndex.enabled
managedIndicesMetaDataMap[managedIndex.index] = mapOf(
"index" to managedIndex.index,
"index_uuid" to managedIndex.indexUuid,
"policy_id" to managedIndex.policyID,
"enabled" to managedIndex.enabled.toString()
)
if (showPolicy) {
managedIndex.policy?.let { appliedPolicies[managedIndex.index] = it }
}
}

// explain all only return managed indices
Expand All @@ -201,7 +207,7 @@ class TransportExplainAction @Inject constructor(
// edge case: if specify query param pagination size to be 0
// we still show total managed indices
indexNames.clear()
sendResponse()
sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies)
return
} else {
// Clear and add the managedIndices from the response to preserve the sort order and size
Expand All @@ -227,7 +233,7 @@ class TransportExplainAction @Inject constructor(
return
}
indexNames.clear()
sendResponse()
sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies)
return
}
actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception)
Expand Down Expand Up @@ -324,7 +330,7 @@ class TransportExplainAction @Inject constructor(
managedIndicesMetaDataMap.clear()

if (user == null || indexNames.isEmpty()) {
sendResponse()
sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies)
} else {
filterAndSendResponse(threadContext)
}
Expand All @@ -336,15 +342,18 @@ class TransportExplainAction @Inject constructor(
val filteredMetadata = mutableListOf<ManagedIndexMetaData?>()
val filteredPolicies = mutableListOf<PolicyID?>()
val enabledStatus = mutableMapOf<String, Boolean>()
filter(0, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
val filteredAppliedPolicies = mutableMapOf<String, Policy>()
filter(0, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, filteredAppliedPolicies)
}

@Suppress("LongParameterList")
private fun filter(
current: Int,
filteredIndices: MutableList<String>,
filteredMetadata: MutableList<ManagedIndexMetaData?>,
filteredPolicies: MutableList<PolicyID?>,
enabledStatus: MutableMap<String, Boolean>
enabledStatus: MutableMap<String, Boolean>,
filteredAppliedPolicies: MutableMap<String, Policy>
) {
val request = ManagedIndexRequest().indices(indexNames[current])
client.execute(
Expand All @@ -356,11 +365,15 @@ class TransportExplainAction @Inject constructor(
filteredMetadata.add(indexMetadatas[current])
filteredPolicies.add(indexPolicyIDs[current])
enabledStatus[indexNames[current]] = enabledState.getOrDefault(indexNames[current], false)
appliedPolicies[indexNames[current]]?.let { filteredAppliedPolicies[indexNames[current]] = it }
if (current < indexNames.count() - 1) {
// do nothing - skip the index and go to next one
filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, filteredAppliedPolicies)
} else {
sendResponse(filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
sendResponse(
filteredIndices, filteredMetadata, filteredPolicies, enabledStatus,
totalManagedIndices, filteredAppliedPolicies
)
}
}

Expand All @@ -370,9 +383,19 @@ class TransportExplainAction @Inject constructor(
totalManagedIndices -= 1
if (current < indexNames.count() - 1) {
// do nothing - skip the index and go to next one
filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
filter(
current + 1,
filteredIndices,
filteredMetadata,
filteredPolicies,
enabledStatus,
filteredAppliedPolicies
)
} else {
sendResponse(filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
sendResponse(
filteredIndices, filteredMetadata, filteredPolicies, enabledStatus,
totalManagedIndices, filteredAppliedPolicies
)
}
}
false -> {
Expand All @@ -384,14 +407,16 @@ class TransportExplainAction @Inject constructor(
)
}

@Suppress("LongParameterList")
private fun sendResponse(
indices: List<String> = indexNames,
metadata: List<ManagedIndexMetaData?> = indexMetadatas,
policies: List<PolicyID?> = indexPolicyIDs,
enabledStatus: Map<String, Boolean> = enabledState,
totalIndices: Int = totalManagedIndices
indices: List<String>,
metadata: List<ManagedIndexMetaData?>,
policyIDs: List<PolicyID?>,
enabledStatus: Map<String, Boolean>,
totalIndices: Int,
policies: Map<String, Policy>
) {
actionListener.onResponse(ExplainResponse(indices, policies, metadata, totalIndices, enabledStatus))
actionListener.onResponse(ExplainResponse(indices, policyIDs, metadata, totalIndices, enabledStatus, policies))
}

@Suppress("ReturnCount")
Expand All @@ -413,6 +438,17 @@ class TransportExplainAction @Inject constructor(

return null
}

private fun parseSearchHits(hits: Array<SearchHit>): List<ManagedIndexConfig> {
return hits.map { hit ->
XContentHelper.createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
hit.sourceRef,
XContentType.JSON
).parseWithType(parse = ManagedIndexConfig.Companion::parse)
}
}
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ const val DEFAULT_POLICY_SORT_FIELD = "policy.policy_id.keyword"
const val DEFAULT_SORT_ORDER = "asc"
const val DEFAULT_QUERY_STRING = "*"

const val SHOW_POLICY_QUERY_PARAM = "show_policy"
const val DEFAULT_EXPLAIN_SHOW_POLICY = false

const val INDEX_HIDDEN = "index.hidden"
const val INDEX_NUMBER_OF_SHARDS = "index.number_of_shards"
const val INDEX_NUMBER_OF_REPLICAS = "index.number_of_replicas"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ fun OpenSearchException.isRetryable(): Boolean {
*/
fun XContentBuilder.string(): String = BytesReference.bytes(this).utf8ToString()

fun XContentBuilder.toMap(): Map<String, Any> = XContentHelper.convertToMap(BytesReference.bytes(this), false, XContentType.JSON).v2()

/**
* Converts [OpenSearchClient] methods that take a callback into a kotlin suspending function.
*
Expand Down
Loading

0 comments on commit 57e3cce

Please sign in to comment.