From 0805e06f147aea5fd5d4483d671cb20b4013beb9 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Sat, 26 Aug 2023 02:32:07 +0000 Subject: [PATCH] optimize doc-level monitor workflow for index patterns Signed-off-by: Subhobrata Dey --- .../alerting/DocumentLevelMonitorRunner.kt | 58 +++- .../alerting/util/DocLevelMonitorQueries.kt | 149 +++++++++-- .../alerting/DocumentMonitorRunnerIT.kt | 253 ++++++++++++++++++ 3 files changed, 434 insertions(+), 26 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index c644ea62c..ed39274fd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -153,6 +153,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx.indexNameExpressionResolver!! ) val updatedIndexName = indexName.replace("*", "_") + val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( + monitorCtx.clusterService!!.state(), + concreteIndices + ) concreteIndices.forEach { concreteIndexName -> // Prepare lastRunContext for each index @@ -194,6 +198,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docExecutionContext, updatedIndexName, concreteIndexName, + conflictingFields.toList(), matchingDocIdsPerIndex?.get(concreteIndexName) ) @@ -208,7 +213,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) matchedQueriesForDocs.forEach { hit -> - val id = hit.id.replace("_${updatedIndexName}_${monitor.id}", "") + logger.info("test percolate $concreteIndexName") + val id = hit.id + .replace("_${updatedIndexName}_${monitor.id}", "") + .replace("_${concreteIndexName}_${monitor.id}", "") val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } docIndices.forEach { idx -> @@ -571,6 +579,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docExecutionCtx: DocumentExecutionContext, index: String, concreteIndex: String, + conflictingFields: List, docIds: List? = null ): List> { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int @@ -592,7 +601,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) if (hits.hits.isNotEmpty()) { - matchingDocs.addAll(getAllDocs(hits, index, monitor.id)) + matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) } } catch (e: Exception) { logger.warn("Failed to run for shard $shard. Error: ${e.message}") @@ -687,11 +696,23 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return response.hits } - private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List> { + private fun getAllDocs( + hits: SearchHits, + index: String, + concreteIndex: String, + monitorId: String, + conflictingFields: List + ): List> { return hits.map { hit -> val sourceMap = hit.sourceAsMap - transformDocumentFieldNames(sourceMap, "_${index}_$monitorId") + transformDocumentFieldNames( + sourceMap, + conflictingFields, + "_${index}_$monitorId", + "_${concreteIndex}_$monitorId", + "" + ) var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) @@ -718,17 +739,36 @@ object DocumentLevelMonitorRunner : MonitorRunner() { */ private fun transformDocumentFieldNames( jsonAsMap: MutableMap, - fieldNameSuffix: String + conflictingFields: List, + fieldNameSuffixPattern: String, + fieldNameSuffixIndex: String, + fieldNamePrefix: String ) { val tempMap = mutableMapOf() val it: MutableIterator> = jsonAsMap.entries.iterator() while (it.hasNext()) { val entry = it.next() if (entry.value is Map<*, *>) { - transformDocumentFieldNames(entry.value as MutableMap, fieldNameSuffix) - } else if (entry.key.endsWith(fieldNameSuffix) == false) { - tempMap["${entry.key}$fieldNameSuffix"] = entry.value - it.remove() + transformDocumentFieldNames( + entry.value as MutableMap, + conflictingFields, + fieldNameSuffixPattern, + fieldNameSuffixIndex, + if (fieldNamePrefix == "") entry.key else "$fieldNamePrefix.${entry.key}" + ) + } else if (!entry.key.endsWith(fieldNameSuffixPattern) && !entry.key.endsWith(fieldNameSuffixIndex)) { + var alreadyReplaced = false + conflictingFields.forEach { conflictingField -> + if (conflictingField == "$fieldNamePrefix.${entry.key}" || (fieldNamePrefix == "" && conflictingField == entry.key)) { + tempMap["${entry.key}$fieldNameSuffixIndex"] = entry.value + it.remove() + alreadyReplaced = true + } + } + if (!alreadyReplaced) { + tempMap["${entry.key}$fieldNameSuffixPattern"] = entry.value + it.remove() + } } } jsonAsMap.putAll(tempMap) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 90c19818b..1eade30fb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -28,6 +28,7 @@ import org.opensearch.alerting.MonitorRunnerService.monitorCtx import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.client.Client +import org.opensearch.cluster.ClusterState import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue @@ -155,8 +156,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ fun traverseMappingsAndUpdate( node: MutableMap, currentPath: String, - processLeafFn: (String, MutableMap) -> Triple>, - flattenPaths: MutableList + processLeafFn: (String, String, MutableMap) -> Triple>, + flattenPaths: MutableMap> ) { // If node contains "properties" property then it is internal(non-leaf) node log.debug("Node in traverse: $node") @@ -170,10 +171,10 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // If it has type property and type is not "nested" then this is a leaf if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) { // At this point we know full path of node, so we add it to output array - flattenPaths.add(fullPath) + flattenPaths.put(fullPath, nodeProps) // Calls processLeafFn and gets old node name, new node name and new properties of node. // This is all information we need to update this node - val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap) + val (oldName, newName, props) = processLeafFn(it.key, fullPath, it.value as MutableMap) newNodes.add(Triple(oldName, newName, props)) } else { // Internal(non-leaf) node - visit children @@ -213,8 +214,9 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ ) val updatedIndexName = indexName.replace("*", "_") val updatedProperties = mutableMapOf() - val allFlattenPaths = mutableListOf() + val allFlattenPaths = mutableSetOf>() var sourceIndexFieldLimit = 0L + val conflictingFields = getAllConflictingFields(clusterState, concreteIndices) concreteIndices.forEach { concreteIndexName -> if (clusterState.routingTable.hasIndex(concreteIndexName)) { @@ -227,7 +229,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // Node processor function is used to process leaves of index mappings tree // val leafNodeProcessor = - fun(fieldName: String, props: MutableMap): Triple> { + fun(fieldName: String, fullPath: String, props: MutableMap): Triple> { val newProps = props.toMutableMap() if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { val mappingsByType = monitor.dataSources.queryIndexMappingsByType @@ -237,23 +239,38 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } } } - if (props.containsKey("path")) { - newProps["path"] = "${props["path"]}_${updatedIndexName}_$monitorId" + + return if (conflictingFields.contains(fullPath)) { + if (props.containsKey("path")) { + newProps["path"] = "${props["path"]}_${concreteIndexName}_$monitorId" + } + Triple(fieldName, "${fieldName}_${concreteIndexName}_$monitorId", newProps) + } else { + if (props.containsKey("path")) { + newProps["path"] = "${props["path"]}_${updatedIndexName}_$monitorId" + } + Triple(fieldName, "${fieldName}_${updatedIndexName}_$monitorId", newProps) } - return Triple(fieldName, "${fieldName}_${updatedIndexName}_$monitorId", newProps) } // Traverse and update index mappings here while extracting flatten field paths - val flattenPaths = mutableListOf() + val flattenPaths = mutableMapOf>() traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) - allFlattenPaths.addAll(flattenPaths) + flattenPaths.keys.forEach { allFlattenPaths.add(Pair(it, concreteIndexName)) } // Updated mappings ready to be applied on queryIndex - updatedProperties.putAll(properties) + properties.forEach { + if (updatedProperties.containsKey(it.key) && updatedProperties[it.key] != it.value) { + val mergedField = mergeConflictingFields(updatedProperties[it.key] as Map, it.value as Map) + updatedProperties[it.key] = mergedField + } else { + updatedProperties[it.key] = it.value + } + } sourceIndexFieldLimit += checkMaxFieldLimit(concreteIndexName) } } } // Updates mappings of concrete queryIndex. This can rollover queryIndex if field mapping limit is reached. - var (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings( + val (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings( monitor, monitorMetadata, updatedIndexName, @@ -262,7 +279,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ ) if (updateMappingResponse.isAcknowledged) { - doIndexAllQueries(concreteQueryIndex, updatedIndexName, monitorId, queries, allFlattenPaths, refreshPolicy, indexTimeout) + doIndexAllQueries(concreteQueryIndex, updatedIndexName, monitorId, queries, allFlattenPaths, conflictingFields, refreshPolicy, indexTimeout) } } } @@ -272,18 +289,60 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ sourceIndex: String, monitorId: String, queries: List, - flattenPaths: MutableList, + flattenPaths: MutableSet>, + conflictingPaths: Set, refreshPolicy: RefreshPolicy, indexTimeout: TimeValue ) { val indexRequests = mutableListOf() + val conflictingPathToConcreteIndices = mutableMapOf>() + flattenPaths.forEach { fieldPath -> + if (conflictingPaths.contains(fieldPath.first)) { + if (conflictingPathToConcreteIndices.containsKey(fieldPath.first)) { + val concreteIndexSet = conflictingPathToConcreteIndices[fieldPath.first] + concreteIndexSet!!.add(fieldPath.second) + conflictingPathToConcreteIndices[fieldPath.first] = concreteIndexSet + } else { + val concreteIndexSet = mutableSetOf() + concreteIndexSet.add(fieldPath.second) + conflictingPathToConcreteIndices[fieldPath.first] = concreteIndexSet + } + } + } + + val newQueries = mutableListOf() queries.forEach { + val filteredConcreteIndices = mutableSetOf() + var query = it.query + conflictingPaths.forEach { conflictingPath -> + if (query.contains(conflictingPath)) { + query = query.replace("$conflictingPath:", "${conflictingPath}__$monitorId:") + filteredConcreteIndices.addAll(conflictingPathToConcreteIndices[conflictingPath]!!) + } + } + + if (filteredConcreteIndices.isNotEmpty()) { + filteredConcreteIndices.forEach { filteredConcreteIndex -> + val newQuery = it.copy( + id = "${it.id}_$filteredConcreteIndex", + query = query.replace("", filteredConcreteIndex) + ) + newQueries.add(newQuery) + } + } else { + newQueries.add(it.copy(id = "${it.id}_$sourceIndex")) + } + } + + newQueries.forEach { var query = it.query flattenPaths.forEach { fieldPath -> - query = query.replace("$fieldPath:", "${fieldPath}_${sourceIndex}_$monitorId:") + if (!conflictingPaths.contains(fieldPath.first)) { + query = query.replace("${fieldPath.first}:", "${fieldPath.first}_${sourceIndex}_$monitorId:") + } } val indexRequest = IndexRequest(concreteQueryIndex) - .id(it.id + "_${sourceIndex}_$monitorId") + .id(it.id + "_$monitorId") .source( mapOf( "query" to mapOf("query_string" to mapOf("query" to query)), @@ -390,6 +449,62 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return Pair(updateMappingResponse, targetQueryIndex) } + private fun mergeConflictingFields(oldField: Map, newField: Map): Map { + val mergedField = mutableMapOf() + oldField.entries.forEach { + if (newField.containsKey(it.key)) { + if (it.value is Map<*, *> && newField[it.key] is Map<*, *>) { + mergedField[it.key] = + mergeConflictingFields(it.value as Map, newField[it.key] as Map) + } else { + mergedField[it.key] = it.value + } + } else { + mergedField[it.key] = it.value + } + } + + newField.entries.forEach { + if (!oldField.containsKey(it.key)) { + mergedField[it.key] = it.value + } + } + return mergedField + } + + fun getAllConflictingFields(clusterState: ClusterState, concreteIndices: List): Set { + val conflictingFields = mutableSetOf() + val allFlattenPaths = mutableMapOf>() + concreteIndices.forEach { concreteIndexName -> + if (clusterState.routingTable.hasIndex(concreteIndexName)) { + val indexMetadata = clusterState.metadata.index(concreteIndexName) + if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { + val properties = ( + (indexMetadata.mapping()?.sourceAsMap?.get("properties")) + as MutableMap + ) + // Node processor function is used to process leaves of index mappings tree + // + val leafNodeProcessor = + fun(fieldName: String, _: String, props: MutableMap): Triple> { + return Triple(fieldName, fieldName, props) + } + // Traverse and update index mappings here while extracting flatten field paths + val flattenPaths = mutableMapOf>() + traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) + + flattenPaths.forEach { + if (allFlattenPaths.containsKey(it.key) && allFlattenPaths[it.key]!! != it.value) { + conflictingFields.add(it.key) + } + allFlattenPaths.putIfAbsent(it.key, it.value) + } + } + } + } + return conflictingFields + } + /** * Adjusts max field limit index setting for query index if source index has higher limit. * This will prevent max field limit exception, when source index has more fields then query index limit diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index f6c1a333c..49a7b8428 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -500,6 +500,259 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Findings saved for test monitor expected 14, 51 and 10", 3, foundFindings.size) } + fun `test execute monitor with indices having fields with same name but different data types`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source.device.port": { "type": "long" }, + "source.device.hwd.id": { "type": "long" }, + "nested_field": { + "type": "nested", + "properties": { + "test1": { + "type": "keyword" + } + } + }, + "my_join_field": { + "type": "join", + "relations": { + "question": "answer" + } + }, + "test_field" : { "type" : "integer" } + } + """.trimIndent() + ) + var testDoc = """{ + "source" : { "device": {"port" : 12345 } }, + "nested_field": { "test1": "some text" }, + "test_field": 12345 + }""" + + val docQuery1 = DocLevelQuery( + query = "(source.device.port:12345 AND test_field:12345) OR source.device.hwd.id:12345", + name = "4" + ) + val docQuery2 = DocLevelQuery( + query = "(source.device.port:\"12345\" AND test_field:\"12345\") OR source.device.hwd.id:\"12345\"", + name = "5" + ) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + executeMonitor(monitor.id) + + var alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 1, alerts.size) + + var findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 1, findings.size) + + // clear previous findings and alerts + deleteIndex(ALL_FINDING_INDEX_PATTERN) + deleteIndex(ALL_ALERT_INDEX_PATTERN) + + indexDoc(testIndex, "2", testDoc) + val testIndex2 = createTestIndex("test2") + testDoc = """{ + "source" : { "device": {"port" : "12345" } }, + "nested_field": { "test1": "some text" }, + "test_field": "12345" + }""" + indexDoc(testIndex2, "1", testDoc) + executeMonitor(monitor.id) + + alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + } + + fun `test execute monitor with indices having fields with same name but different field mappings`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source": { + "properties": { + "id": { + "type":"text", + "analyzer":"whitespace" + } + } + }, + "test_field" : { + "type":"text", + "analyzer":"whitespace" + } + } + """.trimIndent() + ) + + val testIndex2 = createTestIndex( + "test2", + """"properties": { + "source": { + "properties": { + "id": { + "type":"text" + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + val testDoc = """{ + "source" : {"id" : "12345" }, + "nested_field": { "test1": "some text" }, + "test_field": "12345" + }""" + + val docQuery = DocLevelQuery( + query = "test_field:\"12345\" AND source.id:\"12345\"", + name = "5" + ) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex2, "1", testDoc) + + executeMonitor(monitor.id) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + } + + fun `test execute monitor with indices having fields with same name but different field mappings in multiple indices`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source": { + "properties": { + "device": { + "properties": { + "hwd": { + "properties": { + "id": { + "type":"text", + "analyzer":"whitespace" + } + } + } + } + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + + val testIndex2 = createTestIndex( + "test2", + """"properties": { + "test_field" : { + "type":"keyword" + } + } + """.trimIndent() + ) + + val testIndex4 = createTestIndex( + "test4", + """"properties": { + "source": { + "properties": { + "device": { + "properties": { + "hwd": { + "properties": { + "id": { + "type":"text" + } + } + } + } + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + + val testDoc1 = """{ + "source" : {"device" : {"hwd" : {"id" : "12345"}} }, + "nested_field": { "test1": "some text" } + }""" + val testDoc2 = """{ + "nested_field": { "test1": "some text" }, + "test_field": "12345" + }""" + + val docQuery1 = DocLevelQuery( + query = "test_field:\"12345\"", + name = "4" + ) + val docQuery2 = DocLevelQuery( + query = "source.device.hwd.id:\"12345\"", + name = "5" + ) + + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex4, "1", testDoc1) + indexDoc(testIndex2, "1", testDoc2) + indexDoc(testIndex, "1", testDoc1) + indexDoc(testIndex, "2", testDoc2) + + executeMonitor(monitor.id) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 4, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 4, findings.size) + + val request = """{ + "size": 0, + "query": { + "match_all": {} + } + }""" + val httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(5L, it.value) } + } + fun `test no of queries generated for document-level monitor based on wildcard indexes`() { val testIndex = createTestIndex("test1") val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))