From fbdbd2bd4c611739ea331f43d99328a094cd7bfd Mon Sep 17 00:00:00 2001 From: "j.azzam" Date: Thu, 27 Jun 2024 09:45:57 +0200 Subject: [PATCH 1/8] Adding new attributes to the HttpCustom model (json, ngsi and timeout). Also added some tests for the pydantic model. --- filip/clients/ngsi_v2/cb.py | 2 +- filip/models/ngsi_v2/subscriptions.py | 66 ++++++++++++++++------ tests/models/test_ngsi_v2_subscriptions.py | 12 ++++ 3 files changed, 61 insertions(+), 19 deletions(-) diff --git a/filip/clients/ngsi_v2/cb.py b/filip/clients/ngsi_v2/cb.py index 57b4998b..cfe4c45a 100644 --- a/filip/clients/ngsi_v2/cb.py +++ b/filip/clients/ngsi_v2/cb.py @@ -1898,7 +1898,7 @@ def notify(self, message: Message) -> None: except requests.RequestException as err: msg = ( f"Sending notifcation message failed! \n " - f"{message.model_dump_json(inent=2)}" + f"{message.model_dump_json(indent=2)}" ) self.log_error(err=err, msg=msg) raise diff --git a/filip/models/ngsi_v2/subscriptions.py b/filip/models/ngsi_v2/subscriptions.py index f0be6504..23a6bc2e 100644 --- a/filip/models/ngsi_v2/subscriptions.py +++ b/filip/models/ngsi_v2/subscriptions.py @@ -14,6 +14,11 @@ from filip.utils.validators import validate_mqtt_url, validate_mqtt_topic from filip.models.ngsi_v2.context import ContextEntity from filip.custom_types import AnyMqttUrl +import warnings + +# The pydantic models still have a .json() function, but this method is deprecated. +warnings.filterwarnings("ignore", category=UserWarning, + message='Field name "json" shadows an attribute in parent "Http"') class Message(BaseModel): @@ -69,6 +74,32 @@ class HttpCustom(Http): 'default payload (see "Notification Messages" sections) ' 'is used.' ) + json: Optional[Dict[str, Union[str, Json]]] = Field( + default=None, + description="JSON-based payload to be used in notifications. See 'JSON Payloads' section for more details." + ) + ngsi: Optional[Dict[str, Union[str, Json]]] = Field( + default=None, + description="NGSI patching for payload to be used in notifications. See 'NGSI payload patching' section for " + "more details." + ) + timeout: Optional[int] = Field( + default=None, + description="Maximum time (in milliseconds) the subscription waits for the response. The maximum value allowed " + "for this parameter is 1800000 (30 minutes). If timeout is defined to 0 or omitted, then the value " + "passed as -httpTimeout CLI parameter is used. See section in the 'Command line options' for more " + "details." + ) + + @model_validator(mode='after') + def validate_http(self): + fields = [self.payload, self.json, self.ngsi] + filled_fields = [field for field in fields if field is not None] + + if len(filled_fields) > 1: + raise ValueError("Only one of payload, json or ngsi fields accepted at the same time in httpCustom.") + + return self class Mqtt(BaseModel): @@ -82,7 +113,7 @@ class Mqtt(BaseModel): 'only includes host and port)') topic: str = Field( description='to specify the MQTT topic to use', - ) + ) valid_type = field_validator("topic")(validate_mqtt_topic) qos: Optional[int] = Field( default=0, @@ -205,17 +236,17 @@ class Notification(BaseModel): '[A=0, B=null, C=null]. This ' ) - @field_validator('httpCustom') - def validate_http(cls, http_custom, values): - if http_custom is not None: - assert values['http'] is None - return http_custom + @model_validator(mode='after') + def validate_http(self): + if self.httpCustom is not None: + assert self.http is None + return self - @field_validator('exceptAttrs') - def validate_attr(cls, except_attrs, values): - if except_attrs is not None: - assert values['attrs'] is None - return except_attrs + @model_validator(mode='after') + def validate_attr(self): + if self.exceptAttrs is not None: + assert self.attrs is None + return self @model_validator(mode='after') def validate_endpoints(self): @@ -247,7 +278,7 @@ class Response(Notification): 'Last notification timestamp in ISO8601 format.' ) lastFailure: Optional[datetime] = Field( - default = None, + default=None, description='(not editable, only present in GET operations): ' 'Last failure timestamp in ISO8601 format. Not present if ' 'subscription has never had a problem with notifications.' @@ -342,21 +373,21 @@ class Subscription(BaseModel): ) subject: Subject = Field( description="An object that describes the subject of the subscription.", - example={ + examples=[{ 'entities': [{'idPattern': '.*', 'type': 'Room'}], 'condition': { 'attrs': ['temperature'], 'expression': {'q': 'temperature>40'}, }, - }, + }], ) notification: Notification = Field( description="An object that describes the notification to send when " "the subscription is triggered.", - example={ + examples=[{ 'http': {'url': 'http://localhost:1234'}, 'attrs': ['temperature', 'humidity'], - }, + }], ) expires: Optional[datetime] = Field( default=None, @@ -364,11 +395,10 @@ class Subscription(BaseModel): "Permanent subscriptions must omit this field." ) - throttling: Optional[conint(strict=True, ge=0,)] = Field( + throttling: Optional[conint(strict=True, ge=0, )] = Field( default=None, strict=True, description="Minimal period of time in seconds which " "must elapse between two consecutive notifications. " "It is optional." ) - diff --git a/tests/models/test_ngsi_v2_subscriptions.py b/tests/models/test_ngsi_v2_subscriptions.py index 6e9d453a..3ed99255 100644 --- a/tests/models/test_ngsi_v2_subscriptions.py +++ b/tests/models/test_ngsi_v2_subscriptions.py @@ -90,6 +90,10 @@ def test_notification_models(self): with self.assertRaises(ValidationError): Mqtt(url="mqtt://test.de:1883", topic='/,t') + with self.assertRaises(ValidationError): + HttpCustom(url="https://working-url.de:80", json={}, ngsi={}) + with self.assertRaises(ValidationError): + HttpCustom(url="https://working-url.de:80", payload="", json={}) httpCustom = HttpCustom(url=self.http_url) mqtt = Mqtt(url=self.mqtt_url, topic=self.mqtt_topic) @@ -100,10 +104,18 @@ def test_notification_models(self): notification = Notification.model_validate(self.notification) with self.assertRaises(ValidationError): notification.mqtt = httpCustom + notification = Notification.model_validate(self.notification) with self.assertRaises(ValidationError): notification.mqtt = mqtt + notification = Notification.model_validate(self.notification) with self.assertRaises(ValidationError): notification.mqtt = mqttCustom + notification = Notification.model_validate(self.notification) + with self.assertRaises(ValidationError): + notification.httpCustom = httpCustom + notification = Notification.model_validate(self.notification) + with self.assertRaises(ValidationError): + notification.exceptAttrs = ["temperature", "humidity"] # test onlyChangedAttrs-field notification = Notification.model_validate(self.notification) From 10b9a48c3245219de44d86260452b75862f7dc90 Mon Sep 17 00:00:00 2001 From: "j.azzam" Date: Thu, 27 Jun 2024 10:09:45 +0200 Subject: [PATCH 2/8] Renaming notification payloads validation functions --- filip/models/ngsi_v2/subscriptions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filip/models/ngsi_v2/subscriptions.py b/filip/models/ngsi_v2/subscriptions.py index 23a6bc2e..46d16bd9 100644 --- a/filip/models/ngsi_v2/subscriptions.py +++ b/filip/models/ngsi_v2/subscriptions.py @@ -92,7 +92,7 @@ class HttpCustom(Http): ) @model_validator(mode='after') - def validate_http(self): + def validate_notification_payloads(self): fields = [self.payload, self.json, self.ngsi] filled_fields = [field for field in fields if field is not None] From c0ba701ce9d42bfc609f95e1c0892515b24ad15f Mon Sep 17 00:00:00 2001 From: SystemsPurge Date: Thu, 27 Jun 2024 14:15:32 +0200 Subject: [PATCH 3/8] Added ngsi and json notification payload types, to httpCustom and mqttCustom notifications --- filip/models/ngsi_v2/subscriptions.py | 40 ++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/filip/models/ngsi_v2/subscriptions.py b/filip/models/ngsi_v2/subscriptions.py index f0be6504..f30ba255 100644 --- a/filip/models/ngsi_v2/subscriptions.py +++ b/filip/models/ngsi_v2/subscriptions.py @@ -9,7 +9,8 @@ field_validator, model_validator, ConfigDict, BaseModel, \ conint, \ Field, \ - Json + Json, \ + AliasChoices from .base import AttrsFormat, EntityPattern, Http, Status, Expression from filip.utils.validators import validate_mqtt_url, validate_mqtt_topic from filip.models.ngsi_v2.context import ContextEntity @@ -69,6 +70,22 @@ class HttpCustom(Http): 'default payload (see "Notification Messages" sections) ' 'is used.' ) + json: Optional[Dict[str,Any]] = Field( + default=None, + description='get a json as notification. If omitted, the default' + 'payload (see "Notification Messages" sections) is used.' + ) + ngsi:Optional[ContextEntity] = Field( + default=None, + description='get an NGSI-v2 normalized entity as notification.If omitted, ' + 'the default payload (see "Notification Messages" sections) is used.' + ) + + @model_validator(mode='after') + def validate_payload_type(self): + assert len([v for k, v in self.model_dump().items() + if((v is not None) and (k in ['payload','ngsi','json']))]) <= 1 + return self class Mqtt(BaseModel): @@ -124,6 +141,21 @@ class MqttCustom(Mqtt): 'default payload (see "Notification Messages" sections) ' 'is used.' ) + json: Optional[Dict[str,Any]] = Field( + default=None, + description='get a json as notification. If omitted, the default' + 'payload (see "Notification Messages" sections) is used.' + ) + ngsi:Optional[ContextEntity] = Field( + default=None, + description='get an NGSI-v2 normalized entity as notification.If omitted, ' + 'the default payload (see "Notification Messages" sections) is used.' + ) + @model_validator(mode='after') + def validate_payload_type(self): + assert len([v for k, v in self.model_dump().items() + if((v is not None) and (k in ['payload','ngsi','json']))]) <= 1 + return self class Notification(BaseModel): @@ -205,12 +237,6 @@ class Notification(BaseModel): '[A=0, B=null, C=null]. This ' ) - @field_validator('httpCustom') - def validate_http(cls, http_custom, values): - if http_custom is not None: - assert values['http'] is None - return http_custom - @field_validator('exceptAttrs') def validate_attr(cls, except_attrs, values): if except_attrs is not None: From a858383a2cba1926f838f01208d682690d9eaf3e Mon Sep 17 00:00:00 2001 From: SystemsPurge Date: Fri, 28 Jun 2024 15:54:19 +0200 Subject: [PATCH 4/8] Added models for validation of ngsi patched payload type --- filip/models/ngsi_v2/subscriptions.py | 62 ++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/filip/models/ngsi_v2/subscriptions.py b/filip/models/ngsi_v2/subscriptions.py index f30ba255..b06365cc 100644 --- a/filip/models/ngsi_v2/subscriptions.py +++ b/filip/models/ngsi_v2/subscriptions.py @@ -9,14 +9,66 @@ field_validator, model_validator, ConfigDict, BaseModel, \ conint, \ Field, \ - Json, \ - AliasChoices + Json from .base import AttrsFormat, EntityPattern, Http, Status, Expression -from filip.utils.validators import validate_mqtt_url, validate_mqtt_topic +from filip.utils.validators import ( + validate_mqtt_url, + validate_mqtt_topic +) from filip.models.ngsi_v2.context import ContextEntity +from filip.models.ngsi_v2.base import ( + EntityPattern, + Expression, + BaseValueAttribute +) from filip.custom_types import AnyMqttUrl +class NotificationAttr(BaseValueAttribute): + """ + Model for NGSI V2 type payload in httpCustom/mqttCustom notifications. + The difference between this model and the usual BaseValueAttribute model is that + a metadata field is not allowed. + In the absence of type/value in some attribute field, one should resort to partial + representations ( as specified in the orion api manual), done by the BaseValueAttr. + model. + """ + model_config=ConfigDict(extra="forbid") + +class NotificationEntity(BaseModel): + """ + Model for NGSI V2 type payload in httpCustom/mqttCustom notifications. + Differences between this model and the usual Context entity models include: + - id and type are not mandatory + - a metadata Attribute field is not allowed + In the absence of type/value in some attribute field, one should resort to partial + representations ( as specified in the orion api manual), done by the BaseValueAttr. + model. + """ + model_config = ConfigDict( + extra="allow", validate_default=True + ) + id: Optional[str] = Field( + default=None, + max_length=256, + min_length=1, + frozen=True + ) + type: Optional[Union[str, Enum]] = Field( + default=None, + max_length=256, + min_length=1, + frozen=True, + ) + + @model_validator(mode='after') + def validate_notification_attrs(self): + for v in self.model_dump(exclude={"id","type"}).values(): + assert isinstance(NotificationAttr.model_validate(v),NotificationAttr) + return self + + + class Message(BaseModel): """ Model for a notification message, when sent to other NGSIv2-APIs @@ -75,7 +127,7 @@ class HttpCustom(Http): description='get a json as notification. If omitted, the default' 'payload (see "Notification Messages" sections) is used.' ) - ngsi:Optional[ContextEntity] = Field( + ngsi:Optional[NotificationEntity] = Field( default=None, description='get an NGSI-v2 normalized entity as notification.If omitted, ' 'the default payload (see "Notification Messages" sections) is used.' @@ -146,7 +198,7 @@ class MqttCustom(Mqtt): description='get a json as notification. If omitted, the default' 'payload (see "Notification Messages" sections) is used.' ) - ngsi:Optional[ContextEntity] = Field( + ngsi:Optional[NotificationEntity] = Field( default=None, description='get an NGSI-v2 normalized entity as notification.If omitted, ' 'the default payload (see "Notification Messages" sections) is used.' From 6c1aa85c754ba8cc3f9103369e348dbb8bba1a5c Mon Sep 17 00:00:00 2001 From: SystemsPurge Date: Mon, 1 Jul 2024 11:11:53 +0200 Subject: [PATCH 5/8] Added tests for validation of notification payload models --- filip/models/ngsi_v2/subscriptions.py | 15 ++++++------- tests/models/test_ngsi_v2_subscriptions.py | 25 +++++++++++++++++++++- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/filip/models/ngsi_v2/subscriptions.py b/filip/models/ngsi_v2/subscriptions.py index b06365cc..c27bcf65 100644 --- a/filip/models/ngsi_v2/subscriptions.py +++ b/filip/models/ngsi_v2/subscriptions.py @@ -24,7 +24,7 @@ from filip.custom_types import AnyMqttUrl -class NotificationAttr(BaseValueAttribute): +class NgsiPayloadAttr(BaseValueAttribute): """ Model for NGSI V2 type payload in httpCustom/mqttCustom notifications. The difference between this model and the usual BaseValueAttribute model is that @@ -35,15 +35,12 @@ class NotificationAttr(BaseValueAttribute): """ model_config=ConfigDict(extra="forbid") -class NotificationEntity(BaseModel): +class NgsiPayload(BaseModel): """ Model for NGSI V2 type payload in httpCustom/mqttCustom notifications. Differences between this model and the usual Context entity models include: - id and type are not mandatory - - a metadata Attribute field is not allowed - In the absence of type/value in some attribute field, one should resort to partial - representations ( as specified in the orion api manual), done by the BaseValueAttr. - model. + - an attribute metadata field is not allowed """ model_config = ConfigDict( extra="allow", validate_default=True @@ -64,7 +61,7 @@ class NotificationEntity(BaseModel): @model_validator(mode='after') def validate_notification_attrs(self): for v in self.model_dump(exclude={"id","type"}).values(): - assert isinstance(NotificationAttr.model_validate(v),NotificationAttr) + assert isinstance(NgsiPayloadAttr.model_validate(v),NgsiPayloadAttr) return self @@ -127,7 +124,7 @@ class HttpCustom(Http): description='get a json as notification. If omitted, the default' 'payload (see "Notification Messages" sections) is used.' ) - ngsi:Optional[NotificationEntity] = Field( + ngsi:Optional[NgsiPayload] = Field( default=None, description='get an NGSI-v2 normalized entity as notification.If omitted, ' 'the default payload (see "Notification Messages" sections) is used.' @@ -198,7 +195,7 @@ class MqttCustom(Mqtt): description='get a json as notification. If omitted, the default' 'payload (see "Notification Messages" sections) is used.' ) - ngsi:Optional[NotificationEntity] = Field( + ngsi:Optional[NgsiPayload] = Field( default=None, description='get an NGSI-v2 normalized entity as notification.If omitted, ' 'the default payload (see "Notification Messages" sections) is used.' diff --git a/tests/models/test_ngsi_v2_subscriptions.py b/tests/models/test_ngsi_v2_subscriptions.py index 6e9d453a..2393c6ef 100644 --- a/tests/models/test_ngsi_v2_subscriptions.py +++ b/tests/models/test_ngsi_v2_subscriptions.py @@ -12,7 +12,9 @@ Mqtt, \ MqttCustom, \ Notification, \ - Subscription + Subscription, \ + NgsiPayload, \ + NgsiPayloadAttr from filip.models.base import FiwareHeader from filip.utils.cleanup import clear_all, clean_test from tests.config import settings @@ -104,6 +106,27 @@ def test_notification_models(self): notification.mqtt = mqtt with self.assertRaises(ValidationError): notification.mqtt = mqttCustom + with self.assertRaises(ValidationError): + HttpCustom(url=self.http_url,json={},payload="") + with self.assertRaises(ValidationError): + MqttCustom(url=self.mqtt_url, + topic=self.mqtt_topic,ngsi=NgsiPayload(),payload="") + with self.assertRaises(ValidationError): + HttpCustom(url=self.http_url,ngsi=NgsiPayload(),json="") + + #Test validator for ngsi payload type + with self.assertRaises(ValidationError): + attr_dict={ + "metadata":{} + } + NgsiPayloadAttr(**attr_dict) + with self.assertRaises(ValidationError): + attr_dict={ + "id":"entityId", + "type":"entityType", + "k":"v" + } + NgsiPayload(NgsiPayloadAttr(**attr_dict),id="someId",type="someType") # test onlyChangedAttrs-field notification = Notification.model_validate(self.notification) From 573b5a64c8981adae65ce3c97b1a693cd3cd3c55 Mon Sep 17 00:00:00 2001 From: SystemsPurge Date: Tue, 2 Jul 2024 14:02:11 +0200 Subject: [PATCH 6/8] Added test for comparaison of posted and retrieved subscription objects --- tests/models/test_ngsi_v2_subscriptions.py | 62 +++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/tests/models/test_ngsi_v2_subscriptions.py b/tests/models/test_ngsi_v2_subscriptions.py index 2393c6ef..133b13bd 100644 --- a/tests/models/test_ngsi_v2_subscriptions.py +++ b/tests/models/test_ngsi_v2_subscriptions.py @@ -145,7 +145,8 @@ def test_subscription_models(self) -> None: Returns: None """ - sub = Subscription.model_validate(self.sub_dict) + tmp_dict=self.sub_dict.copy() + sub = Subscription.model_validate(tmp_dict) fiware_header = FiwareHeader(service=settings.FIWARE_SERVICE, service_path=settings.FIWARE_SERVICEPATH) with ContextBrokerClient( @@ -163,6 +164,65 @@ def compare_dicts(dict1: dict, dict2: dict): compare_dicts(sub.model_dump(exclude={'id'}), sub_res.model_dump(exclude={'id'})) + + tmp_dict.update({"notification":{ + "httpCustom": { + "url": "http://localhost:1234", + "ngsi":{ + "patchattr":{ + "value":"${temperature/2}", + "type":"Calculated" + } + }, + "method":"POST" + }, + "attrs": [ + "temperature", + "humidity" + ] + }}) + sub = Subscription.model_validate(tmp_dict) + sub_id = client.post_subscription(subscription=sub) + sub_res = client.get_subscription(subscription_id=sub_id) + compare_dicts(sub.model_dump(exclude={'id'}), + sub_res.model_dump(exclude={'id'})) + + tmp_dict.update({"notification":{ + "httpCustom": { + "url": "http://localhost:1234", + "json":{ + "t":"${temperate}", + "h":"${humidity}" + }, + "method":"POST" + }, + "attrs": [ + "temperature", + "humidity" + ] + }}) + sub = Subscription.model_validate(tmp_dict) + sub_id = client.post_subscription(subscription=sub) + sub_res = client.get_subscription(subscription_id=sub_id) + compare_dicts(sub.model_dump(exclude={'id'}), + sub_res.model_dump(exclude={'id'})) + + tmp_dict.update({"notification":{ + "httpCustom": { + "url": "http://localhost:1234", + "payload":"Temperature is ${temperature} and humidity ${humidity}", + "method":"POST" + }, + "attrs": [ + "temperature", + "humidity" + ] + }}) + sub = Subscription.model_validate(tmp_dict) + sub_id = client.post_subscription(subscription=sub) + sub_res = client.get_subscription(subscription_id=sub_id) + compare_dicts(sub.model_dump(exclude={'id'}), + sub_res.model_dump(exclude={'id'})) # test validation of throttling with self.assertRaises(ValidationError): From 35018c3305b7a724ab6806ac6574384781c4df12 Mon Sep 17 00:00:00 2001 From: JunsongDu Date: Wed, 3 Jul 2024 09:17:31 +0200 Subject: [PATCH 7/8] docs: adapt doc strings length --- filip/models/ngsi_v2/subscriptions.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/filip/models/ngsi_v2/subscriptions.py b/filip/models/ngsi_v2/subscriptions.py index b82718a1..0fc712b1 100644 --- a/filip/models/ngsi_v2/subscriptions.py +++ b/filip/models/ngsi_v2/subscriptions.py @@ -136,10 +136,11 @@ class HttpCustom(Http): ) timeout: Optional[int] = Field( default=None, - description="Maximum time (in milliseconds) the subscription waits for the response. The maximum value allowed " - "for this parameter is 1800000 (30 minutes). If timeout is defined to 0 or omitted, then the value " - "passed as -httpTimeout CLI parameter is used. See section in the 'Command line options' for more " - "details." + description="Maximum time (in milliseconds) the subscription waits for the " + "response. The maximum value allowed for this parameter is 1800000 " + "(30 minutes). If timeout is defined to 0 or omitted, then the value " + "passed as -httpTimeout CLI parameter is used. See section in the " + "'Command line options' for more details." ) @model_validator(mode='after') @@ -148,7 +149,8 @@ def validate_notification_payloads(self): filled_fields = [field for field in fields if field is not None] if len(filled_fields) > 1: - raise ValueError("Only one of payload, json or ngsi fields accepted at the same time in httpCustom.") + raise ValueError("Only one of payload, json or ngsi fields accepted at the " + "same time in httpCustom.") return self From 85abe6f5a34f1ceaa892db1032f87d078b622f63 Mon Sep 17 00:00:00 2001 From: "j.azzam" Date: Mon, 15 Jul 2024 11:24:18 +0200 Subject: [PATCH 8/8] test: Adding tests for the mqtt custom notifications, that use payload, json and ngsi messages --- filip/models/ngsi_v2/subscriptions.py | 19 ++-- tests/clients/test_ngsi_v2_cb.py | 143 +++++++++++++++++++++++++- 2 files changed, 152 insertions(+), 10 deletions(-) diff --git a/filip/models/ngsi_v2/subscriptions.py b/filip/models/ngsi_v2/subscriptions.py index 0fc712b1..78101eb6 100644 --- a/filip/models/ngsi_v2/subscriptions.py +++ b/filip/models/ngsi_v2/subscriptions.py @@ -27,6 +27,8 @@ # The pydantic models still have a .json() function, but this method is deprecated. warnings.filterwarnings("ignore", category=UserWarning, message='Field name "json" shadows an attribute in parent "Http"') +warnings.filterwarnings("ignore", category=UserWarning, + message='Field name "json" shadows an attribute in parent "Mqtt"') class NgsiPayloadAttr(BaseValueAttribute): @@ -38,7 +40,8 @@ class NgsiPayloadAttr(BaseValueAttribute): representations ( as specified in the orion api manual), done by the BaseValueAttr. model. """ - model_config=ConfigDict(extra="forbid") + model_config = ConfigDict(extra="forbid") + class NgsiPayload(BaseModel): """ @@ -65,12 +68,11 @@ class NgsiPayload(BaseModel): @model_validator(mode='after') def validate_notification_attrs(self): - for v in self.model_dump(exclude={"id","type"}).values(): - assert isinstance(NgsiPayloadAttr.model_validate(v),NgsiPayloadAttr) + for v in self.model_dump(exclude={"id", "type"}).values(): + assert isinstance(NgsiPayloadAttr.model_validate(v), NgsiPayloadAttr) return self - class Message(BaseModel): """ Model for a notification message, when sent to other NGSIv2-APIs @@ -129,7 +131,7 @@ class HttpCustom(Http): description='get a json as notification. If omitted, the default' 'payload (see "Notification Messages" sections) is used.' ) - ngsi:Optional[NgsiPayload] = Field( + ngsi: Optional[NgsiPayload] = Field( default=None, description='get an NGSI-v2 normalized entity as notification.If omitted, ' 'the default payload (see "Notification Messages" sections) is used.' @@ -208,20 +210,21 @@ class MqttCustom(Mqtt): 'default payload (see "Notification Messages" sections) ' 'is used.' ) - json: Optional[Dict[str,Any]] = Field( + json: Optional[Dict[str, Any]] = Field( default=None, description='get a json as notification. If omitted, the default' 'payload (see "Notification Messages" sections) is used.' ) - ngsi:Optional[NgsiPayload] = Field( + ngsi: Optional[NgsiPayload] = Field( default=None, description='get an NGSI-v2 normalized entity as notification.If omitted, ' 'the default payload (see "Notification Messages" sections) is used.' ) + @model_validator(mode='after') def validate_payload_type(self): assert len([v for k, v in self.model_dump().items() - if((v is not None) and (k in ['payload','ngsi','json']))]) <= 1 + if ((v is not None) and (k in ['payload', 'ngsi', 'json']))]) <= 1 return self diff --git a/tests/clients/test_ngsi_v2_cb.py b/tests/clients/test_ngsi_v2_cb.py index a4b7fbf9..3cae6b43 100644 --- a/tests/clients/test_ngsi_v2_cb.py +++ b/tests/clients/test_ngsi_v2_cb.py @@ -910,7 +910,91 @@ def test_notification(self): "throttling": 0 }) + mqtt_custom_topic = "notification/custom" + sub_with_mqtt_custom_notification_payload = Subscription.model_validate({ + "description": "Test mqtt custom notification with payload message", + "subject": { + "entities": [ + { + "id": "Test:001", + "type": "Test" + } + ] + }, + "notification": { + "mqttCustom": { + "url": mqtt_url_internal, + "topic": mqtt_custom_topic, + "payload": "The value of the %22temperature%22 attribute %28of the device ${id}, ${type}%29 is" + " ${temperature}. Humidity is ${humidity} and CO2 is ${co2}." + }, + "attrs": ["temperature", "humidity", "co2"], + "onlyChangedAttrs": False + }, + "expires": datetime.now() + timedelta(days=1), + "throttling": 0 + }) + + sub_with_mqtt_custom_notification_json = Subscription.model_validate({ + "description": "Test mqtt custom notification with json message", + "subject": { + "entities": [ + { + "id": "Test:001", + "type": "Test" + } + ] + }, + "notification": { + "mqttCustom": { + "url": mqtt_url_internal, + "topic": mqtt_custom_topic, + "json": { + "t": "${temperature}", + "h": "${humidity}", + "c": "${co2}" + } + }, + "attrs": ["temperature", "humidity", "co2"], + "onlyChangedAttrs": False + }, + "expires": datetime.now() + timedelta(days=1), + "throttling": 0 + }) + + sub_with_mqtt_custom_notification_ngsi = Subscription.model_validate({ + "description": "Test mqtt custom notification with ngsi message", + "subject": { + "entities": [ + { + "id": "Test:001", + "type": "Test" + } + ] + }, + "notification": { + "mqttCustom": { + "url": mqtt_url_internal, + "topic": mqtt_custom_topic, + "ngsi": { + "id": "prefix:${id}", + "type": "newType", + "temperature": { + "value": 123, + "type": "Number" + }, + + } + }, + "attrs": ["temperature", "humidity", "co2"], + "onlyChangedAttrs": False + }, + "expires": datetime.now() + timedelta(days=1), + "throttling": 0 + }) + # MQTT settings + custom_sub_message = None sub_message = None sub_messages = {} @@ -923,6 +1007,7 @@ def on_connect(client, userdata, flags, reasonCode, properties=None): logger.info("Successfully, connected with result code " + str( reasonCode)) client.subscribe(mqtt_topic) + client.subscribe(mqtt_custom_topic) def on_subscribe(client, userdata, mid, granted_qos, properties=None): logger.info("Successfully subscribed to with QoS: %s", granted_qos) @@ -931,8 +1016,12 @@ def on_message(client, userdata, msg): logger.info("Received MQTT message: " + msg.topic + " " + str( msg.payload)) nonlocal sub_message - sub_message = Message.model_validate_json(msg.payload) - sub_messages[sub_message.subscriptionId] = sub_message + nonlocal custom_sub_message + if msg.topic == mqtt_topic: + sub_message = Message.model_validate_json(msg.payload) + sub_messages[sub_message.subscriptionId] = sub_message + elif msg.topic == mqtt_custom_topic: + custom_sub_message = msg.payload def on_disconnect(client, userdata, flags, reasonCode, properties=None): logger.info("MQTT client disconnected with reasonCode " @@ -1027,6 +1116,56 @@ def on_disconnect(client, userdata, flags, reasonCode, properties=None): sub_messages[sub_id_3].data[0].get_attribute( "temperature").value, 10) + # test4 notification with mqtt custom notification (payload) + sub_id_4 = client.post_subscription( + subscription=sub_with_mqtt_custom_notification_payload) + time.sleep(1) + client.update_attribute_value(entity_id=entity.id, + attr_name="temperature", + value=44 + ) + time.sleep(1) + sub_4 = client.get_subscription(sub_id_4) + self.assertEqual(first=custom_sub_message, + second=b'The value of the "temperature" attribute (of the device Test:001, Test) is 44. ' + b'Humidity is 20 and CO2 is 30.') + self.assertEqual(sub_4.notification.timesSent, 1) + client.delete_subscription(sub_id_4) + + # test5 notification with mqtt custom notification (json) + sub_id_5 = client.post_subscription( + subscription=sub_with_mqtt_custom_notification_json) + time.sleep(1) + client.update_attribute_value(entity_id=entity.id, + attr_name="humidity", + value=67 + ) + time.sleep(1) + sub_5 = client.get_subscription(sub_id_5) + self.assertEqual(first=custom_sub_message, + second=b'{"t":44,"h":67,"c":30}') + self.assertEqual(sub_5.notification.timesSent, 1) + client.delete_subscription(sub_id_5) + + # test6 notification with mqtt custom notification (ngsi) + sub_id_6 = client.post_subscription( + subscription=sub_with_mqtt_custom_notification_ngsi) + time.sleep(1) + client.update_attribute_value(entity_id=entity.id, + attr_name="co2", + value=78 + ) + time.sleep(1) + sub_6 = client.get_subscription(sub_id_6) + sub_message = Message.model_validate_json(custom_sub_message) + self.assertEqual(sub_6.notification.timesSent, 1) + self.assertEqual(len(sub_message.data[0].get_attributes()), 3) + self.assertEqual(sub_message.data[0].id, "prefix:Test:001") + self.assertEqual(sub_message.data[0].type, "newType") + self.assertEqual(sub_message.data[0].get_attribute("co2").value, 78) + self.assertEqual(sub_message.data[0].get_attribute("temperature").value, 123) + client.delete_subscription(sub_id_6) + @clean_test(fiware_service=settings.FIWARE_SERVICE, fiware_servicepath=settings.FIWARE_SERVICEPATH, cb_url=settings.CB_URL)