From 6a8f500546b59b7a7b5719352d986633ea2d8503 Mon Sep 17 00:00:00 2001 From: xuzhenbao <xuzhenbao133@163.com> Date: Thu, 10 Oct 2024 16:21:20 +0800 Subject: [PATCH] Add message version verification for remote_provider_mqtt --- bundles/event_admin/examples/CMakeLists.txt | 4 - .../remote_provider_mqtt/README.md | 2 +- ...CelixEarpmClientErrorInjectionTestSuite.cc | 7 + .../gtest/src/CelixEarpmClientTestSuite.cc | 2 + .../src/CelixEarpmClientTestSuiteBaseClass.h | 3 + .../CelixEarpmImplErrorInjectionTestSuite.cc | 107 +++------ .../gtest/src/CelixEarpmImplTestSuite.cc | 221 +++++++++--------- .../src/CelixEarpmImplTestSuiteBaseClass.h | 29 ++- .../src/CelixEarpmIntegrationTestSuite.cc | 4 +- .../src/celix_earpm_broker_discovery.c | 6 +- .../src/celix_earpm_client.c | 11 +- .../src/celix_earpm_client.h | 1 + .../src/celix_earpm_constants.h | 6 +- .../src/celix_earpm_impl.c | 40 +++- 14 files changed, 235 insertions(+), 208 deletions(-) diff --git a/bundles/event_admin/examples/CMakeLists.txt b/bundles/event_admin/examples/CMakeLists.txt index 5a4b63241..3672fb76d 100644 --- a/bundles/event_admin/examples/CMakeLists.txt +++ b/bundles/event_admin/examples/CMakeLists.txt @@ -36,7 +36,6 @@ if (EVENT_ADMIN_EXAMPLES) if (TARGET Celix::rsa_discovery_zeroconf)#Celix::rsa_discovery_zeroconf only available in linux add_celix_container(remote_event_admin_mqtt_publisher - LAUNCHER Celix::launcher NAME "publisher" GROUP "event_admin/mqtt" BUNDLES @@ -47,13 +46,11 @@ if (EVENT_ADMIN_EXAMPLES) event_publisher_example Celix::rsa_discovery_zeroconf Celix::event_admin_remote_provider_mqtt - USE_CONFIG PROPERTIES CELIX_EARPM_BROKER_PROFILE=${CMAKE_CURRENT_SOURCE_DIR}/res/mosquitto.conf ) add_celix_container(remote_event_admin_mqtt_subscriber - LAUNCHER Celix::launcher NAME "subscriber" GROUP "event_admin/mqtt" BUNDLES @@ -64,7 +61,6 @@ if (EVENT_ADMIN_EXAMPLES) event_handler_example Celix::rsa_discovery_zeroconf Celix::event_admin_remote_provider_mqtt - USE_CONFIG ) endif () diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/README.md b/bundles/event_admin/remote_provider/remote_provider_mqtt/README.md index e5eb0b299..e34a82e37 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/README.md +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/README.md @@ -123,5 +123,5 @@ When the MQTT connection is disconnected, `event_admin_remote_provider_mqtt` wil See the cmake target `remote_event_admin_mqtt_publisher` and `remote_event_admin_mqtt_subscriber` in the `event_admin/examples` directory. -Note: Before running the example, make sure the `mosquitto broker` and the `mdnsd` are running. You can get `mosquitto` from [here](https://github.com/eclipse/mosquitto) and `mdnsd` from [here](https://github.com/apple-oss-distributions/mDNSResponder). +Note: Before running the example, make sure the `mosquitto broker` and the `mdnsd` are running. You can get `mosquitto` from [here](https://github.com/eclipse/mosquitto) and `mdnsd` from [here](https://github.com/apple-oss-distributions/mDNSResponder). And you should use command `mosquitto -c <profile>` to start the `mosquitto broker`. The profile of the `mosquitto broker` can be got from [here](../../examples/res/mosquitto.conf). diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientErrorInjectionTestSuite.cc b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientErrorInjectionTestSuite.cc index 4e01842a4..2e6e44d91 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientErrorInjectionTestSuite.cc +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientErrorInjectionTestSuite.cc @@ -173,6 +173,13 @@ TEST_F(CelixEarpmClientErrorInjectionTestSuite, FailedToAddSenderUUIDToWillMessa ASSERT_EQ(nullptr, client); } +TEST_F(CelixEarpmClientErrorInjectionTestSuite, FailedToAddMsgVersionToWillMessagePropertyTest) { + celix_ei_expect_mosquitto_property_add_string_pair((void*)&celix_earpmClient_create, 1, MOSQ_ERR_NOMEM, 2); + celix_earpm_client_create_options_t opts{defaultOpts}; + celix_earpm_client_t *client = celix_earpmClient_create(&opts); + ASSERT_EQ(nullptr, client); +} + TEST_F(CelixEarpmClientErrorInjectionTestSuite, FailedToSetWillMessageTest) { celix_ei_expect_mosquitto_will_set_v5((void*)&celix_earpmClient_create, 1, MOSQ_ERR_NOMEM); celix_earpm_client_create_options_t opts{defaultOpts}; diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientTestSuite.cc b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientTestSuite.cc index c86de4a4c..9721fd932 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientTestSuite.cc +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientTestSuite.cc @@ -18,6 +18,8 @@ */ #include <cstring> #include <string> +#include <csignal> +#include <sys/wait.h> #include <unistd.h> #include <cstdlib> #include <functional> diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientTestSuiteBaseClass.h b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientTestSuiteBaseClass.h index 72adb408c..e56568c1e 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientTestSuiteBaseClass.h +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmClientTestSuiteBaseClass.h @@ -22,6 +22,8 @@ #include <cstring> #include <string> +#include <csignal> +#include <sys/wait.h> #include <unistd.h> #include <cstdlib> #include <functional> @@ -91,6 +93,7 @@ class CelixEarpmClientTestSuiteBaseClass : public CelixEarpmTestSuiteBaseClass { defaultOpts.logHelper = logHelper.get(); defaultOpts.sessionEndMsgTopic = CELIX_EARPM_SESSION_END_TOPIC; defaultOpts.sessionEndMsgSenderUUID = fwUUID.c_str(); + defaultOpts.sessionEndMsgVersion = "1.0.0"; defaultOpts.callbackHandle = nullptr; defaultOpts.receiveMsgCallback = [](void*, const celix_earpm_client_request_info_t*) {}; defaultOpts.connectedCallback = [](void*) {}; diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplErrorInjectionTestSuite.cc b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplErrorInjectionTestSuite.cc index 9a86a1fa9..344d9520a 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplErrorInjectionTestSuite.cc +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplErrorInjectionTestSuite.cc @@ -157,6 +157,8 @@ class CelixEarpmImplErrorInjectionTestSuite : public CelixEarpmImplTestSuiteBase long correlationData {0}; rc = mosquitto_property_add_binary(&mqttProps, MQTT_PROP_CORRELATION_DATA, &correlationData, sizeof(correlationData)); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + rc = mosquitto_property_add_string_pair(&mqttProps, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_MSG_VERSION", "1.0.0"); + EXPECT_EQ(rc, MOSQ_ERR_SUCCESS); rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, "syncEvent", (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_LEAST_ONCE, true, mqttProps); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -702,11 +704,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToAllocMemoryForRemoteFramew TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_calloc(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":123, "topics":["topic"]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -719,11 +719,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToCreateEventAckMapForRemote TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_celix_longHashMap_createWithOptions(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":123, "topics":["topic"]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -736,11 +734,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToCreateHandlerInfoMapForRem TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_celix_longHashMap_createWithOptions(CELIX_EI_UNKNOWN_CALLER, 0, nullptr, 2); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":123, "topics":["topic"]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -753,11 +749,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToAddRemoteFrameworkInfoToRe TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_celix_stringHashMap_put(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":123, "topics":["topic"]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -770,11 +764,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToAllocMemoryForRemoteHandle TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_calloc(CELIX_EI_UNKNOWN_CALLER, 0, nullptr, 2); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":123, "topics":["topic"]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -787,11 +779,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToCreateTopicListForRemoteHa TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_celix_arrayList_createStringArray(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":123, "topics":["topic"]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -804,11 +794,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToAddTopicToRemoteHandlerInf TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_celix_arrayList_addString(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":123, "topics":["topic"]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -821,11 +809,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToCreateFilterForRemoteHandl TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_celix_filter_create(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"s({"handler":{"handlerId":123, "topics":["topic"], "filter":"(a=b)"}})s"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -838,11 +824,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToAddRemoteHandlerInfoToRemo TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_celix_longHashMap_put(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"s({"handler":{"handlerId":123, "topics":["topic"], "filter":"(a=b)"}})s"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -855,11 +839,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToCreateRemoteFrameworkInfoW TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_calloc(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handlers":[{"handlerId":123, "topics":["topic"]}]})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -872,10 +854,8 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToCreateHandlerInfoUpdateMes TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_json_object(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -888,10 +868,8 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToCreateHandlerInfoArrayWhen TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_json_array(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -904,10 +882,8 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToAddHandlersInfoToHandlerIn TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_json_object_set_new(CELIX_EI_UNKNOWN_CALLER, 0, -1); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -920,10 +896,8 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToDumpHandlerInfoUpdateMessa TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_json_dumps(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -946,10 +920,8 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToGenerateHandlerInfoWhenPro celix_ei_expect_json_pack_ex(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -972,10 +944,8 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToAddHandlerInfoToHandlerInf celix_ei_expect_json_array_append_new(CELIX_EI_UNKNOWN_CALLER, 0, -1, 2); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -988,10 +958,8 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToPublishHandlerInfoUpdateMe TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_ei_expect_mosquitto_publish_v5((void*)&celix_earpmClient_publishAsync, 2, -1); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); std::string expectLog = "Failed to publish " + std::string(CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC); @@ -1082,8 +1050,9 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToUnserializeAsyncEvent) { celix_autofree char* payload = nullptr; status = celix_properties_saveToString(eventProps, 0, &payload); ASSERT_EQ(status, CELIX_SUCCESS); + celix_autoptr(mosquitto_property) mqttProperties = CreateMqttProperties(); auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, "asyncEvent", - (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_LEAST_ONCE, true, nullptr); + (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_LEAST_ONCE, true, mqttProperties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Failed to load event properties for asyncEvent."); diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplTestSuite.cc b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplTestSuite.cc index 232792544..d1cbc74ad 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplTestSuite.cc +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplTestSuite.cc @@ -247,11 +247,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessRemoveRemoteHandlerInfoMessageTest) { TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageTest) { TestRemoteProvider([](celix_event_admin_remote_provider_mqtt_t* earpm) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payLoad = R"({"handlers":[{"handlerId":123,"topics":["topic"]}]})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payLoad), payLoad, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitFor([earpm] { return celix_earpm_currentRemoteFrameworkCount(earpm) != 0; }); @@ -270,10 +268,8 @@ TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageTest) { TEST_F(CelixEarpmImplTestSuite, ProcessQueryRemoteHandlerInfoMessageTest) { TestRemoteProvider([](celix_event_admin_remote_provider_mqtt_t*) { mqttClient->Reset();//clear received messages cache - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); //The remote provider will send the handler description message when received the query message @@ -284,10 +280,8 @@ TEST_F(CelixEarpmImplTestSuite, ProcessQueryRemoteHandlerInfoMessageTest) { TEST_F(CelixEarpmImplTestSuite, ProcessUnknownRemoteHandlerInfoControlMessageTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_TOPIC_PREFIX"unknown", + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_TOPIC_PREFIX"unknown", 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Unknown action"); @@ -298,10 +292,8 @@ TEST_F(CelixEarpmImplTestSuite, ProcessUnknownRemoteHandlerInfoControlMessageTes TEST_F(CelixEarpmImplTestSuite, ProcessSessionEndMessageTest) { TestRemoteProvider([](celix_event_admin_remote_provider_mqtt_t* earpm) { AddRemoteHandlerInfoToRemoteProviderAndCheck(earpm, R"({"handler":{"handlerId":123,"topics":["topic"]}})", FAKE_FW_UUID); - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_SESSION_END_TOPIC, + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_SESSION_END_TOPIC, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); @@ -314,23 +306,73 @@ TEST_F(CelixEarpmImplTestSuite, ProcessSessionEndMessageTest) { }); } -TEST_F(CelixEarpmImplTestSuite, ProcessControlMessageWithoutSenderUUIDTest) { +TEST_F(CelixEarpmImplTestSuite, ProcessMessageWithoutVersion) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, nullptr); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - auto ok = WaitForLogMessage("No sender UUID for control message"); + auto ok = WaitForLogMessage(CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC" message version(null) is incompatible."); ASSERT_TRUE(ok); }); } -TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidPayloadTest) { +TEST_F(CelixEarpmImplTestSuite, ProcessMessageVersionFormatError) { + TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { + celix_autoptr(mosquitto_property) properties = nullptr; + auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_MSG_VERSION", "1"); + ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); + ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + auto ok = WaitForLogMessage(CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC" message version(1) is incompatible."); + ASSERT_TRUE(ok); + }); +} + +TEST_F(CelixEarpmImplTestSuite, ProcessMessageVersionStringTooLong) { + TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { + celix_autoptr(mosquitto_property) properties = nullptr; + auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_MSG_VERSION", "100000.200000000.300000"); + ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); + ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + auto ok = WaitForLogMessage(CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC" message version(100000.200000000.300000) is incompatible."); + ASSERT_TRUE(ok); + }); +} + +TEST_F(CelixEarpmImplTestSuite, ProcessMessageVersionIncompatible) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); + auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_MSG_VERSION", "10.0.0"); + ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + auto ok = WaitForLogMessage(CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC" message version(10.0.0) is incompatible."); + ASSERT_TRUE(ok); + }); +} + +TEST_F(CelixEarpmImplTestSuite, ProcessControlMessageWithoutSenderUUIDTest) { + TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { + celix_autoptr(mosquitto_property) properties = nullptr; + auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_MSG_VERSION", "1.0.0"); + ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_QUERY_TOPIC, + 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); + ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + auto ok = WaitForLogMessage("No sender UUID for control message"); + ASSERT_TRUE(ok); + }); +} + +TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidPayloadTest) { + TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"(invalid)"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Failed to parse message"); @@ -340,11 +382,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidPay TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithoutHandlerInfoTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("No handler information"); @@ -354,11 +394,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithoutHandler TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithoutHandlerIdTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"topics":["topic"]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Handler id is lost or not integer"); @@ -368,11 +406,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithoutHandler TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidHandlerIdTypeTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":"invalid","topics":["topic"]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Handler id is lost or not integer"); @@ -382,11 +418,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidHan TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidHandlerIdTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":-1,"topics":["topic"]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Handler id(-1) is invalid"); @@ -396,11 +430,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidHan TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithoutTopicsTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":123}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Topics is lost or not array."); @@ -410,11 +442,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithoutTopicsT TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidTopicsTypeTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":123,"topics":123}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Topics is lost or not array."); @@ -424,11 +454,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidTop TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidTopicTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handler":{"handlerId":123,"topics":[123]}})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Topic is not string."); @@ -438,11 +466,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessAddRemoteHandlerInfoMessageWithInvalidTop TEST_F(CelixEarpmImplTestSuite, ProcessRemoveRemoteHandlerInfoMessageWithInvalidPayloadTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"(invalid)"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Failed to parse message"); @@ -452,11 +478,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessRemoveRemoteHandlerInfoMessageWithInvalid TEST_F(CelixEarpmImplTestSuite, ProcessRemoveRemoteHandlerInfoMessageWithoutHandlerIdTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Handler id is lost or not integer"); @@ -466,11 +490,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessRemoveRemoteHandlerInfoMessageWithoutHand TEST_F(CelixEarpmImplTestSuite, ProcessRemoveRemoteHandlerInfoMessageWithInvalidHandlerIdTypeTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handlerId":"invalid"})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Handler id is lost or not integer"); @@ -480,11 +502,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessRemoveRemoteHandlerInfoMessageWithInvalid TEST_F(CelixEarpmImplTestSuite, ProcessRemoveRemoteHandlerInfoMessageWithInvalidHandlerIdTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handlerId":-1})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Handler id(-1) is invalid"); @@ -502,12 +522,10 @@ TEST_F(CelixEarpmImplTestSuite, ProcessRemoveRemoteHandlerInfoMessageWhenHandler TEST_F(CelixEarpmImplTestSuite, ProcessSyncEventAckMessageWithoutCorrelationDataTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); char topic[128]{0}; snprintf(topic, sizeof(topic), "%s%s", CELIX_EARPM_SYNC_EVENT_ACK_TOPIC_PREFIX, fwUUID.c_str()); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, topic, 0, nullptr, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, topic, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Correlation data size is invalid"); @@ -517,10 +535,8 @@ TEST_F(CelixEarpmImplTestSuite, ProcessSyncEventAckMessageWithoutCorrelationData TEST_F(CelixEarpmImplTestSuite, ProcessSyncEventAckMessageWithInvalidCorrelationDataTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_property_add_binary(&properties, MQTT_PROP_CORRELATION_DATA, "invalid", strlen("invalid")); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); + auto rc = mosquitto_property_add_binary(&properties, MQTT_PROP_CORRELATION_DATA, "invalid", strlen("invalid")); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); char topic[128]{0}; snprintf(topic, sizeof(topic), "%s%s", CELIX_EARPM_SYNC_EVENT_ACK_TOPIC_PREFIX, fwUUID.c_str()); @@ -534,11 +550,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessSyncEventAckMessageWithInvalidCorrelation TEST_F(CelixEarpmImplTestSuite, ProcessSyncEventAckMessageBeforeRemoteHandlerInfoMessageTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); long correlationData = 1; - rc = mosquitto_property_add_binary(&properties, MQTT_PROP_CORRELATION_DATA, &correlationData, sizeof(correlationData)); + auto rc = mosquitto_property_add_binary(&properties, MQTT_PROP_CORRELATION_DATA, &correlationData, sizeof(correlationData)); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); char topic[128]{0}; snprintf(topic, sizeof(topic), "%s%s", CELIX_EARPM_SYNC_EVENT_ACK_TOPIC_PREFIX, fwUUID.c_str()); @@ -552,11 +566,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessSyncEventAckMessageBeforeRemoteHandlerInf TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithInvalidPayloadTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payLoad = R"(invalid)"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payLoad), payLoad, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Failed to parse message"); @@ -566,11 +578,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithInvalid TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithoutHandlerInfoTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payLoad = R"({})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payLoad), payLoad, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("No handler information"); @@ -580,11 +590,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithoutHand TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithoutHandlerIdTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handlers":[{"topics":["topic"]}]})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Handler id is lost or not integer"); @@ -594,11 +602,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithoutHand TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithInvalidHandlerIdTypeTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handlers":[{"handlerId":"invalid","topics":["topic"]}]})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Handler id is lost or not integer"); @@ -608,11 +614,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithInvalid TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithInvalidHandlerIdTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handlers":[{"handlerId":-1,"topics":["topic"]}]})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Handler id(-1) is invalid"); @@ -622,11 +626,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithInvalid TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithoutTopicsTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handlers":[{"handlerId":123}]})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Topics is lost or not array."); @@ -636,11 +638,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithoutTopi TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithInvalidTopicsTypeTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handlers":[{"handlerId":123,"topics":123}]})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Topics is lost or not array."); @@ -650,11 +650,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithInvalid TEST_F(CelixEarpmImplTestSuite, ProcessUpdateRemoteHandlerInfoMessageWithInvalidTopicTest) { TestRemoteProvider([this](celix_event_admin_remote_provider_mqtt_t*) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(); const char* payload = R"({"handlers":[{"handlerId":123,"topics":[123]}]})"; - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitForLogMessage("Topic is not string."); @@ -698,8 +696,9 @@ TEST_F(CelixEarpmImplTestSuite, ProcessAsyncEventTest) { celix_autofree char* payload = nullptr; status = celix_properties_saveToString(eventProps, 0, &payload); ASSERT_EQ(status, CELIX_SUCCESS); + celix_autoptr(mosquitto_property) mqttProperties = CreateMqttProperties(); auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, "asyncEvent", - (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_LEAST_ONCE, true, nullptr); + (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_LEAST_ONCE, true, mqttProperties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); //wait for the async event @@ -759,6 +758,8 @@ TEST_F(CelixEarpmImplTestSuite, ProcessSyncEventTest) { long correlationData {0}; rc = mosquitto_property_add_binary(&mqttProps, MQTT_PROP_CORRELATION_DATA, &correlationData, sizeof(correlationData)); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + rc = mosquitto_property_add_string_pair(&mqttProps, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_MSG_VERSION", "1.0.0"); + ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, "syncEvent", (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_LEAST_ONCE, true, mqttProps); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplTestSuiteBaseClass.h b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplTestSuiteBaseClass.h index b66ae414d..a9be7ee87 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplTestSuiteBaseClass.h +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmImplTestSuiteBaseClass.h @@ -22,6 +22,8 @@ #include <cstring> #include <string> +#include <csignal> +#include <sys/wait.h> #include <unistd.h> #include <functional> #include <future> @@ -96,6 +98,8 @@ class MqttClient { } auto rc = mosquitto_property_add_string_pair(&responseProps, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + rc = mosquitto_property_add_string_pair(&responseProps, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_MSG_VERSION", "1.0.0"); + ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); rc = mosquitto_publish_v5(client->mosq.get(), nullptr, responseTopic, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, responseProps); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); } @@ -215,11 +219,18 @@ class CelixEarpmImplTestSuiteBaseClass : public CelixEarpmTestSuiteBaseClass { celix_earpm_destroy(earpm); } - static void AddRemoteHandlerInfoToRemoteProviderAndCheck(celix_event_admin_remote_provider_mqtt_t* earpm, const char* handlerInfo, const char* senderUUID = FAKE_FW_UUID) { + static mosquitto_property* CreateMqttProperties(const char* senderUUID = FAKE_FW_UUID, const char* msgVersion = "1.0.0") { celix_autoptr(mosquitto_property) properties = nullptr; auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", senderUUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, + EXPECT_EQ(rc, MOSQ_ERR_SUCCESS); + rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_MSG_VERSION", msgVersion); + EXPECT_EQ(rc, MOSQ_ERR_SUCCESS); + return celix_steal_ptr(properties); + } + + static void AddRemoteHandlerInfoToRemoteProviderAndCheck(celix_event_admin_remote_provider_mqtt_t* earpm, const char* handlerInfo, const char* senderUUID = FAKE_FW_UUID) { + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(senderUUID); + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC, (int)strlen(handlerInfo), handlerInfo, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); auto ok = WaitFor([earpm] { return celix_earpm_currentRemoteFrameworkCount(earpm) != 0; });//Wait for receive the handler info message @@ -227,23 +238,19 @@ class CelixEarpmImplTestSuiteBaseClass : public CelixEarpmTestSuiteBaseClass { } static void RemoveRemoteHandlerInfoFromRemoteProvider(celix_event_admin_remote_provider_mqtt_t*, long handlerServiceId, const char* senderUUID = FAKE_FW_UUID) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", senderUUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(senderUUID); char payload[128]{0}; snprintf(payload, sizeof(payload), R"({"handlerId":%ld})", handlerServiceId); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); } static void UpdateRemoteHandlerInfoToRemoteProvider(celix_event_admin_remote_provider_mqtt_t*, const char* handlers, const char* senderUUID = FAKE_FW_UUID) { - celix_autoptr(mosquitto_property) properties = nullptr; - auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", senderUUID); - ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); + celix_autoptr(mosquitto_property) properties = CreateMqttProperties(senderUUID); char payload[1024]{0}; snprintf(payload, sizeof(payload), R"({"handlers":%s})", handlers); - rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, + auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC, (int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties); ASSERT_EQ(rc, MOSQ_ERR_SUCCESS); } diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmIntegrationTestSuite.cc b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmIntegrationTestSuite.cc index 774cc5495..0d552c7cf 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmIntegrationTestSuite.cc +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/gtest/src/CelixEarpmIntegrationTestSuite.cc @@ -17,9 +17,11 @@ * under the License. */ #include <future> -#include <mosquitto.h> +#include <csignal> +#include <sys/wait.h> #include <unistd.h> #include <gtest/gtest.h> +#include <mosquitto.h> #include "celix_framework_utils.h" #include "celix_constants.h" diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_broker_discovery.c b/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_broker_discovery.c index 63dc76c18..bf16bb387 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_broker_discovery.c +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_broker_discovery.c @@ -40,7 +40,7 @@ #include "remote_constants.h" #include "celix_earpm_constants.h" -#define CELIX_EARPM_LOAD_PROFILE_INTERVAL 2 //s +#define CELIX_EARPM_LOAD_PROFILE_INTERVAL 2 //seconds #define CELIX_EARPM_LOAD_PROFILE_TRIES_MAX (600/CELIX_EARPM_LOAD_PROFILE_INTERVAL) //10 minutes @@ -429,13 +429,13 @@ static bool celix_earpmDiscovery_loadBrokerProfile(celix_earpm_broker_discovery_ return false; } - celix_array_list_t* brokerEndpoints = celix_earpmDiscovery_createBrokerEndpoints(discovery, brokerListeners); + celix_autoptr(celix_array_list_t) brokerEndpoints = celix_earpmDiscovery_createBrokerEndpoints(discovery, brokerListeners); if (brokerEndpoints == NULL) { celix_logHelper_error(discovery->logHelper, "Failed to create broker endpoints."); return false; } celix_auto(celix_mutex_lock_guard_t) mutexLockGuard = celixMutexLockGuard_init(&discovery->mutex); - discovery->brokerEndpoints = brokerEndpoints; + discovery->brokerEndpoints = celix_steal_ptr(brokerEndpoints); return true; } diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_client.c b/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_client.c index 894f40e58..b10b4996c 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_client.c +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_client.c @@ -109,7 +109,7 @@ struct celix_earpm_client { bool running; }; -static celix_status_t celix_earpmClient_configMosq(mosquitto* mosq, celix_log_helper_t* logHelper, const char* sessionEndMsgTopic, const char* sessionEndMsgSenderUUID); +static celix_status_t celix_earpmClient_configMosq(mosquitto* mosq, celix_log_helper_t* logHelper, const char* sessionEndMsgTopic, const char* sessionEndMsgSenderUUID, const char* sessionEndMsgVersion); static void celix_earpmClient_messageRelease(celix_earpm_client_msg_t* msg); static void celix_earpmClient_brokerInfoRelease(celix_earpm_client_broker_info_t* info); static void celix_earpmClient_connectCallback(struct mosquitto* mosq, void* handle, int rc, int flag, const mosquitto_property* props); @@ -162,6 +162,7 @@ celix_earpm_client_t* celix_earpmClient_create(celix_earpm_client_create_options assert(options->logHelper != NULL); assert(options->sessionEndMsgTopic != NULL); assert(options->sessionEndMsgSenderUUID != NULL); + assert(options->sessionEndMsgVersion != NULL); assert(options->receiveMsgCallback != NULL); assert(options->connectedCallback != NULL); @@ -276,7 +277,7 @@ celix_earpm_client_t* celix_earpmClient_create(celix_earpm_client_create_options celix_logHelper_error(client->logHelper, "Failed to create mosquitto instance."); return NULL; } - status = celix_earpmClient_configMosq(client->mosq, client->logHelper, options->sessionEndMsgTopic, options->sessionEndMsgSenderUUID); + status = celix_earpmClient_configMosq(client->mosq, client->logHelper, options->sessionEndMsgTopic, options->sessionEndMsgSenderUUID, options->sessionEndMsgVersion); if (status != CELIX_SUCCESS) { celix_logHelper_error(client->logHelper, "Failed to configure mosquitto instance."); return NULL; @@ -335,7 +336,7 @@ void celix_earpmClient_destroy(celix_earpm_client_t* client) { return; } -static celix_status_t celix_earpmClient_configMosq(mosquitto *mosq, celix_log_helper_t* logHelper, const char* sessionEndMsgTopic, const char* sessionEndMsgSenderUUID) { +static celix_status_t celix_earpmClient_configMosq(mosquitto *mosq, celix_log_helper_t* logHelper, const char* sessionEndMsgTopic, const char* sessionEndMsgSenderUUID, const char* sessionEndMsgVersion) { assert(mosq != NULL); int rc = mosquitto_int_option(mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5); if (rc != MOSQ_ERR_SUCCESS) { @@ -360,6 +361,10 @@ static celix_status_t celix_earpmClient_configMosq(mosquitto *mosq, celix_log_he celix_logHelper_error(logHelper, "Failed to add sender UUID property for will message."); return ENOMEM; } + if (mosquitto_property_add_string_pair(&sessionEndMsgProps, MQTT_PROP_USER_PROPERTY, CELIX_EARPM_MQTT_USER_PROP_MSG_VERSION, sessionEndMsgVersion) != MOSQ_ERR_SUCCESS) { + celix_logHelper_error(logHelper, "Failed to add message version property for will message."); + return ENOMEM; + } rc = mosquitto_will_set_v5(mosq, sessionEndMsgTopic, 0, NULL, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, sessionEndMsgProps); if (rc != MOSQ_ERR_SUCCESS) { celix_logHelper_error(logHelper, "Failed to set mqtt will. %d.", rc); diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_client.h b/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_client.h index 1bf49447d..80c97d085 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_client.h +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_client.h @@ -44,6 +44,7 @@ typedef struct celix_earpm_client_create_options { celix_log_helper_t* logHelper; const char* sessionEndMsgTopic; const char* sessionEndMsgSenderUUID; + const char* sessionEndMsgVersion; void* callbackHandle; celix_earpm_client_receive_msg_fp receiveMsgCallback; celix_earpm_client_connected_fp connectedCallback; diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_constants.h b/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_constants.h index 518b4430e..d2c4e17de 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_constants.h +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_constants.h @@ -24,7 +24,7 @@ extern "C" { #endif /** - * Topic for the EventAdminMqtt + * Control message topics for the EventAdminMqtt * @{ */ #define CELIX_EARPM_TOPIC_PREFIX "celix/EventAdminMqtt/" @@ -36,7 +36,7 @@ extern "C" { #define CELIX_EARPM_SYNC_EVENT_ACK_TOPIC_PREFIX CELIX_EARPM_TOPIC_PREFIX"SyncEvent/ack/" // topic = topic prefix + requester framework UUID #define CELIX_EARPM_SESSION_END_TOPIC CELIX_EARPM_TOPIC_PREFIX"session/end" -/** @}*///end of Topic for the EventAdminMqtt +/** @}*///end of Control message topics for the EventAdminMqtt /** * Configuration properties for the EventAdminMqtt @@ -88,7 +88,7 @@ extern "C" { /** @}*///end of Configuration properties for the EventAdminMqtt /** - * @brief The QoS MQTT + * @brief The QoS for the MQTT messages. */ typedef enum celix_earpm_qos { CELIX_EARPM_QOS_UNKNOWN = -1, diff --git a/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_impl.c b/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_impl.c index a0fca7b61..b34546715 100644 --- a/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_impl.c +++ b/bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_impl.c @@ -23,6 +23,7 @@ #include <stdbool.h> #include <errno.h> #include <stdlib.h> +#include <stdio.h> #include <jansson.h> #include "celix_cleanup.h" @@ -34,7 +35,6 @@ #include "celix_threads.h" #include "celix_constants.h" #include "celix_filter.h" -#include "celix_framework.h" #include "celix_event_constants.h" #include "celix_event_remote_provider_service.h" #include "celix_earpm_event_deliverer.h" @@ -48,7 +48,9 @@ */ #define CELIX_EARPM_SYNC_EVENT_TIMEOUT_DEFAULT (5*60) //seconds - +/** + * @brief The version of the remote provider messages(It contains event messages and control messages). + */ #define CELIX_EARPM_MSG_VERSION "1.0.0" typedef struct celix_earpm_event_handler { @@ -99,7 +101,7 @@ struct celix_event_admin_remote_provider_mqtt { celix_thread_cond_t ackCond; celix_long_hash_map_t* eventHandlers;//key = serviceId, value = celix_earpm_event_handler_t* celix_string_hash_map_t* eventSubscriptions;//key = topic, value = celix_earpm_event_subscription_t* - celix_string_hash_map_t* remoteFrameworks;// key = frameworkUUID of remote frameworks, value = celix_remote_framework_info_t* + celix_string_hash_map_t* remoteFrameworks;// key = frameworkUUID of remote frameworks, value = celix_earpm_remote_framework_info_t* bool destroying; }; @@ -206,6 +208,7 @@ celix_event_admin_remote_provider_mqtt_t* celix_earpm_create(celix_bundle_contex opts.logHelper = logHelper; opts.sessionEndMsgTopic = CELIX_EARPM_SESSION_END_TOPIC; opts.sessionEndMsgSenderUUID = earpm->fwUUID; + opts.sessionEndMsgVersion = CELIX_EARPM_MSG_VERSION; opts.callbackHandle = earpm; opts.receiveMsgCallback = celix_earpm_receiveMsgCallback; opts.connectedCallback = celix_earpm_connectedCallback; @@ -749,6 +752,7 @@ static celix_status_t celix_earpm_publishEventSync(celix_event_admin_remote_prov requestInfo.qos = qos; requestInfo.pri = CELIX_EARPM_MSG_PRI_LOW; requestInfo.expiryInterval = expiryInterval; + requestInfo.version = CELIX_EARPM_MSG_VERSION; requestInfo.responseTopic = earpm->syncEventAckTopic; struct celix_earpm_sync_event_correlation_data correlationData; memset(&correlationData, 0, sizeof(correlationData)); @@ -1349,12 +1353,42 @@ static void celix_earpm_processEventMessage(celix_event_admin_remote_provider_mq return; } +static bool celix_earpm_isMsgCompatible(const celix_earpm_client_request_info_t* requestInfo) { + char actualVersion[16]= {0}; + if (requestInfo->version == NULL) { + return false; + } + int ret = snprintf(actualVersion, sizeof(actualVersion), "%s", requestInfo->version); + if (ret < 0 || ret >= (int)sizeof(actualVersion)) { + return false; + } + char* endPtr = NULL; + long actualMajor = strtol(actualVersion, &endPtr, 10); + if (endPtr == NULL || endPtr[0] != '.') { + return false; + } + long actualMinor = strtol(endPtr + 1, NULL, 10); + long expectedMajor = strtol(CELIX_EARPM_MSG_VERSION, &endPtr, 10); + assert(endPtr[0] == '.'); + long expectedMinor = strtol(endPtr + 1, NULL, 10); + + if (actualMajor == expectedMajor && actualMinor <= expectedMinor) { + return true; + } + return false; +} + static void celix_earpm_receiveMsgCallback(void* handle, const celix_earpm_client_request_info_t* requestInfo) { assert(handle != NULL); assert(requestInfo != NULL); assert(requestInfo->topic != NULL); celix_event_admin_remote_provider_mqtt_t* earpm = (celix_event_admin_remote_provider_mqtt_t*)handle; + if (!celix_earpm_isMsgCompatible(requestInfo)) { + celix_logHelper_warning(earpm->logHelper, "%s message version(%s) is incompatible.",requestInfo->topic, requestInfo->version == NULL ? "null" : requestInfo->version); + return; + } + if (strncmp(requestInfo->topic,CELIX_EARPM_TOPIC_PREFIX, sizeof(CELIX_EARPM_TOPIC_PREFIX)-1) == 0) { celix_earpm_processControlMessage(earpm, requestInfo); } else {// user topic