Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adding HTTP Input type for monitors #82

Merged
merged 25 commits into from
Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
38d2c20
Adding HttpInput
ann3431 Jul 16, 2019
ffbff35
Update HttpInput.kt
ann3431 Jul 17, 2019
67ff5fe
Update README.md
ann3431 Jul 17, 2019
00de8d1
Added field validation in HttpInput and test
ann3431 Jul 18, 2019
cc1821a
Merge branch 'dev' of https://github.com/ann3431/alerting into dev
ann3431 Jul 18, 2019
8e7a44a
Update HttpExtensions.kt
ann3431 Jul 18, 2019
a9a0596
Implementation of reusing AsyncClient
ann3431 Jul 18, 2019
5ff1f9f
Restrict port of localhost to 9200
ann3431 Jul 19, 2019
312c93e
Changed timeout values to units of sec
ann3431 Jul 19, 2019
d6cffe1
Update HttpInput.kt
ann3431 Jul 19, 2019
6bb598d
Merge branch 'reuse-client' into dev
ann3431 Jul 19, 2019
97c897e
Fixed timeout values in tests
ann3431 Jul 19, 2019
3d9b076
Merge pull request #2 from ann3431/dev
ann3431 Jul 19, 2019
cd30e8c
Change max connection timeout to 5s, close Client
ann3431 Jul 19, 2019
01fcb38
Limit response size and closing client
ann3431 Jul 19, 2019
005b5c2
Merge branch 'dev'
ann3431 Jul 19, 2019
f8dbc8f
Change log level to debug
ann3431 Jul 24, 2019
0d08d15
Finished async implementation
ann3431 Jul 25, 2019
7aeb3be
Merge branch 'async'
ann3431 Jul 25, 2019
3e1dcc3
Moved port validation logic to RestIndexMonitorAction
ann3431 Jul 31, 2019
9f750b3
Style fix
ann3431 Jul 31, 2019
9fc5d79
Update MonitorRunner.kt
ann3431 Jul 31, 2019
7573b46
Revert "Update MonitorRunner.kt"
ann3431 Jul 31, 2019
7c13e92
Created toConstuctUrl() for use
ann3431 Aug 1, 2019
430d718
Removed unnecessary if statement
ann3431 Aug 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ Please see our [documentation](https://opendistro.github.io/for-elasticsearch-do
1. Check out this package from version control.
1. Launch Intellij IDEA, choose **Import Project**, and select the `settings.gradle` file in the root of this package.
1. To build from the command line, set `JAVA_HOME` to point to a JDK >= 12 before running `./gradlew`.
- Unix System
1. `export JAVA_HOME=jdk-install-dir`: Replace `jdk-install-dir` by the JAVA_HOME directory of your system.
1. `export PATH=$JAVA_HOME/bin:$PATH`

- Windows System
1. Find **My Computers** from file directory, right click and select **properties**.
1. Select the **Advanced** tab, select **Environment variables**.
1. Edit **JAVA_HOME** to path of where JDK softeare is installed.
ann3431 marked this conversation as resolved.
Show resolved Hide resolved


## Build
Expand Down
6 changes: 5 additions & 1 deletion alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ configurations.all {
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
force "commons-codec:commons-codec:${versions.commonscodec}"

force "commons-collections:commons-collections:3.2.2"
force "org.apache.httpcomponents:httpcore-nio:4.4.11"
force "org.apache.httpcomponents:httpclient:4.5.7"
ann3431 marked this conversation as resolved.
Show resolved Hide resolved

// This is required because kotlin coroutines core-1.1.1 still requires kotin stdlib 1.3.20 and we're using 1.3.21
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
Expand All @@ -58,6 +61,7 @@ configurations.all {

dependencies {
compileOnly "org.elasticsearch.plugin:elasticsearch-scripting-painless-spi:${versions.elasticsearch}"
compile "org.apache.httpcomponents:httpasyncclient:4.1.4"

// Elasticsearch Nanny state
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler
Expand Down Expand Up @@ -118,7 +119,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, P
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY)
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, HttpInput.XCONTENT_REGISTRY)
}

override fun createComponents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ package com.amazon.opendistroforelasticsearch.alerting
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts
import com.amazon.opendistroforelasticsearch.alerting.client.HttpInputClient
import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.suspendUntil
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toGetRequest
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toMap
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
Expand Down Expand Up @@ -51,13 +56,14 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
import org.apache.logging.log4j.LogManager
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.apache.http.HttpResponse
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.bulk.BackoffPolicy
Expand Down Expand Up @@ -292,7 +298,14 @@ class MonitorRunner(
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
results += searchResponse.convertToMap()
}
else -> {
is HttpInput -> {
val httpClient = HttpInputClient()
ann3431 marked this conversation as resolved.
Show resolved Hide resolved
httpClient.client.start()
val response: HttpResponse = httpClient.client.suspendUntil {
ann3431 marked this conversation as resolved.
Show resolved Hide resolved
httpClient.client.execute(input.toGetRequest(), it) }
httpClient.client.close()
results += response.toMap()
ann3431 marked this conversation as resolved.
Show resolved Hide resolved
} else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.alerting.client

import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.common.unit.TimeValue
import java.security.AccessController
import java.security.PrivilegedAction

/**
* This class takes [HttpInput] and performs GET requests to given URIs.
*/
class HttpInputClient {
ann3431 marked this conversation as resolved.
Show resolved Hide resolved

// TODO: If possible, these settings should be implemented as changeable via the "_cluster/settings" API.
private val CONNECTION_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt()
private val REQUEST_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt()
private val SOCKET_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt()

val client = createHttpClient()

/**
* Create [CloseableHttpAsyncClient] as a [PrivilegedAction] in order to avoid [java.net.NetPermission] error.
*/
private fun createHttpClient(): CloseableHttpAsyncClient {
val config = RequestConfig.custom()
.setConnectTimeout(CONNECTION_TIMEOUT_MILLISECONDS)
.setConnectionRequestTimeout(REQUEST_TIMEOUT_MILLISECONDS)
.setSocketTimeout(SOCKET_TIMEOUT_MILLISECONDS)
.build()

return AccessController.doPrivileged(PrivilegedAction<CloseableHttpAsyncClient>({
HttpAsyncClientBuilder.create()
.setDefaultRequestConfig(config)
.useSystemProperties()
.build()
} as () -> CloseableHttpAsyncClient))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package com.amazon.opendistroforelasticsearch.alerting
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.core.model.IntervalSchedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult
import com.amazon.opendistroforelasticsearch.alerting.model.Alert
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ACKNOWLEDGED
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ACTIVE
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.COMPLETED
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ERROR
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult
import com.amazon.opendistroforelasticsearch.alerting.model.action.Throttle
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.index.query.QueryBuilders
Expand Down Expand Up @@ -582,6 +582,116 @@ class MonitorRunnerIT : AlertingRestTestCase() {
actionResults2[actionThrottleEnabled.id]!!.lastExecutionTime)
}

fun `test monitor HttpInput with non JSON response `() {
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomHttpInput(
scheme = clusterHosts[clusterIndex].schemeName,
host = clusterHosts[clusterIndex].hostName,
port = clusterHosts[clusterIndex].port,
path = "_cat/indices",
params = mapOf(),
url = "",
connection_timeout = 5000,
socket_timeout = 5000)
val monitor = createMonitor(randomMonitor(inputs = listOf(input)))
val response = executeMonitor(monitor.id)
val output = entityAsMap(response)
@Suppress("UNCHECKED_CAST")
val inputResults = output.stringMap("input_results")
@Suppress("UNCHECKED_CAST")
val errorMessage = inputResults?.get("error")
assertTrue("Error did not occur from receiving invalid format of response, error message is actually: " +
"$errorMessage\nOutput: $output",
errorMessage.toString().contains("Unrecognized token"))
}

fun `test monitor HttpInput with JSON response`() {
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomHttpInput(
scheme = clusterHosts[clusterIndex].schemeName,
host = clusterHosts[clusterIndex].hostName,
port = clusterHosts[clusterIndex].port,
path = "_cluster/health",
params = mapOf(),
url = "",
connection_timeout = 5000,
socket_timeout = 5000)
val monitor = createMonitor(randomMonitor(inputs = listOf(input)))
val response = executeMonitor(monitor.id)

val output = entityAsMap(response)
@Suppress("UNCHECKED_CAST")
val inputResults = output.stringMap("input_results")
val resultsContent = (inputResults?.get("results") as ArrayList<*>).get(0)
val errorMessage = inputResults.get("error")

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
assertTrue("Monitor results should contain cluster_name field, but was actually: $resultsContent",
resultsContent.toString().contains("cluster_name"))
@Suppress("UNCHECKED_CAST")
assertTrue("Error message should not exist, error: $errorMessage", errorMessage == null)
}

fun `test monitor HttpInput with alert triggered`() {
putAlertMappings() // Required as we do not have a create alert API.
val trigger = randomTrigger(condition = Script("""
return ctx.results[0].number_of_pending_tasks < 1
""".trimIndent()), destinationId = createDestination().id)
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomHttpInput(
scheme = clusterHosts[clusterIndex].schemeName,
host = clusterHosts[clusterIndex].hostName,
port = clusterHosts[clusterIndex].port,
path = "_cluster/health",
params = mapOf(),
url = "",
connection_timeout = 5000,
socket_timeout = 5000)
val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger)))
val response = executeMonitor(monitor.id)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])
for (triggerResult in output.objectMap("trigger_results").values) {
assertTrue("Should be triggered.", triggerResult.objectMap("action_results").isNotEmpty())
}

