diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index b18431a66dd..14beaa604ad 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -170,3 +170,13 @@ fn reply_path() { "lightning::onion_message::messenger".to_string(), format!("Received an onion message with path_id: None and reply_path").to_string(), 2); } + +#[test] +fn peer_buffer_full() { + let nodes = create_nodes(2); + for _ in 0..188 { + nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap(); + } + let err = nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap_err(); + assert_eq!(err, SendError::PeerBufferFull); +} diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 211432bcbf8..eb96f5dcbb1 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -23,6 +23,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload use super::utils; use util::events::OnionMessageProvider; use util::logger::Logger; +use util::ser::Writeable; use core::ops::Deref; use sync::{Arc, Mutex}; @@ -122,6 +123,8 @@ pub enum SendError { TooFewBlindedHops, /// Our next-hop peer was offline or does not support onion message forwarding. InvalidFirstHop, + /// Our next-hop peer's buffer was full. + PeerBufferFull, } impl OnionMessenger @@ -169,6 +172,7 @@ impl OnionMessenger packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?; let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap(); + if outbound_buffer_full(&pending_per_peer_msgs) { return Err(SendError::PeerBufferFull) } match pending_per_peer_msgs.entry(introduction_node_id) { hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop), hash_map::Entry::Occupied(mut e) => { @@ -192,6 +196,20 @@ impl OnionMessenger } } +fn outbound_buffer_full(buffer: &HashMap>) -> bool { + const MAX_BUFFER_SIZE: usize = 262144; // 256KiB + let mut buffered_bytes = 0; + for peer_buf in buffer.values() { + for om in peer_buf { + buffered_bytes += om.serialized_length(); + if buffered_bytes >= MAX_BUFFER_SIZE { + return true + } + } + } + false +} + impl OnionMessageHandler for OnionMessenger where K::Target: KeysInterface, L::Target: Logger, @@ -260,6 +278,10 @@ impl OnionMessageHandler for OnionMessenger { log_trace!(self.logger, "Dropping forwarded onion message to disconnected peer {:?}", next_node_id);