Skip to content

Commit

Permalink
merge bitcoin#22387: Rate limit the processing of rumoured addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
kwvg committed Apr 12, 2024
1 parent fe66202 commit 602d13d
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/net_permissions.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ enum class NetPermissionFlags : uint32_t {
NoBan = (1U << 4) | Download,
// Can query the mempool
Mempool = (1U << 5),
// Can request addrs without hitting a privacy-preserving cache
// Can request addrs without hitting a privacy-preserving cache, and send us
// unlimited amounts of addrs.
Addr = (1U << 7),

// True if the user did not specifically set fine grained permissions
Expand Down
54 changes: 54 additions & 0 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ static constexpr uint32_t MAX_GETCFHEADERS_SIZE = 2000;
static constexpr size_t MAX_PCT_ADDR_TO_SEND = 23;
/** The maximum number of address records permitted in an ADDR message. */
static constexpr size_t MAX_ADDR_TO_SEND{1000};
/** The maximum rate of address records we're willing to process on average. Can be bypassed using
* the NetPermissionFlags::Addr permission. */
static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};
/** The soft limit of the address processing token bucket (the regular MAX_ADDR_RATE_PER_SECOND
* based increments won't go above this, but the MAX_ADDR_TO_SEND increment following GETADDR
* is exempt from this limit. */
static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};

struct COrphanTx {
// When modifying, adapt the copy of this definition in tests/DoS_tests.
Expand Down Expand Up @@ -273,6 +280,15 @@ struct Peer {
std::atomic_bool m_wants_addrv2{false};
/** Whether this peer has already sent us a getaddr message. */
bool m_getaddr_recvd{false};
/** Number of addr messages that can be processed from this peer. Start at 1 to
* permit self-announcement. */
double m_addr_token_bucket{1.0};
/** When m_addr_token_bucket was last updated */
std::chrono::microseconds m_addr_token_timestamp{GetTime<std::chrono::microseconds>()};
/** Total number of addresses that were dropped due to rate limiting. */
std::atomic<uint64_t> m_addr_rate_limited{0};
/** Total number of addresses that were processed (excludes rate limited ones). */
std::atomic<uint64_t> m_addr_processed{0};

/** Set of txids to reconsider once their parent transactions have been accepted **/
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
Expand Down Expand Up @@ -1447,6 +1463,8 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats)
}

stats.m_ping_wait = ping_wait;
stats.m_addr_processed = peer->m_addr_processed.load();
stats.m_addr_rate_limited = peer->m_addr_rate_limited.load();

return true;
}
Expand Down Expand Up @@ -3219,6 +3237,9 @@ void PeerManagerImpl::ProcessMessage(
// Get recent addresses
m_connman.PushMessage(&pfrom, CNetMsgMaker(greatest_common_version).Make(NetMsgType::GETADDR));
peer->m_getaddr_sent = true;
// When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND addresses in response
// (bypassing the MAX_ADDR_PROCESSING_TOKEN_BUCKET limit).
peer->m_addr_token_bucket += MAX_ADDR_TO_SEND;
}

