Skip to content

Commit

Permalink
feat(subscription): update to receiver / notifier info in notificatio…
Browse files Browse the repository at this point in the history
…n params (#1050)

- was still on pre-1.3 data model with only an info field
  • Loading branch information
bobeal authored Nov 27, 2023
1 parent fbef632 commit 17c1c56
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 38 deletions.
2 changes: 1 addition & 1 deletion subscription-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<CurrentIssues>
<ID>CyclomaticComplexMethod:SubscriptionServiceTests.kt$SubscriptionServiceTests$@Test fun `it should load a subscription with all possible members`()</ID>
<ID>LongMethod:EntityEventListenerService.kt$EntityEventListenerService$internal suspend fun dispatchEntityEvent(content: String)</ID>
<ID>LongParameterList:FixtureUtils.kt$( withQueryAndGeoQuery: Pair&lt;Boolean, Boolean&gt; = Pair(true, true), withEndpointInfo: Boolean = true, withNotifParams: Pair&lt;FormatType, List&lt;String&gt;&gt; = Pair(FormatType.NORMALIZED, emptyList()), withModifiedAt: Boolean = false, georel: String = "within", geometry: String = "Polygon", coordinates: String = "[[[100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0]]]", timeInterval: Int? = null, contexts: List&lt;String&gt; = listOf(NGSILD_CORE_CONTEXT) )</ID>
<ID>LongParameterList:FixtureUtils.kt$( withQueryAndGeoQuery: Pair&lt;Boolean, Boolean&gt; = Pair(true, true), withEndpointReceiverInfo: Boolean = true, withNotifParams: Pair&lt;FormatType, List&lt;String&gt;&gt; = Pair(FormatType.NORMALIZED, emptyList()), withModifiedAt: Boolean = false, georel: String = "within", geometry: String = "Polygon", coordinates: String = "[[[100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0]]]", timeInterval: Int? = null, contexts: List&lt;String&gt; = listOf(NGSILD_CORE_CONTEXT) )</ID>
<ID>TooGenericExceptionCaught:SubscriptionService.kt$SubscriptionService$e: Exception</ID>
<ID>TooManyFunctions:SubscriptionService.kt$SubscriptionService</ID>
</CurrentIssues>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import java.net.URI
data class Endpoint(
val uri: URI,
val accept: AcceptType = JSON,
val info: List<EndpointInfo>? = null
val receiverInfo: List<EndpointInfo>? = null,
val notifierInfo: List<EndpointInfo>? = null
) {

enum class AcceptType(val accept: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class NotificationService(
}
if (tenantUri != DEFAULT_TENANT_URI)
it.set(NGSILD_TENANT_HEADER, tenantUri.toString())
subscription.notification.endpoint.info?.forEach { endpointInfo ->
subscription.notification.endpoint.receiverInfo?.forEach { endpointInfo ->
it.set(endpointInfo.key, endpointInfo.value)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,18 @@ class SubscriptionService(
if (subscription.geoQ != null)
parseGeoQueryParameters(subscription.geoQ.toMap(), subscription.contexts).bind()
else null
val endpoint = subscription.notification.endpoint

val insertStatement =
"""
INSERT INTO subscription(id, type, subscription_name, created_at, description, watched_attributes,
notification_trigger, time_interval, q, scope_q, notif_attributes, notif_format, endpoint_uri,
endpoint_accept, endpoint_info, times_sent, is_active, expires_at, sub, contexts)
endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, times_sent, is_active,
expires_at, sub, contexts)
VALUES(:id, :type, :subscription_name, :created_at, :description, :watched_attributes,
:notification_trigger, :time_interval, :q, :scope_q, :notif_attributes, :notif_format, :endpoint_uri,
:endpoint_accept, :endpoint_info, :times_sent, :is_active, :expires_at, :sub, :contexts)
:endpoint_accept, :endpoint_receiver_info, :endpoint_notifier_info, :times_sent, :is_active,
:expires_at, :sub, :contexts)
""".trimIndent()

databaseClient.sql(insertStatement)
Expand All @@ -151,9 +154,10 @@ class SubscriptionService(
.bind("scope_q", subscription.scopeQ)
.bind("notif_attributes", subscription.notification.attributes?.joinToString(separator = ","))
.bind("notif_format", subscription.notification.format.name)
.bind("endpoint_uri", subscription.notification.endpoint.uri)
.bind("endpoint_accept", subscription.notification.endpoint.accept.name)
.bind("endpoint_info", Json.of(endpointInfoToString(subscription.notification.endpoint.info)))
.bind("endpoint_uri", endpoint.uri)
.bind("endpoint_accept", endpoint.accept.name)
.bind("endpoint_receiver_info", Json.of(endpointInfoToString(endpoint.receiverInfo)))
.bind("endpoint_notifier_info", Json.of(endpointInfoToString(endpoint.notifierInfo)))
.bind("times_sent", subscription.notification.timesSent)
.bind("is_active", subscription.isActive)
.bind("expires_at", subscription.expiresAt)
Expand Down Expand Up @@ -226,10 +230,10 @@ class SubscriptionService(
"""
SELECT subscription.id as sub_id, subscription.type as sub_type, subscription_name, created_at,
modified_at, description, watched_attributes, notification_trigger, time_interval, q, notif_attributes,
notif_format, endpoint_uri, endpoint_accept, endpoint_info, status, times_sent, is_active,
last_notification, last_failure, last_success, entity_selector.id as entity_id, id_pattern,
entity_selector.type_selection as type_selection, georel, geometry, coordinates, pgis_geometry,
geoproperty, scope_q, expires_at, contexts
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = :id
LEFT JOIN geometry_query ON geometry_query.subscription_id = :id
Expand Down Expand Up @@ -427,12 +431,14 @@ class SubscriptionService(
Endpoint.AcceptType.JSON.name
else
Endpoint.AcceptType.JSONLD.name
val endpointInfo = endpoint["info"] as? List<Map<String, String>>
val endpointReceiverInfo = endpoint["receiverInfo"] as? List<Map<String, String>>
val endpointNotifierInfo = endpoint["notifierInfo"] as? List<Map<String, String>>

listOf(
Pair("endpoint_uri", endpoint["uri"]),
Pair("endpoint_accept", accept),
Pair("endpoint_info", Json.of(endpointInfoMapToString(endpointInfo)))
Pair("endpoint_receiver_info", Json.of(endpointInfoMapToString(endpointReceiverInfo))),
Pair("endpoint_notifier_info", Json.of(endpointInfoMapToString(endpointNotifierInfo)))
)
}
else -> throw BadRequestDataException("Could not update attribute ${attribute.key}")
Expand Down Expand Up @@ -470,10 +476,10 @@ class SubscriptionService(
"""
SELECT subscription.id as sub_id, subscription.type as sub_type, subscription_name, created_at,
modified_At, description, watched_attributes, notification_trigger, time_interval, q, notif_attributes,
notif_format, endpoint_uri, endpoint_accept, endpoint_info, status, times_sent, is_active,
last_notification, last_failure, last_success, entity_selector.id as entity_id, id_pattern,
entity_selector.type_selection as type_selection, georel, geometry, coordinates, pgis_geometry,
geoproperty, scope_q, expires_at, contexts
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down Expand Up @@ -515,7 +521,7 @@ class SubscriptionService(
entity_selector.id as entity_id, entity_selector.id_pattern as id_pattern,
entity_selector.type_selection as type_selection, georel, geometry, coordinates, pgis_geometry,
geoproperty, scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, times_sent,
endpoint_info, contexts
endpoint_receiver_info, endpoint_notifier_info, contexts
FROM subscription
LEFT JOIN entity_selector on subscription.id = entity_selector.subscription_id
LEFT JOIN geometry_query on subscription.id = geometry_query.subscription_id
Expand Down Expand Up @@ -653,7 +659,8 @@ class SubscriptionService(
endpoint = Endpoint(
uri = toUri(row["endpoint_uri"]),
accept = toEnum(row["endpoint_accept"]!!),
info = parseEndpointInfo(toJsonString(row["endpoint_info"]))
receiverInfo = parseEndpointInfo(toJsonString(row["endpoint_receiver_info"])),
notifierInfo = parseEndpointInfo(toJsonString(row["endpoint_notifier_info"]))
),
status = toOptionalEnum<NotificationParams.StatusType>(row["status"]),
timesSent = row["times_sent"] as Int,
Expand Down Expand Up @@ -682,7 +689,8 @@ class SubscriptionService(
endpoint = Endpoint(
uri = toUri(row["endpoint_uri"]),
accept = toEnum(row["endpoint_accept"]!!),
info = parseEndpointInfo(toJsonString(row["endpoint_info"]))
receiverInfo = parseEndpointInfo(toJsonString(row["endpoint_receiver_info"])),
notifierInfo = parseEndpointInfo(toJsonString(row["endpoint_notifier_info"]))
),
status = null,
timesSent = row["times_sent"] as Int,
Expand Down Expand Up @@ -723,10 +731,10 @@ class SubscriptionService(
"""
SELECT subscription.id as sub_id, subscription.type as sub_type, subscription_name, created_at,
modified_At, expires_at, description, watched_attributes, notification_trigger, time_interval, q,
scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_info, status,
times_sent, last_notification, last_failure, last_success, is_active, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, contexts
scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info,
endpoint_notifier_info, status, times_sent, last_notification, last_failure, last_success, is_active,
entity_selector.id as entity_id, id_pattern, entity_selector.type_selection as type_selection, georel,
geometry, coordinates, pgis_geometry, geoproperty, contexts
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
alter table subscription
rename column endpoint_info to endpoint_receiver_info;

alter table subscription
add column endpoint_notifier_info jsonb;
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class TimeIntervalNotificationJobTest {

@Test
fun `it should not return twice the same entity if there is overlap between 2 entity infos`() {
val subscription = gimmeRawSubscription(withEndpointInfo = false).copy(
val subscription = gimmeRawSubscription(withEndpointReceiverInfo = false).copy(
entities = setOf(
EntitySelector(
id = "urn:ngsi-ld:BeeHive:TESTC".toUri(),
Expand Down Expand Up @@ -226,7 +226,7 @@ class TimeIntervalNotificationJobTest {
@Test
fun `it should notify the recurring subscriptions that have reached the time interval`() = runTest {
val entity = loadSampleData("beehive.jsonld")
val subscription = gimmeRawSubscription(withEndpointInfo = false).copy(
val subscription = gimmeRawSubscription(withEndpointReceiverInfo = false).copy(
entities = setOf(
EntitySelector(
id = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.springframework.http.MediaType

class SubscriptionTest {

private val endpointInfo =
private val endpointReceiverInfo =
listOf(EndpointInfo(key = "Authorization-token", value = "Authorization-token-value"))

private val subscription = Subscription(
Expand All @@ -44,7 +44,7 @@ class SubscriptionTest {
endpoint = Endpoint(
uri = "http://localhost:8089/notification".toUri(),
accept = Endpoint.AcceptType.JSONLD,
info = endpointInfo
receiverInfo = endpointReceiverInfo
)
),
contexts = listOf(APIC_COMPOUND_CONTEXT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ class SubscriptionServiceTests : WithTimescaleContainer {
"endpoint" to mapOf(
"accept" to "application/ld+json",
"uri" to "http://localhost:8080",
"info" to listOf(
"receiverInfo" to listOf(
mapOf("key" to "Authorization-token", "value" to "Authorization-token-newValue")
)
)
Expand All @@ -736,10 +736,9 @@ class SubscriptionServiceTests : WithTimescaleContainer {
it.notification.format.name == "KEY_VALUES" &&
it.notification.endpoint.accept.name == "JSONLD" &&
it.notification.endpoint.uri.toString() == "http://localhost:8080" &&
it.notification.endpoint.info == listOf(
it.notification.endpoint.receiverInfo == listOf(
EndpointInfo("Authorization-token", "Authorization-token-newValue")
) &&
it.notification.endpoint.info!!.size == 1
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fun gimmeSubscriptionFromMembers(

fun gimmeRawSubscription(
withQueryAndGeoQuery: Pair<Boolean, Boolean> = Pair(true, true),
withEndpointInfo: Boolean = true,
withEndpointReceiverInfo: Boolean = true,
withNotifParams: Pair<FormatType, List<String>> = Pair(FormatType.NORMALIZED, emptyList()),
withModifiedAt: Boolean = false,
georel: String = "within",
Expand All @@ -59,8 +59,8 @@ fun gimmeRawSubscription(
else
null

val endpointInfo =
if (withEndpointInfo)
val endpointReceiverInfo =
if (withEndpointReceiverInfo)
listOf(EndpointInfo(key = "Authorization-token", value = "Authorization-token-value"))
else
null
Expand All @@ -81,7 +81,7 @@ fun gimmeRawSubscription(
endpoint = Endpoint(
uri = "http://localhost:8089/notification".toUri(),
accept = Endpoint.AcceptType.JSONLD,
info = endpointInfo
receiverInfo = endpointReceiverInfo
)
),
contexts = contexts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"endpoint": {
"uri": "http://localhost:8084",
"accept": "application/json",
"info": [
"receiverInfo": [
{ "key": "Authorization-token", "value": "Authorization-token-value" }
]
}
Expand Down

0 comments on commit 17c1c56

Please sign in to comment.