Skip to content

Commit

Permalink
CSubscriberImpl event handling implemented
Browse files Browse the repository at this point in the history
CPublisherImpl event handling renamings
  • Loading branch information
rex-schilasky committed Dec 17, 2024
1 parent a340c06 commit 6ebcdea
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 83 deletions.
30 changes: 15 additions & 15 deletions ecal/core/src/pubsub/ecal_publisher_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -741,23 +741,23 @@ namespace eCAL
ecal_reg_sample_topic.uname = m_attributes.unit_name;
}

void CPublisherImpl::FireEvent(const eCAL_Publisher_Event type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_)
void CPublisherImpl::FireEvent(const eCAL_Publisher_Event type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_)
{
SPubEventCallbackData data;
data.type = type_;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tid = subscription_info_.entity_id;
data.tdatatype = tinfo_;
data.type = type_;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tid = subscription_info_.entity_id;
data.tdatatype = data_type_info_;

// new event handling with topic id
if(m_event_id_callback)
{
Registration::STopicId topic_id;
topic_id.topic_id.entity_id = subscription_info_.entity_id;
topic_id.topic_id.entity_id = subscription_info_.entity_id;
topic_id.topic_id.process_id = subscription_info_.process_id;
topic_id.topic_id.host_name = subscription_info_.host_name;
topic_id.topic_name = m_attributes.topic_name;
topic_id.topic_id.host_name = subscription_info_.host_name;
topic_id.topic_name = m_attributes.topic_name;
const std::lock_guard<std::mutex> lock(m_event_id_callback_mutex);
// call event callback
m_event_id_callback(topic_id, data);
Expand All @@ -774,19 +774,19 @@ namespace eCAL
}
}

void CPublisherImpl::FireConnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_)
void CPublisherImpl::FireConnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(pub_event_connected, subscription_info_, tinfo_);
FireEvent(pub_event_connected, subscription_info_, data_type_info_);
}

void CPublisherImpl::FireUpdateEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_)
void CPublisherImpl::FireUpdateEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(pub_event_update_connection, subscription_info_, tinfo_);
FireEvent(pub_event_update_connection, subscription_info_, data_type_info_);
}

void CPublisherImpl::FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_)
void CPublisherImpl::FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(pub_event_disconnected, subscription_info_, tinfo_);
FireEvent(pub_event_disconnected, subscription_info_, data_type_info_);
}

size_t CPublisherImpl::GetConnectionCount()
Expand Down
8 changes: 4 additions & 4 deletions ecal/core/src/pubsub/ecal_publisher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ namespace eCAL

void StopAllLayer();

void FireEvent(const eCAL_Publisher_Event type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_);
void FireEvent(const eCAL_Publisher_Event type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);

void FireConnectEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_);
void FireUpdateEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_);
void FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_);
void FireConnectEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);
void FireUpdateEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);
void FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);

size_t GetConnectionCount();

Expand Down
3 changes: 2 additions & 1 deletion ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,14 @@ namespace eCAL
if (topic_name.empty()) return;

const auto& publication_info = ecal_sample_.identifier;
const SDataTypeInformation& topic_information = ecal_topic.tdatatype;

// unregister publisher
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_subscriber_mutex);
auto res = m_topic_name_subscriber_map.equal_range(topic_name);
for (auto iter = res.first; iter != res.second; ++iter)
{
iter->second->ApplyPublisherUnregistration(publication_info);
iter->second->ApplyPublisherUnregistration(publication_info, topic_information);
}
}

Expand Down
116 changes: 61 additions & 55 deletions ecal/core/src/pubsub/ecal_subscriber_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ namespace eCAL

// reset receive callback
{
const std::lock_guard<std::mutex> lock(m_receive_callback_mtx);
const std::lock_guard<std::mutex> lock(m_receive_callback_mutex);
m_receive_callback = nullptr;
}

// reset event callback map
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mutex);
m_event_callback_map.clear();
}