if (!pfrom.IsInboundConn()) {
Expand Down Expand Up @@ -3387,11 +3408,34 @@ void PeerManagerImpl::ProcessMessage(
std::vector<CAddress> vAddrOk;
int64_t nNow = GetAdjustedTime();
int64_t nSince = nNow - 10 * 60;

// Update/increment addr rate limiting bucket.
const auto current_time = GetTime<std::chrono::microseconds>();
if (peer->m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
// Don't increment bucket if it's already full
const auto time_diff = std::max(current_time - peer->m_addr_token_timestamp, 0us);
const double increment = CountSecondsDouble(time_diff) * MAX_ADDR_RATE_PER_SECOND;
peer->m_addr_token_bucket = std::min<double>(peer->m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
peer->m_addr_token_timestamp = current_time;

const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr);
uint64_t num_proc = 0;
uint64_t num_rate_limit = 0;
Shuffle(vAddr.begin(), vAddr.end(), FastRandomContext());
for (CAddress& addr : vAddr)
{
if (interruptMsgProc)
return;

// Apply rate limiting.
if (rate_limited) {
if (peer->m_addr_token_bucket < 1.0) {
++num_rate_limit;
continue;
}
peer->m_addr_token_bucket -= 1.0;
}
// We only bother storing full nodes, though this may include
// things which we would not make an outbound connection to, in
// part because we may make feeler connections to them.
Expand All @@ -3405,6 +3449,7 @@ void PeerManagerImpl::ProcessMessage(
// Do not process banned/discouraged addresses beyond remembering we received them
continue;
}
++num_proc;
bool fReachable = IsReachable(addr);
if (addr.nTime > nSince && !peer->m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) {
// Relay to a limited number of other nodes
Expand All @@ -3414,6 +3459,15 @@ void PeerManagerImpl::ProcessMessage(
if (fReachable)
vAddrOk.push_back(addr);
}
peer->m_addr_processed += num_proc;
peer->m_addr_rate_limited += num_rate_limit;
LogPrint(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d%s\n",
vAddr.size(),
num_proc,
num_rate_limit,
pfrom.GetId(),
fLogIPs ? ", peeraddr=" + pfrom.addr.ToString() : "");

m_addrman.Add(vAddrOk, pfrom.addr, 2 * 60 * 60);
if (vAddr.size() < 1000) peer->m_getaddr_sent = false;
if (pfrom.IsAddrFetchConn()) {
Expand Down
2 changes: 2 additions & 0 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ struct CNodeStateStats {
int m_starting_height = -1;
std::chrono::microseconds m_ping_wait;
std::vector<int> vHeightInFlight;
uint64_t m_addr_processed = 0;
uint64_t m_addr_rate_limited = 0;
};

class PeerManager : public CValidationInterface, public NetEventsInterface
Expand Down
2 changes: 2 additions & 0 deletions src/rpc/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ static RPCHelpMan getpeerinfo()
heights.push_back(height);
}
obj.pushKV("inflight", heights);
obj.pushKV("addr_processed", statestats.m_addr_processed);
obj.pushKV("addr_rate_limited", statestats.m_addr_rate_limited);
}
obj.pushKV("whitelisted", stats.m_legacyWhitelisted);
UniValue permissions(UniValue::VARR);
Expand Down
95 changes: 90 additions & 5 deletions test/functional/p2p_addr_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
msg_addr,
msg_getaddr
)
from test_framework.p2p import P2PInterface
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
from test_framework.p2p import (
P2PInterface,
p2p_lock,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal
import random


class AddrReceiver(P2PInterface):
num_ipv4_received = 0
test_addr_contents = False
_tokens = 1

def __init__(self, test_addr_contents=False):
super().__init__()
Expand All @@ -38,6 +41,20 @@ def on_addr(self, message):
raise AssertionError("Invalid addr.port of {} (8333-8342 expected)".format(addr.port))
assert addr.ip.startswith('123.123.123.')

def on_getaddr(self, message):
# When the node sends us a getaddr, it increments the addr relay tokens for the connection by 1000
self._tokens += 1000

@property
def tokens(self):
with p2p_lock:
return self._tokens

def increment_tokens(self, n):
# When we move mocktime forward, the node increments the addr relay tokens for its peers
with p2p_lock:
self._tokens += n

def addr_received(self):
return self.num_ipv4_received != 0

Expand All @@ -50,12 +67,14 @@ class AddrTest(BitcoinTestFramework):

def set_test_params(self):
self.num_nodes = 1
self.extra_args = [["[email protected]"]]

def run_test(self):
self.oversized_addr_test()
self.relay_tests()
self.getaddr_tests()
self.blocksonly_mode_tests()
self.rate_limit_tests()

def setup_addr_msg(self, num):
addrs = []
Expand All @@ -72,6 +91,19 @@ def setup_addr_msg(self, num):
msg.addrs = addrs
return msg

def setup_rand_addr_msg(self, num):
addrs = []
for i in range(num):
addr = CAddress()
addr.time = self.mocktime + i
addr.nServices = NODE_NETWORK
addr.ip = f"{random.randrange(128,169)}.{random.randrange(1,255)}.{random.randrange(1,255)}.{random.randrange(1,255)}"
addr.port = 8333
addrs.append(addr)
msg = msg_addr()
msg.addrs = addrs
return msg

def send_addr_msg(self, source, msg, receivers):
source.send_and_ping(msg)
# pop m_next_addr_send timer
Expand Down Expand Up @@ -186,7 +218,7 @@ def getaddr_tests(self):

def blocksonly_mode_tests(self):
self.log.info('Test addr relay in -blocksonly mode')
self.restart_node(0, ["-blocksonly"])
self.restart_node(0, ["-blocksonly", "[email protected]"])

self.log.info('Check that we send getaddr messages')
full_outbound_peer = self.nodes[0].add_outbound_p2p_connection(AddrReceiver(), p2p_idx=0, connection_type="outbound-full-relay")
Expand All @@ -203,6 +235,59 @@ def blocksonly_mode_tests(self):

self.nodes[0].disconnect_p2ps()

def send_addrs_and_test_rate_limiting(self, peer, no_relay, new_addrs, total_addrs):
"""Send an addr message and check that the number of addresses processed and rate-limited is as expected"""

peer.send_and_ping(self.setup_rand_addr_msg(new_addrs))

peerinfo = self.nodes[0].getpeerinfo()[0]
addrs_processed = peerinfo['addr_processed']
addrs_rate_limited = peerinfo['addr_rate_limited']
self.log.debug(f"addrs_processed = {addrs_processed}, addrs_rate_limited = {addrs_rate_limited}")

if no_relay:
assert_equal(addrs_processed, 0)
assert_equal(addrs_rate_limited, 0)
else:
assert_equal(addrs_processed, min(total_addrs, peer.tokens))
assert_equal(addrs_rate_limited, max(0, total_addrs - peer.tokens))

def rate_limit_tests(self):

self.restart_node(0, [])

for contype, no_relay in [("outbound-full-relay", False), ("block-relay-only", True), ("inbound", False)]:
self.log.info(f'Test rate limiting of addr processing for {contype} peers')
if contype == "inbound":
peer = self.nodes[0].add_p2p_connection(AddrReceiver())
else:
peer = self.nodes[0].add_outbound_p2p_connection(AddrReceiver(), p2p_idx=0, connection_type=contype)

# Send 600 addresses. For all but the block-relay-only peer this should result in addresses being processed.
self.send_addrs_and_test_rate_limiting(peer, no_relay, 600, 600)

# Send 600 more addresses. For the outbound-full-relay peer (which we send a GETADDR, and thus will
# process up to 1001 incoming addresses), this means more addresses will be processed.
self.send_addrs_and_test_rate_limiting(peer, no_relay, 600, 1200)

# Send 10 more. As we reached the processing limit for all nodes, no more addresses should be procesesd.
self.send_addrs_and_test_rate_limiting(peer, no_relay, 10, 1210)

# Advance the time by 100 seconds, permitting the processing of 10 more addresses.
# Send 200 and verify that 10 are processed.
self.bump_mocktime(100)
peer.increment_tokens(10)

self.send_addrs_and_test_rate_limiting(peer, no_relay, 200, 1410)

# Advance the time by 1000 seconds, permitting the processing of 100 more addresses.
# Send 200 and verify that 100 are processed.
self.bump_mocktime(1000)
peer.increment_tokens(100)

self.send_addrs_and_test_rate_limiting(peer, no_relay, 200, 1610)

self.nodes[0].disconnect_p2ps()

if __name__ == '__main__':
AddrTest().main()
5 changes: 4 additions & 1 deletion test/functional/p2p_addrv2_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def __init__(self):
super().__init__(support_addrv2 = True)

def on_addrv2(self, message):
if ADDRS == message.addrs:
expected_set = set((addr.ip, addr.port) for addr in ADDRS)
received_set = set((addr.ip, addr.port) for addr in message.addrs)
if expected_set == received_set:
self.addrv2_received_and_checked = True

def wait_for_addrv2(self):
Expand All @@ -38,6 +40,7 @@ class AddrTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
self.extra_args = [["[email protected]"]]

def run_test(self):
for i in range(10):
Expand Down
1 change: 1 addition & 0 deletions test/functional/p2p_invalid_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class InvalidMessagesTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.setup_clean_chain = True
self.extra_args = [["[email protected]"]]

def run_test(self):
self.test_buffer()
Expand Down

0 comments on commit 602d13d

Please sign in to comment.