Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add throttling #1085

Merged
merged 22 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
fc72841
added throttling to subscription
ranim-n Jan 19, 2024
935ce1a
Merge branch 'develop' into feature/add-throttling
ranim-n Jan 19, 2024
c3a57fe
commit after merge with dev
ranim-n Jan 19, 2024
ed58f0f
Merge branch 'develop' into feature/add-throttling
ranim-n Jan 22, 2024
6349447
Merge branch 'develop' into feature/add-throttling
ranim-n Jan 23, 2024
f911bb7
added throttling to subscription create and added unit tests
ranim-n Jan 23, 2024
efe50ba
fixed unit test for a subscription with all attributes
ranim-n Jan 24, 2024
a5c5b36
fixed indentation
ranim-n Jan 24, 2024
7f59493
added throttling to the update of a subscription
ranim-n Jan 25, 2024
ac455f1
fixed update method's length after adding throttling
ranim-n Jan 25, 2024
b115fb2
fixed issues from detekt
ranim-n Jan 25, 2024
e2e3a93
fixed issues from detekt part 2
ranim-n Jan 25, 2024
fcbc190
added throttling to getting matching subscriptions
ranim-n Jan 29, 2024
90d20a1
fixed previous commit related to throttling while getting matching su…
ranim-n Jan 31, 2024
3adb87f
minor fixes : added final unit test
ranim-n Jan 31, 2024
7fa1398
added delay after creating a subscription
ranim-n Feb 1, 2024
3d4771f
fixed import
ranim-n Feb 1, 2024
cfa0316
fix: incorrect condition when checking if throttling has elapsed
bobeal Feb 1, 2024
6d1f1ff
Merge branch 'develop' into feature/add-throttling
ranim-n Feb 2, 2024
5692d99
fixed condition while checking timeInterval
ranim-n Feb 2, 2024
1539a36
Merge branch 'develop' into feature/add-throttling
ranim-n Feb 4, 2024
4afeb58
fixed functions name
ranim-n Feb 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ data class Subscription(
// - used to compact entities in notifications
// - used when needed to serve contexts in JSON notifications
@JsonProperty(value = JSONLD_CONTEXT)
val contexts: List<ExpandedTerm>
val contexts: List<ExpandedTerm>,
val throttling: Int? = null
) {

@Transient
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.egm.stellio.subscription.service

import arrow.core.*
import arrow.core.Either
import arrow.core.Option
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.shared.model.*
import com.egm.stellio.shared.util.*
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE_TERM
Expand All @@ -10,19 +13,18 @@ import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SUBSCRIPTION_TERM
import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm
import com.egm.stellio.subscription.config.SubscriptionProperties
import com.egm.stellio.subscription.model.*
import com.egm.stellio.subscription.model.GeoQ
import com.egm.stellio.subscription.model.Subscription
import com.egm.stellio.subscription.utils.*
import com.egm.stellio.subscription.utils.ParsingUtils.endpointInfoMapToString
import com.egm.stellio.subscription.utils.ParsingUtils.endpointInfoToString
import com.egm.stellio.subscription.utils.ParsingUtils.parseEndpointInfo
import com.egm.stellio.subscription.utils.ParsingUtils.parseEntitySelector
import com.egm.stellio.subscription.utils.ParsingUtils.toSqlColumnName
import com.egm.stellio.subscription.utils.ParsingUtils.toSqlValue
import com.egm.stellio.subscription.web.invalidSubscriptiondAttributeMessage
import com.egm.stellio.subscription.web.unsupportedSubscriptiondAttributeMessage
import io.r2dbc.postgresql.codec.Json
import kotlinx.coroutines.reactive.awaitFirst
import org.locationtech.jts.geom.Geometry
import org.slf4j.LoggerFactory
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate
import org.springframework.data.relational.core.query.Criteria.where
import org.springframework.data.relational.core.query.Query.query
Expand All @@ -44,13 +46,12 @@ class SubscriptionService(
private val r2dbcEntityTemplate: R2dbcEntityTemplate
) {

private val logger = LoggerFactory.getLogger(javaClass)

suspend fun validateNewSubscription(subscription: Subscription): Either<APIException, Unit> = either {
checkTypeIsSubscription(subscription).bind()
checkIdIsValid(subscription).bind()
checkEntitiesOrWatchedAttributes(subscription).bind()
checkTimeIntervalGreaterThanZero(subscription).bind()
checkThrottlingGreaterThanZero(subscription).bind()
checkSubscriptionValidity(subscription).bind()
checkExpiresAtInTheFuture(subscription).bind()
checkIdPatternIsValid(subscription).bind()
Expand All @@ -73,17 +74,31 @@ class SubscriptionService(
else Unit.right()

private fun checkSubscriptionValidity(subscription: Subscription): Either<APIException, Unit> =
if (subscription.watchedAttributes != null && subscription.timeInterval != null)
BadRequestDataException(
"You can't use 'timeInterval' in conjunction with 'watchedAttributes'"
).left()
else Unit.right()
when {
subscription.watchedAttributes != null && subscription.timeInterval != null -> {
BadRequestDataException(
"You can't use 'timeInterval' in conjunction with 'watchedAttributes'"
).left()
}
subscription.timeInterval != null && subscription.throttling != null -> {
BadRequestDataException(
"You can't use 'timeInterval' in conjunction with 'throttling'"
).left()
}
else ->
Unit.right()
}

private fun checkTimeIntervalGreaterThanZero(subscription: Subscription): Either<APIException, Unit> =
if (subscription.timeInterval != null && subscription.timeInterval < 1)
if (subscription.timeInterval != null && subscription.timeInterval < 1 && subscription.throttling == null)
bobeal marked this conversation as resolved.
Show resolved Hide resolved
BadRequestDataException("The value of 'timeInterval' must be greater than zero (int)").left()
else Unit.right()

private fun checkThrottlingGreaterThanZero(subscription: Subscription): Either<APIException, Unit> =
if (subscription.throttling != null && subscription.throttling < 1)
BadRequestDataException("The value of 'throttling' must be greater than zero (int)").left()
else Unit.right()

private fun checkExpiresAtInTheFuture(subscription: Subscription): Either<BadRequestDataException, Unit> =
if (subscription.expiresAt != null && subscription.expiresAt.isBefore(ngsiLdDateTime()))
BadRequestDataException("'expiresAt' must be in the future").left()
Expand Down Expand Up @@ -134,11 +149,11 @@ class SubscriptionService(
INSERT INTO subscription(id, type, subscription_name, created_at, description, watched_attributes,
notification_trigger, time_interval, q, scope_q, notif_attributes, notif_format, endpoint_uri,
endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, times_sent, is_active,
expires_at, sub, contexts)
expires_at, sub, contexts, throttling)
VALUES(:id, :type, :subscription_name, :created_at, :description, :watched_attributes,
:notification_trigger, :time_interval, :q, :scope_q, :notif_attributes, :notif_format, :endpoint_uri,
:endpoint_accept, :endpoint_receiver_info, :endpoint_notifier_info, :times_sent, :is_active,
:expires_at, :sub, :contexts)
:expires_at, :sub, :contexts, :throttling)
""".trimIndent()

databaseClient.sql(insertStatement)
Expand All @@ -163,6 +178,7 @@ class SubscriptionService(
.bind("expires_at", subscription.expiresAt)
.bind("sub", sub.toStringValue())
.bind("contexts", subscription.contexts.toTypedArray())
.bind("throttling", subscription.throttling)
.execute().bind()

geoQuery?.let {
Expand Down Expand Up @@ -233,7 +249,7 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = :id
LEFT JOIN geometry_query ON geometry_query.subscription_id = :id
Expand Down Expand Up @@ -337,22 +353,21 @@ class SubscriptionService(
"q",
"scopeQ",
"isActive",
"modifiedAt"
"modifiedAt",
"throttling"
).contains(it.key) -> {
val columnName = it.key.toSqlColumnName()
val value = it.value.toSqlValue(it.key)
updateSubscriptionAttribute(subscriptionId, columnName, value).bind()
}

listOf("csf", "throttling", "temporalQ").contains(it.key) -> {
logger.warn("Subscription $subscriptionId has unsupported attribute: ${it.key}")
NotImplementedException("Subscription $subscriptionId has unsupported attribute: ${it.key}")
listOf("csf", "temporalQ").contains(it.key) -> {
NotImplementedException(unsupportedSubscriptiondAttributeMessage(subscriptionId, it))
.left().bind<Unit>()
}

else -> {
logger.warn("Subscription $subscriptionId has invalid attribute: ${it.key}")
BadRequestDataException("Subscription $subscriptionId has invalid attribute: ${it.key}")
BadRequestDataException(invalidSubscriptiondAttributeMessage(subscriptionId, it))
.left().bind<Unit>()
}
}
Expand Down Expand Up @@ -416,6 +431,7 @@ class SubscriptionService(
}
listOf(Pair("notif_attributes", attributes))
}

"format" -> {
val format =
if (attribute.value == "keyValues")
Expand All @@ -424,6 +440,7 @@ class SubscriptionService(
NotificationParams.FormatType.NORMALIZED.name
listOf(Pair("notif_format", format))
}

"endpoint" -> {
val endpoint = attribute.value as Map<String, Any>
val accept =
Expand All @@ -441,6 +458,7 @@ class SubscriptionService(
Pair("endpoint_notifier_info", Json.of(endpointInfoMapToString(endpointNotifierInfo)))
)
}

else -> throw BadRequestDataException("Could not update attribute ${attribute.key}")
}
}
Expand Down Expand Up @@ -479,7 +497,7 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down Expand Up @@ -521,13 +539,15 @@ class SubscriptionService(
entity_selector.id as entity_id, entity_selector.id_pattern as id_pattern,
entity_selector.type_selection as type_selection, georel, geometry, coordinates, pgis_geometry,
geoproperty, scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, times_sent,
endpoint_receiver_info, endpoint_notifier_info, contexts
endpoint_receiver_info, endpoint_notifier_info, contexts, throttling
FROM subscription
LEFT JOIN entity_selector on subscription.id = entity_selector.subscription_id
LEFT JOIN geometry_query on subscription.id = geometry_query.subscription_id
WHERE is_active
AND ( expires_at is null OR expires_at >= :date )
AND time_interval IS NULL
AND ( throttling IS NULL
OR (last_notification + throttling * INTERVAL '1 second') > CURRENT_TIMESTAMP )
bobeal marked this conversation as resolved.
Show resolved Hide resolved
AND ( string_to_array(watched_attributes, ',') && string_to_array(:updatedAttributes, ',')
OR watched_attributes IS NULL)
AND CASE
Expand Down Expand Up @@ -669,7 +689,8 @@ class SubscriptionService(
lastSuccess = toNullableZonedDateTime(row["last_success"])
),
isActive = toBoolean(row["is_active"]),
contexts = toList(row["contexts"])
contexts = toList(row["contexts"]),
throttling = toNullableInt(row["throttling"])
)
}

Expand Down Expand Up @@ -698,7 +719,8 @@ class SubscriptionService(
lastFailure = null,
lastSuccess = null
),
contexts = toList(row["contexts"])
contexts = toList(row["contexts"]),
throttling = toNullableInt(row["throttling"])
)
}

Expand Down Expand Up @@ -734,11 +756,12 @@ class SubscriptionService(
scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info,
endpoint_notifier_info, status, times_sent, last_notification, last_failure, last_success, is_active,
entity_selector.id as entity_id, id_pattern, entity_selector.type_selection as type_selection, georel,
geometry, coordinates, pgis_geometry, geoproperty, contexts
geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
WHERE time_interval IS NOT NULL
AND throttling IS NULL
bobeal marked this conversation as resolved.
Show resolved Hide resolved
AND (last_notification IS NULL
OR ((EXTRACT(EPOCH FROM last_notification) + time_interval) < EXTRACT(EPOCH FROM :currentDate))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@ fun subscriptionNotFoundMessage(subscriptionId: URI) = "Could not find a subscri
fun subscriptionAlreadyExistsMessage(subscriptionId: URI) = "A subscription with id $subscriptionId already exists"
fun subscriptionUnauthorizedMessage(subscriptionId: URI) =
"User is not authorized to access subscription $subscriptionId"
fun unsupportedSubscriptiondAttributeMessage(subscriptionId: URI, attribute: Map.Entry<String, Any>) =
"Subscription $subscriptionId has unsupported attribute: ${attribute.key}"

fun invalidSubscriptiondAttributeMessage(subscriptionId: URI, attribute: Map.Entry<String, Any>) =
"Subscription $subscriptionId has invalid attribute: ${attribute.key}"
bobeal marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE subscription
ADD throttling integer;
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,43 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer {
}
}

@Test
fun `it should not allow a subscription with timeInterval and throttling`() = runTest {
val payload = mapOf(
"id" to "urn:ngsi-ld:Beehive:1234567890".toUri(),
"type" to NGSILD_SUBSCRIPTION_TERM,
"timeInterval" to 10,
"entities" to listOf(mapOf("type" to BEEHIVE_TYPE)),
"notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")),
"throttling" to 30
)

val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult()
subscriptionService.validateNewSubscription(subscription)
.shouldFailWith {
it is BadRequestDataException &&
it.message == "You can't use 'timeInterval' in conjunction with 'throttling'"
}
}

@Test
fun `it should not allow a subscription with a negative throttling`() = runTest {
val payload = mapOf(
"id" to "urn:ngsi-ld:Beehive:1234567890".toUri(),
"type" to NGSILD_SUBSCRIPTION_TERM,
"entities" to listOf(mapOf("type" to BEEHIVE_TYPE)),
"notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")),
"throttling" to -30
)

val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult()
subscriptionService.validateNewSubscription(subscription)
.shouldFailWith {
it is BadRequestDataException &&
it.message == "The value of 'throttling' must be greater than zero (int)"
}
}

@Test
fun `it should not allow a subscription with a negative timeInterval`() = runTest {
val payload = mapOf(
Expand Down Expand Up @@ -285,7 +322,8 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer {
Endpoint.AcceptType.JSON,
listOf(EndpointInfo("Authorization-token", "Authorization-token-value"))
) &&
it.expiresAt == ZonedDateTime.parse("2100-01-01T00:00:00Z")
it.expiresAt == ZonedDateTime.parse("2100-01-01T00:00:00Z") &&
it.throttling == 60
}
}

Expand Down Expand Up @@ -678,7 +716,8 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer {
"geometry" to "Point",
"coordinates" to "[100.0, 0.0]",
"geoproperty" to "https://uri.etsi.org/ngsi-ld/observationSpace"
)
),
"throttling" to 50
)

subscriptionService.update(subscription.id, parsedInput, APIC_COMPOUND_CONTEXTS)
Expand All @@ -694,7 +733,8 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer {
it.geoQ!!.georel == "equals" &&
it.geoQ!!.geometry == "Point" &&
it.geoQ!!.coordinates == "[100.0, 0.0]" &&
it.geoQ!!.geoproperty == "https://uri.etsi.org/ngsi-ld/observationSpace"
it.geoQ!!.geoproperty == "https://uri.etsi.org/ngsi-ld/observationSpace" &&
it.throttling == 50
}
}

Expand Down Expand Up @@ -887,12 +927,12 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer {
val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json")
subscriptionService.create(subscription, mockUserSub).shouldSucceed()

val parsedInput = mapOf("type" to NGSILD_SUBSCRIPTION_TERM, "throttling" to "someValue")
val parsedInput = mapOf("type" to NGSILD_SUBSCRIPTION_TERM, "csf" to "someValue")

subscriptionService.update(subscription.id, parsedInput, APIC_COMPOUND_CONTEXTS)
.shouldFailWith {
it is NotImplementedException &&
it.message == "Subscription urn:ngsi-ld:Subscription:1 has unsupported attribute: throttling"
it.message == "Subscription urn:ngsi-ld:Subscription:1 has unsupported attribute: csf"
}
}

Expand Down Expand Up @@ -1062,6 +1102,29 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer {
.shouldSucceedWith { assertEquals(0, it.size) }
}

@Test
fun `it should not return a subscription if throttling has not elapsed yet`() = runTest {
bobeal marked this conversation as resolved.
Show resolved Hide resolved
val expandedEntity = expandJsonLdEntity(entity, APIC_COMPOUND_CONTEXTS)

val payload = mapOf(
"id" to "urn:ngsi-ld:Beehive:1234567890".toUri(),
"type" to NGSILD_SUBSCRIPTION_TERM,
"watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY),
"notification" to mapOf(
"endpoint" to mapOf("uri" to "http://my.endpoint/notifiy"),
"lastNotification" to ngsiLdDateTime()
),
"throttling" to 300
)

val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult()

subscriptionService.create(subscription, mockUserSub).shouldSucceed()

subscriptionService.getMatchingSubscriptions(expandedEntity, setOf(NGSILD_LOCATION_PROPERTY), ATTRIBUTE_UPDATED)
bobeal marked this conversation as resolved.
Show resolved Hide resolved
.shouldSucceedWith { assertEquals(0, it.size) }
bobeal marked this conversation as resolved.
Show resolved Hide resolved
}

@ParameterizedTest
@CsvSource(
"near;minDistance==1000, Polygon, '[[[100.0, 0.0], [101.0, 0.0], [101.0, -1.0], [100.0, 0.0]]]', 0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ interface WithTimescaleContainer {
withEnv("POSTGRES_MULTIPLE_EXTENSIONS", "postgis,timescaledb,pgcrypto")
withExposedPorts(5432)
setWaitStrategy(Wait.forLogMessage(".*database system is ready to accept connections.*", 2))
withReuse(true)
bobeal marked this conversation as resolved.
Show resolved Hide resolved
}

@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@
]
}
},
"expiresAt": "2100-01-01T00:00:00Z"
"expiresAt": "2100-01-01T00:00:00Z",
"throttling": 60
bobeal marked this conversation as resolved.
Show resolved Hide resolved
}
Loading