Skip to content

Commit

Permalink
Limit OnionMessenger outbound buffer size
Browse files Browse the repository at this point in the history
Drop OMs if they push us over the max OnionMessenger outbound buffer size
  • Loading branch information
valentinewallace committed Sep 2, 2022
1 parent 04f4673 commit a8ea0bd
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
10 changes: 10 additions & 0 deletions lightning/src/onion_message/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,13 @@ fn reply_path() {
"lightning::onion_message::messenger".to_string(),
format!("Received an onion message with path_id: None and reply_path").to_string(), 2);
}

#[test]
fn peer_buffer_full() {
let nodes = create_nodes(2);
for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger
nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap();
}
let err = nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap_err();
assert_eq!(err, SendError::BufferFull);
}
31 changes: 31 additions & 0 deletions lightning/src/onion_message/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
use super::utils;
use util::events::OnionMessageProvider;
use util::logger::Logger;
use util::ser::Writeable;

use core::ops::Deref;
use sync::{Arc, Mutex};
Expand Down Expand Up @@ -124,6 +125,8 @@ pub enum SendError {
TooFewBlindedHops,
/// Our next-hop peer was offline or does not support onion message forwarding.
InvalidFirstHop,
/// Our next-hop peer's buffer was full or our total outbound buffer was full.
BufferFull,
}

impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
Expand Down Expand Up @@ -171,6 +174,7 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?;

let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
if outbound_buffer_full(&introduction_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) }
match pending_per_peer_msgs.entry(introduction_node_id) {
hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop),
hash_map::Entry::Occupied(mut e) => {
Expand All @@ -193,6 +197,29 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
}
}

fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, VecDeque<msgs::OnionMessage>>) -> bool {
const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128;
const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256;
let mut total_buffered_bytes = 0;
let mut peer_buffered_bytes = 0;
for (pk, peer_buf) in buffer {
for om in peer_buf {
let om_len = om.serialized_length();
if pk == peer_node_id {
peer_buffered_bytes += om_len;
}
total_buffered_bytes += om_len;

if total_buffered_bytes >= MAX_TOTAL_BUFFER_SIZE ||
peer_buffered_bytes >= MAX_PER_PEER_BUFFER_SIZE
{
return true
}
}
}
false
}

impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
where K::Target: KeysInterface<Signer = Signer>,
L::Target: Logger,
Expand Down Expand Up @@ -279,6 +306,10 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Si
};

let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) {
log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id);
return
}

#[cfg(fuzzing)]
pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);
Expand Down

0 comments on commit a8ea0bd

Please sign in to comment.