Skip to content

Commit

Permalink
Merge cdc77e0 into dd3d35a
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzhenbao authored May 5, 2023
2 parents dd3d35a + cdc77e0 commit d84bf74
Show file tree
Hide file tree
Showing 44 changed files with 157 additions and 402 deletions.
7 changes: 3 additions & 4 deletions bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@

#include "pubsub_udpmc_common.h"

bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_udp_msg_header_t *hdr) {
bool psa_udpmc_checkVersion(celix_version_t *msgVersion, pubsub_udp_msg_header_t *hdr) {
bool check = false;

if (msgVersion != NULL) {
int major = 0, minor = 0;
version_getMajor(msgVersion,&major);
version_getMinor(msgVersion,&minor);
int major = celix_version_getMajor(msgVersion);
int minor = celix_version_getMinor(msgVersion);

if (hdr->major == ((unsigned char) major)) { /* Different major means incompatible */
check = (hdr->minor >= ((unsigned char) minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
Expand Down
4 changes: 2 additions & 2 deletions bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

#include <utils.h>

#include "version.h"
#include "celix_version.h"

typedef struct pubsub_udp_msg_header {
unsigned int type;
Expand All @@ -31,7 +31,7 @@ typedef struct pubsub_udp_msg_header {
} pubsub_udp_msg_header_t;


bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_udp_msg_header_t *hdr);
bool psa_udpmc_checkVersion(celix_version_t *msgVersion, pubsub_udp_msg_header_t *hdr);


#endif //CELIX_PUBSUB_UDPMC_COMMON_H
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ typedef struct psa_udpmc_requested_connection_entry {
} psa_udpmc_requested_connection_entry_t;

typedef struct psa_udpmc_subscriber_entry {
hash_map_t *msgTypes; //map from serializer svc
celix_long_hash_map_t *msgTypes; //map from serializer svc
hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
bool initialized; //true if the init function is called through the receive thread
} psa_udpmc_subscriber_entry_t;
Expand Down Expand Up @@ -474,7 +474,7 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub

pubsub_msg_serializer_t *msgSer = NULL;
if (entry->msgTypes != NULL) {
msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) msg->header.type);
msgSer = celix_longHashMap_get(entry->msgTypes, msg->header.type);
}
if (msgSer == NULL) {
L_WARN("[PSA_UDPMC] Serializer not available for message %d.\n", msg->header.type);
Expand Down Expand Up @@ -513,9 +513,8 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
}

} else {
int major = 0, minor = 0;
version_getMajor(msgSer->msgVersion, &major);
version_getMinor(msgSer->msgVersion, &minor);
int major = celix_version_getMajor(msgSer->msgVersion);
int minor = celix_version_getMinor(msgSer->msgVersion);
L_WARN("[PSA_UDPMC] Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
msgSer->msgName,major,minor,msg->header.major,msg->header.minor);
}
Expand Down
17 changes: 7 additions & 10 deletions bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "pubsub_psa_udpmc_constants.h"
#include "large_udp.h"
#include "pubsub_udpmc_common.h"
#include "hash_map.h"

#define FIRST_SEND_DELAY_IN_SECONDS 2

Expand Down Expand Up @@ -67,7 +68,7 @@ typedef struct psa_udpmc_bounded_service_entry {
pubsub_udpmc_topic_sender_t *parent;
pubsub_publisher_t service;
long bndId;
hash_map_t *msgTypes;
celix_long_hash_map_t *msgTypes;
hash_map_t *msgTypeIds;
int getCount;
largeUdp_t *largeUdpHandle;
Expand Down Expand Up @@ -240,9 +241,8 @@ static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *r

int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
if (rc == 0) {
hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
while (hashMapIterator_hasNext(&iter)) {
pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
CELIX_LONG_HASH_MAP_ITERATE(entry->msgTypes, iter) {
pubsub_msg_serializer_t *msgSer = iter.value.ptrValue;
hashMap_put(entry->msgTypeIds, strndup(msgSer->msgName, 1024), (void *)(uintptr_t) msgSer->msgId);
}

Expand Down Expand Up @@ -292,7 +292,7 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,

pubsub_msg_serializer_t* msgSer = NULL;
if (entry->msgTypes != NULL) {
msgSer = hashMap_get(entry->msgTypes, (void*)(intptr_t)(msgTypeId));
msgSer = celix_longHashMap_get(entry->msgTypes, msgTypeId);
}

if (msgSer != NULL) {
Expand All @@ -303,11 +303,8 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
msg_hdr->type = msgTypeId;

if (msgSer->msgVersion != NULL) {
int major = 0, minor = 0;
version_getMajor(msgSer->msgVersion, &major);
version_getMinor(msgSer->msgVersion, &minor);
msg_hdr->major = (unsigned char) major;
msg_hdr->minor = (unsigned char) minor;
msg_hdr->major = (unsigned char) celix_version_getMajor(msgSer->msgVersion);
msg_hdr->minor = (unsigned char) celix_version_getMinor(msgSer->msgVersion);
}

pubsub_udp_msg_t *msg = calloc(1, sizeof(*msg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ typedef struct pubsub_avrobin_msg_serializer_impl {
dyn_message_type *msgType;
unsigned int msgId;
const char *msgName;
version_pt msgVersion;
celix_version_t *msgVersion;
} pubsub_avrobin_msg_serializer_impl_t;

static char *pubsubAvrobinSerializer_getMsgDescriptionDir(const celix_bundle_t *bundle);
static void pubsubAvrobinSerializer_addMsgSerializerFromBundle(const char *root, const celix_bundle_t *bundle, hash_map_pt msgTypesMap);
static void pubsubAvrobinSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap, const celix_bundle_t *bundle);
static void pubsubAvrobinSerializer_addMsgSerializerFromBundle(const char *root, const celix_bundle_t *bundle, celix_long_hash_map_t *msgTypesMap);
static void pubsubAvrobinSerializer_fillMsgSerializerMap(celix_long_hash_map_t *msgTypesMap, const celix_bundle_t *bundle);

static int pubsubMsgAvrobinSerializer_convertDescriptor(FILE* file_ptr, pubsub_msg_serializer_t* serializer);
static int pubsubMsgAvrobinSerializer_convertAvpr(FILE* file_ptr, pubsub_msg_serializer_t* serializer, const char* fqn);
Expand Down Expand Up @@ -114,11 +114,11 @@ celix_status_t pubsubAvrobinSerializer_destroy(pubsub_avrobin_serializer_t *seri
return status;
}

celix_status_t pubsubAvrobinSerializer_createSerializerMap(void *handle, const celix_bundle_t *bundle, hash_map_pt *serializerMap) {
celix_status_t pubsubAvrobinSerializer_createSerializerMap(void *handle, const celix_bundle_t *bundle, celix_long_hash_map_t **serializerMap) {
celix_status_t status = CELIX_SUCCESS;
pubsub_avrobin_serializer_t *serializer = handle;

hash_map_pt map = hashMap_create(NULL, NULL, NULL, NULL);
celix_long_hash_map_t *map = celix_longHashMap_create();

if (map != NULL) {
pubsubAvrobinSerializer_fillMsgSerializerMap(map, bundle);
Expand All @@ -134,23 +134,22 @@ celix_status_t pubsubAvrobinSerializer_createSerializerMap(void *handle, const c
return status;
}

celix_status_t pubsubAvrobinSerializer_destroySerializerMap(void *handle, hash_map_pt serializerMap) {
celix_status_t pubsubAvrobinSerializer_destroySerializerMap(void *handle, celix_long_hash_map_t *serializerMap) {
celix_status_t status = CELIX_SUCCESS;

if (serializerMap == NULL) {
return CELIX_ILLEGAL_ARGUMENT;
}

hash_map_iterator_t iter = hashMapIterator_construct(serializerMap);
while (hashMapIterator_hasNext(&iter)) {
pubsub_msg_serializer_t* msgSerializer = hashMapIterator_nextValue(&iter);
CELIX_LONG_HASH_MAP_ITERATE(serializerMap, iter){
pubsub_msg_serializer_t* msgSerializer = iter.value.ptrValue;
pubsub_avrobin_msg_serializer_impl_t *impl = msgSerializer->handle;
dynMessage_destroy(impl->msgType);
free(msgSerializer); //also contains the service struct.
free(impl);
}

hashMap_destroy(serializerMap, false, false);
celix_longHashMap_destroy(serializerMap);

return status;
}
Expand Down Expand Up @@ -260,7 +259,7 @@ static char *pubsubAvrobinSerializer_getMsgDescriptionDir(const celix_bundle_t *
return root;
}

static void pubsubAvrobinSerializer_addMsgSerializerFromBundle(const char *root, const celix_bundle_t *bundle, hash_map_pt msgTypesMap) {
static void pubsubAvrobinSerializer_addMsgSerializerFromBundle(const char *root, const celix_bundle_t *bundle, celix_long_hash_map_t *msgTypesMap) {
char fqn[MAX_PATH_LEN];
char path[MAX_PATH_LEN];
const char* entry_name = NULL;
Expand Down Expand Up @@ -304,7 +303,7 @@ static void pubsubAvrobinSerializer_addMsgSerializerFromBundle(const char *root,
}

// serializer has been constructed, try to put in the map
if (hashMap_containsKey(msgTypesMap, (void *) (uintptr_t) msgSerializer->msgId)) {
if (celix_longHashMap_hasKey(msgTypesMap, msgSerializer->msgId)) {
printf("Cannot add msg %s. clash in msg id %d!!\n", msgSerializer->msgName, msgSerializer->msgId);
dynMessage_destroy(impl->msgType);
free(msgSerializer);
Expand All @@ -316,7 +315,7 @@ static void pubsubAvrobinSerializer_addMsgSerializerFromBundle(const char *root,
free(impl);
}
else {
hashMap_put(msgTypesMap, (void *) (uintptr_t) msgSerializer->msgId, msgSerializer);
celix_longHashMap_put(msgTypesMap, msgSerializer->msgId, msgSerializer);
}
}

Expand All @@ -325,7 +324,7 @@ static void pubsubAvrobinSerializer_addMsgSerializerFromBundle(const char *root,
}
}

static void pubsubAvrobinSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap, const celix_bundle_t *bundle) {
static void pubsubAvrobinSerializer_fillMsgSerializerMap(celix_long_hash_map_t *msgTypesMap, const celix_bundle_t *bundle) {
char *root = NULL;
char *metaInfPath = NULL;

Expand Down Expand Up @@ -430,7 +429,7 @@ static int pubsubMsgAvrobinSerializer_convertDescriptor(FILE* file_ptr, pubsub_m
char* msgName = NULL;
rc += dynMessage_getName(msgType, &msgName);

version_pt msgVersion = NULL;
celix_version_t *msgVersion = NULL;
rc += dynMessage_getVersion(msgType, &msgVersion);

if (rc != 0 || msgName == NULL || msgVersion == NULL) {
Expand Down Expand Up @@ -490,13 +489,13 @@ static int pubsubMsgAvrobinSerializer_convertAvpr(FILE* file_ptr, pubsub_msg_ser

const char* msgName = dynType_getName(type);

version_pt msgVersion = NULL;
celix_status_t s = version_createVersionFromString(dynType_getMetaInfo(type, "version"), &msgVersion);
celix_version_t *msgVersion = NULL;
msgVersion = celix_version_createVersionFromString(dynType_getMetaInfo(type, "version"));

if (s != CELIX_SUCCESS || !msgName) {
if (msgVersion == NULL || !msgName) {
printf("DMU: cannot retrieve name and/or version from msg\n");
if (s == CELIX_SUCCESS) {
version_destroy(msgVersion);
if (msgVersion != NULL) {
celix_version_destroy(msgVersion);
}
return -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ typedef struct pubsub_avrobin_serializer pubsub_avrobin_serializer_t;
celix_status_t pubsubAvrobinSerializer_create(celix_bundle_context_t *context, pubsub_avrobin_serializer_t **serializer);
celix_status_t pubsubAvrobinSerializer_destroy(pubsub_avrobin_serializer_t *serializer);

celix_status_t pubsubAvrobinSerializer_createSerializerMap(void *handle, const celix_bundle_t *bundle, hash_map_pt *serializerMap);
celix_status_t pubsubAvrobinSerializer_destroySerializerMap(void *handle, hash_map_pt serializerMap);
celix_status_t pubsubAvrobinSerializer_createSerializerMap(void *handle, const celix_bundle_t *bundle, celix_long_hash_map_t **serializerMap);
celix_status_t pubsubAvrobinSerializer_destroySerializerMap(void *handle, celix_long_hash_map_t *serializerMap);

#endif /* PUBSUB_SERIALIZER_AVROBIN_H_ */
38 changes: 18 additions & 20 deletions bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ typedef struct pubsub_json_msg_serializer_impl {

unsigned int msgId;
const char* msgName;
version_pt msgVersion;
celix_version_t *msgVersion;
} pubsub_json_msg_serializer_impl_t;

static char* pubsubSerializer_getMsgDescriptionDir(const celix_bundle_t *bundle);
static void pubsubSerializer_addMsgSerializerFromBundle(pubsub_json_serializer_t* serializer, const char *root, const celix_bundle_t *bundle, hash_map_pt msgSerializers);
static void pubsubSerializer_fillMsgSerializerMap(pubsub_json_serializer_t* serializer, hash_map_pt msgSerializers, const celix_bundle_t *bundle);
static void pubsubSerializer_addMsgSerializerFromBundle(pubsub_json_serializer_t* serializer, const char *root, const celix_bundle_t *bundle, celix_long_hash_map_t *msgSerializers);
static void pubsubSerializer_fillMsgSerializerMap(pubsub_json_serializer_t* serializer, celix_long_hash_map_t *msgSerializers, const celix_bundle_t *bundle);

static int pubsubMsgSerializer_convertDescriptor(pubsub_json_serializer_t* serializer, FILE* file_ptr, pubsub_msg_serializer_t* msgSerializer);
static int pubsubMsgSerializer_convertAvpr(pubsub_json_serializer_t *serializer, FILE* file_ptr, pubsub_msg_serializer_t* msgSerializer, const char* fqn);
Expand Down Expand Up @@ -125,32 +125,31 @@ celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer) {
return status;
}

celix_status_t pubsubSerializer_createSerializerMap(void *handle, const celix_bundle_t *bundle, hash_map_pt* serializerMap) {
celix_status_t pubsubSerializer_createSerializerMap(void *handle, const celix_bundle_t *bundle, celix_long_hash_map_t **serializerMap) {
pubsub_json_serializer_t *serializer = handle;

hash_map_pt map = hashMap_create(NULL, NULL, NULL, NULL);
celix_long_hash_map_t *map = celix_longHashMap_create();
pubsubSerializer_fillMsgSerializerMap(serializer, map, bundle);
*serializerMap = map;
return CELIX_SUCCESS;
}

celix_status_t pubsubSerializer_destroySerializerMap(void* handle __attribute__((unused)), hash_map_pt serializerMap) {
celix_status_t pubsubSerializer_destroySerializerMap(void* handle __attribute__((unused)), celix_long_hash_map_t *serializerMap) {
celix_status_t status = CELIX_SUCCESS;
//pubsub_json_serializer_t *serializer = handle;
if (serializerMap == NULL) {
return CELIX_ILLEGAL_ARGUMENT;
}

hash_map_iterator_t iter = hashMapIterator_construct(serializerMap);
while (hashMapIterator_hasNext(&iter)) {
pubsub_msg_serializer_t* msgSerializer = hashMapIterator_nextValue(&iter);
CELIX_LONG_HASH_MAP_ITERATE(serializerMap, iter){
pubsub_msg_serializer_t* msgSerializer = iter.value.ptrValue;
pubsub_json_msg_serializer_impl_t *impl = msgSerializer->handle;
dynMessage_destroy(impl->msgType);
free(msgSerializer); //also contains the service struct.
free(impl);
}

hashMap_destroy(serializerMap, false, false);
celix_longHashMap_destroy(serializerMap);

return status;
}
Expand Down Expand Up @@ -227,7 +226,7 @@ void pubsubMsgSerializer_freeDeserializeMsg(void* handle, void *msg) {
}


static void pubsubSerializer_fillMsgSerializerMap(pubsub_json_serializer_t* serializer, hash_map_pt msgSerializers, const celix_bundle_t *bundle) {
static void pubsubSerializer_fillMsgSerializerMap(pubsub_json_serializer_t* serializer, celix_long_hash_map_t *msgSerializers, const celix_bundle_t *bundle) {
char* root = NULL;
char* metaInfPath = NULL;

Expand Down Expand Up @@ -270,7 +269,7 @@ static char* pubsubSerializer_getMsgDescriptionDir(const celix_bundle_t *bundle)
return root;
}

static void pubsubSerializer_addMsgSerializerFromBundle(pubsub_json_serializer_t* serializer, const char *root, const celix_bundle_t *bundle, hash_map_pt msgSerializers) {
static void pubsubSerializer_addMsgSerializerFromBundle(pubsub_json_serializer_t* serializer, const char *root, const celix_bundle_t *bundle, celix_long_hash_map_t *msgSerializers) {
char fqn[MAX_PATH_LEN];
char pathOrError[MAX_PATH_LEN];
const char* entry_name = NULL;
Expand Down Expand Up @@ -319,7 +318,7 @@ static void pubsubSerializer_addMsgSerializerFromBundle(pubsub_json_serializer_t
}

// serializer has been constructed, try to put in the map
if (hashMap_containsKey(msgSerializers, (void *) (uintptr_t) msgSerializer->msgId)) {
if (celix_longHashMap_hasKey(msgSerializers, msgSerializer->msgId)) {
L_WARN("Cannot add msg %s. Clash is msg id %d!\n", msgSerializer->msgName, msgSerializer->msgId);
dynMessage_destroy(impl->msgType);
free(msgSerializer);
Expand All @@ -331,7 +330,7 @@ static void pubsubSerializer_addMsgSerializerFromBundle(pubsub_json_serializer_t
free(impl);
}
else {
hashMap_put(msgSerializers, (void *) (uintptr_t) msgSerializer->msgId, msgSerializer);
celix_longHashMap_put(msgSerializers, msgSerializer->msgId, msgSerializer);
}
}

Expand Down Expand Up @@ -425,7 +424,7 @@ static int pubsubMsgSerializer_convertDescriptor(pubsub_json_serializer_t* seria
char *msgName = NULL;
rc += dynMessage_getName(msgType, &msgName);

version_pt msgVersion = NULL;
celix_version_t *msgVersion = NULL;
rc += dynMessage_getVersion(msgType, &msgVersion);

if (rc != 0 || msgName == NULL || msgVersion == NULL) {
Expand Down Expand Up @@ -487,13 +486,12 @@ static int pubsubMsgSerializer_convertAvpr(pubsub_json_serializer_t *serializer,

const char *msgName = dynType_getName(type);

version_pt msgVersion = NULL;
celix_status_t s = version_createVersionFromString(dynType_getMetaInfo(type, "version"), &msgVersion);
celix_version_t *msgVersion = celix_version_createVersionFromString(dynType_getMetaInfo(type, "version"));

if (s != CELIX_SUCCESS || !msgName) {
if (msgVersion == NULL || !msgName) {
L_WARN("[json serializer] Cannot retrieve name and/or version from msg\n");
if (s == CELIX_SUCCESS) {
version_destroy(msgVersion);
if (msgVersion != NULL) {
celix_version_destroy(msgVersion);
}
return -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ typedef struct pubsub_json_serializer pubsub_json_serializer_t;
celix_status_t pubsubSerializer_create(celix_bundle_context_t *context, pubsub_json_serializer_t **serializer);
celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer);

celix_status_t pubsubSerializer_createSerializerMap(void *handle, const celix_bundle_t *bundle, hash_map_pt* serializerMap);
celix_status_t pubsubSerializer_destroySerializerMap(void *handle, hash_map_pt serializerMap);
celix_status_t pubsubSerializer_createSerializerMap(void *handle, const celix_bundle_t *bundle, celix_long_hash_map_t **serializerMap);
celix_status_t pubsubSerializer_destroySerializerMap(void *handle, celix_long_hash_map_t *serializerMap);

void pubsubSerializer_onInstalled(void *handle, const celix_bundle_t *bundle);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@
extern "C" {
#endif

#include "hash_map.h"
#include "version.h"
#include "celix_bundle.h"
#include "sys/uio.h"

#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME "pubsub_message_serialization_marker"
#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION "1.0.0"
#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_RANGE "[1,2)"
Expand Down
Loading

0 comments on commit d84bf74

Please sign in to comment.