Expand All @@ -128,7 +128,7 @@ namespace eCAL
{
if (!m_created) return(false);

std::unique_lock<std::mutex> read_buffer_lock(m_read_buf_mtx);
std::unique_lock<std::mutex> read_buffer_lock(m_read_buf_mutex);

// No need to wait (for whatever time) if something has been received
if (!m_read_buf_received)
Expand Down Expand Up @@ -171,7 +171,7 @@ namespace eCAL

// store receive callback
{
const std::lock_guard<std::mutex> lock(m_receive_callback_mtx);
const std::lock_guard<std::mutex> lock(m_receive_callback_mutex);
#ifndef NDEBUG
// log it
Logging::Log(log_level_debug2, m_attributes.topic_name + "::CSubscriberImpl::AddReceiveCallback");
Expand All @@ -188,7 +188,7 @@ namespace eCAL

// reset receive callback
{
const std::lock_guard<std::mutex> lock(m_receive_callback_mtx);
const std::lock_guard<std::mutex> lock(m_receive_callback_mutex);
#ifndef NDEBUG
// log it
Logging::Log(log_level_debug2, m_attributes.topic_name + "::CSubscriberImpl::RemReceiveCallback");
Expand All @@ -209,7 +209,7 @@ namespace eCAL
// log it
Logging::Log(log_level_debug2, m_attributes.topic_name + "::CSubscriberImpl::AddEventCallback");
#endif
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mutex);
m_event_callback_map[type_] = std::move(callback_);
}

Expand All @@ -226,7 +226,7 @@ namespace eCAL
// log it
Logging::Log(log_level_debug2, m_attributes.topic_name + "::CSubscriberImpl::RemEventCallback");
#endif
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mutex);
m_event_callback_map[type_] = nullptr;
}

Expand All @@ -235,14 +235,18 @@ namespace eCAL

bool CSubscriberImpl::AddEventIDCallback(const SubEventIDCallbackT callback_)
{
// TODO: Implement this
return false;
if (!m_created) return false;
const std::lock_guard<std::mutex> lock(m_event_id_callback_mutex);
m_event_id_callback = callback_;
return true;
}

bool CSubscriberImpl::RemEventIDCallback()
{
// TODO: Implement this
return false;
if (!m_created) return false;
const std::lock_guard<std::mutex> lock(m_event_id_callback_mutex);
m_event_id_callback = nullptr;
return true;
}

bool CSubscriberImpl::SetAttribute(const std::string& attr_name_, const std::string& attr_value_)
Expand Down Expand Up @@ -328,12 +332,12 @@ namespace eCAL
if (is_new_connection)
{
// fire connect event
FireConnectEvent(publication_info_.entity_id, data_type_info_);
FireConnectEvent(publication_info_, data_type_info_);
}
else if (is_updated_connection)
{
// fire update event
FireUpdateEvent(publication_info_.entity_id, data_type_info_);
FireUpdateEvent(publication_info_, data_type_info_);
}

#ifndef NDEBUG
Expand All @@ -342,7 +346,7 @@ namespace eCAL
#endif
}

