Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Throttle #59

Merged
merged 4 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class MonitorRunner(
val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
// update current alert's action execution results
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
Expand All @@ -239,7 +240,8 @@ class MonitorRunner(
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
updatedActionExecutionResults.addAll(result.actionResults.filter { it -> currentActionIds.contains(it.key) }
// add action execution results which not exist in current alert
updatedActionExecutionResults.addAll(result.actionResults.filter { it -> !currentActionIds.contains(it.key) }
.map { it -> ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) })
} else {
updatedActionExecutionResults.addAll(result.actionResults.map { it -> ActionExecutionResult(it.key, it.value.executionTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package com.amazon.opendistroforelasticsearch.alerting

import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.core.model.IntervalSchedule
import com.amazon.opendistroforelasticsearch.alerting.model.Alert
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ACKNOWLEDGED
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ACTIVE
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.COMPLETED
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ERROR
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult
import com.amazon.opendistroforelasticsearch.alerting.model.action.Throttle
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.rest.RestStatus
Expand All @@ -32,8 +35,10 @@ import org.elasticsearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.DAYS
import java.time.temporal.ChronoUnit.MILLIS
import java.time.temporal.ChronoUnit.MINUTES

class MonitorRunnerIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -507,6 +512,96 @@ class MonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Enabled times do not match", monitor.enabledTime, retrievedMonitor.enabledTime)
}

fun `test monitor with throttled action for same alert`() {
val actionThrottleEnabled = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
throttleEnabled = true, throttle = Throttle(value = 5, unit = MINUTES))
val actionThrottleNotEnabled = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
throttleEnabled = false, throttle = Throttle(value = 5, unit = MINUTES))
val actions = listOf(actionThrottleEnabled, actionThrottleNotEnabled)
val monitor = createMonitor(randomMonitor(triggers = listOf(randomTrigger(condition = ALWAYS_RUN, actions = actions)),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)))
val monitorRunResultNotThrottled = entityAsMap(executeMonitor(monitor.id))
verifyActionThrottleResults(monitorRunResultNotThrottled, mutableMapOf(Pair(actionThrottleEnabled.id, false),
Pair(actionThrottleNotEnabled.id, false)))

val notThrottledAlert = searchAlerts(monitor)
assertEquals("1 alert should be returned", 1, notThrottledAlert.size)
verifyAlert(notThrottledAlert.single(), monitor, ACTIVE)
val notThrottledActionResults = verifyActionExecutionResultInAlert(notThrottledAlert[0],
mutableMapOf(Pair(actionThrottleEnabled.id, 0), Pair(actionThrottleNotEnabled.id, 0)))

assertEquals(notThrottledActionResults.size, 2)
val monitorRunResultThrottled = entityAsMap(executeMonitor(monitor.id))
verifyActionThrottleResults(monitorRunResultThrottled, mutableMapOf(Pair(actionThrottleEnabled.id, true),
Pair(actionThrottleNotEnabled.id, false)))

val throttledAlert = searchAlerts(monitor)
assertEquals("1 alert should be returned", 1, throttledAlert.size)
verifyAlert(throttledAlert.single(), monitor, ACTIVE)
val throttledActionResults = verifyActionExecutionResultInAlert(throttledAlert[0],
mutableMapOf(Pair(actionThrottleEnabled.id, 1), Pair(actionThrottleNotEnabled.id, 0)))

assertEquals(notThrottledActionResults.size, 2)

assertEquals(notThrottledActionResults[actionThrottleEnabled.id]!!.lastExecutionTime,
throttledActionResults[actionThrottleEnabled.id]!!.lastExecutionTime)
}

fun `test monitor with throttled action for different alerts`() {
val actionThrottleEnabled = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
throttleEnabled = true, throttle = Throttle(value = 5, unit = MINUTES))
val actions = listOf(actionThrottleEnabled)
val trigger = randomTrigger(condition = ALWAYS_RUN, actions = actions)
val monitor = createMonitor(randomMonitor(triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)))
val monitorRunResult1 = entityAsMap(executeMonitor(monitor.id))
verifyActionThrottleResults(monitorRunResult1, mutableMapOf(Pair(actionThrottleEnabled.id, false)))

