Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Alerting Anomaly detection integration
Browse files Browse the repository at this point in the history
  • Loading branch information
ylwu-amzn committed Nov 26, 2019
1 parent 6c1d0de commit ae1cfc6
Show file tree
Hide file tree
Showing 21 changed files with 940 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import com.amazon.opendistroforelasticsearch.alerting.core.schedule.JobScheduler
import com.amazon.opendistroforelasticsearch.alerting.core.settings.ScheduledJobSettings
import com.amazon.opendistroforelasticsearch.alerting.model.AnomalyDetectorInput
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestAcknowledgeAlertAction
import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestDeleteDestinationAction
Expand Down Expand Up @@ -68,7 +69,7 @@ import java.util.function.Supplier
/**
* Entry point of the OpenDistro for Elasticsearch alerting plugin
* This class initializes the [RestGetMonitorAction], [RestDeleteMonitorAction], [RestIndexMonitorAction] rest handlers.
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY] to the
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [AnomalyDetectorInput.XCONTENT_REGISTRY] to the
* [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, Plugin() {
Expand Down Expand Up @@ -118,7 +119,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, P
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY)
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, AnomalyDetectorInput.XCONTENT_REGISTRY)
}

override fun createComponents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@

package com.amazon.opendistroforelasticsearch.alerting

import com.amazon.elasticsearch.ad.transport.AnomalyResultAction
import com.amazon.elasticsearch.ad.transport.AnomalyResultRequest
import com.amazon.elasticsearch.ad.transport.AnomalyResultResponse
import com.amazon.elasticsearch.ad.transport.exception.EndRunException
import com.amazon.elasticsearch.ad.transport.exception.InternalFailure
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts
Expand All @@ -34,6 +39,7 @@ import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ACTIVE
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.COMPLETED
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.DELETED
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ERROR
import com.amazon.opendistroforelasticsearch.alerting.model.AnomalyDetectorInput
import com.amazon.opendistroforelasticsearch.alerting.model.InputRunResults
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.model.MonitorRunResult
Expand Down Expand Up @@ -292,6 +298,10 @@ class MonitorRunner(
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
results += searchResponse.convertToMap()
}
is AnomalyDetectorInput -> {
val detectorId = input.detectorId
results += executeAnomalyDetector(monitor, detectorId, periodStart, periodEnd)
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
Expand All @@ -304,6 +314,23 @@ class MonitorRunner(
}
}

private fun executeAnomalyDetector(monitor: Monitor, detectorId: String, start: Instant, end: Instant): Map<String, Any> {
try {
val request = AnomalyResultRequest(detectorId, start.toEpochMilli(), end.toEpochMilli())
val future = client.execute(AnomalyResultAction.INSTANCE, request)
val result = AnomalyResultResponse.fromActionResponse(future.actionGet())
return result.convertToMap()
} catch (e: Exception) { // TODO: stop monitor for EndRunException and InternalFailure
when (e) {
is EndRunException, is InternalFailure -> {
logger.error("Fail to execute AD $detectorId for monitor ${monitor.id}: ${e.cause}")
}
else -> logger.error("Fail to execute AD $detectorId for monitor ${monitor.id}", e)
}
return emptyMap()
}
}

private fun runTrigger(monitor: Monitor, trigger: Trigger, ctx: TriggerExecutionContext): TriggerRunResult {
return try {
val triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.alerting.model

import com.amazon.opendistroforelasticsearch.alerting.core.model.Input
import org.elasticsearch.common.CheckedFunction
import org.elasticsearch.common.ParseField
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParser.Token
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException

/**
* This class represents anomaly detector which will be used by monitor.
* The anomaly detector's result will be used as monitor's input result.
*/
data class AnomalyDetectorInput(val detectorId: String) : Input {

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.startObject(ANOMALY_DETECTOR_FIELD)
.field(DETECTOR_ID_FIELD, detectorId)
.endObject()
.endObject()
}

override fun name(): String {
return ANOMALY_DETECTOR_FIELD
}

companion object {
const val ANOMALY_DETECTOR_FIELD = "anomaly_detector"
const val DETECTOR_ID_FIELD = "detector_id"

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField(ANOMALY_DETECTOR_FIELD),
CheckedFunction { parseInner(it) })

