Skip to content

Commit

Permalink
feat: support entityOperations/upsert replace option #21
Browse files Browse the repository at this point in the history
  • Loading branch information
vraybaud committed Jun 4, 2020
1 parent c011a29 commit 6504c8c
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,38 @@ class Neo4jRepository(
return Pair(queryStatistics.nodesDeleted, queryStatistics.relationshipsDeleted)
}

fun getEntitiesByTypeAndQuery(type: String, query: Pair<List<Triple<String, String, String>>, List<Triple<String, String, String>>>): List<String> {
@Transactional
fun deleteEntityAttributes(entityId: String): Pair<Int, Int> {
/**
* Delete :
*
* 1. the properties
* 2. the properties of properties
* 3. the relationships of properties
* 4. the relationships
* 5. the properties of relationships
* 6. the relationships of relationships
*/
val query = """
MATCH (n:Entity { id: '$entityId' })
OPTIONAL MATCH (n)-[:HAS_VALUE]->(prop)
OPTIONAL MATCH (prop)-[:HAS_OBJECT]->(relOfProp)
OPTIONAL MATCH (prop)-[:HAS_VALUE]->(propOfProp)
OPTIONAL MATCH (n)-[:HAS_OBJECT]->(rel)
OPTIONAL MATCH (rel)-[:HAS_VALUE]->(propOfRel)
OPTIONAL MATCH (rel)-[:HAS_OBJECT]->(relOfRel:Relationship)
DETACH DELETE prop,relOfProp,propOfProp,rel,propOfRel,relOfRel
""".trimIndent()

val queryStatistics = session.query(query, emptyMap<String, Any>()).queryStatistics()
logger.debug("Deleted entity $entityId : deleted ${queryStatistics.nodesDeleted} nodes, ${queryStatistics.relationshipsDeleted} relations")
return Pair(queryStatistics.nodesDeleted, queryStatistics.relationshipsDeleted)
}

fun getEntitiesByTypeAndQuery(
type: String,
query: Pair<List<Triple<String, String, String>>, List<Triple<String, String, String>>>
): List<String> {
val propertiesFilter =
if (query.second.isNotEmpty())
query.second.joinToString(" AND ") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.jgrapht.Graphs
import org.jgrapht.graph.DefaultEdge
import org.jgrapht.graph.DirectedPseudograph
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
import kotlin.streams.toList

/**
Expand Down Expand Up @@ -58,17 +59,36 @@ class EntityOperationService(
}

/**
* Update a batch of [entities].
* Replaces a batch of [entities]
* Only entities with relations linked to existing entities will be replaced.
*
* @return a [BatchOperationResult] with list of replaced ids and list of errors (either not replaced or
* linked to invalid entity).
*/
fun replace(entities: List<ExpandedEntity>, createBatchResult: BatchOperationResult): BatchOperationResult {
return processEntities(entities, createBatchResult, ::replaceEntity)
}

/**
* Updates a batch of [entities].
* Only entities with relations linked to existing entities will be updated.
*
* @return a [BatchOperationResult] with list of updated ids and list of errors (either not totally updated or
* linked to invalid entity).
*/
fun update(entities: List<ExpandedEntity>, createBatchResult: BatchOperationResult): BatchOperationResult {
return processEntities(entities, createBatchResult, ::updateEntity)
}

private fun processEntities(
entities: List<ExpandedEntity>,
createBatchResult: BatchOperationResult,
processor: (ExpandedEntity) -> Either<BatchEntityError, String>
): BatchOperationResult {
val existingEntitiesIds = createBatchResult.success.plus(entities.map { it.id })
val nonExistingEntitiesIds = createBatchResult.errors.map { it.entityId }
return entities.parallelStream().map { entity ->
updateEntity(entity, existingEntitiesIds, nonExistingEntitiesIds)
return entities.parallelStream().map {
processEntity(it, processor, existingEntitiesIds, nonExistingEntitiesIds)
}.collect(
{ BatchOperationResult() },
{ batchOperationResult, updateResult ->
Expand All @@ -82,8 +102,9 @@ class EntityOperationService(
)
}

private fun updateEntity(
private fun processEntity(
entity: ExpandedEntity,
processor: (ExpandedEntity) -> Either<BatchEntityError, String>,
existingEntitiesIds: List<String>,
nonExistingEntitiesIds: List<String>
): Either<BatchEntityError, String> {
Expand All @@ -103,27 +124,42 @@ class EntityOperationService(
}

return try {
val (_, notUpdated) = entityService.appendEntityAttributes(
entity.id,
entity.attributes,
false
)

if (notUpdated.isEmpty()) {
Either.right(entity.id)
} else {
Either.left(
BatchEntityError(
entity.id,
ArrayList(notUpdated.map { it.attributeName + " : " + it.reason })
)
)
}
processor(entity)
} catch (e: BadRequestDataException) {
Either.left(BatchEntityError(entity.id, arrayListOf(e.message)))
}
}

/*
* Transactional because it should not delete entity attributes if new ones could not be appended.
*/
@Transactional(rollbackFor = [BadRequestDataException::class])
@Throws(BadRequestDataException::class)
private fun replaceEntity(entity: ExpandedEntity): Either<BatchEntityError, String> {
neo4jRepository.deleteEntityAttributes(entity.id)
entityService.appendEntityAttributes(entity.id, entity.attributes, false)
return Either.right(entity.id)
}

private fun updateEntity(entity: ExpandedEntity): Either<BatchEntityError, String> {
val (_, notUpdated) = entityService.appendEntityAttributes(
entity.id,
entity.attributes,
false
)

return if (notUpdated.isEmpty()) {
Either.right(entity.id)
} else {
Either.left(
BatchEntityError(
entity.id,
ArrayList(notUpdated.map { it.attributeName + " : " + it.reason })
)
)
}
}

private fun findInvalidEntityId(
entitiesIds: List<String>,
existingEntitiesIds: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,8 @@ class EntityService(
}

fun updateEntityLastMeasure(observation: Observation) {
val observingEntity = neo4jRepository.getObservingSensorEntity(observation.observedBy, EGM_VENDOR_ID, observation.attributeName)
val observingEntity =
neo4jRepository.getObservingSensorEntity(observation.observedBy, EGM_VENDOR_ID, observation.attributeName)
if (observingEntity == null) {
logger.warn("Unable to find observing entity ${observation.observedBy} for property ${observation.attributeName}")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.bind.annotation.*
import reactor.core.publisher.Mono

@RestController
Expand Down Expand Up @@ -50,7 +47,10 @@ class EntityOperationHandler(
* Implements 6.15.3.1 - Upsert Batch of Entities
*/
@PostMapping("/upsert", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE])
fun upsert(@RequestBody body: Mono<String>): Mono<ResponseEntity<*>> {
fun upsert(
@RequestBody body: Mono<String>,
@RequestParam(required = false) options: String?
): Mono<ResponseEntity<*>> {
return body
.map {
extractAndParseBatchOfEntities(it)
Expand All @@ -59,8 +59,11 @@ class EntityOperationHandler(
val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(it)

val createBatchOperationResult = entityOperationService.create(newEntities)
val updateBatchOperationResult =
entityOperationService.update(existingEntities, createBatchOperationResult)

val updateBatchOperationResult = when (options) {
"update" -> entityOperationService.update(existingEntities, createBatchOperationResult)
else -> entityOperationService.replace(existingEntities, createBatchOperationResult)
}

BatchOperationResult(
ArrayList(createBatchOperationResult.success.plus(updateBatchOperationResult.success)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,26 @@ class Neo4jRepositoryTests {
neo4jRepository.deleteEntity(entity.id)
}

@Test
fun `it should delete entity attributes`() {
val sensor = createEntity(
"urn:ngsi-ld:Sensor:1233",
listOf("Sensor"),
mutableListOf(Property(name = "name", value = "Scalpa"))
)
val device = createEntity("urn:ngsi-ld:Device:1233", listOf("Device"), mutableListOf())
createRelationship(sensor, EGM_OBSERVED_BY, device.id)

neo4jRepository.deleteEntityAttributes(sensor.id)

val entity = entityRepository.findById(sensor.id).get()
assertEquals(entity.relationships.size, 0)
assertEquals(entity.properties.size, 0)

neo4jRepository.deleteEntity(sensor.id)
neo4jRepository.deleteEntity(device.id)
}

fun createEntity(id: String, type: List<String>, properties: MutableList<Property>): Entity {
val entity = Entity(id = id, type = type, properties = properties)
return entityRepository.save(entity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ class EntityOperationServiceTests {
val batchOperationResult =
entityOperationService.update(listOf(firstEntity, secondEntity), BatchOperationResult())

val a = listOf(1, 2)

assertEquals(listOf("1"), batchOperationResult.success)
assertEquals(
listOf(BatchEntityError("2", arrayListOf("error"))),
Expand Down Expand Up @@ -237,4 +239,51 @@ class EntityOperationServiceTests {
batchOperationResult.errors
)
}

@Test
fun `it should replace entities`() {
val firstEntity = mockkClass(ExpandedEntity::class, relaxed = true)
every { firstEntity.id } returns "1"
every { firstEntity.getLinkedEntitiesIds() } returns listOf()
val secondEntity = mockkClass(ExpandedEntity::class, relaxed = true)
every { secondEntity.id } returns "2"
every { secondEntity.getLinkedEntitiesIds() } returns listOf()

every { neo4jRepository.filterExistingEntitiesIds(listOf()) } returns listOf()
every { neo4jRepository.deleteEntityAttributes("1") } returns mockk()
every { entityService.appendEntityAttributes(eq("1"), any(), any()) } returns mockk()
every { neo4jRepository.deleteEntityAttributes("2") } returns mockk()
every { entityService.appendEntityAttributes(eq("2"), any(), any()) } returns mockk()

val batchOperationResult =
entityOperationService.replace(listOf(firstEntity, secondEntity), BatchOperationResult())

assertEquals(listOf("1", "2"), batchOperationResult.success)
assertTrue(batchOperationResult.errors.isEmpty())
}

@Test
fun `it should count as error entities that couldn't be replaced`() {
val firstEntity = mockkClass(ExpandedEntity::class, relaxed = true)
every { firstEntity.id } returns "1"
every { firstEntity.getLinkedEntitiesIds() } returns listOf()
val secondEntity = mockkClass(ExpandedEntity::class, relaxed = true)
every { secondEntity.id } returns "2"
every { secondEntity.getLinkedEntitiesIds() } returns listOf()

every { neo4jRepository.filterExistingEntitiesIds(listOf()) } returns listOf()
every { neo4jRepository.deleteEntityAttributes("1") } returns mockk()
every { entityService.appendEntityAttributes(eq("1"), any(), any()) } returns mockk()
every { neo4jRepository.deleteEntityAttributes("2") } returns mockk()
every { entityService.appendEntityAttributes(eq("2"), any(), any()) } throws BadRequestDataException("error")

val batchOperationResult =
entityOperationService.replace(listOf(firstEntity, secondEntity), BatchOperationResult())

assertEquals(listOf("1"), batchOperationResult.success)
assertEquals(
listOf(BatchEntityError("2", arrayListOf("error"))),
batchOperationResult.errors
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class EntityOperationHandlerTests {
arrayListOf()
)
webClient.post()
.uri("/ngsi-ld/v1/entityOperations/upsert")
.uri("/ngsi-ld/v1/entityOperations/upsert?options=update")
.header("Link", "<$aquacContext>; rel=http://www.w3.org/ns/json-ld#context; type=application/ld+json")
.bodyValue(jsonLdFile)
.exchange()
Expand Down Expand Up @@ -254,7 +254,7 @@ class EntityOperationHandlerTests {
)

webClient.post()
.uri("/ngsi-ld/v1/entityOperations/upsert")
.uri("/ngsi-ld/v1/entityOperations/upsert?options=update")
.header("Link", "<$aquacContext>; rel=http://www.w3.org/ns/json-ld#context; type=application/ld+json")
.bodyValue(jsonLdFile)
.exchange()
Expand Down Expand Up @@ -282,6 +282,47 @@ class EntityOperationHandlerTests {
)
}

@Test
fun `upsert batch entity without option should replace entities`() {
val jsonLdFile = ClassPathResource("/ngsild/hcmr/HCMR_test_file_invalid_relation_update.json")
val entitiesIds = arrayListOf(
"urn:ngsi-ld:Sensor:HCMR-AQUABOX1temperature",
"urn:ngsi-ld:Sensor:HCMR-AQUABOX1dissolvedOxygen",
"urn:ngsi-ld:Device:HCMR-AQUABOX1"
)
val existingEntities = mockk<List<ExpandedEntity>>()

every { entityOperationService.splitEntitiesByExistence(any()) } returns Pair(
existingEntities,
listOf()
)
every { entityOperationService.create(any()) } returns BatchOperationResult(
arrayListOf(),
arrayListOf()
)
every { entityOperationService.replace(existingEntities, any()) } returns BatchOperationResult(
entitiesIds,
arrayListOf()
)

webClient.post()
.uri("/ngsi-ld/v1/entityOperations/upsert")
.header("Link", "<$aquacContext>; rel=http://www.w3.org/ns/json-ld#context; type=application/ld+json")
.bodyValue(jsonLdFile)
.exchange()
.expectStatus().isOk
.expectBody().json(
"{\n" +
" \"errors\": [],\n" +
" \"success\": [\n" +
" \"urn:ngsi-ld:Sensor:HCMR-AQUABOX1temperature\",\n" +
" \"urn:ngsi-ld:Sensor:HCMR-AQUABOX1dissolvedOxygen\",\n" +
" \"urn:ngsi-ld:Device:HCMR-AQUABOX1\"\n" +
" ]\n" +
"}"
)
}

@Test
fun `create batch entity should return a 400 if JSON-LD payload is not correct`() {
shouldReturn400WithBadPayload("create")
Expand Down

0 comments on commit 6504c8c

Please sign in to comment.