-
Notifications
You must be signed in to change notification settings - Fork 104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GetFindingsAction new optional parameters #563
Changes from 2 commits
d7fc542
af07d62
8684ff1
6f9e0c1
7d79dfe
8d1cadf
1e54b0c
54a6427
fd3f7ae
1975242
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ package org.opensearch.alerting.transport | |
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.launch | ||
import kotlinx.coroutines.withContext | ||
import org.apache.logging.log4j.LogManager | ||
import org.apache.lucene.search.join.ScoreMode | ||
import org.opensearch.action.ActionListener | ||
|
@@ -20,6 +21,9 @@ import org.opensearch.action.support.HandledTransportAction | |
import org.opensearch.alerting.action.GetFindingsAction | ||
import org.opensearch.alerting.action.GetFindingsRequest | ||
import org.opensearch.alerting.action.GetFindingsResponse | ||
import org.opensearch.alerting.action.GetMonitorAction | ||
import org.opensearch.alerting.action.GetMonitorRequest | ||
import org.opensearch.alerting.action.GetMonitorResponse | ||
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN | ||
import org.opensearch.alerting.model.Finding | ||
import org.opensearch.alerting.model.FindingDocument | ||
|
@@ -40,6 +44,7 @@ import org.opensearch.common.xcontent.XContentParserUtils | |
import org.opensearch.common.xcontent.XContentType | ||
import org.opensearch.index.query.Operator | ||
import org.opensearch.index.query.QueryBuilders | ||
import org.opensearch.rest.RestRequest | ||
import org.opensearch.search.builder.SearchSourceBuilder | ||
import org.opensearch.search.fetch.subphase.FetchSourceContext | ||
import org.opensearch.search.sort.SortBuilders | ||
|
@@ -122,7 +127,8 @@ class TransportGetFindingsSearchAction @Inject constructor( | |
client.threadPool().threadContext.stashContext().use { | ||
scope.launch { | ||
try { | ||
val getFindingsResponse = search(searchSourceBuilder) | ||
val indexName = resolveFindingsIndexName(getFindingsRequest) | ||
val getFindingsResponse = search(searchSourceBuilder, indexName) | ||
actionListener.onResponse(getFindingsResponse) | ||
} catch (t: Exception) { | ||
actionListener.onFailure(AlertingException.wrap(t)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if get monitor fails we would already receive the alerting exception would we need to do something like
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added another catch for AlertingException |
||
|
@@ -131,10 +137,34 @@ class TransportGetFindingsSearchAction @Inject constructor( | |
} | ||
} | ||
|
||
suspend fun search(searchSourceBuilder: SearchSourceBuilder): GetFindingsResponse { | ||
suspend fun resolveFindingsIndexName(findingsRequest: GetFindingsRequest): String { | ||
var indexName = "" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should start with the default one here, and override based on the availability of the other information being present. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
if (findingsRequest.findingIndexName.isNullOrEmpty() == false) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: let's call its There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
// findingIndexName has highest priority, so use that if available | ||
indexName = findingsRequest.findingIndexName | ||
} else if (findingsRequest.monitorId != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's do an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
// second best is monitorId. | ||
// We will use it to fetch monitor and then read indexName from dataSources field of monitor | ||
withContext(Dispatchers.IO) { | ||
val getMonitorRequest = GetMonitorRequest(findingsRequest.monitorId, -3L, RestRequest.Method.GET, null) | ||
val getMonitorResponse: GetMonitorResponse = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should catch exceptiton if GetmonitorAction fails and fail the get findings action overall? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be happening on line 134 right now. I'll add IT for this |
||
[email protected] { | ||
execute(GetMonitorAction.INSTANCE, getMonitorRequest, it) | ||
} | ||
Comment on lines
+158
to
+161
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do we handle the failure scenarios here? We would want to log and fallback to the default maybe? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exception is caught on line 134 and sent back to caller. Hmm falling back to default one might lead to spurious behavior. It would be better if user knows that GetMonitor Action failed and is retryable then to get back NOT_FOUND without knowing if the desired finding index was searched or not |
||
indexName = getMonitorResponse.monitor?.dataSources?.findingsIndex ?: ALL_FINDING_INDEX_PATTERN | ||
} | ||
} else { | ||
// default is ALL_FINDING_INDEX_PATTERN for bwc and when these 2 parameters are not passed | ||
indexName = ALL_FINDING_INDEX_PATTERN | ||
} | ||
return indexName | ||
} | ||
|
||
suspend fun search(searchSourceBuilder: SearchSourceBuilder, indexName: String): GetFindingsResponse { | ||
val searchRequest = SearchRequest() | ||
.source(searchSourceBuilder) | ||
.indices(ALL_FINDING_INDEX_PATTERN) | ||
.indices(indexName) | ||
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } | ||
val totalFindingCount = searchResponse.hits.totalHits?.value?.toInt() | ||
val mgetRequest = MultiGetRequest() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -165,6 +165,57 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { | |
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) | ||
} | ||
|
||
fun `test execute monitor with custom findings index and GetFindingsAction`() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not incorrect but instead could we add to existing test methods in this test class where custom findings are being verified There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") | ||
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) | ||
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) | ||
val customFindingsIndex = "custom_findings_index" | ||
var monitor = randomDocumentLevelMonitor( | ||
inputs = listOf(docLevelInput), | ||
triggers = listOf(trigger), | ||
dataSources = DataSources(findingsIndex = customFindingsIndex) | ||
) | ||
val monitorResponse = createMonitor(monitor) | ||
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) | ||
val testDoc = """{ | ||
"message" : "This is an error from IAD region", | ||
"test_strict_date_time" : "$testTime", | ||
"test_field" : "us-west-2" | ||
}""" | ||
assertFalse(monitorResponse?.id.isNullOrEmpty()) | ||
monitor = monitorResponse!!.monitor | ||
indexDoc(index, "1", testDoc) | ||
val monitorId = monitorResponse.id | ||
val executeMonitorResponse = executeMonitor(monitor, monitorId, false) | ||
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) | ||
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) | ||
searchAlerts(monitorId) | ||
val findings = searchFindings(monitorId, customFindingsIndex) | ||
assertEquals("Findings saved for test monitor", 1, findings.size) | ||
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) | ||
// fetch findings - pass monitorId as reference to finding_index | ||
val findingsFromAPI1 = getFindings(findings.get(0).id, monitorId, null) | ||
assertEquals( | ||
"Findings mismatch between manually searched and fetched via GetFindingsAction", | ||
findings.get(0).id, | ||
findingsFromAPI1.get(0).id | ||
) | ||
// fetch findings - pass both monitorId and findingIndexName name. Monitor id should be ignored | ||
val findingsFromAPI2 = getFindings(findings.get(0).id, "incorrect_monitor_id", customFindingsIndex) | ||
assertEquals( | ||
"Findings mismatch between manually searched and fetched via GetFindingsAction", | ||
findings.get(0).id, | ||
findingsFromAPI2.get(0).id | ||
) | ||
// fetch findings - don't send monitorId or findingIndexName. It should fall back to hardcoded finding index name | ||
val findingsFromAPI3 = getFindings(findings.get(0).id, null, null) | ||
assertEquals( | ||
"Unexpected result when fetching findings from default finding index", | ||
0, | ||
findingsFromAPI3.size | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should break these into individual findings scenarios - also to include the failure mode tests such as fallback if the name is present while the index is not? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done and added more tests |
||
) | ||
} | ||
|
||
fun `test execute pre-existing monitorand update`() { | ||
val request = CreateIndexRequest(SCHEDULED_JOBS_INDEX).mapping(ScheduledJobIndices.scheduledJobMappings()) | ||
.settings(Settings.builder().put("index.hidden", true).build()) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,9 +37,19 @@ class TriggerServiceTests : OpenSearchTestCase() { | |
val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder) | ||
val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) | ||
|
||
val inputResultsStr = "{\"_shards\":{\"total\":1,\"failed\":0,\"successful\":1,\"skipped\":0},\"hits\":{\"hits\":[{\"_index\":\"sample-http-responses\",\"_type\":\"http\",\"_source\":{\"status_code\":100,\"http_4xx\":0,\"http_3xx\":0,\"http_5xx\":0,\"http_2xx\":0,\"timestamp\":100000,\"http_1xx\":1},\"_id\":1,\"_score\":1}],\"total\":{\"value\":4,\"relation\":\"eq\"},\"max_score\":1},\"took\":37,\"timed_out\":false,\"aggregations\":{\"status_code\":{\"doc_count_error_upper_bound\":0,\"sum_other_doc_count\":0,\"buckets\":[{\"doc_count\":2,\"key\":100},{\"doc_count\":1,\"key\":102},{\"doc_count\":1,\"key\":201}]},\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\",\"bucket_indices\":[0,1,2]}}}" | ||
|
||
val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr) | ||
val inputResultsStr = "{\"_shards\":" + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thank you for doing this :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. np! |
||
"{\"total\":1,\"failed\":0,\"successful\":1,\"skipped\":0},\"hits\":{\"hits\":" + | ||
"[{\"_index\":\"sample-http-responses\",\"_type\":\"http\",\"_source\":" + | ||
"{\"status_code\":100,\"http_4xx\":0,\"http_3xx\":0,\"http_5xx\":0,\"http_2xx\":0," + | ||
"\"timestamp\":100000,\"http_1xx\":1},\"_id\":1,\"_score\":1}],\"total\":{\"value\":4,\"relation\":\"eq\"}," + | ||
"\"max_score\":1},\"took\":37,\"timed_out\":false,\"aggregations\":{\"status_code\":" + | ||
"{\"doc_count_error_upper_bound\":0,\"sum_other_doc_count\":0,\"buckets\":[{\"doc_count\":2,\"key\":100}," + | ||
"{\"doc_count\":1,\"key\":102},{\"doc_count\":1,\"key\":201}]},\"${trigger.id}\":{\"parent_bucket_path\":" + | ||
"\"status_code\",\"bucket_indices\":[0,1,2]}}}" | ||
|
||
val parser = XContentType.JSON.xContent().createParser( | ||
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr | ||
) | ||
|
||
val inputResults = parser.map() | ||
|
||
|
@@ -60,9 +70,23 @@ class TriggerServiceTests : OpenSearchTestCase() { | |
val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder) | ||
val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) | ||
|
||
val inputResultsStr = "{\"_shards\":{\"total\":1, \"failed\":0, \"successful\":1, \"skipped\":0}, \"hits\":{\"hits\":[{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":100000, \"http_1xx\":1}, \"_id\":1, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":102, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":160000, \"http_1xx\":1}, \"_id\":2, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":220000, \"http_1xx\":1}, \"_id\":4, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":201, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":1, \"timestamp\":280000, \"http_1xx\":0}, \"_id\":5, \"_score\":1.0}], \"total\":{\"value\":4, \"relation\":\"eq\"}, \"max_score\":1.0}, \"took\":15, \"timed_out\":false, \"aggregations\":{\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\", \"bucket_indices\":[0, 1, 2]}, \"status_code\":{\"buckets\":[{\"doc_count\":2, \"key\":{\"status_code\":100}}, {\"doc_count\":1, \"key\":{\"status_code\":102}}, {\"doc_count\":1, \"key\":{\"status_code\":201}}], \"after_key\":{\"status_code\":201}}}}" | ||
|
||
val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr) | ||
val inputResultsStr = "{\"_shards\":{\"total\":1, \"failed\":0, \"successful\":1, \"skipped\":0}, \"hits\":{\"hits\":" + | ||
"[{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0," + | ||
" \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":100000, \"http_1xx\":1}, \"_id\":1, \"_score\":1.0}, " + | ||
"{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":102, \"http_4xx\":0, " + | ||
"\"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":160000, \"http_1xx\":1}, \"_id\":2, \"_score\":1.0}, " + | ||
"{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, " + | ||
"\"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":220000, \"http_1xx\":1}, \"_id\":4, \"_score\":1.0}, " + | ||
"{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":201, \"http_4xx\":0, " + | ||
"\"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":1, \"timestamp\":280000, \"http_1xx\":0}, \"_id\":5, \"_score\":1.0}]," + | ||
" \"total\":{\"value\":4, \"relation\":\"eq\"}, \"max_score\":1.0}, \"took\":15, \"timed_out\":false, \"aggregations\":" + | ||
"{\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\", \"bucket_indices\":[0, 1, 2]}, \"status_code\":{\"buckets\":" + | ||
"[{\"doc_count\":2, \"key\":{\"status_code\":100}}, {\"doc_count\":1, \"key\":{\"status_code\":102}}, {\"doc_count\":1," + | ||
" \"key\":{\"status_code\":201}}], \"after_key\":{\"status_code\":201}}}}" | ||
|
||
val parser = XContentType.JSON.xContent().createParser( | ||
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr | ||
) | ||
|
||
val inputResults = parser.map() | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.alerting.action | ||
|
||
import org.opensearch.alerting.model.Table | ||
import org.opensearch.common.io.stream.BytesStreamOutput | ||
import org.opensearch.common.io.stream.StreamInput | ||
import org.opensearch.test.OpenSearchTestCase | ||
|
||
class GetFindingsRequestTests : OpenSearchTestCase() { | ||
|
||
fun `test get findings request`() { | ||
|
||
val table = Table("asc", "sortString", null, 1, 0, "") | ||
|
||
val req = GetFindingsRequest("2121", table, "1", "finding_index_name") | ||
assertNotNull(req) | ||
|
||
val out = BytesStreamOutput() | ||
req.writeTo(out) | ||
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) | ||
val newReq = GetFindingsRequest(sin) | ||
|
||
assertEquals("1", newReq.monitorId) | ||
assertEquals("2121", newReq.findingId) | ||
assertEquals("finding_index_name", newReq.findingIndexName) | ||
assertEquals(table, newReq.table) | ||
} | ||
|
||
fun `test validate returns null`() { | ||
val table = Table("asc", "sortString", null, 1, 0, "") | ||
|
||
val req = GetFindingsRequest("2121", table, "1", "active") | ||
assertNotNull(req) | ||
assertNull(req.validate()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Custom data sources are not exposed in rest layer. Let's limit the changes to transport layer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alerting Rest level action should not be impact with this change, as SAP will introduce its own rest layer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done