Skip to content

Commit

Permalink
net: Use mockable time for ping/pong, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoFalke committed Jun 19, 2020
1 parent faab4aa commit fa33654
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 34 deletions.
21 changes: 11 additions & 10 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,15 +563,15 @@ void CNode::copyStats(CNodeStats &stats, const std::vector<bool> &m_asmap)
// since pingtime does not update until the ping is complete, which might take a while.
// So, if a ping is taking an unusually long time in flight,
// the caller can immediately detect that this is happening.
int64_t nPingUsecWait = 0;
if ((0 != nPingNonceSent) && (0 != nPingUsecStart)) {
nPingUsecWait = GetTimeMicros() - nPingUsecStart;
std::chrono::microseconds ping_wait{0};
if ((0 != nPingNonceSent) && (0 != m_ping_start.load().count())) {
ping_wait = GetTime<std::chrono::microseconds>() - m_ping_start.load();
}

// Raw ping time is in microseconds, but show it to user as whole seconds (Bitcoin users should be well used to small numbers with many decimal places by now :)
stats.m_ping_usec = nPingUsecTime;
stats.m_min_ping_usec = nMinPingUsecTime;
stats.m_ping_wait_usec = nPingUsecWait;
stats.m_ping_wait_usec = count_microseconds(ping_wait);

// Leave string empty if addrLocal invalid (not filled in yet)
CService addrLocalUnlocked = GetAddrLocal();
Expand All @@ -582,9 +582,9 @@ void CNode::copyStats(CNodeStats &stats, const std::vector<bool> &m_asmap)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{
complete = false;
int64_t nTimeMicros = GetTimeMicros();
const auto time = GetTime<std::chrono::microseconds>();
LOCK(cs_vRecv);
nLastRecv = nTimeMicros / 1000000;
nLastRecv = std::chrono::duration_cast<std::chrono::seconds>(time).count();
nRecvBytes += nBytes;
while (nBytes > 0) {
// absorb network data
Expand All @@ -596,7 +596,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete

if (m_deserializer->Complete()) {
// decompose a transport agnostic CNetMessage from the deserializer
CNetMessage msg = m_deserializer->GetMessage(Params().MessageStart(), nTimeMicros);
CNetMessage msg = m_deserializer->GetMessage(Params().MessageStart(), time);

//store received bytes per message command
//to prevent a memory DOS, only allow valid commands
Expand Down Expand Up @@ -699,7 +699,8 @@ const uint256& V1TransportDeserializer::GetMessageHash() const
return data_hash;
}

CNetMessage V1TransportDeserializer::GetMessage(const CMessageHeader::MessageStartChars& message_start, int64_t time) {
CNetMessage V1TransportDeserializer::GetMessage(const CMessageHeader::MessageStartChars& message_start, const std::chrono::microseconds time)
{
// decompose a single CNetMessage from the TransportDeserializer
CNetMessage msg(std::move(vRecv));

Expand Down Expand Up @@ -1154,9 +1155,9 @@ void CConnman::InactivityCheck(CNode *pnode)
LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv);
pnode->fDisconnect = true;
}
else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros())
else if (pnode->nPingNonceSent && pnode->m_ping_start.load() + std::chrono::seconds{TIMEOUT_INTERVAL} < GetTime<std::chrono::microseconds>())
{
LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
LogPrintf("ping timeout: %fs\n", 0.000001 * count_microseconds(GetTime<std::chrono::microseconds>() - pnode->m_ping_start.load()));
pnode->fDisconnect = true;
}
else if (!pnode->fSuccessfullyConnected)
Expand Down
16 changes: 8 additions & 8 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,13 @@ class CNodeStats
*/
class CNetMessage {
public:
CDataStream m_recv; // received message data
int64_t m_time = 0; // time (in microseconds) of message receipt.
CDataStream m_recv; //!< received message data
std::chrono::microseconds m_time{0}; //!< time of message receipt
bool m_valid_netmagic = false;
bool m_valid_header = false;
bool m_valid_checksum = false;
uint32_t m_message_size = 0; // size of the payload
uint32_t m_raw_message_size = 0; // used wire size of the message (including header/checksum)
uint32_t m_message_size{0}; //!< size of the payload
uint32_t m_raw_message_size{0}; //!< used wire size of the message (including header/checksum)
std::string m_command;

CNetMessage(CDataStream&& recv_in) : m_recv(std::move(recv_in)) {}
Expand All @@ -642,7 +642,7 @@ class TransportDeserializer {
// read and deserialize data
virtual int Read(const char *data, unsigned int bytes) = 0;
// decomposes a message from the context
virtual CNetMessage GetMessage(const CMessageHeader::MessageStartChars& message_start, int64_t time) = 0;
virtual CNetMessage GetMessage(const CMessageHeader::MessageStartChars& message_start, std::chrono::microseconds time) = 0;
virtual ~TransportDeserializer() {}
};

Expand Down Expand Up @@ -695,7 +695,7 @@ class V1TransportDeserializer final : public TransportDeserializer
if (ret < 0) Reset();
return ret;
}
CNetMessage GetMessage(const CMessageHeader::MessageStartChars& message_start, int64_t time) override;
CNetMessage GetMessage(const CMessageHeader::MessageStartChars& message_start, std::chrono::microseconds time) override;
};

/** The TransportSerializer prepares messages for the network transport
Expand Down Expand Up @@ -845,8 +845,8 @@ class CNode
// Ping time measurement:
// The pong reply we're expecting, or 0 if no pong expected.
std::atomic<uint64_t> nPingNonceSent{0};
// Time (in usec) the last ping was sent, or 0 if no ping was ever sent.
std::atomic<int64_t> nPingUsecStart{0};
/** When the last ping was sent, or 0 if no ping was ever sent */
std::atomic<std::chrono::microseconds> m_ping_start{std::chrono::microseconds{0}};
// Last measured round-trip time.
std::atomic<int64_t> nPingUsecTime{0};
// Best measured round-trip time.
Expand Down
22 changes: 11 additions & 11 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ static constexpr int STALE_RELAY_AGE_LIMIT = 30 * 24 * 60 * 60;
/// Age after which a block is considered historical for purposes of rate
/// limiting block relay. Set to one week, denominated in seconds.
static constexpr int HISTORICAL_BLOCK_AGE = 7 * 24 * 60 * 60;
/** Time between pings automatically sent out for latency probing and keepalive (in seconds). */
static const int PING_INTERVAL = 2 * 60;
/** Time between pings automatically sent out for latency probing and keepalive */
static constexpr std::chrono::minutes PING_INTERVAL{2};
/** The maximum number of entries in a locator */
static const unsigned int MAX_LOCATOR_SZ = 101;
/** The maximum number of entries in an 'inv' protocol message */
Expand Down Expand Up @@ -2208,7 +2208,7 @@ void ProcessMessage(
CNode& pfrom,
const std::string& msg_type,
CDataStream& vRecv,
int64_t nTimeReceived,
const std::chrono::microseconds time_received,
const CChainParams& chainparams,
ChainstateManager& chainman,
CTxMemPool& mempool,
Expand Down Expand Up @@ -3111,7 +3111,7 @@ void ProcessMessage(
} // cs_main

if (fProcessBLOCKTXN)
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, chainman, mempool, connman, banman, interruptMsgProc);
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, time_received, chainparams, chainman, mempool, connman, banman, interruptMsgProc);

