Skip to content

Commit

Permalink
feat: add support for entityOperation/upsert with update option #21
Browse files Browse the repository at this point in the history
  • Loading branch information
vraybaud committed May 28, 2020
1 parent 6679178 commit e49c3fa
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import com.egm.stellio.entity.web.BatchEntityError
import com.egm.stellio.entity.web.BatchOperationResult
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.ExpandedEntity
import com.egm.stellio.shared.util.NgsiLdParsingUtils.NGSILD_ENTITY_ID
import com.egm.stellio.shared.util.NgsiLdParsingUtils.NGSILD_ENTITY_TYPE
import org.jgrapht.Graph
import org.jgrapht.Graphs
import org.jgrapht.graph.DefaultEdge
Expand Down Expand Up @@ -58,6 +56,64 @@ class EntityOperationService(
return BatchOperationResult(ArrayList(success), ArrayList(errors))
}

/**
* Update a batch of [entities].
* Only entities with relations linked to existing entities will be updated.
*
* @return a [BatchOperationResult] with list of updated ids and list of errors (either not totally updated or
* linked to invalid entity).
*/
fun update(entities: List<ExpandedEntity>): BatchOperationResult {
return entities.parallelStream().map { entity ->
updateEntity(entity)
}.collect({ BatchOperationResult(ArrayList(), ArrayList()) },
{ batchOperationResult, (update, error) ->
update?.let { batchOperationResult.success.add(it) }
error?.let { batchOperationResult.errors.add(it) }
},
{ batchResult1, batchResult2 ->
batchResult1 += batchResult2
})
}

private fun updateEntity(entity: ExpandedEntity): Pair<String?, BatchEntityError?> {
// All relationships should target existing entities in DB
val linkedEntitiesIds = entity.getLinkedEntitiesIds()
val nonExistingLinkedEntitiesIds = linkedEntitiesIds
.minus(neo4jRepository.filterExistingEntitiesIds(linkedEntitiesIds))

if (nonExistingLinkedEntitiesIds.isNotEmpty()) {
return Pair(
null, BatchEntityError(
entity.id,
arrayListOf("Target entities $nonExistingLinkedEntitiesIds does not exist.")
)
)
}

return try {
val (_, notUpdated) = entityService.appendEntityAttributes(
entity.id,
entity.attributesWithoutTypeAndId,
false
)

if (notUpdated.isEmpty()) {
Pair(entity.id, null)
} else {
Pair(
null,
BatchEntityError(
entity.id,
ArrayList(notUpdated.map { it.attributeName + " : " + it.reason })
)
)
}
} catch (e: BadRequestDataException) {
Pair(null, BatchEntityError(entity.id, arrayListOf(e.message)))
}
}

