Skip to content

Commit

Permalink
optimize doc-level monitor workflow for index patterns
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Aug 28, 2023
1 parent 892bbb4 commit 0805e06
Show file tree
Hide file tree
Showing 3 changed files with 434 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx.indexNameExpressionResolver!!
)
val updatedIndexName = indexName.replace("*", "_")
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
concreteIndices
)

concreteIndices.forEach { concreteIndexName ->
// Prepare lastRunContext for each index
Expand Down Expand Up @@ -194,6 +198,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docExecutionContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName)
)

Expand All @@ -208,7 +213,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id.replace("_${updatedIndexName}_${monitor.id}", "")
logger.info("test percolate $concreteIndexName")
val id = hit.id
.replace("_${updatedIndexName}_${monitor.id}", "")
.replace("_${concreteIndexName}_${monitor.id}", "")

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
Expand Down Expand Up @@ -571,6 +579,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docExecutionCtx: DocumentExecutionContext,
index: String,
concreteIndex: String,
conflictingFields: List<String>,
docIds: List<String>? = null
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
Expand All @@ -592,7 +601,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)

if (hits.hits.isNotEmpty()) {
matchingDocs.addAll(getAllDocs(hits, index, monitor.id))
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
}
} catch (e: Exception) {
logger.warn("Failed to run for shard $shard. Error: ${e.message}")
Expand Down Expand Up @@ -687,11 +696,23 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return response.hits
}

