From 415dc821f1280bf64525e137087286dbc219bcc9 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Tue, 20 Sep 2022 18:19:32 -0700 Subject: [PATCH] Adds support for fetching alerts from custom alert index in Get Alerts transport layer (#561) * adds support for fetching alerts from custom alert index in Get Alerts transport layer --- .../alerting/action/GetAlertsRequest.kt | 9 ++- .../resthandler/RestGetAlertsAction.kt | 2 +- .../transport/TransportGetAlertsAction.kt | 70 ++++++++++++++--- .../alerting/MonitorDataSourcesIT.kt | 77 +++++++++++++++++++ .../alerting/action/GetAlertsRequestTests.kt | 6 +- 5 files changed, 149 insertions(+), 15 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetAlertsRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetAlertsRequest.kt index 967942353..a6f77190b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetAlertsRequest.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetAlertsRequest.kt @@ -17,17 +17,20 @@ class GetAlertsRequest : ActionRequest { val severityLevel: String val alertState: String val monitorId: String? + val alertIndex: String? constructor( table: Table, severityLevel: String, alertState: String, - monitorId: String? + monitorId: String?, + alertIndex: String?, ) : super() { this.table = table this.severityLevel = severityLevel this.alertState = alertState this.monitorId = monitorId + this.alertIndex = alertIndex } @Throws(IOException::class) @@ -35,7 +38,8 @@ class GetAlertsRequest : ActionRequest { table = Table.readFrom(sin), severityLevel = sin.readString(), alertState = sin.readString(), - monitorId = sin.readOptionalString() + monitorId = sin.readOptionalString(), + alertIndex = sin.readOptionalString() ) override fun validate(): ActionRequestValidationException? { @@ -48,5 +52,6 @@ class GetAlertsRequest : ActionRequest { out.writeString(severityLevel) out.writeString(alertState) out.writeOptionalString(monitorId) + out.writeOptionalString(alertIndex) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt index a189c3f52..3503a8e86 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt @@ -66,7 +66,7 @@ class RestGetAlertsAction : BaseRestHandler() { searchString ) - val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId) + val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId, null) return RestChannelConsumer { channel -> client.execute(GetAlertsAction.INSTANCE, getAlertsRequest, RestToXContentListener(channel)) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt index 9c25cb1aa..c4ea894f4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt @@ -5,6 +5,10 @@ package org.opensearch.alerting.transport +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionListener import org.opensearch.action.search.SearchRequest @@ -14,9 +18,13 @@ import org.opensearch.action.support.HandledTransportAction import org.opensearch.alerting.action.GetAlertsAction import org.opensearch.alerting.action.GetAlertsRequest import org.opensearch.alerting.action.GetAlertsResponse +import org.opensearch.alerting.action.GetMonitorAction +import org.opensearch.alerting.action.GetMonitorRequest +import org.opensearch.alerting.action.GetMonitorResponse import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.model.Alert import org.opensearch.alerting.opensearchapi.addFilter +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client @@ -32,7 +40,9 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.authuser.User import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestRequest import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.fetch.subphase.FetchSourceContext import org.opensearch.search.sort.SortBuilders import org.opensearch.search.sort.SortOrder import org.opensearch.tasks.Task @@ -40,6 +50,7 @@ import org.opensearch.transport.TransportService import java.io.IOException private val log = LogManager.getLogger(TransportGetAlertsAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) class TransportGetAlertsAction @Inject constructor( transportService: TransportService, @@ -47,7 +58,8 @@ class TransportGetAlertsAction @Inject constructor( clusterService: ClusterService, actionFilters: ActionFilters, val settings: Settings, - val xContentRegistry: NamedXContentRegistry + val xContentRegistry: NamedXContentRegistry, + val transportGetMonitorAction: TransportGetMonitorAction ) : HandledTransportAction( GetAlertsAction.NAME, transportService, actionFilters, ::GetAlertsRequest ), @@ -85,7 +97,6 @@ class TransportGetAlertsAction @Inject constructor( if (getAlertsRequest.monitorId != null) { queryBuilder.filter(QueryBuilders.termQuery("monitor_id", getAlertsRequest.monitorId)) } - if (!tableProp.searchString.isNullOrBlank()) { queryBuilder .must( @@ -105,11 +116,52 @@ class TransportGetAlertsAction @Inject constructor( .from(tableProp.startIndex) client.threadPool().threadContext.stashContext().use { - resolve(searchSourceBuilder, actionListener, user) + scope.launch { + try { + val alertIndex = resolveAlertsIndexName(getAlertsRequest) + getAlerts(alertIndex, searchSourceBuilder, actionListener, user) + } catch (t: Exception) { + log.error("Failed to get alerts", t) + if (t is AlertingException) { + actionListener.onFailure(t) + } else { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + } } } - fun resolve( + /** Precedence order for resolving alert index to be queried: + 1. alertIndex param. + 2. alert index mentioned in monitor data sources. + 3. Default alert indices pattern + */ + suspend fun resolveAlertsIndexName(getAlertsRequest: GetAlertsRequest): String { + var alertIndex = AlertIndices.ALL_ALERT_INDEX_PATTERN + if (getAlertsRequest.alertIndex.isNullOrEmpty() == false) { + alertIndex = getAlertsRequest.alertIndex + } else if (getAlertsRequest.monitorId.isNullOrEmpty() == false) + withContext(Dispatchers.IO) { + val getMonitorRequest = GetMonitorRequest( + getAlertsRequest.monitorId, + -3L, + RestRequest.Method.GET, + FetchSourceContext.FETCH_SOURCE + ) + val getMonitorResponse: GetMonitorResponse = + transportGetMonitorAction.client.suspendUntil { + execute(GetMonitorAction.INSTANCE, getMonitorRequest, it) + } + if (getMonitorResponse.monitor != null) { + alertIndex = getMonitorResponse.monitor!!.dataSources.alertsIndex + } + } + return alertIndex + } + + fun getAlerts( + alertIndex: String, searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener, user: User? @@ -117,25 +169,25 @@ class TransportGetAlertsAction @Inject constructor( // user is null when: 1/ security is disabled. 2/when user is super-admin. if (user == null) { // user is null when: 1/ security is disabled. 2/when user is super-admin. - search(searchSourceBuilder, actionListener) + search(alertIndex, searchSourceBuilder, actionListener) } else if (!doFilterForUser(user)) { // security is enabled and filterby is disabled. - search(searchSourceBuilder, actionListener) + search(alertIndex, searchSourceBuilder, actionListener) } else { // security is enabled and filterby is enabled. try { log.info("Filtering result by: ${user.backendRoles}") addFilter(user, searchSourceBuilder, "monitor_user.backend_roles.keyword") - search(searchSourceBuilder, actionListener) + search(alertIndex, searchSourceBuilder, actionListener) } catch (ex: IOException) { actionListener.onFailure(AlertingException.wrap(ex)) } } } - fun search(searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener) { + fun search(alertIndex: String, searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener) { val searchRequest = SearchRequest() - .indices(AlertIndices.ALL_ALERT_INDEX_PATTERN) + .indices(alertIndex) .source(searchSourceBuilder) client.search( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 669374450..14b0beb3b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -8,11 +8,14 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.alerting.action.GetAlertsAction +import org.opensearch.alerting.action.GetAlertsRequest import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.alerting.model.DataSources +import org.opensearch.alerting.model.Table import org.opensearch.alerting.transport.AlertingSingleNodeTestCase import org.opensearch.common.settings.Settings import java.time.ZonedDateTime @@ -44,6 +47,25 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 0) + getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 0) + try { + client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, "wrong_alert_index")) + .get() + fail() + } catch (e: Exception) { + Assert.assertTrue(e.message!!.contains("IndexNotFoundException")) + } } fun `test execute monitor with custom alerts index`() { @@ -72,6 +94,17 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) val alerts = searchAlerts(id, customAlertsIndex) assertEquals("Alert saved for test monitor", 1, alerts.size) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) } fun `test execute monitor with custom query index`() { @@ -99,6 +132,17 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) } fun `test execute monitor with custom query index and custom field mappings`() { @@ -133,6 +177,17 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val clusterStateResponse = client().admin().cluster().state(ClusterStateRequest().indices(customQueryIndex).metadata(true)).get() val mapping = clusterStateResponse.state.metadata.index(customQueryIndex).mapping() Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) } fun `test execute monitor with custom findings index`() { @@ -163,6 +218,17 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val findings = searchFindings(id, customFindingsIndex) assertEquals("Findings saved for test monitor", 1, findings.size) assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) } fun `test execute pre-existing monitorand update`() { @@ -269,5 +335,16 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2")) val customAlertsIndexAlerts = searchAlerts(monitorId, customAlertsIndex) assertEquals("Alert saved for test monitor", 1, customAlertsIndexAlerts.size) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + getAlertsResponse = client() + .execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", monitorId, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsRequestTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsRequestTests.kt index 3b8934638..166637457 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsRequestTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsRequestTests.kt @@ -16,7 +16,7 @@ class GetAlertsRequestTests : OpenSearchTestCase() { val table = Table("asc", "sortString", null, 1, 0, "") - val req = GetAlertsRequest(table, "1", "active", null) + val req = GetAlertsRequest(table, "1", "active", null, null) assertNotNull(req) val out = BytesStreamOutput() @@ -33,7 +33,7 @@ class GetAlertsRequestTests : OpenSearchTestCase() { fun `test get alerts request with filter`() { val table = Table("asc", "sortString", null, 1, 0, "") - val req = GetAlertsRequest(table, "1", "active", null) + val req = GetAlertsRequest(table, "1", "active", null, null) assertNotNull(req) val out = BytesStreamOutput() @@ -50,7 +50,7 @@ class GetAlertsRequestTests : OpenSearchTestCase() { fun `test validate returns null`() { val table = Table("asc", "sortString", null, 1, 0, "") - val req = GetAlertsRequest(table, "1", "active", null) + val req = GetAlertsRequest(table, "1", "active", null, null) assertNotNull(req) assertNull(req.validate()) }