From 6a579cd6aa59862e2b0692b44155b6a0d1645c56 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Tue, 6 Feb 2024 11:49:37 -0800 Subject: [PATCH] [Backport 2.x] Implemented cross-cluster monitor support #584 (#586) * Added clusters field to support cross cluster cluster metrics monitors. Signed-off-by: AWSHurneyt * Fixed writeTo. Signed-off-by: AWSHurneyt * Updated tests. Signed-off-by: AWSHurneyt * Updated tests. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt --- .../commons/alerting/model/Alert.kt | 46 +++++++++--- .../alerting/model/ClusterMetricsInput.kt | 73 +++++++++++-------- .../opensearch/commons/alerting/AlertTests.kt | 2 + .../commons/alerting/TestHelpers.kt | 18 ++--- .../model/ClusterMetricsInputTests.kt | 10 +-- .../commons/alerting/model/XContentTests.kt | 9 ++- 6 files changed, 101 insertions(+), 57 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt index 04df1b28..e4979407 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt @@ -43,6 +43,7 @@ data class Alert( val aggregationResultBucket: AggregationResultBucket? = null, val executionId: String? = null, val associatedAlertIds: List, + val clusters: List? = null, ) : Writeable, ToXContent { init { @@ -61,6 +62,7 @@ data class Alert( chainedAlertTrigger: ChainedAlertTrigger, workflow: Workflow, associatedAlertIds: List, + clusters: List? = null ) : this( monitorId = NO_ID, monitorName = "", @@ -82,7 +84,8 @@ data class Alert( executionId = executionId, workflowId = workflow.id, workflowName = workflow.name, - associatedAlertIds = associatedAlertIds + associatedAlertIds = associatedAlertIds, + clusters = clusters ) constructor( @@ -97,6 +100,7 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -118,7 +122,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -134,6 +139,7 @@ data class Alert( findingIds: List = emptyList(), executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -155,7 +161,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -172,6 +179,7 @@ data class Alert( findingIds: List = emptyList(), executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, @@ -193,7 +201,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -211,6 +220,7 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, executionId: String? = null, workflowId: String? = null, + clusters: List? = null ) : this( id = id, monitorId = monitor.id, @@ -233,7 +243,8 @@ data class Alert( executionId = executionId, workflowId = workflowId ?: "", workflowName = "", - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) constructor( @@ -248,6 +259,7 @@ data class Alert( schemaVersion: Int = NO_SCHEMA_VERSION, workflowId: String? = null, executionId: String?, + clusters: List? = null ) : this( id = id, monitorId = monitor.id, @@ -270,7 +282,8 @@ data class Alert( relatedDocIds = listOf(), workflowId = workflowId ?: "", executionId = executionId, - associatedAlertIds = emptyList() + associatedAlertIds = emptyList(), + clusters = clusters ) enum class State { @@ -311,7 +324,8 @@ data class Alert( actionExecutionResults = sin.readList(::ActionExecutionResult), aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null, executionId = sin.readOptionalString(), - associatedAlertIds = sin.readStringList() + associatedAlertIds = sin.readStringList(), + clusters = sin.readOptionalStringList() ) fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED) @@ -349,6 +363,7 @@ data class Alert( } out.writeOptionalString(executionId) out.writeStringCollection(associatedAlertIds) + out.writeOptionalStringArray(clusters?.toTypedArray()) } companion object { @@ -379,6 +394,7 @@ data class Alert( const val ASSOCIATED_ALERT_IDS_FIELD = "associated_alert_ids" const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH + const val CLUSTERS_FIELD = "clusters" const val NO_ID = "" const val NO_VERSION = Versions.NOT_FOUND @@ -409,6 +425,7 @@ data class Alert( val actionExecutionResults: MutableList = mutableListOf() var aggAlertBucket: AggregationResultBucket? = null val associatedAlertIds = mutableListOf() + val clusters = mutableListOf() ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName = xcp.currentName() @@ -475,6 +492,12 @@ data class Alert( AggregationResultBucket.parse(xcp) } } + CLUSTERS_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + clusters.add(xcp.text()) + } + } } } @@ -503,7 +526,8 @@ data class Alert( executionId = executionId, workflowId = workflowId, workflowName = workflowName, - associatedAlertIds = associatedAlertIds + associatedAlertIds = associatedAlertIds, + clusters = if (clusters.size > 0) clusters else null ) } @@ -553,6 +577,9 @@ data class Alert( .optionalTimeField(END_TIME_FIELD, endTime) .optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime) aggregationResultBucket?.innerXContent(builder) + + if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray()) + builder.endObject() return builder } @@ -576,7 +603,8 @@ data class Alert( BUCKET_KEYS to aggregationResultBucket?.bucketKeys?.joinToString(","), PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath, FINDING_IDS to findingIds.joinToString(","), - RELATED_DOC_IDS to relatedDocIds.joinToString(",") + RELATED_DOC_IDS to relatedDocIds.joinToString(","), + CLUSTERS_FIELD to clusters?.joinToString(",") ) } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt index 81432546..0c38f8a5 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInput.kt @@ -13,6 +13,7 @@ import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import java.io.IOException import java.net.URI +import java.net.URISyntaxException val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', '#', '>', '<', ' ') @@ -22,7 +23,8 @@ val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', ' data class ClusterMetricsInput( var path: String, var pathParams: String = "", - var url: String + var url: String, + var clusters: List = listOf() ) : Input { val clusterMetricType: ClusterMetricType val constructedUri: URI @@ -43,11 +45,10 @@ data class ClusterMetricsInput( "Invalid URI constructed from the path and path_params inputs, or the url input." } - if (url.isNotEmpty() && validateFieldsNotEmpty()) { + if (url.isNotEmpty() && validateFieldsNotEmpty()) require(constructedUri == constructUrlFromInputs()) { "The provided URL and URI fields form different URLs." } - } require(constructedUri.host.lowercase() == SUPPORTED_HOST) { "Only host '$SUPPORTED_HOST' is supported." @@ -74,6 +75,7 @@ data class ClusterMetricsInput( .field(PATH_FIELD, path) .field(PATH_PARAMS_FIELD, pathParams) .field(URL_FIELD, url) + .field(CLUSTERS_FIELD, clusters) .endObject() .endObject() } @@ -87,6 +89,7 @@ data class ClusterMetricsInput( out.writeString(path) out.writeString(pathParams) out.writeString(url) + out.writeStringArray(clusters.toTypedArray()) } companion object { @@ -99,18 +102,19 @@ data class ClusterMetricsInput( const val PATH_PARAMS_FIELD = "path_params" const val URL_FIELD = "url" const val URI_FIELD = "uri" + const val CLUSTERS_FIELD = "clusters" val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField(URI_FIELD), CheckedFunction { parseInner(it) }) /** * This parse function uses [XContentParser] to parse JSON input and store corresponding fields to create a [ClusterMetricsInput] object */ - @JvmStatic - @Throws(IOException::class) + @JvmStatic @Throws(IOException::class) fun parseInner(xcp: XContentParser): ClusterMetricsInput { var path = "" var pathParams = "" var url = "" + val clusters = mutableListOf() XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) @@ -121,9 +125,17 @@ data class ClusterMetricsInput( PATH_FIELD -> path = xcp.text() PATH_PARAMS_FIELD -> pathParams = xcp.text() URL_FIELD -> url = xcp.text() + CLUSTERS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) clusters.add(xcp.text()) + } } } - return ClusterMetricsInput(path, pathParams, url) + return ClusterMetricsInput(path, pathParams, url, clusters) } } @@ -163,20 +175,17 @@ data class ClusterMetricsInput( if (pathParams.isNotEmpty()) { pathParams = pathParams.trim('/') ILLEGAL_PATH_PARAMETER_CHARACTERS.forEach { character -> - if (pathParams.contains(character)) { + if (pathParams.contains(character)) throw IllegalArgumentException( - "The provided path parameters contain invalid characters or spaces. Please omit: " + "${ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")}" + "The provided path parameters contain invalid characters or spaces. Please omit: " + ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ") ) - } } } - if (apiType.requiresPathParams && pathParams.isEmpty()) { + if (apiType.requiresPathParams && pathParams.isEmpty()) throw IllegalArgumentException("The API requires path parameters.") - } - if (!apiType.supportsPathParams && pathParams.isNotEmpty()) { + if (!apiType.supportsPathParams && pathParams.isNotEmpty()) throw IllegalArgumentException("The API does not use path parameters.") - } return pathParams } @@ -192,13 +201,11 @@ data class ClusterMetricsInput( ClusterMetricType.values() .filter { option -> option != ClusterMetricType.BLANK } .forEach { option -> - if (uriPath.startsWith(option.prependPath) || uriPath.startsWith(option.defaultPath)) { + if (uriPath.startsWith(option.prependPath) || uriPath.startsWith(option.defaultPath)) apiType = option - } } - if (apiType.isBlank()) { + if (apiType.isBlank()) throw IllegalArgumentException("The API could not be determined from the provided URI.") - } return apiType } @@ -207,12 +214,23 @@ data class ClusterMetricsInput( * @return The constructed [URI]. */ private fun constructUrlFromInputs(): URI { - val uriBuilder = URIBuilder() - .setScheme(SUPPORTED_SCHEME) - .setHost(SUPPORTED_HOST) - .setPort(SUPPORTED_PORT) - .setPath(path + pathParams) - return uriBuilder.build() + /** + * this try-catch block is required due to a httpcomponents 5.1.x library issue + * it auto encodes path params in the url. + */ + return try { + val formattedPath = if (path.startsWith("/") || path.isBlank()) path else "/$path" + val formattedPathParams = if (pathParams.startsWith("/") || pathParams.isBlank()) pathParams else "/$pathParams" + val uriBuilder = URIBuilder("$SUPPORTED_SCHEME://$SUPPORTED_HOST:$SUPPORTED_PORT$formattedPath$formattedPathParams") + uriBuilder.build() + } catch (ex: URISyntaxException) { + val uriBuilder = URIBuilder() + .setScheme(SUPPORTED_SCHEME) + .setHost(SUPPORTED_HOST) + .setPort(SUPPORTED_PORT) + .setPath(path + pathParams) + uriBuilder.build() + } } /** @@ -220,15 +238,12 @@ data class ClusterMetricsInput( * If [path] and [pathParams] are empty, populates them with values from [url]. */ private fun parseEmptyFields() { - if (pathParams.isEmpty()) { + if (pathParams.isEmpty()) pathParams = this.parsePathParams() - } - if (path.isEmpty()) { + if (path.isEmpty()) path = if (pathParams.isEmpty()) clusterMetricType.defaultPath else clusterMetricType.prependPath - } - if (url.isEmpty()) { + if (url.isEmpty()) url = constructedUri.toString() - } } /** diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt index cf6a4947..8b4db7c4 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt @@ -23,6 +23,7 @@ class AlertTests { assertEquals(templateArgs[Alert.START_TIME_FIELD], alert.startTime.toEpochMilli(), "Template args start time does not") assertEquals(templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null, "Template args last notification time does not match") assertEquals(templateArgs[Alert.SEVERITY_FIELD], alert.severity, "Template args severity does not match") + assertEquals(templateArgs[Alert.CLUSTERS_FIELD], alert.clusters?.joinToString(","), "Template args clusters does not match") } @Test @@ -40,6 +41,7 @@ class AlertTests { assertEquals(templateArgs[Alert.START_TIME_FIELD], alert.startTime.toEpochMilli(), "Template args start time does not") assertEquals(templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null, "Template args last notification time does not match") assertEquals(templateArgs[Alert.SEVERITY_FIELD], alert.severity, "Template args severity does not match") + assertEquals(templateArgs[Alert.CLUSTERS_FIELD], alert.clusters?.joinToString(","), "Template args clusters does not match") assertEquals( templateArgs[Alert.BUCKET_KEYS], alert.aggregationResultBucket?.bucketKeys?.joinToString(","), diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 3889d1d6..b4337e7f 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -286,9 +286,9 @@ fun randomDocumentLevelTrigger( name = name, severity = severity, condition = condition, - actions = if (actions.isEmpty() && destinationId.isNotBlank()) { + actions = if (actions.isEmpty() && destinationId.isNotBlank()) (0..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomAction(destinationId = destinationId) } - } else actions + else actions ) } @@ -527,12 +527,11 @@ fun assertUserNull(monitor: Monitor) { fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert { val trigger = randomQueryLevelTrigger() val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) + val clusterCount = (-1..5).random() + val clusters = if (clusterCount == -1) null else (0..clusterCount).map { "index-$it" } return Alert( - monitor, - trigger, - Instant.now().truncatedTo(ChronoUnit.MILLIS), - null, - actionExecutionResults = actionExecutionResults + monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, + actionExecutionResults = actionExecutionResults, clusters = clusters ) } @@ -562,10 +561,7 @@ fun randomAlertWithAggregationResultBucket(monitor: Monitor = randomBucketLevelM val trigger = randomBucketLevelTrigger() val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) return Alert( - monitor, - trigger, - Instant.now().truncatedTo(ChronoUnit.MILLIS), - null, + monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, actionExecutionResults = actionExecutionResults, aggregationResultBucket = AggregationResultBucket( "parent_bucket_path_1", diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInputTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInputTests.kt index d9dcd1f3..9980d5db 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInputTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsInputTests.kt @@ -89,7 +89,7 @@ class ClusterMetricsInputTests { @Test fun `test url field and URI component fields with path params create equal URI`() { // GIVEN - path = "/_cluster/health/" + path = "/_cluster/health" pathParams = "index1,index2,index3,index4,index5" url = "http://localhost:9200/_cluster/health/index1,index2,index3,index4,index5" @@ -205,7 +205,7 @@ class ClusterMetricsInputTests { @Test fun `test parsePathParams with path params as URI field`() { // GIVEN - path = "/_cluster/health/" + path = "/_cluster/health" pathParams = "index1,index2,index3,index4,index5" val testUrl = "http://localhost:9200/_cluster/health/index1,index2,index3,index4,index5" val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) @@ -268,7 +268,7 @@ class ClusterMetricsInputTests { // WHEN + THEN assertFailsWith( - "The provided path parameters contain invalid characters or spaces. Please omit: " + "${ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")}" + "The provided path parameters contain invalid characters or spaces. Please omit: " + ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ") ) { clusterMetricsInput.parsePathParams() } @@ -427,9 +427,9 @@ class ClusterMetricsInputTests { @Test fun `test parseEmptyFields populates empty url field when path and path_params are provided`() { // GIVEN - path = "/_cluster/health/" + path = "/_cluster/health" pathParams = "index1,index2,index3,index4,index5" - val testUrl = "http://localhost:9200$path$pathParams" + val testUrl = "http://localhost:9200$path/$pathParams" // WHEN val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index 7c52ff42..e56d4aab 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -450,7 +450,8 @@ class XContentTests { errorMessage = "some error", lastNotificationTime = Instant.now(), workflowId = "", - executionId = "" + executionId = "", + clusters = listOf() ) assertEquals("Round tripping alert doesn't work", alert.triggerName, "NoOp trigger") } @@ -462,7 +463,8 @@ class XContentTests { "\"state\":\"ACTIVE\",\"error_message\":null,\"alert_history\":[],\"severity\":\"1\",\"action_execution_results\"" + ":[{\"action_id\":\"ghe1-XQBySl0wQKDBkOG\",\"last_execution_time\":1601917224583,\"throttled_count\":-1478015168}," + "{\"action_id\":\"gxe1-XQBySl0wQKDBkOH\",\"last_execution_time\":1601917224583,\"throttled_count\":-768533744}]," + - "\"start_time\":1601917224599,\"last_notification_time\":null,\"end_time\":null,\"acknowledged_time\":null}" + "\"start_time\":1601917224599,\"last_notification_time\":null,\"end_time\":null,\"acknowledged_time\":null," + + "\"clusters\":[\"cluster-1\",\"cluster-2\"]}" val parsedAlert = Alert.parse(parser(alertStr)) OpenSearchTestCase.assertNull(parsedAlert.monitorUser) } @@ -475,7 +477,8 @@ class XContentTests { "\"state\":\"ACTIVE\",\"error_message\":null,\"alert_history\":[],\"severity\":\"1\",\"action_execution_results\"" + ":[{\"action_id\":\"ghe1-XQBySl0wQKDBkOG\",\"last_execution_time\":1601917224583,\"throttled_count\":-1478015168}," + "{\"action_id\":\"gxe1-XQBySl0wQKDBkOH\",\"last_execution_time\":1601917224583,\"throttled_count\":-768533744}]," + - "\"start_time\":1601917224599,\"last_notification_time\":null,\"end_time\":null,\"acknowledged_time\":null}" + "\"start_time\":1601917224599,\"last_notification_time\":null,\"end_time\":null,\"acknowledged_time\":null," + + "\"clusters\":[\"cluster-1\",\"cluster-2\"]}" val parsedAlert = Alert.parse(parser(alertStr)) OpenSearchTestCase.assertNull(parsedAlert.monitorUser) }