From 856702896cd3665388b602375e1588856b2d2ea5 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 11 Jul 2023 13:16:16 -0700 Subject: [PATCH] [Backport 2.x] fix alert constructor with noop trigger to use execution id and workflow id (#994) Signed-off-by: Subhobrata Dey Co-authored-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertService.kt | 19 +++++++++++---- .../alerting/DocumentLevelMonitorRunner.kt | 9 ++++++-- .../alerting/alerts/alert_mapping.json | 23 ++++++++++++++----- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 47eed6010..c2c5e024b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -29,6 +29,7 @@ import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.MAX_SEARCH_SIZE import org.opensearch.alerting.util.getBucketKeysHash +import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.common.bytes.BytesReference import org.opensearch.common.unit.TimeValue @@ -213,13 +214,17 @@ class AlertService( fun composeMonitorErrorAlert( id: String, monitor: Monitor, - alertError: AlertError + alertError: AlertError, + executionId: String?, + workflowRunContext: WorkflowRunContext? ): Alert { val currentTime = Instant.now() return Alert( id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime, lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message, - schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = "" + schemaVersion = IndexUtils.alertIndexSchemaVersion, + workflowId = workflowRunContext?.workflowId ?: "", + executionId = executionId ?: "" ) } @@ -319,7 +324,12 @@ class AlertService( } ?: listOf() } - suspend fun upsertMonitorErrorAlert(monitor: Monitor, errorMessage: String) { + suspend fun upsertMonitorErrorAlert( + monitor: Monitor, + errorMessage: String, + executionId: String?, + workflowRunContext: WorkflowRunContext?, + ) { val newErrorAlertId = "$ERROR_ALERT_ID_PREFIX-${monitor.id}-${UUID.randomUUID()}" val searchRequest = SearchRequest(monitor.dataSources.alertsIndex) @@ -334,7 +344,8 @@ class AlertService( ) val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } - var alert = composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage)) + var alert = + composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage), executionId, workflowRunContext) if (searchResponse.hits.totalHits.value > 0L) { if (searchResponse.hits.totalHits.value > 1L) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index bb2a04e8a..9c3cfe0be 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -248,7 +248,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If any error happened during trigger execution, upsert monitor error alert val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults) if (errorMessage.isNotEmpty()) { - monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor = monitor, errorMessage = errorMessage) + monitorCtx.alertService!!.upsertMonitorErrorAlert( + monitor = monitor, + errorMessage = errorMessage, + executionId = workflowRunContext?.executionId, + workflowRunContext + ) } else { onSuccessfulMonitorRun(monitorCtx, monitor) } @@ -263,7 +268,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return monitorResult.copy(triggerResults = triggerResults) } catch (e: Exception) { val errorMessage = ExceptionsHelper.detailedMessage(e) - monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage) + monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, workflowRunContext?.executionId, workflowRunContext) logger.error("Failed running Document-level-monitor ${monitor.name}", e) val alertingException = AlertingException( errorMessage, diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json index acabec7e0..53fb5b0a2 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json @@ -13,9 +13,6 @@ "monitor_id": { "type": "keyword" }, - "workflow_id": { - "type": "keyword" - }, "monitor_version": { "type": "long" }, @@ -28,9 +25,6 @@ "severity": { "type": "keyword" }, - "execution_id": { - "type": "keyword" - }, "monitor_name": { "type": "text", "fields": { @@ -77,6 +71,15 @@ } } }, + "execution_id": { + "type": "keyword" + }, + "workflow_id": { + "type": "keyword" + }, + "workflow_name": { + "type": "keyword" + }, "trigger_id": { "type": "keyword" }, @@ -97,6 +100,14 @@ } } }, + "associated_alert_ids": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, "related_doc_ids": { "type" : "text", "fields" : {