Fixed data races when using uv_async_send
SChernykh committed Sep 4, 2023
1 parent cfddaf1 commit 2c5cfb6
Showing 5 changed files with 53 additions and 52 deletions.
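
The race being closed here: callers invoked uv_async_send() on a handle that another thread could be closing with uv_close() during shutdown, so a uv_is_closing() check could pass just before the handle was torn down. The fix is to take the same mutex on both the signalling path and the shutdown path. Below is a minimal sketch of that pattern, not the exact p2pool code; the MutexLock helper mirrors p2pool's RAII wrapper, and the handle/mutex names are illustrative.

#include <uv.h>

// Minimal stand-in for p2pool's MutexLock RAII helper.
struct MutexLock {
    explicit MutexLock(uv_mutex_t& m) : mutex(m) { uv_mutex_lock(&mutex); }
    ~MutexLock() { uv_mutex_unlock(&mutex); }
    uv_mutex_t& mutex;
};

uv_mutex_t async_lock; // initialized once with uv_mutex_init()
uv_async_t async;      // initialized once with uv_async_init()

// Any thread: signal the loop, but only while the handle is still open.
void trigger_async()
{
    MutexLock guard(async_lock);

    if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&async))) {
        uv_async_send(&async);
    }
}

// Loop thread, during shutdown: closing under the same lock guarantees
// no uv_async_send() can slip in between the check and the close.
void close_async()
{
    MutexLock guard(async_lock);
    uv_close(reinterpret_cast<uv_handle_t*>(&async), nullptr);
}

In the diff below, each async handle either gets its own mutex (m_showPeersLock, m_showWorkersLock) or reuses the mutex that already guards its data (m_broadcastLock, m_connectToPeersLock, m_blobsQueueLock, m_dumpDataLock), and on_shutdown()/on_stop() now take that mutex around uv_close().
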
54 changes: 27 additions & 27 deletions src/p2p_server.cpp
@@ -91,6 +91,7 @@ P2PServer::P2PServer(p2pool* pool)
uv_mutex_init_checked(&m_broadcastLock);
uv_rwlock_init_checked(&m_cachedBlocksLock);
uv_mutex_init_checked(&m_connectToPeersLock);
uv_mutex_init_checked(&m_showPeersLock);

int err = uv_async_init(&m_loop, &m_broadcastAsync, on_broadcast);
if (err) {
@@ -149,6 +150,7 @@ P2PServer::~P2PServer()
uv_rwlock_destroy(&m_cachedBlocksLock);

uv_mutex_destroy(&m_connectToPeersLock);
uv_mutex_destroy(&m_showPeersLock);

delete m_block;
delete m_cache;
@@ -205,13 +207,12 @@ void P2PServer::store_in_cache(const PoolBlock& block)

void P2PServer::connect_to_peers_async(const char* peer_list)
{
{
MutexLock lock(m_connectToPeersLock);
if (!m_connectToPeersData.empty()) {
m_connectToPeersData.append(1, ',');
}
m_connectToPeersData.append(peer_list);
MutexLock lock(m_connectToPeersLock);

if (!m_connectToPeersData.empty()) {
m_connectToPeersData.append(1, ',');
}
m_connectToPeersData.append(peer_list);

if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync))) {
uv_async_send(&m_connectToPeersAsync);
@@ -827,33 +828,21 @@ void P2PServer::broadcast(const PoolBlock& block, const PoolBlock* parent)

LOGINFO(5, "Broadcasting block " << block.m_sidechainId << " (height " << block.m_sidechainHeight << "): " << data->compact_blob.size() << '/' << data->pruned_blob.size() << '/' << data->blob.size() << " bytes (compact/pruned/full)");

{
MutexLock lock(m_broadcastLock);
m_broadcastQueue.push_back(data);
}
MutexLock lock(m_broadcastLock);

if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_broadcastAsync))) {
delete data;
return;
}

m_broadcastQueue.push_back(data);

const int err = uv_async_send(&m_broadcastAsync);
if (err) {
LOGERR(1, "uv_async_send failed, error " << uv_err_name(err));

bool found = false;
{
MutexLock lock(m_broadcastLock);

auto it = std::find(m_broadcastQueue.begin(), m_broadcastQueue.end(), data);
if (it != m_broadcastQueue.end()) {
found = true;
m_broadcastQueue.erase(it);
}
}

if (found) {
delete data;
}
m_broadcastQueue.pop_back();
delete data;
}
}

@@ -988,6 +977,8 @@ void P2PServer::print_status()

void P2PServer::show_peers_async()
{
MutexLock lock(m_showPeersLock);

if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync))) {
uv_async_send(&m_showPeersAsync);
}
@@ -1283,9 +1274,18 @@ void P2PServer::on_shutdown()

uv_timer_stop(&m_timer);
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&m_broadcastAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync), nullptr);
{
MutexLock lock(m_broadcastLock);
uv_close(reinterpret_cast<uv_handle_t*>(&m_broadcastAsync), nullptr);
}
{
MutexLock lock(m_connectToPeersLock);
uv_close(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync), nullptr);
}
{
MutexLock lock(m_showPeersLock);
uv_close(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync), nullptr);
}
}

void P2PServer::api_update_local_stats()
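
The broadcast() change above also simplifies the error path: the queue push, the uv_is_closing() check, and uv_async_send() now all run under m_broadcastLock, and on_shutdown() closes the handle under the same lock. Because the lock is still held when uv_async_send() fails, the just-pushed element is guaranteed to be the last one in the queue, so it can be undone with pop_back() instead of re-locking and searching with std::find() as the old code did. A rough sketch of that producer pattern, with illustrative names rather than the exact p2pool code:

#include <uv.h>
#include <vector>

struct WorkItem {};

std::vector<WorkItem*> queue; // guarded by queue_lock
uv_mutex_t queue_lock;
uv_async_t queue_async;       // its callback drains `queue` on the loop thread

void enqueue_and_signal(WorkItem* item)
{
    MutexLock guard(queue_lock); // same RAII helper as in the sketch above

    // A closing handle must not be signalled; drop the work item instead.
    if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&queue_async))) {
        delete item;
        return;
    }

    queue.push_back(item);

    const int err = uv_async_send(&queue_async);
    if (err) {
        // Still holding the lock, so our item is still the last element.
        queue.pop_back();
        delete item;
    }
}

The same reshuffle appears in StratumServer::on_block() below: the uv_is_closing() check moves under m_blobsQueueLock, and the failure path becomes pop_back() plus delete.
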
1 change: 1 addition & 0 deletions src/p2p_server.h
@@ -267,6 +267,7 @@ class P2PServer : public TCPServer

static void on_connect_to_peers(uv_async_t* handle);

uv_mutex_t m_showPeersLock;
uv_async_t m_showPeersAsync;

static void on_show_peers(uv_async_t* handle) { reinterpret_cast<P2PServer*>(handle->data)->show_peers(); }
7 changes: 3 additions & 4 deletions src/p2pool_api.cpp
@@ -100,6 +100,7 @@ void p2pool_api::create_dir(const std::string& path)

void p2pool_api::on_stop()
{
MutexLock lock(m_dumpDataLock);
uv_close(reinterpret_cast<uv_handle_t*>(&m_dumpToFileAsync), nullptr);
}

@@ -119,10 +120,8 @@ void p2pool_api::dump_to_file_async_internal(Category category, const char* file
case Category::LOCAL: path = m_localPath + filename; break;
}

{
MutexLock lock(m_dumpDataLock);
m_dumpData[path] = std::move(buf);
}
MutexLock lock(m_dumpDataLock);
m_dumpData[path] = std::move(buf);

if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_dumpToFileAsync))) {
uv_async_send(&m_dumpToFileAsync);
42 changes: 21 additions & 21 deletions src/stratum_server.cpp
@@ -63,6 +63,7 @@ StratumServer::StratumServer(p2pool* pool)
m_hashrateData[0] = { seconds_since_epoch(), 0 };

uv_mutex_init_checked(&m_blobsQueueLock);
uv_mutex_init_checked(&m_showWorkersLock);
uv_mutex_init_checked(&m_rngLock);
uv_rwlock_init_checked(&m_hashrateDataLock);

@@ -91,6 +92,7 @@ StratumServer::~StratumServer()
shutdown_tcp();

uv_mutex_destroy(&m_blobsQueueLock);
uv_mutex_destroy(&m_showWorkersLock);
uv_mutex_destroy(&m_rngLock);
uv_rwlock_destroy(&m_hashrateDataLock);

@@ -161,29 +163,19 @@ void StratumServer::on_block(const BlockTemplate& block)

{
MutexLock lock(m_blobsQueueLock);
m_blobsQueue.push_back(blobs_data);
}

if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_blobsAsync))) {
return;
}

const int err = uv_async_send(&m_blobsAsync);
if (err) {
LOGERR(1, "uv_async_send failed, error " << uv_err_name(err));
if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_blobsAsync))) {
delete blobs_data;
return;
}

bool found = false;
{
MutexLock lock(m_blobsQueueLock);
m_blobsQueue.push_back(blobs_data);

auto it = std::find(m_blobsQueue.begin(), m_blobsQueue.end(), blobs_data);
if (it != m_blobsQueue.end()) {
found = true;
m_blobsQueue.erase(it);
}
}
const int err = uv_async_send(&m_blobsAsync);
if (err) {
LOGERR(1, "uv_async_send failed, error " << uv_err_name(err));

if (found) {
m_blobsQueue.pop_back();
delete blobs_data;
}
}
@@ -487,6 +479,8 @@ void StratumServer::print_status()

void StratumServer::show_workers_async()
{
MutexLock lock(m_showWorkersLock);

if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_showWorkersAsync))) {
uv_async_send(&m_showWorkersAsync);
}
@@ -1031,8 +1025,14 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)

void StratumServer::on_shutdown()
{
uv_close(reinterpret_cast<uv_handle_t*>(&m_blobsAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&m_showWorkersAsync), nullptr);
{
MutexLock lock(m_blobsQueueLock);
uv_close(reinterpret_cast<uv_handle_t*>(&m_blobsAsync), nullptr);
}
{
MutexLock lock(m_showWorkersLock);
uv_close(reinterpret_cast<uv_handle_t*>(&m_showWorkersAsync), nullptr);
}
}

StratumServer::StratumClient::StratumClient()
1 change: 1 addition & 0 deletions src/stratum_server.h
@@ -127,6 +127,7 @@ class StratumServer : public TCPServer
static void on_blobs_ready(uv_async_t* handle) { reinterpret_cast<StratumServer*>(handle->data)->on_blobs_ready(); }
void on_blobs_ready();

uv_mutex_t m_showWorkersLock;
uv_async_t m_showWorkersAsync;

static void on_show_workers(uv_async_t* handle) { reinterpret_cast<StratumServer*>(handle->data)->show_workers(); }
