Skip to content

Commit

Permalink
fix: temporal entity queries with lastN parameter and aggregation per…
Browse files Browse the repository at this point in the history
…iod duration (#1160)

* first draft for the queries with lastN parameter

* feat: allow lastN on aggregation queries

* changes in the scope history

* removed repititive unit test

---------

Co-authored-by: Benoit Orihuela <[email protected]>
  • Loading branch information
ranim-n and bobeal authored May 21, 2024
1 parent c9a670d commit cd3d313
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 21 deletions.
1 change: 1 addition & 0 deletions search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<ID>Filename:V0_29__JsonLd_migration.kt$db.migration.V0_29__JsonLd_migration.kt</ID>
<ID>LongMethod:AttributeInstanceService.kt$AttributeInstanceService$@Transactional suspend fun create(attributeInstance: AttributeInstance): Either&lt;APIException, Unit&gt;</ID>
<ID>LongMethod:EnabledAuthorizationServiceTests.kt$EnabledAuthorizationServiceTests$@Test fun `it should return serialized access control entities with other rigths if user is owner`()</ID>
<ID>LongMethod:AttributeInstanceService.kt$AttributeInstanceService$suspend fun search( temporalEntitiesQuery: TemporalEntitiesQuery, temporalEntityAttributes: List&lt;TemporalEntityAttribute&gt;, origin: ZonedDateTime? = null ): Either&lt;APIException, List&lt;AttributeInstanceResult&gt;&gt;</ID>
<ID>LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono&lt;String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityOperationHandlerTests.kt$EntityOperationHandlerTests$@Test fun `create batch entity should return a 207 when some entities already exist`()</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream&lt;Arguments&gt;</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@ class ScopeService(
}

if (temporalEntitiesQuery.isAggregatedWithDefinedDuration())
sqlQueryBuilder.append(" GROUP BY entity_id, origin")
sqlQueryBuilder.append(" GROUP BY entity_id, start")
else if (temporalEntitiesQuery.withAggregatedValues)
sqlQueryBuilder.append(" GROUP BY entity_id")
else if (temporalQuery.lastN != null)
if (temporalQuery.lastN != null)
// in order to get last instances, need to order by time desc
// final ascending ordering of instances is done in query service
sqlQueryBuilder.append(" ORDER BY time DESC LIMIT ${temporalQuery.lastN}")
sqlQueryBuilder.append(" ORDER BY start DESC LIMIT ${temporalQuery.lastN}")

return databaseClient.sql(sqlQueryBuilder.toString())
.bind("entities_ids", entitiesIds)
Expand All @@ -143,20 +143,20 @@ class ScopeService(
val computedOrigin = origin ?: temporalQuery.timeAt
"""
SELECT entity_id,
public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as origin,
public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as start,
$allAggregates
"""
} else
"SELECT entity_id, min(time) as origin, max(time) as endTime, $allAggregates "
"SELECT entity_id, min(time) as start, max(time) as end, $allAggregates "
}
temporalEntitiesQuery.temporalQuery.timeproperty == TemporalProperty.OBSERVED_AT -> {
"""
SELECT entity_id, ARRAY(SELECT jsonb_array_elements_text(value)) as value, time
SELECT entity_id, ARRAY(SELECT jsonb_array_elements_text(value)) as value, time as start
"""
}
else -> {
"""
SELECT entity_id, ARRAY(SELECT jsonb_array_elements_text(value)) as value, time, sub
SELECT entity_id, ARRAY(SELECT jsonb_array_elements_text(value)) as value, time as start, sub
"""
}
}
Expand Down Expand Up @@ -188,10 +188,10 @@ class ScopeService(
temporalEntitiesQuery: TemporalEntitiesQuery
): ScopeInstanceResult =
if (temporalEntitiesQuery.withAggregatedValues) {
val startDateTime = toZonedDateTime(row["origin"])
val startDateTime = toZonedDateTime(row["start"])
val endDateTime =
if (!temporalEntitiesQuery.isAggregatedWithDefinedDuration())
toZonedDateTime(row["endTime"])
toZonedDateTime(row["end"])
else
startDateTime.plus(Duration.parse(temporalEntitiesQuery.temporalQuery.aggrPeriodDuration!!))
// in a row, there is the result for each requested aggregation method
Expand All @@ -207,13 +207,13 @@ class ScopeService(
SimplifiedScopeInstanceResult(
entityId = toUri(row["entity_id"]),
scopes = toList(row["value"]),
time = toZonedDateTime(row["time"])
time = toZonedDateTime(row["start"])
)
} else {
FullScopeInstanceResult(
entityId = toUri(row["entity_id"]),
scopes = toList(row["value"]),
time = toZonedDateTime(row["time"]),
time = toZonedDateTime(row["start"]),
timeproperty = temporalEntitiesQuery.temporalQuery.timeproperty.propertyName,
sub = row["sub"] as? String
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,14 @@ class AttributeInstanceService(
}

if (temporalEntitiesQuery.isAggregatedWithDefinedDuration())
sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute, origin")
sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute, start")
else if (temporalEntitiesQuery.withAggregatedValues)
sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute")
else if (temporalQuery.lastN != null)

if (temporalQuery.lastN != null)
// in order to get last instances, need to order by time desc
// final ascending ordering of instances is done in query service
sqlQueryBuilder.append(" ORDER BY time DESC LIMIT ${temporalQuery.lastN}")
sqlQueryBuilder.append(" ORDER BY start DESC LIMIT ${temporalQuery.lastN}")

val finalTemporalQuery = composeFinalTemporalQuery(temporalEntityAttributes, sqlQueryBuilder.toString())

Expand Down Expand Up @@ -211,11 +212,11 @@ class AttributeInstanceService(
val computedOrigin = origin ?: temporalQuery.timeAt
"""
SELECT temporal_entity_attribute,
public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as origin,
public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as start,
$allAggregates
""".trimIndent()
} else
"SELECT temporal_entity_attribute, min(time) as origin, max(time) as endTime, $allAggregates "
"SELECT temporal_entity_attribute, min(time) as start, max(time) as end, $allAggregates "
}
else -> {
val valueColumn = when (temporalEntityAttributes[0].attributeValueType) {
Expand All @@ -227,7 +228,7 @@ class AttributeInstanceService(
AttributeInstance.TemporalProperty.OBSERVED_AT -> null
else -> "sub"
}
"SELECT " + listOfNotNull("temporal_entity_attribute", "time", valueColumn, subColumn)
"SELECT " + listOfNotNull("temporal_entity_attribute", "time as start", valueColumn, subColumn)
.joinToString(",")
}
}
Expand Down Expand Up @@ -270,10 +271,10 @@ class AttributeInstanceService(
temporalEntitiesQuery: TemporalEntitiesQuery
): AttributeInstanceResult =
if (temporalEntitiesQuery.withAggregatedValues) {
val startDateTime = toZonedDateTime(row["origin"])
val startDateTime = toZonedDateTime(row["start"])
val endDateTime =
if (!temporalEntitiesQuery.isAggregatedWithDefinedDuration())
toZonedDateTime(row["endTime"])
toZonedDateTime(row["end"])
else
startDateTime.plus(temporalEntitiesQuery.computeAggrPeriodDuration())
// in a row, there is the result for each requested aggregation method
Expand All @@ -291,12 +292,12 @@ class AttributeInstanceService(
// the type of the value of a property may have changed in the history (e.g., from number to string)
// in this case, just display an empty value (something happened, but we can't display it)
value = row["value"] ?: "",
time = toZonedDateTime(row["time"])
time = toZonedDateTime(row["start"])
)
else FullAttributeInstanceResult(
temporalEntityAttribute = toUuid(row["temporal_entity_attribute"]),
payload = toJsonString(row["payload"]),
time = toZonedDateTime(row["time"]),
time = toZonedDateTime(row["start"]),
timeproperty = temporalEntitiesQuery.temporalQuery.timeproperty.propertyName,
sub = row["sub"] as? String
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,40 @@ class ScopeServiceTests : WithTimescaleContainer, WithKafkaContainer {
}
}

@Test
fun `it should retrieve the last n instances of history of scopes with aggregated values`() = runTest {
createScopeHistory()

val scopeHistoryEntries = scopeService.retrieveHistory(
listOf(beehiveTestCId),
TemporalEntitiesQuery(
EntitiesQuery(
paginationQuery = PaginationQuery(limit = 100, offset = 0),
contexts = APIC_COMPOUND_CONTEXTS
),
TemporalQuery(
timeproperty = TemporalProperty.MODIFIED_AT,
timerel = TemporalQuery.Timerel.BEFORE,
timeAt = ngsiLdDateTime(),
aggrMethods = listOf(TemporalQuery.Aggregate.SUM),
aggrPeriodDuration = "PT1S",
lastN = 1
),
withTemporalValues = false,
withAudit = false,
withAggregatedValues = true
),
ngsiLdDateTime().minusHours(1)
).shouldSucceedAndResult()

assertEquals(1, scopeHistoryEntries.size)
assertThat(scopeHistoryEntries).allMatch {
it as AggregatedScopeInstanceResult
it.values.size == 1 &&
it.values[0].aggregate == TemporalQuery.Aggregate.SUM
}
}

@Test
fun `it should delete scope and its history`() = runTest {
loadSampleData("beehive_with_scope.jsonld")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,36 @@ class AttributeInstanceServiceTests : WithTimescaleContainer, WithKafkaContainer
}
}

@Test
fun `it should only return the last n instances asked in an aggregated temporal query`() = runTest {
val now = ngsiLdDateTime()
(1..10).forEachIndexed { index, _ ->
val attributeInstance =
gimmeNumericPropertyAttributeInstance(incomingTemporalEntityAttribute.id)
.copy(
measuredValue = 1.0,
time = now.minusSeconds(index.toLong())
)
attributeInstanceService.create(attributeInstance)
}

val temporalEntitiesQuery = gimmeTemporalEntitiesQuery(
TemporalQuery(
timerel = TemporalQuery.Timerel.BEFORE,
timeAt = now,
aggrPeriodDuration = "PT1S",
aggrMethods = listOf(TemporalQuery.Aggregate.SUM),
lastN = 5
),
withAggregatedValues = true
)
attributeInstanceService.search(temporalEntitiesQuery, incomingTemporalEntityAttribute)
.shouldSucceedWith {
assertThat(it)
.hasSize(5)
}
}

@Test
fun `it should only retrieve the temporal evolution of the provided temporal entity attribute`() = runTest {
val temporalEntityAttribute2 = TemporalEntityAttribute(
Expand Down

0 comments on commit cd3d313

Please sign in to comment.