From a9fec0493f54902c2eabbc328e987812664dd126 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 30 Aug 2023 13:20:55 -0700 Subject: [PATCH 1/4] support multiple monitors in Chained Monitor Findings Signed-off-by: Surya Sashank Nistala --- .../alerting/action/IndexWorkflowRequest.kt | 50 +++++++++++++------ .../alerting/model/ChainedMonitorFindings.kt | 42 +++++++++++++--- .../commons/alerting/TestHelpers.kt | 5 ++ .../action/IndexWorkflowRequestTests.kt | 34 +++++++++++++ .../commons/alerting/model/XContentTests.kt | 17 ++++++- 5 files changed, 125 insertions(+), 23 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt index dc2b0936..c58249c8 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt @@ -30,7 +30,7 @@ class IndexWorkflowRequest : ActionRequest { refreshPolicy: WriteRequest.RefreshPolicy, method: RestRequest.Method, workflow: Workflow, - rbacRoles: List? = null + rbacRoles: List? = null, ) : super() { this.workflowId = workflowId this.seqNo = seqNo @@ -105,19 +105,41 @@ class IndexWorkflowRequest : ActionRequest { val monitorIdOrderMap: Map = delegates.associate { it.monitorId to it.order } delegates.forEach { if (it.chainedMonitorFindings != null) { - if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings.monitorId) == false) { - validationException = ValidateActions.addValidationError( - "Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} doesn't exist in sequence", - validationException - ) - // Break the flow because next check will generate the NPE - return validationException - } - if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings.monitorId]!!) { - validationException = ValidateActions.addValidationError( - "Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}", - validationException - ) + + if (it.chainedMonitorFindings.monitorId != null) { + if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings.monitorId) == false) { + validationException = ValidateActions.addValidationError( + "Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} doesn't exist in sequence", + validationException + ) + // Break the flow because next check will generate the NPE + return validationException + } + if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings.monitorId]!!) { + validationException = ValidateActions.addValidationError( + "Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}", + validationException + ) + } + } else { + for (monitorId in it.chainedMonitorFindings.monitorIds) { + if (!monitorIdOrderMap.containsKey(monitorId)) { + validationException = ValidateActions.addValidationError( + "Chained Findings Monitor $monitorId doesn't exist in sequence", + validationException + ) + return validationException + } else { + val order = monitorIdOrderMap.get(monitorId)!! + if (order >= it.order) { + return ValidateActions.addValidationError( + "Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}. " + + "Order of monitor being chained [$order] should be smaller than order of monitor using findings as source data [${it.order}] in sequence", + validationException + ) + } + } + } } } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt index dbc15e34..c9873cdf 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt @@ -9,50 +9,65 @@ import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import java.io.IOException +import java.util.Collections /** - * Context passed in delegate monitor to filter data queried by a monitor based on the findings of the given monitor id. + * Context passed in delegate monitor to filter data matched by a list of monitors based on the findings of the given monitor ids. */ // TODO - Remove the class and move the monitorId to Delegate (as a chainedMonitorId property) if this class won't be updated by adding new properties data class ChainedMonitorFindings( - val monitorId: String + val monitorId: String? = null, + val monitorIds: List = emptyList(), // if monitorId field is non-null it would be given precendence for BWC ) : BaseModel { init { - validateId(monitorId) + require(!(monitorId.isNullOrBlank() && monitorIds.isEmpty())) { + "at least one of fields, 'monitorIds' and 'monitorId' should be provided" + } + if(monitorId!= null && monitorId.isBlank()) { + validateId(monitorId) + } else { + monitorIds.forEach { validateId(it) } + } } @Throws(IOException::class) constructor(sin: StreamInput) : this( - sin.readString(), // monitorId + sin.readOptionalString(), // monitorId + Collections.unmodifiableList(sin.readStringList()) ) + @Suppress("UNCHECKED_CAST") fun asTemplateArg(): Map { return mapOf( MONITOR_ID_FIELD to monitorId, - ) + MONITOR_IDS_FIELD to monitorIds + ) as Map } @Throws(IOException::class) override fun writeTo(out: StreamOutput) { - out.writeString(monitorId) + out.writeOptionalString(monitorId) + out.writeStringCollection(monitorIds) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder.startObject() .field(MONITOR_ID_FIELD, monitorId) + .field(MONITOR_IDS_FIELD, monitorIds) .endObject() return builder } companion object { const val MONITOR_ID_FIELD = "monitor_id" + const val MONITOR_IDS_FIELD = "monitor_ids" @JvmStatic @Throws(IOException::class) fun parse(xcp: XContentParser): ChainedMonitorFindings { lateinit var monitorId: String - + val monitorIds = mutableListOf() XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName = xcp.currentName() @@ -63,9 +78,20 @@ data class ChainedMonitorFindings( monitorId = xcp.text() validateId(monitorId) } + + MONITOR_IDS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + monitorIds.add(xcp.text()) + } + } } } - return ChainedMonitorFindings(monitorId) + return ChainedMonitorFindings(monitorId, monitorIds) } @JvmStatic diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index a0d32843..f3183e51 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -405,6 +405,11 @@ fun randomClusterMetricsInput( return ClusterMetricsInput(path, pathParams, url) } +fun ChainedMonitorFindings.toJsonString(): String { + val builder = XContentFactory.jsonBuilder() + return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() +} + fun Workflow.toJsonString(): String { val builder = XContentFactory.jsonBuilder() return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt index 8ff08738..58feffb2 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt @@ -21,6 +21,9 @@ import org.opensearch.search.SearchModule import java.lang.Exception import java.lang.IllegalArgumentException import java.util.UUID +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue class IndexWorkflowRequestTests { @@ -196,6 +199,21 @@ class IndexWorkflowRequestTests { delegates = listOf( Delegate(1, "monitor-1") ) + + // Chained finding list of monitors valid + delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2"), + Delegate(3, "monitor-3", ChainedMonitorFindings(null, listOf("monitor-1", "monitor-2"))), + + ) + val req7 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates))) + ) + ) + assertNull(req7.validate()) try { IndexWorkflowRequest( "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, @@ -207,5 +225,21 @@ class IndexWorkflowRequestTests { Assert.assertTrue(ex is IllegalArgumentException) Assert.assertTrue(ex.message!!.contains("Workflows can only have 1 search input.")) } + + // Chained finding list of monitors invalid order and old field null + delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2"), + Delegate(2, "monitor-3", ChainedMonitorFindings(null, listOf("monitor-1", "monitor-2"))), + + ) + val req8 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates))) + ) + ) + assertNotNull(req8.validate()) + assertTrue(req8.validate()!!.message!!.contains("should be executed before monitor")) } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index 63588755..a284c187 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -267,12 +267,27 @@ class XContentTests { @Test fun `test workflow parsing`() { val workflow = randomWorkflow(monitorIds = listOf("1", "2", "3")) - val monitorString = workflow.toJsonString() val parsedWorkflow = Workflow.parse(parser(monitorString)) Assertions.assertEquals(workflow, parsedWorkflow, "Round tripping workflow failed") } + @Test + fun `test chainedMonitorFindings parsing`() { + val cmf1 = ChainedMonitorFindings(monitorId = "m1") + val cmf1String = cmf1.toJsonString() + Assertions.assertEquals( + ChainedMonitorFindings.parse(parser(cmf1String)), cmf1, + "Round tripping chained monitor findings failed" + ) + val cmf2 = ChainedMonitorFindings(monitorIds = listOf("m1", "m2")) + val cmf2String = cmf2.toJsonString() + Assertions.assertEquals( + ChainedMonitorFindings.parse(parser(cmf2String)), cmf2, + "Round tripping chained monitor findings failed" + ) + } + @Test fun `test old monitor format parsing`() { val monitorString = """ From 5b11c86236293b04bb9c209b3a6d58a9cfe51cb9 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 30 Aug 2023 14:26:55 -0700 Subject: [PATCH 2/4] add validation tests Signed-off-by: Surya Sashank Nistala --- .../alerting/model/ChainedMonitorFindings.kt | 6 +++--- .../alerting/model/CompositeInputTests.kt | 17 ++++++++++++++++- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt index c9873cdf..c5f2c472 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt @@ -24,7 +24,7 @@ data class ChainedMonitorFindings( require(!(monitorId.isNullOrBlank() && monitorIds.isEmpty())) { "at least one of fields, 'monitorIds' and 'monitorId' should be provided" } - if(monitorId!= null && monitorId.isBlank()) { + if (monitorId != null && monitorId.isBlank()) { validateId(monitorId) } else { monitorIds.forEach { validateId(it) } @@ -66,7 +66,7 @@ data class ChainedMonitorFindings( @JvmStatic @Throws(IOException::class) fun parse(xcp: XContentParser): ChainedMonitorFindings { - lateinit var monitorId: String + var monitorId: String? = null val monitorIds = mutableListOf() XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -75,8 +75,8 @@ data class ChainedMonitorFindings( when (fieldName) { MONITOR_ID_FIELD -> { + if(!xcp.currentToken().equals(XContentParser.Token.VALUE_NULL)) monitorId = xcp.text() - validateId(monitorId) } MONITOR_IDS_FIELD -> { diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt index 9680bdbe..ad0d2b24 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt @@ -70,10 +70,25 @@ class CompositeInputTests { } @Test - fun `test create Chained Findings with illegal monitorId value`() { + fun `test create Chained Findings with illegal monitorId value and empty monitorIds list`() { try { ChainedMonitorFindings("") Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + e.message?.let { + Assertions.assertTrue( + it.contains("at least one of fields, 'monitorIds' and 'monitorId' should be provided") + + ) + } + } + } + + @Test + fun `test create Chained Findings with null monitorId value and monitorIds list with blank monitorIds`() { + try { + ChainedMonitorFindings("", listOf("", "")) + Assertions.fail("Expecting an illegal argument exception") } catch (e: IllegalArgumentException) { e.message?.let { Assertions.assertTrue( From 3f764b6dd181503c4fd1c5c9232532684fb20326 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 30 Aug 2023 14:27:29 -0700 Subject: [PATCH 3/4] fix ktlint Signed-off-by: Surya Sashank Nistala --- .../commons/alerting/model/ChainedMonitorFindings.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt index c5f2c472..92192eec 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt @@ -75,8 +75,8 @@ data class ChainedMonitorFindings( when (fieldName) { MONITOR_ID_FIELD -> { - if(!xcp.currentToken().equals(XContentParser.Token.VALUE_NULL)) - monitorId = xcp.text() + if (!xcp.currentToken().equals(XContentParser.Token.VALUE_NULL)) + monitorId = xcp.text() } MONITOR_IDS_FIELD -> { From ff2c58155614f369eb8596ea8ad1ea15ef4fc21d Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 31 Aug 2023 01:10:47 -0700 Subject: [PATCH 4/4] address review comments Signed-off-by: Surya Sashank Nistala --- .../opensearch/commons/alerting/action/IndexWorkflowRequest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt index c58249c8..6fe9c47b 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt @@ -30,7 +30,7 @@ class IndexWorkflowRequest : ActionRequest { refreshPolicy: WriteRequest.RefreshPolicy, method: RestRequest.Method, workflow: Workflow, - rbacRoles: List? = null, + rbacRoles: List? = null ) : super() { this.workflowId = workflowId this.seqNo = seqNo