From 8b3a094b338311c0b9e31cfc8dc3d7698bc5f235 Mon Sep 17 00:00:00 2001 From: Benoit Orihuela Date: Fri, 17 Nov 2023 09:26:07 +0100 Subject: [PATCH] feat(subscription): add support for notification trigger in subscriptions (5.2.12) (#1041) - remove unused ENTITY_UPDATE event (composite event from attribute events) - send deleted entity with ENTITY_DELETE events (needed for notification triggers) - fix: geoQ was not updated if it did not previously exist - fix: tests using invalid subscriptions as fixture data - fix: at least one of entities or watchedAttributes should be present - handle entityUpdated alias / add more triggers matching tests --- .../search/service/EntityEventService.kt | 17 +- .../egm/stellio/search/web/EntityHandler.kt | 5 +- .../search/web/EntityOperationHandler.kt | 6 +- .../search/service/EntityEventServiceTests.kt | 6 +- .../stellio/search/web/EntityHandlerTests.kt | 5 +- .../search/web/EntityOperationHandlerTests.kt | 5 +- shared/config/detekt/baseline.xml | 2 +- .../egm/stellio/shared/model/APiExceptions.kt | 4 +- .../egm/stellio/shared/model/EntityEvent.kt | 23 +- .../egm/stellio/shared/util/JsonLdUtils.kt | 1 + .../egm/stellio/shared/util/JsonUtilsTests.kt | 3 +- .../events/entity/entityDeleteEvent.json | 1 + .../config/detekt/baseline.xml | 3 +- .../job/TimeIntervalNotificationJob.kt | 10 +- .../listener/EntityEventListenerService.kt | 57 +- .../subscription/model/Subscription.kt | 39 +- .../service/NotificationService.kt | 6 +- .../service/SubscriptionService.kt | 274 +++-- .../subscription/utils/ParsingUtils.kt | 22 +- ...V0_21__add_notification_trigger_column.sql | 5 + ..._add_subscription_unicity_in_geo_query.sql | 1 + .../EntityEventListenerServiceTests.kt | 41 +- .../service/NotificationServiceTests.kt | 186 +-- .../service/SubscriptionServiceTests.kt | 1046 +++++++++-------- .../subscription/utils/FixtureUtils.kt | 23 + .../web/SubscriptionHandlerTests.kt | 28 +- ...{subscription.json => subscription.jsonld} | 0 .../resources/ngsild/subscription_full.json | 39 + .../ngsild/subscription_minimal_entities.json | 14 + ...bscription_minimal_watched_attributes.json | 10 + .../resources/ngsild/subscription_update.json | 7 +- ...ayload.json => subscription_update.jsonld} | 5 +- ...icting_timeInterval_watchedAttributes.json | 18 +- ...iption_with_time_interval_less_than_0.json | 32 - 34 files changed, 1136 insertions(+), 808 deletions(-) create mode 100644 subscription-service/src/main/resources/db/migration/V0_21__add_notification_trigger_column.sql create mode 100644 subscription-service/src/main/resources/db/migration/V0_22__add_subscription_unicity_in_geo_query.sql rename subscription-service/src/test/resources/ngsild/{subscription.json => subscription.jsonld} (100%) create mode 100644 subscription-service/src/test/resources/ngsild/subscription_full.json create mode 100644 subscription-service/src/test/resources/ngsild/subscription_minimal_entities.json create mode 100644 subscription-service/src/test/resources/ngsild/subscription_minimal_watched_attributes.json rename subscription-service/src/test/resources/ngsild/{subscription_update_incorrect_payload.json => subscription_update.jsonld} (72%) delete mode 100644 subscription-service/src/test/resources/ngsild/subscription_with_time_interval_less_than_0.json diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityEventService.kt b/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityEventService.kt index 113d1b1c2..2fa67b034 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityEventService.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/service/EntityEventService.kt @@ -1,6 +1,7 @@ package com.egm.stellio.search.service import arrow.core.Either +import com.egm.stellio.search.model.EntityPayload import com.egm.stellio.search.model.UpdateOperationResult import com.egm.stellio.search.model.UpdateResult import com.egm.stellio.search.model.UpdatedDetails @@ -74,14 +75,22 @@ class EntityEventService( suspend fun publishEntityDeleteEvent( sub: String?, - entityId: URI, - entityTypes: List, + entityPayload: EntityPayload, contexts: List ): Job { val tenantUri = getTenantFromContext() return coroutineScope.launch { - logger.debug("Sending delete event for entity {} in tenant {}", entityId, tenantUri) - publishEntityEvent(EntityDeleteEvent(sub, tenantUri, entityId, entityTypes, contexts)) + logger.debug("Sending delete event for entity {} in tenant {}", entityPayload.entityId, tenantUri) + publishEntityEvent( + EntityDeleteEvent( + sub, + tenantUri, + entityPayload.entityId, + entityPayload.types, + entityPayload.payload.asString(), + contexts + ) + ) } } diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityHandler.kt b/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityHandler.kt index e284e78c1..a09e2a002 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityHandler.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityHandler.kt @@ -280,14 +280,13 @@ class EntityHandler( val sub = getSubFromSecurityContext() entityPayloadService.checkEntityExistence(entityId).bind() - // Is there a way to avoid loading the entity to get its type and contexts (for the event to be published)? - val entity = entityPayloadService.retrieve(entityId).bind() authorizationService.userCanAdminEntity(entityId, sub).bind() + val entity = entityPayloadService.retrieve(entityId).bind() entityPayloadService.deleteEntity(entityId).bind() authorizationService.removeRightsOnEntity(entityId).bind() - entityEventService.publishEntityDeleteEvent(sub.getOrNull(), entityId, entity.types, entity.contexts) + entityEventService.publishEntityDeleteEvent(sub.getOrNull(), entity, entity.contexts) ResponseEntity.status(HttpStatus.NO_CONTENT).build() }.fold( diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt b/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt index 47c2d2cd1..a8ce1a87d 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/web/EntityOperationHandler.kt @@ -225,15 +225,13 @@ class EntityOperationHandler( } if (entitiesUserCanAdmin.isNotEmpty()) { - val deleteOperationResult = - entityOperationService.delete(entitiesUserCanAdmin.map { it.entityId }.toSet()) + val deleteOperationResult = entityOperationService.delete(entitiesUserCanAdmin.map { it.entityId }.toSet()) deleteOperationResult.success.map { it.entityId }.forEach { uri -> val entity = entitiesBeforeDelete.find { it.entityId == uri }!! entityEventService.publishEntityDeleteEvent( sub.getOrNull(), - entity.entityId, - entity.types, + entity, entity.contexts ) } diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityEventServiceTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityEventServiceTests.kt index 7217f49a7..061c8a34e 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityEventServiceTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/service/EntityEventServiceTests.kt @@ -117,12 +117,14 @@ class EntityEventServiceTests { @Test fun `it should publish an ENTITY_DELETE event`() = runTest { + val entityPayload = mockk(relaxed = true) { + every { entityId } returns breedingServiceUri + } every { kafkaTemplate.send(any(), any(), any()) } returns CompletableFuture() entityEventService.publishEntityDeleteEvent( null, - breedingServiceUri, - listOf(breedingServiceType), + entityPayload, listOf(AQUAC_COMPOUND_CONTEXT) ).join() diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/web/EntityHandlerTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/web/EntityHandlerTests.kt index d20856cbc..dd0c1273f 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/web/EntityHandlerTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/web/EntityHandlerTests.kt @@ -2145,7 +2145,7 @@ class EntityHandlerTests { coEvery { authorizationService.userCanAdminEntity(beehiveId, sub) } returns Unit.right() coEvery { entityPayloadService.deleteEntity(any()) } returns Unit.right() coEvery { authorizationService.removeRightsOnEntity(any()) } returns Unit.right() - coEvery { entityEventService.publishEntityDeleteEvent(any(), any(), any(), any()) } returns Job() + coEvery { entityEventService.publishEntityDeleteEvent(any(), any(), any()) } returns Job() webClient.delete() .uri("/ngsi-ld/v1/entities/$beehiveId") @@ -2163,8 +2163,7 @@ class EntityHandlerTests { coVerify { entityEventService.publishEntityDeleteEvent( eq("60AAEBA3-C0C7-42B6-8CB0-0D30857F210E"), - eq(beehiveId), - eq(listOf(BEEHIVE_TYPE)), + eq(entity), eq(listOf(APIC_COMPOUND_CONTEXT)) ) } diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/web/EntityOperationHandlerTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/web/EntityOperationHandlerTests.kt index ffbf3e46e..d27243928 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/web/EntityOperationHandlerTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/web/EntityOperationHandlerTests.kt @@ -730,7 +730,7 @@ class EntityOperationHandlerTests { every { contexts } returns listOf(AQUAC_COMPOUND_CONTEXT) } ) - coEvery { entityEventService.publishEntityDeleteEvent(any(), any(), any(), any()) } returns Job() + coEvery { entityEventService.publishEntityDeleteEvent(any(), any(), any()) } returns Job() webClient.post() .uri(batchDeleteEndpoint) @@ -741,8 +741,7 @@ class EntityOperationHandlerTests { coVerify(timeout = 1000, exactly = 3) { entityEventService.publishEntityDeleteEvent( eq(sub.value), - match { it in allEntitiesUris }, - match { it[0] in listOf(SENSOR_TYPE, DEVICE_TYPE) }, + any(), eq(listOf(AQUAC_COMPOUND_CONTEXT)) ) } diff --git a/shared/config/detekt/baseline.xml b/shared/config/detekt/baseline.xml index 3f7bd94e3..2faea8f5c 100644 --- a/shared/config/detekt/baseline.xml +++ b/shared/config/detekt/baseline.xml @@ -11,7 +11,7 @@ LongParameterList:NgsiLdEntity.kt$NgsiLdPropertyInstance$( val value: Any, val unitCode: String?, createdAt: ZonedDateTime?, modifiedAt: ZonedDateTime?, observedAt: ZonedDateTime?, datasetId: URI?, properties: List<NgsiLdProperty>, relationships: List<NgsiLdRelationship> ) LongParameterList:NgsiLdEntity.kt$NgsiLdRelationshipInstance$( val objectId: URI, createdAt: ZonedDateTime?, modifiedAt: ZonedDateTime?, observedAt: ZonedDateTime?, datasetId: URI?, properties: List<NgsiLdProperty>, relationships: List<NgsiLdRelationship> ) NestedBlockDepth:JsonLdUtils.kt$JsonLdUtils$fun getPropertyValueFromMap(value: ExpandedAttributeInstance, propertyKey: String): Any? - SpreadOperator:EntityEvent.kt$EntityEvent$( *[ JsonSubTypes.Type(value = EntityCreateEvent::class), JsonSubTypes.Type(value = EntityReplaceEvent::class), JsonSubTypes.Type(value = EntityUpdateEvent::class), JsonSubTypes.Type(value = EntityDeleteEvent::class), JsonSubTypes.Type(value = AttributeAppendEvent::class), JsonSubTypes.Type(value = AttributeReplaceEvent::class), JsonSubTypes.Type(value = AttributeUpdateEvent::class), JsonSubTypes.Type(value = AttributeDeleteEvent::class), JsonSubTypes.Type(value = AttributeDeleteAllInstancesEvent::class) ] ) + SpreadOperator:EntityEvent.kt$EntityEvent$( *[ JsonSubTypes.Type(value = EntityCreateEvent::class), JsonSubTypes.Type(value = EntityReplaceEvent::class), JsonSubTypes.Type(value = EntityDeleteEvent::class), JsonSubTypes.Type(value = AttributeAppendEvent::class), JsonSubTypes.Type(value = AttributeReplaceEvent::class), JsonSubTypes.Type(value = AttributeUpdateEvent::class), JsonSubTypes.Type(value = AttributeDeleteEvent::class), JsonSubTypes.Type(value = AttributeDeleteAllInstancesEvent::class) ] ) SwallowedException:JsonLdUtils.kt$JsonLdUtils$e: JsonLdError TooManyFunctions:JsonLdUtils.kt$JsonLdUtils diff --git a/shared/src/main/kotlin/com/egm/stellio/shared/model/APiExceptions.kt b/shared/src/main/kotlin/com/egm/stellio/shared/model/APiExceptions.kt index a5c0d63a4..9816662d2 100644 --- a/shared/src/main/kotlin/com/egm/stellio/shared/model/APiExceptions.kt +++ b/shared/src/main/kotlin/com/egm/stellio/shared/model/APiExceptions.kt @@ -19,11 +19,11 @@ data class NotImplementedException(override val message: String) : APIException( data class LdContextNotAvailableException(override val message: String) : APIException(message) data class NonexistentTenantException(override val message: String) : APIException(message) -fun Exception.toAPIException(): APIException = +fun Throwable.toAPIException(fallbackMessage: String = ""): APIException = when (this) { is JsonLdError -> if (this.type == JsonLdError.Error.LOADING_REMOTE_CONTEXT_FAILED) LdContextNotAvailableException("Unable to load remote context (cause was: $this)") else BadRequestDataException("Unexpected error while parsing payload (cause was: $this)") - else -> BadRequestDataException(this.message ?: "Failed to parse subscription") + else -> BadRequestDataException(this.message ?: fallbackMessage) } diff --git a/shared/src/main/kotlin/com/egm/stellio/shared/model/EntityEvent.kt b/shared/src/main/kotlin/com/egm/stellio/shared/model/EntityEvent.kt index d62c4e57f..a618fa22f 100644 --- a/shared/src/main/kotlin/com/egm/stellio/shared/model/EntityEvent.kt +++ b/shared/src/main/kotlin/com/egm/stellio/shared/model/EntityEvent.kt @@ -13,7 +13,6 @@ import java.net.URI *[ JsonSubTypes.Type(value = EntityCreateEvent::class), JsonSubTypes.Type(value = EntityReplaceEvent::class), - JsonSubTypes.Type(value = EntityUpdateEvent::class), JsonSubTypes.Type(value = EntityDeleteEvent::class), JsonSubTypes.Type(value = AttributeAppendEvent::class), JsonSubTypes.Type(value = AttributeReplaceEvent::class), @@ -22,7 +21,7 @@ import java.net.URI JsonSubTypes.Type(value = AttributeDeleteAllInstancesEvent::class) ] ) -open class EntityEvent( +sealed class EntityEvent( val operationType: EventsType, open val sub: String?, open val tenantUri: URI = DEFAULT_TENANT_URI, @@ -69,25 +68,18 @@ data class EntityReplaceEvent( override fun getEntity() = this.operationPayload } -@JsonTypeName("ENTITY_UPDATE") -data class EntityUpdateEvent( - override val sub: String?, - override val tenantUri: URI = DEFAULT_TENANT_URI, - override val entityId: URI, - override val entityTypes: List, - val operationPayload: String, - val updatedEntity: String, - override val contexts: List -) : EntityEvent(EventsType.ENTITY_UPDATE, sub, tenantUri, entityId, entityTypes, contexts) - @JsonTypeName("ENTITY_DELETE") data class EntityDeleteEvent( override val sub: String?, override val tenantUri: URI = DEFAULT_TENANT_URI, override val entityId: URI, override val entityTypes: List, + // null only when in the case of an IAM event (previous state is not known) + val deletedEntity: String?, override val contexts: List -) : EntityEvent(EventsType.ENTITY_DELETE, sub, tenantUri, entityId, entityTypes, contexts) +) : EntityEvent(EventsType.ENTITY_DELETE, sub, tenantUri, entityId, entityTypes, contexts) { + override fun getEntity() = this.deletedEntity +} @JsonTypeName("ATTRIBUTE_APPEND") data class AttributeAppendEvent( @@ -149,6 +141,7 @@ data class AttributeDeleteEvent( val updatedEntity: String, override val contexts: List ) : EntityEvent(EventsType.ATTRIBUTE_DELETE, sub, tenantUri, entityId, entityTypes, contexts) { + override fun getEntity() = this.updatedEntity override fun getAttribute() = this.attributeName } @@ -162,13 +155,13 @@ data class AttributeDeleteAllInstancesEvent( val updatedEntity: String, override val contexts: List ) : EntityEvent(EventsType.ATTRIBUTE_DELETE_ALL_INSTANCES, sub, tenantUri, entityId, entityTypes, contexts) { + override fun getEntity() = this.updatedEntity override fun getAttribute() = this.attributeName } enum class EventsType { ENTITY_CREATE, ENTITY_REPLACE, - ENTITY_UPDATE, ENTITY_DELETE, ATTRIBUTE_APPEND, ATTRIBUTE_REPLACE, diff --git a/shared/src/main/kotlin/com/egm/stellio/shared/util/JsonLdUtils.kt b/shared/src/main/kotlin/com/egm/stellio/shared/util/JsonLdUtils.kt index 0cec261c8..033a4a127 100644 --- a/shared/src/main/kotlin/com/egm/stellio/shared/util/JsonLdUtils.kt +++ b/shared/src/main/kotlin/com/egm/stellio/shared/util/JsonLdUtils.kt @@ -71,6 +71,7 @@ object JsonLdUtils { val NGSILD_SYSATTRS_TERMS = setOf(NGSILD_CREATED_AT_TERM, NGSILD_MODIFIED_AT_TERM) const val NGSILD_CREATED_AT_PROPERTY = "https://uri.etsi.org/ngsi-ld/$NGSILD_CREATED_AT_TERM" const val NGSILD_MODIFIED_AT_PROPERTY = "https://uri.etsi.org/ngsi-ld/$NGSILD_MODIFIED_AT_TERM" + val NGSILD_SYSATTRS_PROPERTIES = setOf(NGSILD_CREATED_AT_PROPERTY, NGSILD_MODIFIED_AT_PROPERTY) const val NGSILD_OBSERVED_AT_TERM = "observedAt" const val NGSILD_OBSERVED_AT_PROPERTY = "https://uri.etsi.org/ngsi-ld/$NGSILD_OBSERVED_AT_TERM" const val NGSILD_UNIT_CODE_PROPERTY = "https://uri.etsi.org/ngsi-ld/unitCode" diff --git a/shared/src/test/kotlin/com/egm/stellio/shared/util/JsonUtilsTests.kt b/shared/src/test/kotlin/com/egm/stellio/shared/util/JsonUtilsTests.kt index 671755a46..49ae03062 100644 --- a/shared/src/test/kotlin/com/egm/stellio/shared/util/JsonUtilsTests.kt +++ b/shared/src/test/kotlin/com/egm/stellio/shared/util/JsonUtilsTests.kt @@ -131,13 +131,14 @@ class JsonUtilsTests { } @Test - fun `it should serialize an event of type ENTITY_DELETE`() { + fun `it should serialize an event of type ENTITY_DELETE`() = runTest { val event = mapper.writeValueAsString( EntityDeleteEvent( null, DEFAULT_TENANT_URI, entityId, listOf(BEEHIVE_TYPE), + serializeObject(expandJsonLdFragment(entityPayload, listOf(APIC_COMPOUND_CONTEXT))), listOf(APIC_COMPOUND_CONTEXT) ) ) diff --git a/shared/src/testFixtures/resources/ngsild/events/entity/entityDeleteEvent.json b/shared/src/testFixtures/resources/ngsild/events/entity/entityDeleteEvent.json index 1c9512938..4135d5853 100644 --- a/shared/src/testFixtures/resources/ngsild/events/entity/entityDeleteEvent.json +++ b/shared/src/testFixtures/resources/ngsild/events/entity/entityDeleteEvent.json @@ -2,6 +2,7 @@ "tenantUri": "urn:ngsi-ld:tenant:default", "entityId" : "urn:ngsi-ld:BeeHive:01", "entityTypes" : [ "https://ontology.eglobalmark.com/apic#BeeHive" ], + "deletedEntity":"{\"https://ontology.eglobalmark.com/egm#belongs\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.389815Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/hasObject\":[{\"@id\":\"urn:ngsi-ld:Apiary:01\"}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Relationship\"]}],\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.179446Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://ontology.eglobalmark.com/apic#humidity\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.448205Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/observedAt\":[{\"@value\":\"2019-10-26T21:32:52.986010Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://ontology.eglobalmark.com/egm#observedBy\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.455870Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/hasObject\":[{\"@id\":\"urn:ngsi-ld:Sensor:02\"}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Relationship\"]}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Property\"],\"https://uri.etsi.org/ngsi-ld/unitCode\":[{\"@value\":\"P1\"}],\"https://uri.etsi.org/ngsi-ld/hasValue\":[{\"@value\":60}]}],\"@id\":\"urn:ngsi-ld:BeeHive:01\",\"https://uri.etsi.org/ngsi-ld/location\":[{\"@type\":[\"https://uri.etsi.org/ngsi-ld/GeoProperty\"],\"https://uri.etsi.org/ngsi-ld/hasValue\":[{\"@value\":\"POINT (24.30623 60.07966)\"}]}],\"https://ontology.eglobalmark.com/egm#managedBy\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.417938Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/hasObject\":[{\"@id\":\"urn:ngsi-ld:Beekeeper:01\"}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Relationship\"]}],\"https://uri.etsi.org/ngsi-ld/modifiedAt\":[{\"@value\":\"2022-02-12T08:36:59.218595Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://ontology.eglobalmark.com/apic#temperature\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.465937Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/observedAt\":[{\"@value\":\"2019-10-26T21:32:52.986010Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://ontology.eglobalmark.com/egm#observedBy\":[{\"https://uri.etsi.org/ngsi-ld/createdAt\":[{\"@value\":\"2022-02-12T08:36:59.473904Z\",\"@type\":\"https://uri.etsi.org/ngsi-ld/DateTime\"}],\"https://uri.etsi.org/ngsi-ld/hasObject\":[{\"@id\":\"urn:ngsi-ld:Sensor:01\"}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Relationship\"]}],\"@type\":[\"https://uri.etsi.org/ngsi-ld/Property\"],\"https://uri.etsi.org/ngsi-ld/unitCode\":[{\"@value\":\"CEL\"}],\"https://uri.etsi.org/ngsi-ld/hasValue\":[{\"@value\":22.2}]}],\"@type\":[\"https://ontology.eglobalmark.com/apic#BeeHive\"]}", "contexts" : [ "https://raw.githubusercontent.com/easy-global-market/ngsild-api-data-models/master/apic/jsonld-contexts/apic-compound.jsonld" ], "operationType" : "ENTITY_DELETE" } diff --git a/subscription-service/config/detekt/baseline.xml b/subscription-service/config/detekt/baseline.xml index b1313d1e6..acbf2b82d 100644 --- a/subscription-service/config/detekt/baseline.xml +++ b/subscription-service/config/detekt/baseline.xml @@ -2,8 +2,9 @@ + CyclomaticComplexMethod:SubscriptionServiceTests.kt$SubscriptionServiceTests$@Test fun `it should load a subscription with all possible members`() + LongMethod:EntityEventListenerService.kt$EntityEventListenerService$internal suspend fun dispatchEntityEvent(content: String) LongParameterList:FixtureUtils.kt$( withQueryAndGeoQuery: Pair<Boolean, Boolean> = Pair(true, true), withEndpointInfo: Boolean = true, withNotifParams: Pair<FormatType, List<String>> = Pair(FormatType.NORMALIZED, emptyList()), withModifiedAt: Boolean = false, georel: String = "within", geometry: String = "Polygon", coordinates: String = "[[[100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0]]]", timeInterval: Int? = null, contexts: List<String> = listOf(NGSILD_CORE_CONTEXT) ) - TooGenericExceptionCaught:ParsingUtils.kt$ParsingUtils$e: Exception TooGenericExceptionCaught:SubscriptionService.kt$SubscriptionService$e: Exception TooManyFunctions:SubscriptionService.kt$SubscriptionService diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/job/TimeIntervalNotificationJob.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/job/TimeIntervalNotificationJob.kt index d98c6ba9e..4f612974a 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/job/TimeIntervalNotificationJob.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/job/TimeIntervalNotificationJob.kt @@ -75,7 +75,9 @@ class TimeIntervalNotificationJob( subscription: Subscription, contextLink: String ): Set = - subscription.entities + // if a subscription has a "timeInterval" member defined, it has at least one "entities" member + // because it can't have a "watchedAttributes" member + subscription.entities!! .map { getEntities( tenantUri, @@ -85,11 +87,11 @@ class TimeIntervalNotificationJob( } .flatten() .toSet() - .also { - if (it.isNotEmpty()) + .also { compactedEntities -> + if (compactedEntities.isNotEmpty()) logger.debug( "Gonna notify about entities: {} in tenant {}", - it.joinToString { it["id"] as String }, + compactedEntities.joinToString { it["id"] as String }, tenantUri ) } diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/listener/EntityEventListenerService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/listener/EntityEventListenerService.kt index 7164b4432..91b9a5d71 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/listener/EntityEventListenerService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/listener/EntityEventListenerService.kt @@ -1,15 +1,16 @@ package com.egm.stellio.subscription.listener import arrow.core.Either -import arrow.core.left import arrow.core.raise.either import com.egm.stellio.shared.model.* import com.egm.stellio.shared.util.ExpandedTerm import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_EXPANDED_ENTITY_CORE_MEMBERS +import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SYSATTRS_PROPERTIES import com.egm.stellio.shared.util.JsonUtils.deserializeAs import com.egm.stellio.shared.util.JsonUtils.deserializeAsMap -import com.egm.stellio.shared.util.JsonUtils.deserializeObject +import com.egm.stellio.shared.util.JsonUtils.serializeObject import com.egm.stellio.shared.web.NGSILD_TENANT_HEADER +import com.egm.stellio.subscription.model.NotificationTrigger import com.egm.stellio.subscription.service.NotificationService import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -29,6 +30,8 @@ class EntityEventListenerService( private val coroutineScope = CoroutineScope(Dispatchers.Default) + private val ignoredExpandedEntityProperties = JSONLD_EXPANDED_ENTITY_CORE_MEMBERS.plus(NGSILD_SYSATTRS_PROPERTIES) + @KafkaListener(topics = ["cim.entity._CatchAll"], groupId = "context_subscription") fun processMessage(content: String) { coroutineScope.launch { @@ -49,32 +52,62 @@ class EntityEventListenerService( when (entityEvent) { is EntityCreateEvent -> handleEntityEvent( tenantUri, - deserializeObject(entityEvent.operationPayload) - .keys - .filter { !JSONLD_EXPANDED_ENTITY_CORE_MEMBERS.contains(it) } - .toSet(), + entityEvent.operationPayload.getUpdatedAttributes(), entityEvent.getEntity(), + NotificationTrigger.ENTITY_CREATED, + entityEvent.contexts + ) + is EntityReplaceEvent -> entityEvent.operationPayload.getUpdatedAttributes().forEach { attribute -> + handleEntityEvent( + tenantUri, + setOf(attribute), + entityEvent.getEntity(), + NotificationTrigger.ATTRIBUTE_CREATED, + entityEvent.contexts + ) + } + is EntityDeleteEvent -> handleEntityEvent( + tenantUri, + entityEvent.deletedEntity?.getUpdatedAttributes() ?: emptySet(), + entityEvent.getEntity() ?: serializeObject(emptyMap()), + NotificationTrigger.ENTITY_DELETED, entityEvent.contexts ) is AttributeAppendEvent -> handleEntityEvent( tenantUri, setOf(entityEvent.attributeName), entityEvent.getEntity(), + NotificationTrigger.ATTRIBUTE_CREATED, entityEvent.contexts ) is AttributeReplaceEvent -> handleEntityEvent( tenantUri, setOf(entityEvent.attributeName), entityEvent.getEntity(), + NotificationTrigger.ATTRIBUTE_UPDATED, entityEvent.contexts ) is AttributeUpdateEvent -> handleEntityEvent( tenantUri, setOf(entityEvent.attributeName), entityEvent.getEntity(), + NotificationTrigger.ATTRIBUTE_UPDATED, + entityEvent.contexts + ) + is AttributeDeleteEvent -> handleEntityEvent( + tenantUri, + setOf(entityEvent.attributeName), + entityEvent.getEntity(), + NotificationTrigger.ATTRIBUTE_DELETED, + entityEvent.contexts + ) + is AttributeDeleteAllInstancesEvent -> handleEntityEvent( + tenantUri, + setOf(entityEvent.attributeName), + entityEvent.getEntity(), + NotificationTrigger.ATTRIBUTE_DELETED, entityEvent.contexts ) - else -> OperationNotSupportedException(unhandledOperationType(entityEvent.operationType)).left() } }.onFailure { logger.warn("Entity event processing has failed: ${it.message}", it) @@ -85,6 +118,7 @@ class EntityEventListenerService( tenantUri: URI, updatedAttributes: Set, entityPayload: String, + notificationTrigger: NotificationTrigger, contexts: List ): Either = either { logger.debug("Attributes considered in the event: {}", updatedAttributes) @@ -93,7 +127,8 @@ class EntityEventListenerService( notificationService.notifyMatchingSubscribers( jsonLdEntity, jsonLdEntity.toNgsiLdEntity().bind(), - updatedAttributes + updatedAttributes, + notificationTrigger ) }.contextWrite { it.put(NGSILD_TENANT_HEADER, tenantUri) @@ -111,4 +146,10 @@ class EntityEventListenerService( }) } } + + private fun String.getUpdatedAttributes() = + this.deserializeAsMap() + .keys + .filter { !ignoredExpandedEntityProperties.contains(it) } + .toSet() } diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt index 19a707b2a..161b74cc6 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt @@ -10,6 +10,8 @@ import com.egm.stellio.shared.util.JsonUtils.convertToMap import com.egm.stellio.shared.util.JsonUtils.serializeObject import com.egm.stellio.shared.util.ngsiLdDateTime import com.egm.stellio.shared.util.toUri +import com.egm.stellio.subscription.model.NotificationTrigger.ATTRIBUTE_CREATED +import com.egm.stellio.subscription.model.NotificationTrigger.ATTRIBUTE_UPDATED import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.annotation.JsonProperty import org.springframework.data.annotation.Id @@ -21,6 +23,11 @@ import java.time.ZoneOffset import java.time.ZonedDateTime import java.util.UUID +val defaultNotificationTriggers = listOf( + ATTRIBUTE_CREATED.notificationTrigger, + ATTRIBUTE_UPDATED.notificationTrigger +) + data class Subscription( @Id val id: URI = "urn:ngsi-ld:Subscription:${UUID.randomUUID()}".toUri(), val type: String, @@ -28,8 +35,9 @@ data class Subscription( val createdAt: ZonedDateTime = Instant.now().atZone(ZoneOffset.UTC), val modifiedAt: ZonedDateTime? = null, val description: String? = null, - val entities: Set, + val entities: Set? = null, val watchedAttributes: List? = null, + val notificationTrigger: List = defaultNotificationTriggers, val timeInterval: Int? = null, val q: String? = null, val geoQ: GeoQ? = null, @@ -56,11 +64,11 @@ data class Subscription( fun expand(contexts: List): Subscription = this.copy( - entities = entities.map { entityInfo -> + entities = entities?.map { entityInfo -> entityInfo.copy( type = expandJsonLdTerm(entityInfo.type, contexts) ) - }.toSet(), + }?.toSet(), notification = notification.copy( attributes = notification.attributes?.map { attributeName -> expandJsonLdTerm(attributeName, contexts) @@ -76,9 +84,9 @@ data class Subscription( fun compact(contexts: List): Subscription = this.copy( - entities = entities.map { + entities = entities?.map { EntityInfo(it.id, it.idPattern, compactTerm(it.type, contexts)) - }.toSet(), + }?.toSet(), notification = notification.copy( attributes = notification.attributes?.map { compactTerm(it, contexts) } ), @@ -134,6 +142,27 @@ enum class SubscriptionStatus(val status: String) { EXPIRED("expired") } +enum class NotificationTrigger(val notificationTrigger: String) { + ENTITY_CREATED("entityCreated"), + ENTITY_UPDATED("entityUpdated"), + ENTITY_DELETED("entityDeleted"), + ATTRIBUTE_CREATED("attributeCreated"), + ATTRIBUTE_UPDATED("attributeUpdated"), + ATTRIBUTE_DELETED("attributeDeleted"); + + companion object { + fun isValid(notificationTrigger: String): Boolean = + NotificationTrigger.entries.any { it.notificationTrigger == notificationTrigger } + + fun expandEntityUpdated(): String = + listOf( + ATTRIBUTE_CREATED.notificationTrigger, + ATTRIBUTE_UPDATED.notificationTrigger, + ATTRIBUTE_DELETED.notificationTrigger + ).joinToString(",") + } +} + fun Map.toFinalRepresentation( mediaType: MediaType = JSON_LD_MEDIA_TYPE, includeSysAttrs: Boolean = false diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/NotificationService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/NotificationService.kt index cc0bd48a4..af82ddd2a 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/NotificationService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/NotificationService.kt @@ -17,6 +17,7 @@ import com.egm.stellio.shared.web.DEFAULT_TENANT_URI import com.egm.stellio.shared.web.NGSILD_TENANT_HEADER import com.egm.stellio.subscription.model.Notification import com.egm.stellio.subscription.model.NotificationParams +import com.egm.stellio.subscription.model.NotificationTrigger import com.egm.stellio.subscription.model.Subscription import org.slf4j.LoggerFactory import org.springframework.http.HttpHeaders @@ -36,11 +37,12 @@ class NotificationService( suspend fun notifyMatchingSubscribers( jsonLdEntity: JsonLdEntity, ngsiLdEntity: NgsiLdEntity, - updatedAttributes: Set + updatedAttributes: Set, + notificationTrigger: NotificationTrigger ): Either>> = either { val id = ngsiLdEntity.id val types = ngsiLdEntity.types - subscriptionService.getMatchingSubscriptions(id, types, updatedAttributes) + subscriptionService.getMatchingSubscriptions(id, types, updatedAttributes, notificationTrigger) .filter { subscriptionService.isMatchingQQuery(it.q?.decode(), jsonLdEntity, it.contexts).bind() } diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index 00ad77bfc..91a77c7bd 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -49,33 +49,37 @@ class SubscriptionService( private val logger = LoggerFactory.getLogger(javaClass) - suspend fun validateNewSubscription(subscription: Subscription): Either { - return either { - checkTypeIsSubscription(subscription).bind() - checkIdIsValid(subscription).bind() - checkTimeIntervalGreaterThanZero(subscription).bind() - checkSubscriptionValidity(subscription).bind() - checkExpiresAtInTheFuture(subscription).bind() - checkIdPatternIsValid(subscription).bind() - } + suspend fun validateNewSubscription(subscription: Subscription): Either = either { + checkTypeIsSubscription(subscription).bind() + checkIdIsValid(subscription).bind() + checkEntitiesOrWatchedAttributes(subscription).bind() + checkTimeIntervalGreaterThanZero(subscription).bind() + checkSubscriptionValidity(subscription).bind() + checkExpiresAtInTheFuture(subscription).bind() + checkIdPatternIsValid(subscription).bind() + checkNotificationTriggersAreValid(subscription).bind() } + private fun checkTypeIsSubscription(subscription: Subscription): Either = + if (subscription.type != NGSILD_SUBSCRIPTION_TERM) + BadRequestDataException("type attribute must be equal to 'Subscription'").left() + else Unit.right() + private fun checkIdIsValid(subscription: Subscription): Either = if (!subscription.id.isAbsolute) BadRequestDataException(invalidUriMessage("${subscription.id}")).left() else Unit.right() - private fun checkTypeIsSubscription(subscription: Subscription): Either = - if (subscription.type != NGSILD_SUBSCRIPTION_TERM) - BadRequestDataException("type attribute must be equal to 'Subscription'").left() + private fun checkEntitiesOrWatchedAttributes(subscription: Subscription): Either = + if (subscription.watchedAttributes == null && subscription.entities == null) + BadRequestDataException("At least one of entities or watchedAttributes shall be present").left() else Unit.right() private fun checkSubscriptionValidity(subscription: Subscription): Either = if (subscription.watchedAttributes != null && subscription.timeInterval != null) BadRequestDataException( - "You can't use 'timeInterval' with 'watchedAttributes' in conjunction" - ) - .left() + "You can't use 'timeInterval' in conjunction with 'watchedAttributes'" + ).left() else Unit.right() private fun checkTimeIntervalGreaterThanZero(subscription: Subscription): Either = @@ -97,64 +101,75 @@ class SubscriptionService( BadRequestDataException("Unable to parse 'expiresAt' value: $expiresAt").left() }) - private fun checkIdPatternIsValid(subscription: Subscription): Either = - subscription.entities.forEach { endpoint -> + private fun checkIdPatternIsValid(subscription: Subscription): Either { + val result = subscription.entities?.all { endpoint -> runCatching { endpoint.idPattern?.let { Pattern.compile(it) } - }.onFailure { - return BadRequestDataException("Invalid value for idPattern: ${endpoint.idPattern}").left() - } - }.right() + true + }.fold({ true }, { false }) + } + + return if (result == null || result) + Unit.right() + else BadRequestDataException("Invalid idPattern found in subscription").left() + } + + private fun checkNotificationTriggersAreValid(subscription: Subscription): Either = + subscription.notificationTrigger.all { + NotificationTrigger.isValid(it) + }.let { + if (it) Unit.right() + else BadRequestDataException("Unknown notification trigger in ${subscription.notificationTrigger}").left() + } @Transactional - suspend fun create(subscription: Subscription, sub: Option): Either { - return either { - validateNewSubscription(subscription).bind() + suspend fun create(subscription: Subscription, sub: Option): Either = either { + validateNewSubscription(subscription).bind() - val geoQuery = - if (subscription.geoQ != null) - parseGeoQueryParameters(subscription.geoQ.toMap(), subscription.contexts).bind() - else null + val geoQuery = + if (subscription.geoQ != null) + parseGeoQueryParameters(subscription.geoQ.toMap(), subscription.contexts).bind() + else null - val insertStatement = - """ - INSERT INTO subscription(id, type, subscription_name, created_at, description, watched_attributes, - time_interval, q, scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, - endpoint_info, times_sent, is_active, expires_at, sub, contexts) - VALUES(:id, :type, :subscription_name, :created_at, :description, :watched_attributes, - :time_interval, :q, :scope_q, :notif_attributes, :notif_format, :endpoint_uri, :endpoint_accept, - :endpoint_info, :times_sent, :is_active, :expires_at, :sub, :contexts) - """.trimIndent() + val insertStatement = + """ + INSERT INTO subscription(id, type, subscription_name, created_at, description, watched_attributes, + notification_trigger, time_interval, q, scope_q, notif_attributes, notif_format, endpoint_uri, + endpoint_accept, endpoint_info, times_sent, is_active, expires_at, sub, contexts) + VALUES(:id, :type, :subscription_name, :created_at, :description, :watched_attributes, + :notification_trigger, :time_interval, :q, :scope_q, :notif_attributes, :notif_format, :endpoint_uri, + :endpoint_accept, :endpoint_info, :times_sent, :is_active, :expires_at, :sub, :contexts) + """.trimIndent() - databaseClient.sql(insertStatement) - .bind("id", subscription.id) - .bind("type", subscription.type) - .bind("subscription_name", subscription.subscriptionName) - .bind("created_at", subscription.createdAt) - .bind("description", subscription.description) - .bind("watched_attributes", subscription.watchedAttributes?.joinToString(separator = ",")) - .bind("time_interval", subscription.timeInterval) - .bind("q", subscription.q) - .bind("scope_q", subscription.scopeQ) - .bind("notif_attributes", subscription.notification.attributes?.joinToString(separator = ",")) - .bind("notif_format", subscription.notification.format.name) - .bind("endpoint_uri", subscription.notification.endpoint.uri) - .bind("endpoint_accept", subscription.notification.endpoint.accept.name) - .bind("endpoint_info", Json.of(endpointInfoToString(subscription.notification.endpoint.info))) - .bind("times_sent", subscription.notification.timesSent) - .bind("is_active", subscription.isActive) - .bind("expires_at", subscription.expiresAt) - .bind("sub", sub.toStringValue()) - .bind("contexts", subscription.contexts.toTypedArray()) - .execute().bind() + databaseClient.sql(insertStatement) + .bind("id", subscription.id) + .bind("type", subscription.type) + .bind("subscription_name", subscription.subscriptionName) + .bind("created_at", subscription.createdAt) + .bind("description", subscription.description) + .bind("watched_attributes", subscription.watchedAttributes?.joinToString(separator = ",")) + .bind("notification_trigger", subscription.notificationTrigger.toTypedArray()) + .bind("time_interval", subscription.timeInterval) + .bind("q", subscription.q) + .bind("scope_q", subscription.scopeQ) + .bind("notif_attributes", subscription.notification.attributes?.joinToString(separator = ",")) + .bind("notif_format", subscription.notification.format.name) + .bind("endpoint_uri", subscription.notification.endpoint.uri) + .bind("endpoint_accept", subscription.notification.endpoint.accept.name) + .bind("endpoint_info", Json.of(endpointInfoToString(subscription.notification.endpoint.info))) + .bind("times_sent", subscription.notification.timesSent) + .bind("is_active", subscription.isActive) + .bind("expires_at", subscription.expiresAt) + .bind("sub", sub.toStringValue()) + .bind("contexts", subscription.contexts.toTypedArray()) + .execute().bind() - geoQuery?.let { - createGeometryQuery(it, subscription.id).bind() - } + geoQuery?.let { + upsertGeometryQuery(it, subscription.id).bind() + } - subscription.entities.forEach { - createEntityInfo(it, subscription.id).bind() - } + subscription.entities?.forEach { + createEntityInfo(it, subscription.id).bind() } } @@ -185,7 +200,7 @@ class SubscriptionService( .execute().bind() } - private suspend fun createGeometryQuery(geoQuery: GeoQuery, subscriptionId: URI): Either = + private suspend fun upsertGeometryQuery(geoQuery: GeoQuery, subscriptionId: URI): Either = either { databaseClient.sql( """ @@ -193,6 +208,9 @@ class SubscriptionService( geoproperty, subscription_id) VALUES (:georel, :geometry, :coordinates, public.ST_GeomFromText(:wkt_coordinates), :geoproperty, :subscription_id) + ON CONFLICT (subscription_id) + DO UPDATE SET georel = :georel, geometry = :geometry, coordinates = :coordinates, + pgis_geometry = public.ST_GeomFromText(:wkt_coordinates), geoproperty = :geoproperty """ ) .bind("georel", geoQuery.georel) @@ -208,9 +226,9 @@ class SubscriptionService( val selectStatement = """ SELECT subscription.id as sub_id, subscription.type as sub_type, subscription_name, created_at, - modified_at, description, watched_attributes, time_interval, q, notif_attributes, notif_format, - endpoint_uri, endpoint_accept, endpoint_info, status, times_sent, is_active, last_notification, - last_failure, last_success, entity_info.id as entity_id, id_pattern, + modified_at, description, watched_attributes, notification_trigger, time_interval, q, notif_attributes, + notif_format, endpoint_uri, endpoint_accept, endpoint_info, status, times_sent, is_active, + last_notification, last_failure, last_success, entity_info.id as entity_id, id_pattern, entity_info.type as entity_type, georel, geometry, coordinates, pgis_geometry, geoproperty, scope_q, expires_at, contexts FROM subscription @@ -223,10 +241,15 @@ class SubscriptionService( .bind("id", id) .allToMappedList { rowToSubscription(it) } .reduce { t: Subscription, u: Subscription -> - t.copy(entities = t.entities.plus(u.entities)) + t.copy(entities = mergeEntityInfo(t.entities, u.entities)) } } + private fun mergeEntityInfo(tEntityInfo: Set?, uEntityInfo: Set?): Set? = + if (tEntityInfo == null) uEntityInfo + else if (uEntityInfo == null) tEntityInfo + else tEntityInfo.plus(uEntityInfo) + suspend fun getContextsForSubscription(id: URI): Either> { val selectStatement = """ @@ -284,7 +307,7 @@ class SubscriptionService( when { it.key == "geoQ" -> parseGeoQueryParameters(it.value as Map, contexts).bind() - ?.let { updateGeometryQuery(subscriptionId, it).bind() } + ?.let { upsertGeometryQuery(it, subscriptionId).bind() } it.key == "notification" -> { val notification = it.value as Map @@ -312,6 +335,7 @@ class SubscriptionService( listOf( "subscriptionName", "description", + "notificationTrigger", "timeInterval", "q", "scopeQ", @@ -351,23 +375,6 @@ class SubscriptionService( .awaitFirst() } - suspend fun updateGeometryQuery(subscriptionId: URI, geoQ: GeoQuery): Either = - databaseClient.sql( - """ - UPDATE geometry_query - SET georel = :georel, geometry = :geometry, coordinates = :coordinates, - pgis_geometry = public.ST_GeomFromText(:wkt_coordinates), geoproperty= :geoproperty - WHERE subscription_id = :subscription_id - """ - ) - .bind("georel", geoQ.georel) - .bind("geometry", geoQ.geometry.type) - .bind("coordinates", geoQ.coordinates) - .bind("wkt_coordinates", geoQ.wktCoordinates.value) - .bind("geoproperty", geoQ.geoproperty) - .bind("subscription_id", subscriptionId) - .execute() - suspend fun updateNotification( subscriptionId: URI, notification: Map, @@ -407,11 +414,10 @@ class SubscriptionService( ): List> { return when (attribute.key) { "attributes" -> { - var valueList = attribute.value as List - valueList = valueList.map { - JsonLdUtils.expandJsonLdTerm(it, contexts) + val attributes = (attribute.value as List).joinToString(separator = ",") { + expandJsonLdTerm(it, contexts) } - listOf(Pair("notif_attributes", valueList.joinToString(separator = ","))) + listOf(Pair("notif_attributes", attributes)) } "format" -> { val format = @@ -444,12 +450,10 @@ class SubscriptionService( subscriptionId: URI, entities: List>, contexts: List - ): Either { - return either { - deleteEntityInfo(subscriptionId).bind() - entities.forEach { - createEntityInfo(parseEntityInfo(it, contexts), subscriptionId).bind() - } + ): Either = either { + deleteEntityInfo(subscriptionId).bind() + entities.forEach { + createEntityInfo(parseEntityInfo(it, contexts), subscriptionId).bind() } } @@ -472,10 +476,11 @@ class SubscriptionService( val selectStatement = """ SELECT subscription.id as sub_id, subscription.type as sub_type, subscription_name, created_at, - modified_At, description, watched_attributes, time_interval, q, notif_attributes, notif_format, - endpoint_uri, endpoint_accept, endpoint_info, status, times_sent, is_active, last_notification, - last_failure, last_success, entity_info.id as entity_id, id_pattern, entity_info.type as entity_type, - georel, geometry, coordinates, pgis_geometry, geoproperty, scope_q, expires_at, contexts + modified_At, description, watched_attributes, notification_trigger, time_interval, q, notif_attributes, + notif_format, endpoint_uri, endpoint_accept, endpoint_info, status, times_sent, is_active, + last_notification, last_failure, last_success, entity_info.id as entity_id, id_pattern, + entity_info.type as entity_type, georel, geometry, coordinates, pgis_geometry, geoproperty, scope_q, + expires_at, contexts FROM subscription LEFT JOIN entity_info ON entity_info.subscription_id = subscription.id LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id @@ -492,12 +497,10 @@ class SubscriptionService( .bind("offset", offset) .bind("sub", sub.toStringValue()) .allToMappedList { rowToSubscription(it) } - .groupBy { t: Subscription -> - t.id - } + .groupBy { t: Subscription -> t.id } .mapValues { grouped -> grouped.value.reduce { t: Subscription, u: Subscription -> - t.copy(entities = t.entities.plus(u.entities)) + t.copy(entities = mergeEntityInfo(t.entities, u.entities)) } }.values.toList() } @@ -517,7 +520,8 @@ class SubscriptionService( suspend fun getMatchingSubscriptions( id: URI, types: List, - updatedAttributes: Set + updatedAttributes: Set, + notificationTrigger: NotificationTrigger ): List { val selectStatement = """ @@ -525,18 +529,27 @@ class SubscriptionService( scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, times_sent, endpoint_info, contexts FROM subscription + LEFT JOIN entity_info on subscription.id = entity_info.subscription_id WHERE is_active AND ( expires_at is null OR expires_at >= :date ) AND time_interval IS NULL - AND ( string_to_array(watched_attributes, ',') && string_to_array(:updatedAttributes, ',') - OR watched_attributes IS NULL) - AND id IN ( - SELECT subscription_id - FROM entity_info - WHERE entity_info.type IN (:types) - AND (entity_info.id IS NULL OR entity_info.id = :id) - AND (entity_info.id_pattern IS NULL OR :id ~ entity_info.id_pattern) - ) + AND CASE + WHEN notification_trigger && '{ entityUpdated }' + THEN notification_trigger || '{ ${NotificationTrigger.expandEntityUpdated()} }' && '{ ${notificationTrigger.notificationTrigger} }' + ELSE notification_trigger && '{ ${notificationTrigger.notificationTrigger} }' + END + AND CASE + WHEN watched_attributes is NULL + THEN entity_info.type IN (:types) + AND (entity_info.id IS NULL OR entity_info.id = :id) + AND (entity_info.id_pattern IS NULL OR :id ~ entity_info.id_pattern) + WHEN entity_info.type is NULL + THEN string_to_array(watched_attributes, ',') && string_to_array(:updatedAttributes, ',') + ELSE ( string_to_array(watched_attributes, ',') && string_to_array(:updatedAttributes, ',') + AND (entity_info.type IN (:types) + AND (entity_info.id IS NULL OR entity_info.id = :id) + AND (entity_info.id_pattern IS NULL OR :id ~ entity_info.id_pattern))) + END """.trimIndent() return databaseClient.sql(selectStatement) .bind("id", id) @@ -626,15 +639,10 @@ class SubscriptionService( expiresAt = toNullableZonedDateTime(row["expires_at"]), description = row["description"] as? String, watchedAttributes = (row["watched_attributes"] as? String)?.split(","), + notificationTrigger = toList(row["notification_trigger"]), timeInterval = toNullableInt(row["time_interval"]), q = row["q"] as? String, - entities = setOf( - EntityInfo( - id = toNullableUri(row["entity_id"]), - idPattern = row["id_pattern"] as? String, - type = row["entity_type"] as String - ) - ), + entities = rowToEntityInfo(row)?.let { setOf(it) }, geoQ = rowToGeoQ(row), scopeQ = row["scope_q"] as? String, notification = NotificationParams( @@ -696,14 +704,26 @@ class SubscriptionService( null } + private val rowToEntityInfo: ((Map) -> EntityInfo?) = { row -> + if (row["entity_type"] != null) + EntityInfo( + id = toNullableUri(row["entity_id"]), + idPattern = row["id_pattern"] as? String, + type = row["entity_type"] as String + ) + else + null + } + suspend fun getRecurringSubscriptionsToNotify(): List { val selectStatement = """ SELECT subscription.id as sub_id, subscription.type as sub_type, subscription_name, created_at, - modified_At, expires_at, description, watched_attributes, time_interval, q, scope_q, notif_attributes, - notif_format, endpoint_uri, endpoint_accept, endpoint_info, status, times_sent, last_notification, - last_failure, last_success, is_active, entity_info.id as entity_id, id_pattern, - entity_info.type as entity_type, georel, geometry, coordinates, pgis_geometry, geoproperty, contexts + modified_At, expires_at, description, watched_attributes, notification_trigger, time_interval, q, + scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_info, status, + times_sent, last_notification, last_failure, last_success, is_active, entity_info.id as entity_id, + id_pattern, entity_info.type as entity_type, georel, geometry, coordinates, pgis_geometry, geoproperty, + contexts FROM subscription LEFT JOIN entity_info ON entity_info.subscription_id = subscription.id LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id @@ -721,7 +741,7 @@ class SubscriptionService( } .mapValues { grouped -> grouped.value.reduce { t: Subscription, u: Subscription -> - t.copy(entities = t.entities.plus(u.entities)) + t.copy(entities = mergeEntityInfo(t.entities, u.entities)) } }.values.toList() } diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/utils/ParsingUtils.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/utils/ParsingUtils.kt index a887d190f..7f1e130bc 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/utils/ParsingUtils.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/utils/ParsingUtils.kt @@ -2,11 +2,13 @@ package com.egm.stellio.subscription.utils import arrow.core.Either import arrow.core.left -import arrow.core.raise.either +import arrow.core.right import com.egm.stellio.shared.model.APIException import com.egm.stellio.shared.model.toAPIException import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_CONTEXT import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm +import com.egm.stellio.shared.util.JsonUtils.deserializeAs +import com.egm.stellio.shared.util.JsonUtils.serializeObject import com.egm.stellio.shared.util.mapper import com.egm.stellio.subscription.model.EndpointInfo import com.egm.stellio.subscription.model.EntityInfo @@ -15,16 +17,13 @@ import com.egm.stellio.subscription.model.Subscription object ParsingUtils { fun parseSubscription(input: Map, contexts: List): Either = - try { - either { - mapper.convertValue( - input.plus(JSONLD_CONTEXT to contexts), - Subscription::class.java - ).expand(contexts) - } - } catch (e: Exception) { - e.toAPIException().left() - } + runCatching { + deserializeAs(serializeObject(input.plus(JSONLD_CONTEXT to contexts))) + .expand(contexts) + }.fold( + { it.right() }, + { it.toAPIException("Failed to parse subscription").left() } + ) fun parseEntityInfo(input: Map, contexts: List): EntityInfo { val entityInfo = mapper.convertValue(input, EntityInfo::class.java) @@ -64,6 +63,7 @@ object ParsingUtils { else valueAsArrayList.joinToString(separator = ",") } + "notificationTrigger" -> (this as ArrayList).toTypedArray() else -> this } } diff --git a/subscription-service/src/main/resources/db/migration/V0_21__add_notification_trigger_column.sql b/subscription-service/src/main/resources/db/migration/V0_21__add_notification_trigger_column.sql new file mode 100644 index 000000000..ba68d2c3f --- /dev/null +++ b/subscription-service/src/main/resources/db/migration/V0_21__add_notification_trigger_column.sql @@ -0,0 +1,5 @@ +alter table subscription + add column notification_trigger text[]; + +update subscription + set notification_trigger = '{ "attributeCreated", "attributeUpdated" }'; diff --git a/subscription-service/src/main/resources/db/migration/V0_22__add_subscription_unicity_in_geo_query.sql b/subscription-service/src/main/resources/db/migration/V0_22__add_subscription_unicity_in_geo_query.sql new file mode 100644 index 000000000..0b78a863e --- /dev/null +++ b/subscription-service/src/main/resources/db/migration/V0_22__add_subscription_unicity_in_geo_query.sql @@ -0,0 +1 @@ +ALTER TABLE geometry_query ADD CONSTRAINT subscription_id_unicity UNIQUE (subscription_id) diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/listener/EntityEventListenerServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/listener/EntityEventListenerServiceTests.kt index 7f7c2f6fc..a07c16003 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/listener/EntityEventListenerServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/listener/EntityEventListenerServiceTests.kt @@ -4,10 +4,14 @@ import arrow.core.right import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_NAME_PROPERTY import com.egm.stellio.shared.util.loadSampleData import com.egm.stellio.subscription.model.Notification +import com.egm.stellio.subscription.model.NotificationTrigger import com.egm.stellio.subscription.model.Subscription import com.egm.stellio.subscription.service.NotificationService import com.ninjasquad.springmockk.MockkBean -import io.mockk.* +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.confirmVerified +import io.mockk.mockkClass import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test @@ -26,6 +30,28 @@ class EntityEventListenerServiceTests { @MockkBean private lateinit var notificationService: NotificationService + @Test + fun `it should parse and transmit an attribute create events for an entity replace event`() = runTest { + val replaceEvent = loadSampleData("events/entity/entityReplaceEvent.json") + + val mockedSubscription = mockkClass(Subscription::class) + val mockedNotification = mockkClass(Notification::class) + + coEvery { + notificationService.notifyMatchingSubscribers(any(), any(), any(), any()) + } returns listOf( + Triple(mockedSubscription, mockedNotification, true), + Triple(mockedSubscription, mockedNotification, false) + ).right() + + entityEventListenerService.dispatchEntityEvent(replaceEvent) + + coVerify(exactly = 4, timeout = 1000L) { + notificationService.notifyMatchingSubscribers(any(), any(), any(), any()) + } + confirmVerified(notificationService) + } + @Test fun `it should parse and transmit an attribute replace event`() = runTest { val replaceEvent = loadSampleData("events/entity/attributeReplaceTextPropEvent.json") @@ -34,7 +60,7 @@ class EntityEventListenerServiceTests { val mockedNotification = mockkClass(Notification::class) coEvery { - notificationService.notifyMatchingSubscribers(any(), any(), any()) + notificationService.notifyMatchingSubscribers(any(), any(), any(), any()) } returns listOf( Triple(mockedSubscription, mockedNotification, true), Triple(mockedSubscription, mockedNotification, false) @@ -42,7 +68,7 @@ class EntityEventListenerServiceTests { entityEventListenerService.dispatchEntityEvent(replaceEvent) - coVerify(timeout = 1000L) { notificationService.notifyMatchingSubscribers(any(), any(), any()) } + coVerify(timeout = 1000L) { notificationService.notifyMatchingSubscribers(any(), any(), any(), any()) } confirmVerified(notificationService) } @@ -54,7 +80,7 @@ class EntityEventListenerServiceTests { val mockedNotification = mockkClass(Notification::class) coEvery { - notificationService.notifyMatchingSubscribers(any(), any(), any()) + notificationService.notifyMatchingSubscribers(any(), any(), any(), any()) } returns listOf( Triple(mockedSubscription, mockedNotification, true), Triple(mockedSubscription, mockedNotification, false) @@ -63,7 +89,12 @@ class EntityEventListenerServiceTests { entityEventListenerService.dispatchEntityEvent(updateEvent) coVerify(timeout = 1000L) { - notificationService.notifyMatchingSubscribers(any(), any(), setOf(NGSILD_NAME_PROPERTY)) + notificationService.notifyMatchingSubscribers( + any(), + any(), + setOf(NGSILD_NAME_PROPERTY), + NotificationTrigger.ATTRIBUTE_UPDATED + ) } confirmVerified(notificationService) } diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/NotificationServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/NotificationServiceTests.kt index 6891c5879..7ac74dca9 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/NotificationServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/NotificationServiceTests.kt @@ -15,6 +15,7 @@ import com.egm.stellio.shared.web.NGSILD_TENANT_HEADER import com.egm.stellio.subscription.model.Endpoint import com.egm.stellio.subscription.model.NotificationParams import com.egm.stellio.subscription.model.NotificationParams.FormatType +import com.egm.stellio.subscription.model.NotificationTrigger.* import com.egm.stellio.subscription.utils.gimmeRawSubscription import com.github.tomakehurst.wiremock.client.WireMock.* import com.github.tomakehurst.wiremock.junit5.WireMockTest @@ -83,7 +84,9 @@ class NotificationServiceTests { val jsonLdEntity = expandJsonLdEntity(rawEntity) val ngsiLdEntity = jsonLdEntity.toNgsiLdEntity().shouldSucceedAndResult() - coEvery { subscriptionService.getMatchingSubscriptions(any(), any(), any()) } returns listOf(subscription) + coEvery { + subscriptionService.getMatchingSubscriptions(any(), any(), any(), any()) + } returns listOf(subscription) coEvery { subscriptionService.isMatchingQQuery(any(), any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingScopeQQuery(any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingGeoQuery(any(), any()) } returns true.right() @@ -94,19 +97,24 @@ class NotificationServiceTests { .willReturn(ok()) ) - notificationService.notifyMatchingSubscribers(jsonLdEntity, ngsiLdEntity, setOf(NGSILD_NAME_PROPERTY)) - .shouldSucceedWith { - assertEquals(1, it.size) - assertEquals(subscription.id, it[0].second.subscriptionId) - assertEquals(1, it[0].second.data.size) - assertTrue(it[0].third) - } + notificationService.notifyMatchingSubscribers( + jsonLdEntity, + ngsiLdEntity, + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_UPDATED + ).shouldSucceedWith { + assertEquals(1, it.size) + assertEquals(subscription.id, it[0].second.subscriptionId) + assertEquals(1, it[0].second.data.size) + assertTrue(it[0].third) + } coVerify { subscriptionService.getMatchingSubscriptions( apiaryId.toUri(), listOf(APIARY_TYPE), - setOf(NGSILD_NAME_PROPERTY) + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_UPDATED ) } coVerify { subscriptionService.isMatchingScopeQQuery(subscription.scopeQ, any()) } @@ -128,7 +136,9 @@ class NotificationServiceTests { val jsonLdEntity = expandJsonLdEntity(rawEntity) val ngsiLdEntity = jsonLdEntity.toNgsiLdEntity().shouldSucceedAndResult() - coEvery { subscriptionService.getMatchingSubscriptions(any(), any(), any()) } returns listOf(subscription) + coEvery { + subscriptionService.getMatchingSubscriptions(any(), any(), any(), any()) + } returns listOf(subscription) coEvery { subscriptionService.isMatchingQQuery(any(), any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingScopeQQuery(any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingGeoQuery(any(), any()) } returns true.right() @@ -139,22 +149,26 @@ class NotificationServiceTests { .willReturn(ok()) ) - notificationService.notifyMatchingSubscribers(jsonLdEntity, ngsiLdEntity, setOf(NGSILD_NAME_PROPERTY)) - .shouldSucceedWith { - assertEquals(1, it.size) - assertEquals(subscription.id, it[0].second.subscriptionId) - assertEquals(1, it[0].second.data.size) - assertEquals(5, it[0].second.data[0].size) - assertTrue( - it[0].second.data[0].all { entry -> - JSONLD_COMPACTED_ENTITY_CORE_MEMBERS - .plus(NGSILD_NAME_TERM) - .plus(NGSILD_LOCATION_TERM) - .contains(entry.key) - } - ) - assertTrue(it[0].third) - } + notificationService.notifyMatchingSubscribers( + jsonLdEntity, + ngsiLdEntity, + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_UPDATED + ).shouldSucceedWith { + assertEquals(1, it.size) + assertEquals(subscription.id, it[0].second.subscriptionId) + assertEquals(1, it[0].second.data.size) + assertEquals(5, it[0].second.data[0].size) + assertTrue( + it[0].second.data[0].all { entry -> + JSONLD_COMPACTED_ENTITY_CORE_MEMBERS + .plus(NGSILD_NAME_TERM) + .plus(NGSILD_LOCATION_TERM) + .contains(entry.key) + } + ) + assertTrue(it[0].third) + } } @Test @@ -172,7 +186,9 @@ class NotificationServiceTests { val jsonLdEntity = expandJsonLdEntity(rawEntity) val ngsiLdEntity = jsonLdEntity.toNgsiLdEntity().shouldSucceedAndResult() - coEvery { subscriptionService.getMatchingSubscriptions(any(), any(), any()) } returns listOf(subscription) + coEvery { + subscriptionService.getMatchingSubscriptions(any(), any(), any(), any()) + } returns listOf(subscription) coEvery { subscriptionService.isMatchingQQuery(any(), any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingScopeQQuery(any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingGeoQuery(any(), any()) } returns true.right() @@ -183,17 +199,21 @@ class NotificationServiceTests { .willReturn(ok()) ) - notificationService.notifyMatchingSubscribers(jsonLdEntity, ngsiLdEntity, setOf(NGSILD_NAME_PROPERTY)) - .shouldSucceedWith { notificationResults -> - val notificationResult = notificationResults[0] - assertEquals(subscription.id, notificationResult.first.id) - assertEquals(subscription.id, notificationResult.second.subscriptionId) - assertEquals(1, notificationResult.second.data.size) - assertTrue(notificationResult.second.data[0].containsKey(NGSILD_NAME_PROPERTY)) - assertTrue(notificationResult.second.data[0].containsKey(MANAGED_BY_RELATIONSHIP)) - assertEquals(listOf(NGSILD_CORE_CONTEXT), notificationResult.second.data[0][JsonLdUtils.JSONLD_CONTEXT]) - assertTrue(notificationResult.third) - } + notificationService.notifyMatchingSubscribers( + jsonLdEntity, + ngsiLdEntity, + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_UPDATED + ).shouldSucceedWith { notificationResults -> + val notificationResult = notificationResults[0] + assertEquals(subscription.id, notificationResult.first.id) + assertEquals(subscription.id, notificationResult.second.subscriptionId) + assertEquals(1, notificationResult.second.data.size) + assertTrue(notificationResult.second.data[0].containsKey(NGSILD_NAME_PROPERTY)) + assertTrue(notificationResult.second.data[0].containsKey(MANAGED_BY_RELATIONSHIP)) + assertEquals(listOf(NGSILD_CORE_CONTEXT), notificationResult.second.data[0][JsonLdUtils.JSONLD_CONTEXT]) + assertTrue(notificationResult.third) + } } @Test @@ -205,7 +225,9 @@ class NotificationServiceTests { val jsonLdEntity = expandJsonLdEntity(rawEntity) val ngsiLdEntity = jsonLdEntity.toNgsiLdEntity().shouldSucceedAndResult() - coEvery { subscriptionService.getMatchingSubscriptions(any(), any(), any()) } returns listOf(subscription) + coEvery { + subscriptionService.getMatchingSubscriptions(any(), any(), any(), any()) + } returns listOf(subscription) coEvery { subscriptionService.isMatchingQQuery(any(), any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingScopeQQuery(any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingGeoQuery(any(), any()) } returns true.right() @@ -216,19 +238,24 @@ class NotificationServiceTests { .willReturn(ok()) ) - notificationService.notifyMatchingSubscribers(jsonLdEntity, ngsiLdEntity, setOf(NGSILD_NAME_PROPERTY)) - .shouldSucceedWith { - assertEquals(1, it.size) - assertEquals(subscription.id, it[0].second.subscriptionId) - assertEquals(1, it[0].second.data.size) - assertTrue(it[0].third) - } + notificationService.notifyMatchingSubscribers( + jsonLdEntity, + ngsiLdEntity, + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_UPDATED + ).shouldSucceedWith { + assertEquals(1, it.size) + assertEquals(subscription.id, it[0].second.subscriptionId) + assertEquals(1, it[0].second.data.size) + assertTrue(it[0].third) + } coVerify { subscriptionService.getMatchingSubscriptions( apiaryId.toUri(), listOf(APIARY_TYPE), - setOf(NGSILD_NAME_PROPERTY) + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_UPDATED ) } coVerify { subscriptionService.isMatchingQQuery(subscription.q, any(), any()) } @@ -246,7 +273,7 @@ class NotificationServiceTests { val ngsiLdEntity = jsonLdEntity.toNgsiLdEntity().shouldSucceedAndResult() coEvery { - subscriptionService.getMatchingSubscriptions(any(), any(), any()) + subscriptionService.getMatchingSubscriptions(any(), any(), any(), any()) } returns listOf(subscription1, subscription2) coEvery { subscriptionService.isMatchingQQuery(any(), any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingScopeQQuery(any(), any()) } returns true.right() @@ -258,16 +285,21 @@ class NotificationServiceTests { .willReturn(ok()) ) - notificationService.notifyMatchingSubscribers(jsonLdEntity, ngsiLdEntity, setOf(NGSILD_NAME_PROPERTY)) - .shouldSucceedWith { - assertEquals(2, it.size) - } + notificationService.notifyMatchingSubscribers( + jsonLdEntity, + ngsiLdEntity, + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_DELETED + ).shouldSucceedWith { + assertEquals(2, it.size) + } coVerify { subscriptionService.getMatchingSubscriptions( apiaryId.toUri(), listOf(APIARY_TYPE), - setOf(NGSILD_NAME_PROPERTY) + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_DELETED ) } coVerify { subscriptionService.isMatchingQQuery(subscription1.q, any(), any()) } @@ -299,7 +331,7 @@ class NotificationServiceTests { val ngsiLdEntity = jsonLdEntity.toNgsiLdEntity().shouldSucceedAndResult() coEvery { - subscriptionService.getMatchingSubscriptions(any(), any(), any()) + subscriptionService.getMatchingSubscriptions(any(), any(), any(), any()) } returns listOf(subscription1, subscription2) coEvery { subscriptionService.isMatchingQQuery(any(), any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingScopeQQuery(any(), any()) } returns true.right() @@ -311,22 +343,27 @@ class NotificationServiceTests { .willReturn(ok()) ) - notificationService.notifyMatchingSubscribers(jsonLdEntity, ngsiLdEntity, setOf(NGSILD_NAME_PROPERTY)) - .shouldSucceedWith { results -> - assertEquals(2, results.size) - assertTrue( - results.all { - it.first.id == subscription1.id && !it.third || - it.first.id == subscription2.id && it.third - } - ) - } + notificationService.notifyMatchingSubscribers( + jsonLdEntity, + ngsiLdEntity, + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_CREATED + ).shouldSucceedWith { results -> + assertEquals(2, results.size) + assertTrue( + results.all { + it.first.id == subscription1.id && !it.third || + it.first.id == subscription2.id && it.third + } + ) + } coVerify { subscriptionService.getMatchingSubscriptions( apiaryId.toUri(), listOf(APIARY_TYPE), - setOf(NGSILD_NAME_PROPERTY) + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_CREATED ) } coVerify { subscriptionService.isMatchingQQuery(subscription1.q, any(), any()) } @@ -347,7 +384,7 @@ class NotificationServiceTests { val ngsiLdEntity = jsonLdEntity.toNgsiLdEntity().shouldSucceedAndResult() coEvery { - subscriptionService.getMatchingSubscriptions(any(), any(), any()) + subscriptionService.getMatchingSubscriptions(any(), any(), any(), any()) } returns listOf(subscription1, subscription2) coEvery { subscriptionService.isMatchingQQuery(any(), any(), any()) } returns true.right() coEvery { subscriptionService.isMatchingScopeQQuery(any(), any()) } returns true.right() @@ -360,17 +397,22 @@ class NotificationServiceTests { .willReturn(ok()) ) - notificationService.notifyMatchingSubscribers(jsonLdEntity, ngsiLdEntity, setOf(NGSILD_NAME_PROPERTY)) - .shouldSucceedWith { - assertEquals(1, it.size) - assertEquals(subscription1.id, it[0].first.id) - } + notificationService.notifyMatchingSubscribers( + jsonLdEntity, + ngsiLdEntity, + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_UPDATED + ).shouldSucceedWith { + assertEquals(1, it.size) + assertEquals(subscription1.id, it[0].first.id) + } coVerify { subscriptionService.getMatchingSubscriptions( apiaryId.toUri(), listOf(APIARY_TYPE), - setOf(NGSILD_NAME_PROPERTY) + setOf(NGSILD_NAME_PROPERTY), + ATTRIBUTE_UPDATED ) } coVerify { subscriptionService.isMatchingQQuery(subscription1.q, any(), any()) } diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 24c197ff3..18e970c20 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -6,18 +6,22 @@ import com.egm.stellio.shared.model.JsonLdEntity import com.egm.stellio.shared.model.NotImplementedException import com.egm.stellio.shared.util.* import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_CORE_CONTEXT +import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_LOCATION_PROPERTY import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SUBSCRIPTION_TERM import com.egm.stellio.subscription.model.* import com.egm.stellio.subscription.model.NotificationParams.FormatType import com.egm.stellio.subscription.model.NotificationParams.StatusType +import com.egm.stellio.subscription.model.NotificationTrigger.* import com.egm.stellio.subscription.support.WithTimescaleContainer import com.egm.stellio.subscription.utils.ParsingUtils -import com.egm.stellio.subscription.utils.gimmeRawSubscription +import com.egm.stellio.subscription.utils.gimmeSubscriptionFromMembers +import com.egm.stellio.subscription.utils.loadAndDeserializeSubscription import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test @@ -26,14 +30,13 @@ import org.junit.jupiter.params.provider.CsvSource import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.core.io.ClassPathResource +import org.springframework.data.r2dbc.core.R2dbcEntityTemplate import org.springframework.test.context.ActiveProfiles import org.springframework.test.context.TestPropertySource import java.net.URI -import java.time.Instant -import java.time.ZoneOffset import java.time.ZonedDateTime -import java.time.temporal.ChronoUnit import java.util.UUID +import kotlin.time.Duration @OptIn(ExperimentalCoroutinesApi::class) @SpringBootTest @@ -44,15 +47,10 @@ class SubscriptionServiceTests : WithTimescaleContainer { @Autowired private lateinit var subscriptionService: SubscriptionService - private val mockUserSub = Some(UUID.randomUUID().toString()) + @Autowired + private lateinit var r2dbcEntityTemplate: R2dbcEntityTemplate - private lateinit var subscription1Id: URI - private lateinit var subscription2Id: URI - private lateinit var subscription3Id: URI - private lateinit var subscription4Id: URI - private lateinit var subscription5Id: URI - private lateinit var subscription6Id: URI - private lateinit var subscription7Id: URI + private val mockUserSub = Some(UUID.randomUUID().toString()) private val entity = ClassPathResource("/ngsild/aquac/FeedingService.json").inputStream.readBytes().toString(Charsets.UTF_8) @@ -60,168 +58,93 @@ class SubscriptionServiceTests : WithTimescaleContainer { private lateinit var jsonldEntity: JsonLdEntity @BeforeAll - fun bootstrapSubscriptions() { + fun loadTestEntity() { runBlocking { jsonldEntity = JsonLdUtils.expandJsonLdEntity(entity, listOf(APIC_COMPOUND_CONTEXT)) } - - createSubscription1() - createSubscription2() - createSubscription3() - createSubscription4() - createSubscription5() - createSubscription6() - createSubscription7() - } - - private fun createSubscription(subscription: Subscription): URI { - runBlocking { - subscriptionService.create(subscription, mockUserSub) - } - return subscription.id } - private fun createSubscription1() { - val subscription = gimmeRawSubscription( - withQueryAndGeoQuery = Pair(first = true, second = false), - withEndpointInfo = false, - withNotifParams = Pair(FormatType.NORMALIZED, listOf(INCOMING_PROPERTY)) - ).copy( - subscriptionName = "Subscription 1", - scopeQ = "/A/+/C,/B", - entities = setOf( - EntityInfo(id = null, idPattern = null, type = BEEHIVE_TYPE), - EntityInfo(id = null, idPattern = "urn:ngsi-ld:Beekeeper:1234*", type = BEEKEEPER_TYPE) - ) - ) - subscription1Id = createSubscription(subscription) - } - - private fun createSubscription2() { - val subscription = gimmeRawSubscription( - withEndpointInfo = true, - withNotifParams = Pair(FormatType.NORMALIZED, listOf(INCOMING_PROPERTY)) - ).copy( - subscriptionName = "Subscription 2", - entities = setOf( - EntityInfo(id = null, idPattern = null, type = BEEKEEPER_TYPE), - EntityInfo(id = "urn:ngsi-ld:Beehive:1234567890".toUri(), idPattern = null, type = BEEHIVE_TYPE) - ), - expiresAt = Instant.now().atZone(ZoneOffset.UTC).plusDays(1) - ) - subscription2Id = createSubscription(subscription) + @AfterEach + fun deleteSubscriptions() { + r2dbcEntityTemplate.delete(Subscription::class.java) + .all() + .block() } - private fun createSubscription3() { - val subscription = gimmeRawSubscription( - withEndpointInfo = false, - withNotifParams = Pair(FormatType.NORMALIZED, listOf(INCOMING_PROPERTY)) - ).copy( - subscriptionName = "Subscription 3", - entities = setOf( - EntityInfo(id = null, idPattern = null, type = APIARY_TYPE) - ), - isActive = false - ) - subscription3Id = createSubscription(subscription) - } - - private fun createSubscription4() { - val subscription = gimmeRawSubscription( - withEndpointInfo = false, - withNotifParams = Pair(FormatType.NORMALIZED, listOf(INCOMING_PROPERTY)) - ).copy( - subscriptionName = "Subscription 4", - entities = setOf( - EntityInfo(id = null, idPattern = null, type = BEEHIVE_TYPE) - ), - isActive = false, - watchedAttributes = listOf(INCOMING_PROPERTY, OUTGOING_PROPERTY) - ) - subscription4Id = createSubscription(subscription) - } - - private fun createSubscription5() { - val subscription = gimmeRawSubscription( - withNotifParams = Pair(FormatType.NORMALIZED, listOf(INCOMING_PROPERTY)) - ).copy( - subscriptionName = "Subscription 5", - entities = setOf( - EntityInfo(id = "urn:ngsi-ld:smartDoor:77".toUri(), idPattern = null, type = DEVICE_TYPE) - ), - isActive = true, - expiresAt = null - ) - subscription5Id = createSubscription(subscription) - } - - private fun createSubscription6() { - val subscription = gimmeRawSubscription( - withNotifParams = Pair(FormatType.NORMALIZED, listOf(INCOMING_PROPERTY)) - ).copy( - subscriptionName = "Subscription 6", - entities = setOf( - EntityInfo(id = "urn:ngsi-ld:smartDoor:88".toUri(), idPattern = null, type = DEVICE_TYPE) - ), - isActive = false, - expiresAt = ZonedDateTime.parse("2012-08-12T08:33:38Z") + @Test + fun `it should not allow a subscription with an empty id`() = runTest { + val payload = mapOf( + "id" to "", + "type" to NGSILD_SUBSCRIPTION_TERM, + "entities" to listOf(mapOf("type" to BEEHIVE_TYPE)), + "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")) ) - subscription6Id = createSubscription(subscription) - } - private fun createSubscription7() { - val subscription = gimmeRawSubscription().copy( - subscriptionName = "Subscription 7", - entities = setOf( - EntityInfo(id = null, idPattern = null, type = APIARY_TYPE) - ), - contexts = listOf(APIC_COMPOUND_CONTEXT) - ) - subscription7Id = createSubscription(subscription) + val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + subscriptionService.validateNewSubscription(subscription) + .shouldFailWith { + it is BadRequestDataException && + it.message == "The supplied identifier was expected to be an URI but it is not: " + } } @Test - fun `it should not retrieve an expired subscription matching an id`() = runTest { - val persistedSubscription = - subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:smartDoor:88".toUri(), - listOf(DEVICE_TYPE), - setOf(INCOMING_PROPERTY) - ) + fun `it should not allow a subscription with an invalid id`() = runTest { + val payload = mapOf( + "id" to "invalidId", + "type" to NGSILD_SUBSCRIPTION_TERM, + "entities" to listOf(mapOf("type" to BEEHIVE_TYPE)), + "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")) + ) - assertThat(persistedSubscription).isEmpty() + val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + subscriptionService.validateNewSubscription(subscription) + .shouldFailWith { + it is BadRequestDataException && + it.message == "The supplied identifier was expected to be an URI but it is not: invalidId" + } } @Test - fun `it should retrieve a subscription matching an id when expired date is not given`() = runTest { - val persistedSubscription = - subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:smartDoor:77".toUri(), - listOf(DEVICE_TYPE), - setOf(INCOMING_PROPERTY) - ) + fun `it should not allow a subscription with an invalid idPattern`() = runTest { + val payload = mapOf( + "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), + "type" to NGSILD_SUBSCRIPTION_TERM, + "entities" to listOf(mapOf("type" to BEEHIVE_TYPE, "idPattern" to "[")), + "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")) + ) - assertThat(persistedSubscription).hasSize(1) + val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + subscriptionService.validateNewSubscription(subscription) + .shouldFailWith { + it is BadRequestDataException && + it.message == "Invalid idPattern found in subscription" + } } @Test - fun `it should retrieve a subscription matching an id when expired date is in the future`() = runTest { - val persistedSubscription = - subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:Beehive:1234567890".toUri(), - listOf(BEEHIVE_TYPE), - setOf(INCOMING_PROPERTY) - ) + fun `it should not allow a subscription without entities and watchedAttributes`() = runTest { + val payload = mapOf( + "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), + "type" to NGSILD_SUBSCRIPTION_TERM, + "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")) + ) - assertThat(persistedSubscription).hasSize(2) + val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + subscriptionService.validateNewSubscription(subscription) + .shouldFailWith { + it is BadRequestDataException && + it.message == "At least one of entities or watchedAttributes shall be present" + } } @Test - fun `it should not allow a subscription with an empty id`() = runTest { + fun `it should not allow a subscription with timeInterval and watchedAttributes`() = runTest { val payload = mapOf( - "id" to "", + "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), "type" to NGSILD_SUBSCRIPTION_TERM, - "entities" to listOf(mapOf("type" to BEEHIVE_TYPE)), + "timeInterval" to 10, + "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY), "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")) ) @@ -229,16 +152,17 @@ class SubscriptionServiceTests : WithTimescaleContainer { subscriptionService.validateNewSubscription(subscription) .shouldFailWith { it is BadRequestDataException && - it.message == "The supplied identifier was expected to be an URI but it is not: " + it.message == "You can't use 'timeInterval' in conjunction with 'watchedAttributes'" } } @Test - fun `it should not allow a subscription with an invalid id`() = runTest { + fun `it should not allow a subscription with a negative timeInterval`() = runTest { val payload = mapOf( - "id" to "invalidId", + "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), "type" to NGSILD_SUBSCRIPTION_TERM, - "entities" to listOf(mapOf("type" to BEEHIVE_TYPE)), + "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY), + "timeInterval" to -10, "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")) ) @@ -246,170 +170,144 @@ class SubscriptionServiceTests : WithTimescaleContainer { subscriptionService.validateNewSubscription(subscription) .shouldFailWith { it is BadRequestDataException && - it.message == "The supplied identifier was expected to be an URI but it is not: invalidId" + it.message == "The value of 'timeInterval' must be greater than zero (int)" } } @Test - fun `it should not allow a subscription with an invalid idPattern`() = runTest { + fun `it should not allow a subscription with an expiresAt in the past`() = runTest { val payload = mapOf( "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), "type" to NGSILD_SUBSCRIPTION_TERM, - "entities" to listOf(mapOf("type" to BEEHIVE_TYPE, "idPattern" to "[")), + "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY), + "expiresAt" to ngsiLdDateTime().minusDays(1), "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")) ) val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() subscriptionService.validateNewSubscription(subscription) - .shouldFail { + .shouldFailWith { it is BadRequestDataException && - it.message == "Invalid value for idPattern: [" + it.message == "'expiresAt' must be in the future" } } @Test - fun `it should load and fill a persisted subscription with entities info`() = runTest { - val persistedSubscription = subscriptionService.getById(subscription1Id) + fun `it should not allow a subscription with an unknown notification trigger`() = runTest { + val payload = mapOf( + "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), + "type" to NGSILD_SUBSCRIPTION_TERM, + "entities" to listOf(mapOf("type" to BEEHIVE_TYPE)), + "notificationTrigger" to listOf("unknownNotificationTrigger"), + "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")) + ) - assertThat(persistedSubscription) - .matches { - it.subscriptionName == "Subscription 1" && - it.description == "My beautiful subscription" && - it.notification.attributes == listOf(INCOMING_PROPERTY) && - it.notification.format == FormatType.NORMALIZED && - it.notification.endpoint == - Endpoint( - URI("http://localhost:8089/notification"), - Endpoint.AcceptType.JSONLD, - null - ) && - it.entities.size == 2 && - it.entities.any { entityInfo -> - entityInfo.type == BEEKEEPER_TYPE && - entityInfo.id == null && - entityInfo.idPattern == "urn:ngsi-ld:Beekeeper:1234*" - } && - it.entities.any { entityInfo -> - entityInfo.type == BEEHIVE_TYPE && - entityInfo.id == null && - entityInfo.idPattern == null - } && - it.geoQ == null + val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + subscriptionService.validateNewSubscription(subscription) + .shouldFailWith { + it is BadRequestDataException && + it.message == "Unknown notification trigger in [unknownNotificationTrigger]" } } @Test - fun `it should load and fill a persisted subscription with entities info and query`() = runTest { - val persistedSubscription = subscriptionService.getById(subscription3Id) + fun `it should load a subscription with minimal required info - entities`() = runTest { + val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json") + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val persistedSubscription = subscriptionService.getById(subscription.id) assertThat(persistedSubscription) .matches { - it.subscriptionName == "Subscription 3" && - it.description == "My beautiful subscription" && - it.q == "speed>50;foodName==dietary fibres" && - it.notification.attributes == listOf(INCOMING_PROPERTY) && + it.id == "urn:ngsi-ld:Subscription:1".toUri() && it.notification.format == FormatType.NORMALIZED && - it.notification.endpoint == Endpoint( - URI("http://localhost:8089/notification"), - Endpoint.AcceptType.JSONLD, - null - ) && - it.entities.size == 1 + it.notification.endpoint.uri == URI("http://localhost:8084") && + it.notification.endpoint.accept == Endpoint.AcceptType.JSON && + ( + it.entities != null && + it.entities!!.size == 1 && + it.entities!!.all { it.type == BEEHIVE_TYPE } + ) && + it.watchedAttributes == null && + it.isActive } } @Test - fun `it should load and fill a persisted subscription with entities info and geoquery and endpoint info`() = - runTest { - val persistedSubscription = subscriptionService.getById(subscription2Id) - - assertThat(persistedSubscription) - .matches { - it.subscriptionName == "Subscription 2" && - it.description == "My beautiful subscription" && - it.notification.attributes == listOf(INCOMING_PROPERTY) && - it.notification.format == FormatType.NORMALIZED && - it.notification.endpoint == Endpoint( - URI("http://localhost:8089/notification"), - Endpoint.AcceptType.JSONLD, - listOf(EndpointInfo("Authorization-token", "Authorization-token-value")) - ) && - it.entities.size == 2 && - it.geoQ != null && - it.geoQ!!.georel == "within" && - it.geoQ!!.geometry == "Polygon" && - it.geoQ!!.geoproperty == "https://uri.etsi.org/ngsi-ld/location" && - it.geoQ!!.coordinates == - "[[[100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0]]]" - } - } - - @Test - fun `it should load and fill a persisted subscription with entities info and active status`() = runTest { - val persistedSubscription = subscriptionService.getById(subscription2Id) + fun `it should load a subscription with minimal required info - watchedAttributes`() = runTest { + val subscription = loadAndDeserializeSubscription("subscription_minimal_watched_attributes.json") + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val persistedSubscription = subscriptionService.getById(subscription.id) assertThat(persistedSubscription) .matches { - it.subscriptionName == "Subscription 2" && - it.description == "My beautiful subscription" && - it.notification.attributes == listOf(INCOMING_PROPERTY) && + it.id == "urn:ngsi-ld:Subscription:1".toUri() && it.notification.format == FormatType.NORMALIZED && - it.notification.endpoint == Endpoint( - URI("http://localhost:8089/notification"), - Endpoint.AcceptType.JSONLD, - listOf(EndpointInfo("Authorization-token", "Authorization-token-value")) - ) && - it.entities.size == 2 && + it.notification.endpoint.uri == URI("http://localhost:8084") && + it.notification.endpoint.accept == Endpoint.AcceptType.JSON && + it.entities == null && + it.watchedAttributes == listOf(INCOMING_PROPERTY, OUTGOING_PROPERTY) && it.isActive } } @Test - fun `it should load and fill a persisted subscription with entities info and inactive status`() = runTest { - val persistedSubscription = subscriptionService.getById(subscription4Id) + fun `it should load a subscription with all possible members`() = runTest { + val subscription = loadAndDeserializeSubscription("subscription_full.json") + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val persistedSubscription = subscriptionService.getById(subscription.id) assertThat(persistedSubscription) .matches { - it.subscriptionName == "Subscription 4" && - it.description == "My beautiful subscription" && - it.notification.attributes == listOf(INCOMING_PROPERTY) && + it.id == "urn:ngsi-ld:Subscription:1".toUri() && + it.subscriptionName == "A subscription with all possible members" && + it.description == "A possible description" && + ( + it.entities != null && + it.entities!!.size == 3 && + it.entities!!.all { it.type == BEEHIVE_TYPE } && + it.entities!!.any { it.id == "urn:ngsi-ld:Beehive:1234567890".toUri() } && + it.entities!!.any { it.idPattern == "urn:ngsi-ld:Beehive:1234*" } + ) && + it.watchedAttributes == listOf(INCOMING_PROPERTY) && + it.notificationTrigger == listOf( + ENTITY_CREATED.notificationTrigger, + ATTRIBUTE_UPDATED.notificationTrigger, + ENTITY_DELETED.notificationTrigger + ) && + it.timeInterval == null && + it.q == "foodQuantity<150;foodName=='dietary fibres'" && + ( + it.geoQ != null && + it.geoQ!!.georel == "within" && + it.geoQ!!.geometry == "Polygon" && + it.geoQ!!.coordinates == + "[[[100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0]]]" && + it.geoQ!!.geoproperty == NGSILD_LOCATION_PROPERTY + ) && + it.scopeQ == "/Nantes/+" && + it.notification.attributes == listOf(INCOMING_PROPERTY, OUTGOING_PROPERTY) && it.notification.format == FormatType.NORMALIZED && it.notification.endpoint == Endpoint( - URI("http://localhost:8089/notification"), - Endpoint.AcceptType.JSONLD, - null + URI("http://localhost:8084"), + Endpoint.AcceptType.JSON, + listOf(EndpointInfo("Authorization-token", "Authorization-token-value")) ) && - it.entities.size == 1 && - !it.isActive + it.expiresAt == ZonedDateTime.parse("2100-01-01T00:00:00Z") } } @Test - fun `it should load and fill a persisted subscription with entities info and null value for watched attributes`() = - runTest { - val persistedSubscription = subscriptionService.getById(subscription2Id) - - assertThat(persistedSubscription) - .matches { - it.subscriptionName == "Subscription 2" && - it.description == "My beautiful subscription" && - it.entities.size == 2 && - it.watchedAttributes == null - } - } - - @Test - fun `it should load and fill a persisted subscription with the correct format for temporal values`() = runTest { - val createdAt = Instant.now().truncatedTo(ChronoUnit.MICROS).atZone(ZoneOffset.UTC) - val subscription = gimmeRawSubscription().copy( - createdAt = createdAt, - entities = setOf( - EntityInfo(id = "urn:ngsi-ld:smartDoor:77".toUri(), idPattern = null, type = DEVICE_TYPE) + fun `it should load a subscription with extra info on last notification`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf( + mapOf("id" to "urn:ngsi-ld:smartDoor:77".toUri(), "type" to DEVICE_COMPACT_TYPE) + ) ) ) - val notifiedAt = ngsiLdDateTime() + subscriptionService.create(subscription, mockUserSub).shouldSucceed() - subscriptionService.create(subscription, mockUserSub) + val notifiedAt = ngsiLdDateTime() subscriptionService.updateSubscriptionNotification( subscription, Notification(subscriptionId = subscription.id, notifiedAt = notifiedAt, data = emptyList()), @@ -417,27 +315,25 @@ class SubscriptionServiceTests : WithTimescaleContainer { ) val persistedSubscription = subscriptionService.getById(subscription.id) - assertThat(persistedSubscription) .matches { it.notification.lastNotification != null && it.notification.lastNotification!!.isEqual(notifiedAt) && it.notification.lastSuccess != null && it.notification.lastSuccess!!.isEqual(notifiedAt) && - it.createdAt.isEqual(createdAt) + it.notification.lastFailure == null && + it.notification.timesSent == 1 && + it.notification.status == StatusType.OK } - - subscriptionService.delete(subscription.id) } @Test fun `it should delete an existing subscription`() = runTest { - val subscription = gimmeRawSubscription() + val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json") - subscriptionService.create(subscription, mockUserSub) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() - subscriptionService.delete(subscription.id) - .shouldSucceed() + subscriptionService.delete(subscription.id).shouldSucceed() } @Test @@ -447,74 +343,114 @@ class SubscriptionServiceTests : WithTimescaleContainer { } @Test - fun `it should retrieve a subscription matching an idPattern`() = runTest { + fun `it should not retrieve an expired subscription`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("id" to "urn:ngsi-ld:Beekeeper:01", "type" to BEEKEEPER_COMPACT_TYPE)), + "expiresAt" to ngsiLdDateTime().plusSeconds(1) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + + // add a delay to ensure subscription has expired + runBlocking { + delay(Duration.parse("2s")) + } + val persistedSubscription = subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:Beekeeper:12345678".toUri(), + "urn:ngsi-ld:Beekeeper:01".toUri(), listOf(BEEKEEPER_TYPE), - setOf(INCOMING_PROPERTY) + emptySet(), + ATTRIBUTE_CREATED ) - assertThat(persistedSubscription).hasSize(2) + assertThat(persistedSubscription).isEmpty() } @Test - fun `it should not retrieve a subscription if idPattern does not match`() = runTest { + fun `it should retrieve a subscription whose expiration date has not been reached`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("id" to "urn:ngsi-ld:Beekeeper:01", "type" to BEEKEEPER_COMPACT_TYPE)), + "expiresAt" to ngsiLdDateTime().plusDays(1), + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val persistedSubscription = subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:Beekeeper:9876543".toUri(), + "urn:ngsi-ld:Beekeeper:01".toUri(), listOf(BEEKEEPER_TYPE), - setOf(INCOMING_PROPERTY) + emptySet(), + ATTRIBUTE_CREATED ) - assertThat(persistedSubscription) - .hasSize(1) - .element(0).matches { - it.subscriptionName == "Subscription 2" - } + assertThat(persistedSubscription).hasSize(1) } @Test - fun `it should retrieve a subscription matching a type and not one with non matching id`() = runTest { + fun `it should retrieve a subscription matching an idPattern`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf( + mapOf("idPattern" to "urn:ngsi-ld:Beekeeper:123*", "type" to BEEKEEPER_COMPACT_TYPE) + ) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val persistedSubscription = subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:Beehive:ABCD".toUri(), - listOf(BEEHIVE_TYPE), - setOf(INCOMING_PROPERTY) + "urn:ngsi-ld:Beekeeper:12345678".toUri(), + listOf(BEEKEEPER_TYPE), + emptySet(), + ATTRIBUTE_UPDATED ) - assertThat(persistedSubscription) - .hasSize(1) - .element(0).matches { - it.subscriptionName == "Subscription 1" && - it.notification.endpoint == Endpoint( - URI("http://localhost:8089/notification"), - Endpoint.AcceptType.JSONLD - ) && - it.entities.isEmpty() - } + assertThat(persistedSubscription).hasSize(1) } @Test - fun `it should retrieve a subscription matching a type and an exact id`() = runTest { + fun `it should not retrieve a subscription if idPattern does not match`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf( + mapOf("idPattern" to "urn:ngsi-ld:Beekeeper:123*", "type" to BEEKEEPER_COMPACT_TYPE) + ) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val persistedSubscription = subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:Beehive:1234567890".toUri(), - listOf(BEEHIVE_TYPE), - setOf(INCOMING_PROPERTY) + "urn:ngsi-ld:Beekeeper:3456789".toUri(), + listOf(BEEKEEPER_TYPE), + emptySet(), + ATTRIBUTE_UPDATED ) assertThat(persistedSubscription) - .hasSize(2) + .isEmpty() } @Test - fun `it should retrieve a subscription matching an id`() = runTest { + fun `it should retrieve a subscription matching a type`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf( + mapOf("type" to BEEKEEPER_COMPACT_TYPE) + ) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val persistedSubscription = subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:Beehive:1234567890".toUri(), - listOf(BEEHIVE_TYPE), - setOf(INCOMING_PROPERTY) + "urn:ngsi-ld:Beekeeper:01".toUri(), + listOf(BEEKEEPER_TYPE), + emptySet(), + ATTRIBUTE_UPDATED ) assertThat(persistedSubscription) @@ -522,41 +458,69 @@ class SubscriptionServiceTests : WithTimescaleContainer { } @Test - fun `it should not retrieve a subscription if type does not match`() = runTest { + fun `it should retrieve a subscription matching a type and an id`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf( + mapOf("id" to "urn:ngsi-ld:Beekeeper:01", "type" to BEEKEEPER_COMPACT_TYPE) + ) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val persistedSubscription = subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:Sensor:1234567890".toUri(), - listOf(SENSOR_TYPE), - setOf(INCOMING_PROPERTY) + "urn:ngsi-ld:Beekeeper:01".toUri(), + listOf(BEEKEEPER_TYPE), + emptySet(), + ATTRIBUTE_UPDATED ) assertThat(persistedSubscription) - .isEmpty() + .hasSize(1) } @Test - fun `it should retrieve an activated subscription matching an id`() = runTest { + fun `it should not retrieve a subscription if type does not match`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf( + mapOf("id" to "urn:ngsi-ld:Beekeeper:01", "type" to BEEKEEPER_COMPACT_TYPE) + ) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val persistedSubscription = subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:smartDoor:77".toUri(), - listOf(DEVICE_TYPE), - setOf(INCOMING_PROPERTY) + "urn:ngsi-ld:Beekeeper:01".toUri(), + listOf(SENSOR_TYPE), + emptySet(), + ATTRIBUTE_UPDATED ) assertThat(persistedSubscription) - .hasSize(1) - .element(0).matches { - it.subscriptionName == "Subscription 5" - } + .isEmpty() } @Test fun `it should not retrieve a deactivated subscription matching an id`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf( + mapOf("id" to "urn:ngsi-ld:Beekeeper:01", "type" to BEEKEEPER_COMPACT_TYPE) + ), + "isActive" to false + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val persistedSubscription = subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:smartDoor:88".toUri(), - listOf(DEVICE_TYPE), - setOf(INCOMING_PROPERTY) + "urn:ngsi-ld:Beekeeper:01".toUri(), + listOf(BEEKEEPER_TYPE), + emptySet(), + ATTRIBUTE_UPDATED ) assertThat(persistedSubscription) @@ -564,88 +528,185 @@ class SubscriptionServiceTests : WithTimescaleContainer { } @Test - fun `it should retrieve a subscription if watched attributes is null`() = runTest { - val subscription = gimmeRawSubscription().copy( - subscriptionName = "My subscription", - entities = setOf( - EntityInfo(id = null, idPattern = null, type = BEEHIVE_TYPE) - ), - watchedAttributes = null + fun `it should retrieve a subscription matching one of the watched attributes`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY, OUTGOING_COMPACT_PROPERTY) + ) ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() - subscriptionService.create(subscription, mockUserSub) - - val persistedSubscription = + val subscriptions = subscriptionService.getMatchingSubscriptions( "urn:ngsi-ld:Beehive:1234567890".toUri(), listOf(BEEHIVE_TYPE), - setOf(INCOMING_PROPERTY) + setOf(INCOMING_PROPERTY), + ATTRIBUTE_UPDATED ) - assertThat(persistedSubscription) - .anyMatch { it.id == subscription.id } + assertThat(subscriptions) + .hasSize(1) + } - subscriptionService.delete(subscription.id) + @Test + fun `it should not retrieve a subscription not matching one of the watched attributes`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY, OUTGOING_COMPACT_PROPERTY) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + + val subscriptions = + subscriptionService.getMatchingSubscriptions( + "urn:ngsi-ld:Beehive:1234567890".toUri(), + listOf(BEEHIVE_TYPE), + setOf(TEMPERATURE_PROPERTY), + ATTRIBUTE_UPDATED + ) + + assertThat(subscriptions) + .isEmpty() } @Test - fun `it should retrieve a subscription if watched attributes contains at least one of the updated attributes`() = - runTest { - val subscription = gimmeRawSubscription().copy( - subscriptionName = "My subscription", - entities = setOf( - EntityInfo(id = null, idPattern = null, type = BEEHIVE_TYPE) - ), - watchedAttributes = listOf(INCOMING_PROPERTY, OUTGOING_PROPERTY, TEMPERATURE_PROPERTY) + fun `it should not retrieve a subscription matching on type and not on one of the watched attributes`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("type" to BEEHIVE_COMPACT_TYPE)), + "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY, OUTGOING_COMPACT_PROPERTY) ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() - subscriptionService.create(subscription, mockUserSub) + val subscriptions = + subscriptionService.getMatchingSubscriptions( + "urn:ngsi-ld:Beehive:1234567890".toUri(), + listOf(BEEHIVE_TYPE), + setOf(TEMPERATURE_PROPERTY), + ATTRIBUTE_UPDATED + ) - val subscriptions = - subscriptionService.getMatchingSubscriptions( - "urn:ngsi-ld:Beehive:1234567890".toUri(), - listOf(BEEHIVE_TYPE), - setOf(INCOMING_PROPERTY) - ) + assertThat(subscriptions) + .isEmpty() + } - assertThat(subscriptions) - .anyMatch { it.id == subscription.id } + @Test + fun `it should retrieve a subscription without watched attributes matching on type`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf("entities" to listOf(mapOf("type" to BEEHIVE_COMPACT_TYPE))) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() - subscriptionService.delete(subscription.id) - } + val subscriptions = + subscriptionService.getMatchingSubscriptions( + "urn:ngsi-ld:Beehive:1234567890".toUri(), + listOf(BEEHIVE_TYPE), + setOf(TEMPERATURE_PROPERTY), + ATTRIBUTE_UPDATED + ) + + assertThat(subscriptions) + .hasSize(1) + } @Test - fun `it should not retrieve a subscription if watched attributes do not match any updated attributes`() = runTest { - val subscription = gimmeRawSubscription().copy( - subscriptionName = "My subscription", - entities = setOf( - EntityInfo(id = null, idPattern = null, type = BEEHIVE_TYPE) - ), - watchedAttributes = listOf(OUTGOING_PROPERTY, TEMPERATURE_PROPERTY) + fun `it should retrieve a subscription without watched attributes matching on type and id pattern`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf("entities" to listOf(mapOf("idPattern" to "urn:ngsi-ld:Beehive:*", "type" to BEEHIVE_COMPACT_TYPE))) ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + + val subscriptions = + subscriptionService.getMatchingSubscriptions( + "urn:ngsi-ld:Beehive:1234567890".toUri(), + listOf(BEEHIVE_TYPE), + setOf(TEMPERATURE_PROPERTY), + ATTRIBUTE_UPDATED + ) - subscriptionService.create(subscription, mockUserSub) + assertThat(subscriptions) + .hasSize(1) + } + + @Test + fun `it should retrieve a subscription with exact match on the notification trigger`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("idPattern" to "urn:ngsi-ld:Beehive:*", "type" to BEEHIVE_COMPACT_TYPE)), + "notificationTrigger" to listOf(ENTITY_CREATED.notificationTrigger) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() val subscriptions = subscriptionService.getMatchingSubscriptions( "urn:ngsi-ld:Beehive:1234567890".toUri(), listOf(BEEHIVE_TYPE), - setOf(INCOMING_PROPERTY) + setOf(TEMPERATURE_PROPERTY), + ENTITY_CREATED ) assertThat(subscriptions) - .allMatch { it.id != subscription.id } + .hasSize(1) + } - subscriptionService.delete(subscription.id) + @Test + fun `it should retrieve a subscription with entityUpdated trigger matched with an attribute event`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("idPattern" to "urn:ngsi-ld:Beehive:*", "type" to BEEHIVE_COMPACT_TYPE)), + "notificationTrigger" to listOf(ENTITY_UPDATED.notificationTrigger) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + + val subscriptions = + subscriptionService.getMatchingSubscriptions( + "urn:ngsi-ld:Beehive:1234567890".toUri(), + listOf(BEEHIVE_TYPE), + setOf(TEMPERATURE_PROPERTY), + ATTRIBUTE_UPDATED + ) + + assertThat(subscriptions) + .hasSize(1) } + @Test + fun `it should not retrieve a subscription with entityUpdated trigger matched with an entity delete event`() = + runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("idPattern" to "urn:ngsi-ld:Beehive:*", "type" to BEEHIVE_COMPACT_TYPE)), + "notificationTrigger" to listOf(ENTITY_DELETED.notificationTrigger) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + + val subscriptions = + subscriptionService.getMatchingSubscriptions( + "urn:ngsi-ld:Beehive:1234567890".toUri(), + listOf(BEEHIVE_TYPE), + setOf(TEMPERATURE_PROPERTY), + ATTRIBUTE_UPDATED + ) + + assertThat(subscriptions) + .isEmpty() + } + @Test fun `it should update a subscription`() = runTest { + val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json") + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val parsedInput = mapOf( "type" to NGSILD_SUBSCRIPTION_TERM, "subscriptionName" to "My Subscription Updated", "description" to "My beautiful subscription has been updated", "q" to "foodQuantity>=150", + "watchedAttributes" to arrayListOf(INCOMING_COMPACT_PROPERTY, TEMPERATURE_COMPACT_PROPERTY), "scopeQ" to "/A/#,/B", "geoQ" to mapOf( "georel" to "equals", @@ -655,14 +716,15 @@ class SubscriptionServiceTests : WithTimescaleContainer { ) ) - subscriptionService.update(subscription4Id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) - val subscription = subscriptionService.getById(subscription4Id) + subscriptionService.update(subscription.id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) - assertThat(subscription) + val updatedSubscription = subscriptionService.getById(subscription.id) + assertThat(updatedSubscription) .matches { it.subscriptionName == "My Subscription Updated" && it.description == "My beautiful subscription has been updated" && it.q == "foodQuantity>=150" && + it.watchedAttributes!! == listOf(INCOMING_PROPERTY, TEMPERATURE_PROPERTY) && it.scopeQ == "/A/#,/B" && it.geoQ!!.georel == "equals" && it.geoQ!!.geometry == "Point" && @@ -673,6 +735,9 @@ class SubscriptionServiceTests : WithTimescaleContainer { @Test fun `it should update a subscription notification`() = runTest { + val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json") + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val parsedInput = mapOf( "attributes" to listOf(OUTGOING_COMPACT_PROPERTY), "format" to "keyValues", @@ -685,10 +750,10 @@ class SubscriptionServiceTests : WithTimescaleContainer { ) ) - subscriptionService.updateNotification(subscription4Id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) - val subscription = subscriptionService.getById(subscription4Id) + subscriptionService.updateNotification(subscription.id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) - assertThat(subscription) + val updatedSubscription = subscriptionService.getById(subscription.id) + assertThat(updatedSubscription) .matches { it.notification.attributes == listOf(OUTGOING_PROPERTY) && it.notification.format.name == "KEY_VALUES" && @@ -703,6 +768,9 @@ class SubscriptionServiceTests : WithTimescaleContainer { @Test fun `it should update a subscription entities`() = runTest { + val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json") + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val parsedInput = listOf( mapOf( "id" to "urn:ngsi-ld:Beehive:123", @@ -714,40 +782,48 @@ class SubscriptionServiceTests : WithTimescaleContainer { ) ) - subscriptionService.updateEntities(subscription4Id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) - val subscription = subscriptionService.getById(subscription4Id) + subscriptionService.updateEntities(subscription.id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) - assertThat(subscription) + val updatedSubscription = subscriptionService.getById(subscription.id) + assertThat(updatedSubscription) .matches { - it.entities.contains( - EntityInfo( - id = "urn:ngsi-ld:Beehive:123".toUri(), - idPattern = null, - type = BEEHIVE_TYPE - ) - ) && - it.entities.contains( + it.entities != null && + it.entities!!.contains( + EntityInfo( + id = "urn:ngsi-ld:Beehive:123".toUri(), + idPattern = null, + type = BEEHIVE_TYPE + ) + ) && + it.entities!!.contains( EntityInfo( id = null, idPattern = "urn:ngsi-ld:Beehive:12*", type = BEEHIVE_TYPE ) ) && - it.entities.size == 2 + it.entities!!.size == 2 } } @Test fun `it should activate a subscription`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("id" to "urn:ngsi-ld:Beekeeper:01", "type" to BEEKEEPER_COMPACT_TYPE)), + "isActive" to false + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + subscriptionService.update( - subscription3Id, + subscription.id, mapOf("type" to NGSILD_SUBSCRIPTION_TERM, "isActive" to true), listOf(APIC_COMPOUND_CONTEXT) - ) + ).shouldSucceed() - val subscription = subscriptionService.getById(subscription3Id) - - assertThat(subscription) + val updatedSubscription = subscriptionService.getById(subscription.id) + assertThat(updatedSubscription) .matches { it.isActive && it.modifiedAt != null } @@ -755,15 +831,21 @@ class SubscriptionServiceTests : WithTimescaleContainer { @Test fun `it should deactivate a subscription`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("id" to "urn:ngsi-ld:Beekeeper:01", "type" to BEEKEEPER_COMPACT_TYPE)) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + subscriptionService.update( - subscription1Id, + subscription.id, mapOf("type" to NGSILD_SUBSCRIPTION_TERM, "isActive" to false), listOf(APIC_COMPOUND_CONTEXT) ) - val subscription = subscriptionService.getById(subscription1Id) - - assertThat(subscription) + val updatedSubscription = subscriptionService.getById(subscription.id) + assertThat(updatedSubscription) .matches { !it.isActive && it.modifiedAt != null } @@ -771,16 +853,23 @@ class SubscriptionServiceTests : WithTimescaleContainer { @Test fun `it should update and expand watched attributes of a subscription`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "watchedAttributes" to arrayListOf(INCOMING_COMPACT_PROPERTY) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + val parsedInput = mapOf( "type" to NGSILD_SUBSCRIPTION_TERM, "watchedAttributes" to arrayListOf(INCOMING_COMPACT_PROPERTY, TEMPERATURE_COMPACT_PROPERTY) ) - subscriptionService.update(subscription5Id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)).shouldSucceed() + subscriptionService.update(subscription.id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)).shouldSucceed() - val subscription = subscriptionService.getById(subscription5Id) + val updatedSubscription = subscriptionService.getById(subscription.id) - assertThat(subscription) + assertThat(updatedSubscription) .matches { it.watchedAttributes!! == listOf(INCOMING_PROPERTY, TEMPERATURE_PROPERTY) && it.modifiedAt != null @@ -788,37 +877,58 @@ class SubscriptionServiceTests : WithTimescaleContainer { } @Test - fun `it should throw a BadRequestData exception if the subscription has an unknown attribute`() = runTest { - val parsedInput = mapOf("unknownAttribute" to "unknownValue") + fun `it should update notification trigger of a subscription`() = runTest { + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("id" to "urn:ngsi-ld:Beekeeper:01", "type" to BEEKEEPER_COMPACT_TYPE)), + "notificationTrigger" to listOf(ENTITY_CREATED.notificationTrigger) + ) + ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() - subscriptionService.update(subscription5Id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) - .shouldFail { it is BadRequestDataException } + val notificationTriggers = arrayListOf( + ENTITY_CREATED.notificationTrigger, + ENTITY_DELETED.notificationTrigger + ) + val parsedInput = mapOf( + "type" to NGSILD_SUBSCRIPTION_TERM, + "notificationTrigger" to notificationTriggers + ) + subscriptionService.update(subscription.id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) + + val updatedSubscription = subscriptionService.getById(subscription.id) + assertThat(updatedSubscription) + .matches { + it.notificationTrigger == notificationTriggers && + it.modifiedAt != null + } } @Test - fun `it should throw a NotImplemented exception if the subscription has an unsupported attribute`() = runTest { - val parsedInput = mapOf("throttling" to "someValue") + fun `it should return a BadRequestData exception if the subscription has an unknown attribute`() = runTest { + val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json") + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + + val parsedInput = mapOf("type" to NGSILD_SUBSCRIPTION_TERM, "unknownAttribute" to "unknownValue") - subscriptionService.update(subscription5Id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) - .shouldFail { it is NotImplementedException } + subscriptionService.update(subscription.id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) + .shouldFailWith { + it is BadRequestDataException && + it.message == "Subscription urn:ngsi-ld:Subscription:1 has invalid attribute: unknownAttribute" + } } @Test - fun `it should update a subscription with a notification result`() = runTest { - val persistedSubscription = subscriptionService.getById(subscription1Id) - val notification = Notification(subscriptionId = subscription1Id, data = emptyList()) + fun `it should return a NotImplemented exception if the subscription has an unsupported attribute`() = runTest { + val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json") + subscriptionService.create(subscription, mockUserSub).shouldSucceed() - subscriptionService.updateSubscriptionNotification(persistedSubscription, notification, true) + val parsedInput = mapOf("type" to NGSILD_SUBSCRIPTION_TERM, "throttling" to "someValue") - val subscription = subscriptionService.getById(subscription1Id) - assertThat(subscription) - .matches { - it.id == subscription1Id && - it.notification.status == StatusType.OK && - it.notification.timesSent == 1 && - it.notification.lastNotification != null && - it.notification.lastSuccess != null && - it.notification.lastFailure == null + subscriptionService.update(subscription.id, parsedInput, listOf(APIC_COMPOUND_CONTEXT)) + .shouldFailWith { + it is NotImplementedException && + it.message == "Subscription urn:ngsi-ld:Subscription:1 has unsupported attribute: throttling" } } @@ -894,7 +1004,10 @@ class SubscriptionServiceTests : WithTimescaleContainer { @Test fun `it shoud return true if a subscription has no geoquery`() = runTest { - subscriptionService.isMatchingGeoQuery(subscription1Id, jsonldEntity) + val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json") + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + + subscriptionService.isMatchingGeoQuery(subscription.id, jsonldEntity) .shouldSucceedWith { assertTrue(it) } } @@ -917,11 +1030,15 @@ class SubscriptionServiceTests : WithTimescaleContainer { coordinates: String, expectedResult: Boolean ) = runTest { - val subscription = gimmeRawSubscription( - withQueryAndGeoQuery = Pair(false, true), - georel = georel, - geometry = geometry, - coordinates = coordinates + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY), + "geoQ" to mapOf( + "georel" to georel, + "geometry" to geometry, + "coordinates" to coordinates + ) + ) ) subscriptionService.create(subscription, mockUserSub).shouldSucceed() @@ -931,67 +1048,59 @@ class SubscriptionServiceTests : WithTimescaleContainer { @Test fun `it should return all subscriptions whose 'timeInterval' is reached `() = runTest { - val subscription = gimmeRawSubscription( - withNotifParams = Pair(FormatType.NORMALIZED, listOf(INCOMING_PROPERTY)), - timeInterval = 500 - ).copy( - entities = setOf( - EntityInfo(id = null, idPattern = null, type = BEEHIVE_TYPE) + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("type" to BEEKEEPER_COMPACT_TYPE)), + "timeInterval" to 500 ) ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() - val subscriptionId = createSubscription(subscription) - - val subscription2 = gimmeRawSubscription( - withNotifParams = Pair(FormatType.NORMALIZED, listOf(INCOMING_PROPERTY)), - timeInterval = 5000 - ).copy( - entities = setOf( - EntityInfo(id = null, idPattern = null, type = BEEKEEPER_TYPE) + val subscription2 = gimmeSubscriptionFromMembers( + mapOf( + "id" to "urn:ngsi-ld:Subscription:02".toUri(), + "entities" to listOf(mapOf("type" to BEEKEEPER_COMPACT_TYPE)), + "timeInterval" to 5000 ) ) - val subscriptionId2 = createSubscription(subscription2) + subscriptionService.create(subscription2, mockUserSub).shouldSucceed() - val persistedSubscription = subscriptionService.getById(subscriptionId) - val notification = Notification(subscriptionId = subscriptionId, data = emptyList()) + val persistedSubscription = subscriptionService.getById(subscription.id) + val notification = Notification(subscriptionId = subscription.id, data = emptyList()) subscriptionService.updateSubscriptionNotification(persistedSubscription, notification, true) val subscriptionsToNotify = subscriptionService.getRecurringSubscriptionsToNotify() assertEquals(1, subscriptionsToNotify.size) - subscriptionService.delete(subscriptionId) - subscriptionService.delete(subscriptionId2) + subscriptionService.delete(subscription.id) + subscriptionService.delete(subscription2.id) } @Test fun `it should return all subscriptions whose 'timeInterval' is reached with a time of 5s`() = runTest { - val subscription = gimmeRawSubscription( - withNotifParams = Pair(FormatType.NORMALIZED, listOf(INCOMING_PROPERTY)), - timeInterval = 1 - ).copy( - entities = setOf( - EntityInfo(id = null, idPattern = null, type = BEEHIVE_TYPE) + val subscription = gimmeSubscriptionFromMembers( + mapOf( + "entities" to listOf(mapOf("type" to BEEKEEPER_COMPACT_TYPE)), + "timeInterval" to 1 ) ) + subscriptionService.create(subscription, mockUserSub).shouldSucceed() - val subscriptionId1 = createSubscription(subscription) - - val subscription2 = gimmeRawSubscription( - withNotifParams = Pair(FormatType.NORMALIZED, listOf(INCOMING_PROPERTY)), - timeInterval = 5000 - ).copy( - entities = setOf( - EntityInfo(id = null, idPattern = null, type = BEEKEEPER_TYPE) + val subscription2 = gimmeSubscriptionFromMembers( + mapOf( + "id" to "urn:ngsi-ld:Subscription:02".toUri(), + "entities" to listOf(mapOf("type" to BEEKEEPER_COMPACT_TYPE)), + "timeInterval" to 5000 ) ) - val subscriptionId2 = createSubscription(subscription2) + subscriptionService.create(subscription2, mockUserSub).shouldSucceed() - val persistedSubscription = subscriptionService.getById(subscriptionId1) - val notification = Notification(subscriptionId = subscriptionId1, data = emptyList()) + val persistedSubscription = subscriptionService.getById(subscription.id) + val notification = Notification(subscriptionId = subscription.id, data = emptyList()) - val persistedSubscription2 = subscriptionService.getById(subscriptionId2) - val notification2 = Notification(subscriptionId = subscriptionId2, data = emptyList()) + val persistedSubscription2 = subscriptionService.getById(subscription2.id) + val notification2 = Notification(subscriptionId = subscription2.id, data = emptyList()) subscriptionService.updateSubscriptionNotification(persistedSubscription, notification, true) @@ -1003,13 +1112,16 @@ class SubscriptionServiceTests : WithTimescaleContainer { val subscriptionsToNotify = subscriptionService.getRecurringSubscriptionsToNotify() assertEquals(1, subscriptionsToNotify.size) - subscriptionService.delete(subscriptionId1) - subscriptionService.delete(subscriptionId2) + subscriptionService.delete(subscription.id) + subscriptionService.delete(subscription2.id) } @Test - fun `it should retrieve a context of subscription`() = runTest { - subscriptionService.getContextsForSubscription(subscription7Id) + fun `it should retrieve the JSON-LD contexts of subscription`() = runTest { + val subscription = loadAndDeserializeSubscription("subscription.jsonld") + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + + subscriptionService.getContextsForSubscription(subscription.id) .shouldSucceedWith { assertEquals(listOf(APIC_COMPOUND_CONTEXT), it) } @@ -1017,7 +1129,7 @@ class SubscriptionServiceTests : WithTimescaleContainer { @Test fun `it should return a link to contexts endpoint if subscription has more than one context`() = runTest { - val subscription = gimmeRawSubscription().copy( + val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json").copy( contexts = listOf(APIC_COMPOUND_CONTEXT, NGSILD_CORE_CONTEXT) ) diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/utils/FixtureUtils.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/utils/FixtureUtils.kt index 24c7d575c..1dbd61207 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/utils/FixtureUtils.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/utils/FixtureUtils.kt @@ -1,13 +1,36 @@ package com.egm.stellio.subscription.utils +import com.egm.stellio.shared.util.APIC_COMPOUND_CONTEXT import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_CORE_CONTEXT import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SUBSCRIPTION_TERM +import com.egm.stellio.shared.util.JsonUtils.deserializeAsMap +import com.egm.stellio.shared.util.loadSampleData +import com.egm.stellio.shared.util.shouldSucceedAndResult import com.egm.stellio.shared.util.toUri import com.egm.stellio.subscription.model.* import com.egm.stellio.subscription.model.NotificationParams.FormatType import java.time.Instant import java.time.ZoneOffset +fun loadAndDeserializeSubscription(filename: String, context: String = APIC_COMPOUND_CONTEXT): Subscription { + val subscriptionPayload = loadSampleData(filename) + return ParsingUtils.parseSubscription(subscriptionPayload.deserializeAsMap(), listOf(context)) + .shouldSucceedAndResult() +} + +fun gimmeSubscriptionFromMembers( + additionalMembers: Map, + contexts: List = listOf(APIC_COMPOUND_CONTEXT) +): Subscription { + val payload = mapOf( + "id" to "urn:ngsi-ld:Subscription:01".toUri(), + "type" to NGSILD_SUBSCRIPTION_TERM, + "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")) + ).plus(additionalMembers) + + return ParsingUtils.parseSubscription(payload, contexts).shouldSucceedAndResult() +} + fun gimmeRawSubscription( withQueryAndGeoQuery: Pair = Pair(true, true), withEndpointInfo: Boolean = true, diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/web/SubscriptionHandlerTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/web/SubscriptionHandlerTests.kt index 77e5663ae..723b5ea82 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/web/SubscriptionHandlerTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/web/SubscriptionHandlerTests.kt @@ -11,7 +11,10 @@ import com.egm.stellio.shared.util.JsonUtils.deserializeAsMap import com.egm.stellio.subscription.service.SubscriptionService import com.egm.stellio.subscription.utils.gimmeRawSubscription import com.ninjasquad.springmockk.MockkBean -import io.mockk.* +import io.mockk.Called +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.confirmVerified import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runTest import org.hamcrest.core.Is @@ -170,7 +173,7 @@ class SubscriptionHandlerTests { @Test fun `create subscription should return a 201 if JSON-LD payload is correct`() = runTest { - val jsonLdFile = ClassPathResource("/ngsild/subscription.json") + val jsonLdFile = ClassPathResource("/ngsild/subscription.jsonld") coEvery { subscriptionService.validateNewSubscription(any()) } returns Unit.right() coEvery { subscriptionService.exists(any()) } returns false.right() @@ -186,7 +189,7 @@ class SubscriptionHandlerTests { @Test fun `create subscription should return a 409 if the subscription already exists`() = runTest { - val jsonLdFile = ClassPathResource("/ngsild/subscription.json") + val jsonLdFile = ClassPathResource("/ngsild/subscription.jsonld") coEvery { subscriptionService.validateNewSubscription(any()) } returns Unit.right() coEvery { subscriptionService.exists(any()) } returns true.right() @@ -205,7 +208,7 @@ class SubscriptionHandlerTests { @Test fun `create subscription should return a 500 error if internal server Error`() = runTest { - val jsonLdFile = ClassPathResource("/ngsild/subscription.json") + val jsonLdFile = ClassPathResource("/ngsild/subscription.jsonld") coEvery { subscriptionService.validateNewSubscription(any()) } returns Unit.right() coEvery { subscriptionService.exists(any()) } returns false.right() @@ -250,7 +253,7 @@ class SubscriptionHandlerTests { @Test fun `create subscription should return a 415 if the content type is not correct`() { - val jsonLdFile = ClassPathResource("/ngsild/subscription.json") + val jsonLdFile = ClassPathResource("/ngsild/subscription.jsonld") webClient.post() .uri("/ngsi-ld/v1/subscriptions") @@ -269,7 +272,7 @@ class SubscriptionHandlerTests { coEvery { subscriptionService.exists(any()) } returns false.right() coEvery { subscriptionService.create(any(), any()) - } returns BadRequestDataException("You can't use 'timeInterval' with 'watchedAttributes' in conjunction").left() + } returns BadRequestDataException("You can't use 'timeInterval' in conjunction with 'watchedAttributes'").left() @Suppress("MaxLineLength") webClient.post() @@ -282,7 +285,7 @@ class SubscriptionHandlerTests { { "type":"https://uri.etsi.org/ngsi-ld/errors/BadRequestData", "title":"The request includes input data which does not meet the requirements of the operation", - "detail":"You can't use 'timeInterval' with 'watchedAttributes' in conjunction" + "detail":"You can't use 'timeInterval' in conjunction with 'watchedAttributes'" } """.trimIndent() ) @@ -482,7 +485,7 @@ class SubscriptionHandlerTests { @Test fun `update subscription should return a 204 if JSON-LD payload is correct`() = runTest { - val jsonLdFile = ClassPathResource("/ngsild/subscription_update.json") + val jsonLdFile = ClassPathResource("/ngsild/subscription_update.jsonld") val subscriptionId = subscriptionId val parsedSubscription = jsonLdFile.inputStream.readBytes().toString(Charsets.UTF_8).deserializeAsMap() @@ -505,7 +508,7 @@ class SubscriptionHandlerTests { @Test fun `update subscription should return a 500 if update in DB failed`() = runTest { - val jsonLdFile = ClassPathResource("/ngsild/subscription_update.json") + val jsonLdFile = ClassPathResource("/ngsild/subscription_update.jsonld") val subscriptionId = subscriptionId val parsedSubscription = jsonLdFile.inputStream.readBytes().toString(Charsets.UTF_8).deserializeAsMap() @@ -537,7 +540,7 @@ class SubscriptionHandlerTests { @Test fun `update subscription should return a 404 if subscription to be updated has not been found`() = runTest { - val jsonLdFile = ClassPathResource("/ngsild/subscription_update.json") + val jsonLdFile = ClassPathResource("/ngsild/subscription_update.jsonld") val subscriptionId = subscriptionId coEvery { subscriptionService.exists(any()) } returns false.right() @@ -558,8 +561,7 @@ class SubscriptionHandlerTests { @Test fun `update subscription should return a 400 if JSON-LD context is not correct`() = runTest { - val jsonLdFile = ClassPathResource("/ngsild/subscription_update_incorrect_payload.json") - val subscriptionId = subscriptionId + val jsonLdFile = ClassPathResource("/ngsild/subscription_update.json") coEvery { subscriptionService.exists(any()) } returns true.right() coEvery { subscriptionService.isCreatorOf(any(), any()) } returns true.right() @@ -586,7 +588,7 @@ class SubscriptionHandlerTests { @Test fun `update subscription should return a 403 if subscription does not belong to the user`() = runTest { - val jsonLdFile = ClassPathResource("/ngsild/subscription_update.json") + val jsonLdFile = ClassPathResource("/ngsild/subscription_update.jsonld") val subscriptionId = subscriptionId coEvery { subscriptionService.exists(any()) } returns true.right() diff --git a/subscription-service/src/test/resources/ngsild/subscription.json b/subscription-service/src/test/resources/ngsild/subscription.jsonld similarity index 100% rename from subscription-service/src/test/resources/ngsild/subscription.json rename to subscription-service/src/test/resources/ngsild/subscription.jsonld diff --git a/subscription-service/src/test/resources/ngsild/subscription_full.json b/subscription-service/src/test/resources/ngsild/subscription_full.json new file mode 100644 index 000000000..4a18ba265 --- /dev/null +++ b/subscription-service/src/test/resources/ngsild/subscription_full.json @@ -0,0 +1,39 @@ +{ + "id": "urn:ngsi-ld:Subscription:1", + "type": "Subscription", + "subscriptionName": "A subscription with all possible members", + "description": "A possible description", + "entities": [ + { + "type": "BeeHive" + }, + { "id": "urn:ngsi-ld:Beehive:1234567890", + "type": "BeeHive" + }, + { + "idPattern": "urn:ngsi-ld:Beehive:1234*", + "type": "BeeHive" + } + ], + "watchedAttributes": ["incoming"], + "notificationTrigger": ["entityCreated", "attributeUpdated", "entityDeleted"], + "q": "foodQuantity<150;foodName=='dietary fibres'", + "geoQ": { + "georel": "within", + "geometry": "Polygon", + "coordinates": "[[[100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0]]]" + }, + "scopeQ": "/Nantes/+", + "notification": { + "attributes": ["incoming", "outgoing"], + "format": "normalized", + "endpoint": { + "uri": "http://localhost:8084", + "accept": "application/json", + "info": [ + { "key": "Authorization-token", "value": "Authorization-token-value" } + ] + } + }, + "expiresAt": "2100-01-01T00:00:00Z" +} diff --git a/subscription-service/src/test/resources/ngsild/subscription_minimal_entities.json b/subscription-service/src/test/resources/ngsild/subscription_minimal_entities.json new file mode 100644 index 000000000..8310be19e --- /dev/null +++ b/subscription-service/src/test/resources/ngsild/subscription_minimal_entities.json @@ -0,0 +1,14 @@ +{ + "id":"urn:ngsi-ld:Subscription:1", + "type":"Subscription", + "entities": [ + { + "type": "BeeHive" + } + ], + "notification": { + "endpoint": { + "uri": "http://localhost:8084" + } + } +} diff --git a/subscription-service/src/test/resources/ngsild/subscription_minimal_watched_attributes.json b/subscription-service/src/test/resources/ngsild/subscription_minimal_watched_attributes.json new file mode 100644 index 000000000..bb8228b16 --- /dev/null +++ b/subscription-service/src/test/resources/ngsild/subscription_minimal_watched_attributes.json @@ -0,0 +1,10 @@ +{ + "id":"urn:ngsi-ld:Subscription:1", + "type":"Subscription", + "watchedAttributes": ["incoming", "outgoing"], + "notification": { + "endpoint": { + "uri": "http://localhost:8084" + } + } +} diff --git a/subscription-service/src/test/resources/ngsild/subscription_update.json b/subscription-service/src/test/resources/ngsild/subscription_update.json index 823bd08ec..eae1001d1 100644 --- a/subscription-service/src/test/resources/ngsild/subscription_update.json +++ b/subscription-service/src/test/resources/ngsild/subscription_update.json @@ -3,7 +3,7 @@ "geoQ": { "georel": "within", "geometry": "Polygon", - "coordinates": "[[100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0]]" + "coordinates": "[[[100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0]]]" }, "subscriptionName": "Beehive_subscription", "notification": { @@ -13,8 +13,5 @@ "uri": "http://localhost:8084", "accept": "application/ld+json" } - }, - "@context":[ - "https://raw.githubusercontent.com/easy-global-market/ngsild-api-data-models/master/apic/jsonld-contexts/apic-compound.jsonld" - ] + } } diff --git a/subscription-service/src/test/resources/ngsild/subscription_update_incorrect_payload.json b/subscription-service/src/test/resources/ngsild/subscription_update.jsonld similarity index 72% rename from subscription-service/src/test/resources/ngsild/subscription_update_incorrect_payload.json rename to subscription-service/src/test/resources/ngsild/subscription_update.jsonld index 2e4303a2a..823bd08ec 100644 --- a/subscription-service/src/test/resources/ngsild/subscription_update_incorrect_payload.json +++ b/subscription-service/src/test/resources/ngsild/subscription_update.jsonld @@ -13,5 +13,8 @@ "uri": "http://localhost:8084", "accept": "application/ld+json" } - } + }, + "@context":[ + "https://raw.githubusercontent.com/easy-global-market/ngsild-api-data-models/master/apic/jsonld-contexts/apic-compound.jsonld" + ] } diff --git a/subscription-service/src/test/resources/ngsild/subscription_with_conflicting_timeInterval_watchedAttributes.json b/subscription-service/src/test/resources/ngsild/subscription_with_conflicting_timeInterval_watchedAttributes.json index d04945f3f..f7f587aeb 100644 --- a/subscription-service/src/test/resources/ngsild/subscription_with_conflicting_timeInterval_watchedAttributes.json +++ b/subscription-service/src/test/resources/ngsild/subscription_with_conflicting_timeInterval_watchedAttributes.json @@ -1,27 +1,11 @@ { "id":"urn:ngsi-ld:Subscription:1", "type":"Subscription", - "entities": [ - { - "type": "Beehive" - }, - { "id": "urn:ngsi-ld:Beehive:1234567890", - "type": "Beehive" - }, - { - "idPattern": "urn:ngsi-ld:Beehive:1234*", - "type": "Beehive" - } - ], "timeInterval": 1, "watchedAttributes": ["incoming"], - "q": "foodQuantity<150;foodName=='dietary fibres'", "notification": { - "attributes": ["incoming"], - "format": "normalized", "endpoint": { - "uri": "http://localhost:8084", - "accept": "application/json" + "uri": "http://localhost:8084" } }, "@context":[ diff --git a/subscription-service/src/test/resources/ngsild/subscription_with_time_interval_less_than_0.json b/subscription-service/src/test/resources/ngsild/subscription_with_time_interval_less_than_0.json deleted file mode 100644 index ac7f3bd65..000000000 --- a/subscription-service/src/test/resources/ngsild/subscription_with_time_interval_less_than_0.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "id":"urn:ngsi-ld:Subscription:1", - "type":"Subscription", - "entities": [ - { - "type": "Beehive" - }, - { - "id": "urn:ngsi-ld:Beehive:1234567890", - "type": "Beehive" - }, - { - "idPattern": "urn:ngsi-ld:Beehive:1234*", - "type": "Beehive" - } - ], - "timeInterval": -1, - "q": "foodQuantity<150;foodName=='dietary fibres'", - "notification": { - "attributes": [ - "incoming" - ], - "format": "normalized", - "endpoint": { - "uri": "http://localhost:8084", - "accept": "application/json" - } - }, - "@context": [ - "https://raw.githubusercontent.com/easy-global-market/ngsild-api-data-models/master/apic/jsonld-contexts/apic-compound.jsonld" - ] -}