From cd3d3137423c21bd05e06e5db3d58a78e6a20ea4 Mon Sep 17 00:00:00 2001 From: Ranim Naimi <156652078+ranim-n@users.noreply.github.com> Date: Tue, 21 May 2024 12:00:48 +0200 Subject: [PATCH] fix: temporal entity queries with lastN parameter and aggregation period duration (#1160) * first draft for the queries with lastN parameter * feat: allow lastN on aggregation queries * changes in the scope history * removed repititive unit test --------- Co-authored-by: Benoit Orihuela <benoit.orihuela@egm.io> --- search-service/config/detekt/baseline.xml | 1 + .../egm/stellio/search/scope/ScopeService.kt | 22 ++++++------ .../service/AttributeInstanceService.kt | 21 ++++++------ .../stellio/search/scope/ScopeServiceTests.kt | 34 +++++++++++++++++++ .../service/AttributeInstanceServiceTests.kt | 30 ++++++++++++++++ 5 files changed, 87 insertions(+), 21 deletions(-) diff --git a/search-service/config/detekt/baseline.xml b/search-service/config/detekt/baseline.xml index b30a0a9fc..8cb86eb64 100644 --- a/search-service/config/detekt/baseline.xml +++ b/search-service/config/detekt/baseline.xml @@ -9,6 +9,7 @@ <ID>Filename:V0_29__JsonLd_migration.kt$db.migration.V0_29__JsonLd_migration.kt</ID> <ID>LongMethod:AttributeInstanceService.kt$AttributeInstanceService$@Transactional suspend fun create(attributeInstance: AttributeInstance): Either<APIException, Unit></ID> <ID>LongMethod:EnabledAuthorizationServiceTests.kt$EnabledAuthorizationServiceTests$@Test fun `it should return serialized access control entities with other rigths if user is owner`()</ID> + <ID>LongMethod:AttributeInstanceService.kt$AttributeInstanceService$suspend fun search( temporalEntitiesQuery: TemporalEntitiesQuery, temporalEntityAttributes: List<TemporalEntityAttribute>, origin: ZonedDateTime? = null ): Either<APIException, List<AttributeInstanceResult>></ID> <ID>LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono<String> ): ResponseEntity<*></ID> <ID>LongMethod:EntityOperationHandlerTests.kt$EntityOperationHandlerTests$@Test fun `create batch entity should return a 207 when some entities already exist`()</ID> <ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream<Arguments></ID> diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/scope/ScopeService.kt b/search-service/src/main/kotlin/com/egm/stellio/search/scope/ScopeService.kt index 0a05a9d9d..9e40af907 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/scope/ScopeService.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/scope/ScopeService.kt @@ -110,13 +110,13 @@ class ScopeService( } if (temporalEntitiesQuery.isAggregatedWithDefinedDuration()) - sqlQueryBuilder.append(" GROUP BY entity_id, origin") + sqlQueryBuilder.append(" GROUP BY entity_id, start") else if (temporalEntitiesQuery.withAggregatedValues) sqlQueryBuilder.append(" GROUP BY entity_id") - else if (temporalQuery.lastN != null) + if (temporalQuery.lastN != null) // in order to get last instances, need to order by time desc // final ascending ordering of instances is done in query service - sqlQueryBuilder.append(" ORDER BY time DESC LIMIT ${temporalQuery.lastN}") + sqlQueryBuilder.append(" ORDER BY start DESC LIMIT ${temporalQuery.lastN}") return databaseClient.sql(sqlQueryBuilder.toString()) .bind("entities_ids", entitiesIds) @@ -143,20 +143,20 @@ class ScopeService( val computedOrigin = origin ?: temporalQuery.timeAt """ SELECT entity_id, - public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as origin, + public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as start, $allAggregates """ } else - "SELECT entity_id, min(time) as origin, max(time) as endTime, $allAggregates " + "SELECT entity_id, min(time) as start, max(time) as end, $allAggregates " } temporalEntitiesQuery.temporalQuery.timeproperty == TemporalProperty.OBSERVED_AT -> { """ - SELECT entity_id, ARRAY(SELECT jsonb_array_elements_text(value)) as value, time + SELECT entity_id, ARRAY(SELECT jsonb_array_elements_text(value)) as value, time as start """ } else -> { """ - SELECT entity_id, ARRAY(SELECT jsonb_array_elements_text(value)) as value, time, sub + SELECT entity_id, ARRAY(SELECT jsonb_array_elements_text(value)) as value, time as start, sub """ } } @@ -188,10 +188,10 @@ class ScopeService( temporalEntitiesQuery: TemporalEntitiesQuery ): ScopeInstanceResult = if (temporalEntitiesQuery.withAggregatedValues) { - val startDateTime = toZonedDateTime(row["origin"]) + val startDateTime = toZonedDateTime(row["start"]) val endDateTime = if (!temporalEntitiesQuery.isAggregatedWithDefinedDuration()) - toZonedDateTime(row["endTime"]) + toZonedDateTime(row["end"]) else startDateTime.plus(Duration.parse(temporalEntitiesQuery.temporalQuery.aggrPeriodDuration!!)) // in a row, there is the result for each requested aggregation method @@ -207,13 +207,13 @@ class ScopeService( SimplifiedScopeInstanceResult( entityId = toUri(row["entity_id"]), scopes = toList(row["value"]), - time = toZonedDateTime(row["time"]) + time = toZonedDateTime(row["start"]) ) } else { FullScopeInstanceResult( entityId = toUri(row["entity_id"]), scopes = toList(row["value"]), - time = toZonedDateTime(row["time"]), + time = toZonedDateTime(row["start"]), timeproperty = temporalEntitiesQuery.temporalQuery.timeproperty.propertyName, sub = row["sub"] as? String ) diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/service/AttributeInstanceService.kt b/search-service/src/main/kotlin/com/egm/stellio/search/service/AttributeInstanceService.kt index eb6f6e0a3..235cd5c09 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/service/AttributeInstanceService.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/service/AttributeInstanceService.kt @@ -155,13 +155,14 @@ class AttributeInstanceService( } if (temporalEntitiesQuery.isAggregatedWithDefinedDuration()) - sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute, origin") + sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute, start") else if (temporalEntitiesQuery.withAggregatedValues) sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute") - else if (temporalQuery.lastN != null) + + if (temporalQuery.lastN != null) // in order to get last instances, need to order by time desc // final ascending ordering of instances is done in query service - sqlQueryBuilder.append(" ORDER BY time DESC LIMIT ${temporalQuery.lastN}") + sqlQueryBuilder.append(" ORDER BY start DESC LIMIT ${temporalQuery.lastN}") val finalTemporalQuery = composeFinalTemporalQuery(temporalEntityAttributes, sqlQueryBuilder.toString()) @@ -211,11 +212,11 @@ class AttributeInstanceService( val computedOrigin = origin ?: temporalQuery.timeAt """ SELECT temporal_entity_attribute, - public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as origin, + public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as start, $allAggregates """.trimIndent() } else - "SELECT temporal_entity_attribute, min(time) as origin, max(time) as endTime, $allAggregates " + "SELECT temporal_entity_attribute, min(time) as start, max(time) as end, $allAggregates " } else -> { val valueColumn = when (temporalEntityAttributes[0].attributeValueType) { @@ -227,7 +228,7 @@ class AttributeInstanceService( AttributeInstance.TemporalProperty.OBSERVED_AT -> null else -> "sub" } - "SELECT " + listOfNotNull("temporal_entity_attribute", "time", valueColumn, subColumn) + "SELECT " + listOfNotNull("temporal_entity_attribute", "time as start", valueColumn, subColumn) .joinToString(",") } } @@ -270,10 +271,10 @@ class AttributeInstanceService( temporalEntitiesQuery: TemporalEntitiesQuery ): AttributeInstanceResult = if (temporalEntitiesQuery.withAggregatedValues) { - val startDateTime = toZonedDateTime(row["origin"]) + val startDateTime = toZonedDateTime(row["start"]) val endDateTime = if (!temporalEntitiesQuery.isAggregatedWithDefinedDuration()) - toZonedDateTime(row["endTime"]) + toZonedDateTime(row["end"]) else startDateTime.plus(temporalEntitiesQuery.computeAggrPeriodDuration()) // in a row, there is the result for each requested aggregation method @@ -291,12 +292,12 @@ class AttributeInstanceService( // the type of the value of a property may have changed in the history (e.g., from number to string) // in this case, just display an empty value (something happened, but we can't display it) value = row["value"] ?: "", - time = toZonedDateTime(row["time"]) + time = toZonedDateTime(row["start"]) ) else FullAttributeInstanceResult( temporalEntityAttribute = toUuid(row["temporal_entity_attribute"]), payload = toJsonString(row["payload"]), - time = toZonedDateTime(row["time"]), + time = toZonedDateTime(row["start"]), timeproperty = temporalEntitiesQuery.temporalQuery.timeproperty.propertyName, sub = row["sub"] as? String ) diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/scope/ScopeServiceTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/scope/ScopeServiceTests.kt index 36372813d..5df8a4cc3 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/scope/ScopeServiceTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/scope/ScopeServiceTests.kt @@ -284,6 +284,40 @@ class ScopeServiceTests : WithTimescaleContainer, WithKafkaContainer { } } + @Test + fun `it should retrieve the last n instances of history of scopes with aggregated values`() = runTest { + createScopeHistory() + + val scopeHistoryEntries = scopeService.retrieveHistory( + listOf(beehiveTestCId), + TemporalEntitiesQuery( + EntitiesQuery( + paginationQuery = PaginationQuery(limit = 100, offset = 0), + contexts = APIC_COMPOUND_CONTEXTS + ), + TemporalQuery( + timeproperty = TemporalProperty.MODIFIED_AT, + timerel = TemporalQuery.Timerel.BEFORE, + timeAt = ngsiLdDateTime(), + aggrMethods = listOf(TemporalQuery.Aggregate.SUM), + aggrPeriodDuration = "PT1S", + lastN = 1 + ), + withTemporalValues = false, + withAudit = false, + withAggregatedValues = true + ), + ngsiLdDateTime().minusHours(1) + ).shouldSucceedAndResult() + + assertEquals(1, scopeHistoryEntries.size) + assertThat(scopeHistoryEntries).allMatch { + it as AggregatedScopeInstanceResult + it.values.size == 1 && + it.values[0].aggregate == TemporalQuery.Aggregate.SUM + } + } + @Test fun `it should delete scope and its history`() = runTest { loadSampleData("beehive_with_scope.jsonld") diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/service/AttributeInstanceServiceTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/service/AttributeInstanceServiceTests.kt index 62efd15b3..76b97031f 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/service/AttributeInstanceServiceTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/service/AttributeInstanceServiceTests.kt @@ -365,6 +365,36 @@ class AttributeInstanceServiceTests : WithTimescaleContainer, WithKafkaContainer } } + @Test + fun `it should only return the last n instances asked in an aggregated temporal query`() = runTest { + val now = ngsiLdDateTime() + (1..10).forEachIndexed { index, _ -> + val attributeInstance = + gimmeNumericPropertyAttributeInstance(incomingTemporalEntityAttribute.id) + .copy( + measuredValue = 1.0, + time = now.minusSeconds(index.toLong()) + ) + attributeInstanceService.create(attributeInstance) + } + + val temporalEntitiesQuery = gimmeTemporalEntitiesQuery( + TemporalQuery( + timerel = TemporalQuery.Timerel.BEFORE, + timeAt = now, + aggrPeriodDuration = "PT1S", + aggrMethods = listOf(TemporalQuery.Aggregate.SUM), + lastN = 5 + ), + withAggregatedValues = true + ) + attributeInstanceService.search(temporalEntitiesQuery, incomingTemporalEntityAttribute) + .shouldSucceedWith { + assertThat(it) + .hasSize(5) + } + } + @Test fun `it should only retrieve the temporal evolution of the provided temporal entity attribute`() = runTest { val temporalEntityAttribute2 = TemporalEntityAttribute(