Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event admin remote provider based on mqtt #773

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
70f69fa
Add remote event admin based on mqtt
xuzhenbao Jun 18, 2024
f586be6
Improve remote event admin provider
xuzhenbao Aug 2, 2024
fd38073
Use discovery to announce mqtt broker info
xuzhenbao Aug 7, 2024
ed4ca51
Rename some function name
xuzhenbao Aug 7, 2024
55a365d
Add some unit test
xuzhenbao Aug 12, 2024
9ffc915
Improve the interface of celix_earpm_client
xuzhenbao Aug 22, 2024
56172a0
Add unit test for remote_event_admin_mqtt
xuzhenbao Sep 5, 2024
b17188e
Add event_admin_spi
xuzhenbao Sep 5, 2024
0cfd805
Get service configuration type from endpoint listener service socpe p…
xuzhenbao Sep 5, 2024
d435fab
Integrate remote provider to event admin
xuzhenbao Sep 5, 2024
7c6f11e
Improve code and add some comments
xuzhenbao Sep 30, 2024
9f3afbd
Add README document for the event admin remote provider based on MQTT
xuzhenbao Oct 8, 2024
7e5be93
Add conan testing for the event admin remote provider based on MQTT
xuzhenbao Oct 8, 2024
dd88ad1
Revert the code that was submitted by mistake
xuzhenbao Oct 8, 2024
95cdde5
Merge branch 'master' of https://github.com/xuzhenbao/celix into remo…
xuzhenbao Oct 8, 2024
9df591d
Merge master into remote_event_admin
xuzhenbao Oct 8, 2024
368045b
Try to trigger online workflows
xuzhenbao Oct 8, 2024
16402f0
Resolve testing failure of conan_test_package_v2
xuzhenbao Oct 9, 2024
6a8f500
Add message version verification for remote_provider_mqtt
xuzhenbao Oct 10, 2024
97e1618
Enable verbose output for macos tests
xuzhenbao Oct 10, 2024
54aa334
Resolve unit test failures in macos
xuzhenbao Oct 10, 2024
39e8ee5
Revert "Enable verbose output for macos tests"
xuzhenbao Oct 10, 2024
8d490bc
Add debug command for event admin remote provider based on MQTT
xuzhenbao Oct 10, 2024
65fead3
Resolve issues raised in code reviews
xuzhenbao Dec 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bundles/event_admin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
celix_subproject(EVENT_ADMIN "Option to enable building the Event Admin bundles" ON)
if (EVENT_ADMIN)
add_subdirectory(event_admin_api)
add_subdirectory(event_admin_spi)
add_subdirectory(event_admin)
add_subdirectory(remote_provider)
add_subdirectory(examples)
endif()

