Skip to content

Commit

Permalink
[core] hotfix/memfile-ack-logic-deadlock (#1795)
Browse files Browse the repository at this point in the history
* The connect and disconnect logic of a shared memory file connection leads to a "deadlock" when a very long ack timeout is configured
  • Loading branch information
rex-schilasky authored Nov 20, 2024
1 parent 9278997 commit 50ddf9f
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 6 deletions.
20 changes: 15 additions & 5 deletions ecal/core/src/io/shm/ecal_memfile_sync.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,6 +103,9 @@ namespace eCAL
if (iter != m_event_handle_map.end())
{
const SEventHandlePair event_pair = iter->second;
// fire acknowledge events, to unlock blocking send function
gSetEvent(event_pair.event_ack);
// close the snd and ack event
gCloseEvent(event_pair.event_snd);
gCloseEvent(event_pair.event_ack);
m_event_handle_map.erase(iter);
Expand Down Expand Up @@ -336,19 +339,26 @@ namespace eCAL
// fire the publisher events
// connected subscribers will read the content from the memory file

const std::lock_guard<std::mutex> lock(m_event_handle_map_sync);
// we work on a copy of the event handle map, this is needed to ..
// 1. unlock a memory file sync via Disconnect(process_id) (ack event is set by the Disconnect in this case)
// 2. be able to add a new memory file sync via Connect(process_id)
EventHandleMapT event_handle_map_snapshot;
{
const std::lock_guard<std::mutex> lock(m_event_handle_map_sync);
event_handle_map_snapshot = m_event_handle_map;
}

// "eat" old acknowledge events :)
if (m_attr.timeout_ack_ms != 0)
{
for (const auto& event_handle : m_event_handle_map)
for (const auto& event_handle : event_handle_map_snapshot)
{
while (gWaitForEvent(event_handle.second.event_ack, 0)) {}
}
}

// send sync (memory file update) event
for (const auto& event_handle : m_event_handle_map)
for (const auto& event_handle : event_handle_map_snapshot)
{
// send sync event
gSetEvent(event_handle.second.event_snd);
Expand All @@ -360,7 +370,7 @@ namespace eCAL
// take start time for all acknowledge timeouts
const auto start_time = std::chrono::steady_clock::now();

for (auto& event_handle : m_event_handle_map)
for (auto& event_handle : event_handle_map_snapshot)
{
const auto time_since_start = std::chrono::steady_clock::now() - start_time;
const auto time_to_wait = std::chrono::milliseconds(m_attr.timeout_ack_ms)- time_since_start;
Expand Down
50 changes: 49 additions & 1 deletion ecal/core/src/readwrite/shm/ecal_writer_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,18 @@ namespace eCAL
return sent;
}

void CDataWriterSHM::ApplySubscription(const std::string& host_name_, const int32_t process_id_, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/)
void CDataWriterSHM::ApplySubscription(const std::string& host_name_, const int32_t process_id_, const std::string& topic_id_, const std::string& /*conn_par_*/)
{
// we accept local connections only
if (host_name_ != m_attributes.host_name) return;

// add or update the map with process id's and sets of topic ids
{
const std::lock_guard<std::mutex> lock(m_process_id_topic_id_set_map_sync);
auto& topic_set = m_process_id_topic_id_set_map[process_id_];
topic_set.insert(topic_id_);
}

for (auto& memory_file : m_memory_file_vec)
{
memory_file->Connect(std::to_string(process_id_));
Expand All @@ -97,6 +104,47 @@ namespace eCAL
}
}

// Unregisters one topic id of a subscribing process from this writer.
//
// host_name_  : host of the subscriber; ignored unless it equals m_attributes.host_name
// process_id_ : process id of the subscriber
// topic_id_   : topic id to drop from the set tracked for process_id_
//
// The memory files are disconnected from process_id_ only when the removed
// topic id was the last one tracked for that process. If the process id is
// not tracked at all, nothing is disconnected.
void CDataWriterSHM::RemoveSubscription(const std::string& host_name_, const int32_t process_id_, const std::string& topic_id_)
{
  // only disconnections coming from the local host are handled
  if (host_name_ != m_attributes.host_name)
  {
    return;
  }

  // becomes true once the topic id set of this process id has run empty
  bool last_topic_removed = false;
  {
    const std::lock_guard<std::mutex> guard(m_process_id_topic_id_set_map_sync);

    const auto entry = m_process_id_topic_id_set_map.find(process_id_);
    if (entry != m_process_id_topic_id_set_map.end())
    {
      // drop the topic id from the set tracked for this process id
      auto& topic_ids = entry->second;
      topic_ids.erase(topic_id_);

      if (topic_ids.empty())
      {
        // nothing left for this process id, forget it completely
        m_process_id_topic_id_set_map.erase(entry);
        last_topic_removed = true;
      }
    }
  }

  // at least one topic id of this process id is still tracked (or the
  // process id was unknown), so the memory files stay connected
  if (!last_topic_removed)
  {
    return;
  }

  // format the process id once and reuse it for every memory file
  const std::string process_id_str = std::to_string(process_id_);
  for (auto& memory_file : m_memory_file_vec)
  {
    memory_file->Disconnect(process_id_str);
#ifndef NDEBUG
    Logging::Log(log_level_debug1, std::string("CDataWriterSHM::RemoveSubscription - Memory FileName: ") + memory_file->GetName() + " to ProcessId " + process_id_str);
#endif
  }
}

Registration::ConnectionPar CDataWriterSHM::GetConnectionParameter()
{
Registration::ConnectionPar connection_par;
Expand Down
8 changes: 8 additions & 0 deletions ecal/core/src/readwrite/shm/ecal_writer_shm.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
#include "readwrite/ecal_writer_base.h"

#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>

Expand All @@ -47,6 +50,7 @@ namespace eCAL
bool Write(CPayloadWriter& payload_, const SWriterAttr& attr_) override;

void ApplySubscription(const std::string& host_name_, int32_t process_id_, const std::string& topic_id_, const std::string& conn_par_) override;
void RemoveSubscription(const std::string& host_name_, int32_t process_id_, const std::string& topic_id_) override;

Registration::ConnectionPar GetConnectionParameter() override;

Expand All @@ -58,5 +62,9 @@ namespace eCAL
size_t m_write_idx = 0;
std::vector<std::shared_ptr<CSyncMemoryFile>> m_memory_file_vec;
static const std::string m_memfile_base_name;

using ProcessIDTopicIDSetT = std::map<int32_t, std::set<std::string>>;
std::mutex m_process_id_topic_id_set_map_sync;
ProcessIDTopicIDSetT m_process_id_topic_id_set_map;
};
}

0 comments on commit 50ddf9f

Please sign in to comment.