if (fRevertToHeaderProcessing) {
// Headers received from HB compact block peers are permitted to be
Expand Down Expand Up @@ -3386,7 +3386,7 @@ void ProcessMessage(
}

if (msg_type == NetMsgType::PONG) {
int64_t pingUsecEnd = nTimeReceived;
const auto ping_end = time_received;
uint64_t nonce = 0;
size_t nAvail = vRecv.in_avail();
bool bPingFinished = false;
Expand All @@ -3400,11 +3400,11 @@ void ProcessMessage(
if (nonce == pfrom.nPingNonceSent) {
// Matching pong received, this ping is no longer outstanding
bPingFinished = true;
int64_t pingUsecTime = pingUsecEnd - pfrom.nPingUsecStart;
if (pingUsecTime > 0) {
const auto ping_time = ping_end - pfrom.m_ping_start.load();
if (ping_time.count() > 0) {
// Successful ping time measurement, replace previous
pfrom.nPingUsecTime = pingUsecTime;
pfrom.nMinPingUsecTime = std::min(pfrom.nMinPingUsecTime.load(), pingUsecTime);
pfrom.nPingUsecTime = count_microseconds(ping_time);
pfrom.nMinPingUsecTime = std::min(pfrom.nMinPingUsecTime.load(), count_microseconds(ping_time));
} else {
// This should never happen
sProblem = "Timing mishap";
Expand Down Expand Up @@ -3860,7 +3860,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
// RPC ping request by user
pingSend = true;
}
if (pto->nPingNonceSent == 0 && pto->nPingUsecStart + PING_INTERVAL * 1000000 < GetTimeMicros()) {
if (pto->nPingNonceSent == 0 && pto->m_ping_start.load() + PING_INTERVAL < GetTime<std::chrono::microseconds>()) {
// Ping automatically sent as a latency probe & keepalive.
pingSend = true;
}
Expand All @@ -3870,7 +3870,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
GetRandBytes((unsigned char*)&nonce, sizeof(nonce));
}
pto->fPingQueued = false;
pto->nPingUsecStart = GetTimeMicros();
pto->m_ping_start = GetTime<std::chrono::microseconds>();
if (pto->nVersion > BIP0031_VERSION) {
pto->nPingNonceSent = nonce;
connman->PushMessage(pto, msgMaker.Make(NetMsgType::PING, nonce));
Expand Down
2 changes: 1 addition & 1 deletion src/test/fuzz/p2p_transport_deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void test_one_input(const std::vector<uint8_t>& buffer)
pch += handled;
n_bytes -= handled;
if (deserializer.Complete()) {
const int64_t m_time = std::numeric_limits<int64_t>::max();
const std::chrono::microseconds m_time{std::numeric_limits<int64_t>::max()};
const CNetMessage msg = deserializer.GetMessage(Params().MessageStart(), m_time);
assert(msg.m_command.size() <= CMessageHeader::COMMAND_SIZE);
assert(msg.m_raw_message_size <= buffer.size());
Expand Down
4 changes: 2 additions & 2 deletions src/test/fuzz/process_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void ProcessMessage(
CNode& pfrom,
const std::string& msg_type,
CDataStream& vRecv,
int64_t nTimeReceived,
const std::chrono::microseconds time_received,
const CChainParams& chainparams,
ChainstateManager& chainman,
CTxMemPool& mempool,
Expand Down Expand Up @@ -87,7 +87,7 @@ void test_one_input(const std::vector<uint8_t>& buffer)
connman.AddTestNode(p2p_node);
g_setup->m_node.peer_logic->InitializeNode(&p2p_node);
try {
ProcessMessage(p2p_node, random_message_type, random_bytes_data_stream, GetTimeMillis(),
ProcessMessage(p2p_node, random_message_type, random_bytes_data_stream, GetTime<std::chrono::microseconds>(),
Params(), *g_setup->m_node.chainman, *g_setup->m_node.mempool,
g_setup->m_node.connman.get(), g_setup->m_node.banman.get(),
std::atomic<bool>{false});
Expand Down
5 changes: 4 additions & 1 deletion test/functional/feature_bip68_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ class BIP68Test(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.extra_args = [
["-acceptnonstdtxn=1"],
[
"-acceptnonstdtxn=1",
"-peertimeout=9999", # bump because mocktime might cause a disconnect otherwise
],
["-acceptnonstdtxn=0"],
]

Expand Down
6 changes: 5 additions & 1 deletion test/functional/feature_maxuploadtarget.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ class MaxUploadTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
self.extra_args = [["-maxuploadtarget=800", "-acceptnonstdtxn=1"]]
self.extra_args = [[
"-maxuploadtarget=800",
"-acceptnonstdtxn=1",
"-peertimeout=9999", # bump because mocktime might cause a disconnect otherwise
]]
self.supports_cli = False

# Cache for utxos, as the listunspent may take a long time later in the test
Expand Down
123 changes: 123 additions & 0 deletions test/functional/p2p_ping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#!/usr/bin/env python3
# Copyright (c) 2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test ping message
"""

import time

from test_framework.messages import (
msg_pong,
)
from test_framework.mininode import (
P2PInterface,
wait_until,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal

PING_INTERVAL = 2 * 60


class msg_pong_corrupt(msg_pong):
def serialize(self):
return b""


class NodePongAdd1(P2PInterface):
def on_ping(self, message):
self.send_message(msg_pong(message.nonce + 1))


class NodeNoPong(P2PInterface):
def on_ping(self, message):
pass


class PingPongTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
self.extra_args = [['-peertimeout=3']]

def check_peer_info(self, *, pingtime, minping, pingwait):
stats = self.nodes[0].getpeerinfo()[0]
assert_equal(stats.pop('pingtime', None), pingtime)
assert_equal(stats.pop('minping', None), minping)
assert_equal(stats.pop('pingwait', None), pingwait)

def mock_forward(self, delta):
self.mock_time += delta
self.nodes[0].setmocktime(self.mock_time)

def run_test(self):
self.mock_time = int(time.time())
self.mock_forward(0)

self.log.info('Check that ping is sent after connection is established')
no_pong_node = self.nodes[0].add_p2p_connection(NodeNoPong())
self.mock_forward(3)
assert no_pong_node.last_message.pop('ping').nonce != 0
self.check_peer_info(pingtime=None, minping=None, pingwait=3)

self.log.info('Reply without nonce cancels ping')
with self.nodes[0].assert_debug_log(['pong peer=0: Short payload']):
no_pong_node.send_and_ping(msg_pong_corrupt())
self.check_peer_info(pingtime=None, minping=None, pingwait=None)

self.log.info('Reply without ping')
with self.nodes[0].assert_debug_log([
'pong peer=0: Unsolicited pong without ping, 0 expected, 0 received, 8 bytes',
]):
no_pong_node.send_and_ping(msg_pong())
self.check_peer_info(pingtime=None, minping=None, pingwait=None)

self.log.info('Reply with wrong nonce does not cancel ping')
assert 'ping' not in no_pong_node.last_message
with self.nodes[0].assert_debug_log(['pong peer=0: Nonce mismatch']):
# mock time PING_INTERVAL ahead to trigger node into sending a ping
self.mock_forward(PING_INTERVAL + 1)
wait_until(lambda: 'ping' in no_pong_node.last_message)
self.mock_forward(9)
# Send the wrong pong
no_pong_node.send_and_ping(msg_pong(no_pong_node.last_message.pop('ping').nonce - 1))
self.check_peer_info(pingtime=None, minping=None, pingwait=9)

self.log.info('Reply with zero nonce does cancel ping')
with self.nodes[0].assert_debug_log(['pong peer=0: Nonce zero']):
no_pong_node.send_and_ping(msg_pong(0))
self.check_peer_info(pingtime=None, minping=None, pingwait=None)

self.log.info('Check that ping is properly reported on RPC')
assert 'ping' not in no_pong_node.last_message
# mock time PING_INTERVAL ahead to trigger node into sending a ping
self.mock_forward(PING_INTERVAL + 1)
wait_until(lambda: 'ping' in no_pong_node.last_message)
ping_delay = 29
self.mock_forward(ping_delay)
wait_until(lambda: 'ping' in no_pong_node.last_message)
no_pong_node.send_and_ping(msg_pong(no_pong_node.last_message.pop('ping').nonce))
self.check_peer_info(pingtime=ping_delay, minping=ping_delay, pingwait=None)

self.log.info('Check that minping is decreased after a fast roundtrip')
# mock time PING_INTERVAL ahead to trigger node into sending a ping
self.mock_forward(PING_INTERVAL + 1)
wait_until(lambda: 'ping' in no_pong_node.last_message)
ping_delay = 9
self.mock_forward(ping_delay)
wait_until(lambda: 'ping' in no_pong_node.last_message)
no_pong_node.send_and_ping(msg_pong(no_pong_node.last_message.pop('ping').nonce))
self.check_peer_info(pingtime=ping_delay, minping=ping_delay, pingwait=None)

self.log.info('Check that peer is disconnected after ping timeout')
assert 'ping' not in no_pong_node.last_message
self.nodes[0].ping()
wait_until(lambda: 'ping' in no_pong_node.last_message)
with self.nodes[0].assert_debug_log(['ping timeout: 1201.000000s']):
self.mock_forward(20 * 60 + 1)
time.sleep(4) # peertimeout + 1


if __name__ == '__main__':
PingPongTest().main()
1 change: 1 addition & 0 deletions test/functional/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@
'mempool_compatibility.py',
'rpc_deriveaddresses.py',
'rpc_deriveaddresses.py --usecli',
'p2p_ping.py',
'rpc_scantxoutset.py',
'feature_logging.py',
'p2p_node_network_limited.py',
Expand Down

0 comments on commit fa33654

Please sign in to comment.