Skip to content

Commit

Permalink
GetFindingsAction new optional parameters (#563)
Browse files Browse the repository at this point in the history
* added monitorId and findingsIndexName optinal params to GetFindingsAction

Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
petardz authored Sep 23, 2022
1 parent 415dc82 commit 4ca0fb0
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@ import java.io.IOException
class GetFindingsRequest : ActionRequest {
val findingId: String?
val table: Table
val monitorId: String?
val findingIndex: String?

constructor(
findingId: String?,
table: Table
table: Table,
monitorId: String? = null,
findingIndexName: String? = null
) : super() {
this.findingId = findingId
this.table = table
this.monitorId = monitorId
this.findingIndex = findingIndexName
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
findingId = sin.readOptionalString(),
table = Table.readFrom(sin)
table = Table.readFrom(sin),
monitorId = sin.readOptionalString(),
findingIndexName = sin.readOptionalString()
)

override fun validate(): ActionRequestValidationException? {
Expand All @@ -38,5 +46,7 @@ class GetFindingsRequest : ActionRequest {
override fun writeTo(out: StreamOutput) {
out.writeOptionalString(findingId)
table.writeTo(out)
out.writeOptionalString(monitorId)
out.writeOptionalString(findingIndex)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting.transport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.apache.lucene.search.join.ScoreMode
import org.opensearch.action.ActionListener
Expand All @@ -20,6 +21,9 @@ import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.action.GetFindingsAction
import org.opensearch.alerting.action.GetFindingsRequest
import org.opensearch.alerting.action.GetFindingsResponse
import org.opensearch.alerting.action.GetMonitorAction
import org.opensearch.alerting.action.GetMonitorRequest
import org.opensearch.alerting.action.GetMonitorResponse
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
import org.opensearch.alerting.model.Finding
import org.opensearch.alerting.model.FindingDocument
Expand All @@ -40,6 +44,7 @@ import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.search.sort.SortBuilders
Expand Down Expand Up @@ -122,19 +127,48 @@ class TransportGetFindingsSearchAction @Inject constructor(
client.threadPool().threadContext.stashContext().use {
scope.launch {
try {
val getFindingsResponse = search(searchSourceBuilder)
val indexName = resolveFindingsIndexName(getFindingsRequest)
val getFindingsResponse = search(searchSourceBuilder, indexName)
actionListener.onResponse(getFindingsResponse)
} catch (t: AlertingException) {
actionListener.onFailure(t)
} catch (t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
}
}
}

suspend fun search(searchSourceBuilder: SearchSourceBuilder): GetFindingsResponse {
suspend fun resolveFindingsIndexName(findingsRequest: GetFindingsRequest): String {
var indexName = ALL_FINDING_INDEX_PATTERN

if (findingsRequest.findingIndex.isNullOrEmpty() == false) {
// findingIndex has highest priority, so use that if available
indexName = findingsRequest.findingIndex
} else if (findingsRequest.monitorId.isNullOrEmpty() == false) {
// second best is monitorId.
// We will use it to fetch monitor and then read indexName from dataSources field of monitor
withContext(Dispatchers.IO) {
val getMonitorRequest = GetMonitorRequest(
findingsRequest.monitorId,
-3L,
RestRequest.Method.GET,
FetchSourceContext.FETCH_SOURCE
)
val getMonitorResponse: GetMonitorResponse =
this@TransportGetFindingsSearchAction.client.suspendUntil {
execute(GetMonitorAction.INSTANCE, getMonitorRequest, it)
}
indexName = getMonitorResponse.monitor?.dataSources?.findingsIndex ?: ALL_FINDING_INDEX_PATTERN
}
}
return indexName
}

suspend fun search(searchSourceBuilder: SearchSourceBuilder, indexName: String): GetFindingsResponse {
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(ALL_FINDING_INDEX_PATTERN)
.indices(indexName)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
val totalFindingCount = searchResponse.hits.totalHits?.value?.toInt()
val mgetRequest = MultiGetRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,162 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute GetFindingsAction with monitorId param`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(findingsIndex = customFindingsIndex)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(monitorId)
val findings = searchFindings(monitorId, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
// fetch findings - pass monitorId as reference to finding_index
val findingsFromAPI = getFindings(findings.get(0).id, monitorId, null)
assertEquals(
"Findings mismatch between manually searched and fetched via GetFindingsAction",
findings.get(0).id,
findingsFromAPI.get(0).id
)
}

fun `test execute GetFindingsAction with params monitorId and findingIndex`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(findingsIndex = customFindingsIndex)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(monitorId)
val findings = searchFindings(monitorId, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
// fetch findings - pass both monitorId and findingIndexName name. Monitor id should be ignored
val findingsFromAPI = getFindings(findings.get(0).id, "incorrect_monitor_id", customFindingsIndex)
assertEquals(
"Findings mismatch between manually searched and fetched via GetFindingsAction",
findings.get(0).id,
findingsFromAPI.get(0).id
)
}

fun `test execute GetFindingsAction with unknown monitorId`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(findingsIndex = customFindingsIndex)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(monitorId)
val findings = searchFindings(monitorId, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
// fetch findings - don't send monitorId or findingIndexName. It should fall back to hardcoded finding index name
try {
getFindings(findings.get(0).id, "unknown_monitor_id_123456789", null)
} catch (e: Exception) {
e.message?.let {
assertTrue(
"Exception not returning GetMonitor Action error ",
it.contains("Monitor not found")
)
}
}
}

fun `test execute GetFindingsAction with unknown findingIndex param`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(findingsIndex = customFindingsIndex)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(monitorId)
val findings = searchFindings(monitorId, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
// fetch findings - don't send monitorId or findingIndexName. It should fall back to hardcoded finding index name
try {
getFindings(findings.get(0).id, null, "unknown_finding_index_123456789")
} catch (e: Exception) {
e.message?.let {
assertTrue(
"Exception not returning GetMonitor Action error ",
it.contains("no such index")
)
}
}
}

fun `test execute pre-existing monitorand update`() {
val request = CreateIndexRequest(SCHEDULED_JOBS_INDEX).mapping(ScheduledJobIndices.scheduledJobMappings())
.settings(Settings.builder().put("index.hidden", true).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,19 @@ class TriggerServiceTests : OpenSearchTestCase() {
val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder)
val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))

val inputResultsStr = "{\"_shards\":{\"total\":1,\"failed\":0,\"successful\":1,\"skipped\":0},\"hits\":{\"hits\":[{\"_index\":\"sample-http-responses\",\"_type\":\"http\",\"_source\":{\"status_code\":100,\"http_4xx\":0,\"http_3xx\":0,\"http_5xx\":0,\"http_2xx\":0,\"timestamp\":100000,\"http_1xx\":1},\"_id\":1,\"_score\":1}],\"total\":{\"value\":4,\"relation\":\"eq\"},\"max_score\":1},\"took\":37,\"timed_out\":false,\"aggregations\":{\"status_code\":{\"doc_count_error_upper_bound\":0,\"sum_other_doc_count\":0,\"buckets\":[{\"doc_count\":2,\"key\":100},{\"doc_count\":1,\"key\":102},{\"doc_count\":1,\"key\":201}]},\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\",\"bucket_indices\":[0,1,2]}}}"

val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr)
val inputResultsStr = "{\"_shards\":" +
"{\"total\":1,\"failed\":0,\"successful\":1,\"skipped\":0},\"hits\":{\"hits\":" +
"[{\"_index\":\"sample-http-responses\",\"_type\":\"http\",\"_source\":" +
"{\"status_code\":100,\"http_4xx\":0,\"http_3xx\":0,\"http_5xx\":0,\"http_2xx\":0," +
"\"timestamp\":100000,\"http_1xx\":1},\"_id\":1,\"_score\":1}],\"total\":{\"value\":4,\"relation\":\"eq\"}," +
"\"max_score\":1},\"took\":37,\"timed_out\":false,\"aggregations\":{\"status_code\":" +
"{\"doc_count_error_upper_bound\":0,\"sum_other_doc_count\":0,\"buckets\":[{\"doc_count\":2,\"key\":100}," +
"{\"doc_count\":1,\"key\":102},{\"doc_count\":1,\"key\":201}]},\"${trigger.id}\":{\"parent_bucket_path\":" +
"\"status_code\",\"bucket_indices\":[0,1,2]}}}"

val parser = XContentType.JSON.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr
)

val inputResults = parser.map()

Expand All @@ -60,9 +70,23 @@ class TriggerServiceTests : OpenSearchTestCase() {
val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder)
val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))

val inputResultsStr = "{\"_shards\":{\"total\":1, \"failed\":0, \"successful\":1, \"skipped\":0}, \"hits\":{\"hits\":[{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":100000, \"http_1xx\":1}, \"_id\":1, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":102, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":160000, \"http_1xx\":1}, \"_id\":2, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":220000, \"http_1xx\":1}, \"_id\":4, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":201, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":1, \"timestamp\":280000, \"http_1xx\":0}, \"_id\":5, \"_score\":1.0}], \"total\":{\"value\":4, \"relation\":\"eq\"}, \"max_score\":1.0}, \"took\":15, \"timed_out\":false, \"aggregations\":{\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\", \"bucket_indices\":[0, 1, 2]}, \"status_code\":{\"buckets\":[{\"doc_count\":2, \"key\":{\"status_code\":100}}, {\"doc_count\":1, \"key\":{\"status_code\":102}}, {\"doc_count\":1, \"key\":{\"status_code\":201}}], \"after_key\":{\"status_code\":201}}}}"

val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr)
val inputResultsStr = "{\"_shards\":{\"total\":1, \"failed\":0, \"successful\":1, \"skipped\":0}, \"hits\":{\"hits\":" +
"[{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0," +
" \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":100000, \"http_1xx\":1}, \"_id\":1, \"_score\":1.0}, " +
"{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":102, \"http_4xx\":0, " +
"\"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":160000, \"http_1xx\":1}, \"_id\":2, \"_score\":1.0}, " +
"{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, " +
"\"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":220000, \"http_1xx\":1}, \"_id\":4, \"_score\":1.0}, " +
"{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":201, \"http_4xx\":0, " +
"\"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":1, \"timestamp\":280000, \"http_1xx\":0}, \"_id\":5, \"_score\":1.0}]," +
" \"total\":{\"value\":4, \"relation\":\"eq\"}, \"max_score\":1.0}, \"took\":15, \"timed_out\":false, \"aggregations\":" +
"{\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\", \"bucket_indices\":[0, 1, 2]}, \"status_code\":{\"buckets\":" +
"[{\"doc_count\":2, \"key\":{\"status_code\":100}}, {\"doc_count\":1, \"key\":{\"status_code\":102}}, {\"doc_count\":1," +
" \"key\":{\"status_code\":201}}], \"after_key\":{\"status_code\":201}}}}"

val parser = XContentType.JSON.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr
)

val inputResults = parser.map()

Expand Down
Loading

0 comments on commit 4ca0fb0

Please sign in to comment.