From 2c2a7a42c94c6e7f0c6a898df24bfb4bc11dbb6d Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 1 Jul 2024 13:12:39 +0200 Subject: [PATCH 1/2] Added a registry of groups to protect access from a callback --- xtransmit/srt_socket_group.cpp | 59 ++++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index f9e01ce..5925f8c 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -32,6 +32,45 @@ namespace srt_logging std::string SockStatusStr(SRT_SOCKSTATUS); } +namespace xtransmit +{ +namespace details +{ +class group_registry +{ +public: + void add(intptr_t p) + { + std::lock_guard lck(m_mtx); + m_groups.emplace(p); + } + + void remove(intptr_t p) + { + std::lock_guard lck(m_mtx); + m_groups.erase(p); + } + + class not_found : public std::runtime_error { public: not_found(const char* m) : std::runtime_error(m) {} }; + + std::unique_lock&& scoped_lock(intptr_t p) const + { + std::unique_lock lck(m_mtx); + if (!m_groups.count(p)) + throw not_found(""); + return std::unique_lock(m_mtx); + } + +private: + mutable std::mutex m_mtx; + std::set m_groups; +}; + + +static group_registry g_group_registry; +} +} + #define LOG_SRT_GROUP "SOCKET::SRT_GROUP " SocketOption::Mode detect_srt_mode(const UriParser& uri) @@ -185,6 +224,8 @@ socket::srt_group::srt_group(const vector& uris) spdlog::trace(LOG_SRT_GROUP "Creating a group of callers (type {}).", gtype_str); create_callers(uris, gtype); } + + details::g_group_registry.add((intptr_t) this); } socket::srt_group::srt_group(srt_group& group, int group_id) @@ -202,6 +243,8 @@ socket::srt_group::srt_group(srt_group& group, int group_id) if (SRT_ERROR == srt_epoll_add_usock(m_epoll_io, m_bind_socket, &io_modes)) throw socket::exception(srt_getlasterror_str()); } + + details::g_group_registry.add((intptr_t)this); } socket::srt_group::~srt_group() @@ -215,6 +258,7 @@ socket::srt_group::~srt_group() srt_epoll_release(m_epoll_io); } spdlog::debug(LOG_SRT_GROUP "@{} Closing SRT group", m_bind_socket); + details::g_group_registry.remove((intptr_t)this); release_targets(); release_listeners(); srt_close(m_bind_socket); @@ -443,8 +487,19 @@ int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, int hsvers spdlog::trace(LOG_SRT_GROUP "Accepted member socket @{}, host IP {}, remote IP {}", sock, host.str(), sa.str()); // TODO: this group may no longer exist. Use some global array to track valid groups. - socket::srt_group* group = reinterpret_cast(opaq); - return group->on_listen_callback(sock); + + try + { + auto lck = details::g_group_registry.scoped_lock((intptr_t)opaq); + socket::srt_group* group = reinterpret_cast(opaq); + return group->on_listen_callback(sock); + } + catch (const details::group_registry::not_found&) + { + spdlog::warn(LOG_SRT_GROUP "listen_callback_fn: group has already been destructed."); + } + + return 0; } void socket::srt_group::set_listen_callback() From 049a2c559141f14a2e2c1a91b000658eb07c7f42 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Tue, 2 Jul 2024 16:15:59 +0200 Subject: [PATCH 2/2] Fixed errors --- xtransmit/scheduler.hpp | 10 +++++++--- xtransmit/srt_socket_group.cpp | 23 ++++++++++++++++------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/xtransmit/scheduler.hpp b/xtransmit/scheduler.hpp index 2be0433..fe73a67 100644 --- a/xtransmit/scheduler.hpp +++ b/xtransmit/scheduler.hpp @@ -53,6 +53,11 @@ class scheduler thread_.join(); } + void stop() + { + done_ = true; + } + template void schedule_on(const steady_clock::time_point time, Callable&& f, Args&&... args) { @@ -76,7 +81,6 @@ class scheduler } sync_; multimap> tasks_; - mutex lock_; thread thread_; void timer_loop() @@ -102,14 +106,14 @@ class scheduler void add_task(const steady_clock::time_point time, shared_ptr t) { - lock_guard l(lock_); + lock_guard l(sync_.mtx); tasks_.emplace(time, move(t)); sync_.cv.notify_one(); } void manage_tasks() { - lock_guard l(lock_); + lock_guard l(sync_.mtx); auto end_of_tasks_to_run = tasks_.upper_bound(steady_clock::now()); if (end_of_tasks_to_run != tasks_.begin()) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 5925f8c..fa2c81e 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -53,12 +53,12 @@ class group_registry class not_found : public std::runtime_error { public: not_found(const char* m) : std::runtime_error(m) {} }; - std::unique_lock&& scoped_lock(intptr_t p) const + std::unique_lock scoped_lock(intptr_t p) const { std::unique_lock lck(m_mtx); if (!m_groups.count(p)) throw not_found(""); - return std::unique_lock(m_mtx); + return lck; // Compiler will perform an RVO or move. } private: @@ -249,6 +249,7 @@ socket::srt_group::srt_group(srt_group& group, int group_id) socket::srt_group::~srt_group() { + m_scheduler.stop(); if (!m_blocking_mode) { spdlog::debug(LOG_SRT_GROUP "@{} Closing. Releasing epolls", m_bind_socket); @@ -257,6 +258,7 @@ socket::srt_group::~srt_group() if (m_epoll_io != -1) srt_epoll_release(m_epoll_io); } + spdlog::debug(LOG_SRT_GROUP "@{} Closing SRT group", m_bind_socket); details::g_group_registry.remove((intptr_t)this); release_targets(); @@ -486,10 +488,10 @@ int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, int hsvers netaddr_any host(host_sa.get(), host_sa_len); spdlog::trace(LOG_SRT_GROUP "Accepted member socket @{}, host IP {}, remote IP {}", sock, host.str(), sa.str()); - // TODO: this group may no longer exist. Use some global array to track valid groups. try { + // The group passed via 'opaq' may no longer exist. The g_group_registry checks and holds the lifetime. auto lck = details::g_group_registry.scoped_lock((intptr_t)opaq); socket::srt_group* group = reinterpret_cast(opaq); return group->on_listen_callback(sock); @@ -519,10 +521,17 @@ void socket::srt_group::connect_callback_fn(void* opaq, SRTSOCKET sock, int erro return; } - // TODO: this group may no longer exist. Use some global array to track valid groups. - socket::srt_group* group = reinterpret_cast(opaq); - - group->on_connect_callback(sock, error, peer, token); + try + { + // The group passed via 'opaq' may no longer exist. The g_group_registry checks and holds the lifetime. + auto lck = details::g_group_registry.scoped_lock((intptr_t)opaq); + socket::srt_group* group = reinterpret_cast(opaq); + return group->on_connect_callback(sock, error, peer, token); + } + catch (const details::group_registry::not_found&) + { + spdlog::warn(LOG_SRT_GROUP "connect_callback_fn: group has already been destructed."); + } } void socket::srt_group::on_connect_callback(SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)