From 45e26e1a5ae10250c643ae1637ac82a393c46611 Mon Sep 17 00:00:00 2001 From: ylwu-amzn <49084640+ylwu-amzn@users.noreply.github.com> Date: Thu, 9 May 2019 16:12:16 -0700 Subject: [PATCH] add throttle on action level fixes #48 Add throttle on action level We only support MINUTES unit now. --- alerting/build.gradle | 2 + .../alerting/MonitorRunner.kt | 52 ++++++++++-- .../alerting/model/ActionExecutionResult.kt | 78 +++++++++++++++++ .../alerting/model/Alert.kt | 20 ++++- .../alerting/model/MonitorRunResult.kt | 4 + .../alerting/model/action/Action.kt | 29 ++++++- .../alerting/model/action/Throttle.kt | 84 +++++++++++++++++++ .../alerting/alerts/alert_mapping.json | 14 ++++ .../alerting/TestHelpers.kt | 24 +++++- .../alerting/model/XContentTests.kt | 47 ++++++++--- .../resources/mappings/scheduled-jobs.json | 13 +++ 11 files changed, 341 insertions(+), 26 deletions(-) create mode 100644 alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/ActionExecutionResult.kt create mode 100644 alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Throttle.kt diff --git a/alerting/build.gradle b/alerting/build.gradle index 2a3abda0..f06e47ec 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -62,6 +62,8 @@ dependencies { compile project(":alerting-core") compile project(":alerting-notification") + + testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" } javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt index ff1b979e..dcf9940f 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt @@ -51,6 +51,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull import com.amazon.opendistroforelasticsearch.alerting.elasticapi.retry +import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult import org.apache.logging.log4j.LogManager import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.DocWriteRequest @@ -193,7 +194,7 @@ class MonitorRunner( if (isTriggerActionable(triggerCtx, triggerResult)) { val actionCtx = triggerCtx.copy(error = monitorResult.error ?: triggerResult.error) for (action in trigger.actions) { - triggerResult.actionResults[action.name] = runAction(action, actionCtx, dryrun) + triggerResult.actionResults[action.id] = runAction(action, actionCtx, dryrun) } } @@ -219,22 +220,45 @@ class MonitorRunner( private fun composeAlert(ctx: TriggerExecutionContext, result: TriggerRunResult, alertError: AlertError?): Alert? { val currentTime = currentTime() val currentAlert = ctx.alert + + val updatedActionExecutionResults = mutableListOf() + val currentActionIds = mutableSetOf() + if (currentAlert != null) { + for (actionExecutionResult in currentAlert.actionExecutionResults) { + val actionId = actionExecutionResult.actionId + currentActionIds.add(actionId) + val actionRunResult = result.actionResults[actionId] + when { + actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult) + actionRunResult.throttled -> + updatedActionExecutionResults.add(actionExecutionResult.copy( + throttledCount = actionExecutionResult.throttledCount + 1)) + else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime)) + } + } + updatedActionExecutionResults.addAll(result.actionResults.filter { it -> currentActionIds.contains(it.key) } + .map { it -> ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }) + } else { + updatedActionExecutionResults.addAll(result.actionResults.map { it -> ActionExecutionResult(it.key, it.value.executionTime, + if (it.value.throttled) 1 else 0) }) + } + // Merge the alert's error message to the current alert's history val updatedHistory = currentAlert?.errorHistory.update(alertError) return if (alertError == null && !result.triggered) { currentAlert?.copy(state = COMPLETED, endTime = currentTime, errorMessage = null, - errorHistory = updatedHistory) + errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults) } else if (alertError == null && currentAlert?.isAcknowledged() == true) { null } else if (currentAlert != null) { val alertState = if (alertError == null) ACTIVE else ERROR currentAlert.copy(state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message, - errorHistory = updatedHistory) + errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults) } else { val alertState = if (alertError == null) ACTIVE else ERROR Alert(monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime, lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message, - errorHistory = updatedHistory) + errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults) } } @@ -380,8 +404,24 @@ class MonitorRunner( return result.triggered && !suppress } + private fun isActionActionable(action: Action, alert: Alert?): Boolean { + if (alert == null || action.throttle == null) { + return true + } + if (action.throttleEnabled) { + val result = alert.actionExecutionResults.firstOrNull { r -> r.actionId == action.id } + val lastExecutionTime: Instant? = result?.lastExecutionTime + val throttledTimeBound = currentTime().minus(action.throttle.value.toLong(), action.throttle.unit) + return (lastExecutionTime == null || lastExecutionTime.isBefore(throttledTimeBound)) + } + return true + } + private fun runAction(action: Action, ctx: TriggerExecutionContext, dryrun: Boolean): ActionRunResult { return try { + if (!isActionActionable(action, ctx.alert)) { + return ActionRunResult(action.id, action.name, mapOf(), true, null, null) + } val actionOutput = mutableMapOf() actionOutput[SUBJECT] = if (action.subjectTemplate != null) compileTemplate(action.subjectTemplate, ctx) else "" actionOutput[MESSAGE] = compileTemplate(action.messageTemplate, ctx) @@ -392,9 +432,9 @@ class MonitorRunner( var destination = getDestinationInfo(action.destinationId) actionOutput[MESSAGE_ID] = destination.publish(actionOutput[SUBJECT], actionOutput[MESSAGE]!!) } - ActionRunResult(action.name, actionOutput, false, null) + ActionRunResult(action.id, action.name, actionOutput, false, currentTime(), null) } catch (e: Exception) { - ActionRunResult(action.name, mapOf(), false, e) + ActionRunResult(action.id, action.name, mapOf(), false, currentTime(), e) } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/ActionExecutionResult.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/ActionExecutionResult.kt new file mode 100644 index 00000000..cfb7881d --- /dev/null +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/ActionExecutionResult.kt @@ -0,0 +1,78 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.alerting.model + +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.instant +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField +import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.ToXContentObject +import org.elasticsearch.common.xcontent.XContentBuilder +import org.elasticsearch.common.xcontent.XContentParser +import org.elasticsearch.common.xcontent.XContentParserUtils +import java.io.IOException +import java.lang.IllegalStateException +import java.time.Instant + +/** + * When an alert triggered, the trigger's actions will be executed. + * Action execution result records action throttle result and is a part of Alert. + */ +data class ActionExecutionResult( + val actionId: String, + val lastExecutionTime: Instant?, + val throttledCount: Int = 0 +) : ToXContentObject { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(ACTION_ID_FIELD, actionId) + .optionalTimeField(LAST_EXECUTION_TIME_FIELD, lastExecutionTime) + .field(THROTTLED_COUNT_FIELD, throttledCount) + .endObject() + } + + companion object { + const val ACTION_ID_FIELD = "action_id" + const val LAST_EXECUTION_TIME_FIELD = "last_execution_time" + const val THROTTLED_COUNT_FIELD = "throttled_count" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): ActionExecutionResult { + lateinit var actionId: String + var throttledCount: Int = 0 + var lastExecutionTime: Instant? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + when (fieldName) { + ACTION_ID_FIELD -> actionId = xcp.text() + THROTTLED_COUNT_FIELD -> throttledCount = xcp.intValue() + LAST_EXECUTION_TIME_FIELD -> lastExecutionTime = xcp.instant() + + else -> { + throw IllegalStateException("Unexpected field: $fieldName, while parsing action") + } + } + } + + requireNotNull(actionId) { "Must set action id" } + return ActionExecutionResult(actionId, lastExecutionTime, throttledCount) + } + } +} diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Alert.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Alert.kt index 995bd179..57dd40a0 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Alert.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Alert.kt @@ -41,7 +41,8 @@ data class Alert( val acknowledgedTime: Instant? = null, val errorMessage: String? = null, val errorHistory: List, - val severity: String + val severity: String, + val actionExecutionResults: List ) : ToXContent { init { @@ -57,11 +58,12 @@ data class Alert( lastNotificationTime: Instant?, state: State = State.ACTIVE, errorMessage: String? = null, - errorHistory: List = mutableListOf() + errorHistory: List = mutableListOf(), + actionExecutionResults: List = mutableListOf() ) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, - severity = trigger.severity) + severity = trigger.severity, actionExecutionResults = actionExecutionResults) enum class State { ACTIVE, ACKNOWLEDGED, COMPLETED, ERROR, DELETED @@ -86,6 +88,7 @@ data class Alert( const val ERROR_MESSAGE_FIELD = "error_message" const val ALERT_HISTORY_FIELD = "alert_history" const val SEVERITY_FIELD = "severity" + const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results" const val NO_ID = "" const val NO_VERSION = Versions.NOT_FOUND @@ -115,6 +118,7 @@ data class Alert( var acknowledgedTime: Instant? = null var errorMessage: String? = null val errorHistory: MutableList = mutableListOf() + var actionExecutionResults: MutableList = mutableListOf() ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -140,6 +144,12 @@ data class Alert( } } SEVERITY_FIELD -> severity = xcp.text() + ACTION_EXECUTION_RESULTS_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + actionExecutionResults.add(ActionExecutionResult.parse(xcp)) + } + } } } @@ -148,7 +158,8 @@ data class Alert( triggerId = requireNotNull(triggerId), triggerName = requireNotNull(triggerName), state = requireNotNull(state), startTime = requireNotNull(startTime), endTime = endTime, lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime, - errorMessage = errorMessage, errorHistory = errorHistory, severity = severity) + errorMessage = errorMessage, errorHistory = errorHistory, severity = severity, + actionExecutionResults = actionExecutionResults) } } @@ -163,6 +174,7 @@ data class Alert( .field(ERROR_MESSAGE_FIELD, errorMessage) .field(ALERT_HISTORY_FIELD, errorHistory.toTypedArray()) .field(SEVERITY_FIELD, severity) + .field(ACTION_EXECUTION_RESULTS_FIELD, actionExecutionResults.toTypedArray()) .optionalTimeField(START_TIME_FIELD, startTime) .optionalTimeField(LAST_NOTIFICATION_TIME_FIELD, lastNotificationTime) .optionalTimeField(END_TIME_FIELD, endTime) diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/MonitorRunResult.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/MonitorRunResult.kt index 8c86bace..3b79a823 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/MonitorRunResult.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/MonitorRunResult.kt @@ -100,17 +100,21 @@ data class TriggerRunResult( } data class ActionRunResult( + val actionId: String, val actionName: String, val output: Map, val throttled: Boolean = false, + val executionTime: Instant? = null, val error: Exception? = null ) : ToXContent { override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return builder.startObject() + .field("id", actionId) .field("name", actionName) .field("output", output) .field("throttled", throttled) + .optionalTimeField("executionTime", executionTime) .field("error", error?.message) .endObject() } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Action.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Action.kt index a8064f23..aafdd074 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Action.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Action.kt @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.alerting.model.action +import org.elasticsearch.common.UUIDs import org.elasticsearch.common.xcontent.ToXContent import org.elasticsearch.common.xcontent.ToXContentObject import org.elasticsearch.common.xcontent.XContentBuilder @@ -31,7 +32,10 @@ data class Action( val name: String, val destinationId: String, val subjectTemplate: Script?, - val messageTemplate: Script + val messageTemplate: Script, + val throttleEnabled: Boolean, + val throttle: Throttle?, + val id: String = UUIDs.base64UUID() ) : ToXContentObject { init { @@ -43,10 +47,13 @@ data class Action( override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return builder.startObject() + .field(ID_FIELD, id) .field(NAME_FIELD, name) .field(DESTINATION_ID_FIELD, destinationId) .field(SUBJECT_TEMPLATE_FIELD, subjectTemplate) .field(MESSAGE_TEMPLATE_FIELD, messageTemplate) + .field(THROTTLE_ENABLED_FIELD, throttleEnabled) + .field(THROTTLE_FIELD, throttle) .endObject() } @@ -55,10 +62,13 @@ data class Action( } companion object { + const val ID_FIELD = "id" const val NAME_FIELD = "name" const val DESTINATION_ID_FIELD = "destination_id" const val SUBJECT_TEMPLATE_FIELD = "subject_template" const val MESSAGE_TEMPLATE_FIELD = "message_template" + const val THROTTLE_ENABLED_FIELD = "throttle_enabled" + const val THROTTLE_FIELD = "throttle" const val MUSTACHE = "mustache" const val SUBJECT = "subject" const val MESSAGE = "message" @@ -67,29 +77,42 @@ data class Action( @JvmStatic @Throws(IOException::class) fun parse(xcp: XContentParser): Action { + var id = UUIDs.base64UUID() // assign a default action id if one is not specified lateinit var name: String lateinit var destinationId: String var subjectTemplate: Script? = null // subject template could be null for some destinations lateinit var messageTemplate: Script + var throttleEnabled = false + var throttle: Throttle? = null XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName = xcp.currentName() xcp.nextToken() when (fieldName) { + ID_FIELD -> id = xcp.text() NAME_FIELD -> name = xcp.textOrNull() DESTINATION_ID_FIELD -> destinationId = xcp.textOrNull() SUBJECT_TEMPLATE_FIELD -> subjectTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG) MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG) + THROTTLE_FIELD -> throttle = Throttle.parse(xcp) + THROTTLE_ENABLED_FIELD -> { + throttleEnabled = xcp.booleanValue() + } + else -> { throw IllegalStateException("Unexpected field: $fieldName, while parsing action") } } } - return Action(requireNotNull(name) { "Destination name is null" }, + + return Action(requireNotNull(name) { "Action name is null" }, requireNotNull(destinationId) { "Destination id is null" }, subjectTemplate, - requireNotNull(messageTemplate) { "Destination message template is null" }) + requireNotNull(messageTemplate) { "Action message template is null" }, + throttleEnabled, + throttle, + id = requireNotNull(id)) } } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Throttle.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Throttle.kt new file mode 100644 index 00000000..b2f3367f --- /dev/null +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Throttle.kt @@ -0,0 +1,84 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.alerting.model.action + +import org.apache.commons.codec.binary.StringUtils +import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.ToXContentObject +import org.elasticsearch.common.xcontent.XContentBuilder +import org.elasticsearch.common.xcontent.XContentParser +import org.elasticsearch.common.xcontent.XContentParserUtils +import java.io.IOException +import java.lang.IllegalStateException +import java.time.temporal.ChronoUnit +import java.util.Locale + +data class Throttle( + val value: Int, + val unit: ChronoUnit +) : ToXContentObject { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(VALUE_FIELD, value) + .field(UNIT_FIELD, unit.name) + .endObject() + } + + companion object { + const val VALUE_FIELD = "value" + const val UNIT_FIELD = "unit" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): Throttle { + var value: Int = 0 + var unit: ChronoUnit? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + when (fieldName) { + UNIT_FIELD -> { + val unitString = xcp.text().toUpperCase(Locale.ROOT) + require(StringUtils.equals(unitString, ChronoUnit.MINUTES.name), { "Only support MINUTES throttle unit currently" }) + unit = ChronoUnit.valueOf(unitString) + } + VALUE_FIELD -> { + val currentToken = xcp.currentToken() + require(currentToken != XContentParser.Token.VALUE_NULL, { "Throttle value can't be null" }) + when { + currentToken.isValue -> { + value = xcp.intValue() + require(value > 0, { "Can only set positive throttle period" }) + } + else -> { + XContentParserUtils.throwUnknownToken(currentToken, xcp.tokenLocation) + } + } + } + + else -> { + throw IllegalStateException("Unexpected field: $fieldName, while parsing action") + } + } + } + + return Throttle(value = value, unit = requireNotNull(unit)) + } + } +} diff --git a/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/alerts/alert_mapping.json index 8b9a9a2f..844c37b7 100644 --- a/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/alerts/alert_mapping.json @@ -63,6 +63,20 @@ "type": "text" } } + }, + "action_execution_results": { + "type": "nested", + "properties": { + "action_id": { + "type": "keyword" + }, + "last_execution_time": { + "type": "date" + }, + "throttled_count": { + "type": "integer" + } + } } } } diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt index 521a5420..e8179907 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt @@ -23,6 +23,8 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.IntervalSchedul import com.amazon.opendistroforelasticsearch.alerting.core.model.Schedule import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput import com.amazon.opendistroforelasticsearch.alerting.elasticapi.string +import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult +import com.amazon.opendistroforelasticsearch.alerting.model.action.Throttle import org.apache.http.Header import org.apache.http.HttpEntity import org.elasticsearch.client.Response @@ -35,6 +37,7 @@ import org.elasticsearch.script.ScriptType import org.elasticsearch.search.builder.SearchSourceBuilder import org.elasticsearch.test.ESTestCase import org.elasticsearch.test.ESTestCase.randomInt +import org.elasticsearch.test.ESTestCase.randomIntBetween import org.elasticsearch.test.rest.ESRestTestCase import java.time.Instant import java.time.temporal.ChronoUnit @@ -86,14 +89,29 @@ fun randomTemplateScript( fun randomAction( name: String = ESRestTestCase.randomUnicodeOfLength(10), template: Script = randomTemplateScript("Hello World"), - destinationId: String = "123" -) = Action(name, destinationId, template, template) + destinationId: String = "123", + throttleEnabled: Boolean = false, + throttle: Throttle = randomThrottle() +) = Action(name, destinationId, template, template, throttleEnabled, throttle) + +fun randomThrottle( + value: Int = randomIntBetween(1, 100), + unit: ChronoUnit = ChronoUnit.MINUTES +) = Throttle(value, unit) fun randomAlert(monitor: Monitor = randomMonitor()): Alert { val trigger = randomTrigger() - return Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null) + val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) + return Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, + actionExecutionResults = actionExecutionResults) } +fun randomActionExecutionResult( + actionId: String = UUIDs.base64UUID(), + lastExecutionTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + throttledCount: Int = randomInt() +) = ActionExecutionResult(actionId, lastExecutionTime, throttledCount) + fun Monitor.toJsonString(): String { val builder = XContentFactory.jsonBuilder() return this.toXContent(builder).string() diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/XContentTests.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/XContentTests.kt index 586ac5fe..9adcec10 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/XContentTests.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/XContentTests.kt @@ -15,14 +15,17 @@ package com.amazon.opendistroforelasticsearch.alerting.model +import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.string import com.amazon.opendistroforelasticsearch.alerting.model.action.Action +import com.amazon.opendistroforelasticsearch.alerting.model.action.Throttle +import com.amazon.opendistroforelasticsearch.alerting.randomAction +import com.amazon.opendistroforelasticsearch.alerting.randomActionExecutionResult import com.amazon.opendistroforelasticsearch.alerting.randomAlert import com.amazon.opendistroforelasticsearch.alerting.randomMonitor -import com.amazon.opendistroforelasticsearch.alerting.randomTemplateScript +import com.amazon.opendistroforelasticsearch.alerting.randomThrottle import com.amazon.opendistroforelasticsearch.alerting.randomTrigger import com.amazon.opendistroforelasticsearch.alerting.toJsonString -import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput -import com.amazon.opendistroforelasticsearch.alerting.elasticapi.string import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.xcontent.LoggingDeprecationHandler import org.elasticsearch.common.xcontent.NamedXContentRegistry @@ -30,10 +33,9 @@ import org.elasticsearch.common.xcontent.ToXContent import org.elasticsearch.common.xcontent.XContentBuilder import org.elasticsearch.common.xcontent.XContentParser import org.elasticsearch.common.xcontent.XContentType -import org.elasticsearch.script.Script import org.elasticsearch.search.SearchModule import org.elasticsearch.test.ESTestCase -import org.elasticsearch.test.rest.ESRestTestCase +import kotlin.test.assertFailsWith class XContentTests : ESTestCase() { @@ -44,11 +46,27 @@ class XContentTests : ESTestCase() { assertEquals("Round tripping Monitor doesn't work", action, parsedAction) } - private fun randomAction( - name: String = ESRestTestCase.randomUnicodeOfLength(10), - template: Script = randomTemplateScript("Hello World"), - destinationId: String = "123" - ) = Action(name, destinationId, template, template) + fun `test throttle parsing`() { + val throttle = randomThrottle() + val throttleString = throttle.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val parsedThrottle = Throttle.parse(parser(throttleString)) + assertEquals("Round tripping Monitor doesn't work", throttle, parsedThrottle) + } + + fun `test throttle parsing with wrong unit`() { + val throttle = randomThrottle() + val throttleString = throttle.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val wrongThrottleString = throttleString.replace("MINUTES", "wrongunit") + + assertFailsWith("Only support MINUTES throttle unit") { Throttle.parse(parser(wrongThrottleString)) } + } + + fun `test throttle parsing with negative value`() { + val throttle = randomThrottle().copy(value = -1) + val throttleString = throttle.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + + assertFailsWith("Can only set positive throttle period") { Throttle.parse(parser(throttleString)) } + } fun `test monitor parsing`() { val monitor = randomMonitor() @@ -76,6 +94,15 @@ class XContentTests : ESTestCase() { assertEquals("Round tripping alert doesn't work", alert, parsedAlert) } + fun `test action execution result parsing`() { + val actionExecutionResult = randomActionExecutionResult() + + val actionExecutionResultString = actionExecutionResult.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val parsedActionExecutionResultString = ActionExecutionResult.parse(parser(actionExecutionResultString)) + + assertEquals("Round tripping alert doesn't work", actionExecutionResult, parsedActionExecutionResultString) + } + fun `test creating a monitor with duplicate trigger ids fails`() { try { val repeatedTrigger = randomTrigger() diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index a9a2dda9..6ff76dee 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -114,6 +114,19 @@ "message_template": { "type": "object", "enabled": false + }, + "throttle_enabled": { + "type": "boolean" + }, + "throttle": { + "properties": { + "value": { + "type": "integer" + }, + "unit": { + "type": "keyword" + } + } } } }