Skip to content

Commit

Permalink
Fix ZeroMQ invalid REQ/REP state after read timeout. (#125)
Browse files Browse the repository at this point in the history
After a read timeout, the thread-local ZMQ client is expecting a read,
and not send on the REQ socket. This attempts to read+discard 1 message
before retrying the REQ send. This has proven effective at recovering
from a temporarily unavailable daemon.
  • Loading branch information
vtnerd committed Jun 26, 2024
1 parent dca7059 commit 27b682b
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions src/rest_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "net/http_base.h" // monero/contrib/epee/include
#include "net/net_parse_helpers.h" // monero/contrib/epee/include
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "net/zmq.h" // monero/src
#include "rpc/admin.h"
#include "rpc/client.h"
#include "rpc/daemon_messages.h" // monero/src
Expand Down Expand Up @@ -82,6 +83,21 @@ namespace lws
return {new_ptr};
}

expect<void> send_with_retry(rpc::client& tclient, epee::byte_slice message, const std::chrono::seconds timeout)
{
expect<void> resp{common_error::kInvalidArgument};
for (unsigned i = 0; i < 2; ++i)
{
resp = tclient.send(message.clone(), timeout);
if (resp || resp != net::zmq::make_error_code(EFSM))
break;
// fix state machine by reading+discarding previously timed out response
if (tclient.get_message(timeout).matches(std::errc::timed_out))
return {error::daemon_timeout};
}
return resp;
}

struct context : epee::net_utils::connection_context_base
{
context()
Expand Down Expand Up @@ -402,7 +418,7 @@ namespace lws
histogram_req.recent_cutoff = 0;

epee::byte_slice msg = rpc::client::make_message("get_output_histogram", histogram_req);
MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10}));
MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10}));

auto histogram_resp = (*tclient)->receive<histogram_rpc::Response>(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION);
if (!histogram_resp)
Expand Down Expand Up @@ -430,7 +446,7 @@ namespace lws

epee::byte_slice msg =
rpc::client::make_message("get_output_distribution", distribution_req);
MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10}));
MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10}));

auto distribution_resp =
(*tclient)->receive<distribution_rpc::Response>(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION);
Expand Down Expand Up @@ -476,7 +492,7 @@ namespace lws
keys_req.outputs = std::move(ids);

epee::byte_slice msg = rpc::client::make_message("get_output_keys", keys_req);
MONERO_CHECK(tclient->send(std::move(msg), std::chrono::seconds{10}));
MONERO_CHECK(send_with_retry(*tclient, std::move(msg), std::chrono::seconds{10}));

auto keys_resp = tclient->receive<get_keys_rpc::Response>(std::chrono::seconds{10}, MLWS_CURRENT_LOCATION);
if (!keys_resp)
Expand Down Expand Up @@ -541,7 +557,7 @@ namespace lws
rpc_command::Request req{};
req.num_grace_blocks = 10;
epee::byte_slice msg = rpc::client::make_message("get_dynamic_fee_estimate", req);
MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10}));
MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10}));
}

if ((req.use_dust && *req.use_dust) || !req.dust_threshold)
Expand Down Expand Up @@ -772,7 +788,7 @@ namespace lws
daemon_req.tx_as_hex = std::move(req.tx);

epee::byte_slice message = rpc::client::make_message("send_raw_tx_hex", daemon_req);
MONERO_CHECK((*tclient)->send(std::move(message), std::chrono::seconds{10}));
MONERO_CHECK(send_with_retry(**tclient, std::move(message), std::chrono::seconds{10}));

const auto daemon_resp = (*tclient)->receive<transaction_rpc::Response>(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION);
if (!daemon_resp)
Expand Down

0 comments on commit 27b682b

Please sign in to comment.