Skip to content

Commit

Permalink
- Resolve memory leak of custom content-filter resources
Browse files Browse the repository at this point in the history
- Add missing package dependencies for rti_connext_dds_custom_sql_filter
- Clean up all participants upon factory finalization
- Reset context state upon finalization (rmw_connextddsmicro)
Signed-off-by: Andrea Sorbini <[email protected]>
  • Loading branch information
asorbini committed Oct 25, 2021
1 parent 1e0027e commit d4788f9
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 57 deletions.
3 changes: 2 additions & 1 deletion rmw_connextdds_common/src/common/rmw_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ rmw_context_impl_t::finalize_participant()
if (nullptr != this->participant) {
// If we are cleaning up after some RMW failure, it is possible for some
// DataWriter to not have been deleted.
// Call DDS_Publisher_delete_contained_entities() to make sure we can
// Call DDS_DomainParticipant_delete_contained_entities() to make sure we can
// dispose the publisher.
if (DDS_RETCODE_OK !=
DDS_DomainParticipant_delete_contained_entities(this->participant))
Expand All @@ -408,6 +408,7 @@ rmw_context_impl_t::finalize_participant()
RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS participant")
return RMW_RET_ERROR;
}

this->participant = nullptr;
}

Expand Down
4 changes: 2 additions & 2 deletions rmw_connextdds_common/src/common/rmw_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1368,9 +1368,9 @@ RMW_Connext_Subscriber::create(
RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber")
return nullptr;
}
scope_exit_type_unregister.cancel();
scope_exit_topic_delete.cancel();
scope_exit_dds_reader_delete.cancel();
scope_exit_topic_delete.cancel();
scope_exit_type_unregister.cancel();

return rmw_sub_impl;
}
Expand Down
85 changes: 83 additions & 2 deletions rmw_connextdds_common/src/ndds/dds_api_ndds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
const char * const RMW_CONNEXTDDS_ID = "rmw_connextdds";
const char * const RMW_CONNEXTDDS_SERIALIZATION_FORMAT = "cdr";

struct rmw_connextdds_api_pro
{
rti_connext_dds_custom_sql_filter::CustomSqlFilterData custom_filter_data;
};

rmw_connextdds_api_pro * RMW_Connext_fv_FactoryContext = nullptr;

rmw_ret_t
rmw_connextdds_set_log_verbosity(rmw_log_severity_t severity)
{
Expand Down Expand Up @@ -70,16 +77,89 @@ rmw_ret_t
rmw_connextdds_initialize_participant_factory_context(
rmw_context_impl_t * const ctx)
{
RMW_CONNEXT_ASSERT(RMW_Connext_fv_FactoryContext == nullptr)
// RMW_Connext_gv_DomainParticipantFactory is initialized by
// rmw_api_connextdds_init().
RMW_CONNEXT_ASSERT(RMW_Connext_gv_DomainParticipantFactory == nullptr)
UNUSED_ARG(ctx);
// Nothing to do

rmw_connextdds_api_pro * ctx_api = nullptr;
auto scope_exit_api_delete = rcpputils::make_scope_exit(
[ctx_api]()
{
if (nullptr != ctx_api) {
delete ctx_api;
}
});

ctx_api = new (std::nothrow) rmw_connextdds_api_pro();
if (nullptr == ctx_api) {
return RMW_RET_ERROR;
}

scope_exit_api_delete.cancel();
RMW_Connext_fv_FactoryContext = ctx_api;
return RMW_RET_OK;
}

rmw_ret_t
rmw_connextdds_finalize_participant_factory_context(
rmw_context_impl_t * const ctx)
{
RMW_CONNEXT_ASSERT(nullptr != RMW_Connext_fv_FactoryContext)
rmw_connextdds_api_pro * const ctx_api = RMW_Connext_fv_FactoryContext;
RMW_Connext_fv_FactoryContext = nullptr;

delete ctx_api;

// There might be some DomainParticipants left-over from a ("failed context
// initialization" + "failed participant finalization"), so let's try to
// clean them up.
DDS_DomainParticipantSeq participants = DDS_SEQUENCE_INITIALIZER;
auto scope_exit_seq = rcpputils::make_scope_exit(
[&participants]()
{
DDS_DomainParticipantSeq_finalize(&participants);
});

if (DDS_RETCODE_OK !=
DDS_DomainParticipantFactory_get_participants(
RMW_Connext_gv_DomainParticipantFactory, &participants))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to list existing participants")
return RMW_RET_ERROR;
}

const DDS_Long pending = DDS_DomainParticipantSeq_get_length(&participants);
for (DDS_Long i = 0; i < pending; i++) {
DDS_DomainParticipant * const participant =
*DDS_DomainParticipantSeq_get_reference(&participants, i);

if (DDS_RETCODE_OK !=
DDS_Entity_enable(DDS_DomainParticipant_as_entity(participant)))
{
RMW_CONNEXT_LOG_ERROR_SET(
"failed to enable pending DomainParticipant before deletion")
return RMW_RET_ERROR;
}

if (DDS_RETCODE_OK !=
DDS_DomainParticipant_delete_contained_entities(participant))
{
RMW_CONNEXT_LOG_ERROR_SET(
"failed to delete pending DomainParticipant's entities")
return RMW_RET_ERROR;
}

if (DDS_RETCODE_OK !=
DDS_DomainParticipantFactory_delete_participant(
RMW_Connext_gv_DomainParticipantFactory, participant))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to delete pending DomainParticipant")
return RMW_RET_ERROR;
}
}