private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List<Pair<String, BytesReference>> {
private fun getAllDocs(
hits: SearchHits,
index: String,
concreteIndex: String,
monitorId: String,
conflictingFields: List<String>
): List<Pair<String, BytesReference>> {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap

transformDocumentFieldNames(sourceMap, "_${index}_$monitorId")
transformDocumentFieldNames(
sourceMap,
conflictingFields,
"_${index}_$monitorId",
"_${concreteIndex}_$monitorId",
""
)

var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)

Expand All @@ -718,17 +739,36 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
*/
private fun transformDocumentFieldNames(
jsonAsMap: MutableMap<String, Any>,
fieldNameSuffix: String
conflictingFields: List<String>,
fieldNameSuffixPattern: String,
fieldNameSuffixIndex: String,
fieldNamePrefix: String
) {
val tempMap = mutableMapOf<String, Any>()
val it: MutableIterator<Map.Entry<String, Any>> = jsonAsMap.entries.iterator()
while (it.hasNext()) {
val entry = it.next()
if (entry.value is Map<*, *>) {
transformDocumentFieldNames(entry.value as MutableMap<String, Any>, fieldNameSuffix)
} else if (entry.key.endsWith(fieldNameSuffix) == false) {
tempMap["${entry.key}$fieldNameSuffix"] = entry.value
it.remove()
transformDocumentFieldNames(
entry.value as MutableMap<String, Any>,
conflictingFields,
fieldNameSuffixPattern,
fieldNameSuffixIndex,
if (fieldNamePrefix == "") entry.key else "$fieldNamePrefix.${entry.key}"
)
} else if (!entry.key.endsWith(fieldNameSuffixPattern) && !entry.key.endsWith(fieldNameSuffixIndex)) {
var alreadyReplaced = false
conflictingFields.forEach { conflictingField ->
if (conflictingField == "$fieldNamePrefix.${entry.key}" || (fieldNamePrefix == "" && conflictingField == entry.key)) {
tempMap["${entry.key}$fieldNameSuffixIndex"] = entry.value
it.remove()
alreadyReplaced = true
}
}
if (!alreadyReplaced) {
tempMap["${entry.key}$fieldNameSuffixPattern"] = entry.value
it.remove()
}
}
}
jsonAsMap.putAll(tempMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.alerting.MonitorRunnerService.monitorCtx
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
Expand Down Expand Up @@ -155,8 +156,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
fun traverseMappingsAndUpdate(
node: MutableMap<String, Any>,
currentPath: String,
processLeafFn: (String, MutableMap<String, Any>) -> Triple<String, String, MutableMap<String, Any>>,
flattenPaths: MutableList<String>
processLeafFn: (String, String, MutableMap<String, Any>) -> Triple<String, String, MutableMap<String, Any>>,
flattenPaths: MutableMap<String, MutableMap<String, Any>>
) {
// If node contains "properties" property then it is internal(non-leaf) node
log.debug("Node in traverse: $node")
Expand All @@ -170,10 +171,10 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
// If it has type property and type is not "nested" then this is a leaf
if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) {
// At this point we know full path of node, so we add it to output array
flattenPaths.add(fullPath)
flattenPaths.put(fullPath, nodeProps)
// Calls processLeafFn and gets old node name, new node name and new properties of node.
// This is all information we need to update this node
val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap<String, Any>)
val (oldName, newName, props) = processLeafFn(it.key, fullPath, it.value as MutableMap<String, Any>)
newNodes.add(Triple(oldName, newName, props))
} else {
// Internal(non-leaf) node - visit children
Expand Down Expand Up @@ -213,8 +214,9 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
)
val updatedIndexName = indexName.replace("*", "_")
val updatedProperties = mutableMapOf<String, Any>()
val allFlattenPaths = mutableListOf<String>()
val allFlattenPaths = mutableSetOf<Pair<String, String>>()
var sourceIndexFieldLimit = 0L
val conflictingFields = getAllConflictingFields(clusterState, concreteIndices)

concreteIndices.forEach { concreteIndexName ->
if (clusterState.routingTable.hasIndex(concreteIndexName)) {
Expand All @@ -227,7 +229,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
// Node processor function is used to process leaves of index mappings tree
//
val leafNodeProcessor =
fun(fieldName: String, props: MutableMap<String, Any>): Triple<String, String, MutableMap<String, Any>> {
fun(fieldName: String, fullPath: String, props: MutableMap<String, Any>): Triple<String, String, MutableMap<String, Any>> {
val newProps = props.toMutableMap()
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
Expand All @@ -237,23 +239,38 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
}
}
}
if (props.containsKey("path")) {
newProps["path"] = "${props["path"]}_${updatedIndexName}_$monitorId"

return if (conflictingFields.contains(fullPath)) {
if (props.containsKey("path")) {
newProps["path"] = "${props["path"]}_${concreteIndexName}_$monitorId"
}
Triple(fieldName, "${fieldName}_${concreteIndexName}_$monitorId", newProps)
} else {
if (props.containsKey("path")) {
newProps["path"] = "${props["path"]}_${updatedIndexName}_$monitorId"
}
Triple(fieldName, "${fieldName}_${updatedIndexName}_$monitorId", newProps)
}
return Triple(fieldName, "${fieldName}_${updatedIndexName}_$monitorId", newProps)
}
// Traverse and update index mappings here while extracting flatten field paths
val flattenPaths = mutableListOf<String>()
val flattenPaths = mutableMapOf<String, MutableMap<String, Any>>()
traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths)
allFlattenPaths.addAll(flattenPaths)
flattenPaths.keys.forEach { allFlattenPaths.add(Pair(it, concreteIndexName)) }
// Updated mappings ready to be applied on queryIndex
updatedProperties.putAll(properties)
properties.forEach {
if (updatedProperties.containsKey(it.key) && updatedProperties[it.key] != it.value) {
val mergedField = mergeConflictingFields(updatedProperties[it.key] as Map<String, Any>, it.value as Map<String, Any>)
updatedProperties[it.key] = mergedField
} else {
updatedProperties[it.key] = it.value
}
}
sourceIndexFieldLimit += checkMaxFieldLimit(concreteIndexName)
}
}
}
// Updates mappings of concrete queryIndex. This can rollover queryIndex if field mapping limit is reached.
var (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings(
val (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings(
monitor,
monitorMetadata,
updatedIndexName,
Expand All @@ -262,7 +279,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
)

if (updateMappingResponse.isAcknowledged) {
doIndexAllQueries(concreteQueryIndex, updatedIndexName, monitorId, queries, allFlattenPaths, refreshPolicy, indexTimeout)
doIndexAllQueries(concreteQueryIndex, updatedIndexName, monitorId, queries, allFlattenPaths, conflictingFields, refreshPolicy, indexTimeout)
}
}
}
Expand All @@ -272,18 +289,60 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
sourceIndex: String,
monitorId: String,
queries: List<DocLevelQuery>,
flattenPaths: MutableList<String>,
flattenPaths: MutableSet<Pair<String, String>>,
conflictingPaths: Set<String>,
refreshPolicy: RefreshPolicy,
indexTimeout: TimeValue
) {
val indexRequests = mutableListOf<IndexRequest>()
val conflictingPathToConcreteIndices = mutableMapOf<String, MutableSet<String>>()
flattenPaths.forEach { fieldPath ->
if (conflictingPaths.contains(fieldPath.first)) {
if (conflictingPathToConcreteIndices.containsKey(fieldPath.first)) {
val concreteIndexSet = conflictingPathToConcreteIndices[fieldPath.first]
concreteIndexSet!!.add(fieldPath.second)
conflictingPathToConcreteIndices[fieldPath.first] = concreteIndexSet
} else {
val concreteIndexSet = mutableSetOf<String>()
concreteIndexSet.add(fieldPath.second)
conflictingPathToConcreteIndices[fieldPath.first] = concreteIndexSet
}
}
}

val newQueries = mutableListOf<DocLevelQuery>()
queries.forEach {
val filteredConcreteIndices = mutableSetOf<String>()
var query = it.query
conflictingPaths.forEach { conflictingPath ->
if (query.contains(conflictingPath)) {
query = query.replace("$conflictingPath:", "${conflictingPath}_<index>_$monitorId:")
filteredConcreteIndices.addAll(conflictingPathToConcreteIndices[conflictingPath]!!)
}
}

if (filteredConcreteIndices.isNotEmpty()) {
filteredConcreteIndices.forEach { filteredConcreteIndex ->
val newQuery = it.copy(
id = "${it.id}_$filteredConcreteIndex",
query = query.replace("<index>", filteredConcreteIndex)
)
newQueries.add(newQuery)
}
} else {
newQueries.add(it.copy(id = "${it.id}_$sourceIndex"))
}
}

newQueries.forEach {
var query = it.query
flattenPaths.forEach { fieldPath ->
query = query.replace("$fieldPath:", "${fieldPath}_${sourceIndex}_$monitorId:")
if (!conflictingPaths.contains(fieldPath.first)) {
query = query.replace("${fieldPath.first}:", "${fieldPath.first}_${sourceIndex}_$monitorId:")
}
}
val indexRequest = IndexRequest(concreteQueryIndex)
.id(it.id + "_${sourceIndex}_$monitorId")
.id(it.id + "_$monitorId")
.source(
mapOf(
"query" to mapOf("query_string" to mapOf("query" to query)),
Expand Down Expand Up @@ -390,6 +449,62 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return Pair(updateMappingResponse, targetQueryIndex)
}

/**
 * Recursively merges two mapping-like maps into a new map.
 *
 * Every key of [oldField] is kept. When both sides hold a map under the same key the two
 * maps are merged recursively; in every other case the value from [oldField] wins.
 * Keys that exist only in [newField] are appended afterwards with their value unchanged.
 *
 * @param oldField the map whose values take precedence on conflicts
 * @param newField the map contributing nested entries and keys missing from [oldField]
 * @return a newly created map; neither input is modified
 */
private fun mergeConflictingFields(oldField: Map<String, Any>, newField: Map<String, Any>): Map<String, Any> {
    val merged = mutableMapOf<String, Any>()

    // Pass 1: walk the old side; only map-vs-map collisions need a deeper merge.
    for ((key, oldValue) in oldField) {
        val newValue = newField[key]
        merged[key] = if (oldValue is Map<*, *> && newValue is Map<*, *>) {
            mergeConflictingFields(oldValue as Map<String, Any>, newValue as Map<String, Any>)
        } else {
            oldValue
        }
    }

    // Pass 2: bring over whatever exists solely on the new side.
    for ((key, newValue) in newField) {
        if (key !in oldField) {
            merged[key] = newValue
        }
    }

    return merged
}

/**
 * Finds flattened field paths whose leaf mapping is not the same in all of the given indices.
 *
 * Each index in [concreteIndices] is considered only if it is present in the routing table of
 * [clusterState] and its mapping has a "properties" section. The mapping tree is walked with
 * [traverseMappingsAndUpdate] using a leaf processor that keeps names and properties as they
 * are, so the walk is used only to collect `full path -> leaf properties`.
 *
 * The first leaf properties seen for a path become the reference for that path; a path is
 * reported as conflicting as soon as a later index carries leaf properties that are not equal
 * to the reference.
 *
 * @param clusterState cluster state the routing table and index metadata are read from
 * @param concreteIndices names of the concrete indices to compare
 * @return the set of full field paths with differing leaf mappings (empty when none differ)
 */
fun getAllConflictingFields(clusterState: ClusterState, concreteIndices: List<String>): Set<String> {
    val conflictingFields = mutableSetOf<String>()
    // First leaf mapping seen per flattened path; later indices are compared against it.
    val referenceMappings = mutableMapOf<String, MutableMap<String, Any>>()

    // Leaf processor that leaves every leaf untouched. It captures nothing from the loop,
    // so one instance is shared by all indices.
    val leafNodeProcessor =
        fun(fieldName: String, _: String, props: MutableMap<String, Any>): Triple<String, String, MutableMap<String, Any>> {
            return Triple(fieldName, fieldName, props)
        }

    concreteIndices.forEach { concreteIndexName ->
        if (!clusterState.routingTable.hasIndex(concreteIndexName)) {
            return@forEach
        }
        // Read the mapping source a single time per index instead of once for the null check
        // and once more for the value.
        val properties = clusterState.metadata.index(concreteIndexName).mapping()?.sourceAsMap?.get("properties")
            ?: return@forEach

        // Traverse index mappings here while extracting flatten field paths
        val flattenPaths = mutableMapOf<String, MutableMap<String, Any>>()
        traverseMappingsAndUpdate(properties as MutableMap<String, Any>, "", leafNodeProcessor, flattenPaths)

        for ((path, leafProps) in flattenPaths) {
            // putIfAbsent hands back the already stored reference (or null on first sight),
            // which covers the lookup, the comparison input and the insert in one call.
            val reference = referenceMappings.putIfAbsent(path, leafProps)
            if (reference != null && reference != leafProps) {
                conflictingFields.add(path)
            }
        }
    }
    return conflictingFields
}

/**
* Adjusts max field limit index setting for query index if source index has higher limit.
* This will prevent max field limit exception, when source index has more fields than query index limit
Expand Down
Loading

0 comments on commit 0805e06

Please sign in to comment.