Skip to content

Commit

Permalink
feat: add support for entityOperation/upsert with update option #21
Browse files Browse the repository at this point in the history
  • Loading branch information
vraybaud authored and vraybaud committed Jun 9, 2020
1 parent e4c31f5 commit 314a1d0
Show file tree
Hide file tree
Showing 14 changed files with 476 additions and 46 deletions.
7 changes: 7 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@ plugins {
kotlin("plugin.spring") version "1.3.61" apply false
id("org.jlleitschuh.gradle.ktlint") version "8.2.0"
id("com.google.cloud.tools.jib") version "1.6.1" apply false
kotlin("kapt") version "1.3.61" apply false
}

subprojects {
repositories {
mavenCentral()
maven { url = uri("https://repo.spring.io/milestone") }
jcenter()
maven { url = uri("https://dl.bintray.com/arrow-kt/arrow-kt/") }
}

apply(plugin = "io.spring.dependency-management")
apply(plugin = "org.jetbrains.kotlin.jvm")
apply(plugin = "org.jetbrains.kotlin.plugin.spring")
apply(plugin = "org.jlleitschuh.gradle.ktlint")
apply(plugin = "kotlin-kapt")

java.sourceCompatibility = JavaVersion.VERSION_11

Expand Down Expand Up @@ -55,6 +58,10 @@ subprojects {
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("com.github.jsonld-java:jsonld-java:0.13.0")

implementation("io.arrow-kt:arrow-fx:0.10.4")
implementation("io.arrow-kt:arrow-syntax:0.10.4")
"kapt"("io.arrow-kt:arrow-meta:0.10.4")

annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")

runtimeOnly("de.siegmar:logback-gelf:3.0.0")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.egm.stellio.entity.service

import arrow.core.Either
import com.egm.stellio.entity.model.Entity
import com.egm.stellio.entity.repository.EntityRepository
import com.egm.stellio.entity.repository.Neo4jRepository
Expand All @@ -8,8 +9,6 @@ import com.egm.stellio.entity.web.BatchEntityError
import com.egm.stellio.entity.web.BatchOperationResult
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.ExpandedEntity
import com.egm.stellio.shared.util.NgsiLdParsingUtils.NGSILD_ENTITY_ID
import com.egm.stellio.shared.util.NgsiLdParsingUtils.NGSILD_ENTITY_TYPE
import org.jgrapht.Graph
import org.jgrapht.Graphs
import org.jgrapht.graph.DefaultEdge
Expand Down Expand Up @@ -58,6 +57,87 @@ class EntityOperationService(
return BatchOperationResult(ArrayList(success), ArrayList(errors))
}

/**
* Update a batch of [entities].
* Only entities with relations linked to existing entities will be updated.
*
* @return a [BatchOperationResult] with list of updated ids and list of errors (either not totally updated or
* linked to invalid entity).
*/
fun update(entities: List<ExpandedEntity>, createBatchResult: BatchOperationResult): BatchOperationResult {
val existingEntitiesIds = createBatchResult.success.plus(entities.map { it.id })
val nonExistingEntitiesIds = createBatchResult.errors.map { it.entityId }
return entities.parallelStream().map { entity ->
updateEntity(entity, existingEntitiesIds, nonExistingEntitiesIds)
}.collect(
{ BatchOperationResult() },
{ batchOperationResult, updateResult ->
updateResult.fold({
batchOperationResult.errors.add(it)
}, {
batchOperationResult.success.add(it)
})
},
BatchOperationResult::plusAssign
)
}

private fun updateEntity(
entity: ExpandedEntity,
existingEntitiesIds: List<String>,
nonExistingEntitiesIds: List<String>
): Either<BatchEntityError, String> {
// All new attributes linked entities should be existing in the DB.
val linkedEntitiesIds = entity.getLinkedEntitiesIds()
val invalidLinkedEntityId =
findInvalidEntityId(linkedEntitiesIds, existingEntitiesIds, nonExistingEntitiesIds)

// If there's a link to an invalid entity, then avoid calling the processor and return an error
if (invalidLinkedEntityId != null) {
return Either.left(
BatchEntityError(
entity.id,
arrayListOf("Target entity $invalidLinkedEntityId does not exist.")
)
)
}

return try {
val (_, notUpdated) = entityService.appendEntityAttributes(
entity.id,
entity.attributes,
false
)

if (notUpdated.isEmpty()) {
Either.right(entity.id)
} else {
Either.left(
BatchEntityError(
entity.id,
ArrayList(notUpdated.map { it.attributeName + " : " + it.reason })
)
)
}
} catch (e: BadRequestDataException) {
Either.left(BatchEntityError(entity.id, arrayListOf(e.message)))
}
}

private fun findInvalidEntityId(
entitiesIds: List<String>,
existingEntitiesIds: List<String>,
nonExistingEntitiesIds: List<String>
): String? {
val invalidEntityId = entitiesIds.intersect(nonExistingEntitiesIds).firstOrNull()
if (invalidEntityId == null) {
val unknownEntitiesIds = entitiesIds.minus(existingEntitiesIds)
return unknownEntitiesIds
.minus(neo4jRepository.filterExistingEntitiesIds(unknownEntitiesIds)).firstOrNull()
}
return invalidEntityId
}

private fun createEntitiesWithoutCircularDependencies(graph: Graph<ExpandedEntity, DefaultEdge>): Pair<BatchOperationResult, Set<ExpandedEntity>> {
val batchOperationResult = BatchOperationResult(arrayListOf(), arrayListOf())
val temporaryGraph = DirectedPseudograph<ExpandedEntity, DefaultEdge>(DefaultEdge::class.java)
Expand Down Expand Up @@ -110,9 +190,7 @@ class EntityOperationService(
try {
entityService.appendEntityAttributes(
entity.id,
entity.attributes.filterKeys {
!listOf(NGSILD_ENTITY_ID, NGSILD_ENTITY_TYPE).contains(it)
},
entity.attributes,
false
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ class EntityService(
val entity = entityRepository.save(rawEntity)

// filter the unwanted entries and expand all attributes for easier later processing
val propertiesAndRelationshipsMap = expandedEntity.attributes.filterKeys {
!listOf(NGSILD_ENTITY_ID, NGSILD_ENTITY_TYPE).contains(it)
}.mapValues {
val propertiesAndRelationshipsMap = expandedEntity.attributes.mapValues {
expandValueAsMap(it.value)
}

Expand Down Expand Up @@ -102,8 +100,14 @@ class EntityService(
}

fun publishCreationEvent(expandedEntity: ExpandedEntity) {
val entityType = extractShortTypeFromPayload(expandedEntity.attributes)
val entityEvent = EntityEvent(EventType.CREATE, expandedEntity.id, entityType, getSerializedEntityById(expandedEntity.id), null)
val entityType = extractShortTypeFromPayload(expandedEntity.rawJsonLdProperties)
val entityEvent = EntityEvent(
EventType.CREATE,
expandedEntity.id,
entityType,
getSerializedEntityById(expandedEntity.id),
null
)
applicationEventPublisher.publishEvent(entityEvent)
}

Expand Down Expand Up @@ -336,7 +340,13 @@ class EntityService(
fun getSerializedEntityById(entityId: String): String {
val mapper = jacksonObjectMapper().findAndRegisterModules().disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
val entity = getFullEntityById(entityId)
return mapper.writeValueAsString(JsonLdProcessor.compact(entity.attributes, mapOf("@context" to entity.contexts), JsonLdOptions()))
return mapper.writeValueAsString(
JsonLdProcessor.compact(
entity.rawJsonLdProperties,
mapOf("@context" to entity.contexts),
JsonLdOptions()
)
)
}

fun searchEntities(type: String, query: List<String>, contextLink: String): List<ExpandedEntity> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class RepositoryEventsListener(

private fun getEntityById(entityId: String): String {
val entity = entityService.getFullEntityById(entityId)
return mapper.writeValueAsString(JsonLdProcessor.compact(entity.attributes, mapOf("@context" to entity.contexts), JsonLdOptions()))
return mapper.writeValueAsString(
JsonLdProcessor.compact(
entity.rawJsonLdProperties,
mapOf("@context" to entity.contexts),
JsonLdOptions()
)
)
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package com.egm.stellio.entity.web

data class BatchOperationResult(
val success: ArrayList<String>,
val errors: ArrayList<BatchEntityError>
)
val success: ArrayList<String> = arrayListOf(),
val errors: ArrayList<BatchEntityError> = arrayListOf()
) {

operator fun plusAssign(other: BatchOperationResult) {
success.addAll(other.success)
errors.addAll(other.errors)
}
}

data class BatchEntityError(
val entityId: String,
val error: ArrayList<String>
)

)
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ class EntityOperationHandler(
}
}

/**
* Implements 6.15.3.1 - Upsert Batch of Entities
*/
@PostMapping("/upsert", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE])
fun upsert(@RequestBody body: Mono<String>): Mono<ResponseEntity<*>> {
return body
.map {
extractAndParseBatchOfEntities(it)
}
.map {
val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(it)

val createBatchOperationResult = entityOperationService.create(newEntities)
val updateBatchOperationResult =
entityOperationService.update(existingEntities, createBatchOperationResult)

BatchOperationResult(
ArrayList(createBatchOperationResult.success.plus(updateBatchOperationResult.success)),
ArrayList(createBatchOperationResult.errors.plus(updateBatchOperationResult.errors))
)
}
.map {
ResponseEntity.status(HttpStatus.OK).body(it)
}
}

private fun extractAndParseBatchOfEntities(payload: String): List<ExpandedEntity> {
val extractedEntities = extractEntitiesFromJsonPayload(payload)
return NgsiLdParsingUtils.parseEntities(extractedEntities)
Expand Down
Loading

0 comments on commit 314a1d0

Please sign in to comment.