private fun createEntitiesWithoutCircularDependencies(graph: Graph<ExpandedEntity, DefaultEdge>): Pair<BatchOperationResult, Set<ExpandedEntity>> {
val batchOperationResult = BatchOperationResult(arrayListOf(), arrayListOf())
val temporaryGraph = DirectedPseudograph<ExpandedEntity, DefaultEdge>(DefaultEdge::class.java)
Expand Down Expand Up @@ -110,9 +166,7 @@ class EntityOperationService(
try {
entityService.appendEntityAttributes(
entity.id,
entity.attributes.filterKeys {
!listOf(NGSILD_ENTITY_ID, NGSILD_ENTITY_TYPE).contains(it)
},
entity.attributesWithoutTypeAndId,
false
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ class EntityService(
val entity = entityRepository.save(rawEntity)

// filter the unwanted entries and expand all attributes for easier later processing
val propertiesAndRelationshipsMap = expandedEntity.attributes.filterKeys {
!listOf(NGSILD_ENTITY_ID, NGSILD_ENTITY_TYPE).contains(it)
}.mapValues {
val propertiesAndRelationshipsMap = expandedEntity.attributesWithoutTypeAndId.mapValues {
expandValueAsMap(it.value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@ package com.egm.stellio.entity.web
data class BatchOperationResult(
val success: ArrayList<String>,
val errors: ArrayList<BatchEntityError>
)
) {

operator fun plusAssign(other: BatchOperationResult) {
success.addAll(other.success)
errors.addAll(other.errors)
}
}

data class BatchEntityError(
val entityId: String,
val error: ArrayList<String>

)
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,28 @@ class EntityOperationHandler(
}
}

@PostMapping("/upsert", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE])
fun upsert(@RequestBody body: Mono<String>): Mono<ResponseEntity<*>> {
return body
.map {
extractAndParseBatchOfEntities(it)
}
.map {
val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(it)

val createBatchOperationResult = entityOperationService.create(newEntities)
val updateBatchOperationResult = entityOperationService.update(existingEntities)

BatchOperationResult(
ArrayList(createBatchOperationResult.success.plus(updateBatchOperationResult.success)),
ArrayList(createBatchOperationResult.errors.plus(updateBatchOperationResult.errors))
)
}
.map {
ResponseEntity.status(HttpStatus.OK).body(it)
}
}

private fun extractAndParseBatchOfEntities(payload: String): List<ExpandedEntity> {
val extractedEntities = extractEntitiesFromJsonPayload(payload)
return NgsiLdParsingUtils.parseEntities(extractedEntities)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.egm.stellio.entity.service

import com.egm.stellio.entity.model.Entity
import com.egm.stellio.entity.model.NotUpdatedDetails
import com.egm.stellio.entity.model.UpdateResult
import com.egm.stellio.entity.repository.EntityRepository
import com.egm.stellio.entity.repository.Neo4jRepository
Expand Down Expand Up @@ -121,4 +122,76 @@ class EntityOperationServiceTests {
assertEquals(arrayListOf("1", "2"), batchOperationResult.success)
assertTrue(batchOperationResult.errors.isEmpty())
}

@Test
fun `it should not update entities with relationships to invalid entity`() {
val firstEntity = mockkClass(ExpandedEntity::class, relaxed = true)
every { firstEntity.id } returns "1"
every { firstEntity.getLinkedEntitiesIds() } returns listOf()
val secondEntity = mockkClass(ExpandedEntity::class, relaxed = true)
every { secondEntity.id } returns "2"
every { secondEntity.getLinkedEntitiesIds() } returns listOf("3")

every { neo4jRepository.filterExistingEntitiesIds(listOf()) } returns listOf()
every { neo4jRepository.filterExistingEntitiesIds(listOf("3")) } returns listOf()
every { entityService.appendEntityAttributes(eq("1"), any(), any()) } returns UpdateResult(listOf(), listOf())

val batchOperationResult = entityOperationService.update(listOf(firstEntity, secondEntity))

assertEquals(listOf("1"), batchOperationResult.success)
assertEquals(
listOf(BatchEntityError("2", arrayListOf("Target entities [3] does not exist."))),
batchOperationResult.errors
)
}

@Test
fun `it should count as error updating which results in BadRequestDataException`() {
val firstEntity = mockkClass(ExpandedEntity::class, relaxed = true)
every { firstEntity.id } returns "1"
every { firstEntity.getLinkedEntitiesIds() } returns listOf()
val secondEntity = mockkClass(ExpandedEntity::class, relaxed = true)
every { secondEntity.id } returns "2"
every { secondEntity.getLinkedEntitiesIds() } returns listOf()

every { neo4jRepository.filterExistingEntitiesIds(listOf()) } returns listOf()
every { entityService.appendEntityAttributes(eq("1"), any(), any()) } returns UpdateResult(listOf(), listOf())
every { entityService.appendEntityAttributes(eq("2"), any(), any()) } throws BadRequestDataException("error")

val batchOperationResult = entityOperationService.update(listOf(firstEntity, secondEntity))

assertEquals(listOf("1"), batchOperationResult.success)
assertEquals(
listOf(BatchEntityError("2", arrayListOf("error"))),
batchOperationResult.errors
)
}

@Test
fun `it should count as error not updated attributes in entities`() {
val firstEntity = mockkClass(ExpandedEntity::class, relaxed = true)
every { firstEntity.id } returns "1"
every { firstEntity.getLinkedEntitiesIds() } returns listOf()
val secondEntity = mockkClass(ExpandedEntity::class, relaxed = true)
every { secondEntity.id } returns "2"
every { secondEntity.getLinkedEntitiesIds() } returns listOf()

every { neo4jRepository.filterExistingEntitiesIds(listOf()) } returns listOf()
every { entityService.appendEntityAttributes(eq("1"), any(), any()) } returns UpdateResult(listOf(), listOf())
every { entityService.appendEntityAttributes(eq("2"), any(), any()) } returns UpdateResult(
listOf(),
listOf(
NotUpdatedDetails("attribute#1", "reason"),
NotUpdatedDetails("attribute#2", "reason")
)
)

val batchOperationResult = entityOperationService.update(listOf(firstEntity, secondEntity))

assertEquals(listOf("1"), batchOperationResult.success)
assertEquals(
listOf(BatchEntityError("2", arrayListOf("attribute#1 : reason", "attribute#2 : reason"))),
batchOperationResult.errors
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,104 @@ class EntityOperationHandlerTests {
@Autowired
private lateinit var webClient: WebTestClient

@MockkBean(relaxed = true)
@MockkBean
private lateinit var entityOperationService: EntityOperationService

@Test
fun `upsert batch entity should return a 200 if JSON-LD payload contains update errors`() {
val jsonLdFile = ClassPathResource("/ngsild/hcmr/HCMR_test_file_invalid_relation_update.json")
val errors = arrayListOf(
BatchEntityError(
"urn:ngsi-ld:Sensor:HCMR-AQUABOX1temperature",
arrayListOf("Target entity urn:ngsi-ld:Device:HCMR-AQUABOX2 does not exist.")
),
BatchEntityError(
"urn:ngsi-ld:Sensor:HCMR-AQUABOX1dissolvedOxygen",
arrayListOf("Target entity urn:ngsi-ld:Device:HCMR-AQUABOX2 does not exist.")
)
)

every { entityOperationService.splitEntitiesByExistence(any()) } returns Pair(
listOf(),
listOf()
)
every { entityOperationService.create(any()) } returns BatchOperationResult(
arrayListOf(),
arrayListOf()
)
every { entityOperationService.update(any()) } returns BatchOperationResult(
arrayListOf(),
errors
)

webClient.post()
.uri("/ngsi-ld/v1/entityOperations/upsert")
.header("Link", "<$aquacContext>; rel=http://www.w3.org/ns/json-ld#context; type=application/ld+json")
.bodyValue(jsonLdFile)
.exchange()
.expectStatus().isOk
.expectBody().json(
"{\n" +
" \"errors\": [" +
" {\n" +
" \"entityId\": \"urn:ngsi-ld:Sensor:HCMR-AQUABOX1temperature\",\n" +
" \"error\": [\n" +
" \"Target entity urn:ngsi-ld:Device:HCMR-AQUABOX2 does not exist.\"\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"entityId\": \"urn:ngsi-ld:Sensor:HCMR-AQUABOX1dissolvedOxygen\",\n" +
" \"error\": [\n" +
" \"Target entity urn:ngsi-ld:Device:HCMR-AQUABOX2 does not exist.\"\n" +
" ]\n" +
" }],\n" +
" \"success\": []\n" +
"}"
)
}

@Test
fun `upsert batch entity should return a 200 if JSON-LD payload is correct`() {
val jsonLdFile = ClassPathResource("/ngsild/hcmr/HCMR_test_file.json")
val entitiesIds = arrayListOf(
"urn:ngsi-ld:Sensor:HCMR-AQUABOX1temperature",
"urn:ngsi-ld:Sensor:HCMR-AQUABOX1dissolvedOxygen",
"urn:ngsi-ld:Device:HCMR-AQUABOX1"
)

val existingEntities = mockk<List<ExpandedEntity>>()
val nonExistingEntities = mockk<List<ExpandedEntity>>()

every { entityOperationService.splitEntitiesByExistence(any()) } returns Pair(
existingEntities,
nonExistingEntities
)
every { entityOperationService.create(nonExistingEntities) } returns BatchOperationResult(
arrayListOf(),
arrayListOf()
)
every { entityOperationService.update(existingEntities) } returns BatchOperationResult(
entitiesIds,
arrayListOf()
)
webClient.post()
.uri("/ngsi-ld/v1/entityOperations/upsert")
.header("Link", "<$aquacContext>; rel=http://www.w3.org/ns/json-ld#context; type=application/ld+json")
.bodyValue(jsonLdFile)
.exchange()
.expectStatus().isOk
.expectBody().json(
"{\n" +
" \"errors\": [],\n" +
" \"success\": [\n" +
" \"urn:ngsi-ld:Sensor:HCMR-AQUABOX1temperature\",\n" +
" \"urn:ngsi-ld:Sensor:HCMR-AQUABOX1dissolvedOxygen\",\n" +
" \"urn:ngsi-ld:Device:HCMR-AQUABOX1\"\n" +
" ]\n" +
"}"
)
}

@Test
fun `create batch entity should return a 200 if JSON-LD payload is correct`() {
val jsonLdFile = ClassPathResource("/ngsild/hcmr/HCMR_test_file.json")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[
{
"id": "urn:ngsi-ld:Sensor:HCMR-AQUABOX1temperature",
"type": "Sensor",
"deviceParameter": {
"type": "Property",
"value": "temperature"
},
"connectsTo": {
"type": "Relationship",
"object": "urn:ngsi-ld:Device:HCMR-AQUABOX2"
},
"@context": [
"https://raw.githubusercontent.com/easy-global-market/ngsild-api-data-models/master/shared-jsonld-contexts/egm.jsonld",
"https://raw.githubusercontent.com/easy-global-market/ngsild-api-data-models/master/aquac/jsonld-contexts/aquac.jsonld",
"http://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"
]
},
{
"id": "urn:ngsi-ld:Sensor:HCMR-AQUABOX1dissolvedOxygen",
"type": "Sensor",
"deviceParameter": {
"type": "Property",
"value": "dissolvedOxygen"
},
"connectsTo": {
"type": "Relationship",
"object": "urn:ngsi-ld:Device:HCMR-AQUABOX2"
},
"@context": [
"https://raw.githubusercontent.com/easy-global-market/ngsild-api-data-models/master/shared-jsonld-contexts/egm.jsonld",
"https://raw.githubusercontent.com/easy-global-market/ngsild-api-data-models/master/aquac/jsonld-contexts/aquac.jsonld",
"http://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"
]
}
]
Loading

0 comments on commit e49c3fa

Please sign in to comment.