Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adding HTTP Input type for monitors #82

Merged
merged 25 commits into from
Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
38d2c20
Adding HttpInput
ann3431 Jul 16, 2019
ffbff35
Update HttpInput.kt
ann3431 Jul 17, 2019
67ff5fe
Update README.md
ann3431 Jul 17, 2019
00de8d1
Added field validation in HttpInput and test
ann3431 Jul 18, 2019
cc1821a
Merge branch 'dev' of https://github.com/ann3431/alerting into dev
ann3431 Jul 18, 2019
8e7a44a
Update HttpExtensions.kt
ann3431 Jul 18, 2019
a9a0596
Implementation of reusing AsyncClient
ann3431 Jul 18, 2019
5ff1f9f
Restrict port of localhost to 9200
ann3431 Jul 19, 2019
312c93e
Changed timeout values to units of sec
ann3431 Jul 19, 2019
d6cffe1
Update HttpInput.kt
ann3431 Jul 19, 2019
6bb598d
Merge branch 'reuse-client' into dev
ann3431 Jul 19, 2019
97c897e
Fixed timeout values in tests
ann3431 Jul 19, 2019
3d9b076
Merge pull request #2 from ann3431/dev
ann3431 Jul 19, 2019
cd30e8c
Change max connection timeout to 5s, close Client
ann3431 Jul 19, 2019
01fcb38
Limit response size and closing client
ann3431 Jul 19, 2019
005b5c2
Merge branch 'dev'
ann3431 Jul 19, 2019
f8dbc8f
Change log level to debug
ann3431 Jul 24, 2019
0d08d15
Finished async implementation
ann3431 Jul 25, 2019
7aeb3be
Merge branch 'async'
ann3431 Jul 25, 2019
3e1dcc3
Moved port validation logic to RestIndexMonitorAction
ann3431 Jul 31, 2019
9f750b3
Style fix
ann3431 Jul 31, 2019
9fc5d79
Update MonitorRunner.kt
ann3431 Jul 31, 2019
7573b46
Revert "Update MonitorRunner.kt"
ann3431 Jul 31, 2019
7c13e92
Created toConstuctUrl() for use
ann3431 Aug 1, 2019
430d718
Removed unnecessary if statement
ann3431 Aug 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ Please see our [documentation](https://opendistro.github.io/for-elasticsearch-do
1. Check out this package from version control.
1. Launch Intellij IDEA, choose **Import Project**, and select the `settings.gradle` file in the root of this package.
1. To build from the command line, set `JAVA_HOME` to point to a JDK >= 12 before running `./gradlew`.
- Unix System
1. `export JAVA_HOME=jdk-install-dir`: Replace `jdk-install-dir` by the JAVA_HOME directory of your system.
1. `export PATH=$JAVA_HOME/bin:$PATH`

- Windows System
1. Find **My Computers** from file directory, right click and select **properties**.
1. Select the **Advanced** tab, select **Environment variables**.
1. Edit **JAVA_HOME** to path of where JDK software is installed.


## Build
Expand Down
6 changes: 5 additions & 1 deletion alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ configurations.all {
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
force "commons-codec:commons-codec:${versions.commonscodec}"

force "commons-collections:commons-collections:3.2.2"
force "org.apache.httpcomponents:httpcore-nio:4.4.11"
force "org.apache.httpcomponents:httpclient:4.5.7"
ann3431 marked this conversation as resolved.
Show resolved Hide resolved

// This is required because kotlin coroutines core-1.1.1 still requires kotin stdlib 1.3.20 and we're using 1.3.21
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
Expand All @@ -58,6 +61,7 @@ configurations.all {

dependencies {
compileOnly "org.elasticsearch.plugin:elasticsearch-scripting-painless-spi:${versions.elasticsearch}"
compile "org.apache.httpcomponents:httpasyncclient:4.1.4"

// Elasticsearch Nanny state
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler
Expand Down Expand Up @@ -118,7 +119,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, P
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY)
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, HttpInput.XCONTENT_REGISTRY)
}

override fun createComponents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ package com.amazon.opendistroforelasticsearch.alerting
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts
import com.amazon.opendistroforelasticsearch.alerting.client.HttpInputClient
import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.suspendUntil
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toGetRequest
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toMap
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
Expand Down Expand Up @@ -51,13 +56,14 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
import org.apache.logging.log4j.LogManager
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.apache.http.HttpResponse
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.bulk.BackoffPolicy
Expand Down Expand Up @@ -105,7 +111,7 @@ class MonitorRunner(
) : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

private val logger = LogManager.getLogger(MonitorRunner::class.java)

private var httpClient: HttpInputClient
ann3431 marked this conversation as resolved.
Show resolved Hide resolved
private lateinit var runnerSupervisor: Job
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + runnerSupervisor
Expand All @@ -122,14 +128,17 @@ class MonitorRunner(
clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) {
millis, count -> moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count)
}
httpClient = HttpInputClient()
}

