Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.9] Backport 2.x Adds chained alerts (#976) #1008

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 88 additions & 31 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
Expand All @@ -47,6 +48,7 @@ import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.NoOpTrigger
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
Expand Down Expand Up @@ -81,10 +83,11 @@ class AlertService(

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor): Map<Trigger, Alert?> {
suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
size = monitor.triggers.size * 2 // We expect there to be only a single in-progress alert so fetch 2 to check
size = monitor.triggers.size * 2, // We expect there to be only a single in-progress alert so fetch 2 to check
workflowRunContext
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
Expand All @@ -100,11 +103,15 @@ class AlertService(
}
}

suspend fun loadCurrentAlertsForBucketLevelMonitor(monitor: Monitor): Map<Trigger, MutableMap<String, Alert>> {
suspend fun loadCurrentAlertsForBucketLevelMonitor(
monitor: Monitor,
workflowRunContext: WorkflowRunContext?,
): Map<Trigger, MutableMap<String, Alert>> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
// TODO: This should be limited based on a circuit breaker that limits Alerts
size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT
size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT,
workflowRunContext = workflowRunContext
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
Expand All @@ -123,7 +130,9 @@ class AlertService(
fun composeQueryLevelAlert(
ctx: QueryLevelTriggerExecutionContext,
result: QueryLevelTriggerRunResult,
alertError: AlertError?
alertError: AlertError?,
executionId: String,
workflorwRunContext: WorkflowRunContext?
): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert
Expand Down Expand Up @@ -181,15 +190,19 @@ class AlertService(
errorMessage = alertError?.message,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion,
)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else if (alertError == null) Alert.State.ACTIVE
else Alert.State.ERROR
Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId,
workflowId = workflorwRunContext?.workflowId ?: ""
)
}
}
Expand All @@ -199,15 +212,24 @@ class AlertService(
findings: List<String>,
relatedDocIds: List<String>,
ctx: DocumentLevelTriggerExecutionContext,
alertError: AlertError?
alertError: AlertError?,
executionId: String,
workflorwRunContext: WorkflowRunContext?
): Alert {
val currentTime = Instant.now()

val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else if (alertError == null) {
Alert.State.ACTIVE
} else {
Alert.State.ERROR
}
return Alert(
id = UUID.randomUUID().toString(), monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion, findingIds = findings, relatedDocIds = relatedDocIds
schemaVersion = IndexUtils.alertIndexSchemaVersion, findingIds = findings, relatedDocIds = relatedDocIds,
executionId = executionId, workflowId = workflorwRunContext?.workflowId ?: ""
)
}

Expand All @@ -219,12 +241,33 @@ class AlertService(
workflowRunContext: WorkflowRunContext?
): Alert {
val currentTime = Instant.now()
val alertState = if (workflowRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else {
Alert.State.ERROR
}
return Alert(
id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
workflowId = workflowRunContext?.workflowId ?: "",
executionId = executionId ?: ""
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId, workflowId = workflowRunContext?.workflowId ?: ""
)
}

fun composeChainedAlert(
ctx: ChainedAlertTriggerExecutionContext,
executionId: String,
workflow: Workflow,
associatedAlertIds: List<String>
): Alert {
return Alert(
startTime = Instant.now(),
lastNotificationTime = Instant.now(),
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
}

Expand Down Expand Up @@ -279,7 +322,9 @@ class AlertService(
trigger: BucketLevelTrigger,
currentAlerts: MutableMap<String, Alert>,
aggResultBuckets: List<AggregationResultBucket>,
findings: List<String>
findings: List<String>,
executionId: String,
workflorwRunContext: WorkflowRunContext?
): Map<AlertCategory, List<Alert>> {
val dedupedAlerts = mutableListOf<Alert>()
val newAlerts = mutableListOf<Alert>()
Expand All @@ -295,12 +340,15 @@ class AlertService(
currentAlerts.remove(aggAlertBucket.getBucketKeysHash())
} else {
// New Alert
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else Alert.State.ACTIVE
val newAlert = Alert(
monitor = monitor, trigger = trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null,
lastNotificationTime = currentTime, state = alertState, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket,
findingIds = findings
findingIds = findings, executionId = executionId, workflowId = workflorwRunContext?.workflowId ?: ""
)
newAlerts.add(newAlert)
}
Expand Down Expand Up @@ -528,7 +576,8 @@ class AlertService(
dataSources: DataSources,
alerts: List<Alert>,
retryPolicy: BackoffPolicy,
allowUpdatingAcknowledgedAlert: Boolean = false
allowUpdatingAcknowledgedAlert: Boolean = false,
routingId: String // routing is mandatory and set as monitor id. for workflow chained alerts we pass workflow id as routing
) {
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex
Expand All @@ -542,7 +591,7 @@ class AlertService(
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -553,7 +602,7 @@ class AlertService(
if (allowUpdatingAcknowledgedAlert) {
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -562,9 +611,12 @@ class AlertService(
}
}
Alert.State.AUDIT -> {
val index = if (alertIndices.isAlertHistoryEnabled()) {
dataSources.alertsHistoryIndex
} else dataSources.alertsIndex
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
IndexRequest(index)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -575,11 +627,11 @@ class AlertService(
Alert.State.COMPLETED -> {
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(alertsIndex, alert.id)
.routing(alert.monitorId),
.routing(routingId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled()) {
IndexRequest(alertsHistoryIndex)
.routing(alert.monitorId)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
} else null
Expand All @@ -591,7 +643,7 @@ class AlertService(
if (requestsToRetry.isEmpty()) return
// Retry Bulk requests if there was any 429 response
retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry)
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS }
Expand All @@ -616,13 +668,16 @@ class AlertService(
val savedAlerts = mutableListOf<Alert>()
var alertsBeingIndexed = alerts
var requestsToRetry: MutableList<IndexRequest> = alerts.map { alert ->
if (alert.state != Alert.State.ACTIVE) {
if (alert.state != Alert.State.ACTIVE && alert.state != Alert.State.AUDIT) {
throw IllegalStateException("Unexpected attempt to save new alert [$alert] with state [${alert.state}]")
}
if (alert.id != Alert.NO_ID) {
throw IllegalStateException("Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]")
}
IndexRequest(dataSources.alertsIndex)
val alertIndex = if (alert.state == Alert.State.AUDIT && alertIndices.isAlertHistoryEnabled()) {
dataSources.alertsHistoryIndex
} else dataSources.alertsIndex
IndexRequest(alertIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
}.toMutableList()
Expand Down Expand Up @@ -683,13 +738,15 @@ class AlertService(
* @param monitorId The Monitor to get Alerts for
* @param size The number of search hits (Alerts) to return
*/
private suspend fun searchAlerts(monitor: Monitor, size: Int): SearchResponse {
private suspend fun searchAlerts(monitor: Monitor, size: Int, workflowRunContext: WorkflowRunContext?): SearchResponse {
val monitorId = monitor.id
val alertIndex = monitor.dataSources.alertsIndex

val queryBuilder = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))

.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))
if (workflowRunContext != null) {
queryBuilder.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowRunContext.workflowId))
}
val searchSourceBuilder = SearchSourceBuilder()
.size(size)
.query(queryBuilder)
Expand Down
Loading