Skip to content

Commit

Permalink
Copy over monitor datasources config from alerting to common utils (#247
Browse files Browse the repository at this point in the history
) (#271)

* copy over monitor datasources config from alerting to common utils

Signed-off-by: Surya Sashank Nistala <[email protected]>
(cherry picked from commit 3dce8f7)

Co-authored-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and eirsep authored Oct 7, 2022
1 parent 9c2621e commit 03fd527
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 3 deletions.
128 changes: 128 additions & 0 deletions src/main/kotlin/org/opensearch/commons/alerting/model/DataSources.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package org.opensearch.commons.alerting.model

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import java.io.IOException

data class DataSources(
/** Configures a custom query index name for the monitor. Creates a new index if index with given name not present.*/
val queryIndex: String = ScheduledJob.DOC_LEVEL_QUERIES_INDEX,

/** Configures a custom index to store findings for a monitor. Creates a new index if index with given name not present.
* If index is pre-existing, mapping is updated*/
val findingsIndex: String = ".opensearch-alerting-finding-history-write", // AlertIndices.FINDING_HISTORY_WRITE_INDEX

/** Configures a custom index to store alerts for a monitor. Creates a new index if index with given name not present.
* If index is pre-existing, mapping is updated. */
val alertsIndex: String = ".opendistro-alerting-alerts", // AlertIndices.ALERT_INDEX

/** Configures custom mappings by field type for query index.
* Custom query index mappings are configurable, only if a custom query index is configured too. */
val queryIndexMappingsByType: Map<String, Map<String, String>> = mapOf()

) : Writeable, ToXContentObject {

init {
require(queryIndex.isNotEmpty()) {
"Query index cannot be empty"
}
require(findingsIndex.isNotEmpty()) {
"Findings index cannot be empty"
}
require(alertsIndex.isNotEmpty()) {
"Alerts index cannot be empty"
}
if (queryIndexMappingsByType.isNotEmpty()) {
require(queryIndex != ScheduledJob.DOC_LEVEL_QUERIES_INDEX) {
"Custom query index mappings are configurable only if a custom query index is configured too."
}
require(
queryIndexMappingsByType.size == 1 &&
queryIndexMappingsByType.containsKey("text") &&
queryIndexMappingsByType.get("text")?.size == 1 &&
queryIndexMappingsByType.get("text")!!.containsKey("analyzer")
) {
"Custom query index mappings are currently configurable only for 'text' fields and mapping parameter can only be 'analyzer'"
}
}
}

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
constructor(sin: StreamInput) : this(
queryIndex = sin.readString(),
findingsIndex = sin.readString(),
alertsIndex = sin.readString(),
queryIndexMappingsByType = sin.readMap() as Map<String, Map<String, String>>
)

@Suppress("UNCHECKED_CAST")
fun asTemplateArg(): Map<String, Any> {
return mapOf(
QUERY_INDEX_FIELD to queryIndex,
FINDINGS_INDEX_FIELD to findingsIndex,
ALERTS_INDEX_FIELD to alertsIndex,
QUERY_INDEX_MAPPINGS_BY_TYPE to queryIndexMappingsByType
) as Map<String, Any>
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field(QUERY_INDEX_FIELD, queryIndex)
builder.field(FINDINGS_INDEX_FIELD, findingsIndex)
builder.field(ALERTS_INDEX_FIELD, alertsIndex)
builder.field(QUERY_INDEX_MAPPINGS_BY_TYPE, queryIndexMappingsByType as Map<String, Any>)
builder.endObject()
return builder
}

companion object {
const val QUERY_INDEX_FIELD = "query_index"
const val FINDINGS_INDEX_FIELD = "findings_index"
const val ALERTS_INDEX_FIELD = "alerts_index"
const val QUERY_INDEX_MAPPINGS_BY_TYPE = "query_index_mappings_by_type"

@JvmStatic
@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
fun parse(xcp: XContentParser): DataSources {
var queryIndex = ""
var findingsIndex = ""
var alertsIndex = ""
var queryIndexMappingsByType: Map<String, Map<String, String>> = mapOf()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
QUERY_INDEX_FIELD -> queryIndex = xcp.text()
FINDINGS_INDEX_FIELD -> findingsIndex = xcp.text()
ALERTS_INDEX_FIELD -> alertsIndex = xcp.text()
QUERY_INDEX_MAPPINGS_BY_TYPE -> queryIndexMappingsByType = xcp.map() as Map<String, Map<String, String>>
}
}
return DataSources(
queryIndex = queryIndex,
findingsIndex = findingsIndex,
alertsIndex = alertsIndex,
queryIndexMappingsByType = queryIndexMappingsByType
)
}
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(queryIndex)
out.writeString(findingsIndex)
out.writeString(alertsIndex)
out.writeMap(queryIndexMappingsByType as Map<String, Any>)
}
}
20 changes: 17 additions & 3 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ data class Monitor(
val schemaVersion: Int = NO_SCHEMA_VERSION,
val inputs: List<Input>,
val triggers: List<Trigger>,
val uiMetadata: Map<String, Any>
val uiMetadata: Map<String, Any>,
val dataSources: DataSources = DataSources()
) : ScheduledJob {

override val type = MONITOR_TYPE
Expand Down Expand Up @@ -96,7 +97,12 @@ data class Monitor(
schemaVersion = sin.readInt(),
inputs = sin.readList((Input)::readFrom),
triggers = sin.readList((Trigger)::readFrom),
uiMetadata = suppressWarning(sin.readMap())
uiMetadata = suppressWarning(sin.readMap()),
dataSources = if (sin.readBoolean()) {
DataSources(sin)
} else {
DataSources()
}
)

// This enum classifies different Monitors
Expand Down Expand Up @@ -144,6 +150,7 @@ data class Monitor(
.field(TRIGGERS_FIELD, triggers.toTypedArray())
.optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime)
if (uiMetadata.isNotEmpty()) builder.field(UI_METADATA_FIELD, uiMetadata)
builder.field(DATA_SOURCES_FIELD, dataSources)
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
}
Expand Down Expand Up @@ -186,6 +193,8 @@ data class Monitor(
it.writeTo(out)
}
out.writeMap(uiMetadata)
out.writeBoolean(dataSources != null) // for backward compatibility with pre-existing monitors which don't have datasources field
dataSources.writeTo(out)
}

companion object {
Expand All @@ -203,6 +212,7 @@ data class Monitor(
const val INPUTS_FIELD = "inputs"
const val LAST_UPDATE_TIME_FIELD = "last_update_time"
const val UI_METADATA_FIELD = "ui_metadata"
const val DATA_SOURCES_FIELD = "data_sources"
const val ENABLED_TIME_FIELD = "enabled_time"

// This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all
Expand All @@ -229,6 +239,7 @@ data class Monitor(
var schemaVersion = NO_SCHEMA_VERSION
val triggers: MutableList<Trigger> = mutableListOf()
val inputs: MutableList<Input> = mutableListOf()
var dataSources = DataSources()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -274,6 +285,8 @@ data class Monitor(
ENABLED_TIME_FIELD -> enabledTime = xcp.instant()
LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant()
UI_METADATA_FIELD -> uiMetadata = xcp.map()
DATA_SOURCES_FIELD -> dataSources = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) DataSources()
else DataSources.parse(xcp)
else -> {
xcp.skipChildren()
}
Expand All @@ -298,7 +311,8 @@ data class Monitor(
schemaVersion,
inputs.toList(),
triggers.toList(),
uiMetadata
uiMetadata,
dataSources
)
}

Expand Down

0 comments on commit 03fd527

Please sign in to comment.