3 changes: 2 additions & 1 deletion bundles/event_admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ If we want to build the event admin examples, the cmake option `BUILD_EVENT_ADMI

## Event Admin Bundles

* [EventAdmin](event_admin/README.md) - The event admin implementation.
* [EventAdmin](event_admin/README.md) - The event admin implementation.
* [RemoteProviders](remote_provider/README.md) - The remote providers implementation for the event admin. It is used to deliver events to remote frameworks. It is not a part of the OSGi Event Admin specification.
1 change: 1 addition & 0 deletions bundles/event_admin/event_admin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ set(EVENT_ADMIN_SRC

set(EVENT_ADMIN_DEPS
Celix::event_admin_api
Celix::event_admin_spi
Celix::log_helper
Celix::framework
Celix::utils
Expand Down
18 changes: 15 additions & 3 deletions bundles/event_admin/event_admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ event admin pubsub model, and events are delivered asynchronously.

### Properties/Configuration

| **Properties** | **Type** | **Description** | **Default value** |
|----------------------------------------|----------|---------------------------------------------------------------|-----|
| **CELIX_EVENT_ADMIN_HANDLER_THREADS** | long | The number of event handler threads. Its maximum value is 20. | 5 |
| **Properties** | **Type** | **Description** | **Default value** |
|----------------------------------------------------------|----------|---------------------------------------------------------------|-------------------|
| **CELIX_EVENT_ADMIN_HANDLER_THREADS** | long | The number of event handler threads. Its maximum value is 20. | 5 |
| **CELIX_EVENT_ADMIN_EVENT_SEQID_CACHE_CLEANUP_INTERVAL** | long | The event sequence id cache will be cleaned up when it has not been used for this interval. The unit is seconds. The event sequence id cache is used to prevent duplicate events. | (60*60)s |

### Software Design

Expand All @@ -63,6 +64,17 @@ at most one event-delivery thread at a time, so that events can be delivered in
"event.delivery" property to "async.unordered", the event handler can hold multiple event-delivery threads at the same
time, so that events can be delivered in parallel.

#### Remote Event Delivery

If the event property "celix.event.remote.enable" is set to true, the event will be delivered to the local event handlers
and remote event handlers. For delivering events to local event handlers, it can refer to the section of synchronous delivery
and asynchronous delivery. For delivering events to remote event handlers, event admin will forward the event to the
[remote provider](../remote_provider/README.md). The remote provider will serialize the event and send it to the remote framework.
The remote framework will deserialize the event and deliver it to the remote event handler. The diagram of remote event delivery
is as follows:

![remote_delivery_seq.png](diagrams/remote_event_delivery_seq.png)


#### Event Adapter

Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

@startuml
'https://plantuml.com/sequence-diagram

box FrameworkA
participant "Event Admin" as EventAdmin1
participant "Remote Provider" as RemoteProvider1
end box

box FrameworkB
participant "Remote Provider" as RemoteProvider2
participant "Event Admin" as EventAdmin2
end box

-\EventAdmin1:postEvent/sendEvent
EventAdmin1->EventAdmin1:Delivery event to local event handlers
alt "celix.event.remote.enable" is true
EventAdmin1->EventAdmin1:Unset the "celix.event.remote.enable" property
EventAdmin1->RemoteProvider1:postEvent/sendEvent
RemoteProvider1->RemoteProvider2:IPC or Network
RemoteProvider2->EventAdmin2:postEvent/sendEvent
EventAdmin2 -> EventAdmin2:Delivery event to event handlers in FrameworkB
end alt

@enduml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,30 @@ TEST_F(CelixEventAdminActTestSuite, FailedToAddEventHandlerDependencyToEventAdmi
});
}

TEST_F(CelixEventAdminActTestSuite, FailedToCreateRemoteProviderDependencyForEventAdminTest) {
TestEventAdminActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_dmServiceDependency_create((void*)&celix_bundleActivator_start, 1, nullptr, 2);
auto status = celix_bundleActivator_start(act, ctx);
ASSERT_EQ(CELIX_ENOMEM, status);
});
}

TEST_F(CelixEventAdminActTestSuite, FailedToSetServiceToRemoteProviderDependencyTest) {
TestEventAdminActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_dmServiceDependency_setService((void*)&celix_bundleActivator_start, 1, CELIX_ENOMEM, 2);
auto status = celix_bundleActivator_start(act, ctx);
ASSERT_EQ(CELIX_ENOMEM, status);
});
}

TEST_F(CelixEventAdminActTestSuite, FailedToAddRemoteProviderDependencyToEventAdminComponentTest) {
TestEventAdminActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_dmComponent_addServiceDependency((void*)&celix_bundleActivator_start, 1, CELIX_ENOMEM, 2);
auto status = celix_bundleActivator_start(act, ctx);
ASSERT_EQ(CELIX_ENOMEM, status);
});
}

TEST_F(CelixEventAdminActTestSuite, FailedToAddEventAdminServiceToComponentTest) {
TestEventAdminActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_dmComponent_addInterface((void*)&celix_bundleActivator_start, 1, CELIX_ENOMEM);
Expand All @@ -129,23 +153,23 @@ TEST_F(CelixEventAdminActTestSuite, FailedToCreateEventAdapterTest) {

TEST_F(CelixEventAdminActTestSuite, FailedToCreateEventAdminDependencyForEventAdapterTest) {
TestEventAdminActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_dmServiceDependency_create((void*)&celix_bundleActivator_start, 1, nullptr, 2);
celix_ei_expect_celix_dmServiceDependency_create((void*)&celix_bundleActivator_start, 1, nullptr, 3);
auto status = celix_bundleActivator_start(act, ctx);
ASSERT_EQ(CELIX_ENOMEM, status);
});
}

TEST_F(CelixEventAdminActTestSuite, FailedToSetServiceToEventAdminDependencyTest) {
TestEventAdminActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_dmServiceDependency_setService((void*)&celix_bundleActivator_start, 1, CELIX_ENOMEM, 2);
celix_ei_expect_celix_dmServiceDependency_setService((void*)&celix_bundleActivator_start, 1, CELIX_ENOMEM, 3);
auto status = celix_bundleActivator_start(act, ctx);
ASSERT_EQ(CELIX_ENOMEM, status);
});
}

TEST_F(CelixEventAdminActTestSuite, FailedToAddEventAdminDependencyToEventAdapterComponentTest) {
TestEventAdminActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_dmComponent_addServiceDependency((void*)&celix_bundleActivator_start, 1, CELIX_ENOMEM, 2);
celix_ei_expect_celix_dmComponent_addServiceDependency((void*)&celix_bundleActivator_start, 1, CELIX_ENOMEM, 3);
auto status = celix_bundleActivator_start(act, ctx);
ASSERT_EQ(CELIX_ENOMEM, status);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
#include <cerrno>

#include "CelixEventAdminTestSuiteBaseClass.h"
#include "celix_event_admin.h"
#include "celix_event.h"
Expand All @@ -27,6 +29,8 @@
#include "celix_long_hash_map_ei.h"
#include "celix_threads_ei.h"
#include "celix_utils_ei.h"
#include "celix_properties_ei.h"
#include "celix_bundle_context_ei.h"
#include "malloc_ei.h"
#include <gtest/gtest.h>

Expand All @@ -51,6 +55,11 @@ class CelixEventAdminErrorInjectionTestSuite : public CelixEventAdminTestSuiteBa
celix_ei_expect_celix_arrayList_addLong(nullptr, 0, 0);
celix_ei_expect_celix_elapsedtime(nullptr, 0, 0);
celix_ei_expect_celix_arrayList_createWithOptions(nullptr, 0, nullptr);
celix_ei_expect_celix_properties_copy(nullptr, 0, nullptr);
celix_ei_expect_celix_properties_set(nullptr, 0, 0);
celix_ei_expect_celix_properties_setLong(nullptr, 0, 0);
celix_ei_expect_celix_bundleContext_getProperty(nullptr, 0, nullptr);
celix_ei_expect_celix_stringHashMap_createWithOptions(nullptr, 0, nullptr);
}
};

Expand Down Expand Up @@ -89,6 +98,12 @@ TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedToCreateLogHelperForEventAd
EXPECT_EQ(nullptr, ea);
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedToGetFrameworkUUIDForEventAdminTest) {
celix_ei_expect_celix_bundleContext_getProperty((void*)&celix_eventAdmin_create, 0, nullptr);
auto ea = celix_eventAdmin_create(ctx.get());
EXPECT_EQ(nullptr, ea);
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedToCreateLockForEventAdminTest) {
celix_ei_expect_celixThreadRwlock_create((void*)&celix_eventAdmin_create, 0, CELIX_ENOMEM);
auto ea = celix_eventAdmin_create(ctx.get());
Expand Down Expand Up @@ -119,6 +134,18 @@ TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedToCreateEventHandlersMapFor
EXPECT_EQ(nullptr, ea);
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedToCreateEventSeqIdCacheForEventAdminTest) {
celix_ei_expect_celix_stringHashMap_createWithOptions((void*)&celix_eventAdmin_create, 0, nullptr);
auto ea = celix_eventAdmin_create(ctx.get());
EXPECT_EQ(nullptr, ea);
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedToCreateRemoteProviderMapForEventAdminTest) {
celix_ei_expect_celix_longHashMap_create((void*)&celix_eventAdmin_create, 0, nullptr, 2);
auto ea = celix_eventAdmin_create(ctx.get());
EXPECT_EQ(nullptr, ea);
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedToCreateMutexForEventAdminTest) {
celix_ei_expect_celixThreadMutex_create((void*)&celix_eventAdmin_create, 0, CELIX_ENOMEM);
auto ea = celix_eventAdmin_create(ctx.get());
Expand Down Expand Up @@ -339,4 +366,128 @@ TEST_F(CelixEventAdminErrorInjectionTestSuite, PostEventToBlacklistHandlerTest)
EXPECT_STRNE("org/celix/test1", topic);
return CELIX_SUCCESS;
});
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, RetrieveEventSeqIdCacheTest) {
int receivedEventCount = 0;
TestPublishEvent("org/celix/test", nullptr, [](celix_event_admin_t *ea) {
for (int i = 0; i < 32; i++) {
celix_autoptr(celix_properties_t) eventProps = celix_properties_create();
std::string uuid = "9748f803-5766-49f1-a2e9-" + std::to_string(i);
celix_properties_set(eventProps, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, uuid.c_str());
celix_properties_setLong(eventProps, CELIX_EVENT_REMOTE_SEQ_ID, 1);
auto status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(CELIX_SUCCESS, status);
}
celix_ei_expect_celix_elapsedtime(CELIX_EI_UNKNOWN_CALLER, 0, 60*60+1);
celix_autoptr(celix_properties_t) eventProps = celix_properties_create();
celix_properties_set(eventProps, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, "9748f803-5766-49f1-a2e9-9bbb522e874b");
celix_properties_setLong(eventProps, CELIX_EVENT_REMOTE_SEQ_ID, 1);
auto status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(CELIX_SUCCESS, status);
}, [&receivedEventCount](void*, const char*, const celix_properties_t*) {
receivedEventCount ++;
return CELIX_SUCCESS;
});
EXPECT_EQ(33, receivedEventCount);
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedToCopyRemoteEnableEventPropertiesTest) {
celix_event_remote_provider_service_t remoteProviderService;
remoteProviderService.handle = nullptr;
remoteProviderService.sendEvent = [](void*, const char*, const celix_properties_t*) {
ADD_FAILURE() << "Should not be called";
return CELIX_SUCCESS;
};
remoteProviderService.postEvent = [](void*, const char*, const celix_properties_t*) {
ADD_FAILURE() << "Should not be called";
return CELIX_SUCCESS;
};
TestPublishEventToRemote([](celix_event_admin_t *ea) {
celix_ei_expect_celix_properties_copy((void*)&celix_eventAdmin_sendEvent, 1, nullptr);
celix_autoptr(celix_properties_t) eventProps = celix_properties_create();
celix_properties_setBool(eventProps, CELIX_EVENT_REMOTE_ENABLE, true);
auto status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(ENOMEM, status);
}, &remoteProviderService);
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedToSetRemoteFrameworkUUIDToRemoteEnableEventTest) {
celix_event_remote_provider_service_t remoteProviderService;
remoteProviderService.handle = nullptr;
remoteProviderService.sendEvent = [](void*, const char*, const celix_properties_t*) {
ADD_FAILURE() << "Should not be called";
return CELIX_SUCCESS;
};
remoteProviderService.postEvent = [](void*, const char*, const celix_properties_t*) {
ADD_FAILURE() << "Should not be called";
return CELIX_SUCCESS;
};
TestPublishEventToRemote([](celix_event_admin_t *ea) {
celix_ei_expect_celix_properties_set((void*)&celix_eventAdmin_sendEvent, 1, ENOMEM);
celix_autoptr(celix_properties_t) eventProps = celix_properties_create();
celix_properties_setBool(eventProps, CELIX_EVENT_REMOTE_ENABLE, true);
auto status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(ENOMEM, status);
}, &remoteProviderService);
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedToSetSeqIdToRemoteEnableEventTest) {
celix_event_remote_provider_service_t remoteProviderService;
remoteProviderService.handle = nullptr;
remoteProviderService.sendEvent = [](void*, const char*, const celix_properties_t*) {
ADD_FAILURE() << "Should not be called";
return CELIX_SUCCESS;
};
remoteProviderService.postEvent = [](void*, const char*, const celix_properties_t*) {
ADD_FAILURE() << "Should not be called";
return CELIX_SUCCESS;
};
TestPublishEventToRemote([](celix_event_admin_t *ea) {
celix_ei_expect_celix_properties_setLong((void*)&celix_eventAdmin_sendEvent, 1, ENOMEM);
celix_autoptr(celix_properties_t) eventProps = celix_properties_create();
celix_properties_setBool(eventProps, CELIX_EVENT_REMOTE_ENABLE, true);
auto status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(ENOMEM, status);
}, &remoteProviderService);
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedAllocMemoryForSeqIdCacheWhenSendRemoteEventTest) {
int receivedEventCount = 0;
TestPublishEvent("org/celix/test", nullptr, [](celix_event_admin_t *ea) {
celix_ei_expect_calloc((void*)&celix_eventAdmin_sendEvent, 2, nullptr);
celix_autoptr(celix_properties_t) eventProps = celix_properties_create();
celix_properties_set(eventProps, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, "9748f803-5766-49f1-a2e9-9bbb522e874a");
celix_properties_setLong(eventProps, CELIX_EVENT_REMOTE_SEQ_ID, 1);
auto status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(CELIX_SUCCESS, status);

celix_properties_setLong(eventProps, CELIX_EVENT_REMOTE_SEQ_ID, 1);
status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(CELIX_SUCCESS, status);
}, [&receivedEventCount](void*, const char*, const celix_properties_t*) {
receivedEventCount ++;
return CELIX_SUCCESS;
});
EXPECT_EQ(2, receivedEventCount);
}

TEST_F(CelixEventAdminErrorInjectionTestSuite, FailedAddSeqIdCacheWhenSendRemoteEventTest) {
int receivedEventCount = 0;
TestPublishEvent("org/celix/test", nullptr, [](celix_event_admin_t *ea) {
celix_ei_expect_celix_stringHashMap_put((void*)&celix_eventAdmin_sendEvent, 2, ENOMEM);
celix_autoptr(celix_properties_t) eventProps = celix_properties_create();
celix_properties_set(eventProps, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, "9748f803-5766-49f1-a2e9-9bbb522e874a");
celix_properties_setLong(eventProps, CELIX_EVENT_REMOTE_SEQ_ID, 1);
auto status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(CELIX_SUCCESS, status);

celix_properties_setLong(eventProps, CELIX_EVENT_REMOTE_SEQ_ID, 1);
status = celix_eventAdmin_sendEvent(ea, "org/celix/test", eventProps);
EXPECT_EQ(CELIX_SUCCESS, status);
}, [&receivedEventCount](void*, const char*, const celix_properties_t*) {
receivedEventCount ++;
return CELIX_SUCCESS;
});
EXPECT_EQ(2, receivedEventCount);
}
Loading
Loading