Skip to content

Commit

Permalink
fix for max & min aggregations when no metric property exist (opensea…
Browse files Browse the repository at this point in the history
…rch-project#870)

Signed-off-by: Subhobrata Dey <[email protected]>
Signed-off-by: Tanqiu Liu <[email protected]>
  • Loading branch information
sbcd90 authored and tanqiuliu committed Sep 25, 2023
1 parent 8918a79 commit 60fc6f2
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 3 deletions.
4 changes: 3 additions & 1 deletion detekt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ complexity:
LongMethod:
excludes: ['**/test/**']
LongParameterList:
excludes: ['**/test/**']
excludes: ['**/test/**']
NestedBlockDepth:
threshold: 5
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ class RollupIndexer(
it.aggregations.forEach {
when (it) {
is InternalSum -> aggResults[it.name] = it.value
is InternalMax -> aggResults[it.name] = it.value
is InternalMin -> aggResults[it.name] = it.value
// TODO: Need to redo the logic in corresponding doXContentBody of InternalMax and InternalMin
is InternalMax -> if (it.value.isInfinite()) aggResults[it.name] = null else aggResults[it.name] = it.value
is InternalMin -> if (it.value.isInfinite()) aggResults[it.name] = null else aggResults[it.name] = it.value
is InternalValueCount -> aggResults[it.name] = it.value
is InternalAvg -> aggResults[it.name] = it.value
else -> error("Found aggregation in composite result that is not supported [${it.type} - ${it.name}]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
insertSampleBulkData(index, javaClass.classLoader.getResource("data/nyc_5000.ndjson").readText())
}

protected fun generateMessageLogsData(index: String = "message-logs") {
createIndex(index, Settings.EMPTY, """"properties": {"message":{"properties":{"bytes_in":{"type":"long"},"bytes_out":{"type":"long"},"plugin":{"eager_global_ordinals":true,"ignore_above":10000,"type":"keyword"},"timestamp_received":{"type":"date"}}}}""")
insertSampleBulkData(index, javaClass.classLoader.getResource("data/message_logs.ndjson").readText())
}

@Suppress("UNCHECKED_CAST")
protected fun extractFailuresFromSearchResponse(searchResponse: Response): List<Map<String, String>?>? {
val shards = searchResponse.asMap()["_shards"] as Map<String, ArrayList<Map<String, Any>>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,78 @@ class RollupRunnerIT : RollupRestTestCase() {
}
}

@Suppress("UNCHECKED_CAST")
fun `test rollup with max metric when metric property not present`() {
val sourceIdxTestName = "source_idx_test_max"
val targetIdxTestName = "target_idx_test_max"
val propertyName = "message.bytes_in"
val maxMetricName = "min_message_bytes_in"

generateMessageLogsData(sourceIdxTestName)
val rollup = Rollup(
id = "rollup_test_max",
schemaVersion = 1L,
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic stats test",
sourceIndex = sourceIdxTestName,
targetIndex = targetIdxTestName,
metadataID = null,
roles = emptyList(),
pageSize = 100,
delay = 0,
continuous = false,
dimensions = listOf(
DateHistogram(sourceField = "message.timestamp_received", targetField = "message.timestamp_received", fixedInterval = "10m"),
Terms("message.plugin", "message.plugin")
),
metrics = listOf(
RollupMetrics(sourceField = propertyName, targetField = propertyName, metrics = listOf(Max()))
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollup)

waitFor { assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) }

waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)

// Term query
val req = """
{
"size": 0,
"query": {
"match_all": {}
},
"aggs": {
"$maxMetricName": {
"max": {
"field": "$propertyName"
}
}
}
}
""".trimIndent()
var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)
var rollupRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rollupRes.restStatus() == RestStatus.OK)
var rawAggRes = rawRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
var rollupAggRes = rollupRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
assertEquals(
"Source and rollup index did not return same max results",
rawAggRes.getValue(maxMetricName)["value"],
rollupAggRes.getValue(maxMetricName)["value"]
)
}
}

// TODO: Test scenarios:
// - Source index deleted after first execution
// * If this is with a source index pattern and the underlying indices are recreated but with different data
Expand Down
4 changes: 4 additions & 0 deletions src/test/resources/data/message_logs.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"create":{}}
{"message":{"bytes_out":4256,"plugin":"AlienVault NIDS","timestamp_received":1689786716020}}
{"create":{}}
{"message":{"bytes_out":4526,"plugin":"AlienVault NIDS","timestamp_received":1689886716020}}

0 comments on commit 60fc6f2

Please sign in to comment.