From 7c8b0986984e0882229c239ef2d3b621a61f7d35 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 30 Jul 2021 18:03:28 +0000 Subject: [PATCH 01/15] Process messages from peers in parallel in `PeerManager`. This adds the required locking to process messages from different peers simultaneously in `PeerManager`. Note that channel messages are still processed under a global lock in `ChannelManager`, and most work is still processed under a global lock in gossip message handling, but parallelizing message deserialization and message decryption is somewhat helpful. --- lightning/src/ln/peer_handler.rs | 201 ++++++++++++++++--------------- 1 file changed, 107 insertions(+), 94 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 07300fdf62e..dd6eda63fdc 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -33,7 +33,7 @@ use routing::network_graph::{NetworkGraph, NetGraphMsgHandler}; use prelude::*; use io; use alloc::collections::LinkedList; -use sync::{Arc, Mutex}; +use sync::{Arc, Mutex, RwLock}; use core::{cmp, hash, fmt, mem}; use core::ops::Deref; use core::convert::Infallible; @@ -376,9 +376,7 @@ impl Peer { } struct PeerHolder { - peers: HashMap, - /// Only add to this set when noise completes: - node_id_to_descriptor: HashMap, + peers: HashMap>, } /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. @@ -425,7 +423,12 @@ pub struct PeerManager, - peers: Mutex>, + peers: RwLock>, + /// Only add to this set when noise completes. + /// Locked *after* peers. When an item is removed, it must be removed with the `peers` write + /// lock held. Entries may be added with only the `peers` read lock held (though the + /// `Descriptor` value must already exist in `peers`). + node_id_to_descriptor: Mutex>, our_node_secret: SecretKey, ephemeral_key_midstate: Sha256Engine, custom_message_handler: CMH, @@ -553,10 +556,10 @@ impl P PeerManager { message_handler, - peers: Mutex::new(PeerHolder { + peers: RwLock::new(PeerHolder { peers: HashMap::new(), - node_id_to_descriptor: HashMap::new() }), + node_id_to_descriptor: Mutex::new(HashMap::new()), our_node_secret, ephemeral_key_midstate, peer_counter: AtomicCounter::new(), @@ -571,8 +574,9 @@ impl P /// new_outbound_connection, however entries will only appear once the initial handshake has /// completed and we are sure the remote peer has the private key for the given node_id. 
pub fn get_peer_node_ids(&self) -> Vec { - let peers = self.peers.lock().unwrap(); - peers.peers.values().filter_map(|p| { + let peers = self.peers.read().unwrap(); + peers.peers.values().filter_map(|peer_mutex| { + let p = peer_mutex.lock().unwrap(); if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() { return None; } @@ -608,8 +612,8 @@ impl P let res = peer_encryptor.get_act_one().to_vec(); let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes - let mut peers = self.peers.lock().unwrap(); - if peers.peers.insert(descriptor, Peer { + let mut peers = self.peers.write().unwrap(); + if peers.peers.insert(descriptor, Mutex::new(Peer { channel_encryptor: peer_encryptor, their_node_id: None, their_features: None, @@ -629,7 +633,7 @@ impl P awaiting_pong_timer_tick_intervals: 0, received_message_since_timer_tick: false, sent_gossip_timestamp_filter: false, - }).is_some() { + })).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; Ok(res) @@ -655,8 +659,8 @@ impl P let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret); let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes - let mut peers = self.peers.lock().unwrap(); - if peers.peers.insert(descriptor, Peer { + let mut peers = self.peers.write().unwrap(); + if peers.peers.insert(descriptor, Mutex::new(Peer { channel_encryptor: peer_encryptor, their_node_id: None, their_features: None, @@ -676,7 +680,7 @@ impl P awaiting_pong_timer_tick_intervals: 0, received_message_since_timer_tick: false, sent_gossip_timestamp_filter: false, - }).is_some() { + })).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; Ok(()) @@ -766,17 +770,18 @@ impl P /// [`send_data`]: SocketDescriptor::send_data /// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> { - let mut peers = self.peers.lock().unwrap(); - match peers.peers.get_mut(descriptor) { + let peers = self.peers.read().unwrap(); + match peers.peers.get(descriptor) { None => { // This is most likely a simple race condition where the user found that the socket // was writeable, then we told the user to `disconnect_socket()`, then they called // this method. Return an error to make sure we get disconnected. return Err(PeerHandleError { no_connection_possible: false }); }, - Some(peer) => { + Some(peer_mutex) => { + let mut peer = peer_mutex.lock().unwrap(); peer.awaiting_write_event = false; - self.do_attempt_write_data(descriptor, peer); + self.do_attempt_write_data(descriptor, &mut peer); } }; Ok(()) @@ -828,18 +833,20 @@ impl P fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result { let pause_read = { - let mut peers_lock = self.peers.lock().unwrap(); - let peers = &mut *peers_lock; + let peers = self.peers.read().unwrap(); let mut msgs_to_forward = Vec::new(); let mut peer_node_id = None; - let pause_read = match peers.peers.get_mut(peer_descriptor) { + let pause_read = match peers.peers.get(peer_descriptor) { None => { // This is most likely a simple race condition where the user read some bytes // from the socket, then we told the user to `disconnect_socket()`, then they // called this method. Return an error to make sure we get disconnected. 
return Err(PeerHandleError { no_connection_possible: false }); }, - Some(peer) => { + Some(peer_mutex) => { + let mut peer_lock = peer_mutex.lock().unwrap(); + let peer = &mut *peer_lock; + assert!(peer.pending_read_buffer.len() > 0); assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos); @@ -893,7 +900,7 @@ impl P macro_rules! insert_node_id { () => { - match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) { + match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap()) { hash_map::Entry::Occupied(_) => { log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap())); peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event @@ -1023,7 +1030,7 @@ impl P }; for msg in msgs_to_forward.drain(..) { - self.forward_broadcast_msg(peers, &msg, peer_node_id.as_ref()); + self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref()); } pause_read @@ -1242,13 +1249,14 @@ impl P Ok(should_forward) } - fn forward_broadcast_msg(&self, peers: &mut PeerHolder, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { + fn forward_broadcast_msg(&self, peers: &PeerHolder, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { match msg { wire::Message::ChannelAnnouncement(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer) in peers.peers.iter_mut() { + for (_, peer_mutex) in peers.peers.iter() { + let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue @@ -1266,14 +1274,15 @@ impl P if except_node.is_some() && peer.their_node_id.as_ref() == except_node { continue; } - self.enqueue_encoded_message(peer, &encoded_msg); + self.enqueue_encoded_message(&mut *peer, &encoded_msg); } }, wire::Message::NodeAnnouncement(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer) in peers.peers.iter_mut() { + for (_, peer_mutex) in peers.peers.iter() { + let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_node_announcement(msg.contents.node_id) { continue @@ -1290,14 +1299,15 @@ impl P if except_node.is_some() && peer.their_node_id.as_ref() == except_node { continue; } - self.enqueue_encoded_message(peer, &encoded_msg); + self.enqueue_encoded_message(&mut *peer, &encoded_msg); } }, wire::Message::ChannelUpdate(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer) in peers.peers.iter_mut() { + for (_, peer_mutex) in peers.peers.iter() { + let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue @@ -1311,7 +1321,7 @@ impl P if except_node.is_some() && peer.their_node_id.as_ref() == except_node { continue; } - self.enqueue_encoded_message(peer, &encoded_msg); + self.enqueue_encoded_message(&mut 
*peer, &encoded_msg); } }, _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"), @@ -1337,20 +1347,21 @@ impl P // buffer by doing things like announcing channels on another node. We should be willing to // drop optional-ish messages when send buffers get full! - let mut peers_lock = self.peers.lock().unwrap(); + let mut peers_lock = self.peers.write().unwrap(); let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); let peers = &mut *peers_lock; macro_rules! get_peer_for_forwarding { ($node_id: expr) => { { - match peers.node_id_to_descriptor.get($node_id) { + match self.node_id_to_descriptor.lock().unwrap().get($node_id) { Some(descriptor) => match peers.peers.get_mut(&descriptor) { - Some(peer) => { - if peer.their_features.is_none() { + Some(peer_mutex) => { + let peer_lock = peer_mutex.lock().unwrap(); + if peer_lock.their_features.is_none() { continue; } - peer + peer_lock }, None => panic!("Inconsistent peers set state!"), }, @@ -1367,13 +1378,13 @@ impl P log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", @@ -1382,25 +1393,25 @@ impl P log_funding_channel_id!(msg.funding_txid, msg.funding_output_index)); // TODO: If the peer is gone we should generate a DiscardFunding event // indicating to the wallet that they should just throw away this funding transaction - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::UpdateHTLCs { ref node_id, updates: 
msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", @@ -1409,47 +1420,47 @@ impl P update_fulfill_htlcs.len(), update_fail_htlcs.len(), log_bytes!(commitment_signed.channel_id)); - let peer = get_peer_for_forwarding!(node_id); + let mut peer = get_peer_for_forwarding!(node_id); for msg in update_add_htlcs { - self.enqueue_message(peer, msg); + self.enqueue_message(&mut *peer, msg); } for msg in update_fulfill_htlcs { - self.enqueue_message(peer, msg); + self.enqueue_message(&mut *peer, msg); } for msg in update_fail_htlcs { - self.enqueue_message(peer, msg); + self.enqueue_message(&mut *peer, msg); } for msg in update_fail_malformed_htlcs { - self.enqueue_message(peer, msg); + self.enqueue_message(&mut *peer, msg); } if let &Some(ref msg) = update_fee { - self.enqueue_message(peer, msg); + self.enqueue_message(&mut *peer, msg); } - self.enqueue_message(peer, commitment_signed); + self.enqueue_message(&mut *peer, commitment_signed); }, MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); @@ -1483,22 +1494,26 @@ impl P MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", log_pubkey!(node_id), msg.contents.short_channel_id); - let peer = get_peer_for_forwarding!(node_id); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::HandleError { ref node_id, ref action } => { match *action { msgs::ErrorAction::DisconnectPeer { ref msg } => { - if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) { - if let Some(mut peer) = 
peers.peers.remove(&descriptor) { + // Note that since we are holding the peers *write* lock we can + // remove from node_id_to_descriptor immediately (as no other + // thread can be holding the peer lock if we have the global write + // lock). + if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(node_id) { + if let Some(peer_mutex) = peers.peers.remove(&descriptor) { if let Some(ref msg) = *msg { log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); - self.enqueue_message(&mut peer, msg); + let mut peer = peer_mutex.lock().unwrap(); + self.enqueue_message(&mut *peer, msg); // This isn't guaranteed to work, but if there is enough free // room in the send buffer, put the error message there... - self.do_attempt_write_data(&mut descriptor, &mut peer); + self.do_attempt_write_data(&mut descriptor, &mut *peer); } else { log_gossip!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id)); } @@ -1518,21 +1533,21 @@ impl P log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => { log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, } }, MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => { - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", @@ -1541,20 +1556,20 @@ impl P msg.first_blocknum, msg.number_of_blocks, msg.sync_complete); - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { - self.enqueue_message(get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } } } for (node_id, msg) in self.custom_message_handler.get_and_clear_pending_msg() { - self.enqueue_message(get_peer_for_forwarding!(&node_id), &msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); } - for (descriptor, peer) in peers.peers.iter_mut() { - self.do_attempt_write_data(&mut (*descriptor).clone(), peer); + for (descriptor, peer_mutex) in peers.peers.iter_mut() { + self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap()); } } } @@ -1565,7 +1580,7 @@ impl P } fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) { - let mut peers = self.peers.lock().unwrap(); + let mut peers = 
self.peers.write().unwrap(); let peer_option = peers.peers.remove(descriptor); match peer_option { None => { @@ -1573,13 +1588,14 @@ impl P // was disconnected, then we told the user to `disconnect_socket()`, then they // called this method. Either way we're disconnected, return. }, - Some(peer) => { + Some(peer_lock) => { + let peer = peer_lock.lock().unwrap(); match peer.their_node_id { Some(node_id) => { log_trace!(self.logger, "Handling disconnection of peer {}, with {}future connection to the peer possible.", log_pubkey!(node_id), if no_connection_possible { "no " } else { "" }); - peers.node_id_to_descriptor.remove(&node_id); + self.node_id_to_descriptor.lock().unwrap().remove(&node_id); self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); }, None => {} @@ -1598,8 +1614,8 @@ impl P /// /// [`disconnect_socket`]: SocketDescriptor::disconnect_socket pub fn disconnect_by_node_id(&self, node_id: PublicKey, no_connection_possible: bool) { - let mut peers_lock = self.peers.lock().unwrap(); - if let Some(mut descriptor) = peers_lock.node_id_to_descriptor.remove(&node_id) { + let mut peers_lock = self.peers.write().unwrap(); + if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) { log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id); peers_lock.peers.remove(&descriptor); self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); @@ -1611,17 +1627,16 @@ impl P /// an indication that TCP sockets have stalled even if we weren't around to time them out /// using regular ping/pongs. pub fn disconnect_all_peers(&self) { - let mut peers_lock = self.peers.lock().unwrap(); + let mut peers_lock = self.peers.write().unwrap(); + self.node_id_to_descriptor.lock().unwrap().clear(); let peers = &mut *peers_lock; for (mut descriptor, peer) in peers.peers.drain() { - if let Some(node_id) = peer.their_node_id { + if let Some(node_id) = peer.lock().unwrap().their_node_id { log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id); - peers.node_id_to_descriptor.remove(&node_id); self.message_handler.chan_handler.peer_disconnected(&node_id, false); } descriptor.disconnect_socket(); } - debug_assert!(peers.node_id_to_descriptor.is_empty()); } /// This is called when we're blocked on sending additional gossip messages until we receive a @@ -1650,15 +1665,13 @@ impl P /// /// [`send_data`]: SocketDescriptor::send_data pub fn timer_tick_occurred(&self) { - let mut peers_lock = self.peers.lock().unwrap(); + let mut peers_lock = self.peers.write().unwrap(); { - let peers = &mut *peers_lock; - let node_id_to_descriptor = &mut peers.node_id_to_descriptor; - let peers = &mut peers.peers; let mut descriptors_needing_disconnect = Vec::new(); - let peer_count = peers.len(); + let peer_count = peers_lock.peers.len(); - peers.retain(|descriptor, peer| { + peers_lock.peers.retain(|descriptor, peer_mutex| { + let mut peer = peer_mutex.lock().unwrap(); let mut do_disconnect_peer = false; if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() { // The peer needs to complete its handshake before we can exchange messages. 
We @@ -1689,7 +1702,7 @@ impl P match peer.their_node_id { Some(node_id) => { log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id); - node_id_to_descriptor.remove(&node_id); + self.node_id_to_descriptor.lock().unwrap().remove(&node_id); self.message_handler.chan_handler.peer_disconnected(&node_id, false); } None => {}, @@ -1708,7 +1721,7 @@ impl P ponglen: 0, byteslen: 64, }; - self.enqueue_message(peer, &ping); + self.enqueue_message(&mut *peer, &ping); self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer); true @@ -1834,7 +1847,7 @@ mod tests { let chan_handler = test_utils::TestChannelMessageHandler::new(); let mut peers = create_network(2, &cfgs); establish_connection(&peers[0], &peers[1]); - assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); let secp_ctx = Secp256k1::new(); let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); @@ -1847,7 +1860,7 @@ mod tests { peers[0].message_handler.chan_handler = &chan_handler; peers[0].process_events(); - assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); } #[test] @@ -1856,17 +1869,17 @@ mod tests { let cfgs = create_peermgr_cfgs(2); let peers = create_network(2, &cfgs); establish_connection(&peers[0], &peers[1]); - assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); // peers[0] awaiting_pong is set to true, but the Peer is still connected peers[0].timer_tick_occurred(); peers[0].process_events(); - assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); // Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected peers[0].timer_tick_occurred(); peers[0].process_events(); - assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); } #[test] @@ -1928,9 +1941,9 @@ mod tests { peers[0].new_inbound_connection(fd_a.clone(), None).unwrap(); // If we get a single timer tick before completion, that's fine - assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); peers[0].timer_tick_occurred(); - assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false); peers[0].process_events(); @@ -1939,7 +1952,7 @@ mod tests { // ...but if we get a second timer tick, we should disconnect the peer peers[0].timer_tick_occurred(); - assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err()); } From a731efcb6822609b400170bce54d59addc5821c3 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 6 Oct 2021 06:58:15 +0000 Subject: [PATCH 02/15] Process messages with only the top-level read lock held Users are required to only ever call `read_event` serially per-peer, thus we actually don't need any locks while we're processing messages - we can only be processing messages in one thread per-peer. 
That said, we do need to ensure that another thread doesn't disconnect the peer we're processing messages for, as that could result in a peer_disconnected call while we're processing a message for the same peer - somewhat nonsensical. This significantly improves parallelism, especially during gossip processing, as it avoids waiting on the entire set of individual peer locks to forward a gossip message while several other threads are validating gossip messages with their individual peer locks held. --- lightning/src/ln/peer_handler.rs | 532 ++++++++++++++++--------------- 1 file changed, 279 insertions(+), 253 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index dd6eda63fdc..2226e9574b1 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -33,7 +33,7 @@ use routing::network_graph::{NetworkGraph, NetGraphMsgHandler}; use prelude::*; use io; use alloc::collections::LinkedList; -use sync::{Arc, Mutex, RwLock}; +use sync::{Arc, Mutex, MutexGuard, RwLock}; use core::{cmp, hash, fmt, mem}; use core::ops::Deref; use core::convert::Infallible; @@ -376,6 +376,10 @@ impl Peer { } struct PeerHolder { + /// Peer is under its own mutex for sending and receiving bytes, but note that we do *not* hold + /// this mutex while we're processing a message. This is fine as [`PeerManager::read_event`] + /// requires that there be no parallel calls for a given peer, so mutual exclusion of messages + /// handed to the `MessageHandler`s for a given peer is already guaranteed. peers: HashMap>, } @@ -832,209 +836,217 @@ impl P } fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result { - let pause_read = { - let peers = self.peers.read().unwrap(); - let mut msgs_to_forward = Vec::new(); - let mut peer_node_id = None; - let pause_read = match peers.peers.get(peer_descriptor) { - None => { - // This is most likely a simple race condition where the user read some bytes - // from the socket, then we told the user to `disconnect_socket()`, then they - // called this method. Return an error to make sure we get disconnected. - return Err(PeerHandleError { no_connection_possible: false }); - }, - Some(peer_mutex) => { + let mut pause_read = false; + let peers = self.peers.read().unwrap(); + let mut msgs_to_forward = Vec::new(); + let mut peer_node_id = None; + match peers.peers.get(peer_descriptor) { + None => { + // This is most likely a simple race condition where the user read some bytes + // from the socket, then we told the user to `disconnect_socket()`, then they + // called this method. Return an error to make sure we get disconnected. + return Err(PeerHandleError { no_connection_possible: false }); + }, + Some(peer_mutex) => { + let mut read_pos = 0; + while read_pos < data.len() { + macro_rules!
try_potential_handleerror { + ($peer: expr, $thing: expr) => { + match $thing { + Ok(x) => x, + Err(e) => { + match e.action { + msgs::ErrorAction::DisconnectPeer { msg: _ } => { + //TODO: Try to push msg + log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err); + return Err(PeerHandleError{ no_connection_possible: false }); + }, + msgs::ErrorAction::IgnoreAndLog(level) => { + log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); + continue + }, + msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these + msgs::ErrorAction::IgnoreError => { + log_debug!(self.logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); + continue; + }, + msgs::ErrorAction::SendErrorMessage { msg } => { + log_debug!(self.logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer_node_id), e.err); + self.enqueue_message($peer, &msg); + continue; + }, + msgs::ErrorAction::SendWarningMessage { msg, log_level } => { + log_given_level!(self.logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer_node_id), e.err); + self.enqueue_message($peer, &msg); + continue; + }, + } + } + } + } + } + let mut peer_lock = peer_mutex.lock().unwrap(); let peer = &mut *peer_lock; + let mut msg_to_handle = None; + if peer_node_id.is_none() { + peer_node_id = peer.their_node_id.clone(); + } assert!(peer.pending_read_buffer.len() > 0); assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos); - let mut read_pos = 0; - while read_pos < data.len() { - { - let data_to_copy = cmp::min(peer.pending_read_buffer.len() - peer.pending_read_buffer_pos, data.len() - read_pos); - peer.pending_read_buffer[peer.pending_read_buffer_pos..peer.pending_read_buffer_pos + data_to_copy].copy_from_slice(&data[read_pos..read_pos + data_to_copy]); - read_pos += data_to_copy; - peer.pending_read_buffer_pos += data_to_copy; + { + let data_to_copy = cmp::min(peer.pending_read_buffer.len() - peer.pending_read_buffer_pos, data.len() - read_pos); + peer.pending_read_buffer[peer.pending_read_buffer_pos..peer.pending_read_buffer_pos + data_to_copy].copy_from_slice(&data[read_pos..read_pos + data_to_copy]); + read_pos += data_to_copy; + peer.pending_read_buffer_pos += data_to_copy; + } + + if peer.pending_read_buffer_pos == peer.pending_read_buffer.len() { + peer.pending_read_buffer_pos = 0; + + macro_rules! 
insert_node_id { + () => { + match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap()) { + hash_map::Entry::Occupied(_) => { + log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap())); + peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event + return Err(PeerHandleError{ no_connection_possible: false }) + }, + hash_map::Entry::Vacant(entry) => { + log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap())); + entry.insert(peer_descriptor.clone()) + }, + }; + } } - if peer.pending_read_buffer_pos == peer.pending_read_buffer.len() { - peer.pending_read_buffer_pos = 0; + let next_step = peer.channel_encryptor.get_noise_step(); + match next_step { + NextNoiseStep::ActOne => { + let act_two = try_potential_handleerror!(peer, + peer.channel_encryptor.process_act_one_with_keys(&peer.pending_read_buffer[..], &self.our_node_secret, self.get_ephemeral_key())).to_vec(); + peer.pending_outbound_buffer.push_back(act_two); + peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long + }, + NextNoiseStep::ActTwo => { + let (act_three, their_node_id) = try_potential_handleerror!(peer, + peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..], &self.our_node_secret)); + peer.pending_outbound_buffer.push_back(act_three.to_vec()); + peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes + peer.pending_read_is_header = true; + + peer.their_node_id = Some(their_node_id); + insert_node_id!(); + let features = InitFeatures::known(); + let resp = msgs::Init { features, remote_network_address: filter_addresses(peer.their_net_address.clone()) }; + self.enqueue_message(peer, &resp); + peer.awaiting_pong_timer_tick_intervals = 0; + }, + NextNoiseStep::ActThree => { + let their_node_id = try_potential_handleerror!(peer, + peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..])); + peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes + peer.pending_read_is_header = true; + peer.their_node_id = Some(their_node_id); + insert_node_id!(); + let features = InitFeatures::known(); + let resp = msgs::Init { features, remote_network_address: filter_addresses(peer.their_net_address.clone()) }; + self.enqueue_message(peer, &resp); + peer.awaiting_pong_timer_tick_intervals = 0; + }, + NextNoiseStep::NoiseComplete => { + if peer.pending_read_is_header { + let msg_len = try_potential_handleerror!(peer, + peer.channel_encryptor.decrypt_length_header(&peer.pending_read_buffer[..])); + peer.pending_read_buffer = Vec::with_capacity(msg_len as usize + 16); + peer.pending_read_buffer.resize(msg_len as usize + 16, 0); + if msg_len < 2 { // Need at least the message type tag + return Err(PeerHandleError{ no_connection_possible: false }); + } + peer.pending_read_is_header = false; + } else { + let msg_data = try_potential_handleerror!(peer, + peer.channel_encryptor.decrypt_message(&peer.pending_read_buffer[..])); + assert!(msg_data.len() >= 2); + + // Reset read buffer + peer.pending_read_buffer = [0; 18].to_vec(); + peer.pending_read_is_header = true; - macro_rules! 
try_potential_handleerror { - ($thing: expr) => { - match $thing { + let mut reader = io::Cursor::new(&msg_data[..]); + let message_result = wire::read(&mut reader, &*self.custom_message_handler); + let message = match message_result { Ok(x) => x, Err(e) => { - match e.action { - msgs::ErrorAction::DisconnectPeer { msg: _ } => { - //TODO: Try to push msg - log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer.their_node_id), e.err); - return Err(PeerHandleError{ no_connection_possible: false }); - }, - msgs::ErrorAction::IgnoreAndLog(level) => { - log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer.their_node_id), e.err); - continue - }, - msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these - msgs::ErrorAction::IgnoreError => { - log_debug!(self.logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer.their_node_id), e.err); + match e { + // Note that to avoid recursion we never call + // `do_attempt_write_data` from here, causing + // the messages enqueued here to not actually + // be sent before the peer is disconnected. + (msgs::DecodeError::UnknownRequiredFeature, Some(ty)) if is_gossip_msg(ty) => { + log_gossip!(self.logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!"); continue; - }, - msgs::ErrorAction::SendErrorMessage { msg } => { - log_debug!(self.logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer.their_node_id), e.err); - self.enqueue_message(peer, &msg); + } + (msgs::DecodeError::UnsupportedCompression, _) => { + log_gossip!(self.logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message"); + self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: "Unsupported message compression: zlib".to_owned() }); continue; - }, - msgs::ErrorAction::SendWarningMessage { msg, log_level } => { - log_given_level!(self.logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer.their_node_id), e.err); - self.enqueue_message(peer, &msg); + } + (_, Some(ty)) if is_gossip_msg(ty) => { + log_gossip!(self.logger, "Got an invalid value while deserializing a gossip message"); + self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: "Unreadable/bogus gossip message".to_owned() }); continue; - }, + } + (msgs::DecodeError::UnknownRequiredFeature, ty) => { + log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); + self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) }); + return Err(PeerHandleError { no_connection_possible: false }); + } + (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { no_connection_possible: false }), + (msgs::DecodeError::InvalidValue, _) => { + log_debug!(self.logger, "Got an invalid value while deserializing message"); + return Err(PeerHandleError { no_connection_possible: false }); + } + (msgs::DecodeError::ShortRead, _) => { + log_debug!(self.logger, "Deserialization failed due to shortness of message"); + return Err(PeerHandleError { no_connection_possible: false }); + } + (msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { no_connection_possible: false }), + (msgs::DecodeError::Io(_), 
_) => return Err(PeerHandleError { no_connection_possible: false }), } } - } - } - } - - macro_rules! insert_node_id { - () => { - match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap()) { - hash_map::Entry::Occupied(_) => { - log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap())); - peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event - return Err(PeerHandleError{ no_connection_possible: false }) - }, - hash_map::Entry::Vacant(entry) => { - log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap())); - entry.insert(peer_descriptor.clone()) - }, }; - } - } - - let next_step = peer.channel_encryptor.get_noise_step(); - match next_step { - NextNoiseStep::ActOne => { - let act_two = try_potential_handleerror!(peer.channel_encryptor.process_act_one_with_keys(&peer.pending_read_buffer[..], &self.our_node_secret, self.get_ephemeral_key())).to_vec(); - peer.pending_outbound_buffer.push_back(act_two); - peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long - }, - NextNoiseStep::ActTwo => { - let (act_three, their_node_id) = try_potential_handleerror!(peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..], &self.our_node_secret)); - peer.pending_outbound_buffer.push_back(act_three.to_vec()); - peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes - peer.pending_read_is_header = true; - peer.their_node_id = Some(their_node_id); - insert_node_id!(); - let features = InitFeatures::known(); - let resp = msgs::Init { features, remote_network_address: filter_addresses(peer.their_net_address.clone())}; - self.enqueue_message(peer, &resp); - peer.awaiting_pong_timer_tick_intervals = 0; - }, - NextNoiseStep::ActThree => { - let their_node_id = try_potential_handleerror!(peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..])); - peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes - peer.pending_read_is_header = true; - peer.their_node_id = Some(their_node_id); - insert_node_id!(); - let features = InitFeatures::known(); - let resp = msgs::Init { features, remote_network_address: filter_addresses(peer.their_net_address.clone())}; - self.enqueue_message(peer, &resp); - peer.awaiting_pong_timer_tick_intervals = 0; - }, - NextNoiseStep::NoiseComplete => { - if peer.pending_read_is_header { - let msg_len = try_potential_handleerror!(peer.channel_encryptor.decrypt_length_header(&peer.pending_read_buffer[..])); - peer.pending_read_buffer = Vec::with_capacity(msg_len as usize + 16); - peer.pending_read_buffer.resize(msg_len as usize + 16, 0); - if msg_len < 2 { // Need at least the message type tag - return Err(PeerHandleError{ no_connection_possible: false }); - } - peer.pending_read_is_header = false; - } else { - let msg_data = try_potential_handleerror!(peer.channel_encryptor.decrypt_message(&peer.pending_read_buffer[..])); - assert!(msg_data.len() >= 2); - - // Reset read buffer - peer.pending_read_buffer = [0; 18].to_vec(); - peer.pending_read_is_header = true; - - let mut reader = io::Cursor::new(&msg_data[..]); - let message_result = wire::read(&mut reader, &*self.custom_message_handler); - let message = match message_result { - Ok(x) => x, - Err(e) => { - match e { - // Note that to avoid recursion we never call - // `do_attempt_write_data` from here, causing - // the messages enqueued here to not actually - // be sent before the 
peer is disconnected. - (msgs::DecodeError::UnknownRequiredFeature, Some(ty)) if is_gossip_msg(ty) => { - log_gossip!(self.logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!"); - continue; - } - (msgs::DecodeError::UnsupportedCompression, _) => { - log_gossip!(self.logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message"); - self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: "Unsupported message compression: zlib".to_owned() }); - continue; - } - (_, Some(ty)) if is_gossip_msg(ty) => { - log_gossip!(self.logger, "Got an invalid value while deserializing a gossip message"); - self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: "Unreadable/bogus gossip message".to_owned() }); - continue; - } - (msgs::DecodeError::UnknownRequiredFeature, ty) => { - log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); - self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) }); - return Err(PeerHandleError { no_connection_possible: false }); - } - (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { no_connection_possible: false }), - (msgs::DecodeError::InvalidValue, _) => { - log_debug!(self.logger, "Got an invalid value while deserializing message"); - return Err(PeerHandleError { no_connection_possible: false }); - } - (msgs::DecodeError::ShortRead, _) => { - log_debug!(self.logger, "Deserialization failed due to shortness of message"); - return Err(PeerHandleError { no_connection_possible: false }); - } - (msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { no_connection_possible: false }), - (msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { no_connection_possible: false }), - } - } - }; - - match self.handle_message(peer, message) { - Err(handling_error) => match handling_error { - MessageHandlingError::PeerHandleError(e) => { return Err(e) }, - MessageHandlingError::LightningError(e) => { - try_potential_handleerror!(Err(e)); - }, - }, - Ok(Some(msg)) => { - peer_node_id = Some(peer.their_node_id.expect("After noise is complete, their_node_id is always set")); - msgs_to_forward.push(msg); - }, - Ok(None) => {}, - } - } + msg_to_handle = Some(message); } } } } - - peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE // pause_read + pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE; + + if let Some(message) = msg_to_handle { + match self.handle_message(&peer_mutex, peer_lock, message) { + Err(handling_error) => match handling_error { + MessageHandlingError::PeerHandleError(e) => { return Err(e) }, + MessageHandlingError::LightningError(e) => { + try_potential_handleerror!(&mut peer_mutex.lock().unwrap(), Err(e)); + }, + }, + Ok(Some(msg)) => { + msgs_to_forward.push(msg); + }, + Ok(None) => {}, + } + } } - }; - - for msg in msgs_to_forward.drain(..) { - self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref()); } + } - pause_read - }; + for msg in msgs_to_forward.drain(..) { + self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref()); + } Ok(pause_read) } @@ -1043,52 +1055,74 @@ impl P /// Returns the message back if it needs to be broadcasted to all other peers. 
fn handle_message( &self, - peer: &mut Peer, + peer_mutex: &Mutex, + mut peer_lock: MutexGuard, message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { - if is_gossip_msg(message.type_id()) { - log_gossip!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap())); - } else { - log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap())); - } - - peer.received_message_since_timer_tick = true; + let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages"); + peer_lock.received_message_since_timer_tick = true; // Need an Init as first message - if let wire::Message::Init(_) = message { - } else if peer.their_features.is_none() { - log_debug!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); + if let wire::Message::Init(msg) = message { + if msg.features.requires_unknown_bits() { + log_debug!(self.logger, "Peer features required unknown version bits"); + return Err(PeerHandleError{ no_connection_possible: true }.into()); + } + if peer_lock.their_features.is_some() { + return Err(PeerHandleError{ no_connection_possible: false }.into()); + } + + log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(their_node_id), msg.features); + + // For peers not supporting gossip queries start sync now, otherwise wait until we receive a filter. + if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() { + peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0); + } + + if !msg.features.supports_static_remote_key() { + log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(their_node_id)); + return Err(PeerHandleError{ no_connection_possible: true }.into()); + } + + self.message_handler.route_handler.peer_connected(&their_node_id, &msg); + + self.message_handler.chan_handler.peer_connected(&their_node_id, &msg); + peer_lock.their_features = Some(msg.features); + return Ok(None); + } else if peer_lock.their_features.is_none() { + log_debug!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(their_node_id)); return Err(PeerHandleError{ no_connection_possible: false }.into()); } - let mut should_forward = None; - - match message { - // Setup and Control messages: - wire::Message::Init(msg) => { - if msg.features.requires_unknown_bits() { - log_debug!(self.logger, "Peer features required unknown version bits"); - return Err(PeerHandleError{ no_connection_possible: true }.into()); - } - if peer.their_features.is_some() { - return Err(PeerHandleError{ no_connection_possible: false }.into()); - } + if let wire::Message::GossipTimestampFilter(_msg) = message { + // When supporting gossip messages, start inital gossip sync only after we receive + // a GossipTimestampFilter + if peer_lock.their_features.as_ref().unwrap().supports_gossip_queries() && + !peer_lock.sent_gossip_timestamp_filter { + peer_lock.sent_gossip_timestamp_filter = true; + peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0); + } + return Ok(None); + } - log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.features); + let their_features = peer_lock.their_features.clone(); + mem::drop(peer_lock); - // For peers not supporting gossip queries start 
sync now, otherwise wait until we receive a filter. - if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() { - peer.sync_status = InitSyncTracker::ChannelsSyncing(0); - } - if !msg.features.supports_static_remote_key() { - log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap())); - return Err(PeerHandleError{ no_connection_possible: true }.into()); - } + if is_gossip_msg(message.type_id()) { + log_gossip!(self.logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id)); + } else { + log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id)); + } - self.message_handler.route_handler.peer_connected(&peer.their_node_id.unwrap(), &msg); + let mut should_forward = None; - self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg); - peer.their_features = Some(msg.features); + match message { + // Setup and Control messages: + wire::Message::Init(_) => { + // Handled above + }, + wire::Message::GossipTimestampFilter(_) => { + // Handled above }, wire::Message::Error(msg) => { let mut data_is_printable = true; @@ -1100,11 +1134,11 @@ impl P } if data_is_printable { - log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data); + log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), msg.data); } else { - log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap())); + log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(their_node_id)); } - self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_error(&their_node_id, &msg); if msg.channel_id == [0; 32] { return Err(PeerHandleError{ no_connection_possible: true }.into()); } @@ -1119,78 +1153,79 @@ impl P } if data_is_printable { - log_debug!(self.logger, "Got warning message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data); + log_debug!(self.logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), msg.data); } else { - log_debug!(self.logger, "Got warning message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap())); + log_debug!(self.logger, "Got warning message from {} with non-ASCII error message", log_pubkey!(their_node_id)); } }, wire::Message::Ping(msg) => { if msg.ponglen < 65532 { let resp = msgs::Pong { byteslen: msg.ponglen }; - self.enqueue_message(peer, &resp); + self.enqueue_message(&mut *peer_mutex.lock().unwrap(), &resp); } }, wire::Message::Pong(_msg) => { - peer.awaiting_pong_timer_tick_intervals = 0; - peer.msgs_sent_since_pong = 0; + let mut peer_lock = peer_mutex.lock().unwrap(); + peer_lock.awaiting_pong_timer_tick_intervals = 0; + peer_lock.msgs_sent_since_pong = 0; }, // Channel messages: wire::Message::OpenChannel(msg) => { - self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); + self.message_handler.chan_handler.handle_open_channel(&their_node_id, their_features.clone().unwrap(), &msg); }, wire::Message::AcceptChannel(msg) => { - self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); + self.message_handler.chan_handler.handle_accept_channel(&their_node_id, 
their_features.clone().unwrap(), &msg); }, wire::Message::FundingCreated(msg) => { - self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_funding_created(&their_node_id, &msg); }, wire::Message::FundingSigned(msg) => { - self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_funding_signed(&their_node_id, &msg); }, wire::Message::FundingLocked(msg) => { - self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_funding_locked(&their_node_id, &msg); }, wire::Message::Shutdown(msg) => { - self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), peer.their_features.as_ref().unwrap(), &msg); + self.message_handler.chan_handler.handle_shutdown(&their_node_id, their_features.as_ref().unwrap(), &msg); }, wire::Message::ClosingSigned(msg) => { - self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_closing_signed(&their_node_id, &msg); }, // Commitment messages: wire::Message::UpdateAddHTLC(msg) => { - self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_update_add_htlc(&their_node_id, &msg); }, wire::Message::UpdateFulfillHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_update_fulfill_htlc(&their_node_id, &msg); }, wire::Message::UpdateFailHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_update_fail_htlc(&their_node_id, &msg); }, wire::Message::UpdateFailMalformedHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&their_node_id, &msg); }, wire::Message::CommitmentSigned(msg) => { - self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_commitment_signed(&their_node_id, &msg); }, wire::Message::RevokeAndACK(msg) => { - self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_revoke_and_ack(&their_node_id, &msg); }, wire::Message::UpdateFee(msg) => { - self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_update_fee(&their_node_id, &msg); }, wire::Message::ChannelReestablish(msg) => { - self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_channel_reestablish(&their_node_id, &msg); }, // Routing messages: wire::Message::AnnouncementSignatures(msg) => { - self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_announcement_signatures(&their_node_id, &msg); }, wire::Message::ChannelAnnouncement(msg) => { if self.message_handler.route_handler.handle_channel_announcement(&msg) @@ -1205,32 +1240,23 @@ impl P } }, wire::Message::ChannelUpdate(msg) => { - 
self.message_handler.chan_handler.handle_channel_update(&peer.their_node_id.unwrap(), &msg); + self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg); if self.message_handler.route_handler.handle_channel_update(&msg) .map_err(|e| -> MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::ChannelUpdate(msg)); } }, wire::Message::QueryShortChannelIds(msg) => { - self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?; + self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?; }, wire::Message::ReplyShortChannelIdsEnd(msg) => { - self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?; + self.message_handler.route_handler.handle_reply_short_channel_ids_end(&their_node_id, msg)?; }, wire::Message::QueryChannelRange(msg) => { - self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?; + self.message_handler.route_handler.handle_query_channel_range(&their_node_id, msg)?; }, wire::Message::ReplyChannelRange(msg) => { - self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?; - }, - wire::Message::GossipTimestampFilter(_msg) => { - // When supporting gossip messages, start inital gossip sync only after we receive - // a GossipTimestampFilter - if peer.their_features.as_ref().unwrap().supports_gossip_queries() && - !peer.sent_gossip_timestamp_filter { - peer.sent_gossip_timestamp_filter = true; - peer.sync_status = InitSyncTracker::ChannelsSyncing(0); - } + self.message_handler.route_handler.handle_reply_channel_range(&their_node_id, msg)?; }, // Unknown messages: @@ -1243,7 +1269,7 @@ impl P log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id); }, wire::Message::Custom(custom) => { - self.custom_message_handler.handle_custom_message(custom, &peer.their_node_id.unwrap())?; + self.custom_message_handler.handle_custom_message(custom, &their_node_id)?; }, }; Ok(should_forward) From b1550524cfe2689e3743ddfc0a78527d26de8613 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 6 Oct 2021 04:29:19 +0000 Subject: [PATCH 03/15] [net-tokio] Call PeerManager::process_events without blocking reads Unlike very ancient versions of lightning-net-tokio, this does not rely on a single global process_events future, but instead has one per connection. This could still cause significant contention, so we'll ensure only two process_events calls can exist at once in the next few commits. 
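As a rough, self-contained sketch of the per-connection wake-up pattern described above (not code from this patch series; it assumes a tokio runtime with the "full" feature set, and DummyPeerManager::process_events stands in for the real PeerManager::process_events): each connection gets its own processing task woken through a bounded channel of capacity one, so the read path only issues a non-blocking try_send(()) and redundant wake-ups coalesce into a single pending signal.

use std::sync::Arc;
use tokio::sync::mpsc;

struct DummyPeerManager;
impl DummyPeerManager {
    // Stand-in for the real PeerManager::process_events().
    fn process_events(&self) { println!("processing events"); }
}

#[tokio::main]
async fn main() {
    let peer_manager = Arc::new(DummyPeerManager);

    // One waker/processor pair per connection. A capacity of one means any number of
    // wake-ups arriving while the processor is busy collapse into a single pending signal.
    let (event_waker, mut event_receiver) = mpsc::channel::<()>(1);

    let pm = Arc::clone(&peer_manager);
    let processor = tokio::spawn(async move {
        // Runs until every waker for this connection has been dropped.
        while event_receiver.recv().await.is_some() {
            pm.process_events();
        }
    });

    for _ in 0..10 {
        // The read path: after handling bytes from the socket, request event processing
        // without blocking. try_send() never waits; a full channel just means a run is
        // already queued.
        let _ = event_waker.try_send(());
        tokio::task::yield_now().await;
    }

    drop(event_waker);
    processor.await.unwrap();
}

Compared with calling process_events directly from the read loop (the pre-patch behaviour shown in the diff below), the read path never blocks on event processing, and the number of concurrent process_events calls is bounded by the number of connections rather than by the number of reads.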
--- lightning-net-tokio/src/lib.rs | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index a9fd861bc84..cee7c5c1b98 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -121,11 +121,28 @@ struct Connection { id: u64, } impl Connection { + async fn poll_event_process(peer_manager: Arc, Arc, Arc, Arc>>, mut event_receiver: mpsc::Receiver<()>) where + CMH: ChannelMessageHandler + 'static + Send + Sync, + RMH: RoutingMessageHandler + 'static + Send + Sync, + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { + loop { + if event_receiver.recv().await.is_none() { + return; + } + peer_manager.process_events(); + } + } + async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where - CMH: ChannelMessageHandler + 'static, - RMH: RoutingMessageHandler + 'static, - L: Logger + 'static + ?Sized, - UMH: CustomMessageHandler + 'static { + CMH: ChannelMessageHandler + 'static + Send + Sync, + RMH: RoutingMessageHandler + 'static + Send + Sync, + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { + // Create a waker to wake up poll_event_process, above + let (event_waker, event_receiver) = mpsc::channel(1); + tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver)); + // 8KB is nice and big but also should never cause any issues with stack overflowing. let mut buf = [0; 8192]; @@ -176,7 +193,7 @@ impl Connection { Err(_) => break Disconnect::PeerDisconnected, }, } - peer_manager.process_events(); + let _ = event_waker.try_send(()); }; let writer_option = us.lock().unwrap().writer.take(); if let Some(mut writer) = writer_option { From a5adda18dc2c05959c107a0b2769730680d32d18 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 25 Sep 2021 22:24:23 +0000 Subject: [PATCH 04/15] Avoid taking the peers write lock during event processing Because the peers write lock "blocks the world", and happens after each read event, always taking the write lock has pretty severe impacts on parallelism. Instead, here, we only take the global write lock if we have to disconnect a peer. --- lightning/src/ln/peer_handler.rs | 89 +++++++++++++++++++++----------- 1 file changed, 59 insertions(+), 30 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 2226e9574b1..5836a8d0931 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -433,6 +433,10 @@ pub struct PeerManager>, + /// We can only have one thread processing events at once, but we don't usually need the full + /// `peers` write lock to do so, so instead we block on this empty mutex when entering + /// `process_events`. 
+ event_processing_lock: Mutex<()>, our_node_secret: SecretKey, ephemeral_key_midstate: Sha256Engine, custom_message_handler: CMH, @@ -564,6 +568,7 @@ impl P peers: HashMap::new(), }), node_id_to_descriptor: Mutex::new(HashMap::new()), + event_processing_lock: Mutex::new(()), our_node_secret, ephemeral_key_midstate, peer_counter: AtomicCounter::new(), @@ -1368,20 +1373,29 @@ impl P /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards /// [`send_data`]: SocketDescriptor::send_data pub fn process_events(&self) { + let _single_processor_lock = self.event_processing_lock.lock().unwrap(); + + let mut peers_to_disconnect = HashMap::new(); + let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); + events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); + { // TODO: There are some DoS attacks here where you can flood someone's outbound send // buffer by doing things like announcing channels on another node. We should be willing to // drop optional-ish messages when send buffers get full! - let mut peers_lock = self.peers.write().unwrap(); - let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); - events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); - let peers = &mut *peers_lock; + let peers_lock = self.peers.read().unwrap(); + let peers = &*peers_lock; macro_rules! get_peer_for_forwarding { ($node_id: expr) => { { - match self.node_id_to_descriptor.lock().unwrap().get($node_id) { - Some(descriptor) => match peers.peers.get_mut(&descriptor) { + if peers_to_disconnect.get($node_id).is_some() { + // If we've "disconnected" this peer, do not send to it. + continue; + } + let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); + match descriptor_opt { + Some(descriptor) => match peers.peers.get(&descriptor) { Some(peer_mutex) => { let peer_lock = peer_mutex.lock().unwrap(); if peer_lock.their_features.is_none() { @@ -1389,7 +1403,10 @@ impl P } peer_lock }, - None => panic!("Inconsistent peers set state!"), + None => { + debug_assert!(false, "Inconsistent peers set state!"); + continue; + } }, None => { continue; @@ -1525,28 +1542,10 @@ impl P MessageSendEvent::HandleError { ref node_id, ref action } => { match *action { msgs::ErrorAction::DisconnectPeer { ref msg } => { - // Note that since we are holding the peers *write* lock we can - // remove from node_id_to_descriptor immediately (as no other - // thread can be holding the peer lock if we have the global write - // lock). - if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(node_id) { - if let Some(peer_mutex) = peers.peers.remove(&descriptor) { - if let Some(ref msg) = *msg { - log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); - let mut peer = peer_mutex.lock().unwrap(); - self.enqueue_message(&mut *peer, msg); - // This isn't guaranteed to work, but if there is enough free - // room in the send buffer, put the error message there... 
- self.do_attempt_write_data(&mut descriptor, &mut *peer); - } else { - log_gossip!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id)); - } - } - descriptor.disconnect_socket(); - self.message_handler.chan_handler.peer_disconnected(&node_id, false); - } + // We do not have the peers write lock, so we just store that we're + // about to disconnect the peer and do it after we finish + // processing most messages. + peers_to_disconnect.insert(*node_id, msg.clone()); }, msgs::ErrorAction::IgnoreAndLog(level) => { log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); @@ -1591,13 +1590,43 @@ impl P } for (node_id, msg) in self.custom_message_handler.get_and_clear_pending_msg() { + if peers_to_disconnect.get(&node_id).is_some() { continue; } self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); } - for (descriptor, peer_mutex) in peers.peers.iter_mut() { + for (descriptor, peer_mutex) in peers.peers.iter() { self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap()); } } + if !peers_to_disconnect.is_empty() { + let mut peers_lock = self.peers.write().unwrap(); + let peers = &mut *peers_lock; + for (node_id, msg) in peers_to_disconnect.drain() { + // Note that since we are holding the peers *write* lock we can + // remove from node_id_to_descriptor immediately (as no other + // thread can be holding the peer lock if we have the global write + // lock). + + if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) { + if let Some(peer_mutex) = peers.peers.remove(&descriptor) { + if let Some(msg) = msg { + log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), + msg.data); + let mut peer = peer_mutex.lock().unwrap(); + self.enqueue_message(&mut *peer, &msg); + // This isn't guaranteed to work, but if there is enough free + // room in the send buffer, put the error message there... + self.do_attempt_write_data(&mut descriptor, &mut *peer); + } else { + log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id)); + } + } + descriptor.disconnect_socket(); + self.message_handler.chan_handler.peer_disconnected(&node_id, false); + } + } + } } /// Indicates that the given socket descriptor's connection is now closed. From 4f50a94a3f6b9f9b2f047d319983f10ed0f3e88d Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 6 Oct 2021 06:10:01 +0000 Subject: [PATCH 05/15] Avoid the peers write lock unless we need it in timer_tick_occurred Similar to the previous commit, this avoids "blocking the world" on every timer tick unless we need to disconnect peers.
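The shape shared by this commit and the previous one fits in a short sketch (hypothetical types, not the real `PeerManager` fields): do the per-peer work under the read lock, remember who must be dropped, and take the write lock only if that list is non-empty.

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

// `bool` stands in for per-peer state ("did we hear from them since the last tick?").
fn timer_tick(peers: &RwLock<HashMap<u64, Mutex<bool>>>) {
    let mut to_disconnect = Vec::new();
    {
        let peers_read = peers.read().unwrap();
        for (id, peer) in peers_read.iter() {
            let mut heard_from = peer.lock().unwrap();
            if !*heard_from {
                to_disconnect.push(*id);
            }
            *heard_from = false; // expect another message before the next tick
        }
    } // read lock released before we even consider writing
    if !to_disconnect.is_empty() {
        // Only now do we "block the world", and only because someone actually has to go.
        let mut peers_write = peers.write().unwrap();
        for id in to_disconnect {
            peers_write.remove(&id);
        }
    }
}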
--- lightning/src/ln/peer_handler.rs | 48 +++++++++++++++++--------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 5836a8d0931..7408263085c 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -1720,55 +1720,44 @@ impl P /// /// [`send_data`]: SocketDescriptor::send_data pub fn timer_tick_occurred(&self) { - let mut peers_lock = self.peers.write().unwrap(); + let mut descriptors_needing_disconnect = Vec::new(); { - let mut descriptors_needing_disconnect = Vec::new(); - let peer_count = peers_lock.peers.len(); + let peers_lock = self.peers.read().unwrap(); - peers_lock.peers.retain(|descriptor, peer_mutex| { + for (descriptor, peer_mutex) in peers_lock.peers.iter() { let mut peer = peer_mutex.lock().unwrap(); - let mut do_disconnect_peer = false; if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() { // The peer needs to complete its handshake before we can exchange messages. We // give peers one timer tick to complete handshake, reusing // `awaiting_pong_timer_tick_intervals` to track number of timer ticks taken // for handshake completion. if peer.awaiting_pong_timer_tick_intervals != 0 { - do_disconnect_peer = true; + descriptors_needing_disconnect.push(descriptor.clone()); } else { peer.awaiting_pong_timer_tick_intervals = 1; - return true; } + continue; } if peer.awaiting_pong_timer_tick_intervals == -1 { // Magic value set in `maybe_send_extra_ping`. peer.awaiting_pong_timer_tick_intervals = 1; peer.received_message_since_timer_tick = false; - return true; + continue; } - if do_disconnect_peer - || (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick) + if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick) || peer.awaiting_pong_timer_tick_intervals as u64 > - MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64 + MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.peers.len() as u64 { descriptors_needing_disconnect.push(descriptor.clone()); - match peer.their_node_id { - Some(node_id) => { - log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id); - self.node_id_to_descriptor.lock().unwrap().remove(&node_id); - self.message_handler.chan_handler.peer_disconnected(&node_id, false); - } - None => {}, - } - return false; + continue; } peer.received_message_since_timer_tick = false; if peer.awaiting_pong_timer_tick_intervals > 0 { peer.awaiting_pong_timer_tick_intervals += 1; - return true; + continue; } peer.awaiting_pong_timer_tick_intervals = 1; @@ -1778,9 +1767,22 @@ impl P }; self.enqueue_message(&mut *peer, &ping); self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer); + } + } - true - }); + if !descriptors_needing_disconnect.is_empty() { + { + let mut peers_lock = self.peers.write().unwrap(); + for descriptor in descriptors_needing_disconnect.iter() { + if let Some(peer) = peers_lock.peers.remove(&descriptor) { + if let Some(node_id) = peer.lock().unwrap().their_node_id { + log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id); + self.node_id_to_descriptor.lock().unwrap().remove(&node_id); + self.message_handler.chan_handler.peer_disconnected(&node_id, false); + } + } + } + } for mut descriptor in descriptors_needing_disconnect.drain(..) 
{ descriptor.disconnect_socket(); From 97711aef96556637b3f8b4336b688fbfcf4c2beb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 6 Oct 2021 04:45:07 +0000 Subject: [PATCH 06/15] Limit blocked PeerManager::process_events waiters to two Only one instance of PeerManager::process_events can run at a time, and each run always finishes all available work before returning. Thus, having several threads blocked on the process_events lock doesn't accomplish anything but blocking more threads. Here we limit the number of blocked calls on process_events to two - one processing events and one blocked at the top which will process all available events after the first completes. --- lightning/src/ln/peer_handler.rs | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 7408263085c..bb7b697e420 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -34,6 +34,7 @@ use prelude::*; use io; use alloc::collections::LinkedList; use sync::{Arc, Mutex, MutexGuard, RwLock}; +use core::sync::atomic::{AtomicBool, Ordering}; use core::{cmp, hash, fmt, mem}; use core::ops::Deref; use core::convert::Infallible; @@ -437,6 +438,11 @@ pub struct PeerManager, + /// Because event processing is global and always does all available work before returning, + /// there is no reason for us to have many event processors waiting on the lock at once. + /// Instead, we limit the total blocked event processors to always exactly one by setting this + /// when an event process call is waiting. + blocked_event_processors: AtomicBool, our_node_secret: SecretKey, ephemeral_key_midstate: Sha256Engine, custom_message_handler: CMH, @@ -569,6 +575,7 @@ impl P }), node_id_to_descriptor: Mutex::new(HashMap::new()), event_processing_lock: Mutex::new(()), + blocked_event_processors: AtomicBool::new(false), our_node_secret, ephemeral_key_midstate, peer_counter: AtomicCounter::new(), @@ -1369,11 +1376,34 @@ impl P /// You don't have to call this function explicitly if you are using [`lightning-net-tokio`] /// or one of the other clients provided in our language bindings. /// + /// Note that if there are any other calls to this function waiting on lock(s) this may return + /// without doing any work. All available events that need handling will be handled before the + /// other calls return. + /// /// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards /// [`send_data`]: SocketDescriptor::send_data pub fn process_events(&self) { - let _single_processor_lock = self.event_processing_lock.lock().unwrap(); + let mut _single_processor_lock = self.event_processing_lock.try_lock(); + if _single_processor_lock.is_err() { + // While we could wake the older sleeper here with a CV and make more even waiting + // times, that would be a lot of overengineering for a simple "reduce total waiter + // count" goal. 
+ match self.blocked_event_processors.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) { + Err(val) => { + debug_assert!(val, "compare_exchange failed spuriously?"); + return; + }, + Ok(val) => { + debug_assert!(!val, "compare_exchange succeeded spuriously?"); + // We're the only waiter, as the running process_events may have emptied the + // pending events "long" ago and there are new events for us to process, wait until + // it's done and process any leftover events before returning. + _single_processor_lock = Ok(self.event_processing_lock.lock().unwrap()); + self.blocked_event_processors.store(false, Ordering::Release); + } + } + } let mut peers_to_disconnect = HashMap::new(); let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); From f90983180eac9950256d47cb5eb77353f19fec6b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 3 Oct 2021 21:44:52 +0000 Subject: [PATCH 07/15] Wake reader future when we fail to flush socket buffer This avoids any extra calls to `read_event` after a write fails to flush the write buffer fully, as is required by the PeerManager API (though it isn't critical). --- lightning-net-tokio/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index cee7c5c1b98..fc7b260e41b 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -465,6 +465,9 @@ impl peer_handler::SocketDescriptor for SocketDescriptor { // pause read given we're now waiting on the remote end to ACK (and in // accordance with the send_data() docs). us.read_paused = true; + // Further, to avoid any current pending read causing a `read_event` call, wake + // up the read_waker and restart its loop. + let _ = us.read_waker.try_send(()); return written_len; }, } From ae4ceb71a584f0aa9e0c1a14a6219d87f1668eba Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 6 Oct 2021 16:58:56 +0000 Subject: [PATCH 08/15] Create a simple `FairRwLock` to avoid readers starving writers Because we handle messages (which can take some time, persisting things to disk or validating cryptographic signatures) with the top-level read lock, but require the top-level write lock to connect new peers or handle disconnection, we are particularly sensitive to writer starvation issues. Rust's libstd RwLock does not provide any fairness guarantees, using whatever the OS provides as-is. On Linux, pthreads defaults to starving writers, which Rust's RwLock exposes to us (without any configurability). Here we work around that issue by blocking readers if there are pending writers, optimizing for readable code over perfectly-optimized blocking.
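A small illustration (not part of the patch) of why the wrapper matters, assuming the `FairRwLock` added in the diff below is in scope: with a reader-preferring pthread rwlock the `write()` call can be starved indefinitely by the looping readers, while `FairRwLock` parks new readers once a writer is queued, so the write completes promptly.

use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

fn starvation_demo(peers: Arc<FairRwLock<u64>>) {
    for _ in 0..8 {
        let peers = Arc::clone(&peers);
        thread::spawn(move || loop {
            let _read = peers.read().unwrap(); // "message processing" under the read lock
            thread::sleep(Duration::from_millis(1));
        });
    }
    let started = Instant::now();
    *peers.write().unwrap() += 1; // "a new peer connected", needing the write lock
    println!("write lock acquired after {:?}", started.elapsed());
}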
--- lightning/src/debug_sync.rs | 2 ++ lightning/src/lib.rs | 2 ++ lightning/src/ln/peer_handler.rs | 6 ++-- lightning/src/sync.rs | 2 ++ lightning/src/util/fairrwlock.rs | 50 ++++++++++++++++++++++++++++++++ lightning/src/util/mod.rs | 2 ++ 6 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 lightning/src/util/fairrwlock.rs diff --git a/lightning/src/debug_sync.rs b/lightning/src/debug_sync.rs index b31ceacea15..6b36682f432 100644 --- a/lightning/src/debug_sync.rs +++ b/lightning/src/debug_sync.rs @@ -362,3 +362,5 @@ fn read_write_lockorder_fail() { let _a = a.write().unwrap(); } } + +pub type FairRwLock = RwLock; diff --git a/lightning/src/lib.rs b/lightning/src/lib.rs index 6d4cc50a920..abdc10c577a 100644 --- a/lightning/src/lib.rs +++ b/lightning/src/lib.rs @@ -159,6 +159,8 @@ mod sync { pub use debug_sync::*; #[cfg(not(test))] pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard}; + #[cfg(not(test))] + pub use crate::util::fairrwlock::FairRwLock; } #[cfg(not(feature = "std"))] diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index bb7b697e420..0685785db1c 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -33,7 +33,7 @@ use routing::network_graph::{NetworkGraph, NetGraphMsgHandler}; use prelude::*; use io; use alloc::collections::LinkedList; -use sync::{Arc, Mutex, MutexGuard, RwLock}; +use sync::{Arc, Mutex, MutexGuard, FairRwLock}; use core::sync::atomic::{AtomicBool, Ordering}; use core::{cmp, hash, fmt, mem}; use core::ops::Deref; @@ -428,7 +428,7 @@ pub struct PeerManager, - peers: RwLock>, + peers: FairRwLock>, /// Only add to this set when noise completes. /// Locked *after* peers. When an item is removed, it must be removed with the `peers` write /// lock held. Entries may be added with only the `peers` read lock held (though the @@ -570,7 +570,7 @@ impl P PeerManager { message_handler, - peers: RwLock::new(PeerHolder { + peers: FairRwLock::new(PeerHolder { peers: HashMap::new(), }), node_id_to_descriptor: Mutex::new(HashMap::new()), diff --git a/lightning/src/sync.rs b/lightning/src/sync.rs index bde54703653..482759b8ca8 100644 --- a/lightning/src/sync.rs +++ b/lightning/src/sync.rs @@ -113,3 +113,5 @@ impl RwLock { Err(()) } } + +pub type FairRwLock = RwLock; diff --git a/lightning/src/util/fairrwlock.rs b/lightning/src/util/fairrwlock.rs new file mode 100644 index 00000000000..8dd74f2b53d --- /dev/null +++ b/lightning/src/util/fairrwlock.rs @@ -0,0 +1,50 @@ +use std::sync::{TryLockResult, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// Rust libstd's RwLock does not provide any fairness guarantees (and, in fact, when used on +/// Linux with pthreads under the hood, readers trivially and completely starve writers). +/// Because we often hold read locks while doing message processing in multiple threads which +/// can use significant CPU time, with write locks being time-sensitive but relatively small in +/// CPU time, we can end up with starvation completely blocking incoming connections or pings, +/// especially during initial graph sync. +/// +/// Thus, we need to block readers when a writer is pending, which we do with a trivial RwLock +/// wrapper here. It's not particularly optimized, but provides some reasonable fairness by +/// blocking readers (by taking the write lock) if there are writers pending when we go to take +/// a read lock.
+pub struct FairRwLock { + lock: RwLock, + waiting_writers: AtomicUsize, +} + +impl FairRwLock { + pub fn new(t: T) -> Self { + Self { lock: RwLock::new(t), waiting_writers: AtomicUsize::new(0) } + } + + // Note that all atomic accesses are relaxed, as we do not rely on the atomics here for any + // ordering at all, instead relying on the underlying RwLock to provide ordering of unrelated + // memory. + pub fn write(&self) -> LockResult> { + self.waiting_writers.fetch_add(1, Ordering::Relaxed); + let res = self.lock.write(); + self.waiting_writers.fetch_sub(1, Ordering::Relaxed); + res + } + + pub fn try_write(&self) -> TryLockResult> { + self.lock.try_write() + } + + pub fn read(&self) -> LockResult> { + if self.waiting_writers.load(Ordering::Relaxed) != 0 { + let _write_queue_lock = self.lock.write(); + } + // Note that we don't consider ensuring that an underlying RwLock allowing writers to + // starve readers doesn't exhibit the same behavior here. I'm not aware of any + // libstd-backing RwLock which exhibits this behavior, and as documented in the + // struct-level documentation, it shouldn't pose a significant issue for our current + // codebase. + self.lock.read() + } +} diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index 95826b7e06e..0757983314e 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -25,6 +25,8 @@ pub mod persist; pub(crate) mod atomic_counter; pub(crate) mod byte_utils; pub(crate) mod chacha20; +#[cfg(feature = "std")] +pub(crate) mod fairrwlock; #[cfg(fuzzing)] pub mod zbase32; #[cfg(not(fuzzing))] From eb17464e78ca3af0c80b5262b2d14fbebff40f10 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 11 Oct 2021 05:03:45 +0000 Subject: [PATCH 09/15] Keep the same read buffer unless the last message was overly large This avoids repeatedly deallocating-allocating a Vec for the peer read buffer after every message/header. 
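The reuse rule is small enough to state as a sketch (hypothetical helper name): keep the Vec's allocation from message to message and only discard it when an unusually large message left an oversized buffer behind.

// Prepare the peer's read buffer for the next `next_len` bytes. `resize` reuses the existing
// allocation when capacity allows, so the common 18-byte-header / small-message case never
// reallocates; only a buffer grown past 8 KiB is dropped back to a fresh, small one.
fn prepare_read_buffer(buf: &mut Vec<u8>, next_len: usize) {
    if buf.capacity() > 8192 {
        *buf = Vec::new();
    }
    buf.resize(next_len, 0);
}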
--- lightning/src/ln/peer_handler.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 0685785db1c..f771cf4ff2a 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -972,7 +972,7 @@ impl P if peer.pending_read_is_header { let msg_len = try_potential_handleerror!(peer, peer.channel_encryptor.decrypt_length_header(&peer.pending_read_buffer[..])); - peer.pending_read_buffer = Vec::with_capacity(msg_len as usize + 16); + if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); } peer.pending_read_buffer.resize(msg_len as usize + 16, 0); if msg_len < 2 { // Need at least the message type tag return Err(PeerHandleError{ no_connection_possible: false }); @@ -984,7 +984,8 @@ impl P assert!(msg_data.len() >= 2); // Reset read buffer - peer.pending_read_buffer = [0; 18].to_vec(); + if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); } + peer.pending_read_buffer.resize(18, 0); peer.pending_read_is_header = true; let mut reader = io::Cursor::new(&msg_data[..]); From 96fc0f3453388256145f9ff154554aedd69207d2 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 25 Feb 2022 21:57:57 +0000 Subject: [PATCH 10/15] Drop `PeerHolder` as it now only has one field --- lightning/src/ln/peer_handler.rs | 74 ++++++++++++++++---------------- 1 file changed, 36 insertions(+), 38 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index f771cf4ff2a..08bfa919d27 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -376,14 +376,6 @@ impl Peer { } } -struct PeerHolder { - /// Peer is under its own mutex for sending and receiving bytes, but note that we do *not* hold - /// this mutex while we're processing a message. This is fine as [`PeerManager::read_event`] - /// requires that there be no parallel calls for a given peer, so mutual exclusion of messages - /// handed to the `MessageHandler`s for a given peer is already guaranteed. - peers: HashMap>, -} - /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. /// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static /// lifetimes). Other times you can afford a reference, which is more efficient, in which case @@ -428,7 +420,15 @@ pub struct PeerManager, - peers: FairRwLock>, + /// Connection state for each connected peer - we have an outer read-write lock which is taken + /// as read while we're doing processing for a peer and taken write when a peer is being added + /// or removed. + /// + /// The inner Peer lock is held for sending and receiving bytes, but note that we do *not* hold + /// it while we're processing a message. This is fine as [`PeerManager::read_event`] requires + /// that there be no parallel calls for a given peer, so mutual exclusion of messages handed to + /// the `MessageHandler`s for a given peer is already guaranteed. + peers: FairRwLock>>, /// Only add to this set when noise completes. /// Locked *after* peers. When an item is removed, it must be removed with the `peers` write /// lock held. 
Entries may be added with only the `peers` read lock held (though the @@ -570,9 +570,7 @@ impl P PeerManager { message_handler, - peers: FairRwLock::new(PeerHolder { - peers: HashMap::new(), - }), + peers: FairRwLock::new(HashMap::new()), node_id_to_descriptor: Mutex::new(HashMap::new()), event_processing_lock: Mutex::new(()), blocked_event_processors: AtomicBool::new(false), @@ -591,7 +589,7 @@ impl P /// completed and we are sure the remote peer has the private key for the given node_id. pub fn get_peer_node_ids(&self) -> Vec { let peers = self.peers.read().unwrap(); - peers.peers.values().filter_map(|peer_mutex| { + peers.values().filter_map(|peer_mutex| { let p = peer_mutex.lock().unwrap(); if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() { return None; @@ -629,7 +627,7 @@ impl P let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes let mut peers = self.peers.write().unwrap(); - if peers.peers.insert(descriptor, Mutex::new(Peer { + if peers.insert(descriptor, Mutex::new(Peer { channel_encryptor: peer_encryptor, their_node_id: None, their_features: None, @@ -676,7 +674,7 @@ impl P let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes let mut peers = self.peers.write().unwrap(); - if peers.peers.insert(descriptor, Mutex::new(Peer { + if peers.insert(descriptor, Mutex::new(Peer { channel_encryptor: peer_encryptor, their_node_id: None, their_features: None, @@ -787,7 +785,7 @@ impl P /// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> { let peers = self.peers.read().unwrap(); - match peers.peers.get(descriptor) { + match peers.get(descriptor) { None => { // This is most likely a simple race condition where the user found that the socket // was writeable, then we told the user to `disconnect_socket()`, then they called @@ -852,7 +850,7 @@ impl P let peers = self.peers.read().unwrap(); let mut msgs_to_forward = Vec::new(); let mut peer_node_id = None; - match peers.peers.get(peer_descriptor) { + match peers.get(peer_descriptor) { None => { // This is most likely a simple race condition where the user read some bytes // from the socket, then we told the user to `disconnect_socket()`, then they @@ -1288,13 +1286,13 @@ impl P Ok(should_forward) } - fn forward_broadcast_msg(&self, peers: &PeerHolder, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { + fn forward_broadcast_msg(&self, peers: &HashMap>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { match msg { wire::Message::ChannelAnnouncement(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer_mutex) in peers.peers.iter() { + for (_, peer_mutex) in peers.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { @@ -1320,7 +1318,7 @@ impl P log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer_mutex) in peers.peers.iter() { + for (_, peer_mutex) in peers.iter() { let mut peer = peer_mutex.lock().unwrap(); if 
!peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_node_announcement(msg.contents.node_id) { @@ -1345,7 +1343,7 @@ impl P log_gossip!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer_mutex) in peers.peers.iter() { + for (_, peer_mutex) in peers.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { @@ -1426,7 +1424,7 @@ impl P } let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); match descriptor_opt { - Some(descriptor) => match peers.peers.get(&descriptor) { + Some(descriptor) => match peers.get(&descriptor) { Some(peer_mutex) => { let peer_lock = peer_mutex.lock().unwrap(); if peer_lock.their_features.is_none() { @@ -1625,7 +1623,7 @@ impl P self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); } - for (descriptor, peer_mutex) in peers.peers.iter() { + for (descriptor, peer_mutex) in peers.iter() { self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap()); } } @@ -1639,7 +1637,7 @@ impl P // lock). if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) { - if let Some(peer_mutex) = peers.peers.remove(&descriptor) { + if let Some(peer_mutex) = peers.remove(&descriptor) { if let Some(msg) = msg { log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), @@ -1667,7 +1665,7 @@ impl P fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) { let mut peers = self.peers.write().unwrap(); - let peer_option = peers.peers.remove(descriptor); + let peer_option = peers.remove(descriptor); match peer_option { None => { // This is most likely a simple race condition where the user found that the socket @@ -1703,7 +1701,7 @@ impl P let mut peers_lock = self.peers.write().unwrap(); if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) { log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id); - peers_lock.peers.remove(&descriptor); + peers_lock.remove(&descriptor); self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); descriptor.disconnect_socket(); } @@ -1716,7 +1714,7 @@ impl P let mut peers_lock = self.peers.write().unwrap(); self.node_id_to_descriptor.lock().unwrap().clear(); let peers = &mut *peers_lock; - for (mut descriptor, peer) in peers.peers.drain() { + for (mut descriptor, peer) in peers.drain() { if let Some(node_id) = peer.lock().unwrap().their_node_id { log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id); self.message_handler.chan_handler.peer_disconnected(&node_id, false); @@ -1755,7 +1753,7 @@ impl P { let peers_lock = self.peers.read().unwrap(); - for (descriptor, peer_mutex) in peers_lock.peers.iter() { + for (descriptor, peer_mutex) in peers_lock.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() { // The peer needs to complete its handshake before we can exchange messages. 
We @@ -1779,7 +1777,7 @@ impl P if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick) || peer.awaiting_pong_timer_tick_intervals as u64 > - MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.peers.len() as u64 + MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 { descriptors_needing_disconnect.push(descriptor.clone()); continue; @@ -1805,7 +1803,7 @@ impl P { let mut peers_lock = self.peers.write().unwrap(); for descriptor in descriptors_needing_disconnect.iter() { - if let Some(peer) = peers_lock.peers.remove(&descriptor) { + if let Some(peer) = peers_lock.remove(descriptor) { if let Some(node_id) = peer.lock().unwrap().their_node_id { log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id); self.node_id_to_descriptor.lock().unwrap().remove(&node_id); @@ -1935,7 +1933,7 @@ mod tests { let chan_handler = test_utils::TestChannelMessageHandler::new(); let mut peers = create_network(2, &cfgs); establish_connection(&peers[0], &peers[1]); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); let secp_ctx = Secp256k1::new(); let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); @@ -1948,7 +1946,7 @@ mod tests { peers[0].message_handler.chan_handler = &chan_handler; peers[0].process_events(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); } #[test] @@ -1957,17 +1955,17 @@ mod tests { let cfgs = create_peermgr_cfgs(2); let peers = create_network(2, &cfgs); establish_connection(&peers[0], &peers[1]); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); // peers[0] awaiting_pong is set to true, but the Peer is still connected peers[0].timer_tick_occurred(); peers[0].process_events(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); // Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected peers[0].timer_tick_occurred(); peers[0].process_events(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); } #[test] @@ -2029,9 +2027,9 @@ mod tests { peers[0].new_inbound_connection(fd_a.clone(), None).unwrap(); // If we get a single timer tick before completion, that's fine - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); peers[0].timer_tick_occurred(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false); peers[0].process_events(); @@ -2040,7 +2038,7 @@ mod tests { // ...but if we get a second timer tick, we should disconnect the peer peers[0].timer_tick_occurred(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err()); } From b222be233b0267ac2012a09e0d1c8ffee9cd2982 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 22 Mar 2022 21:03:41 +0000 Subject: [PATCH 11/15] [net-tokio] Explicitly yield after processing messages from a peer This reduces instances of disconnect peers after single timer intervals somewhat, at least on Tokio 1.14. 
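As a rough sketch of the loop shape this produces (hypothetical types, not the crate's actual read task): each pass hands the received bytes to the handler and then explicitly yields, so one chatty peer cannot monopolize the executor thread while other peers' ping timers run down.

use tokio::io::AsyncReadExt;
use tokio::net::tcp::OwnedReadHalf;

async fn read_loop(mut reader: OwnedReadHalf, mut handle_bytes: impl FnMut(&[u8])) {
    let mut buf = [0u8; 8192];
    loop {
        match reader.read(&mut buf).await {
            Ok(0) | Err(_) => break, // peer hung up or the socket errored
            Ok(len) => handle_bytes(&buf[..len]),
        }
        // Hand the timeslice back to the executor so other peers' tasks (and timers) get to run.
        tokio::task::yield_now().await;
    }
}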
--- lightning-net-tokio/src/lib.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index fc7b260e41b..3cfed870b31 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -194,6 +194,13 @@ impl Connection { }, } let _ = event_waker.try_send(()); + + // At this point we've processed a message or two, and reset the ping timer for this + // peer, at least in the "are we still receiving messages" context, if we don't give up + // our timeslice to another task we may just spin on this peer, starving other peers + // and eventually disconnecting them for ping timeouts. Instead, we explicitly yield + // here. + tokio::task::yield_now().await; }; let writer_option = us.lock().unwrap().writer.take(); if let Some(mut writer) = writer_option { From 101bcd8da5af5d9df294dc92b694d731c262020c Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 11 Apr 2022 17:34:11 +0000 Subject: [PATCH 12/15] Drop a needless match in favor of an `if let` --- lightning/src/ln/peer_handler.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 08bfa919d27..d4e6b4645b2 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -1674,15 +1674,12 @@ impl P }, Some(peer_lock) => { let peer = peer_lock.lock().unwrap(); - match peer.their_node_id { - Some(node_id) => { - log_trace!(self.logger, - "Handling disconnection of peer {}, with {}future connection to the peer possible.", - log_pubkey!(node_id), if no_connection_possible { "no " } else { "" }); - self.node_id_to_descriptor.lock().unwrap().remove(&node_id); - self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); - }, - None => {} + if let Some(node_id) = peer.their_node_id { + log_trace!(self.logger, + "Handling disconnection of peer {}, with {}future connection to the peer possible.", + log_pubkey!(node_id), if no_connection_possible { "no " } else { "" }); + self.node_id_to_descriptor.lock().unwrap().remove(&node_id); + self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); } } }; From 45c1411b161562e9a4db7feda07646d72df7bafc Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 12 Apr 2022 19:05:15 +0000 Subject: [PATCH 13/15] Require `PartialEq` for `wire::Message` in `cfg(test)` ...and implement wire::Type for `()` for `feature = "_test_utils"`. --- lightning/src/ln/wire.rs | 44 ++++++++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs index e7db446a86f..8fd5c16f362 100644 --- a/lightning/src/ln/wire.rs +++ b/lightning/src/ln/wire.rs @@ -28,11 +28,26 @@ pub trait CustomMessageReader { fn read(&self, message_type: u16, buffer: &mut R) -> Result, msgs::DecodeError>; } +// TestEq is a dummy trait which requires PartialEq when built in testing, and otherwise is +// blanket-implemented for all types. + +#[cfg(test)] +pub trait TestEq : PartialEq {} +#[cfg(test)] +impl TestEq for T {} + +#[cfg(not(test))] +pub(crate) trait TestEq {} +#[cfg(not(test))] +impl TestEq for T {} + + /// A Lightning message returned by [`read()`] when decoding bytes received over the wire. Each /// variant contains a message from [`msgs`] or otherwise the message type if unknown. 
#[allow(missing_docs)] #[derive(Debug)] -pub(crate) enum Message where T: core::fmt::Debug + Type { +#[cfg_attr(test, derive(PartialEq))] +pub(crate) enum Message where T: core::fmt::Debug + Type + TestEq { Init(msgs::Init), Error(msgs::ErrorMessage), Warning(msgs::WarningMessage), @@ -69,7 +84,7 @@ pub(crate) enum Message where T: core::fmt::Debug + Type { Custom(T), } -impl Message where T: core::fmt::Debug + Type { +impl Message where T: core::fmt::Debug + Type + TestEq { /// Returns the type that was used to decode the message payload. pub fn type_id(&self) -> u16 { match self { @@ -252,6 +267,7 @@ mod encode { pub(crate) use self::encode::Encode; +#[cfg(not(test))] /// Defines a type identifier for sending messages over the wire. /// /// Messages implementing this trait specify a type and must be [`Writeable`]. @@ -260,10 +276,24 @@ pub trait Type: core::fmt::Debug + Writeable { fn type_id(&self) -> u16; } +#[cfg(test)] +pub trait Type: core::fmt::Debug + Writeable + PartialEq { + fn type_id(&self) -> u16; +} + +#[cfg(any(feature = "_test_utils", fuzzing, test))] +impl Type for () { + fn type_id(&self) -> u16 { unreachable!(); } +} + +#[cfg(test)] +impl Type for T where T: Encode { + fn type_id(&self) -> u16 { T::TYPE } +} + +#[cfg(not(test))] impl Type for T where T: Encode { - fn type_id(&self) -> u16 { - T::TYPE - } + fn type_id(&self) -> u16 { T::TYPE } } impl Encode for msgs::Init { @@ -471,10 +501,6 @@ mod tests { } } - impl Type for () { - fn type_id(&self) -> u16 { unreachable!(); } - } - #[test] fn is_even_message_type() { let message = Message::<()>::Unknown(42); From cc7f859c01570c300bb53b1ebf924d6c6c307141 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 12 Apr 2022 19:16:38 +0000 Subject: [PATCH 14/15] Add support for testing recvd messages in TestChannelMessageHandler --- lightning/src/util/test_utils.rs | 109 +++++++++++++++++++++++++------ 1 file changed, 89 insertions(+), 20 deletions(-) diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index f68724309c0..0aa39e8a979 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -18,7 +18,7 @@ use chain::channelmonitor::MonitorEvent; use chain::transaction::OutPoint; use chain::keysinterface; use ln::features::{ChannelFeatures, InitFeatures}; -use ln::msgs; +use ln::{msgs, wire}; use ln::msgs::OptionalField; use ln::script::ShutdownScript; use routing::scoring::FixedPenaltyScorer; @@ -249,37 +249,106 @@ impl chaininterface::BroadcasterInterface for TestBroadcaster { pub struct TestChannelMessageHandler { pub pending_events: Mutex>, + expected_recv_msgs: Mutex>>>, } impl TestChannelMessageHandler { pub fn new() -> Self { TestChannelMessageHandler { pending_events: Mutex::new(Vec::new()), + expected_recv_msgs: Mutex::new(None), + } + } + + #[cfg(test)] + pub(crate) fn expect_receive_msg(&self, ev: wire::Message<()>) { + let mut expected_msgs = self.expected_recv_msgs.lock().unwrap(); + if expected_msgs.is_none() { *expected_msgs = Some(Vec::new()); } + expected_msgs.as_mut().unwrap().push(ev); + } + + fn received_msg(&self, ev: wire::Message<()>) { + let mut msgs = self.expected_recv_msgs.lock().unwrap(); + if msgs.is_none() { return; } + assert!(!msgs.as_ref().unwrap().is_empty(), "Received message when we weren't expecting one"); + #[cfg(test)] + assert_eq!(msgs.as_ref().unwrap()[0], ev); + msgs.as_mut().unwrap().remove(0); + } +} + +impl Drop for TestChannelMessageHandler { + fn drop(&mut self) { + let l = self.expected_recv_msgs.lock().unwrap(); + 
#[cfg(feature = "std")] + { + if !std::thread::panicking() { + assert!(l.is_none() || l.as_ref().unwrap().is_empty()); + } } } } impl msgs::ChannelMessageHandler for TestChannelMessageHandler { - fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &msgs::OpenChannel) {} - fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &msgs::AcceptChannel) {} - fn handle_funding_created(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingCreated) {} - fn handle_funding_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingSigned) {} - fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingLocked) {} - fn handle_shutdown(&self, _their_node_id: &PublicKey, _their_features: &InitFeatures, _msg: &msgs::Shutdown) {} - fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) {} - fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateAddHTLC) {} - fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFulfillHTLC) {} - fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFailHTLC) {} - fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFailMalformedHTLC) {} - fn handle_commitment_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::CommitmentSigned) {} - fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &msgs::RevokeAndACK) {} - fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFee) {} - fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {} - fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &msgs::AnnouncementSignatures) {} - fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelReestablish) {} + fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::OpenChannel) { + self.received_msg(wire::Message::OpenChannel(msg.clone())); + } + fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::AcceptChannel) { + self.received_msg(wire::Message::AcceptChannel(msg.clone())); + } + fn handle_funding_created(&self, _their_node_id: &PublicKey, msg: &msgs::FundingCreated) { + self.received_msg(wire::Message::FundingCreated(msg.clone())); + } + fn handle_funding_signed(&self, _their_node_id: &PublicKey, msg: &msgs::FundingSigned) { + self.received_msg(wire::Message::FundingSigned(msg.clone())); + } + fn handle_funding_locked(&self, _their_node_id: &PublicKey, msg: &msgs::FundingLocked) { + self.received_msg(wire::Message::FundingLocked(msg.clone())); + } + fn handle_shutdown(&self, _their_node_id: &PublicKey, _their_features: &InitFeatures, msg: &msgs::Shutdown) { + self.received_msg(wire::Message::Shutdown(msg.clone())); + } + fn handle_closing_signed(&self, _their_node_id: &PublicKey, msg: &msgs::ClosingSigned) { + self.received_msg(wire::Message::ClosingSigned(msg.clone())); + } + fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { + self.received_msg(wire::Message::UpdateAddHTLC(msg.clone())); + } + fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { + self.received_msg(wire::Message::UpdateFulfillHTLC(msg.clone())); + } + fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { + 
self.received_msg(wire::Message::UpdateFailHTLC(msg.clone())); + } + fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { + self.received_msg(wire::Message::UpdateFailMalformedHTLC(msg.clone())); + } + fn handle_commitment_signed(&self, _their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { + self.received_msg(wire::Message::CommitmentSigned(msg.clone())); + } + fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) { + self.received_msg(wire::Message::RevokeAndACK(msg.clone())); + } + fn handle_update_fee(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFee) { + self.received_msg(wire::Message::UpdateFee(msg.clone())); + } + fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) { + // Don't call `received_msg` here as `TestRoutingMessageHandler` generates these sometimes + } + fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { + self.received_msg(wire::Message::AnnouncementSignatures(msg.clone())); + } + fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { + self.received_msg(wire::Message::ChannelReestablish(msg.clone())); + } fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {} - fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {} - fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {} + fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) { + // Don't bother with `received_msg` for Init as its auto-generated and we don't want to + // bother re-generating the expected Init message in all tests. + } + fn handle_error(&self, _their_node_id: &PublicKey, msg: &msgs::ErrorMessage) { + self.received_msg(wire::Message::Error(msg.clone())); + } } impl events::MessageSendEventsProvider for TestChannelMessageHandler { From 46009a5f83e539fc103ea164a3c765a7f014f09a Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 12 Apr 2022 19:19:00 +0000 Subject: [PATCH 15/15] Add a few more simple tests of the PeerHandler These increase coverage and caught previous lockorder inversions. --- lightning/src/ln/peer_handler.rs | 44 +++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index d4e6b4645b2..be3b3aeda36 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -1833,7 +1833,7 @@ fn is_gossip_msg(type_id: u16) -> bool { #[cfg(test)] mod tests { use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses}; - use ln::msgs; + use ln::{msgs, wire}; use ln::msgs::NetAddress; use util::events; use util::test_utils; @@ -1946,6 +1946,48 @@ mod tests { assert_eq!(peers[0].peers.read().unwrap().len(), 0); } + #[test] + fn test_send_simple_msg() { + // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and + // push a message from one peer to another. 
+ let cfgs = create_peermgr_cfgs(2); + let a_chan_handler = test_utils::TestChannelMessageHandler::new(); + let b_chan_handler = test_utils::TestChannelMessageHandler::new(); + let mut peers = create_network(2, &cfgs); + let (fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); + + let secp_ctx = Secp256k1::new(); + let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); + + let msg = msgs::Shutdown { channel_id: [42; 32], scriptpubkey: bitcoin::Script::new() }; + a_chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::SendShutdown { + node_id: their_id, msg: msg.clone() + }); + peers[0].message_handler.chan_handler = &a_chan_handler; + + b_chan_handler.expect_receive_msg(wire::Message::Shutdown(msg)); + peers[1].message_handler.chan_handler = &b_chan_handler; + + peers[0].process_events(); + + let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); + assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false); + } + + #[test] + fn test_disconnect_all_peer() { + // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and + // then calls disconnect_all_peers + let cfgs = create_peermgr_cfgs(2); + let peers = create_network(2, &cfgs); + establish_connection(&peers[0], &peers[1]); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); + + peers[0].disconnect_all_peers(); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); + } + #[test] fn test_timer_tick_occurred() { // Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.