Skip to content

Commit

Permalink
Add message version verification for remote_provider_mqtt
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzhenbao committed Oct 10, 2024
1 parent 16402f0 commit 6a8f500
Show file tree
Hide file tree
Showing 14 changed files with 235 additions and 208 deletions.
4 changes: 0 additions & 4 deletions bundles/event_admin/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ if (EVENT_ADMIN_EXAMPLES)

if (TARGET Celix::rsa_discovery_zeroconf)#Celix::rsa_discovery_zeroconf only available in linux
add_celix_container(remote_event_admin_mqtt_publisher
LAUNCHER Celix::launcher
NAME "publisher"
GROUP "event_admin/mqtt"
BUNDLES
Expand All @@ -47,13 +46,11 @@ if (EVENT_ADMIN_EXAMPLES)
event_publisher_example
Celix::rsa_discovery_zeroconf
Celix::event_admin_remote_provider_mqtt
USE_CONFIG
PROPERTIES
CELIX_EARPM_BROKER_PROFILE=${CMAKE_CURRENT_SOURCE_DIR}/res/mosquitto.conf
)

add_celix_container(remote_event_admin_mqtt_subscriber
LAUNCHER Celix::launcher
NAME "subscriber"
GROUP "event_admin/mqtt"
BUNDLES
Expand All @@ -64,7 +61,6 @@ if (EVENT_ADMIN_EXAMPLES)
event_handler_example
Celix::rsa_discovery_zeroconf
Celix::event_admin_remote_provider_mqtt
USE_CONFIG
)
endif ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,5 @@ When the MQTT connection is disconnected, `event_admin_remote_provider_mqtt` wil

See the cmake target `remote_event_admin_mqtt_publisher` and `remote_event_admin_mqtt_subscriber` in the `event_admin/examples` directory.

