Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RXI-1136 allow multi commits with the same claimID #111

Merged
merged 3 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 135 additions & 34 deletions src/test/main_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,13 @@ extern const char accTxIss1[];
extern const char accTxIss2[];
extern const char submIss[];

std::mutex gMCons;

static void
fail(boost::beast::error_code ec, char const* what)
{
auto s = fmt::format("{}: {}", what, ec.message());
std::lock_guard cl(gMCons);
std::cerr << "Error, throw: " << s << std::endl;
throw std::runtime_error(s);
}
Expand All @@ -109,8 +112,11 @@ wait_for(
return true;

auto const b = gCv.wait_for(l, to, stop);
DBG(std::cout << msg << ", wait finished: " << (b ? "condition" : "timeout")
<< std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << msg << ", wait finished: " << (b ? "condition" : "timeout")
<< std::endl;
})
return b;
}

Expand Down Expand Up @@ -152,6 +158,12 @@ class engineLoc
throw std::runtime_error("No 'method' parameter");
auto const method = msg[ripple::jss::method].asString();

DBG({
std::lock_guard cl(gMCons);
std::cout << "process(), method:" << method << ", side: " << side()
<< std::endl;
})

std::string s;
unsigned const id = msg["id"].asUInt();

Expand Down Expand Up @@ -182,7 +194,10 @@ class engineLoc

if (!clientInit_)
{
DBG(std::cout << side() << " clientInit" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << side() << " clientInit" << std::endl;
})
std::unique_lock l(gMcv);
clientInit_ = true;
gCv.notify_all();
Expand Down Expand Up @@ -251,6 +266,12 @@ class engineIss
throw std::runtime_error("No 'method' parameter");
auto const method = msg[ripple::jss::method].asString();

DBG({
std::lock_guard cl(gMCons);
std::cout << "process(), method:" << method << ", side: " << side()
<< std::endl;
})

std::string s;
unsigned const id = msg["id"].asUInt();

Expand Down Expand Up @@ -288,7 +309,10 @@ class engineIss

