Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

add throttle on action level #48

Merged
merged 6 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.retry
import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.DocWriteRequest
Expand Down Expand Up @@ -193,7 +194,7 @@ class MonitorRunner(
if (isTriggerActionable(triggerCtx, triggerResult)) {
val actionCtx = triggerCtx.copy(error = monitorResult.error ?: triggerResult.error)
for (action in trigger.actions) {
triggerResult.actionResults[action.name] = runAction(action, actionCtx, dryrun)
triggerResult.actionResults[action.id] = runAction(action, actionCtx, dryrun)
}
}

Expand All @@ -219,22 +220,45 @@ class MonitorRunner(
private fun composeAlert(ctx: TriggerExecutionContext, result: TriggerRunResult, alertError: AlertError?): Alert? {
val currentTime = currentTime()
val currentAlert = ctx.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1))
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
updatedActionExecutionResults.addAll(result.actionResults.filter { it -> currentActionIds.contains(it.key) }
.map { it -> ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) })
} else {
updatedActionExecutionResults.addAll(result.actionResults.map { it -> ActionExecutionResult(it.key, it.value.executionTime,
if (it.value.throttled) 1 else 0) })
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(state = COMPLETED, endTime = currentTime, errorMessage = null,
errorHistory = updatedHistory)
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = if (alertError == null) ACTIVE else ERROR
currentAlert.copy(state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
errorHistory = updatedHistory)
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults)
} else {
val alertState = if (alertError == null) ACTIVE else ERROR
Alert(monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory)
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults)
}
}

Expand Down Expand Up @@ -380,8 +404,24 @@ class MonitorRunner(
return result.triggered && !suppress
}

private fun isActionActionable(action: Action, alert: Alert?): Boolean {
if (alert == null) {
return true
}
if (action.throttleEnabled) {
val result = alert.actionExecutionResults.firstOrNull { r -> r.actionId == action.id }
val lastExecutionTime: Instant? = result?.lastExecutionTime
val throttledTimeBound = currentTime().minus(action.throttle!!.value!!.toLong(), action.throttle.unit)
return (lastExecutionTime == null || lastExecutionTime.isBefore(throttledTimeBound))
}
return true
}

private fun runAction(action: Action, ctx: TriggerExecutionContext, dryrun: Boolean): ActionRunResult {
return try {
if (!isActionActionable(action, ctx.alert)) {
return ActionRunResult(action.id, action.name, mapOf(), true, null, null)
}
val actionOutput = mutableMapOf<String, String>()
actionOutput[SUBJECT] = if (action.subjectTemplate != null) compileTemplate(action.subjectTemplate, ctx) else ""
actionOutput[MESSAGE] = compileTemplate(action.messageTemplate, ctx)
Expand All @@ -392,9 +432,9 @@ class MonitorRunner(
var destination = getDestinationInfo(action.destinationId)
actionOutput[MESSAGE_ID] = destination.publish(actionOutput[SUBJECT], actionOutput[MESSAGE]!!)
}
ActionRunResult(action.name, actionOutput, false, null)
ActionRunResult(action.id, action.name, actionOutput, false, currentTime(), null)
} catch (e: Exception) {
ActionRunResult(action.name, mapOf(), false, e)
ActionRunResult(action.id, action.name, mapOf(), false, currentTime(), e)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.alerting.model

import com.amazon.opendistroforelasticsearch.alerting.elasticapi.instant
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import java.io.IOException
import java.lang.IllegalStateException
import java.time.Instant

/**
* When an alert triggered, the trigger's actions will be executed.
* Action execution result records action throttle result and is a part of Alert.
*/
data class ActionExecutionResult(
val actionId: String,
val lastExecutionTime: Instant?,
val throttledCount: Int = 0
) : ToXContentObject {

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(ACTION_ID_FIELD, actionId)
.optionalTimeField(LAST_EXECUTION_TIME_FIELD, lastExecutionTime)
.field(THROTTLED_COUNT_FIELD, throttledCount)
.endObject()
}

companion object {
const val ACTION_ID_FIELD = "action_id"
const val LAST_EXECUTION_TIME_FIELD = "last_execution_time"
const val THROTTLED_COUNT_FIELD = "throttled_count"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ActionExecutionResult {
var actionId: String? = null
ylwu-amzn marked this conversation as resolved.
Show resolved Hide resolved
var throttledCount: Int = 0
var lastExecutionTime: Instant? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
ACTION_ID_FIELD -> actionId = xcp.text()
THROTTLED_COUNT_FIELD -> throttledCount = xcp.intValue()
LAST_EXECUTION_TIME_FIELD -> lastExecutionTime = xcp.instant()

else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing action")
}
}
}

requireNotNull(actionId) { "Must set action id" }
return ActionExecutionResult(actionId, lastExecutionTime, throttledCount)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ data class Alert(
val acknowledgedTime: Instant? = null,
val errorMessage: String? = null,
val errorHistory: List<AlertError>,
val severity: String
val severity: String,
val actionExecutionResults: List<ActionExecutionResult>
) : ToXContent {

init {
Expand All @@ -57,11 +58,12 @@ data class Alert(
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf()
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf()
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity)
severity = trigger.severity, actionExecutionResults = actionExecutionResults)

enum class State {
ACTIVE, ACKNOWLEDGED, COMPLETED, ERROR, DELETED
Expand All @@ -86,6 +88,7 @@ data class Alert(
const val ERROR_MESSAGE_FIELD = "error_message"
const val ALERT_HISTORY_FIELD = "alert_history"
const val SEVERITY_FIELD = "severity"
const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results"

const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND
Expand Down Expand Up @@ -115,6 +118,7 @@ data class Alert(
var acknowledgedTime: Instant? = null
var errorMessage: String? = null
val errorHistory: MutableList<AlertError> = mutableListOf()
var actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -140,6 +144,12 @@ data class Alert(
}
}
SEVERITY_FIELD -> severity = xcp.text()
ACTION_EXECUTION_RESULTS_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
actionExecutionResults.add(ActionExecutionResult.parse(xcp))
}
}
}
}

