diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt b/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt index 69d824563..935cbbf8d 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityOperationService.kt @@ -159,7 +159,19 @@ class EntityOperationService( ): BatchOperationResult = processEntities(entities, disallowOverwrite, sub, ::updateEntity) - private suspend fun processEntities( + /** + * Merge a batch of [entities] + * + * @return a [BatchOperationResult] with list of updated ids and list of errors. + */ + @Transactional + suspend fun merge( + entities: List, + sub: Sub? + ): BatchOperationResult = + processEntities(entities, false, sub, ::mergeEntity) + + internal suspend fun processEntities( entities: List, disallowOverwrite: Boolean = false, sub: Sub?, @@ -224,13 +236,28 @@ class EntityOperationService( entity: JsonLdNgsiLdEntity, disallowOverwrite: Boolean, sub: Sub? - ): Either = either { + ): Either { val (jsonLdEntity, ngsiLdEntity) = entity - entityPayloadService.appendAttributes( + return entityPayloadService.appendAttributes( ngsiLdEntity.id, jsonLdEntity.getModifiableMembers(), disallowOverwrite, sub - ).bind() + ) + } + + @SuppressWarnings("UnusedParameter") + suspend fun mergeEntity( + entity: JsonLdNgsiLdEntity, + disallowOverwrite: Boolean, + sub: Sub? + ): Either { + val (jsonLdEntity, ngsiLdEntity) = entity + return entityPayloadService.mergeEntity( + ngsiLdEntity.id, + jsonLdEntity.getModifiableMembers(), + null, + sub + ) } } diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt b/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt index 1ae40d6da..0c4dc9f6d 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt @@ -50,16 +50,10 @@ class EntityOperationHandler( ): ResponseEntity<*> = either { val sub = getSubFromSecurityContext() - val body = requestBody.awaitFirst().deserializeAsList() - .checkNamesAreNgsiLdSupported().bind() - .checkContentIsNgsiLdSupported().bind() - checkBatchRequestBody(body).bind() - checkContentType(httpHeaders, body).bind() - val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() + val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind() - val (parsedEntities, unparsableEntities) = - expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() val (uniqueEntities, duplicateEntities) = + entityOperationService.splitEntitiesByUniqueness(parsedEntities) val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(uniqueEntities) val (unauthorizedEntities, authorizedEntities) = newEntities.partition { @@ -94,15 +88,8 @@ class EntityOperationHandler( ): ResponseEntity<*> = either { val sub = getSubFromSecurityContext() - val body = requestBody.awaitFirst().deserializeAsList() - .checkNamesAreNgsiLdSupported().bind() - .checkContentIsNgsiLdSupported().bind() - checkBatchRequestBody(body).bind() - checkContentType(httpHeaders, body).bind() - val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() + val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind() - val (parsedEntities, unparsableEntities) = - expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities) val (newUnauthorizedEntities, newAuthorizedEntities) = newEntities.partition { @@ -165,18 +152,12 @@ class EntityOperationHandler( ): ResponseEntity<*> = either { val sub = getSubFromSecurityContext() - val body = requestBody.awaitFirst().deserializeAsList() - .checkNamesAreNgsiLdSupported().bind() - .checkContentIsNgsiLdSupported().bind() - checkBatchRequestBody(body).bind() - checkContentType(httpHeaders, body).bind() - val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() - val disallowOverwrite = options.map { it == QUERY_PARAM_OPTIONS_NOOVERWRITE_VALUE }.orElse(false) + val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind() - val (parsedEntities, unparsableEntities) = - expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() val (existingEntities, unknownEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities) + val disallowOverwrite = options.map { it == QUERY_PARAM_OPTIONS_NOOVERWRITE_VALUE }.orElse(false) + val (existingEntitiesUnauthorized, existingEntitiesAuthorized) = existingEntities.partition { authorizationService.userCanUpdateEntity(it.entityId(), sub).isLeft() } @@ -205,6 +186,48 @@ class EntityOperationHandler( { it } ) + /** + * Implements (6.31.3.1) - Merge Batch of Entities + */ + @PostMapping("/merge", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) + suspend fun merge( + @RequestHeader httpHeaders: HttpHeaders, + @RequestBody requestBody: Mono, + ): ResponseEntity<*> = either { + val sub = getSubFromSecurityContext() + + val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind() + + val (existingEntities, unknownEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities) + + val (existingEntitiesUnauthorized, existingEntitiesAuthorized) = + existingEntities.partition { authorizationService.userCanUpdateEntity(it.entityId(), sub).isLeft() } + + val batchOperationResult = BatchOperationResult().apply { + addEntitiesToErrors(unparsableEntities) + addEntitiesToErrors(unknownEntities.extractNgsiLdEntities(), ENTITY_DOES_NOT_EXIST_MESSAGE) + addEntitiesToErrors(existingEntitiesUnauthorized.extractNgsiLdEntities(), ENTITY_UPDATE_FORBIDDEN_MESSAGE) + } + + if (existingEntitiesAuthorized.isNotEmpty()) { + val mergeOperationResult = + entityOperationService.merge(existingEntitiesAuthorized, sub.getOrNull()) + + publishUpdateEvents(sub.getOrNull(), mergeOperationResult, parsedEntities) + + batchOperationResult.errors.addAll(mergeOperationResult.errors) + batchOperationResult.success.addAll(mergeOperationResult.success) + } + + if (batchOperationResult.errors.isEmpty() && unknownEntities.isEmpty()) + ResponseEntity.status(HttpStatus.NO_CONTENT).build() + else + ResponseEntity.status(HttpStatus.MULTI_STATUS).body(batchOperationResult) + }.fold( + { it.toErrorResponse() }, + { it } + ) + /** * Implements 6.17.3.1 - Delete Batch of Entities */ @@ -401,4 +424,17 @@ class EntityOperationHandler( ) } } + + private suspend fun getNgsiLdEntities( + requestBody: Mono, + httpHeaders: HttpHeaders, + ): Either = either { + val body = requestBody.awaitFirst().deserializeAsList() + .checkNamesAreNgsiLdSupported().bind() + .checkContentIsNgsiLdSupported().bind() + checkBatchRequestBody(body).bind() + checkContentType(httpHeaders, body).bind() + val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind() + return@either expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind() + } } diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityOperationServiceTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityOperationServiceTests.kt index 2c502c5d1..7dd0be3ae 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityOperationServiceTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityOperationServiceTests.kt @@ -382,4 +382,31 @@ class EntityOperationServiceTests { coVerify { entityPayloadService.deleteEntity(firstEntityURI) } coVerify { entityPayloadService.deleteEntity(secondEntityURI) } } + + @Test + fun `batch merge should ask to merge attributes of entities`() = runTest { + coEvery { + entityPayloadService.mergeEntity(any(), any(), any(), any()) + } returns EMPTY_UPDATE_RESULT.right() + + val batchOperationResult = entityOperationService.merge( + listOf( + firstExpandedEntity to firstEntity, + secondExpandedEntity to secondEntity + ), + sub + ) + + assertEquals( + listOf(firstEntityURI, secondEntityURI), + batchOperationResult.getSuccessfulEntitiesIds() + ) + + coVerify { + entityPayloadService.mergeEntity(eq(firstEntityURI), any(), null, sub) + } + coVerify { + entityPayloadService.mergeEntity(eq(secondEntityURI), any(), null, sub) + } + } }