void CSubscriberImpl::ApplyPublisherUnregistration(const SPublicationInfo& publication_info_)
void CSubscriberImpl::ApplyPublisherUnregistration(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
{
// remove key from connection map
bool last_connection_gone(false);
Expand All @@ -359,7 +363,7 @@ namespace eCAL
if (last_connection_gone)
{
// fire disconnect event
FireDisconnectEvent();
FireDisconnectEvent(publication_info_, data_type_info_);
}

#ifndef NDEBUG
Expand Down Expand Up @@ -424,7 +428,7 @@ namespace eCAL
size_t CSubscriberImpl::ApplySample(const Payload::TopicInfo& topic_info_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_)
{
// ensure thread safety
const std::lock_guard<std::mutex> lock(m_receive_callback_mtx);
const std::lock_guard<std::mutex> lock(m_receive_callback_mutex);
if (!m_created) return(0);

// check receive layer configuration
Expand Down Expand Up @@ -496,7 +500,7 @@ namespace eCAL
// Update frequency calculation
{
const auto receive_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> freq_lock(m_frequency_calculator_mtx);
const std::lock_guard<std::mutex> freq_lock(m_frequency_calculator_mutex);
m_frequency_calculator.addTick(receive_time);
}

Expand Down Expand Up @@ -546,7 +550,7 @@ namespace eCAL
if (!processed)
{
// push sample into read buffer
const std::lock_guard<std::mutex> read_buffer_lock(m_read_buf_mtx);
const std::lock_guard<std::mutex> read_buffer_lock(m_read_buf_mutex);
m_read_buf.clear();
m_read_buf.assign(payload_, payload_ + size_);
m_read_time = time_;
Expand Down Expand Up @@ -796,50 +800,52 @@ namespace eCAL
#endif
}

void CSubscriberImpl::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_)
void CSubscriberImpl::FireEvent(const eCAL_Subscriber_Event type_, const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_connected);
if (iter != m_event_callback_map.end() && iter->second)
SSubEventCallbackData data;
data.type = type_;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tid = publication_info_.entity_id;
data.tdatatype = data_type_info_;

// new event handling with topic id
if (m_event_id_callback)
{
SSubEventCallbackData data;
data.type = sub_event_connected;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tid = tid_;
data.tdatatype = tinfo_;
(iter->second)(m_attributes.topic_name.c_str(), &data);
Registration::STopicId topic_id;
topic_id.topic_id.entity_id = publication_info_.entity_id;
topic_id.topic_id.process_id = publication_info_.process_id;
topic_id.topic_id.host_name = publication_info_.host_name;
topic_id.topic_name = m_attributes.topic_name;
const std::lock_guard<std::mutex> lock(m_event_id_callback_mutex);
// call event callback
m_event_id_callback(topic_id, data);
}
}

void CSubscriberImpl::FireUpdateEvent(const std::string& tid_, const SDataTypeInformation& tinfo_)
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_update_connection);
if (iter != m_event_callback_map.end() && iter->second)
// deprecated event handling with topic name
{
SSubEventCallbackData data;
data.type = sub_event_update_connection;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tid = tid_;
data.tdatatype = tinfo_;
(iter->second)(m_attributes.topic_name.c_str(), &data);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mutex);
auto iter = m_event_callback_map.find(type_);
if (iter != m_event_callback_map.end() && iter->second)
{
(iter->second)(m_attributes.topic_name.c_str(), &data);
}
}
}

void CSubscriberImpl::FireDisconnectEvent()
void CSubscriberImpl::FireConnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_disconnected);
if (iter != m_event_callback_map.end() && iter->second)
{
SSubEventCallbackData data;
data.type = sub_event_disconnected;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
(iter->second)(m_attributes.topic_name.c_str(), &data);
}
FireEvent(sub_event_connected, publication_info_, data_type_info_);
}

void CSubscriberImpl::FireUpdateEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(sub_event_update_connection, publication_info_, data_type_info_);
}

void CSubscriberImpl::FireDisconnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(sub_event_disconnected, publication_info_, data_type_info_);
}

size_t CSubscriberImpl::GetConnectionCount()
Expand Down Expand Up @@ -912,7 +918,7 @@ namespace eCAL
#endif
// we fire the message drop event
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mutex);
auto citer = m_event_callback_map.find(sub_event_dropped);
if (citer != m_event_callback_map.end() && citer->second)
{
Expand Down Expand Up @@ -978,7 +984,7 @@ namespace eCAL
int32_t CSubscriberImpl::GetFrequency()
{
const auto frequency_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> lock(m_frequency_calculator_mtx);
const std::lock_guard<std::mutex> lock(m_frequency_calculator_mutex);
return static_cast<int32_t>(m_frequency_calculator.getFrequency(frequency_time) * 1000);
}
}
Loading

0 comments on commit 6ebcdea

Please sign in to comment.