Skip to content

Commit

Permalink
Multiple indices support in DocLevelMonitorInput (opensearch-project#784
Browse files Browse the repository at this point in the history
) (opensearch-project#808)

Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] authored Feb 21, 2023
1 parent ba54edd commit 06b427c
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
.registerindexNameExpressionResolver(indexNameExpressionResolver)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerThreadPool(threadPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchAction
Expand All @@ -23,9 +21,11 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.bytes.BytesReference
Expand Down Expand Up @@ -92,7 +92,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val index = docLevelMonitorInput.indices[0]

val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
Expand All @@ -105,6 +105,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val docsToQueries = mutableMapOf<String, MutableList<String>>()

try {
// Resolve all passed indices to concrete indices
val indices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
monitor = monitor,
Expand All @@ -113,12 +120,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indexTimeout = monitorCtx.indexTimeout!!
)

val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.admin().indices().getIndex(getIndexRequest, it)
}
val indices = getIndexResponse.indices()

// cleanup old indices that are not monitored anymore from the same monitor
for (ind in updatedLastRunContext.keys) {
if (!indices.contains(ind)) {
Expand All @@ -129,8 +130,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indices.forEach { indexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse)
MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently)
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(indexName)
)
MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently)
}

// Prepare updatedLastRunContext for each index
Expand Down Expand Up @@ -180,7 +186,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
} catch (e: Exception) {
logger.error("Failed to start Document-level-monitor $index. Error: ${e.message}", e)
logger.error("Failed to start Document-level-monitor ${monitor.name}. Error: ${e.message}", e)
val alertingException = AlertingException.wrap(e)
monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
}
Expand Down Expand Up @@ -379,9 +385,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
throw IOException("Invalid input with document-level-monitor.")
}

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
if (docLevelMonitorInput.indices.size > 1) {
throw IOException("Only one index is supported with document-level-monitor.")
if ((monitor.inputs[0] as DocLevelMonitorInput).indices.isEmpty()) {
throw IllegalArgumentException("DocLevelMonitorInput has no indices")
}
}

Expand All @@ -408,13 +413,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// new index is monitored from the beginning of that index
private fun createdRecently(
monitor: Monitor,
index: String,
periodStart: Instant,
periodEnd: Instant,
getIndexResponse: GetIndexResponse
indexMetadata: IndexMetadata
): Boolean {
val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart
return getIndexResponse.settings.get(index).getAsLong("index.creation_date", 0L) > lastExecutionTime.toEpochMilli()
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
Expand All @@ -25,6 +26,7 @@ data class MonitorRunnerExecutionContext(
var clusterService: ClusterService? = null,
var client: Client? = null,
var xContentRegistry: NamedXContentRegistry? = null,
var indexNameExpressionResolver: IndexNameExpressionResolver? = null,
var scriptService: ScriptService? = null,
var settings: Settings? = null,
var threadPool: ThreadPool? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestin
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.isDocLevelMonitor
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.AbstractLifecycleComponent
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -71,6 +72,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerindexNameExpressionResolver(indexNameExpressionResolver: IndexNameExpressionResolver): MonitorRunnerService {
this.monitorCtx.indexNameExpressionResolver = indexNameExpressionResolver
return this
}

fun registerScriptService(scriptService: ScriptService): MonitorRunnerService {
this.monitorCtx.scriptService = scriptService
return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting.alerts

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
Expand Down Expand Up @@ -36,6 +37,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTO
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_RETENTION_PERIOD
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_ROLLOVER_PERIOD
import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
Expand Down Expand Up @@ -357,8 +359,12 @@ class AlertIndices(
return try {
val createIndexResponse: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) }
createIndexResponse.isAcknowledged
} catch (e: ResourceAlreadyExistsException) {
true
} catch (t: Exception) {
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
true
} else {
throw AlertingException.wrap(t)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchException
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
Expand Down Expand Up @@ -303,7 +304,7 @@ class TransportIndexMonitorAction @Inject constructor(
}
override fun onFailure(t: Exception) {
// https://github.com/opensearch-project/alerting/issues/646
if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) {
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
scope.launch {
// Wait for the yellow status
val request = ClusterHealthRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
Expand All @@ -26,6 +24,7 @@ import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorRunnerService.monitorCtx
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.client.Client
Expand Down Expand Up @@ -86,8 +85,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return try {
val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) }
createIndexResponse.isAcknowledged
} catch (t: ResourceAlreadyExistsException) {
if (t.message?.contains("already exists") == true) {
} catch (t: Exception) {
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
true
} else {
throw t
Expand All @@ -107,9 +106,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
admin().indices().delete(DeleteIndexRequest(dataSources.queryIndex), it)
}
if (!acknowledgedResponse.isAcknowledged) {
val errorMessage = "Deletion of old queryIndex [${dataSources.queryIndex}] index is not acknowledged!"
log.error(errorMessage)
throw AlertingException.wrap(OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR))
log.warn("Deletion of old queryIndex [${dataSources.queryIndex}] index is not acknowledged!")
}
}
val alias = dataSources.queryIndex
Expand All @@ -125,8 +122,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return try {
val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) }
createIndexResponse.isAcknowledged
} catch (t: ResourceAlreadyExistsException) {
if (t.message?.contains("already exists") == true) {
} catch (t: Exception) {
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
true
} else {
throw t
Expand Down Expand Up @@ -206,16 +203,15 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
indexTimeout: TimeValue
) {
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val index = docLevelMonitorInput.indices[0]
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

val clusterState = clusterService.state()
val indices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)

val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
}
val indices = getIndexResponse.indices()
val clusterState = clusterService.state()