Expand All @@ -148,7 +158,8 @@ data class Alert(
triggerId = requireNotNull(triggerId), triggerName = requireNotNull(triggerName),
state = requireNotNull(state), startTime = requireNotNull(startTime), endTime = endTime,
lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime,
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity)
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity,
actionExecutionResults = actionExecutionResults)
}
}

Expand All @@ -163,6 +174,7 @@ data class Alert(
.field(ERROR_MESSAGE_FIELD, errorMessage)
.field(ALERT_HISTORY_FIELD, errorHistory.toTypedArray())
.field(SEVERITY_FIELD, severity)
.field(ACTION_EXECUTION_RESULTS_FIELD, actionExecutionResults.toTypedArray())
.optionalTimeField(START_TIME_FIELD, startTime)
.optionalTimeField(LAST_NOTIFICATION_TIME_FIELD, lastNotificationTime)
.optionalTimeField(END_TIME_FIELD, endTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,21 @@ data class TriggerRunResult(
}

data class ActionRunResult(
val actionId: String,
val actionName: String,
val output: Map<String, String>,
val throttled: Boolean = false,
val executionTime: Instant? = null,
val error: Exception? = null
) : ToXContent {

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field("id", actionId)
.field("name", actionName)
.field("output", output)
.field("throttled", throttled)
.field("executionTime", executionTime)
.field("error", error?.message)
.endObject()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.alerting.model.action

import org.elasticsearch.common.UUIDs
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
Expand All @@ -31,7 +32,10 @@ data class Action(
val name: String,
val destinationId: String,
val subjectTemplate: Script?,
val messageTemplate: Script
val messageTemplate: Script,
val throttleEnabled: Boolean,
val throttle: Throttle?,
val id: String = UUIDs.base64UUID()
) : ToXContentObject {

init {
Expand All @@ -43,10 +47,13 @@ data class Action(

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(ID_FIELD, id)
.field(NAME_FIELD, name)
.field(DESTINATION_ID_FIELD, destinationId)
.field(SUBJECT_TEMPLATE_FIELD, subjectTemplate)
.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
.field(THROTTLE_ENABLED_FIELD, throttleEnabled)
.field(THROTTLE_FIELD, throttle)
.endObject()
}

Expand All @@ -55,10 +62,13 @@ data class Action(
}

companion object {
const val ID_FIELD = "id"
const val NAME_FIELD = "name"
const val DESTINATION_ID_FIELD = "destination_id"
const val SUBJECT_TEMPLATE_FIELD = "subject_template"
const val MESSAGE_TEMPLATE_FIELD = "message_template"
const val THROTTLE_ENABLED_FIELD = "throttle_enabled"
const val THROTTLE_FIELD = "throttle"
const val MUSTACHE = "mustache"
const val SUBJECT = "subject"
const val MESSAGE = "message"
Expand All @@ -67,29 +77,42 @@ data class Action(
@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): Action {
var id = UUIDs.base64UUID() // assign a default action id if one is not specified
lateinit var name: String
lateinit var destinationId: String
var subjectTemplate: Script? = null // subject template could be null for some destinations
lateinit var messageTemplate: Script
var throttleEnabled = false
var throttle: Throttle? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
ID_FIELD -> id = xcp.text()
NAME_FIELD -> name = xcp.textOrNull()
DESTINATION_ID_FIELD -> destinationId = xcp.textOrNull()
SUBJECT_TEMPLATE_FIELD -> subjectTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
THROTTLE_FIELD -> throttle = Throttle.parse(xcp)
THROTTLE_ENABLED_FIELD -> {
throttleEnabled = xcp.booleanValue()
}

else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing action")
}
}
}
return Action(requireNotNull(name) { "Destination name is null" },

return Action(requireNotNull(name) { "Action name is null" },
requireNotNull(destinationId) { "Destination id is null" },
subjectTemplate,
requireNotNull(messageTemplate) { "Destination message template is null" })
requireNotNull(messageTemplate) { "Action message template is null" },
throttleEnabled,
throttle,
id = requireNotNull(id))
}
}
}
Loading