Skip to content

Commit

Permalink
Resolve issues raised in code reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzhenbao committed Dec 6, 2024
1 parent 8d490bc commit 65fead3
Show file tree
Hide file tree
Showing 30 changed files with 353 additions and 1,888 deletions.
7 changes: 4 additions & 3 deletions bundles/event_admin/event_admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ event admin pubsub model, and events are delivered asynchronously.

### Properties/Configuration

| **Properties** | **Type** | **Description** | **Default value** |
|----------------------------------------|----------|---------------------------------------------------------------|-----|
| **CELIX_EVENT_ADMIN_HANDLER_THREADS** | long | The number of event handler threads. Its maximum value is 20. | 5 |
| **Properties** | **Type** | **Description** | **Default value** |
|----------------------------------------------------------|----------|---------------------------------------------------------------|-------------------|
| **CELIX_EVENT_ADMIN_HANDLER_THREADS** | long | The number of event handler threads. Its maximum value is 20. | 5 |
| **CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL** | long | The event sequence id cache will be cleaned up when it has not been used for this interval. The unit is seconds. The event sequence id cache is used to prevent duplicate events. | (60*60)s |

### Software Design

Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ EventAdmin1->EventAdmin1:Delivery event to local event handlers
alt "celix.event.remote.enable" is true
EventAdmin1->EventAdmin1:Unset the "celix.event.remote.enable" property
EventAdmin1->RemoteProvider1:postEvent/sendEvent
RemoteProvider1->RemoteProvider2:IPC
RemoteProvider1->RemoteProvider2:IPC or Network
RemoteProvider2->EventAdmin2:postEvent/sendEvent
EventAdmin2 -> EventAdmin2:Delivery event to event handlers in FrameworkB
end alt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstdlib>
#include <future>
#include <climits>
#include <thread>

#include <gtest/gtest.h>
#include "CelixEventAdminTestSuiteBaseClass.h"
Expand Down Expand Up @@ -91,6 +92,18 @@ TEST_F(CelixEventAdminTestSuite, CreateEventAdminWithMaxHandlerThreadNrTest) {
unsetenv("CELIX_EVENT_ADMIN_HANDLER_THREADS");
}

TEST_F(CelixEventAdminTestSuite, CreateEventAdminWithInvalidSeqIdCacheCleanupIntervalTest) {
setenv("CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL", "0", 1);
auto ea = celix_eventAdmin_create(ctx.get());
EXPECT_TRUE(ea == nullptr);

setenv("CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL", "-1", 1);
ea = celix_eventAdmin_create(ctx.get());
EXPECT_TRUE(ea == nullptr);

unsetenv("CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL");
}

TEST_F(CelixEventAdminTestSuite, StartStopEventAdminTest) {
auto ea = celix_eventAdmin_create(ctx.get());
EXPECT_TRUE(ea != nullptr);
Expand Down Expand Up @@ -501,30 +514,6 @@ TEST_F(CelixEventAdminTestSuite, AddRemoteProviderServiceTest) {
});
}

TEST_F(CelixEventAdminTestSuite, AddRemoteProviderServiceWithoutServiceIdTest) {
TestEventAdmin([](celix_event_admin_t* ea, celix_bundle_context_t*) {
celix_event_remote_provider_service_t remoteProviderService;
remoteProviderService.handle = nullptr;
remoteProviderService.postEvent = [](void*, const char*, const celix_properties_t*) { return CELIX_SUCCESS;};
remoteProviderService.sendEvent = [](void*, const char*, const celix_properties_t*) { return CELIX_SUCCESS;};
celix_autoptr(celix_properties_t) props = celix_properties_create();
auto status = celix_eventAdmin_addRemoteProviderService(ea, &remoteProviderService, props);
EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
});
}

