Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

add throttle on action level #48

Merged
merged 6 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ dependencies {

compile project(":alerting-core")
compile project(":alerting-notification")

testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
}

javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.retry
import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.DocWriteRequest
Expand Down Expand Up @@ -193,7 +194,7 @@ class MonitorRunner(
if (isTriggerActionable(triggerCtx, triggerResult)) {
val actionCtx = triggerCtx.copy(error = monitorResult.error ?: triggerResult.error)
for (action in trigger.actions) {
triggerResult.actionResults[action.name] = runAction(action, actionCtx, dryrun)
triggerResult.actionResults[action.id] = runAction(action, actionCtx, dryrun)
}
}

Expand All @@ -219,22 +220,45 @@ class MonitorRunner(
private fun composeAlert(ctx: TriggerExecutionContext, result: TriggerRunResult, alertError: AlertError?): Alert? {
val currentTime = currentTime()
val currentAlert = ctx.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1))
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
updatedActionExecutionResults.addAll(result.actionResults.filter { it -> currentActionIds.contains(it.key) }
.map { it -> ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) })
} else {
updatedActionExecutionResults.addAll(result.actionResults.map { it -> ActionExecutionResult(it.key, it.value.executionTime,
if (it.value.throttled) 1 else 0) })
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(state = COMPLETED, endTime = currentTime, errorMessage = null,
errorHistory = updatedHistory)
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = if (alertError == null) ACTIVE else ERROR
currentAlert.copy(state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
errorHistory = updatedHistory)
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults)
} else {
val alertState = if (alertError == null) ACTIVE else ERROR
Alert(monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory)
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults)
}
}

Expand Down Expand Up @@ -380,8 +404,24 @@ class MonitorRunner(
return result.triggered && !suppress
}

private fun isActionActionable(action: Action, alert: Alert?): Boolean {
if (alert == null) {
return true
}
if (action.throttleEnabled) {
val result = alert.actionExecutionResults.firstOrNull { r -> r.actionId == action.id }
val lastExecutionTime: Instant? = result?.lastExecutionTime
val throttledTimeBound = currentTime().minus(action.throttle!!.value!!.toLong(), action.throttle.unit)
return (lastExecutionTime == null || lastExecutionTime.isBefore(throttledTimeBound))
}
return true
}

private fun runAction(action: Action, ctx: TriggerExecutionContext, dryrun: Boolean): ActionRunResult {
return try {
if (!isActionActionable(action, ctx.alert)) {
return ActionRunResult(action.id, action.name, mapOf(), true, null, null)
}
val actionOutput = mutableMapOf<String, String>()
actionOutput[SUBJECT] = if (action.subjectTemplate != null) compileTemplate(action.subjectTemplate, ctx) else ""
actionOutput[MESSAGE] = compileTemplate(action.messageTemplate, ctx)
Expand All @@ -392,9 +432,9 @@ class MonitorRunner(
var destination = getDestinationInfo(action.destinationId)
actionOutput[MESSAGE_ID] = destination.publish(actionOutput[SUBJECT], actionOutput[MESSAGE]!!)
}
ActionRunResult(action.name, actionOutput, false, null)
ActionRunResult(action.id, action.name, actionOutput, false, currentTime(), null)
} catch (e: Exception) {
ActionRunResult(action.name, mapOf(), false, e)
ActionRunResult(action.id, action.name, mapOf(), false, currentTime(), e)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.alerting.model

import com.amazon.opendistroforelasticsearch.alerting.elasticapi.instant
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import java.io.IOException
import java.lang.IllegalStateException
import java.time.Instant

/**
* When an alert triggered, the trigger's actions will be executed.
* Action execution result records action throttle result and is a part of Alert.
*/
data class ActionExecutionResult(
val actionId: String,
val lastExecutionTime: Instant?,
val throttledCount: Int = 0
) : ToXContentObject {

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(ACTION_ID_FIELD, actionId)
.optionalTimeField(LAST_EXECUTION_TIME_FIELD, lastExecutionTime)
.field(THROTTLED_COUNT_FIELD, throttledCount)
.endObject()
}

companion object {
const val ACTION_ID_FIELD = "action_id"
const val LAST_EXECUTION_TIME_FIELD = "last_execution_time"
const val THROTTLED_COUNT_FIELD = "throttled_count"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ActionExecutionResult {
var actionId: String? = null
ylwu-amzn marked this conversation as resolved.
Show resolved Hide resolved
var throttledCount: Int = 0
var lastExecutionTime: Instant? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
ACTION_ID_FIELD -> actionId = xcp.text()
THROTTLED_COUNT_FIELD -> throttledCount = xcp.intValue()
LAST_EXECUTION_TIME_FIELD -> lastExecutionTime = xcp.instant()

else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing action")
}
}
}

requireNotNull(actionId) { "Must set action id" }
return ActionExecutionResult(actionId, lastExecutionTime, throttledCount)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ data class Alert(
val acknowledgedTime: Instant? = null,
val errorMessage: String? = null,
val errorHistory: List<AlertError>,
val severity: String
val severity: String,
val actionExecutionResults: List<ActionExecutionResult>
) : ToXContent {

init {
Expand All @@ -57,11 +58,12 @@ data class Alert(
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf()
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf()
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity)
severity = trigger.severity, actionExecutionResults = actionExecutionResults)

enum class State {
ACTIVE, ACKNOWLEDGED, COMPLETED, ERROR, DELETED
Expand All @@ -86,6 +88,7 @@ data class Alert(
const val ERROR_MESSAGE_FIELD = "error_message"
const val ALERT_HISTORY_FIELD = "alert_history"
const val SEVERITY_FIELD = "severity"
const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results"

const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND
Expand Down Expand Up @@ -115,6 +118,7 @@ data class Alert(
var acknowledgedTime: Instant? = null
var errorMessage: String? = null
val errorHistory: MutableList<AlertError> = mutableListOf()
var actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -140,6 +144,12 @@ data class Alert(
}
}
SEVERITY_FIELD -> severity = xcp.text()
ACTION_EXECUTION_RESULTS_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
actionExecutionResults.add(ActionExecutionResult.parse(xcp))
}
}
}
}

