Skip to content

Commit

Permalink
Merge pull request #2476 from Taraxa-project/stabilize-test
Browse files Browse the repository at this point in the history
fix crash in p2p test + stabilization for other tests
  • Loading branch information
MatusKysel authored May 15, 2023
2 parents 45ae577 + 3966cf3 commit dbc828b
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 33 deletions.
58 changes: 31 additions & 27 deletions libraries/aleth/libp2p/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Host::Host(std::string _clientVersion, KeyPair const& kp, NetworkConfig _n, Tara
handshake_ctx.port = m_listenPort;
handshake_ctx.client_version = m_clientVersion;
handshake_ctx.on_success = [this](auto const& id, auto const& rlp, auto frame_coder, auto socket) {
ba::post(strand_, [=, this, _ = shared_from_this(), rlp = rlp.data().cropped(0, rlp.actualSize()).toBytes(),
ba::post(strand_, [=, this, rlp = rlp.data().cropped(0, rlp.actualSize()).toBytes(),
frame_coder = std::move(frame_coder)]() mutable {
startPeerSession(id, RLP(rlp), std::move(frame_coder), socket);
});
Expand Down Expand Up @@ -94,11 +94,15 @@ Host::Host(std::string _clientVersion, KeyPair const& kp, NetworkConfig _n, Tara
}
}
LOG(m_logger) << "devp2p started. Node id: " << id();
runAcceptor();
//!!! this needs to be post to session_ioc_ as main_loop_body handles peer/session related stuff
// and it should not be execute for bootnodes, but it needs to bind with strand_
// as it touching same structures as discovery part !!!
ba::post(session_ioc_, [this] { ba::post(strand_, [this] { main_loop_body(); }); });
ba::post(session_ioc_, [this] {
ba::post(strand_, [this] {
runAcceptor();
main_loop_body();
});
});
}

std::shared_ptr<Host> Host::make(std::string _clientVersion, CapabilitiesFactory const& cap_factory, KeyPair const& kp,
Expand All @@ -116,10 +120,6 @@ std::shared_ptr<Host> Host::make(std::string _clientVersion, CapabilitiesFactory
}

Host::~Host() {
// reset io_context (allows manually polling network, below)
ioc_.stop();
session_ioc_.restart();

// shutdown acceptor from same executor
ba::post(m_tcp4Acceptor.get_executor(), [this] {
m_tcp4Acceptor.cancel();
Expand All @@ -136,9 +136,13 @@ Host::~Host() {
s->disconnect(ClientQuit);
}
}
while (0 < session_ioc_.poll())
// We need to poll both as strand_ is ioc_
while (0 < session_ioc_.poll() + ioc_.poll())
;
save_state();

ioc_.restart();
session_ioc_.restart();
}

ba::io_context::count_type Host::do_work() {
Expand Down Expand Up @@ -423,7 +427,7 @@ void Host::runAcceptor() {
} else {
// incoming connection; we don't yet know nodeid
auto handshake = make_shared<RLPXHandshake>(handshake_ctx_, socket);
ba::post(strand_, [=, this, this_shared = shared_from_this()] {
ba::post(strand_, [=, this] {
m_connecting.push_back(handshake);
handshake->start();
});
Expand Down Expand Up @@ -462,24 +466,24 @@ void Host::connect(shared_ptr<Peer> const& _p) {
bi::tcp::endpoint ep(_p->get_endpoint());
cnetdetails << "Attempting connection to " << _p->id << "@" << ep << " from " << id();
auto socket = make_shared<RLPXSocket>(bi::tcp::socket(make_strand(session_ioc_)));
socket->ref().async_connect(
ep, ba::bind_executor(strand_, [=, this, this_shared = shared_from_this()](boost::system::error_code const& ec) {
_p->m_lastAttempted = chrono::system_clock::now();
_p->m_failedAttempts++;

if (ec) {
cnetdetails << "Connection refused to node " << _p->id << "@" << ep << " (" << ec.message() << ")";
// Manually set error (session not present)
_p->m_lastDisconnect = TCPError;
} else {
cnetdetails << "Starting RLPX handshake with " << _p->id << "@" << ep;
auto handshake = make_shared<RLPXHandshake>(handshake_ctx_, socket, _p->id);
m_connecting.push_back(handshake);

handshake->start();
}
m_pendingPeerConns.erase(nptr);
}));
socket->ref().async_connect(ep, ba::bind_executor(strand_, [=, this](boost::system::error_code const& ec) {
_p->m_lastAttempted = chrono::system_clock::now();
_p->m_failedAttempts++;

if (ec) {
cnetdetails << "Connection refused to node " << _p->id << "@" << ep << " ("
<< ec.message() << ")";
// Manually set error (session not present)
_p->m_lastDisconnect = TCPError;
} else {
cnetdetails << "Starting RLPX handshake with " << _p->id << "@" << ep;
auto handshake = make_shared<RLPXHandshake>(handshake_ctx_, socket, _p->id);
m_connecting.push_back(handshake);

handshake->start();
}
m_pendingPeerConns.erase(nptr);
}));
}

PeerSessionInfos Host::peerSessionInfos() const {
Expand Down
8 changes: 4 additions & 4 deletions tests/network_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ TEST_F(NetworkTest, transfer_block) {
SharedTransactions transactions({g_signed_trx_samples[0], g_signed_trx_samples[1]});
nw2->getSpecificHandler<network::tarcap::TransactionPacketHandler>()->onNewTransactions(std::move(transactions));

EXPECT_HAPPENS({10s, 200ms}, [&](auto& ctx) {
EXPECT_HAPPENS({60s, 100ms}, [&](auto& ctx) {
nw1->setPendingPeersToReady();
nw2->setPendingPeersToReady();
WAIT_EXPECT_EQ(ctx, nw1->getPeerCount(), 1)
Expand Down Expand Up @@ -318,7 +318,7 @@ TEST_F(NetworkTest, transfer_transaction) {
nw1->start();
nw2->start();

EXPECT_HAPPENS({20s, 100ms}, [&](auto& ctx) {
EXPECT_HAPPENS({60s, 100ms}, [&](auto& ctx) {
nw1->setPendingPeersToReady();
nw2->setPendingPeersToReady();
WAIT_EXPECT_EQ(ctx, nw1->getPeerCount(), 1)
Expand Down Expand Up @@ -365,7 +365,7 @@ TEST_F(NetworkTest, save_network) {
nw2->start();
nw3->start();

EXPECT_HAPPENS({120s, 500ms}, [&](auto& ctx) {
EXPECT_HAPPENS({120s, 100ms}, [&](auto& ctx) {
nw1->setPendingPeersToReady();
nw2->setPendingPeersToReady();
nw3->setPendingPeersToReady();
Expand All @@ -382,7 +382,7 @@ TEST_F(NetworkTest, save_network) {
nw2->start();
nw3->start();

EXPECT_HAPPENS({120s, 500ms}, [&](auto& ctx) {
EXPECT_HAPPENS({120s, 100ms}, [&](auto& ctx) {
nw2->setPendingPeersToReady();
nw3->setPendingPeersToReady();
WAIT_EXPECT_EQ(ctx, nw2->getPeerCount(), 1)
Expand Down
2 changes: 1 addition & 1 deletion tests/p2p_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ TEST_F(P2PTest, multiple_capabilities) {
std::filesystem::remove_all("/tmp/nw3");
};
auto wait_for_connection = [](std::shared_ptr<Network> nw1, std::shared_ptr<Network> nw2) {
EXPECT_HAPPENS({15s, 500ms}, [&](auto &ctx) {
EXPECT_HAPPENS({60s, 100ms}, [&](auto &ctx) {
nw1->setPendingPeersToReady();
nw2->setPendingPeersToReady();
WAIT_EXPECT_EQ(ctx, nw1->getPeerCount(), 1)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_util/src/test_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ std::vector<taraxa::FullNodeConfig> NodesTest::make_node_cfgs(size_t total_count

bool NodesTest::wait_connect(const std::vector<std::shared_ptr<taraxa::FullNode>>& nodes) {
auto num_peers_connected = nodes.size() - 1;
return wait({30s, 1s}, [&](auto& ctx) {
return wait({60s, 100ms}, [&](auto& ctx) {
for (const auto& node : nodes) {
if (ctx.fail_if(node->getNetwork()->getPeerCount() < num_peers_connected)) {
return;
Expand Down

0 comments on commit dbc828b

Please sign in to comment.