Skip to content

Commit

Permalink
adding sysAttrs to notification params
Browse files Browse the repository at this point in the history
  • Loading branch information
ranim-n committed Feb 14, 2024
1 parent 3385c2b commit 4d34007
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ data class NotificationParams(
val timesSent: Int = 0,
val lastNotification: ZonedDateTime? = null,
val lastFailure: ZonedDateTime? = null,
val lastSuccess: ZonedDateTime? = null
val lastSuccess: ZonedDateTime? = null,
val sysAttrs: Boolean = false
) {
enum class FormatType(val format: String) {
@JsonProperty("keyValues")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ class NotificationService(
ExpandedEntity(filteredEntity, it.contexts),
it.contexts
).toFinalRepresentation(
NgsiLdDataRepresentation(entityRepresentation, attributeRepresentation, true)
if (it.notification.sysAttrs)
NgsiLdDataRepresentation(entityRepresentation, attributeRepresentation, true)
else NgsiLdDataRepresentation(entityRepresentation, attributeRepresentation, false)

)
callSubscriber(it, compactedEntity)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ class SubscriptionService(
INSERT INTO subscription(id, type, subscription_name, created_at, description, watched_attributes,
notification_trigger, time_interval, q, scope_q, notif_attributes, notif_format, endpoint_uri,
endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, times_sent, is_active,
expires_at, sub, contexts, throttling)
expires_at, sub, contexts, throttling, sys_attrs)
VALUES(:id, :type, :subscription_name, :created_at, :description, :watched_attributes,
:notification_trigger, :time_interval, :q, :scope_q, :notif_attributes, :notif_format, :endpoint_uri,
:endpoint_accept, :endpoint_receiver_info, :endpoint_notifier_info, :times_sent, :is_active,
:expires_at, :sub, :contexts, :throttling)
:expires_at, :sub, :contexts, :throttling, :sys_attrs)
""".trimIndent()

databaseClient.sql(insertStatement)
Expand All @@ -179,6 +179,7 @@ class SubscriptionService(
.bind("sub", sub.toStringValue())
.bind("contexts", subscription.contexts.toTypedArray())
.bind("throttling", subscription.throttling)
.bind("sys_attrs" , subscription.notification.sysAttrs)
.execute().bind()

geoQuery?.let {
Expand Down Expand Up @@ -249,7 +250,7 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = :id
LEFT JOIN geometry_query ON geometry_query.subscription_id = :id
Expand Down Expand Up @@ -539,7 +540,7 @@ class SubscriptionService(
entity_selector.id as entity_id, entity_selector.id_pattern as id_pattern,
entity_selector.type_selection as type_selection, georel, geometry, coordinates, pgis_geometry,
geoproperty, scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, times_sent,
endpoint_receiver_info, endpoint_notifier_info, contexts, throttling
endpoint_receiver_info, endpoint_notifier_info, contexts, throttling, sys_attrs
FROM subscription
LEFT JOIN entity_selector on subscription.id = entity_selector.subscription_id
LEFT JOIN geometry_query on subscription.id = geometry_query.subscription_id
Expand Down Expand Up @@ -686,7 +687,8 @@ class SubscriptionService(
timesSent = row["times_sent"] as Int,
lastNotification = toNullableZonedDateTime(row["last_notification"]),
lastFailure = toNullableZonedDateTime(row["last_failure"]),
lastSuccess = toNullableZonedDateTime(row["last_success"])
lastSuccess = toNullableZonedDateTime(row["last_success"]),
sysAttrs = row["sys_attrs"] as Boolean
),
isActive = toBoolean(row["is_active"]),
contexts = toList(row["contexts"]),
Expand Down Expand Up @@ -717,7 +719,8 @@ class SubscriptionService(
timesSent = row["times_sent"] as Int,
lastNotification = null,
lastFailure = null,
lastSuccess = null
lastSuccess = null,
sysAttrs = row["sys_attrs"] as Boolean
),
contexts = toList(row["contexts"]),
throttling = toNullableInt(row["throttling"])
Expand Down Expand Up @@ -756,7 +759,7 @@ class SubscriptionService(
scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info,
endpoint_notifier_info, status, times_sent, last_notification, last_failure, last_success, is_active,
entity_selector.id as entity_id, id_pattern, entity_selector.type_selection as type_selection, georel,
geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling
geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling, sys_attrs
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE subscription
ADD sys_attrs boolean;
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.egm.stellio.subscription.model.NotificationParams
import com.egm.stellio.subscription.model.NotificationParams.FormatType
import com.egm.stellio.subscription.model.NotificationTrigger.*
import com.egm.stellio.subscription.support.gimmeRawSubscription
import com.egm.stellio.subscription.utils.ParsingUtils
import com.github.tomakehurst.wiremock.client.WireMock.*
import com.github.tomakehurst.wiremock.junit5.WireMockTest
import com.ninjasquad.springmockk.MockkBean
Expand All @@ -21,8 +22,7 @@ import io.mockk.confirmVerified
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
Expand Down Expand Up @@ -69,6 +69,24 @@ class NotificationServiceTests {
}
""".trimIndent()

private val entity =
"""
{
"id":"$apiaryId",
"type":"Apiary",
"createdAt": "2024-02-13T18:15:00Z",
"modifiedAt": "2024-02-13T18:16:00Z",
"name": {
"type":"Property",
"value":"ApiarySophia",
"createdAt": "2024-02-13T18:15:00Z",
"modifiedAt": "2024-02-13T18:16:00Z"
},
"@context":[ "$APIC_COMPOUND_CONTEXT" ]
}
""".trimIndent()


@Test
fun `it should notify the subscriber and update the subscription`() = runTest {
val subscription = gimmeRawSubscription()
Expand Down Expand Up @@ -404,4 +422,120 @@ class NotificationServiceTests {

verify(1, postRequestedFor(urlPathEqualTo("/notification")))
}

@Test
fun `it should notify the subscriber and return entities without system attributes if sysAttrs is false`() = runTest {

val payload = mapOf(
"id" to "urn:ngsi-ld:Subscription:1",
"type" to JsonLdUtils.NGSILD_SUBSCRIPTION_TERM,
"entities" to listOf(
mapOf(
"id" to apiaryId,
"type" to "Apiary",
"@context" to APIC_COMPOUND_CONTEXT
)
) ,
"notification" to mapOf(
"endpoint" to mapOf(
"accept" to "application/ld+json",
"uri" to "http://localhost:8089/notification"),
"sysAttrs" to false
)
)


val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult()
val expandedEntity = expandJsonLdEntity(entity)

coEvery {
subscriptionService.getMatchingSubscriptions(any(), any(), any())
} returns listOf(subscription).right()
coEvery { subscriptionService.updateSubscriptionNotification(any(), any(), any()) } returns 1

stubFor(
post(urlMatching("/notification"))
.willReturn(ok())
)

notificationService.notifyMatchingSubscribers(
expandedEntity,
setOf(NGSILD_NAME_PROPERTY),
ATTRIBUTE_UPDATED
).shouldSucceedWith {
val entity = it[0].second.data[0]
assertFalse(entity.containsKey("createdAt"))
assertFalse(entity.containsKey("modifiedAt"))

for ((key, value) in entity) {
if (value !is Map<*, *>) {
continue
}
assertFalse((value as Map<*, *>).containsKey("createdAt"))
assertFalse((value as Map<*, *>).containsKey("modifiedAt"))

}
}
}

@Test
fun `it should notify the subscriber and return entities with system attributes if sysAttrs is true`() = runTest {

val payload = mapOf(
"id" to "urn:ngsi-ld:Subscription:1",
"type" to JsonLdUtils.NGSILD_SUBSCRIPTION_TERM,
"entities" to listOf(
mapOf(
"id" to apiaryId,
"type" to "Apiary",
"createdAt" to "2024-02-13T18:15:00Z",
"modifiedAt" to "2024-02-13T18:16:00Z",
"name" to mapOf(
"type" to "Property",
"value" to "ApiarySophia",
"createdAt" to "2024-02-13T18:15:00Z",
"modifiedAt" to "2024-02-13T18:16:00Z"
),
"@context" to APIC_COMPOUND_CONTEXT
)
) ,
"notification" to mapOf(
"endpoint" to mapOf(
"accept" to "application/ld+json",
"uri" to "http://localhost:8089/notification"),
"sysAttrs" to true
)
)


val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult()
val expandedEntity = expandJsonLdEntity(entity)

coEvery {
subscriptionService.getMatchingSubscriptions(any(), any(), any())
} returns listOf(subscription).right()
coEvery { subscriptionService.updateSubscriptionNotification(any(), any(), any()) } returns 1

stubFor(
post(urlMatching("/notification"))
.willReturn(ok())
)

notificationService.notifyMatchingSubscribers(
expandedEntity,
setOf(NGSILD_NAME_PROPERTY),
ATTRIBUTE_UPDATED
).shouldSucceedWith {
val entity = it[0].second.data[0]
assertTrue(entity.containsKey("createdAt"))
assertTrue(entity.containsKey("modifiedAt"))

for ((_, attributeValue) in entity) {
val attribute = attributeValue as Map<String, Any?>
assertTrue(attribute.containsKey("createdAt"))
assertTrue(attribute.containsKey("modifiedAt"))

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer {
) &&
it.expiresAt == ZonedDateTime.parse("2100-01-01T00:00:00Z") &&
it.throttling == 60
it.notification.sysAttrs == true
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
"receiverInfo": [
{ "key": "Authorization-token", "value": "Authorization-token-value" }
]
}
},
"sysAttrs": true
},
"expiresAt": "2100-01-01T00:00:00Z",
"throttling": 60
Expand Down

0 comments on commit 4d34007

Please sign in to comment.