Expand All @@ -148,7 +158,8 @@ data class Alert(
triggerId = requireNotNull(triggerId), triggerName = requireNotNull(triggerName),
state = requireNotNull(state), startTime = requireNotNull(startTime), endTime = endTime,
lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime,
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity)
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity,
actionExecutionResults = actionExecutionResults)
}
}

Expand All @@ -163,6 +174,7 @@ data class Alert(
.field(ERROR_MESSAGE_FIELD, errorMessage)
.field(ALERT_HISTORY_FIELD, errorHistory.toTypedArray())
.field(SEVERITY_FIELD, severity)
.field(ACTION_EXECUTION_RESULTS_FIELD, actionExecutionResults.toTypedArray())
.optionalTimeField(START_TIME_FIELD, startTime)
.optionalTimeField(LAST_NOTIFICATION_TIME_FIELD, lastNotificationTime)
.optionalTimeField(END_TIME_FIELD, endTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,21 @@ data class TriggerRunResult(
}

data class ActionRunResult(
val actionId: String,
val actionName: String,
val output: Map<String, String>,
val throttled: Boolean = false,
val executionTime: Instant? = null,
val error: Exception? = null
) : ToXContent {

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field("id", actionId)
.field("name", actionName)
.field("output", output)
.field("throttled", throttled)
.optionalTimeField("executionTime", executionTime)
.field("error", error?.message)
.endObject()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.alerting.model.action

import org.elasticsearch.common.UUIDs
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
Expand All @@ -31,7 +32,10 @@ data class Action(
val name: String,
val destinationId: String,
val subjectTemplate: Script?,
val messageTemplate: Script
val messageTemplate: Script,
val throttleEnabled: Boolean,
val throttle: Throttle?,
val id: String = UUIDs.base64UUID()
) : ToXContentObject {

init {
Expand All @@ -43,10 +47,13 @@ data class Action(

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(ID_FIELD, id)
.field(NAME_FIELD, name)
.field(DESTINATION_ID_FIELD, destinationId)
.field(SUBJECT_TEMPLATE_FIELD, subjectTemplate)
.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
.field(THROTTLE_ENABLED_FIELD, throttleEnabled)
.field(THROTTLE_FIELD, throttle)
.endObject()
}

Expand All @@ -55,10 +62,13 @@ data class Action(
}

companion object {
const val ID_FIELD = "id"
const val NAME_FIELD = "name"
const val DESTINATION_ID_FIELD = "destination_id"
const val SUBJECT_TEMPLATE_FIELD = "subject_template"
const val MESSAGE_TEMPLATE_FIELD = "message_template"
const val THROTTLE_ENABLED_FIELD = "throttle_enabled"
const val THROTTLE_FIELD = "throttle"
const val MUSTACHE = "mustache"
const val SUBJECT = "subject"
const val MESSAGE = "message"
Expand All @@ -67,29 +77,42 @@ data class Action(
@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): Action {
var id = UUIDs.base64UUID() // assign a default action id if one is not specified
lateinit var name: String
lateinit var destinationId: String
var subjectTemplate: Script? = null // subject template could be null for some destinations
lateinit var messageTemplate: Script
var throttleEnabled = false
var throttle: Throttle? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
ID_FIELD -> id = xcp.text()
NAME_FIELD -> name = xcp.textOrNull()
DESTINATION_ID_FIELD -> destinationId = xcp.textOrNull()
SUBJECT_TEMPLATE_FIELD -> subjectTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
THROTTLE_FIELD -> throttle = Throttle.parse(xcp)
THROTTLE_ENABLED_FIELD -> {
throttleEnabled = xcp.booleanValue()
}

else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing action")
}
}
}
return Action(requireNotNull(name) { "Destination name is null" },

return Action(requireNotNull(name) { "Action name is null" },
requireNotNull(destinationId) { "Destination id is null" },
subjectTemplate,
requireNotNull(messageTemplate) { "Destination message template is null" })
requireNotNull(messageTemplate) { "Action message template is null" },
throttleEnabled,
throttle,
id = requireNotNull(id))
}
}
}
Loading