Note: Before running the example, make sure the `mosquitto broker` and the `mdnsd` are running. You can get `mosquitto` from [here](https://github.com/eclipse/mosquitto) and `mdnsd` from [here](https://github.com/apple-oss-distributions/mDNSResponder).
Note: Before running the example, make sure the `mosquitto broker` and the `mdnsd` are running. You can get `mosquitto` from [here](https://github.com/eclipse/mosquitto) and `mdnsd` from [here](https://github.com/apple-oss-distributions/mDNSResponder). And you should use command `mosquitto -c <profile>` to start the `mosquitto broker`. The profile of the `mosquitto broker` can be got from [here](../../examples/res/mosquitto.conf).

Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ TEST_F(CelixEarpmClientErrorInjectionTestSuite, FailedToAddSenderUUIDToWillMessa
ASSERT_EQ(nullptr, client);
}

TEST_F(CelixEarpmClientErrorInjectionTestSuite, FailedToAddMsgVersionToWillMessagePropertyTest) {
celix_ei_expect_mosquitto_property_add_string_pair((void*)&celix_earpmClient_create, 1, MOSQ_ERR_NOMEM, 2);
celix_earpm_client_create_options_t opts{defaultOpts};
celix_earpm_client_t *client = celix_earpmClient_create(&opts);
ASSERT_EQ(nullptr, client);
}

TEST_F(CelixEarpmClientErrorInjectionTestSuite, FailedToSetWillMessageTest) {
celix_ei_expect_mosquitto_will_set_v5((void*)&celix_earpmClient_create, 1, MOSQ_ERR_NOMEM);
celix_earpm_client_create_options_t opts{defaultOpts};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
#include <cstring>
#include <string>
#include <csignal>
#include <sys/wait.h>
#include <unistd.h>
#include <cstdlib>
#include <functional>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <cstring>
#include <string>
#include <csignal>
#include <sys/wait.h>
#include <unistd.h>
#include <cstdlib>
#include <functional>
Expand Down Expand Up @@ -91,6 +93,7 @@ class CelixEarpmClientTestSuiteBaseClass : public CelixEarpmTestSuiteBaseClass {
defaultOpts.logHelper = logHelper.get();
defaultOpts.sessionEndMsgTopic = CELIX_EARPM_SESSION_END_TOPIC;
defaultOpts.sessionEndMsgSenderUUID = fwUUID.c_str();
defaultOpts.sessionEndMsgVersion = "1.0.0";
defaultOpts.callbackHandle = nullptr;
defaultOpts.receiveMsgCallback = [](void*, const celix_earpm_client_request_info_t*) {};
defaultOpts.connectedCallback = [](void*) {};
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <cstring>
#include <string>
#include <csignal>
#include <sys/wait.h>
#include <unistd.h>
#include <functional>
#include <future>
Expand Down Expand Up @@ -96,6 +98,8 @@ class MqttClient {
}
auto rc = mosquitto_property_add_string_pair(&responseProps, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", FAKE_FW_UUID);
ASSERT_EQ(rc, MOSQ_ERR_SUCCESS);
rc = mosquitto_property_add_string_pair(&responseProps, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_MSG_VERSION", "1.0.0");
ASSERT_EQ(rc, MOSQ_ERR_SUCCESS);
rc = mosquitto_publish_v5(client->mosq.get(), nullptr, responseTopic, 0, nullptr, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, responseProps);
ASSERT_EQ(rc, MOSQ_ERR_SUCCESS);
}
Expand Down Expand Up @@ -215,35 +219,38 @@ class CelixEarpmImplTestSuiteBaseClass : public CelixEarpmTestSuiteBaseClass {
celix_earpm_destroy(earpm);
}

static void AddRemoteHandlerInfoToRemoteProviderAndCheck(celix_event_admin_remote_provider_mqtt_t* earpm, const char* handlerInfo, const char* senderUUID = FAKE_FW_UUID) {
static mosquitto_property* CreateMqttProperties(const char* senderUUID = FAKE_FW_UUID, const char* msgVersion = "1.0.0") {
celix_autoptr(mosquitto_property) properties = nullptr;
auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", senderUUID);
ASSERT_EQ(rc, MOSQ_ERR_SUCCESS);
rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC,
EXPECT_EQ(rc, MOSQ_ERR_SUCCESS);
rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_MSG_VERSION", msgVersion);
EXPECT_EQ(rc, MOSQ_ERR_SUCCESS);
return celix_steal_ptr(properties);
}

static void AddRemoteHandlerInfoToRemoteProviderAndCheck(celix_event_admin_remote_provider_mqtt_t* earpm, const char* handlerInfo, const char* senderUUID = FAKE_FW_UUID) {
celix_autoptr(mosquitto_property) properties = CreateMqttProperties(senderUUID);
auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_ADD_TOPIC,
(int)strlen(handlerInfo), handlerInfo, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties);
ASSERT_EQ(rc, MOSQ_ERR_SUCCESS);
auto ok = WaitFor([earpm] { return celix_earpm_currentRemoteFrameworkCount(earpm) != 0; });//Wait for receive the handler info message
ASSERT_TRUE(ok);
}

static void RemoveRemoteHandlerInfoFromRemoteProvider(celix_event_admin_remote_provider_mqtt_t*, long handlerServiceId, const char* senderUUID = FAKE_FW_UUID) {
celix_autoptr(mosquitto_property) properties = nullptr;
auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", senderUUID);
ASSERT_EQ(rc, MOSQ_ERR_SUCCESS);
celix_autoptr(mosquitto_property) properties = CreateMqttProperties(senderUUID);
char payload[128]{0};
snprintf(payload, sizeof(payload), R"({"handlerId":%ld})", handlerServiceId);
rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC,
auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_REMOVE_TOPIC,
(int)strlen(payload), payload, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, properties);
ASSERT_EQ(rc, MOSQ_ERR_SUCCESS);
}

static void UpdateRemoteHandlerInfoToRemoteProvider(celix_event_admin_remote_provider_mqtt_t*, const char* handlers, const char* senderUUID = FAKE_FW_UUID) {
celix_autoptr(mosquitto_property) properties = nullptr;
auto rc = mosquitto_property_add_string_pair(&properties, MQTT_PROP_USER_PROPERTY, "CELIX_EARPM_SENDER_UUID", senderUUID);
ASSERT_EQ(rc, MOSQ_ERR_SUCCESS);
celix_autoptr(mosquitto_property) properties = CreateMqttProperties(senderUUID);
char payload[1024]{0};
snprintf(payload, sizeof(payload), R"({"handlers":%s})", handlers);
rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC,
auto rc = mosquitto_publish_v5(mqttClient->mosq.get(), nullptr, CELIX_EARPM_HANDLER_INFO_UPDATE_TOPIC,
(int)strlen(payload), payload, CELIX_EARPM_QOS_AT_MOST_ONCE, false, properties);
ASSERT_EQ(rc, MOSQ_ERR_SUCCESS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
* under the License.
*/
#include <future>
#include <mosquitto.h>
#include <csignal>
#include <sys/wait.h>
#include <unistd.h>
#include <gtest/gtest.h>
#include <mosquitto.h>

#include "celix_framework_utils.h"
#include "celix_constants.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
#include "remote_constants.h"
#include "celix_earpm_constants.h"

#define CELIX_EARPM_LOAD_PROFILE_INTERVAL 2 //s
#define CELIX_EARPM_LOAD_PROFILE_INTERVAL 2 //seconds
#define CELIX_EARPM_LOAD_PROFILE_TRIES_MAX (600/CELIX_EARPM_LOAD_PROFILE_INTERVAL) //10 minutes


Expand Down Expand Up @@ -429,13 +429,13 @@ static bool celix_earpmDiscovery_loadBrokerProfile(celix_earpm_broker_discovery_
return false;
}

celix_array_list_t* brokerEndpoints = celix_earpmDiscovery_createBrokerEndpoints(discovery, brokerListeners);
celix_autoptr(celix_array_list_t) brokerEndpoints = celix_earpmDiscovery_createBrokerEndpoints(discovery, brokerListeners);
if (brokerEndpoints == NULL) {
celix_logHelper_error(discovery->logHelper, "Failed to create broker endpoints.");
return false;
}
celix_auto(celix_mutex_lock_guard_t) mutexLockGuard = celixMutexLockGuard_init(&discovery->mutex);
discovery->brokerEndpoints = brokerEndpoints;
discovery->brokerEndpoints = celix_steal_ptr(brokerEndpoints);

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ struct celix_earpm_client {
bool running;
};

static celix_status_t celix_earpmClient_configMosq(mosquitto* mosq, celix_log_helper_t* logHelper, const char* sessionEndMsgTopic, const char* sessionEndMsgSenderUUID);
static celix_status_t celix_earpmClient_configMosq(mosquitto* mosq, celix_log_helper_t* logHelper, const char* sessionEndMsgTopic, const char* sessionEndMsgSenderUUID, const char* sessionEndMsgVersion);
static void celix_earpmClient_messageRelease(celix_earpm_client_msg_t* msg);
static void celix_earpmClient_brokerInfoRelease(celix_earpm_client_broker_info_t* info);
static void celix_earpmClient_connectCallback(struct mosquitto* mosq, void* handle, int rc, int flag, const mosquitto_property* props);
Expand Down Expand Up @@ -162,6 +162,7 @@ celix_earpm_client_t* celix_earpmClient_create(celix_earpm_client_create_options
assert(options->logHelper != NULL);
assert(options->sessionEndMsgTopic != NULL);
assert(options->sessionEndMsgSenderUUID != NULL);
assert(options->sessionEndMsgVersion != NULL);
assert(options->receiveMsgCallback != NULL);
assert(options->connectedCallback != NULL);

Expand Down Expand Up @@ -276,7 +277,7 @@ celix_earpm_client_t* celix_earpmClient_create(celix_earpm_client_create_options
celix_logHelper_error(client->logHelper, "Failed to create mosquitto instance.");
return NULL;
}
status = celix_earpmClient_configMosq(client->mosq, client->logHelper, options->sessionEndMsgTopic, options->sessionEndMsgSenderUUID);
status = celix_earpmClient_configMosq(client->mosq, client->logHelper, options->sessionEndMsgTopic, options->sessionEndMsgSenderUUID, options->sessionEndMsgVersion);
if (status != CELIX_SUCCESS) {
celix_logHelper_error(client->logHelper, "Failed to configure mosquitto instance.");
return NULL;
Expand Down Expand Up @@ -335,7 +336,7 @@ void celix_earpmClient_destroy(celix_earpm_client_t* client) {
return;
}

static celix_status_t celix_earpmClient_configMosq(mosquitto *mosq, celix_log_helper_t* logHelper, const char* sessionEndMsgTopic, const char* sessionEndMsgSenderUUID) {
static celix_status_t celix_earpmClient_configMosq(mosquitto *mosq, celix_log_helper_t* logHelper, const char* sessionEndMsgTopic, const char* sessionEndMsgSenderUUID, const char* sessionEndMsgVersion) {
assert(mosq != NULL);
int rc = mosquitto_int_option(mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
if (rc != MOSQ_ERR_SUCCESS) {
Expand All @@ -360,6 +361,10 @@ static celix_status_t celix_earpmClient_configMosq(mosquitto *mosq, celix_log_he
celix_logHelper_error(logHelper, "Failed to add sender UUID property for will message.");
return ENOMEM;
}
if (mosquitto_property_add_string_pair(&sessionEndMsgProps, MQTT_PROP_USER_PROPERTY, CELIX_EARPM_MQTT_USER_PROP_MSG_VERSION, sessionEndMsgVersion) != MOSQ_ERR_SUCCESS) {
celix_logHelper_error(logHelper, "Failed to add message version property for will message.");
return ENOMEM;
}
rc = mosquitto_will_set_v5(mosq, sessionEndMsgTopic, 0, NULL, CELIX_EARPM_QOS_AT_LEAST_ONCE, false, sessionEndMsgProps);
if (rc != MOSQ_ERR_SUCCESS) {
celix_logHelper_error(logHelper, "Failed to set mqtt will. %d.", rc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ typedef struct celix_earpm_client_create_options {
celix_log_helper_t* logHelper;
const char* sessionEndMsgTopic;
const char* sessionEndMsgSenderUUID;
const char* sessionEndMsgVersion;
void* callbackHandle;
celix_earpm_client_receive_msg_fp receiveMsgCallback;
celix_earpm_client_connected_fp connectedCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ extern "C" {
#endif

/**
* Topic for the EventAdminMqtt
* Control message topics for the EventAdminMqtt
* @{
*/
#define CELIX_EARPM_TOPIC_PREFIX "celix/EventAdminMqtt/"
Expand All @@ -36,7 +36,7 @@ extern "C" {
#define CELIX_EARPM_SYNC_EVENT_ACK_TOPIC_PREFIX CELIX_EARPM_TOPIC_PREFIX"SyncEvent/ack/" // topic = topic prefix + requester framework UUID
#define CELIX_EARPM_SESSION_END_TOPIC CELIX_EARPM_TOPIC_PREFIX"session/end"

/** @}*///end of Topic for the EventAdminMqtt
/** @}*///end of Control message topics for the EventAdminMqtt

/**
* Configuration properties for the EventAdminMqtt
Expand Down Expand Up @@ -88,7 +88,7 @@ extern "C" {
/** @}*///end of Configuration properties for the EventAdminMqtt

/**
* @brief The QoS MQTT
* @brief The QoS for the MQTT messages.
*/
typedef enum celix_earpm_qos {
CELIX_EARPM_QOS_UNKNOWN = -1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <stdbool.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <jansson.h>

#include "celix_cleanup.h"
Expand All @@ -34,7 +35,6 @@
#include "celix_threads.h"
#include "celix_constants.h"
#include "celix_filter.h"
#include "celix_framework.h"
#include "celix_event_constants.h"
#include "celix_event_remote_provider_service.h"
#include "celix_earpm_event_deliverer.h"
Expand All @@ -48,7 +48,9 @@
*/
#define CELIX_EARPM_SYNC_EVENT_TIMEOUT_DEFAULT (5*60) //seconds


/**
* @brief The version of the remote provider messages(It contains event messages and control messages).
*/
#define CELIX_EARPM_MSG_VERSION "1.0.0"

typedef struct celix_earpm_event_handler {
Expand Down Expand Up @@ -99,7 +101,7 @@ struct celix_event_admin_remote_provider_mqtt {
celix_thread_cond_t ackCond;
celix_long_hash_map_t* eventHandlers;//key = serviceId, value = celix_earpm_event_handler_t*
celix_string_hash_map_t* eventSubscriptions;//key = topic, value = celix_earpm_event_subscription_t*
celix_string_hash_map_t* remoteFrameworks;// key = frameworkUUID of remote frameworks, value = celix_remote_framework_info_t*
celix_string_hash_map_t* remoteFrameworks;// key = frameworkUUID of remote frameworks, value = celix_earpm_remote_framework_info_t*
bool destroying;
};

Expand Down Expand Up @@ -206,6 +208,7 @@ celix_event_admin_remote_provider_mqtt_t* celix_earpm_create(celix_bundle_contex
opts.logHelper = logHelper;
opts.sessionEndMsgTopic = CELIX_EARPM_SESSION_END_TOPIC;
opts.sessionEndMsgSenderUUID = earpm->fwUUID;
opts.sessionEndMsgVersion = CELIX_EARPM_MSG_VERSION;
opts.callbackHandle = earpm;
opts.receiveMsgCallback = celix_earpm_receiveMsgCallback;
opts.connectedCallback = celix_earpm_connectedCallback;
Expand Down Expand Up @@ -749,6 +752,7 @@ static celix_status_t celix_earpm_publishEventSync(celix_event_admin_remote_prov
requestInfo.qos = qos;
requestInfo.pri = CELIX_EARPM_MSG_PRI_LOW;
requestInfo.expiryInterval = expiryInterval;
requestInfo.version = CELIX_EARPM_MSG_VERSION;
requestInfo.responseTopic = earpm->syncEventAckTopic;
struct celix_earpm_sync_event_correlation_data correlationData;
memset(&correlationData, 0, sizeof(correlationData));
Expand Down Expand Up @@ -1349,12 +1353,42 @@ static void celix_earpm_processEventMessage(celix_event_admin_remote_provider_mq
return;
}

static bool celix_earpm_isMsgCompatible(const celix_earpm_client_request_info_t* requestInfo) {
char actualVersion[16]= {0};
if (requestInfo->version == NULL) {
return false;
}
int ret = snprintf(actualVersion, sizeof(actualVersion), "%s", requestInfo->version);
if (ret < 0 || ret >= (int)sizeof(actualVersion)) {
return false;
}
char* endPtr = NULL;
long actualMajor = strtol(actualVersion, &endPtr, 10);
if (endPtr == NULL || endPtr[0] != '.') {
return false;
}
long actualMinor = strtol(endPtr + 1, NULL, 10);
long expectedMajor = strtol(CELIX_EARPM_MSG_VERSION, &endPtr, 10);
assert(endPtr[0] == '.');
long expectedMinor = strtol(endPtr + 1, NULL, 10);

if (actualMajor == expectedMajor && actualMinor <= expectedMinor) {
return true;
}
return false;
}

static void celix_earpm_receiveMsgCallback(void* handle, const celix_earpm_client_request_info_t* requestInfo) {
assert(handle != NULL);
assert(requestInfo != NULL);
assert(requestInfo->topic != NULL);
celix_event_admin_remote_provider_mqtt_t* earpm = (celix_event_admin_remote_provider_mqtt_t*)handle;

if (!celix_earpm_isMsgCompatible(requestInfo)) {
celix_logHelper_warning(earpm->logHelper, "%s message version(%s) is incompatible.",requestInfo->topic, requestInfo->version == NULL ? "null" : requestInfo->version);
return;
}

if (strncmp(requestInfo->topic,CELIX_EARPM_TOPIC_PREFIX, sizeof(CELIX_EARPM_TOPIC_PREFIX)-1) == 0) {
celix_earpm_processControlMessage(earpm, requestInfo);
} else {// user topic
Expand Down

0 comments on commit 6a8f500

Please sign in to comment.