Skip to content

Commit

Permalink
Adding new type of input for Monitors - HttpInput (opendistro-for-ela…
Browse files Browse the repository at this point in the history
  • Loading branch information
ann3431 authored and lucaswin-amzn committed Aug 1, 2019
1 parent c0d0e8f commit c2004f7
Show file tree
Hide file tree
Showing 12 changed files with 601 additions and 13 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ Please see our [documentation](https://opendistro.github.io/for-elasticsearch-do
1. Check out this package from version control.
1. Launch Intellij IDEA, choose **Import Project**, and select the `settings.gradle` file in the root of this package.
1. To build from the command line, set `JAVA_HOME` to point to a JDK >= 12 before running `./gradlew`.
- Unix System
1. `export JAVA_HOME=jdk-install-dir`: Replace `jdk-install-dir` by the JAVA_HOME directory of your system.
1. `export PATH=$JAVA_HOME/bin:$PATH`

- Windows System
1. Find **My Computers** from file directory, right click and select **properties**.
1. Select the **Advanced** tab, select **Environment variables**.
1. Edit **JAVA_HOME** to path of where JDK software is installed.


## Build
Expand Down
6 changes: 5 additions & 1 deletion alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ configurations.all {
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
force "commons-codec:commons-codec:${versions.commonscodec}"

force "commons-collections:commons-collections:3.2.2"
force "org.apache.httpcomponents:httpcore-nio:4.4.11"
force "org.apache.httpcomponents:httpclient:4.5.7"

// This is required because kotlin coroutines core-1.1.1 still requires kotin stdlib 1.3.20 and we're using 1.3.21
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
Expand All @@ -58,6 +61,7 @@ configurations.all {

dependencies {
compileOnly "org.elasticsearch.plugin:elasticsearch-scripting-painless-spi:${versions.elasticsearch}"
compile "org.apache.httpcomponents:httpasyncclient:4.1.4"

// Elasticsearch Nanny state
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler
Expand Down Expand Up @@ -118,7 +119,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, P
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY)
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, HttpInput.XCONTENT_REGISTRY)
}

override fun createComponents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ package com.amazon.opendistroforelasticsearch.alerting
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts
import com.amazon.opendistroforelasticsearch.alerting.client.HttpInputClient
import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.suspendUntil
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toGetRequest
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toMap
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
Expand Down Expand Up @@ -51,13 +56,14 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
import org.apache.logging.log4j.LogManager
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.apache.http.HttpResponse
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.bulk.BackoffPolicy
Expand Down Expand Up @@ -105,7 +111,7 @@ class MonitorRunner(
) : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

private val logger = LogManager.getLogger(MonitorRunner::class.java)

private var httpClient: HttpInputClient
private lateinit var runnerSupervisor: Job
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + runnerSupervisor
Expand All @@ -122,14 +128,17 @@ class MonitorRunner(
clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) {
millis, count -> moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count)
}
httpClient = HttpInputClient()
}

override fun doStart() {
runnerSupervisor = SupervisorJob()
httpClient.client.start()
}

override fun doStop() {
runnerSupervisor.cancel()
httpClient.client.close()
}

override fun doClose() { }
Expand Down Expand Up @@ -292,6 +301,29 @@ class MonitorRunner(
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
results += searchResponse.convertToMap()
}
is HttpInput -> {
val response: HttpResponse = httpClient.client.suspendUntil {
httpClient.client.execute(input.toGetRequest(), it)
}
// Make sure response content length is not larger than 100MB
val contentLengthHeader = response.getFirstHeader("Content-Length").value

// Use content-length header to check size. If content-length header does not exist, set Alert in Error state.
if (contentLengthHeader != null) {
logger.debug("Content length is $contentLengthHeader")
val contentLength = contentLengthHeader.toInt()
if (contentLength > httpClient.MAX_CONTENT_LENGTH) {
throw Exception("Response content size: $contentLength, is larger than ${httpClient.MAX_CONTENT_LENGTH}.")
}
} else {
logger.debug("Content-length header does not exist, set alert to error state.")
throw IllegalArgumentException("Response does not contain content-length header.")
}

results += withContext(Dispatchers.IO) {
response.toMap()
}
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.alerting.client

import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.common.unit.ByteSizeUnit
import org.elasticsearch.common.unit.TimeValue
import java.security.AccessController
import java.security.PrivilegedAction

/**
* This class takes [HttpInput] and performs GET requests to given URIs.
*/
class HttpInputClient {

// TODO: If possible, these settings should be implemented as changeable via the "_cluster/settings" API.
private val CONNECTION_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(5).millis().toInt()
private val REQUEST_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt()
private val SOCKET_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt()
val MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toBytes(100)

val client = createHttpClient()

/**
* Create [CloseableHttpAsyncClient] as a [PrivilegedAction] in order to avoid [java.net.NetPermission] error.
*/
private fun createHttpClient(): CloseableHttpAsyncClient {
val config = RequestConfig.custom()
.setConnectTimeout(CONNECTION_TIMEOUT_MILLISECONDS)
.setConnectionRequestTimeout(REQUEST_TIMEOUT_MILLISECONDS)
.setSocketTimeout(SOCKET_TIMEOUT_MILLISECONDS)
.build()

return AccessController.doPrivileged(PrivilegedAction<CloseableHttpAsyncClient>({
HttpAsyncClientBuilder.create()
.setDefaultRequestConfig(config)
.useSystemProperties()
.build()
} as () -> CloseableHttpAsyncClient))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@
*/
package com.amazon.opendistroforelasticsearch.alerting.resthandler

import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toConstructedUrl
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_THROTTLE_VALUE
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.alerting.util._SEQ_NO
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
Expand Down Expand Up @@ -76,7 +78,7 @@ private val log = LogManager.getLogger(RestIndexMonitorAction::class.java)
* Rest handlers to create and update monitors.
*/
class RestIndexMonitorAction(
settings: Settings,
val settings: Settings,
controller: RestController,
jobIndices: ScheduledJobIndices,
clusterService: ClusterService
Expand Down Expand Up @@ -159,6 +161,7 @@ class RestIndexMonitorAction(
*/
private fun prepareMonitorIndexing() {
validateActionThrottle(newMonitor, maxActionThrottle, TimeValue.timeValueMinutes(1))
validateLocalPort(newMonitor, settings.get("http.port").toInt())
if (channel.request().method() == PUT) return updateMonitor()
val query = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("${Monitor.MONITOR_TYPE}.type", Monitor.MONITOR_TYPE))
val searchSource = SearchSourceBuilder().query(query).timeout(requestTimeout)
Expand All @@ -180,6 +183,23 @@ class RestIndexMonitorAction(
}
}

/**
* This function checks whether the [Monitor] has an [HttpInput] with localhost. If so, make sure the port is same as specified in settings.
*/
private fun validateLocalPort(monitor: Monitor, settingsPort: Int) {
for (input in monitor.inputs) {
if (input is HttpInput) {
val constructedUrl = input.toConstructedUrl()
// Make sure that when host is "localhost", only port number specified in settings is allowed.
if (constructedUrl.host == "localhost") {
require(constructedUrl.port == settingsPort) {
"Host: ${constructedUrl.host} is restricted to port $settingsPort."
}
}
}
}
}

/**
* After searching for all existing monitors we validate the system can support another monitor to be created.
*/
Expand Down
Loading

0 comments on commit c2004f7

Please sign in to comment.