UNUSED_ARG(ctx);
return RMW_RET_OK;
}
Expand Down Expand Up @@ -238,7 +318,8 @@ rmw_connextdds_configure_participant(
UNUSED_ARG(ctx);

if (DDS_RETCODE_OK !=
rti_connext_dds_custom_sql_filter::register_content_filter(participant))
rti_connext_dds_custom_sql_filter::register_content_filter(
participant, &RMW_Connext_fv_FactoryContext->custom_filter_data))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to register custom SQL filter")
return RMW_RET_ERROR;
Expand Down
2 changes: 2 additions & 0 deletions rmw_connextdds_common/src/rtime/dds_api_rtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,8 @@ rmw_connextdds_finalize_participant_factory_context(
RMW_CONNEXT_ASSERT(nullptr != RMW_Connext_fv_FactoryContext)
rmw_connextdds_api_micro * const ctx_api = RMW_Connext_fv_FactoryContext;

RMW_Connext_fv_FactoryContext = nullptr;

RT_Registry_T * registry =
DDS_DomainParticipantFactory_get_registry(
RMW_Connext_gv_DomainParticipantFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,22 @@
namespace rti_connext_dds_custom_sql_filter
{

struct CustomSqlFilterData
{
DDS_SqlFilterGeneratorQos base;

CustomSqlFilterData();

DDS_ReturnCode_t
set_memory_management_property(
const DDS_DomainParticipantQos & dp_qos);
};

RTI_CONNEXTDDS_CUSTOM_SQL_FILTER_PUBLIC
DDS_ReturnCode_t
register_content_filter(DDS_DomainParticipant * const participant);
register_content_filter(
DDS_DomainParticipant * const participant,
CustomSqlFilterData * const filter_data);

RTI_CONNEXTDDS_CUSTOM_SQL_FILTER_PUBLIC
extern const char * const PLUGIN_NAME;
Expand Down
1 change: 1 addition & 0 deletions rti_connext_dds_custom_sql_filter/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<buildtool_depend>ament_cmake_ros</buildtool_depend>

<depend>rti_connext_dds_cmake_module</depend>
<depend>rcpputils</depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
Expand Down
100 changes: 49 additions & 51 deletions rti_connext_dds_custom_sql_filter/src/custom_sql_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,54 +77,52 @@ struct RTI_CustomSqlFilterReaderData
}
};

struct RTI_CustomSqlFilterData
{
DDS_SqlFilterGeneratorQos base;
using RTI_CustomSqlFilterData = rti_connext_dds_custom_sql_filter::CustomSqlFilterData;

RTI_CustomSqlFilterData()
RTI_CustomSqlFilterData::CustomSqlFilterData()
: base(DDS_SQLFILTER_QOS_DEFAULT)
{}
{
}

DDS_ReturnCode_t
set_memory_management_property(
const DDS_DomainParticipantQos & dp_qos)
{
static const DDS_SqlFilterMemoryManagementQos DEFAULT =
DDS_SqlFilterMemoryManagementQos_INITIALIZER;
this->base.memory_management = DEFAULT;
DDS_ReturnCode_t
RTI_CustomSqlFilterData::set_memory_management_property(
const DDS_DomainParticipantQos & dp_qos)
{
static const DDS_SqlFilterMemoryManagementQos DEFAULT =
DDS_SqlFilterMemoryManagementQos_INITIALIZER;
this->base.memory_management = DEFAULT;

auto properties = const_cast<DDS_PropertyQosPolicy *>(&dp_qos.property);
auto properties = const_cast<DDS_PropertyQosPolicy *>(&dp_qos.property);

const DDS_Property_t * property = DDS_PropertyQosPolicyHelper_lookup_property(
properties,
DDS_CONTENT_FILTER_SQL_DESERIALIZED_SAMPLE_MIN_BUFFER_SIZE_PROPERTY_NAME);
const DDS_Property_t * property = DDS_PropertyQosPolicyHelper_lookup_property(
properties,
DDS_CONTENT_FILTER_SQL_DESERIALIZED_SAMPLE_MIN_BUFFER_SIZE_PROPERTY_NAME);

if (nullptr != property) {
if (!sscanf(
property->value, "%d",
&this->base.memory_management.buffer_min_size))
{
// TODO(asorbini) log error
return DDS_RETCODE_ERROR;
}
if (nullptr != property) {
if (!sscanf(
property->value, "%d",
&this->base.memory_management.buffer_min_size))
{
// TODO(asorbini) log error
return DDS_RETCODE_ERROR;
}
}

property = DDS_PropertyQosPolicyHelper_lookup_property(
properties,
DDS_CONTENT_FILTER_SQL_DESERIALIZED_SAMPLE_TRIM_TO_SIZE_PROPERTY_NAME);
property = DDS_PropertyQosPolicyHelper_lookup_property(
properties,
DDS_CONTENT_FILTER_SQL_DESERIALIZED_SAMPLE_TRIM_TO_SIZE_PROPERTY_NAME);

if (nullptr != property) {
if (!REDAString_iCompare(property->value, "1") ||
!REDAString_iCompare(property->value, "true") ||
!REDAString_iCompare(property->value, "yes"))
{
this->base.memory_management.trim_buffer = DDS_BOOLEAN_TRUE;
}
if (nullptr != property) {
if (!REDAString_iCompare(property->value, "1") ||
!REDAString_iCompare(property->value, "true") ||
!REDAString_iCompare(property->value, "yes"))
{
this->base.memory_management.trim_buffer = DDS_BOOLEAN_TRUE;
}

return DDS_RETCODE_OK;
}
};

return DDS_RETCODE_OK;
}

static
int
Expand Down Expand Up @@ -276,6 +274,16 @@ RTI_CustomSqlFilter_writer_detach(
static_cast<RTI_CustomSqlFilterWriterData *>(writer_filter_data);

DDS_SqlFilter_writerDetach(&cft_data->base, writer_data->base);

REDASkiplistNode * node = nullptr;
REDASkiplist_gotoTopNode(&writer_data->readers, &node);
while (REDASkiplist_gotoNextNode(&writer_data->readers, &node)) {
RTI_CustomSqlFilterReaderData * const rdata =
static_cast<RTI_CustomSqlFilterReaderData *>(REDASkiplistNode_getUserData(node));
delete rdata;
}

REDASkiplist_deleteDefaultAllocator(&writer_data->readers_desc);
delete writer_data;
}

Expand Down Expand Up @@ -666,7 +674,8 @@ RTI_CustomSqlFilter_query(void * filter_data, void * handle)

DDS_ReturnCode_t
rti_connext_dds_custom_sql_filter::register_content_filter(
DDS_DomainParticipant * const participant)
DDS_DomainParticipant * const participant,
rti_connext_dds_custom_sql_filter::CustomSqlFilterData * const filter_data)
{
DDS_DomainParticipantQos dp_qos = DDS_DomainParticipantQos_INITIALIZER;
auto scope_exit_qos = rcpputils::make_scope_exit(
Expand All @@ -683,17 +692,7 @@ rti_connext_dds_custom_sql_filter::register_content_filter(
return DDS_RETCODE_ERROR;
}

RTI_CustomSqlFilterData * cft_data = new (std::nothrow) RTI_CustomSqlFilterData();
if (nullptr == cft_data) {
// TODO(asorbini) log error
return DDS_RETCODE_ERROR;
}
auto scope_exit_data = rcpputils::make_scope_exit(
[cft_data]()
{
delete cft_data;
});
DDS_ReturnCode_t rc = cft_data->set_memory_management_property(dp_qos);
DDS_ReturnCode_t rc = filter_data->set_memory_management_property(dp_qos);
if (DDS_RETCODE_OK != rc) {
// TODO(asorbini) log error
return rc;
Expand All @@ -709,7 +708,7 @@ rti_connext_dds_custom_sql_filter::register_content_filter(
filter.writer_return_loan = RTI_CustomSqlFilter_writer_return_loan;
filter.evaluate = RTI_CustomSqlFilter_evaluate;
filter.finalize = RTI_CustomSqlFilter_finalize;
filter.filter_data = cft_data;
filter.filter_data = filter_data;

rc = DDS_ContentFilter_register_filter(
participant,
Expand All @@ -724,7 +723,6 @@ rti_connext_dds_custom_sql_filter::register_content_filter(
return DDS_RETCODE_ERROR;
}

scope_exit_data.cancel();
return DDS_RETCODE_OK;
}

Expand Down

0 comments on commit d4788f9

Please sign in to comment.