diff --git a/ecal/core/src/pubsub/ecal_publisher_impl.cpp b/ecal/core/src/pubsub/ecal_publisher_impl.cpp index aa8e4e3583..d9e448be2c 100644 --- a/ecal/core/src/pubsub/ecal_publisher_impl.cpp +++ b/ecal/core/src/pubsub/ecal_publisher_impl.cpp @@ -741,23 +741,23 @@ namespace eCAL ecal_reg_sample_topic.uname = m_attributes.unit_name; } - void CPublisherImpl::FireEvent(const eCAL_Publisher_Event type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_) + void CPublisherImpl::FireEvent(const eCAL_Publisher_Event type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_) { SPubEventCallbackData data; - data.type = type_; - data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - data.clock = 0; - data.tid = subscription_info_.entity_id; - data.tdatatype = tinfo_; + data.type = type_; + data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + data.clock = 0; + data.tid = subscription_info_.entity_id; + data.tdatatype = data_type_info_; // new event handling with topic id if(m_event_id_callback) { Registration::STopicId topic_id; - topic_id.topic_id.entity_id = subscription_info_.entity_id; + topic_id.topic_id.entity_id = subscription_info_.entity_id; topic_id.topic_id.process_id = subscription_info_.process_id; - topic_id.topic_id.host_name = subscription_info_.host_name; - topic_id.topic_name = m_attributes.topic_name; + topic_id.topic_id.host_name = subscription_info_.host_name; + topic_id.topic_name = m_attributes.topic_name; const std::lock_guard lock(m_event_id_callback_mutex); // call event callback m_event_id_callback(topic_id, data); @@ -774,19 +774,19 @@ namespace eCAL } } - void CPublisherImpl::FireConnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_) + void CPublisherImpl::FireConnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_) { - FireEvent(pub_event_connected, subscription_info_, tinfo_); + FireEvent(pub_event_connected, subscription_info_, data_type_info_); } - void CPublisherImpl::FireUpdateEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_) + void CPublisherImpl::FireUpdateEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_) { - FireEvent(pub_event_update_connection, subscription_info_, tinfo_); + FireEvent(pub_event_update_connection, subscription_info_, data_type_info_); } - void CPublisherImpl::FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_) + void CPublisherImpl::FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_) { - FireEvent(pub_event_disconnected, subscription_info_, tinfo_); + FireEvent(pub_event_disconnected, subscription_info_, data_type_info_); } size_t CPublisherImpl::GetConnectionCount() diff --git a/ecal/core/src/pubsub/ecal_publisher_impl.h b/ecal/core/src/pubsub/ecal_publisher_impl.h index 509e980297..564b9ecbd0 100644 --- a/ecal/core/src/pubsub/ecal_publisher_impl.h +++ b/ecal/core/src/pubsub/ecal_publisher_impl.h @@ -129,11 +129,11 @@ namespace eCAL void StopAllLayer(); - void FireEvent(const eCAL_Publisher_Event type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_); + void FireEvent(const eCAL_Publisher_Event type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_); - void FireConnectEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_); - void FireUpdateEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_); - void FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_); + void FireConnectEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_); + void FireUpdateEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_); + void FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_); size_t GetConnectionCount(); diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 3a0ea1eb6e..79530e33a6 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -267,13 +267,14 @@ namespace eCAL if (topic_name.empty()) return; const auto& publication_info = ecal_sample_.identifier; + const SDataTypeInformation& topic_information = ecal_topic.tdatatype; // unregister publisher const std::shared_lock lock(m_topic_name_subscriber_mutex); auto res = m_topic_name_subscriber_map.equal_range(topic_name); for (auto iter = res.first; iter != res.second; ++iter) { - iter->second->ApplyPublisherUnregistration(publication_info); + iter->second->ApplyPublisherUnregistration(publication_info, topic_information); } } diff --git a/ecal/core/src/pubsub/ecal_subscriber_impl.cpp b/ecal/core/src/pubsub/ecal_subscriber_impl.cpp index 119d0048cd..0e748c51c4 100644 --- a/ecal/core/src/pubsub/ecal_subscriber_impl.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber_impl.cpp @@ -107,13 +107,13 @@ namespace eCAL // reset receive callback { - const std::lock_guard lock(m_receive_callback_mtx); + const std::lock_guard lock(m_receive_callback_mutex); m_receive_callback = nullptr; } // reset event callback map { - const std::lock_guard lock(m_event_callback_map_mtx); + const std::lock_guard lock(m_event_callback_map_mutex); m_event_callback_map.clear(); } @@ -128,7 +128,7 @@ namespace eCAL { if (!m_created) return(false); - std::unique_lock read_buffer_lock(m_read_buf_mtx); + std::unique_lock read_buffer_lock(m_read_buf_mutex); // No need to wait (for whatever time) if something has been received if (!m_read_buf_received) @@ -171,7 +171,7 @@ namespace eCAL // store receive callback { - const std::lock_guard lock(m_receive_callback_mtx); + const std::lock_guard lock(m_receive_callback_mutex); #ifndef NDEBUG // log it Logging::Log(log_level_debug2, m_attributes.topic_name + "::CSubscriberImpl::AddReceiveCallback"); @@ -188,7 +188,7 @@ namespace eCAL // reset receive callback { - const std::lock_guard lock(m_receive_callback_mtx); + const std::lock_guard lock(m_receive_callback_mutex); #ifndef NDEBUG // log it Logging::Log(log_level_debug2, m_attributes.topic_name + "::CSubscriberImpl::RemReceiveCallback"); @@ -209,7 +209,7 @@ namespace eCAL // log it Logging::Log(log_level_debug2, m_attributes.topic_name + "::CSubscriberImpl::AddEventCallback"); #endif - const std::lock_guard lock(m_event_callback_map_mtx); + const std::lock_guard lock(m_event_callback_map_mutex); m_event_callback_map[type_] = std::move(callback_); } @@ -226,7 +226,7 @@ namespace eCAL // log it Logging::Log(log_level_debug2, m_attributes.topic_name + "::CSubscriberImpl::RemEventCallback"); #endif - const std::lock_guard lock(m_event_callback_map_mtx); + const std::lock_guard lock(m_event_callback_map_mutex); m_event_callback_map[type_] = nullptr; } @@ -235,14 +235,18 @@ namespace eCAL bool CSubscriberImpl::AddEventIDCallback(const SubEventIDCallbackT callback_) { - // TODO: Implement this - return false; + if (!m_created) return false; + const std::lock_guard lock(m_event_id_callback_mutex); + m_event_id_callback = callback_; + return true; } bool CSubscriberImpl::RemEventIDCallback() { - // TODO: Implement this - return false; + if (!m_created) return false; + const std::lock_guard lock(m_event_id_callback_mutex); + m_event_id_callback = nullptr; + return true; } bool CSubscriberImpl::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) @@ -328,12 +332,12 @@ namespace eCAL if (is_new_connection) { // fire connect event - FireConnectEvent(publication_info_.entity_id, data_type_info_); + FireConnectEvent(publication_info_, data_type_info_); } else if (is_updated_connection) { // fire update event - FireUpdateEvent(publication_info_.entity_id, data_type_info_); + FireUpdateEvent(publication_info_, data_type_info_); } #ifndef NDEBUG @@ -342,7 +346,7 @@ namespace eCAL #endif } - void CSubscriberImpl::ApplyPublisherUnregistration(const SPublicationInfo& publication_info_) + void CSubscriberImpl::ApplyPublisherUnregistration(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_) { // remove key from connection map bool last_connection_gone(false); @@ -359,7 +363,7 @@ namespace eCAL if (last_connection_gone) { // fire disconnect event - FireDisconnectEvent(); + FireDisconnectEvent(publication_info_, data_type_info_); } #ifndef NDEBUG @@ -424,7 +428,7 @@ namespace eCAL size_t CSubscriberImpl::ApplySample(const Payload::TopicInfo& topic_info_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_) { // ensure thread safety - const std::lock_guard lock(m_receive_callback_mtx); + const std::lock_guard lock(m_receive_callback_mutex); if (!m_created) return(0); // check receive layer configuration @@ -496,7 +500,7 @@ namespace eCAL // Update frequency calculation { const auto receive_time = std::chrono::steady_clock::now(); - const std::lock_guard freq_lock(m_frequency_calculator_mtx); + const std::lock_guard freq_lock(m_frequency_calculator_mutex); m_frequency_calculator.addTick(receive_time); } @@ -546,7 +550,7 @@ namespace eCAL if (!processed) { // push sample into read buffer - const std::lock_guard read_buffer_lock(m_read_buf_mtx); + const std::lock_guard read_buffer_lock(m_read_buf_mutex); m_read_buf.clear(); m_read_buf.assign(payload_, payload_ + size_); m_read_time = time_; @@ -796,50 +800,52 @@ namespace eCAL #endif } - void CSubscriberImpl::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_) + void CSubscriberImpl::FireEvent(const eCAL_Subscriber_Event type_, const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_) { - const std::lock_guard lock(m_event_callback_map_mtx); - auto iter = m_event_callback_map.find(sub_event_connected); - if (iter != m_event_callback_map.end() && iter->second) + SSubEventCallbackData data; + data.type = type_; + data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + data.clock = 0; + data.tid = publication_info_.entity_id; + data.tdatatype = data_type_info_; + + // new event handling with topic id + if (m_event_id_callback) { - SSubEventCallbackData data; - data.type = sub_event_connected; - data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - data.clock = 0; - data.tid = tid_; - data.tdatatype = tinfo_; - (iter->second)(m_attributes.topic_name.c_str(), &data); + Registration::STopicId topic_id; + topic_id.topic_id.entity_id = publication_info_.entity_id; + topic_id.topic_id.process_id = publication_info_.process_id; + topic_id.topic_id.host_name = publication_info_.host_name; + topic_id.topic_name = m_attributes.topic_name; + const std::lock_guard lock(m_event_id_callback_mutex); + // call event callback + m_event_id_callback(topic_id, data); } - } - void CSubscriberImpl::FireUpdateEvent(const std::string& tid_, const SDataTypeInformation& tinfo_) - { - const std::lock_guard lock(m_event_callback_map_mtx); - auto iter = m_event_callback_map.find(sub_event_update_connection); - if (iter != m_event_callback_map.end() && iter->second) + // deprecated event handling with topic name { - SSubEventCallbackData data; - data.type = sub_event_update_connection; - data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - data.clock = 0; - data.tid = tid_; - data.tdatatype = tinfo_; - (iter->second)(m_attributes.topic_name.c_str(), &data); + const std::lock_guard lock(m_event_callback_map_mutex); + auto iter = m_event_callback_map.find(type_); + if (iter != m_event_callback_map.end() && iter->second) + { + (iter->second)(m_attributes.topic_name.c_str(), &data); + } } } - void CSubscriberImpl::FireDisconnectEvent() + void CSubscriberImpl::FireConnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_) { - const std::lock_guard lock(m_event_callback_map_mtx); - auto iter = m_event_callback_map.find(sub_event_disconnected); - if (iter != m_event_callback_map.end() && iter->second) - { - SSubEventCallbackData data; - data.type = sub_event_disconnected; - data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - data.clock = 0; - (iter->second)(m_attributes.topic_name.c_str(), &data); - } + FireEvent(sub_event_connected, publication_info_, data_type_info_); + } + + void CSubscriberImpl::FireUpdateEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_) + { + FireEvent(sub_event_update_connection, publication_info_, data_type_info_); + } + + void CSubscriberImpl::FireDisconnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_) + { + FireEvent(sub_event_disconnected, publication_info_, data_type_info_); } size_t CSubscriberImpl::GetConnectionCount() @@ -912,7 +918,7 @@ namespace eCAL #endif // we fire the message drop event { - const std::lock_guard lock(m_event_callback_map_mtx); + const std::lock_guard lock(m_event_callback_map_mutex); auto citer = m_event_callback_map.find(sub_event_dropped); if (citer != m_event_callback_map.end() && citer->second) { @@ -978,7 +984,7 @@ namespace eCAL int32_t CSubscriberImpl::GetFrequency() { const auto frequency_time = std::chrono::steady_clock::now(); - const std::lock_guard lock(m_frequency_calculator_mtx); + const std::lock_guard lock(m_frequency_calculator_mutex); return static_cast(m_frequency_calculator.getFrequency(frequency_time) * 1000); } } diff --git a/ecal/core/src/pubsub/ecal_subscriber_impl.h b/ecal/core/src/pubsub/ecal_subscriber_impl.h index f3ec870b90..461d95b0b2 100644 --- a/ecal/core/src/pubsub/ecal_subscriber_impl.h +++ b/ecal/core/src/pubsub/ecal_subscriber_impl.h @@ -63,6 +63,7 @@ namespace eCAL }; using SPublicationInfo = Registration::SampleIdentifier; + CSubscriberImpl(const SDataTypeInformation& topic_info_, const eCAL::eCALReader::SAttributes& attr_); ~CSubscriberImpl(); @@ -85,7 +86,7 @@ namespace eCAL void SetFilterIDs(const std::set& filter_ids_); void ApplyPublisherRegistration(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& pub_layer_states_); - void ApplyPublisherUnregistration(const SPublicationInfo& publication_info_); + void ApplyPublisherUnregistration(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_); void ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_); @@ -124,9 +125,11 @@ namespace eCAL void StartTransportLayer(); void StopTransportLayer(); - void FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_); - void FireUpdateEvent(const std::string& tid_, const SDataTypeInformation& tinfo_); - void FireDisconnectEvent(); + void FireEvent(const eCAL_Subscriber_Event type_, const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_); + + void FireConnectEvent (const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_); + void FireUpdateEvent (const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_); + void FireDisconnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_); size_t GetConnectionCount(); @@ -150,25 +153,28 @@ namespace eCAL ConnectionMapT m_connection_map; std::atomic m_connection_count{ 0 }; - mutable std::mutex m_read_buf_mtx; + mutable std::mutex m_read_buf_mutex; std::condition_variable m_read_buf_cv; bool m_read_buf_received = false; std::string m_read_buf; long long m_read_time = 0; - std::mutex m_receive_callback_mtx; + std::mutex m_receive_callback_mutex; ReceiveIDCallbackT m_receive_callback; std::atomic m_receive_time; std::deque m_sample_hash_queue; using EventCallbackMapT = std::map; - std::mutex m_event_callback_map_mtx; + std::mutex m_event_callback_map_mutex; EventCallbackMapT m_event_callback_map; + std::mutex m_event_id_callback_mutex; + SubEventIDCallbackT m_event_id_callback; + std::atomic m_clock; - std::mutex m_frequency_calculator_mtx; + std::mutex m_frequency_calculator_mutex; ResettableFrequencyCalculator m_frequency_calculator; std::set m_id_set;