Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetFindingsAction new optional parameters #563

Merged
merged 10 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@ import java.io.IOException
class GetFindingsRequest : ActionRequest {
val findingId: String?
val table: Table
val monitorId: String?
val findingIndex: String?

constructor(
findingId: String?,
table: Table
table: Table,
monitorId: String? = null,
findingIndexName: String? = null
) : super() {
this.findingId = findingId
this.table = table
this.monitorId = monitorId
this.findingIndex = findingIndexName
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
findingId = sin.readOptionalString(),
table = Table.readFrom(sin)
table = Table.readFrom(sin),
monitorId = sin.readOptionalString(),
findingIndexName = sin.readOptionalString()
)

override fun validate(): ActionRequestValidationException? {
Expand All @@ -38,5 +46,7 @@ class GetFindingsRequest : ActionRequest {
override fun writeTo(out: StreamOutput) {
out.writeOptionalString(findingId)
table.writeTo(out)
out.writeOptionalString(monitorId)
out.writeOptionalString(findingIndex)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting.transport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.apache.lucene.search.join.ScoreMode
import org.opensearch.action.ActionListener
Expand All @@ -20,6 +21,9 @@ import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.action.GetFindingsAction
import org.opensearch.alerting.action.GetFindingsRequest
import org.opensearch.alerting.action.GetFindingsResponse
import org.opensearch.alerting.action.GetMonitorAction
import org.opensearch.alerting.action.GetMonitorRequest
import org.opensearch.alerting.action.GetMonitorResponse
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
import org.opensearch.alerting.model.Finding
import org.opensearch.alerting.model.FindingDocument
Expand All @@ -40,6 +44,7 @@ import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.search.sort.SortBuilders
Expand Down Expand Up @@ -122,19 +127,48 @@ class TransportGetFindingsSearchAction @Inject constructor(
client.threadPool().threadContext.stashContext().use {
scope.launch {
try {
val getFindingsResponse = search(searchSourceBuilder)
val indexName = resolveFindingsIndexName(getFindingsRequest)
val getFindingsResponse = search(searchSourceBuilder, indexName)
actionListener.onResponse(getFindingsResponse)
} catch (t: AlertingException) {
actionListener.onFailure(t)
} catch (t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if get monitor fails we would already receive the alerting exception

would we need to do something like



if (t is AlertingException) {
    actionListener.onFailure(t)
} else {
      actionListener.onFailure(AlertingException.wrap(t))
 }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added another catch for AlertingException

}
}
}
}

suspend fun search(searchSourceBuilder: SearchSourceBuilder): GetFindingsResponse {
suspend fun resolveFindingsIndexName(findingsRequest: GetFindingsRequest): String {
var indexName = ALL_FINDING_INDEX_PATTERN

if (findingsRequest.findingIndex.isNullOrEmpty() == false) {
// findingIndex has highest priority, so use that if available
indexName = findingsRequest.findingIndex
} else if (findingsRequest.monitorId.isNullOrEmpty() == false) {
// second best is monitorId.
// We will use it to fetch monitor and then read indexName from dataSources field of monitor
withContext(Dispatchers.IO) {
val getMonitorRequest = GetMonitorRequest(
findingsRequest.monitorId,
-3L,
RestRequest.Method.GET,
FetchSourceContext.FETCH_SOURCE
)
val getMonitorResponse: GetMonitorResponse =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should catch exceptiton if GetmonitorAction fails and fail the get findings action overall?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be happening on line 134 right now. I'll add IT for this

[email protected] {
execute(GetMonitorAction.INSTANCE, getMonitorRequest, it)
}
Comment on lines +158 to +161
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we handle the failure scenarios here? We would want to log and fallback to the default maybe?
Also moving the fetch config information to a common utility might be a good way to use across. such as for #561

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception is caught on line 134 and sent back to caller.

Hmm falling back to default one might lead to spurious behavior. It would be better if user knows that GetMonitor Action failed and is retryable then to get back NOT_FOUND without knowing if the desired finding index was searched or not

indexName = getMonitorResponse.monitor?.dataSources?.findingsIndex ?: ALL_FINDING_INDEX_PATTERN
}
}
return indexName
}

suspend fun search(searchSourceBuilder: SearchSourceBuilder, indexName: String): GetFindingsResponse {
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(ALL_FINDING_INDEX_PATTERN)
.indices(indexName)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
val totalFindingCount = searchResponse.hits.totalHits?.value?.toInt()
val mgetRequest = MultiGetRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,162 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
}

fun `test execute GetFindingsAction with monitorId param`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(findingsIndex = customFindingsIndex)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(monitorId)
val findings = searchFindings(monitorId, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
// fetch findings - pass monitorId as reference to finding_index
val findingsFromAPI = getFindings(findings.get(0).id, monitorId, null)
assertEquals(
"Findings mismatch between manually searched and fetched via GetFindingsAction",
findings.get(0).id,
findingsFromAPI.get(0).id
)
}

fun `test execute GetFindingsAction with params monitorId and findingIndex`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(findingsIndex = customFindingsIndex)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(monitorId)
val findings = searchFindings(monitorId, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
// fetch findings - pass both monitorId and findingIndexName name. Monitor id should be ignored
val findingsFromAPI = getFindings(findings.get(0).id, "incorrect_monitor_id", customFindingsIndex)
assertEquals(
"Findings mismatch between manually searched and fetched via GetFindingsAction",
findings.get(0).id,
findingsFromAPI.get(0).id
)
}

fun `test execute GetFindingsAction with unknown monitorId`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(findingsIndex = customFindingsIndex)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(monitorId)
val findings = searchFindings(monitorId, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
// fetch findings - don't send monitorId or findingIndexName. It should fall back to hardcoded finding index name
try {
getFindings(findings.get(0).id, "unknown_monitor_id_123456789", null)
} catch (e: Exception) {
e.message?.let {
assertTrue(
"Exception not returning GetMonitor Action error ",
it.contains("Monitor not found")
)
}
}
}

fun `test execute GetFindingsAction with unknown findingIndex param`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(findingsIndex = customFindingsIndex)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(monitorId)
val findings = searchFindings(monitorId, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
// fetch findings - don't send monitorId or findingIndexName. It should fall back to hardcoded finding index name
try {
getFindings(findings.get(0).id, null, "unknown_finding_index_123456789")
} catch (e: Exception) {
e.message?.let {
assertTrue(
"Exception not returning GetMonitor Action error ",
it.contains("no such index")
)
}
}
}

fun `test execute pre-existing monitorand update`() {
val request = CreateIndexRequest(SCHEDULED_JOBS_INDEX).mapping(ScheduledJobIndices.scheduledJobMappings())
.settings(Settings.builder().put("index.hidden", true).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,19 @@ class TriggerServiceTests : OpenSearchTestCase() {
val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder)
val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))

val inputResultsStr = "{\"_shards\":{\"total\":1,\"failed\":0,\"successful\":1,\"skipped\":0},\"hits\":{\"hits\":[{\"_index\":\"sample-http-responses\",\"_type\":\"http\",\"_source\":{\"status_code\":100,\"http_4xx\":0,\"http_3xx\":0,\"http_5xx\":0,\"http_2xx\":0,\"timestamp\":100000,\"http_1xx\":1},\"_id\":1,\"_score\":1}],\"total\":{\"value\":4,\"relation\":\"eq\"},\"max_score\":1},\"took\":37,\"timed_out\":false,\"aggregations\":{\"status_code\":{\"doc_count_error_upper_bound\":0,\"sum_other_doc_count\":0,\"buckets\":[{\"doc_count\":2,\"key\":100},{\"doc_count\":1,\"key\":102},{\"doc_count\":1,\"key\":201}]},\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\",\"bucket_indices\":[0,1,2]}}}"

val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr)
val inputResultsStr = "{\"_shards\":" +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for doing this :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np!

"{\"total\":1,\"failed\":0,\"successful\":1,\"skipped\":0},\"hits\":{\"hits\":" +
"[{\"_index\":\"sample-http-responses\",\"_type\":\"http\",\"_source\":" +
"{\"status_code\":100,\"http_4xx\":0,\"http_3xx\":0,\"http_5xx\":0,\"http_2xx\":0," +
"\"timestamp\":100000,\"http_1xx\":1},\"_id\":1,\"_score\":1}],\"total\":{\"value\":4,\"relation\":\"eq\"}," +
"\"max_score\":1},\"took\":37,\"timed_out\":false,\"aggregations\":{\"status_code\":" +
"{\"doc_count_error_upper_bound\":0,\"sum_other_doc_count\":0,\"buckets\":[{\"doc_count\":2,\"key\":100}," +
"{\"doc_count\":1,\"key\":102},{\"doc_count\":1,\"key\":201}]},\"${trigger.id}\":{\"parent_bucket_path\":" +
"\"status_code\",\"bucket_indices\":[0,1,2]}}}"

val parser = XContentType.JSON.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr
)

val inputResults = parser.map()

Expand All @@ -60,9 +70,23 @@ class TriggerServiceTests : OpenSearchTestCase() {
val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder)
val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))

val inputResultsStr = "{\"_shards\":{\"total\":1, \"failed\":0, \"successful\":1, \"skipped\":0}, \"hits\":{\"hits\":[{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":100000, \"http_1xx\":1}, \"_id\":1, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":102, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":160000, \"http_1xx\":1}, \"_id\":2, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":220000, \"http_1xx\":1}, \"_id\":4, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":201, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":1, \"timestamp\":280000, \"http_1xx\":0}, \"_id\":5, \"_score\":1.0}], \"total\":{\"value\":4, \"relation\":\"eq\"}, \"max_score\":1.0}, \"took\":15, \"timed_out\":false, \"aggregations\":{\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\", \"bucket_indices\":[0, 1, 2]}, \"status_code\":{\"buckets\":[{\"doc_count\":2, \"key\":{\"status_code\":100}}, {\"doc_count\":1, \"key\":{\"status_code\":102}}, {\"doc_count\":1, \"key\":{\"status_code\":201}}], \"after_key\":{\"status_code\":201}}}}"

val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr)
val inputResultsStr = "{\"_shards\":{\"total\":1, \"failed\":0, \"successful\":1, \"skipped\":0}, \"hits\":{\"hits\":" +
"[{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0," +
" \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":100000, \"http_1xx\":1}, \"_id\":1, \"_score\":1.0}, " +
"{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":102, \"http_4xx\":0, " +
"\"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":160000, \"http_1xx\":1}, \"_id\":2, \"_score\":1.0}, " +
"{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, " +
"\"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":220000, \"http_1xx\":1}, \"_id\":4, \"_score\":1.0}, " +
"{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":201, \"http_4xx\":0, " +
"\"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":1, \"timestamp\":280000, \"http_1xx\":0}, \"_id\":5, \"_score\":1.0}]," +
" \"total\":{\"value\":4, \"relation\":\"eq\"}, \"max_score\":1.0}, \"took\":15, \"timed_out\":false, \"aggregations\":" +
"{\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\", \"bucket_indices\":[0, 1, 2]}, \"status_code\":{\"buckets\":" +
"[{\"doc_count\":2, \"key\":{\"status_code\":100}}, {\"doc_count\":1, \"key\":{\"status_code\":102}}, {\"doc_count\":1," +
" \"key\":{\"status_code\":201}}], \"after_key\":{\"status_code\":201}}}}"

val parser = XContentType.JSON.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr
)

val inputResults = parser.map()

Expand Down
Loading