@JvmStatic @Throws(IOException::class)
private fun parseInner(xcp: XContentParser): AnomalyDetectorInput {
lateinit var detectorId: String

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
DETECTOR_ID_FIELD -> {
detectorId = xcp.text()
}
}
}

return AnomalyDetectorInput(detectorId)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,42 @@
*/
package com.amazon.opendistroforelasticsearch.alerting.resthandler

import com.amazon.elasticsearch.ad.transport.StopDetectorAction
import com.amazon.elasticsearch.ad.transport.StopDetectorRequest
import com.amazon.elasticsearch.ad.transport.StopDetectorResponse
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.model.Alert
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.model.AnomalyDetectorInput
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.util.context
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer
import org.elasticsearch.rest.BytesRestResponse
import org.elasticsearch.rest.RestChannel
import org.elasticsearch.rest.RestController
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestRequest.Method.DELETE
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.rest.action.RestActionListener
import org.elasticsearch.rest.action.RestActions
import org.elasticsearch.rest.action.RestStatusToXContentListener
import org.elasticsearch.search.fetch.subphase.FetchSourceContext
import java.io.IOException

private val log = LogManager.getLogger(RestDeleteMonitorAction::class.java)

/**
* This class consists of the REST handler to delete monitors.
* When a monitor is deleted, all alerts are moved to the [Alert.State.DELETED] state and moved to the alert history index.
Expand All @@ -51,10 +71,66 @@ class RestDeleteMonitorAction(settings: Settings, controller: RestController) :
val monitorId = request.param("monitorID")
val refreshPolicy = RefreshPolicy.parse(request.param(REFRESH, RefreshPolicy.IMMEDIATE.value))

return RestChannelConsumer { channel ->
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
.setRefreshPolicy(refreshPolicy)
client.delete(deleteRequest, RestStatusToXContentListener(channel))
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
.version(RestActions.parseVersion(request))
.fetchSourceContext(context(request))

if (request.method() == RestRequest.Method.HEAD) {
getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)
}

return RestChannelConsumer { channel -> client.get(getRequest, getMonitorResponse(channel, client, monitorId, refreshPolicy)) }
}

private fun getMonitorResponse(
channel: RestChannel,
client: NodeClient,
monitorId: String,
refreshPolicy: RefreshPolicy
): RestActionListener<GetResponse> {
return object : RestActionListener<GetResponse>(channel) {
@Throws(Exception::class)
override fun processResponse(response: GetResponse) {
if (!response.isExists) {
val message = channel.newErrorBuilder().startObject()
.field("message", "Can't find monitor with id:" + monitorId)
.endObject()
channel.sendResponse(BytesRestResponse(RestStatus.NOT_FOUND, message))
return
}

XContentHelper.createParser(channel.request().xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON).use { xcp ->
val monitor: Monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
val anomalyDetectorInput = monitor.inputs.firstOrNull { input -> input is AnomalyDetectorInput }
var anomalyDetectorDeleted = false

if (anomalyDetectorInput != null && anomalyDetectorInput is AnomalyDetectorInput) {
anomalyDetectorDeleted = deleteAnomalyDetector(client, anomalyDetectorInput.detectorId, monitorId)
if (!anomalyDetectorDeleted) {
channel.sendResponse(BytesRestResponse(RestStatus.EXPECTATION_FAILED,
"Failed to delete anomaly detector's resources"))
return
}
}

val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId).setRefreshPolicy(refreshPolicy)
client.delete(deleteRequest, RestStatusToXContentListener(channel))
}
}
}
}

// TODO: handle delete failure
private fun deleteAnomalyDetector(client: NodeClient, detectorId: String, monitorId: String): Boolean {
try {
val request = StopDetectorRequest(detectorId)
val future = client.execute(StopDetectorAction.INSTANCE, request)
val result = StopDetectorResponse.fromActionResponse(future.actionGet())
return result.success()
} catch (e: Exception) { // TODO: add exception handling
log.error("Fail to delete anomaly detector $detectorId for monitor $monitorId", e)
return false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class RestExecuteMonitorAction(
.field("message", "Can't find monitor with id: ${response.id}")
.endObject()
this.channel.sendResponse(BytesRestResponse(RestStatus.NOT_FOUND, ret))
return
}

val xcp = (this.channel.request().xContentType ?: XContentType.JSON).xContent()
Expand Down
Loading

0 comments on commit ae1cfc6

Please sign in to comment.