optimize doc-level monitor workflow for index patterns (opensearch-project#1122)

Signed-off-by: Subhobrata Dey <[email protected]>
sbcd90 authored and goyamegh committed Mar 15, 2024
1 parent d5413af commit 65c5f51
Showing 4 changed files with 873 additions and 117 deletions.
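
At a high level, this change resolves the monitor input's index patterns to concrete indices up front, fails fast when nothing matches, and then builds the run context, shard search, and percolation per concrete index. A minimal sketch of that control flow follows; resolveAllIndices and runForConcreteIndex are illustrative stand-ins for IndexUtils.resolveAllIndices and the per-index body of the executor, and the exception type is simplified from the diff's IndexNotFoundException to keep the snippet dependency-free.

// Minimal, hypothetical sketch of the new flow; helper signatures are illustrative,
// not the actual Alerting APIs.
fun runDocLevelMonitor(
    inputIndices: List<String>, // may contain patterns such as "logs-*"
    resolveAllIndices: (List<String>) -> List<String>,
    runForConcreteIndex: (String) -> Unit
) {
    // Resolve patterns and aliases to concrete indices once, up front.
    val concreteIndices = resolveAllIndices(inputIndices)
    if (concreteIndices.isEmpty()) {
        // Fail fast instead of silently running against nothing
        // (the real code throws IndexNotFoundException here).
        throw IllegalStateException("indices not found-${inputIndices.joinToString(",")}")
    }
    // Run context, shard search, and percolation now happen per concrete index
    // rather than per raw input entry.
    concreteIndices.forEach { runForConcreteIndex(it) }
}

The real executor also threads monitor metadata, conflicting-field information, and per-shard sequence numbers through this loop, as the diff below shows.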
@@ -49,6 +49,7 @@ import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
@@ -114,11 +115,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
// Resolve all passed indices to concrete indices
val indices = IndexUtils.resolveAllIndices(
val concreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (concreteIndices.isEmpty()) {
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
}

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
@@ -130,7 +135,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

// cleanup old indices that are not monitored anymore from the same monitor
for (ind in updatedLastRunContext.keys) {
if (!indices.contains(ind)) {
if (!concreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}
@@ -144,50 +149,77 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(indexName)
)
MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently)
}

// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
indexName
) as MutableMap<String, Any>
updatedLastRunContext[indexName] = indexUpdatedRunContext

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()

// update lastRunContext if it's a temp monitor, as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
}
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
val updatedIndexName = indexName.replace("*", "_")
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
concreteIndices
)

val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName)
concreteIndices.forEach { concreteIndexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(concreteIndexName)
)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
}

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
matchingDocs.map { it.second },
monitor,
monitorMetadata,
indexName
)
concreteIndexName
) as MutableMap<String, Any>
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()

// update lastRunContext if it's a temp monitor, as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
}
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id.replace("_${indexName}_${monitor.id}", "")
val matchingDocs = getMatchingDocs(
monitor,
monitorCtx,
docExecutionContext,
indexName,
updatedIndexName,
concreteIndexName,
conflictingFields.toList()
)

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${matchingDocs[idx].first}|$indexName"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
monitor,
monitorMetadata,
updatedIndexName,
concreteIndexName
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id
.replace("_${updatedIndexName}_${monitor.id}", "")
.replace("_${concreteIndexName}_${monitor.id}", "")

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${matchingDocs[idx].first}|$concreteIndexName"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
}
}
}
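
The per-index loop above funnels percolator results into two maps: inputRunResults (query id → matched document keys) and docsToQueries (document key → matched query ids). A hedged, self-contained sketch of that bookkeeping follows; the hit id and _percolator_document_slot values are passed in as plain parameters rather than read from a SearchHit, and the types are simplified from the diff.

// Illustrative sketch of the result bookkeeping; parameter types stand in for
// the SearchHit and BytesReference objects used in the real executor.
fun accumulateMatches(
    hitId: String,                           // e.g. "<queryId>_<concreteIndex>_<monitorId>"
    documentSlots: List<Int>,                // positions into the percolated doc batch
    matchingDocs: List<Pair<String, Any>>,   // first = document key built in getAllDocs
    updatedIndexName: String,                // index pattern with "*" replaced by "_"
    concreteIndexName: String,
    monitorId: String,
    inputRunResults: MutableMap<String, MutableSet<String>>,
    docsToQueries: MutableMap<String, MutableList<String>>
) {
    // Strip whichever suffix the stored query carried to recover the query id.
    val queryId = hitId
        .replace("_${updatedIndexName}_$monitorId", "")
        .replace("_${concreteIndexName}_$monitorId", "")
    documentSlots.forEach { idx ->
        // Key each match by "<docKey>|<concreteIndex>" so one query can match
        // documents in several concrete indices of the same pattern.
        val docIndex = "${matchingDocs[idx].first}|$concreteIndexName"
        inputRunResults.getOrPut(queryId) { mutableSetOf() }.add(docIndex)
        docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(queryId)
    }
}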
@@ -498,8 +530,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
if (response.hits.hits.isEmpty())
if (response.hits.hits.isEmpty()) {
return -1L
}

return response.hits.hits[0].seqNo
}
@@ -513,7 +546,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
index: String
index: String,
concreteIndex: String,
conflictingFields: List<String>
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
@@ -525,15 +560,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

val hits: SearchHits = searchShard(
monitorCtx,
index,
concreteIndex,
shard,
prevSeqNo,
maxSeqNo,
null
)

if (hits.hits.isNotEmpty()) {
matchingDocs.addAll(getAllDocs(hits, index, monitor.id))
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
}
} catch (e: Exception) {
logger.warn("Failed to run for shard $shard. Error: ${e.message}")
@@ -581,7 +616,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docs: List<BytesReference>,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
index: String
index: String,
concreteIndex: String
): SearchHits {
val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND))

@@ -594,7 +630,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id]
if (queryIndex == null) {
val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" +
" sourceIndex:$index queryIndex:${monitor.dataSources.queryIndex}"
" sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}"
logger.error(message)
throw AlertingException.wrap(
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
@@ -622,11 +658,23 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return response.hits
}

private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List<Pair<String, BytesReference>> {
private fun getAllDocs(
hits: SearchHits,
index: String,
concreteIndex: String,
monitorId: String,
conflictingFields: List<String>
): List<Pair<String, BytesReference>> {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap

transformDocumentFieldNames(sourceMap, "_${index}_$monitorId")
transformDocumentFieldNames(
sourceMap,
conflictingFields,
"_${index}_$monitorId",
"_${concreteIndex}_$monitorId",
""
)

var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)

@@ -639,7 +687,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffix] to field names.
* Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names that share a name
* but have different mappings, and [fieldNameSuffixPattern] to field names that are unique.
*
* Example for index name is my_log_index and Monitor ID is TReewWdsf2gdJFV:
* { {
@@ -653,17 +702,36 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
*/
private fun transformDocumentFieldNames(
jsonAsMap: MutableMap<String, Any>,
fieldNameSuffix: String
conflictingFields: List<String>,
fieldNameSuffixPattern: String,
fieldNameSuffixIndex: String,
fieldNamePrefix: String
) {
val tempMap = mutableMapOf<String, Any>()
val it: MutableIterator<Map.Entry<String, Any>> = jsonAsMap.entries.iterator()
while (it.hasNext()) {
val entry = it.next()
if (entry.value is Map<*, *>) {
transformDocumentFieldNames(entry.value as MutableMap<String, Any>, fieldNameSuffix)
} else if (entry.key.endsWith(fieldNameSuffix) == false) {
tempMap["${entry.key}$fieldNameSuffix"] = entry.value
it.remove()
transformDocumentFieldNames(
entry.value as MutableMap<String, Any>,
conflictingFields,
fieldNameSuffixPattern,
fieldNameSuffixIndex,
if (fieldNamePrefix == "") entry.key else "$fieldNamePrefix.${entry.key}"
)
} else if (!entry.key.endsWith(fieldNameSuffixPattern) && !entry.key.endsWith(fieldNameSuffixIndex)) {
var alreadyReplaced = false
conflictingFields.forEach { conflictingField ->
if (conflictingField == "$fieldNamePrefix.${entry.key}" || (fieldNamePrefix == "" && conflictingField == entry.key)) {
tempMap["${entry.key}$fieldNameSuffixIndex"] = entry.value
it.remove()
alreadyReplaced = true
}
}
if (!alreadyReplaced) {
tempMap["${entry.key}$fieldNameSuffixPattern"] = entry.value
it.remove()
}
}
}
jsonAsMap.putAll(tempMap)
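
To make the suffixing rule in transformDocumentFieldNames concrete, here is a small, self-contained sketch of the same idea; unlike the method in the diff it does not skip fields that already carry a suffix. Leaf fields whose dotted path appears in the conflicting-fields list receive the per-index suffix, while all other leaf fields receive the pattern suffix.

// Illustrative sketch of the suffixing rule, not the production method.
fun suffixFieldNames(
    doc: MutableMap<String, Any>,
    conflictingFields: List<String>,
    patternSuffix: String, // e.g. "_<pattern>_<monitorId>" with "*" already replaced by "_"
    indexSuffix: String,   // e.g. "_<concreteIndex>_<monitorId>"
    prefix: String = ""
) {
    val renamed = mutableMapOf<String, Any>()
    val iter = doc.entries.iterator()
    while (iter.hasNext()) {
        val entry = iter.next()
        val path = if (prefix.isEmpty()) entry.key else "$prefix.${entry.key}"
        if (entry.value is Map<*, *>) {
            // Recurse into nested objects, carrying the dotted path as the prefix.
            @Suppress("UNCHECKED_CAST")
            suffixFieldNames(entry.value as MutableMap<String, Any>, conflictingFields, patternSuffix, indexSuffix, path)
        } else {
            // Conflicting mappings across the pattern's indices -> per-index suffix;
            // otherwise the shared pattern suffix is enough.
            val suffix = if (path in conflictingFields) indexSuffix else patternSuffix
            renamed["${entry.key}$suffix"] = entry.value
            iter.remove()
        }
    }
    doc.putAll(renamed)
}

For example, with conflictingFields = ["netflow.port"], patternSuffix = "_P", and indexSuffix = "_I", the document {"message": "x", "netflow": {"port": 443}} becomes {"message_P": "x", "netflow": {"port_I": 443}}; the real suffixes are built from the index name and monitor id, as in the KDoc example above.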