val alerts = searchAlerts(monitor)
assertEquals("Alert not saved, $output", 1, alerts.size)
verifyAlert(alerts.single(), monitor, ACTIVE)
}

fun `test monitor HttpInput with no alert triggered`() {
putAlertMappings() // Required as we do not have a create alert API.
val trigger = randomTrigger(condition = Script("""
return ctx.results[0].status.equals("red")
""".trimIndent()))
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomHttpInput(
scheme = clusterHosts[clusterIndex].schemeName,
host = clusterHosts[clusterIndex].hostName,
port = clusterHosts[clusterIndex].port,
path = "_cluster/health",
ann3431 marked this conversation as resolved.
Show resolved Hide resolved
params = mapOf(),
url = "",
connection_timeout = 5000,
socket_timeout = 5000)
val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger)))
val response = executeMonitor(monitor.id)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])
for (triggerResult in output.objectMap("trigger_results").values) {
val actionResults = triggerResult.objectMap("action_results")
@Suppress("UNCHECKED_CAST")
assertTrue("Should not be triggered, $actionResults", actionResults.isEmpty())
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor, output: $output", 0, alerts.size)
}

private fun verifyActionExecutionResultInAlert(alert: Alert, expectedResult: Map<String, Int>):
MutableMap<String, ActionExecutionResult> {
val actionResult = mutableMapOf<String, ActionExecutionResult>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package com.amazon.opendistroforelasticsearch.alerting

import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import com.amazon.opendistroforelasticsearch.alerting.model.Alert
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.model.Trigger
Expand Down Expand Up @@ -66,6 +67,27 @@ fun randomMonitor(
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf())
}

