Skip to content

Commit

Permalink
Merge pull request #1166 from stellio-hub/batch-merge2
Browse files Browse the repository at this point in the history
Batch Merge
  • Loading branch information
thomasBousselin authored May 24, 2024
2 parents 9886433 + 4edf0a3 commit 5d1177c
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,19 @@ class EntityOperationService(
): BatchOperationResult =
processEntities(entities, disallowOverwrite, sub, ::updateEntity)

private suspend fun processEntities(
/**
* Merge a batch of [entities]
*
* @return a [BatchOperationResult] with list of updated ids and list of errors.
*/
@Transactional
suspend fun merge(
entities: List<JsonLdNgsiLdEntity>,
sub: Sub?
): BatchOperationResult =
processEntities(entities, false, sub, ::mergeEntity)

internal suspend fun processEntities(
entities: List<JsonLdNgsiLdEntity>,
disallowOverwrite: Boolean = false,
sub: Sub?,
Expand Down Expand Up @@ -224,13 +236,28 @@ class EntityOperationService(
entity: JsonLdNgsiLdEntity,
disallowOverwrite: Boolean,
sub: Sub?
): Either<APIException, UpdateResult> = either {
): Either<APIException, UpdateResult> {
val (jsonLdEntity, ngsiLdEntity) = entity
entityPayloadService.appendAttributes(
return entityPayloadService.appendAttributes(
ngsiLdEntity.id,
jsonLdEntity.getModifiableMembers(),
disallowOverwrite,
sub
).bind()
)
}

@SuppressWarnings("UnusedParameter")
suspend fun mergeEntity(
entity: JsonLdNgsiLdEntity,
disallowOverwrite: Boolean,
sub: Sub?
): Either<APIException, UpdateResult> {
val (jsonLdEntity, ngsiLdEntity) = entity
return entityPayloadService.mergeEntity(
ngsiLdEntity.id,
jsonLdEntity.getModifiableMembers(),
null,
sub
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,10 @@ class EntityOperationHandler(
): ResponseEntity<*> = either {
val sub = getSubFromSecurityContext()

val body = requestBody.awaitFirst().deserializeAsList()
.checkNamesAreNgsiLdSupported().bind()
.checkContentIsNgsiLdSupported().bind()
checkBatchRequestBody(body).bind()
checkContentType(httpHeaders, body).bind()
val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind()
val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind()

val (parsedEntities, unparsableEntities) =
expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind()
val (uniqueEntities, duplicateEntities) =

entityOperationService.splitEntitiesByUniqueness(parsedEntities)
val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(uniqueEntities)
val (unauthorizedEntities, authorizedEntities) = newEntities.partition {
Expand Down Expand Up @@ -94,15 +88,8 @@ class EntityOperationHandler(
): ResponseEntity<*> = either {
val sub = getSubFromSecurityContext()

val body = requestBody.awaitFirst().deserializeAsList()
.checkNamesAreNgsiLdSupported().bind()
.checkContentIsNgsiLdSupported().bind()
checkBatchRequestBody(body).bind()
checkContentType(httpHeaders, body).bind()
val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind()
val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind()

val (parsedEntities, unparsableEntities) =
expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind()
val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities)

val (newUnauthorizedEntities, newAuthorizedEntities) = newEntities.partition {
Expand Down Expand Up @@ -165,18 +152,12 @@ class EntityOperationHandler(
): ResponseEntity<*> = either {
val sub = getSubFromSecurityContext()

val body = requestBody.awaitFirst().deserializeAsList()
.checkNamesAreNgsiLdSupported().bind()
.checkContentIsNgsiLdSupported().bind()
checkBatchRequestBody(body).bind()
checkContentType(httpHeaders, body).bind()
val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind()
val disallowOverwrite = options.map { it == QUERY_PARAM_OPTIONS_NOOVERWRITE_VALUE }.orElse(false)
val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind()

val (parsedEntities, unparsableEntities) =
expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind()
val (existingEntities, unknownEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities)

val disallowOverwrite = options.map { it == QUERY_PARAM_OPTIONS_NOOVERWRITE_VALUE }.orElse(false)

val (existingEntitiesUnauthorized, existingEntitiesAuthorized) =
existingEntities.partition { authorizationService.userCanUpdateEntity(it.entityId(), sub).isLeft() }

Expand Down Expand Up @@ -205,6 +186,48 @@ class EntityOperationHandler(
{ it }
)

/**
* Implements (6.31.3.1) - Merge Batch of Entities
*/
@PostMapping("/merge", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE])
suspend fun merge(
@RequestHeader httpHeaders: HttpHeaders,
@RequestBody requestBody: Mono<String>,
): ResponseEntity<*> = either {
val sub = getSubFromSecurityContext()

val (parsedEntities, unparsableEntities) = getNgsiLdEntities(requestBody, httpHeaders).bind()

val (existingEntities, unknownEntities) = entityOperationService.splitEntitiesByExistence(parsedEntities)

val (existingEntitiesUnauthorized, existingEntitiesAuthorized) =
existingEntities.partition { authorizationService.userCanUpdateEntity(it.entityId(), sub).isLeft() }

val batchOperationResult = BatchOperationResult().apply {
addEntitiesToErrors(unparsableEntities)
addEntitiesToErrors(unknownEntities.extractNgsiLdEntities(), ENTITY_DOES_NOT_EXIST_MESSAGE)
addEntitiesToErrors(existingEntitiesUnauthorized.extractNgsiLdEntities(), ENTITY_UPDATE_FORBIDDEN_MESSAGE)
}

if (existingEntitiesAuthorized.isNotEmpty()) {
val mergeOperationResult =
entityOperationService.merge(existingEntitiesAuthorized, sub.getOrNull())

publishUpdateEvents(sub.getOrNull(), mergeOperationResult, parsedEntities)

batchOperationResult.errors.addAll(mergeOperationResult.errors)
batchOperationResult.success.addAll(mergeOperationResult.success)
}

if (batchOperationResult.errors.isEmpty() && unknownEntities.isEmpty())
ResponseEntity.status(HttpStatus.NO_CONTENT).build<String>()
else
ResponseEntity.status(HttpStatus.MULTI_STATUS).body(batchOperationResult)
}.fold(
{ it.toErrorResponse() },
{ it }
)

/**
* Implements 6.17.3.1 - Delete Batch of Entities
*/
Expand Down Expand Up @@ -401,4 +424,17 @@ class EntityOperationHandler(
)
}
}

private suspend fun getNgsiLdEntities(
requestBody: Mono<String>,
httpHeaders: HttpHeaders,
): Either<APIException, BatchEntityPreparation> = either {
val body = requestBody.awaitFirst().deserializeAsList()
.checkNamesAreNgsiLdSupported().bind()
.checkContentIsNgsiLdSupported().bind()
checkBatchRequestBody(body).bind()
checkContentType(httpHeaders, body).bind()
val context = getContextFromLinkHeader(httpHeaders.getOrEmpty(HttpHeaders.LINK)).bind()
return@either expandAndPrepareBatchOfEntities(body, context, httpHeaders.contentType).bind()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -382,4 +382,31 @@ class EntityOperationServiceTests {
coVerify { entityPayloadService.deleteEntity(firstEntityURI) }
coVerify { entityPayloadService.deleteEntity(secondEntityURI) }
}

@Test
fun `batch merge should ask to merge attributes of entities`() = runTest {
coEvery {
entityPayloadService.mergeEntity(any(), any(), any(), any())
} returns EMPTY_UPDATE_RESULT.right()

val batchOperationResult = entityOperationService.merge(
listOf(
firstExpandedEntity to firstEntity,
secondExpandedEntity to secondEntity
),
sub
)

assertEquals(
listOf(firstEntityURI, secondEntityURI),
batchOperationResult.getSuccessfulEntitiesIds()
)

coVerify {
entityPayloadService.mergeEntity(eq(firstEntityURI), any(), null, sub)
}
coVerify {
entityPayloadService.mergeEntity(eq(secondEntityURI), any(), null, sub)
}
}
}

0 comments on commit 5d1177c

Please sign in to comment.