Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix crash in p2p test + stabilization for other tests #2476

Merged
merged 3 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 31 additions & 27 deletions libraries/aleth/libp2p/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Host::Host(std::string _clientVersion, KeyPair const& kp, NetworkConfig _n, Tara
handshake_ctx.port = m_listenPort;
handshake_ctx.client_version = m_clientVersion;
handshake_ctx.on_success = [this](auto const& id, auto const& rlp, auto frame_coder, auto socket) {
ba::post(strand_, [=, this, _ = shared_from_this(), rlp = rlp.data().cropped(0, rlp.actualSize()).toBytes(),
ba::post(strand_, [=, this, rlp = rlp.data().cropped(0, rlp.actualSize()).toBytes(),
frame_coder = std::move(frame_coder)]() mutable {
startPeerSession(id, RLP(rlp), std::move(frame_coder), socket);
});
Expand Down Expand Up @@ -94,11 +94,15 @@ Host::Host(std::string _clientVersion, KeyPair const& kp, NetworkConfig _n, Tara
}
}
LOG(m_logger) << "devp2p started. Node id: " << id();
runAcceptor();
//!!! this needs to be post to session_ioc_ as main_loop_body handles peer/session related stuff
// and it should not be execute for bootnodes, but it needs to bind with strand_
// as it touching same structures as discovery part !!!
ba::post(session_ioc_, [this] { ba::post(strand_, [this] { main_loop_body(); }); });
ba::post(session_ioc_, [this] {
ba::post(strand_, [this] {
runAcceptor();
main_loop_body();
});
});
}

std::shared_ptr<Host> Host::make(std::string _clientVersion, CapabilitiesFactory const& cap_factory, KeyPair const& kp,
Expand All @@ -116,10 +120,6 @@ std::shared_ptr<Host> Host::make(std::string _clientVersion, CapabilitiesFactory
}

Host::~Host() {
// reset io_context (allows manually polling network, below)
ioc_.stop();
session_ioc_.restart();

// shutdown acceptor from same executor
ba::post(m_tcp4Acceptor.get_executor(), [this] {
m_tcp4Acceptor.cancel();
Expand All @@ -136,9 +136,13 @@ Host::~Host() {
s->disconnect(ClientQuit);
}
}
while (0 < session_ioc_.poll())
// We need to poll both as strand_ is ioc_
while (0 < session_ioc_.poll() + ioc_.poll())
;
save_state();

ioc_.restart();
session_ioc_.restart();
}

ba::io_context::count_type Host::do_work() {
Expand Down Expand Up @@ -423,7 +427,7 @@ void Host::runAcceptor() {
} else {
// incoming connection; we don't yet know nodeid
auto handshake = make_shared<RLPXHandshake>(handshake_ctx_, socket);
ba::post(strand_, [=, this, this_shared = shared_from_this()] {
ba::post(strand_, [=, this] {
m_connecting.push_back(handshake);
handshake->start();
});
Expand Down Expand Up @@ -462,24 +466,24 @@ void Host::connect(shared_ptr<Peer> const& _p) {
bi::tcp::endpoint ep(_p->get_endpoint());
cnetdetails << "Attempting connection to " << _p->id << "@" << ep << " from " << id();
auto socket = make_shared<RLPXSocket>(bi::tcp::socket(make_strand(session_ioc_)));
socket->ref().async_connect(
ep, ba::bind_executor(strand_, [=, this, this_shared = shared_from_this()](boost::system::error_code const& ec) {
_p->m_lastAttempted = chrono::system_clock::now();
_p->m_failedAttempts++;

if (ec) {
cnetdetails << "Connection refused to node " << _p->id << "@" << ep << " (" << ec.message() << ")";
// Manually set error (session not present)
_p->m_lastDisconnect = TCPError;
} else {
cnetdetails << "Starting RLPX handshake with " << _p->id << "@" << ep;
auto handshake = make_shared<RLPXHandshake>(handshake_ctx_, socket, _p->id);
m_connecting.push_back(handshake);

handshake->start();
}
m_pendingPeerConns.erase(nptr);
}));
socket->ref().async_connect(ep, ba::bind_executor(strand_, [=, this](boost::system::error_code const& ec) {
_p->m_lastAttempted = chrono::system_clock::now();
_p->m_failedAttempts++;

if (ec) {
cnetdetails << "Connection refused to node " << _p->id << "@" << ep << " ("
<< ec.message() << ")";
// Manually set error (session not present)
_p->m_lastDisconnect = TCPError;
} else {
cnetdetails << "Starting RLPX handshake with " << _p->id << "@" << ep;
auto handshake = make_shared<RLPXHandshake>(handshake_ctx_, socket, _p->id);
m_connecting.push_back(handshake);

handshake->start();
}
m_pendingPeerConns.erase(nptr);
}));
}

PeerSessionInfos Host::peerSessionInfos() const {
Expand Down
8 changes: 4 additions & 4 deletions tests/network_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ TEST_F(NetworkTest, transfer_block) {
SharedTransactions transactions({g_signed_trx_samples[0], g_signed_trx_samples[1]});
nw2->getSpecificHandler<network::tarcap::TransactionPacketHandler>()->onNewTransactions(std::move(transactions));

EXPECT_HAPPENS({10s, 200ms}, [&](auto& ctx) {
EXPECT_HAPPENS({60s, 100ms}, [&](auto& ctx) {
nw1->setPendingPeersToReady();
nw2->setPendingPeersToReady();
WAIT_EXPECT_EQ(ctx, nw1->getPeerCount(), 1)
Expand Down Expand Up @@ -318,7 +318,7 @@ TEST_F(NetworkTest, transfer_transaction) {
nw1->start();
nw2->start();

EXPECT_HAPPENS({20s, 100ms}, [&](auto& ctx) {
EXPECT_HAPPENS({60s, 100ms}, [&](auto& ctx) {
nw1->setPendingPeersToReady();
nw2->setPendingPeersToReady();
WAIT_EXPECT_EQ(ctx, nw1->getPeerCount(), 1)
Expand Down Expand Up @@ -365,7 +365,7 @@ TEST_F(NetworkTest, save_network) {
nw2->start();
nw3->start();

EXPECT_HAPPENS({120s, 500ms}, [&](auto& ctx) {
EXPECT_HAPPENS({120s, 100ms}, [&](auto& ctx) {
nw1->setPendingPeersToReady();
nw2->setPendingPeersToReady();
nw3->setPendingPeersToReady();
Expand All @@ -382,7 +382,7 @@ TEST_F(NetworkTest, save_network) {
nw2->start();
nw3->start();

EXPECT_HAPPENS({120s, 500ms}, [&](auto& ctx) {
EXPECT_HAPPENS({120s, 100ms}, [&](auto& ctx) {
nw2->setPendingPeersToReady();
nw3->setPendingPeersToReady();
WAIT_EXPECT_EQ(ctx, nw2->getPeerCount(), 1)
Expand Down
2 changes: 1 addition & 1 deletion tests/p2p_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ TEST_F(P2PTest, multiple_capabilities) {
std::filesystem::remove_all("/tmp/nw3");
};
auto wait_for_connection = [](std::shared_ptr<Network> nw1, std::shared_ptr<Network> nw2) {
EXPECT_HAPPENS({15s, 500ms}, [&](auto &ctx) {
EXPECT_HAPPENS({60s, 100ms}, [&](auto &ctx) {
nw1->setPendingPeersToReady();
nw2->setPendingPeersToReady();
WAIT_EXPECT_EQ(ctx, nw1->getPeerCount(), 1)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_util/src/test_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ std::vector<taraxa::FullNodeConfig> NodesTest::make_node_cfgs(size_t total_count

bool NodesTest::wait_connect(const std::vector<std::shared_ptr<taraxa::FullNode>>& nodes) {
auto num_peers_connected = nodes.size() - 1;
return wait({30s, 1s}, [&](auto& ctx) {
return wait({60s, 100ms}, [&](auto& ctx) {
for (const auto& node : nodes) {
if (ctx.fail_if(node->getNetwork()->getPeerCount() < num_peers_connected)) {
return;
Expand Down