Skip to content

Commit

Permalink
changes to add support for remote monitors in alerting (#662)
Browse files Browse the repository at this point in the history
* changes to add support for remote monitors in alerting

Signed-off-by: Subhobrata Dey <[email protected]>

* add tests for moved classes

Signed-off-by: Subhobrata Dey <[email protected]>

---------

Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored May 28, 2024
1 parent 3a69aab commit 7706019
Show file tree
Hide file tree
Showing 22 changed files with 1,872 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionType

class DocLevelMonitorFanOutAction private constructor() : ActionType<DocLevelMonitorFanOutResponse>(NAME, ::DocLevelMonitorFanOutResponse) {
companion object {
val INSTANCE = DocLevelMonitorFanOutAction()
const val NAME = "cluster:admin/opensearch/alerting/monitor/doclevel/fanout"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.model.IndexExecutionContext
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
val monitor: Monitor
val dryRun: Boolean
val monitorMetadata: MonitorMetadata
val executionId: String
val indexExecutionContext: IndexExecutionContext?
val shardIds: List<ShardId>
val concreteIndicesSeenSoFar: List<String>
val workflowRunContext: WorkflowRunContext?

constructor(
monitor: Monitor,
dryRun: Boolean,
monitorMetadata: MonitorMetadata,
executionId: String,
indexExecutionContext: IndexExecutionContext?,
shardIds: List<ShardId>,
concreteIndicesSeenSoFar: List<String>,
workflowRunContext: WorkflowRunContext?
) : super() {
this.monitor = monitor
this.dryRun = dryRun
this.monitorMetadata = monitorMetadata
this.executionId = executionId
this.indexExecutionContext = indexExecutionContext
this.shardIds = shardIds
this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
this.workflowRunContext = workflowRunContext
require(false == shardIds.isEmpty()) { }
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
monitor = Monitor.readFrom(sin)!!,
dryRun = sin.readBoolean(),
monitorMetadata = MonitorMetadata.readFrom(sin),
executionId = sin.readString(),
shardIds = sin.readList(::ShardId),
concreteIndicesSeenSoFar = sin.readStringList(),
workflowRunContext = if (sin.readBoolean()) {
WorkflowRunContext(sin)
} else { null },
indexExecutionContext = IndexExecutionContext(sin)
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
monitor.writeTo(out)
out.writeBoolean(dryRun)
monitorMetadata.writeTo(out)
out.writeString(executionId)
out.writeCollection(shardIds)
out.writeStringCollection(concreteIndicesSeenSoFar)
out.writeBoolean(workflowRunContext != null)
workflowRunContext?.writeTo(out)
indexExecutionContext?.writeTo(out)
}

override fun validate(): ActionRequestValidationException? {
var actionValidationException: ActionRequestValidationException? = null
if (shardIds.isEmpty()) {
actionValidationException = ActionRequestValidationException()
actionValidationException.addValidationError("shard_ids is null or empty")
}
return actionValidationException
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("monitor", monitor)
.field("dry_run", dryRun)
.field("execution_id", executionId)
.field("index_execution_context", indexExecutionContext)
.field("shard_ids", shardIds)
.field("concrete_indices", concreteIndicesSeenSoFar)
.field("workflow_run_context", workflowRunContext)
return builder.endObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
val nodeId: String
val executionId: String
val monitorId: String
val lastRunContexts: MutableMap<String, Any>
val inputResults: InputRunResults
val triggerResults: Map<String, DocumentLevelTriggerRunResult>
val exception: AlertingException?

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
nodeId = sin.readString(),
executionId = sin.readString(),
monitorId = sin.readString(),
lastRunContexts = sin.readMap()!! as MutableMap<String, Any>,
inputResults = InputRunResults.readFrom(sin),
triggerResults = suppressWarning(sin.readMap(StreamInput::readString, DocumentLevelTriggerRunResult::readFrom)),
exception = sin.readException()
)

constructor(
nodeId: String,
executionId: String,
monitorId: String,
lastRunContexts: MutableMap<String, Any>,
inputResults: InputRunResults = InputRunResults(), // partial,
triggerResults: Map<String, DocumentLevelTriggerRunResult> = mapOf(),
exception: AlertingException? = null
) : super() {
this.nodeId = nodeId
this.executionId = executionId
this.monitorId = monitorId
this.lastRunContexts = lastRunContexts
this.inputResults = inputResults
this.triggerResults = triggerResults
this.exception = exception
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(nodeId)
out.writeString(executionId)
out.writeString(monitorId)
out.writeMap(lastRunContexts)
inputResults.writeTo(out)
out.writeMap(
triggerResults,
StreamOutput::writeString,
{ stream, stats -> stats.writeTo(stream) }
)
out.writeException(exception)
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("node_id", nodeId)
.field("execution_id", executionId)
.field("monitor_id", monitorId)
.field("last_run_contexts", lastRunContexts)
.field("input_results", inputResults)
.field("trigger_results", triggerResults)
.field("exception", exception)
.endObject()
return builder
}

companion object {
@Suppress("UNCHECKED_CAST")
fun suppressWarning(map: MutableMap<String?, Any?>?): Map<String, DocumentLevelTriggerRunResult> {
return map as Map<String, DocumentLevelTriggerRunResult>
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.model

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

data class BucketLevelTriggerRunResult(
override var triggerName: String,
override var error: Exception? = null,
var aggregationResultBuckets: Map<String, AggregationResultBucket>,
var actionResultsMap: MutableMap<String, MutableMap<String, ActionRunResult>> = mutableMapOf()
) : TriggerRunResult(triggerName, error) {

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
constructor(sin: StreamInput) : this(
sin.readString(),
sin.readException() as Exception?, // error
sin.readMap(StreamInput::readString, ::AggregationResultBucket),
sin.readMap() as MutableMap<String, MutableMap<String, ActionRunResult>>
)

override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder
.field(AGG_RESULT_BUCKETS, aggregationResultBuckets)
.field(ACTIONS_RESULTS, actionResultsMap as Map<String, Any>)
}

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeMap(aggregationResultBuckets, StreamOutput::writeString) {
valueOut: StreamOutput, aggResultBucket: AggregationResultBucket ->
aggResultBucket.writeTo(valueOut)
}
out.writeMap(actionResultsMap as Map<String, Any>)
}

companion object {
const val AGG_RESULT_BUCKETS = "agg_result_buckets"
const val ACTIONS_RESULTS = "action_results"

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): TriggerRunResult {
return BucketLevelTriggerRunResult(sin)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.model

import org.opensearch.commons.alerting.alerts.AlertError
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.script.ScriptException
import java.io.IOException
import java.time.Instant

data class ChainedAlertTriggerRunResult(
override var triggerName: String,
var triggered: Boolean,
override var error: Exception?,
var actionResults: MutableMap<String, ActionRunResult> = mutableMapOf(),
val associatedAlertIds: Set<String>
) : TriggerRunResult(triggerName, error) {

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
constructor(sin: StreamInput) : this(
triggerName = sin.readString(),
error = sin.readException(),
triggered = sin.readBoolean(),
actionResults = sin.readMap() as MutableMap<String, ActionRunResult>,
associatedAlertIds = sin.readStringList().toSet()
)

override fun alertError(): AlertError? {
if (error != null) {
return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}")
}
for (actionResult in actionResults.values) {
if (actionResult.error != null) {
return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}")
}
}
return null
}

override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error)
return builder
.field("triggered", triggered)
.field("action_results", actionResults as Map<String, ActionRunResult>)
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeBoolean(triggered)
out.writeMap(actionResults as Map<String, ActionRunResult>)
out.writeStringCollection(associatedAlertIds)
}

companion object {
@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): TriggerRunResult {
return ChainedAlertTriggerRunResult(sin)
}
}
}
Loading

0 comments on commit 7706019

Please sign in to comment.