Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy over monitor datasources config from alerting to common utils #247

Merged
merged 2 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions src/main/kotlin/org/opensearch/commons/alerting/model/DataSources.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package org.opensearch.commons.alerting.model

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import java.io.IOException

data class DataSources(
/** Configures a custom query index name for the monitor. Creates a new index if index with given name not present.*/
val queryIndex: String = ScheduledJob.DOC_LEVEL_QUERIES_INDEX,

/** Configures a custom index to store findings for a monitor. Creates a new index if index with given name not present.
* If index is pre-existing, mapping is updated*/
val findingsIndex: String = ".opensearch-alerting-finding-history-write", // AlertIndices.FINDING_HISTORY_WRITE_INDEX

/** Configures a custom index to store alerts for a monitor. Creates a new index if index with given name not present.
* If index is pre-existing, mapping is updated. */
val alertsIndex: String = ".opendistro-alerting-alerts", // AlertIndices.ALERT_INDEX

/** Configures custom mappings by field type for query index.
* Custom query index mappings are configurable, only if a custom query index is configured too. */
val queryIndexMappingsByType: Map<String, Map<String, String>> = mapOf()

) : Writeable, ToXContentObject {

init {
require(queryIndex.isNotEmpty()) {
"Query index cannot be empty"
}
require(findingsIndex.isNotEmpty()) {
"Findings index cannot be empty"
}
require(alertsIndex.isNotEmpty()) {
"Alerts index cannot be empty"
}
if (queryIndexMappingsByType.isNotEmpty()) {
require(queryIndex != ScheduledJob.DOC_LEVEL_QUERIES_INDEX) {
"Custom query index mappings are configurable only if a custom query index is configured too."
}
require(
queryIndexMappingsByType.size == 1 &&
queryIndexMappingsByType.containsKey("text") &&
queryIndexMappingsByType.get("text")?.size == 1 &&
queryIndexMappingsByType.get("text")!!.containsKey("analyzer")
) {
"Custom query index mappings are currently configurable only for 'text' fields and mapping parameter can only be 'analyzer'"
}
}
}

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
constructor(sin: StreamInput) : this(
queryIndex = sin.readString(),
findingsIndex = sin.readString(),
alertsIndex = sin.readString(),
queryIndexMappingsByType = sin.readMap() as Map<String, Map<String, String>>
)

@Suppress("UNCHECKED_CAST")
fun asTemplateArg(): Map<String, Any> {
return mapOf(
QUERY_INDEX_FIELD to queryIndex,
FINDINGS_INDEX_FIELD to findingsIndex,
ALERTS_INDEX_FIELD to alertsIndex,
QUERY_INDEX_MAPPINGS_BY_TYPE to queryIndexMappingsByType
) as Map<String, Any>
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field(QUERY_INDEX_FIELD, queryIndex)
builder.field(FINDINGS_INDEX_FIELD, findingsIndex)
builder.field(ALERTS_INDEX_FIELD, alertsIndex)
builder.field(QUERY_INDEX_MAPPINGS_BY_TYPE, queryIndexMappingsByType as Map<String, Any>)
builder.endObject()
return builder
}

companion object {
const val QUERY_INDEX_FIELD = "query_index"
const val FINDINGS_INDEX_FIELD = "findings_index"
const val ALERTS_INDEX_FIELD = "alerts_index"
const val QUERY_INDEX_MAPPINGS_BY_TYPE = "query_index_mappings_by_type"

@JvmStatic
@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
fun parse(xcp: XContentParser): DataSources {
var queryIndex = ""
var findingsIndex = ""
var alertsIndex = ""
var queryIndexMappingsByType: Map<String, Map<String, String>> = mapOf()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
QUERY_INDEX_FIELD -> queryIndex = xcp.text()
FINDINGS_INDEX_FIELD -> findingsIndex = xcp.text()
ALERTS_INDEX_FIELD -> alertsIndex = xcp.text()
QUERY_INDEX_MAPPINGS_BY_TYPE -> queryIndexMappingsByType = xcp.map() as Map<String, Map<String, String>>
}
}
return DataSources(
queryIndex = queryIndex,
findingsIndex = findingsIndex,
alertsIndex = alertsIndex,
queryIndexMappingsByType = queryIndexMappingsByType
)
}
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(queryIndex)
out.writeString(findingsIndex)
out.writeString(alertsIndex)
out.writeMap(queryIndexMappingsByType as Map<String, Any>)
}
}
20 changes: 17 additions & 3 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ data class Monitor(
val schemaVersion: Int = NO_SCHEMA_VERSION,
val inputs: List<Input>,
val triggers: List<Trigger>,
val uiMetadata: Map<String, Any>
val uiMetadata: Map<String, Any>,
val dataSources: DataSources = DataSources()
) : ScheduledJob {

override val type = MONITOR_TYPE
Expand Down Expand Up @@ -96,7 +97,12 @@ data class Monitor(
schemaVersion = sin.readInt(),
inputs = sin.readList((Input)::readFrom),
triggers = sin.readList((Trigger)::readFrom),
uiMetadata = suppressWarning(sin.readMap())
uiMetadata = suppressWarning(sin.readMap()),
dataSources = if (sin.readBoolean()) {
DataSources(sin)
} else {
DataSources()
}
)

// This enum classifies different Monitors
Expand Down Expand Up @@ -144,6 +150,7 @@ data class Monitor(
.field(TRIGGERS_FIELD, triggers.toTypedArray())
.optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime)
if (uiMetadata.isNotEmpty()) builder.field(UI_METADATA_FIELD, uiMetadata)
builder.field(DATA_SOURCES_FIELD, dataSources)
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
}
Expand Down Expand Up @@ -186,6 +193,8 @@ data class Monitor(
it.writeTo(out)
}
out.writeMap(uiMetadata)
out.writeBoolean(dataSources != null) // for backward compatibility with pre-existing monitors which don't have datasources field
dataSources.writeTo(out)
}

companion object {
Expand All @@ -203,6 +212,7 @@ data class Monitor(
const val INPUTS_FIELD = "inputs"
const val LAST_UPDATE_TIME_FIELD = "last_update_time"
const val UI_METADATA_FIELD = "ui_metadata"
const val DATA_SOURCES_FIELD = "data_sources"
const val ENABLED_TIME_FIELD = "enabled_time"

// This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all
Expand All @@ -229,6 +239,7 @@ data class Monitor(
var schemaVersion = NO_SCHEMA_VERSION
val triggers: MutableList<Trigger> = mutableListOf()
val inputs: MutableList<Input> = mutableListOf()
var dataSources = DataSources()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -274,6 +285,8 @@ data class Monitor(
ENABLED_TIME_FIELD -> enabledTime = xcp.instant()
LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant()
UI_METADATA_FIELD -> uiMetadata = xcp.map()
DATA_SOURCES_FIELD -> dataSources = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) DataSources()
else DataSources.parse(xcp)
else -> {
xcp.skipChildren()
}
Expand All @@ -298,7 +311,8 @@ data class Monitor(
schemaVersion,
inputs.toList(),
triggers.toList(),
uiMetadata
uiMetadata,
dataSources
)
}

Expand Down