Skip to content

Commit

Permalink
Backport 2.x Adds chained alerts (#976) (#1007) (#1008)
Browse files Browse the repository at this point in the history
* Adds chained alerts (#976)

* chained alert triggers

Signed-off-by: Surya Sashank Nistala <[email protected]>

* converge all single node test cases

Signed-off-by: Surya Sashank Nistala <[email protected]>

* add license headers to files

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix workflow not found issue

Signed-off-by: Surya Sashank Nistala <[email protected]>

* added audit state alerts for doc level monitors

Signed-off-by: Surya Sashank Nistala <[email protected]>

* add audit alerts in query level monitor

Signed-off-by: Surya Sashank Nistala <[email protected]>

* temp: upload custom built common utils jar

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix get monitor response parsing to include associated_workflows

Signed-off-by: Surya Sashank Nistala <[email protected]>

* add query level monitor audit alerts tests

Signed-off-by: Surya Sashank Nistala <[email protected]>

* add audit alerts in bucket level monitor

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix workflow tests

Signed-off-by: Surya Sashank Nistala <[email protected]>

* alerting

Signed-off-by: Surya Sashank Nistala <[email protected]>

* verify bucket monitor audit alerts and chained alerts in workflow

Signed-off-by: Surya Sashank Nistala <[email protected]>

* make execution id mandatory

Signed-off-by: Surya Sashank Nistala <[email protected]>

* revert mapping update in run job method

Signed-off-by: Surya Sashank Nistala <[email protected]>

* minor fixes in chained alert trigger result

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix chained alert triggers tests

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix acknowledge chained alert bug

Signed-off-by: Surya Sashank Nistala <[email protected]>

* revert get alerts change

Signed-off-by: Surya Sashank Nistala <[email protected]>

* refactor and remove transport actions being invoked in other transport actions

Signed-off-by: Surya Sashank Nistala <[email protected]>

* add license header

Signed-off-by: Surya Sashank Nistala <[email protected]>

* scheduled job mapping schema

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix ktlint and revert gradle dev set up chanegs

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix post delete method and refactor alert mover to add class level logger

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix test - pass workflow id in get alerts

Signed-off-by: Surya Sashank Nistala <[email protected]>

* remove monitor empty filter in get alerts api as there is dedicated api for fetching chained alerts - workflow alerts api

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix check for workflow id is empty or null in get alerts action

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix alert mover method delegate monitor parsing logic

Signed-off-by: Surya Sashank Nistala <[email protected]>

* remove common utils jar from repo

Signed-off-by: Surya Sashank Nistala <[email protected]>

---------

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix imports

Signed-off-by: Surya Sashank Nistala <[email protected]>

---------

Signed-off-by: Surya Sashank Nistala <[email protected]>
(cherry picked from commit d2d03c6)

Co-authored-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and eirsep authored Jul 12, 2023
1 parent 8d019aa commit 57b2ebf
Show file tree
Hide file tree
Showing 50 changed files with 4,172 additions and 786 deletions.
119 changes: 88 additions & 31 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
Expand All @@ -47,6 +48,7 @@ import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.NoOpTrigger
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
Expand Down Expand Up @@ -81,10 +83,11 @@ class AlertService(

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor): Map<Trigger, Alert?> {
suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
size = monitor.triggers.size * 2 // We expect there to be only a single in-progress alert so fetch 2 to check
size = monitor.triggers.size * 2, // We expect there to be only a single in-progress alert so fetch 2 to check
workflowRunContext
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
Expand All @@ -100,11 +103,15 @@ class AlertService(
}
}

suspend fun loadCurrentAlertsForBucketLevelMonitor(monitor: Monitor): Map<Trigger, MutableMap<String, Alert>> {
suspend fun loadCurrentAlertsForBucketLevelMonitor(
monitor: Monitor,
workflowRunContext: WorkflowRunContext?,
): Map<Trigger, MutableMap<String, Alert>> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
// TODO: This should be limited based on a circuit breaker that limits Alerts
size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT
size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT,
workflowRunContext = workflowRunContext
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
Expand All @@ -123,7 +130,9 @@ class AlertService(
fun composeQueryLevelAlert(
ctx: QueryLevelTriggerExecutionContext,
result: QueryLevelTriggerRunResult,
alertError: AlertError?
alertError: AlertError?,
executionId: String,
workflorwRunContext: WorkflowRunContext?
): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert
Expand Down Expand Up @@ -181,15 +190,19 @@ class AlertService(
errorMessage = alertError?.message,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion,
)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else if (alertError == null) Alert.State.ACTIVE
else Alert.State.ERROR
Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId,
workflowId = workflorwRunContext?.workflowId ?: ""
)
}
}
Expand All @@ -199,15 +212,24 @@ class AlertService(
findings: List<String>,
relatedDocIds: List<String>,
ctx: DocumentLevelTriggerExecutionContext,
alertError: AlertError?
alertError: AlertError?,
executionId: String,
workflorwRunContext: WorkflowRunContext?
): Alert {
val currentTime = Instant.now()

val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else if (alertError == null) {
Alert.State.ACTIVE
} else {
Alert.State.ERROR
}
return Alert(
id = UUID.randomUUID().toString(), monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion, findingIds = findings, relatedDocIds = relatedDocIds
schemaVersion = IndexUtils.alertIndexSchemaVersion, findingIds = findings, relatedDocIds = relatedDocIds,
executionId = executionId, workflowId = workflorwRunContext?.workflowId ?: ""
)
}

Expand All @@ -219,12 +241,33 @@ class AlertService(
workflowRunContext: WorkflowRunContext?
): Alert {
val currentTime = Instant.now()
val alertState = if (workflowRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else {
Alert.State.ERROR
}
return Alert(
id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
workflowId = workflowRunContext?.workflowId ?: "",
executionId = executionId ?: ""
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId, workflowId = workflowRunContext?.workflowId ?: ""
)
}

fun composeChainedAlert(
ctx: ChainedAlertTriggerExecutionContext,
executionId: String,
workflow: Workflow,
associatedAlertIds: List<String>
): Alert {
return Alert(
startTime = Instant.now(),
lastNotificationTime = Instant.now(),
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
}

Expand Down Expand Up @@ -279,7 +322,9 @@ class AlertService(
trigger: BucketLevelTrigger,
currentAlerts: MutableMap<String, Alert>,
aggResultBuckets: List<AggregationResultBucket>,
findings: List<String>
findings: List<String>,
executionId: String,
workflorwRunContext: WorkflowRunContext?
): Map<AlertCategory, List<Alert>> {
val dedupedAlerts = mutableListOf<Alert>()
val newAlerts = mutableListOf<Alert>()
Expand All @@ -295,12 +340,15 @@ class AlertService(
currentAlerts.remove(aggAlertBucket.getBucketKeysHash())
} else {
// New Alert
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else Alert.State.ACTIVE
val newAlert = Alert(
monitor = monitor, trigger = trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null,
lastNotificationTime = currentTime, state = alertState, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket,
findingIds = findings
findingIds = findings, executionId = executionId, workflowId = workflorwRunContext?.workflowId ?: ""
)
newAlerts.add(newAlert)
}
Expand Down Expand Up @@ -528,7 +576,8 @@ class AlertService(
dataSources: DataSources,
alerts: List<Alert>,
retryPolicy: BackoffPolicy,
allowUpdatingAcknowledgedAlert: Boolean = false
allowUpdatingAcknowledgedAlert: Boolean = false,
routingId: String // routing is mandatory and set as monitor id. for workflow chained alerts we pass workflow id as routing
) {
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex
Expand All @@ -542,7 +591,7 @@ class AlertService(
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -553,7 +602,7 @@ class AlertService(
if (allowUpdatingAcknowledgedAlert) {
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -562,9 +611,12 @@ class AlertService(
}
}
Alert.State.AUDIT -> {
val index = if (alertIndices.isAlertHistoryEnabled()) {
dataSources.alertsHistoryIndex
} else dataSources.alertsIndex
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
IndexRequest(index)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -575,11 +627,11 @@ class AlertService(
Alert.State.COMPLETED -> {
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(alertsIndex, alert.id)
.routing(alert.monitorId),
.routing(routingId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled()) {
IndexRequest(alertsHistoryIndex)
.routing(alert.monitorId)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
} else null
Expand All @@ -591,7 +643,7 @@ class AlertService(
if (requestsToRetry.isEmpty()) return
// Retry Bulk requests if there was any 429 response
retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry)
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS }
Expand All @@ -616,13 +668,16 @@ class AlertService(
val savedAlerts = mutableListOf<Alert>()
var alertsBeingIndexed = alerts
var requestsToRetry: MutableList<IndexRequest> = alerts.map { alert ->
if (alert.state != Alert.State.ACTIVE) {
if (alert.state != Alert.State.ACTIVE && alert.state != Alert.State.AUDIT) {
throw IllegalStateException("Unexpected attempt to save new alert [$alert] with state [${alert.state}]")
}
if (alert.id != Alert.NO_ID) {
throw IllegalStateException("Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]")
}
IndexRequest(dataSources.alertsIndex)
val alertIndex = if (alert.state == Alert.State.AUDIT && alertIndices.isAlertHistoryEnabled()) {
dataSources.alertsHistoryIndex
} else dataSources.alertsIndex
IndexRequest(alertIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
}.toMutableList()
Expand Down Expand Up @@ -683,13 +738,15 @@ class AlertService(
* @param monitorId The Monitor to get Alerts for
* @param size The number of search hits (Alerts) to return
*/
private suspend fun searchAlerts(monitor: Monitor, size: Int): SearchResponse {
private suspend fun searchAlerts(monitor: Monitor, size: Int, workflowRunContext: WorkflowRunContext?): SearchResponse {
val monitorId = monitor.id
val alertIndex = monitor.dataSources.alertsIndex

val queryBuilder = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))

.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))
if (workflowRunContext != null) {
queryBuilder.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowRunContext.workflowId))
}
val searchSourceBuilder = SearchSourceBuilder()
.size(size)
.query(queryBuilder)
Expand Down
Loading

0 comments on commit 57b2ebf

Please sign in to comment.