override fun doStart() {
runnerSupervisor = SupervisorJob()
httpClient.client.start()
}

override fun doStop() {
runnerSupervisor.cancel()
httpClient.client.close()
}

override fun doClose() { }
Expand Down Expand Up @@ -292,6 +301,29 @@ class MonitorRunner(
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
results += searchResponse.convertToMap()
}
is HttpInput -> {
val response: HttpResponse = httpClient.client.suspendUntil {
ann3431 marked this conversation as resolved.
Show resolved Hide resolved
httpClient.client.execute(input.toGetRequest(), it)
}
// Make sure response content length is not larger than 100MB
val contentLengthHeader = response.getFirstHeader("Content-Length").value

// Use content-length header to check size. If content-length header does not exist, set Alert in Error state.
if (contentLengthHeader != null) {
logger.debug("Content length is $contentLengthHeader")
val contentLength = contentLengthHeader.toInt()
if (contentLength > httpClient.MAX_CONTENT_LENGTH) {
throw Exception("Response content size: $contentLength, is larger than ${httpClient.MAX_CONTENT_LENGTH}.")
}
} else {
logger.debug("Content-length header does not exist, set alert to error state.")
throw IllegalArgumentException("Response does not contain content-length header.")
}

results += withContext(Dispatchers.IO) {
response.toMap()
}
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.alerting.client

import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.common.unit.ByteSizeUnit
import org.elasticsearch.common.unit.TimeValue
import java.security.AccessController
import java.security.PrivilegedAction

/**
* This class takes [HttpInput] and performs GET requests to given URIs.
*/
class HttpInputClient {
ann3431 marked this conversation as resolved.
Show resolved Hide resolved

// TODO: If possible, these settings should be implemented as changeable via the "_cluster/settings" API.
private val CONNECTION_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(5).millis().toInt()
private val REQUEST_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt()
private val SOCKET_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt()
val MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toBytes(100)

val client = createHttpClient()

/**
* Create [CloseableHttpAsyncClient] as a [PrivilegedAction] in order to avoid [java.net.NetPermission] error.
*/
private fun createHttpClient(): CloseableHttpAsyncClient {
val config = RequestConfig.custom()
.setConnectTimeout(CONNECTION_TIMEOUT_MILLISECONDS)
.setConnectionRequestTimeout(REQUEST_TIMEOUT_MILLISECONDS)
.setSocketTimeout(SOCKET_TIMEOUT_MILLISECONDS)
.build()

return AccessController.doPrivileged(PrivilegedAction<CloseableHttpAsyncClient>({
HttpAsyncClientBuilder.create()
.setDefaultRequestConfig(config)
.useSystemProperties()
.build()
} as () -> CloseableHttpAsyncClient))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@
*/
package com.amazon.opendistroforelasticsearch.alerting.resthandler

import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toConstructedUrl
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_THROTTLE_VALUE
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.alerting.util._SEQ_NO
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
Expand Down Expand Up @@ -76,7 +78,7 @@ private val log = LogManager.getLogger(RestIndexMonitorAction::class.java)
* Rest handlers to create and update monitors.
*/
class RestIndexMonitorAction(
settings: Settings,
val settings: Settings,
controller: RestController,
jobIndices: ScheduledJobIndices,
clusterService: ClusterService
Expand Down Expand Up @@ -159,6 +161,7 @@ class RestIndexMonitorAction(
*/
private fun prepareMonitorIndexing() {
validateActionThrottle(newMonitor, maxActionThrottle, TimeValue.timeValueMinutes(1))
validateLocalPort(newMonitor, settings.get("http.port").toInt())
if (channel.request().method() == PUT) return updateMonitor()
val query = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("${Monitor.MONITOR_TYPE}.type", Monitor.MONITOR_TYPE))
val searchSource = SearchSourceBuilder().query(query).timeout(requestTimeout)
Expand All @@ -180,6 +183,23 @@ class RestIndexMonitorAction(
}
}

/**
* This function checks whether the [Monitor] has an [HttpInput] with localhost. If so, make sure the port is same as specified in settings.
*/
private fun validateLocalPort(monitor: Monitor, settingsPort: Int) {
for (input in monitor.inputs) {
if (input is HttpInput) {
val constructedUrl = input.toConstructedUrl()
// Make sure that when host is "localhost", only port number specified in settings is allowed.
if (constructedUrl.host == "localhost") {
require(constructedUrl.port == settingsPort) {
"Host: ${constructedUrl.host} is restricted to port $settingsPort."
}
}
}
}
}

/**
* After searching for all existing monitors we validate the system can support another monitor to be created.
*/
Expand Down
Loading