TEST_F(CelixEventAdminTestSuite, RemoveRemoteProviderServiceWithoutServiceIdTest) {
TestEventAdmin([](celix_event_admin_t* ea, celix_bundle_context_t*) {
celix_event_remote_provider_service_t remoteProviderService;
remoteProviderService.handle = nullptr;
remoteProviderService.postEvent = [](void*, const char*, const celix_properties_t*) { return CELIX_SUCCESS;};
remoteProviderService.sendEvent = [](void*, const char*, const celix_properties_t*) { return CELIX_SUCCESS;};
celix_autoptr(celix_properties_t) props = celix_properties_create();
auto status = celix_eventAdmin_removeRemoteProviderService(ea, &remoteProviderService, props);
EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
});
}

TEST_F(CelixEventAdminTestSuite, PostRemoteEnableEventTest) {
std::atomic<bool> remoteProviderCalled{false};
celix_event_remote_provider_service_t remoteProviderService;
Expand All @@ -535,7 +524,7 @@ TEST_F(CelixEventAdminTestSuite, PostRemoteEnableEventTest) {
EXPECT_STREQ("org/celix/test", topic);
EXPECT_NE(nullptr, celix_properties_get(props, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, nullptr));
long seqId = celix_properties_getAsLong(props, CELIX_EVENT_REMOTE_SEQ_ID, -1L);
EXPECT_GT(seqId, 0);
EXPECT_GE(seqId, 0);
EXPECT_FALSE(celix_properties_getBool(props, CELIX_EVENT_REMOTE_ENABLE, false));
return CELIX_SUCCESS;
};
Expand All @@ -562,7 +551,7 @@ TEST_F(CelixEventAdminTestSuite, SendRemoteEnableEventTest) {
EXPECT_STREQ("org/celix/test", topic);
EXPECT_NE(nullptr, celix_properties_get(props, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, nullptr));
long seqId = celix_properties_getAsLong(props, CELIX_EVENT_REMOTE_SEQ_ID, -1L);
EXPECT_GT(seqId, 0);
EXPECT_GE(seqId, 0);
EXPECT_FALSE(celix_properties_getBool(props, CELIX_EVENT_REMOTE_ENABLE, false));
return CELIX_SUCCESS;
};
Expand Down Expand Up @@ -704,6 +693,41 @@ TEST_F(CelixEventAdminTestSuite, SendDuplicateRemoteEventTest) {
EXPECT_EQ(1, receivedEventCount);
}

TEST_F(CelixEventAdminTestSuite, CleanupOldEventSeqIdCacheTest) {
setenv("CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL", "1", 1);

int receivedEventCount = 0;
TestPublishEvent("org/celix/test", nullptr, [](celix_event_admin_t *ea) {
celix_autoptr(celix_properties_t) eventProps = celix_properties_create();
celix_properties_set(eventProps, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, "9748f803-5766-49f1-a2e9-9bbb522e874a");
celix_properties_setLong(eventProps, CELIX_EVENT_REMOTE_SEQ_ID, 0);
auto status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(CELIX_SUCCESS, status);

std::this_thread::sleep_for(std::chrono::milliseconds(1500));

for (int i = 0; i < 16 ; ++i) {
char fwuuid[64] = {0};
snprintf(fwuuid, 64, "fw-uuid-%d", i);
celix_properties_set(eventProps, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, fwuuid);
status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(CELIX_SUCCESS, status);
}

celix_properties_set(eventProps, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, "9748f803-5766-49f1-a2e9-9bbb522e874a");
status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(CELIX_SUCCESS, status);

}, [&receivedEventCount](void*, const char* topic, const celix_properties_t*) {
EXPECT_STREQ("org/celix/test", topic);
receivedEventCount++;
return CELIX_SUCCESS;
});
EXPECT_EQ(18, receivedEventCount);

unsetenv("CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL");
}

TEST_F(CelixEventAdminTestSuite, SendRemoteEventWhichFromDifferentFrameworkTest) {
int receivedEventCount = 0;
TestPublishEvent("org/celix/test", nullptr, [](celix_event_admin_t *ea) {
Expand Down Expand Up @@ -788,20 +812,6 @@ TEST_F(CelixEventAdminTestSuite, EventHandlerNoTopicTest) {
});
}

TEST_F(CelixEventAdminTestSuite, EventHandlerNoServiceIdTest) {
TestAddEventHandler([](void *handle, void *svc, const celix_properties_t *props) {
std::unique_ptr<celix_properties_t, decltype(&celix_properties_destroy)> propsCopy{celix_properties_copy(props), celix_properties_destroy};
celix_properties_unset(propsCopy.get(), CELIX_FRAMEWORK_SERVICE_ID);
auto status = celix_eventAdmin_addEventHandlerWithProperties(handle, svc, propsCopy.get());
EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
}, [](void *handle, void *svc, const celix_properties_t *props) {
std::unique_ptr<celix_properties_t, decltype(&celix_properties_destroy)> propsCopy{celix_properties_copy(props), celix_properties_destroy};
celix_properties_unset(propsCopy.get(), CELIX_FRAMEWORK_SERVICE_ID);
auto status = celix_eventAdmin_removeEventHandlerWithProperties(handle, svc, propsCopy.get());
EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
});
}

TEST_F(CelixEventAdminTestSuite, EventHandlerWithInvalidFilterTopicTest) {
TestAddEventHandler([](void *handle, void *svc, const celix_properties_t *props) {
std::unique_ptr<celix_properties_t, decltype(&celix_properties_destroy)> propsCopy{celix_properties_copy(props), celix_properties_destroy};
Expand Down
64 changes: 30 additions & 34 deletions bundles/event_admin/event_admin/src/celix_event_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include "celix_event_admin.h"

#include <limits.h>
#include <string.h>
#include <stdbool.h>
#include <assert.h>
Expand All @@ -38,6 +39,8 @@
#define CELIX_EVENT_ADMIN_MAX_HANDLER_THREADS 20
#define CELIX_EVENT_ADMIN_HANDLER_THREADS_DEFAULT 5

#define CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL_DFT (60*60) //1h

//Belows parameters are not configurable, consider its configurability until a real need arises.
#define CELIX_EVENT_ADMIN_MAX_PARALLEL_EVENTS_OF_HANDLER(handlerThNr) ((handlerThNr)/3 + 1) //max parallel async event for a single handler
#define CELIX_EVENT_ADMIN_MAX_HANDLE_EVENT_TIME 60 //seconds
Expand All @@ -53,34 +56,35 @@ typedef struct celix_event_handler {
celix_filter_t* eventFilter;
bool blackListed;//Blacklisted handlers must not be notified of any events.
unsigned int handlingAsyncEventCnt;
}celix_event_handler_t;
} celix_event_handler_t;

typedef struct celix_event_channel {
celix_array_list_t* eventHandlerSvcIdList;
}celix_event_channel_t;
} celix_event_channel_t;

typedef struct celix_event_entry {
celix_event_t* event;
celix_long_hash_map_t* eventHandlers;//key: event handler service id, value: null
}celix_event_entry_t;
} celix_event_entry_t;

typedef struct celix_event_seq_id_cache {
struct timespec lastModified;
long seqIdBuffer[CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE];
}celix_event_seq_id_cache_t;
} celix_event_seq_id_cache_t;

struct celix_event_admin {
celix_bundle_context_t* ctx;
celix_log_helper_t* logHelper;
unsigned int handlerThreadNr;
long seqIdCacheCleanupInterval;
const char* fwUUID;
long nextSeqId;
celix_thread_rwlock_t lock;//projects: channels,eventHandlers,eventSeqIdCache,remoteProviderServices
celix_event_channel_t channelMatchingAllEvents;
celix_string_hash_map_t* channelsMatchingTopic; //key: topic, value: celix_event_channel_t *
celix_string_hash_map_t* channelsMatchingPrefixTopic;//key:prefix topic, value: celix_event_channel_t *
celix_long_hash_map_t* eventHandlers;//key: event handler service id, value: celix_event_handler_t*
celix_string_hash_map_t* eventSeqIdCache;//key: remote framework uuid, value: celix_event_seq_id_cache_t*
long nextSeqId;
celix_long_hash_map_t* remoteProviderServices;//key: service id, value: celix_event_remote_provider_service_t*
celix_thread_mutex_t eventsMutex;// protect belows
celix_thread_cond_t eventsTriggerCond;
Expand All @@ -99,7 +103,7 @@ celix_event_admin_t* celix_eventAdmin_create(celix_bundle_context_t* ctx) {
}
ea->ctx = ctx;
ea->threadsRunning = false;
ea->nextSeqId = 1;
ea->nextSeqId = 0;

celix_autoptr(celix_log_helper_t) logHelper = ea->logHelper = celix_logHelper_create(ctx, "CelixEventAdmin");
if (logHelper == NULL) {
Expand All @@ -115,6 +119,11 @@ celix_event_admin_t* celix_eventAdmin_create(celix_bundle_context_t* ctx) {
celix_logHelper_error(logHelper, "CELIX_EVENT_ADMIN_HANDLER_THREADS is set to %i, but max is %i.", ea->handlerThreadNr, CELIX_EVENT_ADMIN_MAX_HANDLER_THREADS);
return NULL;
}
ea->seqIdCacheCleanupInterval = celix_bundleContext_getPropertyAsLong(ctx, "CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL", CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL_DFT);
if (ea->seqIdCacheCleanupInterval <= 0) {
celix_logHelper_error(logHelper, "CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL is set to %ld, but must be > 0.", ea->seqIdCacheCleanupInterval);
return NULL;
}
celix_status_t status = celixThreadRwlock_create(&ea->lock, NULL);
if (status != CELIX_SUCCESS) {
celix_logHelper_error(logHelper, "Failed to create event admin lock.");
Expand Down Expand Up @@ -355,10 +364,7 @@ int celix_eventAdmin_addEventHandlerWithProperties(void* handle, void* svc, cons
return CELIX_ILLEGAL_ARGUMENT;
}
long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L);
if (serviceId < 0) {
celix_logHelper_error(ea->logHelper, "Event handler service id is empty.");
return CELIX_ILLEGAL_ARGUMENT;
}
assert(serviceId >= 0);

celix_autofree celix_event_handler_t *handler = calloc(1, sizeof(*handler));
if (handler == NULL) {
Expand Down Expand Up @@ -436,10 +442,7 @@ int celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c
celix_event_admin_t* ea = handle;

long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L);
if (serviceId < 0) {
celix_logHelper_error(ea->logHelper, "Event handler service id is empty.");
return CELIX_ILLEGAL_ARGUMENT;
}
assert(serviceId >= 0);

celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock);
celix_event_handler_t *handler = celix_longHashMap_get(ea->eventHandlers, serviceId);
Expand All @@ -459,10 +462,8 @@ int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const cel
assert(svc != NULL);
celix_event_admin_t* ea = handle;
long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L);
if (serviceId < 0) {
celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid.");
return CELIX_ILLEGAL_ARGUMENT;
}
assert(serviceId >= 0);

celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock);
celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc);
return CELIX_SUCCESS;
Expand All @@ -473,21 +474,19 @@ int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, const
assert(svc != NULL);
celix_event_admin_t* ea = handle;
long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L);
if (serviceId < 0) {
celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid.");
return CELIX_ILLEGAL_ARGUMENT;
}
assert(serviceId >= 0);

celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock);
celix_longHashMap_remove(ea->remoteProviderServices, serviceId);
return CELIX_SUCCESS;
}

static void celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) {
static void celix_eventAdmin_cleanupOldEventSeqIdCache(celix_event_admin_t* ea) {
if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) {
celix_string_hash_map_iterator_t iter = celix_stringHashMap_begin(ea->eventSeqIdCache);
while (!celix_stringHashMapIterator_isEnd(&iter)) {
celix_event_seq_id_cache_t* cache = iter.value.ptrValue;
if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 60*60/*1h*/) {
if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > ea->seqIdCacheCleanupInterval) {
celix_stringHashMapIterator_remove(&iter);
} else {
celix_stringHashMapIterator_next(&iter);
Expand All @@ -503,7 +502,7 @@ static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const cha
return false;
}
long seqId = celix_properties_getAsLong(properties, CELIX_EVENT_REMOTE_SEQ_ID, -1L);
if (seqId <= 0) {
if (seqId < 0) {
return false;
}
long seqIdMod = seqId % CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE;
Expand All @@ -515,6 +514,7 @@ static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const cha
celix_logHelper_error(ea->logHelper, "Failed to create event seq id cache for %s.", remoteFwUUID);
return false;
}
memset(cache->seqIdBuffer, -1, sizeof(cache->seqIdBuffer));
celix_status_t status = celix_stringHashMap_put(ea->eventSeqIdCache, remoteFwUUID, cache);
if (status != CELIX_SUCCESS) {
celix_logHelper_error(ea->logHelper, "Failed to add event seq id cache for %s.", remoteFwUUID);
Expand All @@ -528,18 +528,14 @@ static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const cha
}
seqIdCache->seqIdBuffer[seqIdMod] = seqId;

celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(ea);
celix_eventAdmin_cleanupOldEventSeqIdCache(ea);

return false;
}

static long celix_eventAdmin_getEventSeqId(celix_event_admin_t* ea) {
celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock);
long seqId = ea->nextSeqId++;
if (seqId <= 0) {
seqId = 1;
ea->nextSeqId = seqId + 1;
}
long seqId = __atomic_fetch_add(&ea->nextSeqId, 1, __ATOMIC_RELAXED);
seqId = seqId & LONG_MAX;//avoid negative seq id when overflow
return seqId;
}

