From 2b352cc41f5da7c6a466ed1f12469d7b5300bb22 Mon Sep 17 00:00:00 2001 From: ranim-n Date: Wed, 15 May 2024 12:53:52 +0200 Subject: [PATCH] first draft for the queries with lastN parameter --- search-service/config/detekt/baseline.xml | 4 +++ .../service/AttributeInstanceService.kt | 31 +++++++++++++------ .../service/AttributeInstanceServiceTests.kt | 24 ++++++++++++++ 3 files changed, 49 insertions(+), 10 deletions(-) diff --git a/search-service/config/detekt/baseline.xml b/search-service/config/detekt/baseline.xml index 751fa1b031..a7d8e3aec1 100644 --- a/search-service/config/detekt/baseline.xml +++ b/search-service/config/detekt/baseline.xml @@ -6,8 +6,10 @@ ClassNaming:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration : BaseJavaMigration ComplexCondition:EntitiesQueryUtils.kt$geoQuery == null && q.isNullOrEmpty() && typeSelection.isNullOrEmpty() && attrs.isEmpty() ComplexCondition:EntityPayloadService.kt$EntityPayloadService$it && !inverse || !it && inverse + CyclomaticComplexMethod:AttributeInstanceService.kt$AttributeInstanceService$suspend fun search( temporalEntitiesQuery: TemporalEntitiesQuery, temporalEntityAttributes: List<TemporalEntityAttribute>, origin: ZonedDateTime? = null ): Either<APIException, List<AttributeInstanceResult>> Filename:V0_29__JsonLd_migration.kt$db.migration.V0_29__JsonLd_migration.kt LongMethod:AttributeInstanceService.kt$AttributeInstanceService$@Transactional suspend fun create(attributeInstance: AttributeInstance): Either<APIException, Unit> + LongMethod:AttributeInstanceService.kt$AttributeInstanceService$suspend fun search( temporalEntitiesQuery: TemporalEntitiesQuery, temporalEntityAttributes: List<TemporalEntityAttribute>, origin: ZonedDateTime? = null ): Either<APIException, List<AttributeInstanceResult>> LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono<String> ): ResponseEntity<*> LongMethod:EntityOperationHandlerTests.kt$EntityOperationHandlerTests$@Test fun `create batch entity should return a 207 when some entities already exist`() LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream<Arguments> @@ -26,6 +28,8 @@ LongParameterList:TemporalEntityAttributeService.kt$TemporalEntityAttributeService$( tea: TemporalEntityAttribute, attributeName: ExpandedTerm, attributeMetadata: AttributeMetadata, mergedAt: ZonedDateTime, observedAt: ZonedDateTime?, attributePayload: ExpandedAttributeInstance, sub: Sub? ) LongParameterList:TemporalEntityAttributeService.kt$TemporalEntityAttributeService$( temporalEntityAttribute: TemporalEntityAttribute, ngsiLdAttribute: NgsiLdAttribute, attributeMetadata: AttributeMetadata, createdAt: ZonedDateTime, attributePayload: ExpandedAttributeInstance, sub: Sub? ) LongParameterList:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$( entityId: URI, attributeName: ExpandedTerm, datasetId: URI?, attributePayload: ExpandedAttributeInstance, ngsiLdAttributeInstance: NgsiLdAttributeInstance, defaultCreatedAt: ZonedDateTime ) + MaxLineLength:AttributeInstanceServiceTests.kt$AttributeInstanceServiceTests$fun + MaximumLineLength:AttributeInstanceServiceTests.kt$AttributeInstanceServiceTests$ NestedBlockDepth:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$override fun migrate(context: Context) ReturnCount:EntitiesQueryUtils.kt$fun buildTemporalQuery( params: MultiValueMap<String, String>, inQueryEntities: Boolean = false, withAggregatedValues: Boolean = false ): Either<APIException, TemporalQuery> SwallowedException:EntitiesQueryUtils.kt$e: IllegalArgumentException diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/service/AttributeInstanceService.kt b/search-service/src/main/kotlin/com/egm/stellio/search/service/AttributeInstanceService.kt index eb6f6e0a30..da647324fa 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/service/AttributeInstanceService.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/service/AttributeInstanceService.kt @@ -126,9 +126,11 @@ class AttributeInstanceService( sqlQueryBuilder.append(composeSearchSelectStatement(temporalQuery, temporalEntityAttributes, origin)) - if (!temporalEntitiesQuery.withTemporalValues && !temporalEntitiesQuery.withAggregatedValues) + if (!temporalEntitiesQuery.withTemporalValues && !temporalEntitiesQuery.withAggregatedValues && + temporalQuery.aggrMethods == null + ) { sqlQueryBuilder.append(", payload") - + } if (temporalQuery.timeproperty == AttributeInstance.TemporalProperty.OBSERVED_AT) sqlQueryBuilder.append( """ @@ -155,12 +157,19 @@ class AttributeInstanceService( } if (temporalEntitiesQuery.isAggregatedWithDefinedDuration()) - sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute, origin") + sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute, time") else if (temporalEntitiesQuery.withAggregatedValues) sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute") - else if (temporalQuery.lastN != null) + if (temporalQuery.lastN != null && !temporalEntitiesQuery.isAggregatedWithDefinedDuration() && + !temporalEntitiesQuery.withAggregatedValues + ) { // in order to get last instances, need to order by time desc // final ascending ordering of instances is done in query service + sqlQueryBuilder.append( + " GROUP BY temporal_entity_attribute, time, measured_value, payload " + + "ORDER BY time DESC LIMIT ${temporalQuery.lastN}" + ) + } else if (temporalQuery.lastN != null) sqlQueryBuilder.append(" ORDER BY time DESC LIMIT ${temporalQuery.lastN}") val finalTemporalQuery = composeFinalTemporalQuery(temporalEntityAttributes, sqlQueryBuilder.toString()) @@ -211,11 +220,11 @@ class AttributeInstanceService( val computedOrigin = origin ?: temporalQuery.timeAt """ SELECT temporal_entity_attribute, - public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as origin, + public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as time, $allAggregates """.trimIndent() } else - "SELECT temporal_entity_attribute, min(time) as origin, max(time) as endTime, $allAggregates " + "SELECT temporal_entity_attribute, min(time) as time, max(time) as endTime, $allAggregates " } else -> { val valueColumn = when (temporalEntityAttributes[0].attributeValueType) { @@ -269,12 +278,14 @@ class AttributeInstanceService( row: Map, temporalEntitiesQuery: TemporalEntitiesQuery ): AttributeInstanceResult = - if (temporalEntitiesQuery.withAggregatedValues) { - val startDateTime = toZonedDateTime(row["origin"]) + if (temporalEntitiesQuery.withAggregatedValues || temporalEntitiesQuery.temporalQuery.aggrMethods != null) { + val startDateTime = toZonedDateTime(row["time"]) val endDateTime = - if (!temporalEntitiesQuery.isAggregatedWithDefinedDuration()) + if (!temporalEntitiesQuery.isAggregatedWithDefinedDuration() && + temporalEntitiesQuery.temporalQuery.aggrPeriodDuration == null + ) { toZonedDateTime(row["endTime"]) - else + } else startDateTime.plus(temporalEntitiesQuery.computeAggrPeriodDuration()) // in a row, there is the result for each requested aggregation method val values = temporalEntitiesQuery.temporalQuery.aggrMethods!!.map { diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/service/AttributeInstanceServiceTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/service/AttributeInstanceServiceTests.kt index 62efd15b32..406ad17c9e 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/service/AttributeInstanceServiceTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/service/AttributeInstanceServiceTests.kt @@ -365,6 +365,30 @@ class AttributeInstanceServiceTests : WithTimescaleContainer, WithKafkaContainer } } + @Test + fun `it should only return the last n instances with aggregated values and aggregation period in the query`() = runTest { + (1..10).forEach { _ -> + val attributeInstance = gimmeNumericPropertyAttributeInstance(incomingTemporalEntityAttribute.id) + .copy(measuredValue = 1.0) + attributeInstanceService.create(attributeInstance) + } + + val temporalEntitiesQuery = gimmeTemporalEntitiesQuery( + TemporalQuery( + timerel = TemporalQuery.Timerel.BEFORE, + timeAt = now.plusHours(1), + aggrPeriodDuration = "PT1S", + aggrMethods = listOf(TemporalQuery.Aggregate.SUM), + lastN = 5 + ) + ) + attributeInstanceService.search(temporalEntitiesQuery, incomingTemporalEntityAttribute) + .shouldSucceedWith { + assertThat(it) + .hasSize(5) + } + } + @Test fun `it should only retrieve the temporal evolution of the provided temporal entity attribute`() = runTest { val temporalEntityAttribute2 = TemporalEntityAttribute(