From 2c5cfb644235722904978f0cbdeb1284f552d11b Mon Sep 17 00:00:00 2001 From: SChernykh Date: Mon, 4 Sep 2023 19:33:31 +0200 Subject: [PATCH] Fixed data races when using uv_async_send --- src/p2p_server.cpp | 54 +++++++++++++++++++++--------------------- src/p2p_server.h | 1 + src/p2pool_api.cpp | 7 +++--- src/stratum_server.cpp | 42 ++++++++++++++++---------------- src/stratum_server.h | 1 + 5 files changed, 53 insertions(+), 52 deletions(-) diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 01a9b3c5..5cedb482 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -91,6 +91,7 @@ P2PServer::P2PServer(p2pool* pool) uv_mutex_init_checked(&m_broadcastLock); uv_rwlock_init_checked(&m_cachedBlocksLock); uv_mutex_init_checked(&m_connectToPeersLock); + uv_mutex_init_checked(&m_showPeersLock); int err = uv_async_init(&m_loop, &m_broadcastAsync, on_broadcast); if (err) { @@ -149,6 +150,7 @@ P2PServer::~P2PServer() uv_rwlock_destroy(&m_cachedBlocksLock); uv_mutex_destroy(&m_connectToPeersLock); + uv_mutex_destroy(&m_showPeersLock); delete m_block; delete m_cache; @@ -205,13 +207,12 @@ void P2PServer::store_in_cache(const PoolBlock& block) void P2PServer::connect_to_peers_async(const char* peer_list) { - { - MutexLock lock(m_connectToPeersLock); - if (!m_connectToPeersData.empty()) { - m_connectToPeersData.append(1, ','); - } - m_connectToPeersData.append(peer_list); + MutexLock lock(m_connectToPeersLock); + + if (!m_connectToPeersData.empty()) { + m_connectToPeersData.append(1, ','); } + m_connectToPeersData.append(peer_list); if (!uv_is_closing(reinterpret_cast(&m_connectToPeersAsync))) { uv_async_send(&m_connectToPeersAsync); @@ -827,33 +828,21 @@ void P2PServer::broadcast(const PoolBlock& block, const PoolBlock* parent) LOGINFO(5, "Broadcasting block " << block.m_sidechainId << " (height " << block.m_sidechainHeight << "): " << data->compact_blob.size() << '/' << data->pruned_blob.size() << '/' << data->blob.size() << " bytes (compact/pruned/full)"); - { - MutexLock lock(m_broadcastLock); - m_broadcastQueue.push_back(data); - } + MutexLock lock(m_broadcastLock); if (uv_is_closing(reinterpret_cast(&m_broadcastAsync))) { + delete data; return; } + m_broadcastQueue.push_back(data); + const int err = uv_async_send(&m_broadcastAsync); if (err) { LOGERR(1, "uv_async_send failed, error " << uv_err_name(err)); - bool found = false; - { - MutexLock lock(m_broadcastLock); - - auto it = std::find(m_broadcastQueue.begin(), m_broadcastQueue.end(), data); - if (it != m_broadcastQueue.end()) { - found = true; - m_broadcastQueue.erase(it); - } - } - - if (found) { - delete data; - } + m_broadcastQueue.pop_back(); + delete data; } } @@ -988,6 +977,8 @@ void P2PServer::print_status() void P2PServer::show_peers_async() { + MutexLock lock(m_showPeersLock); + if (!uv_is_closing(reinterpret_cast(&m_showPeersAsync))) { uv_async_send(&m_showPeersAsync); } @@ -1283,9 +1274,18 @@ void P2PServer::on_shutdown() uv_timer_stop(&m_timer); uv_close(reinterpret_cast(&m_timer), nullptr); - uv_close(reinterpret_cast(&m_broadcastAsync), nullptr); - uv_close(reinterpret_cast(&m_connectToPeersAsync), nullptr); - uv_close(reinterpret_cast(&m_showPeersAsync), nullptr); + { + MutexLock lock(m_broadcastLock); + uv_close(reinterpret_cast(&m_broadcastAsync), nullptr); + } + { + MutexLock lock(m_connectToPeersLock); + uv_close(reinterpret_cast(&m_connectToPeersAsync), nullptr); + } + { + MutexLock lock(m_showPeersLock); + uv_close(reinterpret_cast(&m_showPeersAsync), nullptr); + } } void P2PServer::api_update_local_stats() diff --git a/src/p2p_server.h b/src/p2p_server.h index d08b9a96..95f5aeda 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -267,6 +267,7 @@ class P2PServer : public TCPServer static void on_connect_to_peers(uv_async_t* handle); + uv_mutex_t m_showPeersLock; uv_async_t m_showPeersAsync; static void on_show_peers(uv_async_t* handle) { reinterpret_cast(handle->data)->show_peers(); } diff --git a/src/p2pool_api.cpp b/src/p2pool_api.cpp index 5b7c18c6..c361a429 100644 --- a/src/p2pool_api.cpp +++ b/src/p2pool_api.cpp @@ -100,6 +100,7 @@ void p2pool_api::create_dir(const std::string& path) void p2pool_api::on_stop() { + MutexLock lock(m_dumpDataLock); uv_close(reinterpret_cast(&m_dumpToFileAsync), nullptr); } @@ -119,10 +120,8 @@ void p2pool_api::dump_to_file_async_internal(Category category, const char* file case Category::LOCAL: path = m_localPath + filename; break; } - { - MutexLock lock(m_dumpDataLock); - m_dumpData[path] = std::move(buf); - } + MutexLock lock(m_dumpDataLock); + m_dumpData[path] = std::move(buf); if (!uv_is_closing(reinterpret_cast(&m_dumpToFileAsync))) { uv_async_send(&m_dumpToFileAsync); diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp index aac3f16b..488e4e50 100644 --- a/src/stratum_server.cpp +++ b/src/stratum_server.cpp @@ -63,6 +63,7 @@ StratumServer::StratumServer(p2pool* pool) m_hashrateData[0] = { seconds_since_epoch(), 0 }; uv_mutex_init_checked(&m_blobsQueueLock); + uv_mutex_init_checked(&m_showWorkersLock); uv_mutex_init_checked(&m_rngLock); uv_rwlock_init_checked(&m_hashrateDataLock); @@ -91,6 +92,7 @@ StratumServer::~StratumServer() shutdown_tcp(); uv_mutex_destroy(&m_blobsQueueLock); + uv_mutex_destroy(&m_showWorkersLock); uv_mutex_destroy(&m_rngLock); uv_rwlock_destroy(&m_hashrateDataLock); @@ -161,29 +163,19 @@ void StratumServer::on_block(const BlockTemplate& block) { MutexLock lock(m_blobsQueueLock); - m_blobsQueue.push_back(blobs_data); - } - - if (uv_is_closing(reinterpret_cast(&m_blobsAsync))) { - return; - } - const int err = uv_async_send(&m_blobsAsync); - if (err) { - LOGERR(1, "uv_async_send failed, error " << uv_err_name(err)); + if (uv_is_closing(reinterpret_cast(&m_blobsAsync))) { + delete blobs_data; + return; + } - bool found = false; - { - MutexLock lock(m_blobsQueueLock); + m_blobsQueue.push_back(blobs_data); - auto it = std::find(m_blobsQueue.begin(), m_blobsQueue.end(), blobs_data); - if (it != m_blobsQueue.end()) { - found = true; - m_blobsQueue.erase(it); - } - } + const int err = uv_async_send(&m_blobsAsync); + if (err) { + LOGERR(1, "uv_async_send failed, error " << uv_err_name(err)); - if (found) { + m_blobsQueue.pop_back(); delete blobs_data; } } @@ -487,6 +479,8 @@ void StratumServer::print_status() void StratumServer::show_workers_async() { + MutexLock lock(m_showWorkersLock); + if (!uv_is_closing(reinterpret_cast(&m_showWorkersAsync))) { uv_async_send(&m_showWorkersAsync); } @@ -1031,8 +1025,14 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/) void StratumServer::on_shutdown() { - uv_close(reinterpret_cast(&m_blobsAsync), nullptr); - uv_close(reinterpret_cast(&m_showWorkersAsync), nullptr); + { + MutexLock lock(m_blobsQueueLock); + uv_close(reinterpret_cast(&m_blobsAsync), nullptr); + } + { + MutexLock lock(m_showWorkersLock); + uv_close(reinterpret_cast(&m_showWorkersAsync), nullptr); + } } StratumServer::StratumClient::StratumClient() diff --git a/src/stratum_server.h b/src/stratum_server.h index ec2946c6..0974d0a9 100644 --- a/src/stratum_server.h +++ b/src/stratum_server.h @@ -127,6 +127,7 @@ class StratumServer : public TCPServer static void on_blobs_ready(uv_async_t* handle) { reinterpret_cast(handle->data)->on_blobs_ready(); } void on_blobs_ready(); + uv_mutex_t m_showWorkersLock; uv_async_t m_showWorkersAsync; static void on_show_workers(uv_async_t* handle) { reinterpret_cast(handle->data)->show_workers(); }