Expand Down Expand Up @@ -572,7 +568,7 @@ static int celix_eventAdmin_deliverEventToRemote(celix_event_admin_t* ea, const
status = remoteProvider->sendEvent(remoteProvider->handle, topic, remoteProps);
}
if (status != CELIX_SUCCESS) {
celix_logHelper_error(ea->logHelper, "Failed to deliver event %s to remote provider(%ld).", topic, iter.key);
celix_logHelper_warning(ea->logHelper, "Failed to deliver event %s to remote provider(%ld).", topic, iter.key);
}
}

Expand Down Expand Up @@ -673,7 +669,7 @@ static void celix_eventAdmin_deliverEventToHandler(celix_event_admin_t* ea, cons
//If a Log Service is available, the exception should be logged.
//Once the exception has been caught and dealt with, the event delivery must continue with the next handlers to be notified, if any.
//See https://docs.osgi.org/specification/osgi.cmpn/7.0.0/service.event.html#d0e47600
celix_logHelper_error(ea->logHelper, "Failed to handle event %s for handler(%s)", topic, eventHandler->serviceDescription);
celix_logHelper_warning(ea->logHelper, "Failed to handle event %s for handler(%s)", topic, eventHandler->serviceDescription);
}
double elapsedTime = celix_elapsedtime(CLOCK_MONOTONIC, startTime);
if (elapsedTime > CELIX_EVENT_ADMIN_MAX_HANDLE_EVENT_TIME) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ typedef struct celix_event_admin_service {
* @return Status code indicating failure or success. CELIX_SUCCESS if no errors are encountered. If an error is encountered, it should be a celix errno.
*/
celix_status_t (*sendEvent)(void* handle, const char* topic, const celix_properties_t* properties);
}celix_event_admin_service_t;
} celix_event_admin_service_t;

#ifdef __cplusplus
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ extern "C" {
* - 0: At most once delivery
* - 1: At least once delivery
* - 2: Exactly once delivery
*
* The value of QOS will not impact whether `sendEvent` method of event admin will return successfully or not.
* But it will impact the remote provider's behavior. For example, if the remote provider has not established a connection,
* it maybe immediately return ENOTCONN if the QOS value is 0.(The event admin will not forward the error to the caller of `sendEvent` method.)
* If the QOS value is greater than 0, it will wait until the connection is established before sending.
*/
#define CELIX_EVENT_REMOTE_QOS "celix.event.remote.qos"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ typedef struct celix_event_handler_service {
* @see CELIX_EVENT_TOPIC, CELIX_EVENT_FILTER, CELIX_EVENT_DELIVERY
*/
celix_status_t (*handleEvent)(void* handle, const char* topic, const celix_properties_t* properties);
}celix_event_handler_service_t;
} celix_event_handler_service_t;

#ifdef __cplusplus
}
Expand Down
Loading

0 comments on commit 65fead3

Please sign in to comment.