From d4e1c8a31774908f90062cd7d780cfa4cecd7430 Mon Sep 17 00:00:00 2001 From: Thomas BOUSSELIN Date: Tue, 21 May 2024 18:13:22 +0200 Subject: [PATCH 1/3] feat: batch merge endpoint --- .../search/service/EntityOperationService.kt | 29 +++++++++- .../search/web/EntityOperationHandler.kt | 54 ++++++++++++++++++- .../service/EntityOperationServiceTests.kt | 27 ++++++++++ 3 files changed, 107 insertions(+), 3 deletions(-) diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt b/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt index 69d8245632..ae1f84073d 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt @@ -159,7 +159,19 @@ class EntityOperationService( ): BatchOperationResult = processEntities(entities, disallowOverwrite, sub, ::updateEntity) - private suspend fun processEntities( + /** + * Merge a batch of [entities] + * + * @return a [BatchOperationResult] with list of updated ids and list of errors. + */ + @Transactional + suspend fun merge( + entities: List, + sub: Sub? + ): BatchOperationResult = + processEntities(entities, false, sub, ::mergeEntity) + + internal suspend fun processEntities( entities: List, disallowOverwrite: Boolean = false, sub: Sub?, @@ -233,4 +245,19 @@ class EntityOperationService( sub ).bind() } + + @SuppressWarnings("UnusedParameter") + suspend fun mergeEntity( + entity: JsonLdNgsiLdEntity, + disallowOverwrite: Boolean, + sub: Sub? + ): Either { + val (jsonLdEntity, ngsiLdEntity) = entity + return entityPayloadService.mergeEntity( + ngsiLdEntity.id, + jsonLdEntity.getModifiableMembers(), + null, + sub + ) + } } diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt b/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt index 1ae40d6dad..8d5d1ae1bb 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt @@ -27,7 +27,7 @@ import org.springframework.util.LinkedMultiValueMap import org.springframework.util.MultiValueMap import org.springframework.web.bind.annotation.* import reactor.core.publisher.Mono -import java.util.Optional +import java.util.* @RestController @RequestMapping("/ngsi-ld/v1/entityOperations") @@ -171,11 +171,11 @@ class EntityOperationHandler( checkBatchRequestBody(body).bind() checkContentType(httpHeaders, body).bind() val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() - val disallowOverwrite = options.map { it == QUERY_PARAM_OPTIONS_NOOVERWRITE_VALUE }.orElse(false) val (parsedEntities, unparsableEntities) = expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() val (existingEntities, unknownEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities) + val disallowOverwrite = options.map { it == QUERY_PARAM_OPTIONS_NOOVERWRITE_VALUE }.orElse(false) val (existingEntitiesUnauthorized, existingEntitiesAuthorized) = existingEntities.partition { authorizationService.userCanUpdateEntity(it.entityId(), sub).isLeft() } @@ -205,6 +205,56 @@ class EntityOperationHandler( { it } ) + /** + * Implements (6.31.3.1) - Merge Batch of Entities + */ + @PostMapping("/merge", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) + suspend fun merge( + @RequestHeader httpHeaders: HttpHeaders, + @RequestBody requestBody: Mono, + ): ResponseEntity<*> = either { + val sub = getSubFromSecurityContext() + + val body = requestBody.awaitFirst().deserializeAsList() + .checkNamesAreNgsiLdSupported().bind() + .checkContentIsNgsiLdSupported().bind() + checkBatchRequestBody(body).bind() + checkContentType(httpHeaders, body).bind() + val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() + + val (parsedEntities, unparsableEntities) = + expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() + + val (existingEntities, unknownEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities) + + val (existingEntitiesUnauthorized, existingEntitiesAuthorized) = + existingEntities.partition { authorizationService.userCanUpdateEntity(it.entityId(), sub).isLeft() } + + val batchOperationResult = BatchOperationResult().apply { + addEntitiesToErrors(unparsableEntities) + addEntitiesToErrors(unknownEntities.extractNgsiLdEntities(), ENTITY_DOES_NOT_EXIST_MESSAGE) + addEntitiesToErrors(existingEntitiesUnauthorized.extractNgsiLdEntities(), ENTITY_UPDATE_FORBIDDEN_MESSAGE) + } + + if (existingEntitiesAuthorized.isNotEmpty()) { + val mergeOperationResult = + entityOperationService.merge(existingEntitiesAuthorized, sub.getOrNull()) + + publishUpdateEvents(sub.getOrNull(), mergeOperationResult, parsedEntities) + + batchOperationResult.errors.addAll(mergeOperationResult.errors) + batchOperationResult.success.addAll(mergeOperationResult.success) + } + + if (batchOperationResult.errors.isEmpty() && unknownEntities.isEmpty()) + ResponseEntity.status(HttpStatus.NO_CONTENT).build() + else + ResponseEntity.status(HttpStatus.MULTI_STATUS).body(batchOperationResult) + }.fold( + { it.toErrorResponse() }, + { it } + ) + /** * Implements 6.17.3.1 - Delete Batch of Entities */ diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityOperationServiceTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityOperationServiceTests.kt index 2c502c5d12..7dd0be3ae2 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityOperationServiceTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityOperationServiceTests.kt @@ -382,4 +382,31 @@ class EntityOperationServiceTests { coVerify { entityPayloadService.deleteEntity(firstEntityURI) } coVerify { entityPayloadService.deleteEntity(secondEntityURI) } } + + @Test + fun `batch merge should ask to merge attributes of entities`() = runTest { + coEvery { + entityPayloadService.mergeEntity(any(), any(), any(), any()) + } returns EMPTY_UPDATE_RESULT.right() + + val batchOperationResult = entityOperationService.merge( + listOf( + firstExpandedEntity to firstEntity, + secondExpandedEntity to secondEntity + ), + sub + ) + + assertEquals( + listOf(firstEntityURI, secondEntityURI), + batchOperationResult.getSuccessfulEntitiesIds() + ) + + coVerify { + entityPayloadService.mergeEntity(eq(firstEntityURI), any(), null, sub) + } + coVerify { + entityPayloadService.mergeEntity(eq(secondEntityURI), any(), null, sub) + } + } } From d66a6f6edfd2e7c05cf9248a9ca1c0576b874627 Mon Sep 17 00:00:00 2001 From: Thomas BOUSSELIN Date: Fri, 24 May 2024 14:58:59 +0200 Subject: [PATCH 2/3] refactor(EntityOperationHandler): fix PR comment --- .../egm/stellio/search/service/EntityOperationService.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt b/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt index ae1f84073d..935cbbf8d0 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt @@ -236,14 +236,14 @@ class EntityOperationService( entity: JsonLdNgsiLdEntity, disallowOverwrite: Boolean, sub: Sub? - ): Either = either { + ): Either { val (jsonLdEntity, ngsiLdEntity) = entity - entityPayloadService.appendAttributes( + return entityPayloadService.appendAttributes( ngsiLdEntity.id, jsonLdEntity.getModifiableMembers(), disallowOverwrite, sub - ).bind() + ) } @SuppressWarnings("UnusedParameter") From 2820c68ebbcd7a3c0f1da8d338979803aa52ea45 Mon Sep 17 00:00:00 2001 From: Thomas BOUSSELIN Date: Wed, 22 May 2024 09:31:24 +0200 Subject: [PATCH 3/3] refactor(EntityOperationHandler): remove duplication --- .../search/web/EntityOperationHandler.kt | 54 +++++++------------ 1 file changed, 20 insertions(+), 34 deletions(-) diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt b/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt index 8d5d1ae1bb..0c4dc9f6d0 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt @@ -27,7 +27,7 @@ import org.springframework.util.LinkedMultiValueMap import org.springframework.util.MultiValueMap import org.springframework.web.bind.annotation.* import reactor.core.publisher.Mono -import java.util.* +import java.util.Optional @RestController @RequestMapping("/ngsi-ld/v1/entityOperations") @@ -50,16 +50,10 @@ class EntityOperationHandler( ): ResponseEntity<*> = either { val sub = getSubFromSecurityContext() - val body = requestBody.awaitFirst().deserializeAsList() - .checkNamesAreNgsiLdSupported().bind() - .checkContentIsNgsiLdSupported().bind() - checkBatchRequestBody(body).bind() - checkContentType(httpHeaders, body).bind() - val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() + val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind() - val (parsedEntities, unparsableEntities) = - expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() val (uniqueEntities, duplicateEntities) = + entityOperationService.splitEntitiesByUniqueness(parsedEntities) val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(uniqueEntities) val (unauthorizedEntities, authorizedEntities) = newEntities.partition { @@ -94,15 +88,8 @@ class EntityOperationHandler( ): ResponseEntity<*> = either { val sub = getSubFromSecurityContext() - val body = requestBody.awaitFirst().deserializeAsList() - .checkNamesAreNgsiLdSupported().bind() - .checkContentIsNgsiLdSupported().bind() - checkBatchRequestBody(body).bind() - checkContentType(httpHeaders, body).bind() - val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() + val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind() - val (parsedEntities, unparsableEntities) = - expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities) val (newUnauthorizedEntities, newAuthorizedEntities) = newEntities.partition { @@ -165,16 +152,10 @@ class EntityOperationHandler( ): ResponseEntity<*> = either { val sub = getSubFromSecurityContext() - val body = requestBody.awaitFirst().deserializeAsList() - .checkNamesAreNgsiLdSupported().bind() - .checkContentIsNgsiLdSupported().bind() - checkBatchRequestBody(body).bind() - checkContentType(httpHeaders, body).bind() - val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() + val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind() - val (parsedEntities, unparsableEntities) = - expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() val (existingEntities, unknownEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities) + val disallowOverwrite = options.map { it == QUERY_PARAM_OPTIONS_NOOVERWRITE_VALUE }.orElse(false) val (existingEntitiesUnauthorized, existingEntitiesAuthorized) = @@ -215,15 +196,7 @@ class EntityOperationHandler( ): ResponseEntity<*> = either { val sub = getSubFromSecurityContext() - val body = requestBody.awaitFirst().deserializeAsList() - .checkNamesAreNgsiLdSupported().bind() - .checkContentIsNgsiLdSupported().bind() - checkBatchRequestBody(body).bind() - checkContentType(httpHeaders, body).bind() - val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() - - val (parsedEntities, unparsableEntities) = - expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() + val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind() val (existingEntities, unknownEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities) @@ -451,4 +424,17 @@ class EntityOperationHandler( ) } } + + private suspend fun getNgsiLdEntities( + requestBody: Mono, + httpHeaders: HttpHeaders, + ): Either = either { + val body = requestBody.awaitFirst().deserializeAsList() + .checkNamesAreNgsiLdSupported().bind() + .checkContentIsNgsiLdSupported().bind() + checkBatchRequestBody(body).bind() + checkContentType(httpHeaders, body).bind() + val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() + return@either expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() + } }