Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscription): add support for sysAttrs to notification params #1098

Merged
merged 7 commits into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ data class NotificationParams(
val timesSent: Int = 0,
val lastNotification: ZonedDateTime? = null,
val lastFailure: ZonedDateTime? = null,
val lastSuccess: ZonedDateTime? = null
val lastSuccess: ZonedDateTime? = null,
@JsonInclude(value = JsonInclude.Include.NON_DEFAULT)
val sysAttrs: Boolean = false
) {
enum class FormatType(val format: String) {
@JsonProperty("keyValues")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ class NotificationService(
ExpandedEntity(filteredEntity, it.contexts),
it.contexts
).toFinalRepresentation(
NgsiLdDataRepresentation(entityRepresentation, attributeRepresentation, true)
NgsiLdDataRepresentation(
entityRepresentation,
attributeRepresentation,
it.notification.sysAttrs
)
)
callSubscriber(it, compactedEntity)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ class SubscriptionService(
INSERT INTO subscription(id, type, subscription_name, created_at, description, watched_attributes,
notification_trigger, time_interval, q, scope_q, notif_attributes, notif_format, endpoint_uri,
endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, times_sent, is_active,
expires_at, sub, contexts, throttling)
expires_at, sub, contexts, throttling, sys_attrs)
VALUES(:id, :type, :subscription_name, :created_at, :description, :watched_attributes,
:notification_trigger, :time_interval, :q, :scope_q, :notif_attributes, :notif_format, :endpoint_uri,
:endpoint_accept, :endpoint_receiver_info, :endpoint_notifier_info, :times_sent, :is_active,
:expires_at, :sub, :contexts, :throttling)
:expires_at, :sub, :contexts, :throttling, :sys_attrs)
""".trimIndent()

databaseClient.sql(insertStatement)
Expand All @@ -179,6 +179,7 @@ class SubscriptionService(
.bind("sub", sub.toStringValue())
.bind("contexts", subscription.contexts.toTypedArray())
.bind("throttling", subscription.throttling)
.bind("sys_attrs", subscription.notification.sysAttrs)
.execute().bind()

geoQuery?.let {
Expand Down Expand Up @@ -249,7 +250,7 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = :id
LEFT JOIN geometry_query ON geometry_query.subscription_id = :id
Expand Down Expand Up @@ -497,7 +498,7 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down Expand Up @@ -539,7 +540,7 @@ class SubscriptionService(
entity_selector.id as entity_id, entity_selector.id_pattern as id_pattern,
entity_selector.type_selection as type_selection, georel, geometry, coordinates, pgis_geometry,
geoproperty, scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, times_sent,
endpoint_receiver_info, endpoint_notifier_info, contexts, throttling
endpoint_receiver_info, endpoint_notifier_info, contexts, throttling, sys_attrs
FROM subscription
LEFT JOIN entity_selector on subscription.id = entity_selector.subscription_id
LEFT JOIN geometry_query on subscription.id = geometry_query.subscription_id
Expand Down Expand Up @@ -687,7 +688,8 @@ class SubscriptionService(
timesSent = row["times_sent"] as Int,
lastNotification = toNullableZonedDateTime(row["last_notification"]),
lastFailure = toNullableZonedDateTime(row["last_failure"]),
lastSuccess = toNullableZonedDateTime(row["last_success"])
lastSuccess = toNullableZonedDateTime(row["last_success"]),
sysAttrs = row["sys_attrs"] as Boolean
),
isActive = toBoolean(row["is_active"]),
contexts = toList(row["contexts"]),
Expand Down Expand Up @@ -718,7 +720,8 @@ class SubscriptionService(
timesSent = row["times_sent"] as Int,
lastNotification = null,
lastFailure = null,
lastSuccess = null
lastSuccess = null,
sysAttrs = row["sys_attrs"] as Boolean
),
contexts = toList(row["contexts"]),
throttling = toNullableInt(row["throttling"])
Expand Down Expand Up @@ -757,7 +760,7 @@ class SubscriptionService(
scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info,
endpoint_notifier_info, status, times_sent, last_notification, last_failure, last_success, is_active,
entity_selector.id as entity_id, id_pattern, entity_selector.type_selection as type_selection, georel,
geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling
geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling, sys_attrs
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE subscription
ADD sys_attrs boolean;

UPDATE subscription
SET sys_attrs = false;
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.egm.stellio.subscription.service

import arrow.core.filterIsInstance
import arrow.core.right
import com.egm.stellio.shared.util.*
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_COMPACTED_ENTITY_CORE_MEMBERS
Expand All @@ -21,8 +22,8 @@ import io.mockk.confirmVerified
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
Expand Down Expand Up @@ -69,6 +70,23 @@ class NotificationServiceTests {
}
""".trimIndent()

private val entityWithSysAttrs =
"""
{
"id":"$apiaryId",
"type":"Apiary",
"createdAt": "2024-02-13T18:15:00Z",
"modifiedAt": "2024-02-13T18:16:00Z",
"name": {
"type":"Property",
"value":"ApiarySophia",
"createdAt": "2024-02-13T18:15:00Z",
"modifiedAt": "2024-02-13T18:16:00Z"
},
"@context":[ "$APIC_COMPOUND_CONTEXT" ]
}
""".trimIndent()

@Test
fun `it should notify the subscriber and update the subscription`() = runTest {
val subscription = gimmeRawSubscription()
Expand Down Expand Up @@ -404,4 +422,78 @@ class NotificationServiceTests {

verify(1, postRequestedFor(urlPathEqualTo("/notification")))
}

@Test
fun `it should notify the subscriber and return entities without sysAttrs if sysAttrs is false`() = runTest {
val subscription = gimmeRawSubscription()
val expandedEntity = expandJsonLdEntity(entityWithSysAttrs)

coEvery {
subscriptionService.getMatchingSubscriptions(any(), any(), any())
} returns listOf(subscription).right()
coEvery { subscriptionService.updateSubscriptionNotification(any(), any(), any()) } returns 1

stubFor(
post(urlMatching("/notification"))
.willReturn(ok())
)

notificationService.notifyMatchingSubscribers(
expandedEntity,
setOf(NGSILD_NAME_PROPERTY),
ATTRIBUTE_UPDATED
).shouldSucceedWith {
val entity = it[0].second.data[0]
assertFalse(entity.containsKey("createdAt"))
assertFalse(entity.containsKey("modifiedAt"))

entity.filterIsInstance<String, Map<String, Any>>()
.forEach { (_, v) ->
assertThat(v).doesNotContainKey("createdAt")
assertThat(v).doesNotContainKey("modifiedAt")
}
}
}

@Test
fun `it should notify the subscriber and return entities with sysAttrs if sysAttrs is true`() = runTest {
val subscription = gimmeRawSubscription().copy(
notification = NotificationParams(
attributes = emptyList(),
endpoint = Endpoint(
uri = "http://localhost:8089/notification".toUri(),
accept = Endpoint.AcceptType.JSONLD
),
sysAttrs = true
)
)

val expandedEntity = expandJsonLdEntity(entityWithSysAttrs)

coEvery {
subscriptionService.getMatchingSubscriptions(any(), any(), any())
} returns listOf(subscription).right()
coEvery { subscriptionService.updateSubscriptionNotification(any(), any(), any()) } returns 1

stubFor(
post(urlMatching("/notification"))
.willReturn(ok())
)

notificationService.notifyMatchingSubscribers(
expandedEntity,
setOf(NGSILD_NAME_PROPERTY),
ATTRIBUTE_UPDATED
).shouldSucceedWith {
val entity = it[0].second.data[0]
assertTrue(entity.containsKey("createdAt"))
assertTrue(entity.containsKey("modifiedAt"))

entity.filterIsInstance<String, Map<String, Any>>()
.forEach { (_, v) ->
assertThat(v).containsKey("createdAt")
assertThat(v).containsKey("modifiedAt")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer {
Endpoint.AcceptType.JSON,
listOf(EndpointInfo("Authorization-token", "Authorization-token-value"))
) &&
it.notification.sysAttrs &&
it.expiresAt == ZonedDateTime.parse("2100-01-01T00:00:00Z") &&
it.throttling == 60
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
"receiverInfo": [
{ "key": "Authorization-token", "value": "Authorization-token-value" }
]
}
},
"sysAttrs": true
},
"expiresAt": "2100-01-01T00:00:00Z",
"throttling": 60
Expand Down
Loading