Skip to content

Commit

Permalink
fix: handle empty aggregation period durations (PTOS) (#1006)
Browse files Browse the repository at this point in the history
  • Loading branch information
bobeal authored Sep 20, 2023
1 parent 3e9fbb2 commit aa14f46
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ data class TemporalEntitiesQuery(
val withTemporalValues: Boolean,
val withAudit: Boolean,
val withAggregatedValues: Boolean
)
) {
fun isAggregatedWithDefinedDuration(): Boolean =
withAggregatedValues &&
(temporalQuery.aggrPeriodDuration != null && temporalQuery.aggrPeriodDuration != "PT0S")
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.egm.stellio.search.model

import java.time.ZonedDateTime

const val WHOLE_TIME_RANGE_DURATION = "PT0S"

data class TemporalQuery(
val timerel: Timerel? = null,
val timeAt: ZonedDateTime? = null,
Expand Down Expand Up @@ -29,10 +31,10 @@ data class TemporalQuery(

companion object {
fun isSupportedAggregate(method: String): Boolean =
values().any { it.method == method }
entries.any { it.method == method }

fun forMethod(method: String): Aggregate? =
values().find { it.method == method }
entries.find { it.method == method }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ class ScopeService(
null -> Unit
}

if (temporalEntitiesQuery.withAggregatedValues)
if (temporalEntitiesQuery.isAggregatedWithDefinedDuration())
sqlQueryBuilder.append(" GROUP BY entity_id, origin")
else if (temporalEntitiesQuery.withAggregatedValues)
sqlQueryBuilder.append(" GROUP BY entity_id")
else if (temporalQuery.lastN != null)
// in order to get last instances, need to order by time desc
// final ascending ordering of instances is done in query service
Expand All @@ -137,15 +139,19 @@ class ScopeService(
): String = when {
temporalEntitiesQuery.withAggregatedValues -> {
val temporalQuery = temporalEntitiesQuery.temporalQuery
val allAggregates = temporalQuery.aggrMethods?.joinToString(",") {
val sqlAggregateExpression = aggrMethodToSqlAggregate(it, AttributeValueType.ARRAY)
"$sqlAggregateExpression as ${it.method}_value"
}
"""
SELECT entity_id,
time_bucket('${temporalQuery.aggrPeriodDuration}', time, TIMESTAMPTZ '${origin!!}') as origin,
$allAggregates
"""
val aggrPeriodDuration = temporalQuery.aggrPeriodDuration
val allAggregates = temporalQuery.aggrMethods?.composeAggregationSelectClause(AttributeValueType.ARRAY)
// if retrieving a temporal entity, origin is calculated beforehand as timeAt is optional in this case
// if querying temporal entities, timeAt is mandatory and will be used if origin is null
if (aggrPeriodDuration != WHOLE_TIME_RANGE_DURATION) {
val computedOrigin = origin ?: temporalQuery.timeAt
"""
SELECT entity_id,
time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as origin,
$allAggregates
"""
} else
"SELECT entity_id, min(time) as origin, max(time) as endTime, $allAggregates "
}
temporalEntitiesQuery.temporalQuery.timeproperty == TemporalProperty.OBSERVED_AT -> {
"""
Expand Down Expand Up @@ -188,7 +194,10 @@ class ScopeService(
if (temporalEntitiesQuery.withAggregatedValues) {
val startDateTime = toZonedDateTime(row["origin"])
val endDateTime =
startDateTime.plus(Duration.parse(temporalEntitiesQuery.temporalQuery.aggrPeriodDuration!!))
if (!temporalEntitiesQuery.isAggregatedWithDefinedDuration())
toZonedDateTime(row["endTime"])
else
startDateTime.plus(Duration.parse(temporalEntitiesQuery.temporalQuery.aggrPeriodDuration!!))
// in a row, there is the result for each requested aggregation method
val values = temporalEntitiesQuery.temporalQuery.aggrMethods!!.map {
val value = row["${it.method}_value"] ?: ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,10 @@ class AttributeInstanceService(
null -> Unit
}

if (temporalEntitiesQuery.withAggregatedValues)
if (temporalEntitiesQuery.isAggregatedWithDefinedDuration())
sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute, origin")
else if (temporalEntitiesQuery.withAggregatedValues)
sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute")
else if (temporalQuery.lastN != null)
// in order to get last instances, need to order by time desc
// final ascending ordering of instances is done in query service
Expand Down Expand Up @@ -200,19 +202,20 @@ class AttributeInstanceService(
origin: ZonedDateTime?
) = when {
temporalQuery.aggrPeriodDuration != null -> {
val allAggregates = temporalQuery.aggrMethods?.joinToString(",") {
val sqlAggregateExpression =
aggrMethodToSqlAggregate(it, temporalEntityAttributes[0].attributeValueType)
"$sqlAggregateExpression as ${it.method}_value"
}
// if retrieving a temporal entity, origin is calculated before
// if querying temporal entities, timeAt is mandatory
val calculatedOrigin = origin ?: temporalQuery.timeAt
"""
SELECT temporal_entity_attribute,
time_bucket('${temporalQuery.aggrPeriodDuration}', time, TIMESTAMPTZ '${calculatedOrigin!!}') as origin,
$allAggregates
""".trimIndent()
val aggrPeriodDuration = temporalQuery.aggrPeriodDuration
val allAggregates = temporalQuery.aggrMethods
?.composeAggregationSelectClause(temporalEntityAttributes[0].attributeValueType)
// if retrieving a temporal entity, origin is calculated beforehand as timeAt is optional in this case
// if querying temporal entities, timeAt is mandatory and will be used if origin is null
if (aggrPeriodDuration != WHOLE_TIME_RANGE_DURATION) {
val computedOrigin = origin ?: temporalQuery.timeAt
"""
SELECT temporal_entity_attribute,
time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as origin,
$allAggregates
""".trimIndent()
} else
"SELECT temporal_entity_attribute, min(time) as origin, max(time) as endTime, $allAggregates "
}
else -> {
val valueColumn = when (temporalEntityAttributes[0].attributeValueType) {
Expand Down Expand Up @@ -265,11 +268,14 @@ class AttributeInstanceService(
private fun rowToAttributeInstanceResult(
row: Map<String, Any>,
temporalEntitiesQuery: TemporalEntitiesQuery
): AttributeInstanceResult {
return if (temporalEntitiesQuery.withAggregatedValues) {
): AttributeInstanceResult =
if (temporalEntitiesQuery.withAggregatedValues) {
val startDateTime = toZonedDateTime(row["origin"])
val endDateTime =
startDateTime.plus(Duration.parse(temporalEntitiesQuery.temporalQuery.aggrPeriodDuration!!))
if (!temporalEntitiesQuery.isAggregatedWithDefinedDuration())
toZonedDateTime(row["endTime"])
else
startDateTime.plus(Duration.parse(temporalEntitiesQuery.temporalQuery.aggrPeriodDuration))
// in a row, there is the result for each requested aggregation method
val values = temporalEntitiesQuery.temporalQuery.aggrMethods!!.map {
val value = row["${it.method}_value"] ?: ""
Expand All @@ -294,7 +300,6 @@ class AttributeInstanceService(
timeproperty = temporalEntitiesQuery.temporalQuery.timeproperty.propertyName,
sub = row["sub"] as? String
)
}

@Transactional
suspend fun modifyAttributeInstance(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package com.egm.stellio.search.util

import com.egm.stellio.search.model.TemporalEntityAttribute
import com.egm.stellio.search.model.TemporalEntityAttribute.AttributeValueType
import com.egm.stellio.search.model.TemporalQuery

fun aggrMethodToSqlAggregate(
aggregate: TemporalQuery.Aggregate,
attributeValueType: TemporalEntityAttribute.AttributeValueType
attributeValueType: AttributeValueType
): String = when (attributeValueType) {
TemporalEntityAttribute.AttributeValueType.STRING -> sqlAggregationForJsonString(aggregate)
TemporalEntityAttribute.AttributeValueType.NUMBER -> sqlAggregateForJsonNumber(aggregate)
TemporalEntityAttribute.AttributeValueType.OBJECT -> sqlAggregateForJsonObject(aggregate)
TemporalEntityAttribute.AttributeValueType.ARRAY -> sqlAggregateForJsonArray(aggregate)
TemporalEntityAttribute.AttributeValueType.BOOLEAN -> sqlAggregateForJsonBoolean(aggregate)
TemporalEntityAttribute.AttributeValueType.DATETIME -> sqlAggregateForDateTime(aggregate)
TemporalEntityAttribute.AttributeValueType.DATE -> sqlAggregateForDate(aggregate)
TemporalEntityAttribute.AttributeValueType.TIME -> sqlAggregateForTime(aggregate)
TemporalEntityAttribute.AttributeValueType.URI -> sqlAggregateForURI(aggregate)
TemporalEntityAttribute.AttributeValueType.GEOMETRY -> "null"
AttributeValueType.STRING -> sqlAggregationForJsonString(aggregate)
AttributeValueType.NUMBER -> sqlAggregateForJsonNumber(aggregate)
AttributeValueType.OBJECT -> sqlAggregateForJsonObject(aggregate)
AttributeValueType.ARRAY -> sqlAggregateForJsonArray(aggregate)
AttributeValueType.BOOLEAN -> sqlAggregateForJsonBoolean(aggregate)
AttributeValueType.DATETIME -> sqlAggregateForDateTime(aggregate)
AttributeValueType.DATE -> sqlAggregateForDate(aggregate)
AttributeValueType.TIME -> sqlAggregateForTime(aggregate)
AttributeValueType.URI -> sqlAggregateForURI(aggregate)
AttributeValueType.GEOMETRY -> "null"
}

fun sqlAggregationForJsonString(aggregate: TemporalQuery.Aggregate): String = when (aggregate) {
Expand Down Expand Up @@ -95,3 +95,9 @@ fun sqlAggregateForURI(aggregate: TemporalQuery.Aggregate): String = when (aggre
TemporalQuery.Aggregate.DISTINCT_COUNT -> "count(distinct(value))"
else -> "null"
}

fun List<TemporalQuery.Aggregate>?.composeAggregationSelectClause(attributeValueType: AttributeValueType): String? =
this?.joinToString(",") {
val sqlAggregateExpression = aggrMethodToSqlAggregate(it, attributeValueType)
"$sqlAggregateExpression as ${it.method}_value"
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,67 @@ class ScopeServiceTests : WithTimescaleContainer, WithKafkaContainer {
}
}

@Test
fun `it should retrieve the history of scopes with aggregated values on whole time range`() = runTest {
createScopeHistory()

val scopeHistoryEntries = scopeService.retrieveHistory(
listOf(beehiveTestCId),
TemporalEntitiesQuery(
QueryParams(limit = 100, offset = 0, context = APIC_COMPOUND_CONTEXT),
TemporalQuery(
timeproperty = TemporalProperty.MODIFIED_AT,
timerel = TemporalQuery.Timerel.BEFORE,
timeAt = ngsiLdDateTime(),
aggrMethods = listOf(TemporalQuery.Aggregate.SUM),
aggrPeriodDuration = "PT0S"
),
withTemporalValues = false,
withAudit = false,
withAggregatedValues = true
),
ngsiLdDateTime().minusHours(1)
).shouldSucceedAndResult()

assertEquals(1, scopeHistoryEntries.size)
assertThat(scopeHistoryEntries).allMatch {
it as AggregatedScopeInstanceResult
it.values.size == 1 &&
it.values[0].aggregate == TemporalQuery.Aggregate.SUM &&
it.values[0].value as Long == 2L
}
}

@Test
fun `it should retrieve the history of scopes with aggregated values on whole time range without interval`() =
runTest {
createScopeHistory()

val scopeHistoryEntries = scopeService.retrieveHistory(
listOf(beehiveTestCId),
TemporalEntitiesQuery(
QueryParams(limit = 100, offset = 0, context = APIC_COMPOUND_CONTEXT),
TemporalQuery(
timeproperty = TemporalProperty.MODIFIED_AT,
aggrMethods = listOf(TemporalQuery.Aggregate.SUM),
aggrPeriodDuration = "PT0S"
),
withTemporalValues = false,
withAudit = false,
withAggregatedValues = true
),
ngsiLdDateTime().minusHours(1)
).shouldSucceedAndResult()

assertEquals(1, scopeHistoryEntries.size)
assertThat(scopeHistoryEntries).allMatch {
it as AggregatedScopeInstanceResult
it.values.size == 1 &&
it.values[0].aggregate == TemporalQuery.Aggregate.SUM &&
it.values[0].value as Long == 2L
}
}

@Test
fun `it should delete scope and its history`() = runTest {
loadSampleData("beehive_with_scope.jsonld")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,30 @@ class AggregatedQueryServiceTests : WithTimescaleContainer, WithKafkaContainer {
}
}

@Test
fun `ìt should aggregate on the whole time range if no aggrPeriodDuration is given`() = runTest {
val temporalEntityAttribute = createTemporalEntityAttribute(TemporalEntityAttribute.AttributeValueType.NUMBER)
(1..10).forEach { i ->
val attributeInstance = gimmeAttributeInstance(teaUuid)
.copy(measuredValue = i.toDouble())
attributeInstanceService.create(attributeInstance)
}

val temporalEntitiesQuery = createTemporalEntitiesQuery("avg")
attributeInstanceService.search(
temporalEntitiesQuery.copy(
temporalQuery = temporalEntitiesQuery.temporalQuery.copy(aggrPeriodDuration = "PT0S")
),
temporalEntityAttribute,
now
).shouldSucceedWith { results ->
assertAggregatedResult(results, "avg")
.matches({
it.toString() == "5.5"
}, "expected value is 5.5")
}
}

@Test
fun `it should handle aggregates for an attribute having different types of values in history`() = runTest {
val temporalEntityAttribute = createTemporalEntityAttribute(TemporalEntityAttribute.AttributeValueType.ARRAY)
Expand Down

0 comments on commit aa14f46

Please sign in to comment.