Skip to content

Commit

Permalink
Adds support for fetching alerts from custom alert index in Get Alert…
Browse files Browse the repository at this point in the history
…s transport layer (#561)

* adds support for fetching alerts from custom alert index in Get Alerts transport layer
  • Loading branch information
eirsep authored Sep 21, 2022
1 parent 5074f7b commit 415dc82
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,29 @@ class GetAlertsRequest : ActionRequest {
val severityLevel: String
val alertState: String
val monitorId: String?
val alertIndex: String?

constructor(
table: Table,
severityLevel: String,
alertState: String,
monitorId: String?
monitorId: String?,
alertIndex: String?,
) : super() {
this.table = table
this.severityLevel = severityLevel
this.alertState = alertState
this.monitorId = monitorId
this.alertIndex = alertIndex
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
table = Table.readFrom(sin),
severityLevel = sin.readString(),
alertState = sin.readString(),
monitorId = sin.readOptionalString()
monitorId = sin.readOptionalString(),
alertIndex = sin.readOptionalString()
)

override fun validate(): ActionRequestValidationException? {
Expand All @@ -48,5 +52,6 @@ class GetAlertsRequest : ActionRequest {
out.writeString(severityLevel)
out.writeString(alertState)
out.writeOptionalString(monitorId)
out.writeOptionalString(alertIndex)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class RestGetAlertsAction : BaseRestHandler() {
searchString
)

val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId)
val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId, null)
return RestChannelConsumer {
channel ->
client.execute(GetAlertsAction.INSTANCE, getAlertsRequest, RestToXContentListener(channel))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.search.SearchRequest
Expand All @@ -14,9 +18,13 @@ import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.action.GetAlertsAction
import org.opensearch.alerting.action.GetAlertsRequest
import org.opensearch.alerting.action.GetAlertsResponse
import org.opensearch.alerting.action.GetMonitorAction
import org.opensearch.alerting.action.GetMonitorRequest
import org.opensearch.alerting.action.GetMonitorResponse
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.opensearchapi.addFilter
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.client.Client
Expand All @@ -32,22 +40,26 @@ import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.authuser.User
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.search.sort.SortBuilders
import org.opensearch.search.sort.SortOrder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.io.IOException

private val log = LogManager.getLogger(TransportGetAlertsAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

class TransportGetAlertsAction @Inject constructor(
transportService: TransportService,
val client: Client,
clusterService: ClusterService,
actionFilters: ActionFilters,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry
val xContentRegistry: NamedXContentRegistry,
val transportGetMonitorAction: TransportGetMonitorAction
) : HandledTransportAction<GetAlertsRequest, GetAlertsResponse>(
GetAlertsAction.NAME, transportService, actionFilters, ::GetAlertsRequest
),
Expand Down Expand Up @@ -85,7 +97,6 @@ class TransportGetAlertsAction @Inject constructor(
if (getAlertsRequest.monitorId != null) {
queryBuilder.filter(QueryBuilders.termQuery("monitor_id", getAlertsRequest.monitorId))
}

if (!tableProp.searchString.isNullOrBlank()) {
queryBuilder
.must(
Expand All @@ -105,37 +116,78 @@ class TransportGetAlertsAction @Inject constructor(
.from(tableProp.startIndex)

client.threadPool().threadContext.stashContext().use {
resolve(searchSourceBuilder, actionListener, user)
scope.launch {
try {
val alertIndex = resolveAlertsIndexName(getAlertsRequest)
getAlerts(alertIndex, searchSourceBuilder, actionListener, user)
} catch (t: Exception) {
log.error("Failed to get alerts", t)
if (t is AlertingException) {
actionListener.onFailure(t)
} else {
actionListener.onFailure(AlertingException.wrap(t))
}
}
}
}
}

fun resolve(
/** Precedence order for resolving alert index to be queried:
1. alertIndex param.
2. alert index mentioned in monitor data sources.
3. Default alert indices pattern
*/
suspend fun resolveAlertsIndexName(getAlertsRequest: GetAlertsRequest): String {
var alertIndex = AlertIndices.ALL_ALERT_INDEX_PATTERN
if (getAlertsRequest.alertIndex.isNullOrEmpty() == false) {
alertIndex = getAlertsRequest.alertIndex
} else if (getAlertsRequest.monitorId.isNullOrEmpty() == false)
withContext(Dispatchers.IO) {
val getMonitorRequest = GetMonitorRequest(
getAlertsRequest.monitorId,
-3L,
RestRequest.Method.GET,
FetchSourceContext.FETCH_SOURCE
)
val getMonitorResponse: GetMonitorResponse =
transportGetMonitorAction.client.suspendUntil {
execute(GetMonitorAction.INSTANCE, getMonitorRequest, it)
}
if (getMonitorResponse.monitor != null) {
alertIndex = getMonitorResponse.monitor!!.dataSources.alertsIndex
}
}
return alertIndex
}

fun getAlerts(
alertIndex: String,
searchSourceBuilder: SearchSourceBuilder,
actionListener: ActionListener<GetAlertsResponse>,
user: User?
) {
// user is null when: 1/ security is disabled. 2/when user is super-admin.
if (user == null) {
// user is null when: 1/ security is disabled. 2/when user is super-admin.
search(searchSourceBuilder, actionListener)
search(alertIndex, searchSourceBuilder, actionListener)
} else if (!doFilterForUser(user)) {
// security is enabled and filterby is disabled.
search(searchSourceBuilder, actionListener)
search(alertIndex, searchSourceBuilder, actionListener)
} else {
// security is enabled and filterby is enabled.
try {
log.info("Filtering result by: ${user.backendRoles}")
addFilter(user, searchSourceBuilder, "monitor_user.backend_roles.keyword")
search(searchSourceBuilder, actionListener)
search(alertIndex, searchSourceBuilder, actionListener)
} catch (ex: IOException) {
actionListener.onFailure(AlertingException.wrap(ex))
}
}
}

fun search(searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener<GetAlertsResponse>) {
fun search(alertIndex: String, searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener<GetAlertsResponse>) {
val searchRequest = SearchRequest()
.indices(AlertIndices.ALL_ALERT_INDEX_PATTERN)
.indices(alertIndex)
.source(searchSourceBuilder)

client.search(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ package org.opensearch.alerting
import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.alerting.action.GetAlertsAction
import org.opensearch.alerting.action.GetAlertsRequest
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.model.DocLevelMonitorInput
import org.opensearch.alerting.core.model.DocLevelQuery
import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.alerting.model.DataSources
import org.opensearch.alerting.model.Table
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.common.settings.Settings
import java.time.ZonedDateTime
Expand Down Expand Up @@ -44,6 +47,25 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 0)
getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 0)
try {
client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, "wrong_alert_index"))
.get()
fail()
} catch (e: Exception) {
Assert.assertTrue(e.message!!.contains("IndexNotFoundException"))
}
}

fun `test execute monitor with custom alerts index`() {
Expand Down Expand Up @@ -72,6 +94,17 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
val alerts = searchAlerts(id, customAlertsIndex)
assertEquals("Alert saved for test monitor", 1, alerts.size)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute monitor with custom query index`() {
Expand Down Expand Up @@ -99,6 +132,17 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute monitor with custom query index and custom field mappings`() {
Expand Down Expand Up @@ -133,6 +177,17 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val clusterStateResponse = client().admin().cluster().state(ClusterStateRequest().indices(customQueryIndex).metadata(true)).get()
val mapping = clusterStateResponse.state.metadata.index(customQueryIndex).mapping()
Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute monitor with custom findings index`() {
Expand Down Expand Up @@ -163,6 +218,17 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute pre-existing monitorand update`() {
Expand Down Expand Up @@ -269,5 +335,16 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
val customAlertsIndexAlerts = searchAlerts(monitorId, customAlertsIndex)
assertEquals("Alert saved for test monitor", 1, customAlertsIndexAlerts.size)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", monitorId, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class GetAlertsRequestTests : OpenSearchTestCase() {

val table = Table("asc", "sortString", null, 1, 0, "")

val req = GetAlertsRequest(table, "1", "active", null)
val req = GetAlertsRequest(table, "1", "active", null, null)
assertNotNull(req)

val out = BytesStreamOutput()
Expand All @@ -33,7 +33,7 @@ class GetAlertsRequestTests : OpenSearchTestCase() {
fun `test get alerts request with filter`() {

val table = Table("asc", "sortString", null, 1, 0, "")
val req = GetAlertsRequest(table, "1", "active", null)
val req = GetAlertsRequest(table, "1", "active", null, null)
assertNotNull(req)

val out = BytesStreamOutput()
Expand All @@ -50,7 +50,7 @@ class GetAlertsRequestTests : OpenSearchTestCase() {
fun `test validate returns null`() {
val table = Table("asc", "sortString", null, 1, 0, "")

val req = GetAlertsRequest(table, "1", "active", null)
val req = GetAlertsRequest(table, "1", "active", null, null)
assertNotNull(req)
assertNull(req.validate())
}
Expand Down

0 comments on commit 415dc82

Please sign in to comment.