diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt index 7925dff0a..c94afa8ae 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt @@ -85,6 +85,7 @@ class RestIndexMonitorAction : BaseRestHandler() { ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now()) validateDataSources(monitor) + validateOwner(monitor.owner) val monitorType = monitor.monitorType val triggers = monitor.triggers when (monitorType) { @@ -136,6 +137,12 @@ class RestIndexMonitorAction : BaseRestHandler() { } } + private fun validateOwner(owner: String?) { + if (owner != "alerting") { + throw IllegalArgumentException("Invalid owner field") + } + } + private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method): RestResponseListener { return object : RestResponseListener(channel) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt index 064748455..18a49de04 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt @@ -22,10 +22,8 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS import org.opensearch.common.xcontent.XContentFactory.jsonBuilder import org.opensearch.common.xcontent.XContentType -import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX -import org.opensearch.index.query.QueryBuilders import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.BaseRestHandler.RestChannelConsumer import org.opensearch.rest.BytesRestResponse @@ -97,14 +95,6 @@ class RestSearchMonitorAction( searchSourceBuilder.parseXContent(request.contentOrSourceParamParser()) searchSourceBuilder.fetchSource(context(request)) - val queryBuilder = QueryBuilders.boolQuery().must(searchSourceBuilder.query()) - if (index == SCHEDULED_JOBS_INDEX) { - queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE)) - } - - searchSourceBuilder.query(queryBuilder) - .seqNoAndPrimaryTerm(true) - .version(true) val searchRequest = SearchRequest() .source(searchSourceBuilder) .indices(index) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt index f97e382ff..13ed9c9cb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt @@ -20,7 +20,12 @@ import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings +import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.authuser.User +import org.opensearch.index.query.BoolQueryBuilder +import org.opensearch.index.query.ExistsQueryBuilder +import org.opensearch.index.query.MatchQueryBuilder +import org.opensearch.index.query.QueryBuilders import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -43,6 +48,14 @@ class TransportSearchMonitorAction @Inject constructor( } override fun doExecute(task: Task, searchMonitorRequest: SearchMonitorRequest, actionListener: ActionListener) { + val searchSourceBuilder = searchMonitorRequest.searchRequest.source() + val queryBuilder = if (searchSourceBuilder.query() == null) BoolQueryBuilder() + else QueryBuilders.boolQuery().must(searchSourceBuilder.query()) + queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE)) + searchSourceBuilder.query(queryBuilder) + .seqNoAndPrimaryTerm(true) + .version(true) + addOwnerFieldIfNotExists(searchMonitorRequest.searchRequest) val user = readUserFromThreadContext(client) client.threadPool().threadContext.stashContext().use { resolve(searchMonitorRequest, actionListener, user) @@ -78,4 +91,16 @@ class TransportSearchMonitorAction @Inject constructor( } ) } + + private fun addOwnerFieldIfNotExists(searchRequest: SearchRequest) { + if (searchRequest.source().query() == null || searchRequest.source().query().toString().contains("monitor.owner") == false) { + var boolQueryBuilder: BoolQueryBuilder = if (searchRequest.source().query() == null) BoolQueryBuilder() + else QueryBuilders.boolQuery().must(searchRequest.source().query()) + val bqb = BoolQueryBuilder() + bqb.should().add(BoolQueryBuilder().mustNot(ExistsQueryBuilder("monitor.owner"))) + bqb.should().add(BoolQueryBuilder().must(MatchQueryBuilder("monitor.owner", "alerting"))) + boolQueryBuilder.filter(bqb) + searchRequest.source().query(boolQueryBuilder) + } + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 4b2b52367..9ffc42b3a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -669,4 +669,45 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { private fun Map.objectMap(key: String): Map> { return this[key] as Map> } + + fun `test execute monitor with non-null owner`() { + + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val alertCategories = AlertCategory.values() + val actionExecutionScope = PerAlertActionScope( + actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet() + ) + val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope) + val actions = (0..randomInt(10)).map { + randomActionWithPolicy( + template = randomTemplateScript("Hello {{ctx.monitor.name}}"), + destinationId = createDestination().id, + actionExecutionPolicy = actionExecutionPolicy + ) + } + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions) + try { + createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + owner = "owner" + ) + ) + fail("Expected create monitor to fail") + } catch (e: ResponseException) { + assertTrue(e.message!!.contains("illegal_argument_exception")) + } + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 6bed045fb..43c9dff3e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -8,7 +8,10 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.search.SearchRequest import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.action.SearchMonitorAction +import org.opensearch.alerting.action.SearchMonitorRequest import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.transport.AlertingSingleNodeTestCase import org.opensearch.common.settings.Settings @@ -21,6 +24,7 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.commons.alerting.model.Table +import org.opensearch.index.query.MatchQueryBuilder import org.opensearch.test.OpenSearchTestCase import java.time.ZonedDateTime import java.time.format.DateTimeFormatter @@ -319,6 +323,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val updateMonitorResponse = updateMonitor( monitor.copy( id = monitorId, + owner = "security_analytics_plugin", dataSources = DataSources( alertsIndex = customAlertsIndex, queryIndex = customQueryIndex, @@ -328,6 +333,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { monitorId ) Assert.assertNotNull(updateMonitorResponse) + Assert.assertEquals(updateMonitorResponse!!.monitor.owner, "security_analytics_plugin") indexDoc(index, "2", testDoc) executeMonitorResponse = executeMonitor(updateMonitorResponse!!.monitor, monitorId, false) Assert.assertTrue(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex)) @@ -347,6 +353,16 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { .get() Assert.assertTrue(getAlertsResponse != null) Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX) + var searchMonitorResponse = + client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest)) + .get() + Assert.assertEquals(searchMonitorResponse.hits.hits.size, 0) + searchRequest.source().query(MatchQueryBuilder("monitor.owner", "security_analytics_plugin")) + searchMonitorResponse = + client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest)) + .get() + Assert.assertEquals(searchMonitorResponse.hits.hits.size, 1) } fun `test execute GetFindingsAction with monitorId param`() { @@ -427,6 +443,47 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } } + fun `test execute monitor with owner field`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customAlertsIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(alertsIndex = customAlertsIndex), + owner = "owner" + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + Assert.assertEquals(monitor.owner, "owner") + indexDoc(index, "1", testDoc) + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + val alerts = searchAlerts(id, customAlertsIndex) + assertEquals("Alert saved for test monitor", 1, alerts.size) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + } + fun `test execute GetFindingsAction with unknown findingIndex param`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 551e6d2c7..2b99b514a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -162,12 +162,13 @@ fun randomDocumentLevelMonitor( triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - withMetadata: Boolean = false + withMetadata: Boolean = false, + owner: String? = null ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), owner = owner ) } @@ -181,12 +182,13 @@ fun randomDocumentLevelMonitor( enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false, - dataSources: DataSources + dataSources: DataSources, + owner: String? = null ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner ) } diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 8254e1ebf..5ee414c0e 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -18,6 +18,15 @@ } } }, + "owner": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, "monitor_type": { "type": "keyword" },