Skip to content

Commit

Permalink
Limit OnionMessenger outbound buffer size
Browse files Browse the repository at this point in the history
Drop OMs if they push us over the max OnionMessenger outbound buffer size
  • Loading branch information
valentinewallace committed Sep 1, 2022
1 parent 3047dbb commit abf3517
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
10 changes: 10 additions & 0 deletions lightning/src/onion_message/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,13 @@ fn reply_path() {
"lightning::onion_message::messenger".to_string(),
format!("Received an onion message with path_id: None and reply_path").to_string(), 2);
}

#[test]
fn peer_buffer_full() {
let nodes = create_nodes(2);
for _ in 0..188 {
nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap();
}
let err = nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap_err();
assert_eq!(err, SendError::PeerBufferFull);
}
22 changes: 22 additions & 0 deletions lightning/src/onion_message/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
use super::utils;
use util::events::OnionMessageProvider;
use util::logger::Logger;
use util::ser::Writeable;

use core::ops::Deref;
use sync::{Arc, Mutex};
Expand Down Expand Up @@ -122,6 +123,8 @@ pub enum SendError {
TooFewBlindedHops,
/// Our next-hop peer was offline or does not support onion message forwarding.
InvalidFirstHop,
/// Our next-hop peer's buffer was full.
PeerBufferFull,
}

impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
Expand Down Expand Up @@ -169,6 +172,7 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?;

let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
if outbound_buffer_full(&pending_per_peer_msgs) { return Err(SendError::PeerBufferFull) }
match pending_per_peer_msgs.entry(introduction_node_id) {
hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop),
hash_map::Entry::Occupied(mut e) => {
Expand All @@ -192,6 +196,20 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
}
}

fn outbound_buffer_full(buffer: &HashMap<PublicKey, VecDeque<msgs::OnionMessage>>) -> bool {
const MAX_BUFFER_SIZE: usize = 262144; // 256KiB
let mut buffered_bytes = 0;
for peer_buf in buffer.values() {
for om in peer_buf {
buffered_bytes += om.serialized_length();
if buffered_bytes >= MAX_BUFFER_SIZE {
return true
}
}
}
false
}

impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
where K::Target: KeysInterface<Signer = Signer>,
L::Target: Logger,
Expand Down Expand Up @@ -260,6 +278,10 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Si
#[cfg(fuzzing)]
pending_per_peer_msgs.entry(next_node_id).or_insert_with(|| VecDeque::new());

if outbound_buffer_full(&pending_per_peer_msgs) {
log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id);
return
}
match pending_per_peer_msgs.entry(next_node_id) {
hash_map::Entry::Vacant(_) => {
log_trace!(self.logger, "Dropping forwarded onion message to disconnected peer {:?}", next_node_id);
Expand Down

0 comments on commit abf3517

Please sign in to comment.