if (!clientInit_)
{
DBG(std::cout << side() << " clientInit" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << side() << " clientInit" << std::endl;
})
std::unique_lock l(gMcv);
clientInit_ = true;
gCv.notify_all();
Expand Down Expand Up @@ -371,7 +395,11 @@ class session : public std::enable_shared_from_this<session<engine>>
void
run()
{
DBG(std::cout << "session::run()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::run()"
<< ", side: " << e_.side() << std::endl;
})
ws_.set_option(websocket::stream_base::timeout::suggested(
boost::beast::role_type::server));
ws_.set_option(websocket::stream_base::decorator(
Expand All @@ -390,7 +418,11 @@ class session : public std::enable_shared_from_this<session<engine>>
void
onAccept(boost::beast::error_code ec)
{
DBG(std::cout << "session::onAccept(), ec:" << ec << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::onAccept(), ec:" << ec
<< ", side: " << e_.side() << std::endl;
})
if (ec == websocket::error::closed)
return;
if (ec)
Expand All @@ -401,7 +433,11 @@ class session : public std::enable_shared_from_this<session<engine>>
void
doRead()
{
DBG(std::cout << "session::doRead()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::doRead()"
<< ", side: " << e_.side() << std::endl;
})
ws_.async_read(
buffer_,
boost::beast::bind_front_handler(
Expand All @@ -413,8 +449,12 @@ class session : public std::enable_shared_from_this<session<engine>>
void
onRead(boost::beast::error_code ec, std::size_t bytes_transferred)
{
DBG(std::cout << "session::onRead(), ec:" << ec
<< " bytes: " << bytes_transferred << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::onRead(), ec:" << ec
<< " bytes: " << bytes_transferred
<< ", side: " << e_.side() << std::endl;
})
boost::ignore_unused(bytes_transferred);
if (ec == websocket::error::closed)
return;
Expand Down Expand Up @@ -461,8 +501,12 @@ class session : public std::enable_shared_from_this<session<engine>>
void
onWrite(boost::beast::error_code ec, std::size_t bytes_transferred)
{
DBG(std::cout << "session::onWrite(), ec:" << ec
<< " bytes: " << bytes_transferred << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::onWrite(), ec:" << ec
<< " bytes: " << bytes_transferred
<< ", side: " << e_.side() << std::endl;
})
boost::ignore_unused(bytes_transferred);
if (ec == websocket::error::closed)
return;
Expand All @@ -475,11 +519,18 @@ class session : public std::enable_shared_from_this<session<engine>>
void
shutdown()
{
DBG(std::cout << "session::shutdown()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::shutdown()"
<< ", side: " << e_.side() << std::endl;
})
ios_.post([this] {
ws_.async_close({}, [this](boost::beast::error_code const& ec) {
DBG(std::cout << "session::onAsync_close(), ec:" << ec
<< std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::onAsync_close(), ec:" << ec
<< ", side: " << e_.side() << std::endl;
})
std::unique_lock l(gMcv);
this->finished_ = true;
gCv.notify_all();
Expand All @@ -502,7 +553,10 @@ class session : public std::enable_shared_from_this<session<engine>>
void
sendNewLedger()
{
DBG(std::cout << e_.side() << " session::sendNewLedger()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << e_.side() << " session::sendNewLedger()" << std::endl;
})
auto const jv = e_.getNewLedger();
auto const s = to_string(jv);

Expand All @@ -518,8 +572,12 @@ class session : public std::enable_shared_from_this<session<engine>>
void
onWriteNewLedger(boost::beast::error_code ec, std::size_t bytes_transferred)
{
DBG(std::cout << e_.side() << " session::onWriteNewLedger(), ec:" << ec
<< " bytes: " << bytes_transferred << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << " session::onWriteNewLedger(), ec:" << ec
<< ", bytes: " << bytes_transferred
<< ", side: " << side() << std::endl;
})
boost::ignore_unused(bytes_transferred);
if (ec == websocket::error::closed)
return;
Expand All @@ -538,6 +596,12 @@ class session : public std::enable_shared_from_this<session<engine>>
{
return e_.blobOk();
}

std::string
side() const
{
return e_.side();
}
};

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -567,7 +631,11 @@ class listener : public std::enable_shared_from_this<listener<engine>>
if (ec)
fail(ec, "bind");

DBG(std::cout << "listener::listen()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "listener::listen()"
<< ", side: " << session_->side() << std::endl;
})
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec)
fail(ec, "listen");
Expand All @@ -576,17 +644,29 @@ class listener : public std::enable_shared_from_this<listener<engine>>
void
run()
{
DBG(std::cout << "listener::run()" << std::endl);
DBG({
std::lock_guard cl(gMCons);
std::cout << "listener::run()"
<< ", side: " << session_->side() << std::endl;
})
doAccept();
}

void
shutdown()
{
DBG(std::cout << "listener::shutdown()" << std::endl);
DBG({
std::lock_guard cl(gMCons);
std::cout << "listener::shutdown()"
<< ", side: " << session_->side() << std::endl;
})
boost::system::error_code ec;
acceptor_.cancel(ec);
DBG(if (ec) std::cout << "cancel error: " << ec << std::endl);
DBG(if (ec) {
std::lock_guard cl(gMCons);
std::cout << "cancel error: " << ec
<< ", side: " << session_->side() << std::endl;
})
if (session_)
session_->shutdown();
}
Expand Down Expand Up @@ -619,7 +699,11 @@ class listener : public std::enable_shared_from_this<listener<engine>>
void
doAccept()
{
DBG(std::cout << "listener::doAccept()" << std::endl);
DBG({
std::lock_guard cl(gMCons);
std::cout << "listener::doAccept()"
<< ", side: " << session_->side() << std::endl;
})
acceptor_.async_accept(
boost::asio::make_strand(ios_),
boost::beast::bind_front_handler(
Expand All @@ -631,7 +715,11 @@ class listener : public std::enable_shared_from_this<listener<engine>>
void
onAccept(boost::beast::error_code ec, tcp::socket socket)
{
DBG(std::cout << "listener::onAccept(), ec:" << ec << std::endl);
DBG({
std::lock_guard cl(gMCons);
std::cout << "listener::onAccept(), ec:" << ec
<< ", side: " << session_->side() << std::endl;
})
if (ec == boost::system::errc::operation_canceled)
return;
if (ec)
Expand Down Expand Up @@ -664,19 +752,28 @@ struct Connection
void
startIOThreads()
{
DBG(std::cout << "Connection::startIOThreads()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "Connection::startIOThreads()" << std::endl;
})
ioThreads_.reserve(NUM_THREADS);
for (auto i = 0; i < NUM_THREADS; ++i)
ioThreads_.emplace_back([this, i] {
ios_.run();
DBG(std::cout << "ioThread #" << i << " finished" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "ioThread #" << i << " finished" << std::endl;
})
});
}

void
waitIOThreads()
{
DBG(std::cout << "Connection::waitIOThreads()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "Connection::waitIOThreads()" << std::endl;
})
for (auto& t : ioThreads_)
if (t.joinable())
t.join();
Expand All @@ -686,7 +783,10 @@ struct Connection
void
startServers()
{
DBG(std::cout << "Connection::startServers()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "Connection::startServers()" << std::endl;
})
auto const address = boost::asio::ip::make_address(LHOST);
serverLoc_ = std::make_shared<listener<engineLoc>>(
ios_, tcp::endpoint{address, PORT_LOC});
Expand All @@ -699,7 +799,10 @@ struct Connection
void
shutdownServers()
{
DBG(std::cout << "Connection::shutdownServers()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "Connection::shutdownServers()" << std::endl;
})
serverLoc_->shutdown();
serverIss_->shutdown();
wait_for(1s, [this]() {
Expand Down Expand Up @@ -788,17 +891,15 @@ class Main_test : public beast::unit_test::suite
bool locRepl = false, issRepl = false;
for (std::string l; (!locRepl || !issRepl) && std::getline(logf, l);)
{
if (l.find(
"initSyncDone start replay {\"chainType\": \"locking\",") !=
std::string::npos)
if (l.find("initSyncDone start replay {\"chainType\": "
"\"locking\",") != std::string::npos)
{
locRepl = true;
continue;
}

if (l.find(
"initSyncDone start replay {\"chainType\": \"issuing\",") !=
std::string::npos)
if (l.find("initSyncDone start replay {\"chainType\": "
"\"issuing\",") != std::string::npos)
{
issRepl = true;
continue;
Expand Down
Loading