Skip to content

Commit

Permalink
shards assignment to local Node when fanout flag is disabled (#1749)
Browse files Browse the repository at this point in the history
* shards assignment to local Node when fanout flag is disabled

Signed-off-by: Riya Saxena <[email protected]>

* shards assignment to local Node when fanout flag is disabled

Signed-off-by: Riya Saxena <[email protected]>

* shards assignment to local Node when fanout flag is disabled

Signed-off-by: Riya Saxena <[email protected]>

* shards assignment to local Node when fanout flag is disabled

Signed-off-by: Riya Saxena <[email protected]>

* tests fix

Signed-off-by: Riya Saxena <[email protected]>

* tests fix

Signed-off-by: Riya Saxena <[email protected]>

---------

Signed-off-by: Riya Saxena <[email protected]>
  • Loading branch information
riysaxen-amzn authored and sbcd90 committed Dec 18, 2024
1 parent e8c356d commit dc7cb11
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
shards.remove("index")
shards.remove("shards_count")

val nodeMap = getNodes(monitorCtx)
/**
* if fanout flag is disabled and force assign all shards to local node
* thus effectively making the fan-out a single node operation.
* This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules
**/
val localNode = monitorCtx.clusterService!!.localNode()
val nodeMap: Map<String, DiscoveryNode> = if (docLevelMonitorInput?.fanoutEnabled == true) {
getNodes(monitorCtx)
} else {
logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}")
mapOf(localNode.id to localNode)
}

val nodeShardAssignments = distributeShards(
monitorCtx,
nodeMap.keys.toList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2791,6 +2791,68 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
}
}

fun `test document-level monitor fanout disabled approach when aliases contain indices with multiple shards`() {
val aliasName = "test-alias"
createIndexAlias(
aliasName,
"""
"properties" : {
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
"test_field" : { "type" : "keyword" },
"number" : { "type" : "keyword" }
}
""".trimIndent(),
"\"index.number_of_shards\": 7"
)

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery), false)

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
)

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"@timestamp": "$testTime",
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
indexDoc(aliasName, "1", testDoc)
indexDoc(aliasName, "2", testDoc)
indexDoc(aliasName, "4", testDoc)
indexDoc(aliasName, "5", testDoc)
indexDoc(aliasName, "6", testDoc)
indexDoc(aliasName, "7", testDoc)
OpenSearchTestCase.waitUntil(
{ searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 },
2,
TimeUnit.MINUTES
)

rolloverDatastream(aliasName)
indexDoc(aliasName, "11", testDoc)
indexDoc(aliasName, "12", testDoc)
indexDoc(aliasName, "14", testDoc)
indexDoc(aliasName, "15", testDoc)
indexDoc(aliasName, "16", testDoc)
indexDoc(aliasName, "17", testDoc)
OpenSearchTestCase.waitUntil(
{ searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 },
2,
TimeUnit.MINUTES
)

deleteDataStream(aliasName)
}

fun `test execute monitor generates alerts and findings with renewable locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void onFailure(Exception e) {
};
} else if (runMonitorParam.equals("multiple")) {
SampleRemoteMonitorInput2 input2 = new SampleRemoteMonitorInput2("hello",
new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of()))));
new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())), true));
BytesStreamOutput out1 = new BytesStreamOutput();
input2.writeTo(out1);
BytesReference input1Serialized1 = out1.bytes();
Expand Down Expand Up @@ -220,7 +220,7 @@ public void onFailure(Exception e) {
sampleRemoteDocLevelMonitorInput.writeTo(out2);
BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes();

DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList());
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList(), true);
RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput);

Monitor remoteDocLevelMonitor = new Monitor(
Expand Down

0 comments on commit dc7cb11

Please sign in to comment.