Skip to content

Commit

Permalink
feat(subscription): add support for notification trigger in subscript…
Browse files Browse the repository at this point in the history
…ions (5.2.12) (#1041)

- remove unused ENTITY_UPDATE event (composite event from attribute events)
- send deleted entity with ENTITY_DELETE events (needed for notification triggers)
- fix: geoQ was not updated if it did not previously exist
- fix: tests using invalid subscriptions as fixture data
- fix: at least one of entities or watchedAttributes should be present
- handle entityUpdated alias / add more triggers matching tests
  • Loading branch information
bobeal authored Nov 17, 2023
1 parent 71a6232 commit 8b3a094
Show file tree
Hide file tree
Showing 34 changed files with 1,136 additions and 808 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.egm.stellio.search.service

import arrow.core.Either
import com.egm.stellio.search.model.EntityPayload
import com.egm.stellio.search.model.UpdateOperationResult
import com.egm.stellio.search.model.UpdateResult
import com.egm.stellio.search.model.UpdatedDetails
Expand Down Expand Up @@ -74,14 +75,22 @@ class EntityEventService(

suspend fun publishEntityDeleteEvent(
sub: String?,
entityId: URI,
entityTypes: List<ExpandedTerm>,
entityPayload: EntityPayload,
contexts: List<String>
): Job {
val tenantUri = getTenantFromContext()
return coroutineScope.launch {
logger.debug("Sending delete event for entity {} in tenant {}", entityId, tenantUri)
publishEntityEvent(EntityDeleteEvent(sub, tenantUri, entityId, entityTypes, contexts))
logger.debug("Sending delete event for entity {} in tenant {}", entityPayload.entityId, tenantUri)
publishEntityEvent(
EntityDeleteEvent(
sub,
tenantUri,
entityPayload.entityId,
entityPayload.types,
entityPayload.payload.asString(),
contexts
)
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,13 @@ class EntityHandler(
val sub = getSubFromSecurityContext()

entityPayloadService.checkEntityExistence(entityId).bind()
// Is there a way to avoid loading the entity to get its type and contexts (for the event to be published)?
val entity = entityPayloadService.retrieve(entityId).bind()
authorizationService.userCanAdminEntity(entityId, sub).bind()

val entity = entityPayloadService.retrieve(entityId).bind()
entityPayloadService.deleteEntity(entityId).bind()
authorizationService.removeRightsOnEntity(entityId).bind()

entityEventService.publishEntityDeleteEvent(sub.getOrNull(), entityId, entity.types, entity.contexts)
entityEventService.publishEntityDeleteEvent(sub.getOrNull(), entity, entity.contexts)

ResponseEntity.status(HttpStatus.NO_CONTENT).build<String>()
}.fold(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,15 +225,13 @@ class EntityOperationHandler(
}

if (entitiesUserCanAdmin.isNotEmpty()) {
val deleteOperationResult =
entityOperationService.delete(entitiesUserCanAdmin.map { it.entityId }.toSet())
val deleteOperationResult = entityOperationService.delete(entitiesUserCanAdmin.map { it.entityId }.toSet())

deleteOperationResult.success.map { it.entityId }.forEach { uri ->
val entity = entitiesBeforeDelete.find { it.entityId == uri }!!
entityEventService.publishEntityDeleteEvent(
sub.getOrNull(),
entity.entityId,
entity.types,
entity,
entity.contexts
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,14 @@ class EntityEventServiceTests {

@Test
fun `it should publish an ENTITY_DELETE event`() = runTest {
val entityPayload = mockk<EntityPayload>(relaxed = true) {
every { entityId } returns breedingServiceUri
}
every { kafkaTemplate.send(any(), any(), any()) } returns CompletableFuture()

entityEventService.publishEntityDeleteEvent(
null,
breedingServiceUri,
listOf(breedingServiceType),
entityPayload,
listOf(AQUAC_COMPOUND_CONTEXT)
).join()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2145,7 +2145,7 @@ class EntityHandlerTests {
coEvery { authorizationService.userCanAdminEntity(beehiveId, sub) } returns Unit.right()
coEvery { entityPayloadService.deleteEntity(any()) } returns Unit.right()
coEvery { authorizationService.removeRightsOnEntity(any()) } returns Unit.right()
coEvery { entityEventService.publishEntityDeleteEvent(any(), any(), any(), any()) } returns Job()
coEvery { entityEventService.publishEntityDeleteEvent(any(), any(), any()) } returns Job()

webClient.delete()
.uri("/ngsi-ld/v1/entities/$beehiveId")
Expand All @@ -2163,8 +2163,7 @@ class EntityHandlerTests {
coVerify {
entityEventService.publishEntityDeleteEvent(
eq("60AAEBA3-C0C7-42B6-8CB0-0D30857F210E"),
eq(beehiveId),
eq(listOf(BEEHIVE_TYPE)),
eq(entity),
eq(listOf(APIC_COMPOUND_CONTEXT))
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ class EntityOperationHandlerTests {
every { contexts } returns listOf(AQUAC_COMPOUND_CONTEXT)
}
)
coEvery { entityEventService.publishEntityDeleteEvent(any(), any(), any(), any()) } returns Job()
coEvery { entityEventService.publishEntityDeleteEvent(any(), any(), any()) } returns Job()

webClient.post()
.uri(batchDeleteEndpoint)
Expand All @@ -741,8 +741,7 @@ class EntityOperationHandlerTests {
coVerify(timeout = 1000, exactly = 3) {
entityEventService.publishEntityDeleteEvent(
eq(sub.value),
match { it in allEntitiesUris },
match { it[0] in listOf(SENSOR_TYPE, DEVICE_TYPE) },
any(),
eq(listOf(AQUAC_COMPOUND_CONTEXT))
)
}
Expand Down
2 changes: 1 addition & 1 deletion shared/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<ID>LongParameterList:NgsiLdEntity.kt$NgsiLdPropertyInstance$( val value: Any, val unitCode: String?, createdAt: ZonedDateTime?, modifiedAt: ZonedDateTime?, observedAt: ZonedDateTime?, datasetId: URI?, properties: List&lt;NgsiLdProperty&gt;, relationships: List&lt;NgsiLdRelationship&gt; )</ID>
<ID>LongParameterList:NgsiLdEntity.kt$NgsiLdRelationshipInstance$( val objectId: URI, createdAt: ZonedDateTime?, modifiedAt: ZonedDateTime?, observedAt: ZonedDateTime?, datasetId: URI?, properties: List&lt;NgsiLdProperty&gt;, relationships: List&lt;NgsiLdRelationship&gt; )</ID>
<ID>NestedBlockDepth:JsonLdUtils.kt$JsonLdUtils$fun getPropertyValueFromMap(value: ExpandedAttributeInstance, propertyKey: String): Any?</ID>
<ID>SpreadOperator:EntityEvent.kt$EntityEvent$( *[ JsonSubTypes.Type(value = EntityCreateEvent::class), JsonSubTypes.Type(value = EntityReplaceEvent::class), JsonSubTypes.Type(value = EntityUpdateEvent::class), JsonSubTypes.Type(value = EntityDeleteEvent::class), JsonSubTypes.Type(value = AttributeAppendEvent::class), JsonSubTypes.Type(value = AttributeReplaceEvent::class), JsonSubTypes.Type(value = AttributeUpdateEvent::class), JsonSubTypes.Type(value = AttributeDeleteEvent::class), JsonSubTypes.Type(value = AttributeDeleteAllInstancesEvent::class) ] )</ID>
<ID>SpreadOperator:EntityEvent.kt$EntityEvent$( *[ JsonSubTypes.Type(value = EntityCreateEvent::class), JsonSubTypes.Type(value = EntityReplaceEvent::class), JsonSubTypes.Type(value = EntityDeleteEvent::class), JsonSubTypes.Type(value = AttributeAppendEvent::class), JsonSubTypes.Type(value = AttributeReplaceEvent::class), JsonSubTypes.Type(value = AttributeUpdateEvent::class), JsonSubTypes.Type(value = AttributeDeleteEvent::class), JsonSubTypes.Type(value = AttributeDeleteAllInstancesEvent::class) ] )</ID>
<ID>SwallowedException:JsonLdUtils.kt$JsonLdUtils$e: JsonLdError</ID>
<ID>TooManyFunctions:JsonLdUtils.kt$JsonLdUtils</ID>
</CurrentIssues>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ data class NotImplementedException(override val message: String) : APIException(
data class LdContextNotAvailableException(override val message: String) : APIException(message)
data class NonexistentTenantException(override val message: String) : APIException(message)

fun Exception.toAPIException(): APIException =
fun Throwable.toAPIException(fallbackMessage: String = ""): APIException =
when (this) {
is JsonLdError ->
if (this.type == JsonLdError.Error.LOADING_REMOTE_CONTEXT_FAILED)
LdContextNotAvailableException("Unable to load remote context (cause was: $this)")
else BadRequestDataException("Unexpected error while parsing payload (cause was: $this)")
else -> BadRequestDataException(this.message ?: "Failed to parse subscription")
else -> BadRequestDataException(this.message ?: fallbackMessage)
}
23 changes: 8 additions & 15 deletions shared/src/main/kotlin/com/egm/stellio/shared/model/EntityEvent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import java.net.URI
*[
JsonSubTypes.Type(value = EntityCreateEvent::class),
JsonSubTypes.Type(value = EntityReplaceEvent::class),
JsonSubTypes.Type(value = EntityUpdateEvent::class),
JsonSubTypes.Type(value = EntityDeleteEvent::class),
JsonSubTypes.Type(value = AttributeAppendEvent::class),
JsonSubTypes.Type(value = AttributeReplaceEvent::class),
Expand All @@ -22,7 +21,7 @@ import java.net.URI
JsonSubTypes.Type(value = AttributeDeleteAllInstancesEvent::class)
]
)
open class EntityEvent(
sealed class EntityEvent(
val operationType: EventsType,
open val sub: String?,
open val tenantUri: URI = DEFAULT_TENANT_URI,
Expand Down Expand Up @@ -69,25 +68,18 @@ data class EntityReplaceEvent(
override fun getEntity() = this.operationPayload
}

@JsonTypeName("ENTITY_UPDATE")
data class EntityUpdateEvent(
override val sub: String?,
override val tenantUri: URI = DEFAULT_TENANT_URI,
override val entityId: URI,
override val entityTypes: List<ExpandedTerm>,
val operationPayload: String,
val updatedEntity: String,
override val contexts: List<String>
) : EntityEvent(EventsType.ENTITY_UPDATE, sub, tenantUri, entityId, entityTypes, contexts)

@JsonTypeName("ENTITY_DELETE")
data class EntityDeleteEvent(
override val sub: String?,
override val tenantUri: URI = DEFAULT_TENANT_URI,
override val entityId: URI,
override val entityTypes: List<ExpandedTerm>,
// null only when in the case of an IAM event (previous state is not known)
val deletedEntity: String?,
override val contexts: List<String>
) : EntityEvent(EventsType.ENTITY_DELETE, sub, tenantUri, entityId, entityTypes, contexts)
) : EntityEvent(EventsType.ENTITY_DELETE, sub, tenantUri, entityId, entityTypes, contexts) {
override fun getEntity() = this.deletedEntity
}

@JsonTypeName("ATTRIBUTE_APPEND")
data class AttributeAppendEvent(
Expand Down Expand Up @@ -149,6 +141,7 @@ data class AttributeDeleteEvent(
val updatedEntity: String,
override val contexts: List<String>
) : EntityEvent(EventsType.ATTRIBUTE_DELETE, sub, tenantUri, entityId, entityTypes, contexts) {
override fun getEntity() = this.updatedEntity
override fun getAttribute() = this.attributeName
}

Expand All @@ -162,13 +155,13 @@ data class AttributeDeleteAllInstancesEvent(
val updatedEntity: String,
override val contexts: List<String>
) : EntityEvent(EventsType.ATTRIBUTE_DELETE_ALL_INSTANCES, sub, tenantUri, entityId, entityTypes, contexts) {
override fun getEntity() = this.updatedEntity
override fun getAttribute() = this.attributeName
}

enum class EventsType {
ENTITY_CREATE,
ENTITY_REPLACE,
ENTITY_UPDATE,
ENTITY_DELETE,
ATTRIBUTE_APPEND,
ATTRIBUTE_REPLACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ object JsonLdUtils {
val NGSILD_SYSATTRS_TERMS = setOf(NGSILD_CREATED_AT_TERM, NGSILD_MODIFIED_AT_TERM)
const val NGSILD_CREATED_AT_PROPERTY = "https://uri.etsi.org/ngsi-ld/$NGSILD_CREATED_AT_TERM"
const val NGSILD_MODIFIED_AT_PROPERTY = "https://uri.etsi.org/ngsi-ld/$NGSILD_MODIFIED_AT_TERM"
val NGSILD_SYSATTRS_PROPERTIES = setOf(NGSILD_CREATED_AT_PROPERTY, NGSILD_MODIFIED_AT_PROPERTY)
const val NGSILD_OBSERVED_AT_TERM = "observedAt"
const val NGSILD_OBSERVED_AT_PROPERTY = "https://uri.etsi.org/ngsi-ld/$NGSILD_OBSERVED_AT_TERM"
const val NGSILD_UNIT_CODE_PROPERTY = "https://uri.etsi.org/ngsi-ld/unitCode"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,14 @@ class JsonUtilsTests {
}

@Test
fun `it should serialize an event of type ENTITY_DELETE`() {
fun `it should serialize an event of type ENTITY_DELETE`() = runTest {
val event = mapper.writeValueAsString(
EntityDeleteEvent(
null,
DEFAULT_TENANT_URI,
entityId,
listOf(BEEHIVE_TYPE),
serializeObject(expandJsonLdFragment(entityPayload, listOf(APIC_COMPOUND_CONTEXT))),
listOf(APIC_COMPOUND_CONTEXT)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"tenantUri": "urn:ngsi-ld:tenant:default",
"entityId" : "urn:ngsi-ld:BeeHive:01",
"entityTypes" : [ "https://ontology.eglobalmark.com/apic#BeeHive" ],
"deletedEntity":"{\"https://ontology.eglobalmark.com/egm#belongs\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.389815Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/hasObject\":[{\"@id\":\"urn:ngsi-ld:Apiary:01\"}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Relationship\"]}],\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.179446Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://ontology.eglobalmark.com/apic#humidity\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.448205Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/observedAt\":[{\"@value\":\"2019-10-26T21:32:52.986010Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://ontology.eglobalmark.com/egm#observedBy\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.455870Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/hasObject\":[{\"@id\":\"urn:ngsi-ld:Sensor:02\"}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Relationship\"]}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Property\"],\"https://uri.etsi.org/ngsi-ld/unitCode\":[{\"@value\":\"P1\"}],\"https://uri.etsi.org/ngsi-ld/hasValue\":[{\"@value\":60}]}],\"@id\":\"urn:ngsi-ld:BeeHive:01\",\"https://uri.etsi.org/ngsi-ld/location\":[{\"@type\":[\"https://uri.etsi.org/ngsi-ld/GeoProperty\"],\"https://uri.etsi.org/ngsi-ld/hasValue\":[{\"@value\":\"POINT (24.30623 60.07966)\"}]}],\"https://ontology.eglobalmark.com/egm#managedBy\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.417938Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/hasObject\":[{\"@id\":\"urn:ngsi-ld:Beekeeper:01\"}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Relationship\"]}],\"https://uri.etsi.org/ngsi-ld/modifiedAt\":[{\"@value\":\"2022-02-12T08:36:59.218595Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://ontology.eglobalmark.com/apic#temperature\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.465937Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/observedAt\":[{\"@value\":\"2019-10-26T21:32:52.986010Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://ontology.eglobalmark.com/egm#observedBy\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.473904Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/hasObject\":[{\"@id\":\"urn:ngsi-ld:Sensor:01\"}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Relationship\"]}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Property\"],\"https://uri.etsi.org/ngsi-ld/unitCode\":[{\"@value\":\"CEL\"}],\"https://uri.etsi.org/ngsi-ld/hasValue\":[{\"@value\":22.2}]}],\"@type\":[\"https://ontology.eglobalmark.com/apic#BeeHive\"]}",
"contexts" : [ "https://raw.githubusercontent.com/easy-global-market/ngsild-api-data-models/master/apic/jsonld-contexts/apic-compound.jsonld" ],
"operationType" : "ENTITY_DELETE"
}
3 changes: 2 additions & 1 deletion subscription-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
<SmellBaseline>
<ManuallySuppressedIssues></ManuallySuppressedIssues>
<CurrentIssues>
<ID>CyclomaticComplexMethod:SubscriptionServiceTests.kt$SubscriptionServiceTests$@Test fun `it should load a subscription with all possible members`()</ID>
<ID>LongMethod:EntityEventListenerService.kt$EntityEventListenerService$internal suspend fun dispatchEntityEvent(content: String)</ID>
<ID>LongParameterList:FixtureUtils.kt$( withQueryAndGeoQuery: Pair&lt;Boolean, Boolean&gt; = Pair(true, true), withEndpointInfo: Boolean = true, withNotifParams: Pair&lt;FormatType, List&lt;String&gt;&gt; = Pair(FormatType.NORMALIZED, emptyList()), withModifiedAt: Boolean = false, georel: String = "within", geometry: String = "Polygon", coordinates: String = "[[[100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0]]]", timeInterval: Int? = null, contexts: List&lt;String&gt; = listOf(NGSILD_CORE_CONTEXT) )</ID>
<ID>TooGenericExceptionCaught:ParsingUtils.kt$ParsingUtils$e: Exception</ID>
<ID>TooGenericExceptionCaught:SubscriptionService.kt$SubscriptionService$e: Exception</ID>
<ID>TooManyFunctions:SubscriptionService.kt$SubscriptionService</ID>
</CurrentIssues>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ class TimeIntervalNotificationJob(
subscription: Subscription,
contextLink: String
): Set<CompactedJsonLdEntity> =
subscription.entities
// if a subscription has a "timeInterval" member defined, it has at least one "entities" member
// because it can't have a "watchedAttributes" member
subscription.entities!!
.map {
getEntities(
tenantUri,
Expand All @@ -85,11 +87,11 @@ class TimeIntervalNotificationJob(
}
.flatten()
.toSet()
.also {
if (it.isNotEmpty())
.also { compactedEntities ->
if (compactedEntities.isNotEmpty())
logger.debug(
"Gonna notify about entities: {} in tenant {}",
it.joinToString { it["id"] as String },
compactedEntities.joinToString { it["id"] as String },
tenantUri
)
}
Expand Down
Loading

0 comments on commit 8b3a094

Please sign in to comment.