val activeAlert1 = searchAlerts(monitor)
assertEquals("1 alert should be returned", 1, activeAlert1.size)
verifyAlert(activeAlert1.single(), monitor, ACTIVE)
val actionResults1 = verifyActionExecutionResultInAlert(activeAlert1[0], mutableMapOf(Pair(actionThrottleEnabled.id, 0)))

updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = NEVER_RUN)), id = monitor.id))
executeMonitor(monitor.id)
val completedAlert = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN).single()
verifyAlert(completedAlert, monitor, COMPLETED)

updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = ALWAYS_RUN)), id = monitor.id))
val monitorRunResult2 = entityAsMap(executeMonitor(monitor.id))
verifyActionThrottleResults(monitorRunResult2, mutableMapOf(Pair(actionThrottleEnabled.id, false)))
val activeAlert2 = searchAlerts(monitor)
assertEquals("1 alert should be returned", 1, activeAlert2.size)
assertNotEquals(activeAlert1[0].id, activeAlert2[0].id)

val actionResults2 = verifyActionExecutionResultInAlert(activeAlert2[0], mutableMapOf(Pair(actionThrottleEnabled.id, 0)))
assertNotEquals(actionResults1[actionThrottleEnabled.id]!!.lastExecutionTime,
actionResults2[actionThrottleEnabled.id]!!.lastExecutionTime)
}

private fun verifyActionExecutionResultInAlert(alert: Alert, expectedResult: Map<String, Int>):
MutableMap<String, ActionExecutionResult> {
val actionResult = mutableMapOf<String, ActionExecutionResult>()
for (result in alert.actionExecutionResults) {
val expected = expectedResult[result.actionId]
assertEquals(expected, result.throttledCount)
actionResult.put(result.actionId, result)
}
return actionResult
}

private fun verifyActionThrottleResults(output: MutableMap<String, Any>, expectedResult: Map<String, Boolean>) {
for (triggerResult in output.objectMap("trigger_results").values) {
for (actionResult in triggerResult.objectMap("action_results").values) {
val expected = expectedResult[actionResult["id"]]
assertEquals(expected, actionResult["throttled"])
}
}
}

private fun verifyAlert(alert: Alert, monitor: Monitor, expectedState: Alert.State = ACTIVE) {
assertNotNull(alert.id)
assertNotNull(alert.startTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ public class DestinationHttpClient {
private static final int TIMEOUT_MILLISECONDS = (int) TimeValue.timeValueSeconds(5).millis();
private static final int SOCKET_TIMEOUT_MILLISECONDS = (int)TimeValue.timeValueSeconds(50).millis();

/**
* all valid response status
*/
private static final Set<Integer> VALID_RESPONSE_STATUS = Collections.unmodifiableSet(new HashSet<>(
Arrays.asList(RestStatus.OK.getStatus(), RestStatus.CREATED.getStatus(), RestStatus.ACCEPTED.getStatus(),
RestStatus.NON_AUTHORITATIVE_INFORMATION.getStatus(), RestStatus.NO_CONTENT.getStatus(),
RestStatus.RESET_CONTENT.getStatus(), RestStatus.PARTIAL_CONTENT.getStatus(),
RestStatus.MULTI_STATUS.getStatus())));
/**
* all valid response status
*/
private static final Set<Integer> VALID_RESPONSE_STATUS = Collections.unmodifiableSet(new HashSet<>(
Arrays.asList(RestStatus.OK.getStatus(), RestStatus.CREATED.getStatus(), RestStatus.ACCEPTED.getStatus(),
RestStatus.NON_AUTHORITATIVE_INFORMATION.getStatus(), RestStatus.NO_CONTENT.getStatus(),
RestStatus.RESET_CONTENT.getStatus(), RestStatus.PARTIAL_CONTENT.getStatus(),
RestStatus.MULTI_STATUS.getStatus())));

private static CloseableHttpClient HTTP_CLIENT = createHttpClient();

Expand Down Expand Up @@ -166,7 +166,7 @@ public String getResponseString(CloseableHttpResponse response) throws IOExcepti
private void validateResponseStatus(HttpResponse response) throws IOException {
int statusCode = response.getStatusLine().getStatusCode();

if (!(VALID_RESPONSE_STATUS.contains(statusCode))) {
if (!(VALID_RESPONSE_STATUS.contains(statusCode))) {
throw new IOException("Failed: " + response);
}
}
Expand Down