// Run through each backing index and apply appropriate mappings to query index
indices?.forEach { indexName ->
Expand Down Expand Up @@ -387,7 +383,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ

/**
* Adjusts max field limit index setting for query index if source index has higher limit.
* This will prevent max field limit exception, when applying mappings to query index
* This will prevent max field limit exception, when source index has more fields then query index limit
*/
private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) {
val getSettingsResponse: GetSettingsResponse = client.suspendUntil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ package org.opensearch.alerting.util

import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.client.IndicesAdminClient
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.util.IndexUtils
import org.opensearch.index.IndexNotFoundException

class IndexUtils {

Expand Down Expand Up @@ -130,5 +134,26 @@ class IndexUtils {
}
}
}

@JvmStatic
fun resolveAllIndices(indices: List<String>, clusterService: ClusterService, resolver: IndexNameExpressionResolver): List<String> {
val result = mutableListOf<String>()

indices.forEach { index ->
val concreteIndices = resolver.concreteIndexNames(
clusterService.state(),
IndicesOptions.lenientExpand(),
true,
index
)
result.addAll(concreteIndices)
}

if (result.size == 0) {
throw IndexNotFoundException(indices[0])
}

return result
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {

val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 2, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))
val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") }
assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size)
}

fun `test execute monitor with new index added after first execution that generates alerts and findings`() {
Expand Down Expand Up @@ -410,14 +410,9 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {

var findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 2, findings.size)
assertTrue(
"Findings saved for test monitor expected 1 instead of ${findings[0].relatedDocIds}",
findings[0].relatedDocIds.contains("1")
)
assertTrue(
"Findings saved for test monitor expected 51 instead of ${findings[1].relatedDocIds}",
findings[1].relatedDocIds.contains("5")
)

var foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") }
assertEquals("Findings saved for test monitor expected 1 and 5", 2, foundFindings.size)

// clear previous findings and alerts
deleteIndex(ALL_FINDING_INDEX_PATTERN)
Expand Down Expand Up @@ -445,18 +440,11 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {

findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 3, findings.size)
assertTrue(
"Findings saved for test monitor expected 14 instead of ${findings[0].relatedDocIds}",
findings[0].relatedDocIds.contains("14")
)
assertTrue(
"Findings saved for test monitor expected 51 instead of ${findings[1].relatedDocIds}",
findings[1].relatedDocIds.contains("51")
)
assertTrue(
"Findings saved for test monitor expected 10 instead of ${findings[2].relatedDocIds}",
findings[2].relatedDocIds.contains("10")
)

foundFindings = findings.filter {
it.relatedDocIds.contains("14") || it.relatedDocIds.contains("51") || it.relatedDocIds.contains("10")
}
assertEquals("Findings saved for test monitor expected 14, 51 and 10", 3, foundFindings.size)
}

fun `test document-level monitor when alias only has write index with 0 docs`() {
Expand Down
Loading

0 comments on commit 06b427c

Please sign in to comment.