fun randomHttpInput(
scheme: String = "http",
host: String = "localhost",
port: Int = randomInt(65535),
path: String = ESRestTestCase.randomAlphaOfLength(10),
params: Map<String, String> = mapOf(),
url: String = "",
connection_timeout: Int = randomInt(10000),
socket_timeout: Int = randomInt(10000)
): HttpInput {
return HttpInput(
scheme = scheme,
host = host,
port = port,
path = path,
params = params,
url = url,
connection_timeout = connection_timeout,
socket_timeout = socket_timeout)
}

fun randomTrigger(
id: String = UUIDs.base64UUID(),
name: String = ESRestTestCase.randomAlphaOfLength(10),
Expand Down
4 changes: 3 additions & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1'
compile "com.cronutils:cron-utils:7.0.5"

compile "org.apache.httpcomponents:httpasyncclient:4.1.4"
compile 'commons-validator:commons-validator:1.6'

testImplementation "org.elasticsearch.test:framework:${es_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test-junit:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.amazon.opendistroforelasticsearch.alerting.core.httpapi

import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.http.HttpResponse
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.utils.URIBuilder
import org.apache.http.concurrent.FutureCallback
import org.apache.http.nio.client.HttpAsyncClient
import org.apache.http.util.EntityUtils
import org.elasticsearch.common.Strings
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentType
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

suspend fun <C : HttpAsyncClient, T> C.suspendUntil(block: C.(FutureCallback<T>) -> Unit): T =
suspendCancellableCoroutine { cont ->
block(object : FutureCallback<T> {
override fun cancelled() {
cont.resumeWith(Result.failure(CancellationException("Request cancelled")))
ann3431 marked this conversation as resolved.
Show resolved Hide resolved
}

override fun completed(result: T) {
cont.resume(result)
}

override fun failed(ex: Exception) {
cont.resumeWithException(ex)
}
})
}

fun HttpResponse.toMap(): Map<String, Any> {
val xcp = XContentType.JSON.xContent().createParser(
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, EntityUtils.toString(this.entity))
return xcp.map()
}

fun HttpInput.toGetRequest(): HttpGet {
val requestConfig = RequestConfig.custom()
.setConnectTimeout(this.connection_timeout)
.setSocketTimeout(this.socket_timeout)
.build()
// If url field is null or empty, construct an url field by field.
val constructedUrl = if (Strings.isNullOrEmpty(this.url)) {
ann3431 marked this conversation as resolved.
Show resolved Hide resolved
val uriBuilder = URIBuilder()
uriBuilder.scheme = this.scheme
uriBuilder.host = this.host
uriBuilder.port = this.port
uriBuilder.path = this.path
for (e in this.params.entries)
uriBuilder.addParameter(e.key, e.value)
uriBuilder.build().toString()
} else {
this.url
}
val httpGetRequest = HttpGet(constructedUrl)
httpGetRequest.config = requestConfig
return httpGetRequest
}
Loading