Skip to content

Commit

Permalink
Implemented cross-cluster monitor support (opensearch-project#1404)
Browse files Browse the repository at this point in the history
* Updated alert mappings to accommodate cross-cluster cluster metrics monitors.

Signed-off-by: AWSHurneyt <[email protected]>

* Implemented support for cross-cluster cluster metrics monitors. Implemented GetRemoteIndexes API to populate the frontend UI with details regarding the remote clusters, and indexes.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed a writeable test after changing QueryLevelTriggerRunResult from a data class to an open class for inheritability.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed ktlint errors.

Signed-off-by: AWSHurneyt <[email protected]>

* Removed changes to IndexUtils as they're only needed by doc monitors.

Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt authored Feb 6, 2024
1 parent b561965 commit ea36996
Show file tree
Hide file tree
Showing 16 changed files with 933 additions and 14 deletions.
21 changes: 19 additions & 2 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
Expand Down Expand Up @@ -190,6 +191,19 @@ class AlertService(
)
}

// Including a list of triggered clusters for cluster metrics monitors
var triggeredClusters: MutableList<String>? = null
if (result is ClusterMetricsTriggerRunResult)
result.clusterTriggerResults.forEach {
if (it.triggered) {
// Add an empty list if one isn't already present
if (triggeredClusters.isNullOrEmpty()) triggeredClusters = mutableListOf()

// Add the cluster to the list of triggered clusters
triggeredClusters!!.add(it.cluster)
}
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
Expand All @@ -199,7 +213,8 @@ class AlertService(
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
Expand All @@ -212,6 +227,7 @@ class AlertService(
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
)
} else {
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Expand All @@ -223,7 +239,8 @@ class AlertService(
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId,
workflowId = workflorwRunContext?.workflowId ?: ""
workflowId = workflorwRunContext?.workflowId ?: "",
clusters = triggeredClusters
)
}
}
Expand Down
13 changes: 10 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.alerts.AlertIndices
Expand All @@ -34,6 +35,7 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
Expand All @@ -59,6 +61,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
Expand Down Expand Up @@ -132,6 +135,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")
@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val REMOTE_BASE_URI = "/_plugins/_alerting/remote"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
Expand Down Expand Up @@ -183,7 +187,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetWorkflowAlertsAction(),
RestGetFindingsAction(),
RestGetWorkflowAction(),
RestDeleteWorkflowAction()
RestDeleteWorkflowAction(),
RestGetRemoteIndexesAction(),
)
}

Expand All @@ -210,7 +215,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
)
}

Expand Down Expand Up @@ -347,7 +353,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.REMOTE_MONITORING_ENABLED
)
}

Expand Down
43 changes: 39 additions & 4 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

package org.opensearch.alerting

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
Expand Down Expand Up @@ -44,6 +49,8 @@ import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

/** Service that handles the collection of input results for Monitor executions */
class InputService(
val client: Client,
Expand Down Expand Up @@ -101,8 +108,9 @@ class InputService(
.newInstance(searchParams)
.execute()

val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(input.indices, clusterService)
val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.indices(*indexes.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
Expand All @@ -116,9 +124,36 @@ class InputService(
results += searchResponse.convertToMap()
}
is ClusterMetricsInput -> {
logger.debug("ClusterMetricsInput clusterMetricType: ${input.clusterMetricType}")
val response = executeTransportAction(input, client)
results += response.toMap()
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
client.threadPool().threadContext.stashContext().use {
scope.launch {
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
}
}
val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT)
val startTime = Instant.now().toEpochMilli()
while (
(Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) ||
(responseMap.size < input.clusters.size)
) { /* Wait for responses */ }
results += responseMap
} else {
val response = executeTransportAction(input, client)
results += response.toMap()
}
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -65,7 +66,21 @@ object QueryLevelMonitorRunner : MonitorRunner() {
for (trigger in monitor.triggers) {
val currentAlert = currentAlerts[trigger]
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert)
val triggerResult = monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
val triggerResult = when (monitor.monitorType) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
val remoteMonitoringEnabled =
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)
if (remoteMonitoringEnabled)
monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!)
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
}
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.")
}

triggerResults[trigger.id] = triggerResult

if (monitorCtx.triggerService!!.isQueryLevelTriggerActionable(triggerCtx, triggerResult, workflowRunContext)) {
Expand Down
50 changes: 50 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.chainedAlertCondition.parsers.ChainedAlertExpressionParser
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult.ClusterTriggerResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH
import org.opensearch.commons.alerting.model.AggregationResultBucket
Expand Down Expand Up @@ -79,6 +83,52 @@ class TriggerService(val scriptService: ScriptService) {
}
}

fun runClusterMetricsTrigger(
monitor: Monitor,
trigger: QueryLevelTrigger,
ctx: QueryLevelTriggerExecutionContext,
clusterService: ClusterService
): ClusterMetricsTriggerRunResult {
var runResult: ClusterMetricsTriggerRunResult?
try {
val inputResults = ctx.results.getOrElse(0) { mapOf() }
var triggered = false
val clusterTriggerResults = mutableListOf<ClusterTriggerResult>()
if (CrossClusterMonitorUtils.isRemoteMonitor(monitor, clusterService)) {
inputResults.forEach { clusterResult ->
// Reducing the inputResults to only include results from 1 cluster at a time
val clusterTriggerCtx = ctx.copy(results = listOf(mapOf(clusterResult.toPair())))

val clusterTriggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(trigger.condition.params)
.execute(clusterTriggerCtx)

if (clusterTriggered) {
triggered = clusterTriggered
clusterTriggerResults.add(ClusterTriggerResult(cluster = clusterResult.key, triggered = clusterTriggered))
}
}
} else {
triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(trigger.condition.params)
.execute(ctx)
if (triggered) clusterTriggerResults
.add(ClusterTriggerResult(cluster = clusterService.clusterName.value(), triggered = triggered))
}
runResult = ClusterMetricsTriggerRunResult(
triggerName = trigger.name,
triggered = triggered,
error = null,
clusterTriggerResults = clusterTriggerResults
)
} catch (e: Exception) {
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// if the script fails we need to send an alert so set triggered = true
runResult = ClusterMetricsTriggerRunResult(trigger.name, true, e)
}
return runResult!!
}

// TODO: improve performance and support match all and match any
fun runDocLevelTrigger(
monitor: Monitor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionType

class GetRemoteIndexesAction private constructor() : ActionType<GetRemoteIndexesResponse>(NAME, ::GetRemoteIndexesResponse) {
companion object {
val INSTANCE = GetRemoteIndexesAction()
const val NAME = "cluster:admin/opensearch/alerting/remote/indexes/get"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException

class GetRemoteIndexesRequest : ActionRequest {
var indexes: List<String> = listOf()
var includeMappings: Boolean

constructor(indexes: List<String>, includeMappings: Boolean) : super() {
this.indexes = indexes
this.includeMappings = includeMappings
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readStringList(),
sin.readBoolean()
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeStringArray(indexes.toTypedArray())
out.writeBoolean(includeMappings)
}

companion object {
const val INDEXES_FIELD = "indexes"
const val INCLUDE_MAPPINGS_FIELD = "include_mappings"
}
}
Loading

0 comments on commit ea36996

Please sign in to comment.