Skip to content

Commit

Permalink
Merge pull request #1023 from TheBlueMatt/2021-07-par-gossip-processing
Browse files Browse the repository at this point in the history
  • Loading branch information
TheBlueMatt authored May 11, 2022
2 parents 3cae233 + 46009a5 commit b5a6307
Show file tree
Hide file tree
Showing 9 changed files with 747 additions and 429 deletions.
37 changes: 32 additions & 5 deletions lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,28 @@ struct Connection {
id: u64,
}
impl Connection {
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
loop {
if event_receiver.recv().await.is_none() {
return;
}
peer_manager.process_events();
}
}

async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
CMH: ChannelMessageHandler + 'static,
RMH: RoutingMessageHandler + 'static,
L: Logger + 'static + ?Sized,
UMH: CustomMessageHandler + 'static {
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
// Create a waker to wake up poll_event_process, above
let (event_waker, event_receiver) = mpsc::channel(1);
tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));

// 8KB is nice and big but also should never cause any issues with stack overflowing.
let mut buf = [0; 8192];

Expand Down Expand Up @@ -175,7 +192,14 @@ impl Connection {
Err(_) => break Disconnect::PeerDisconnected,
},
}
peer_manager.process_events();
let _ = event_waker.try_send(());

// At this point we've processed a message or two, and reset the ping timer for this
// peer, at least in the "are we still receiving messages" context, if we don't give up
// our timeslice to another task we may just spin on this peer, starving other peers
// and eventually disconnecting them for ping timeouts. Instead, we explicitly yield
// here.
tokio::task::yield_now().await;
};
let writer_option = us.lock().unwrap().writer.take();
if let Some(mut writer) = writer_option {
Expand Down Expand Up @@ -443,6 +467,9 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
// pause read given we're now waiting on the remote end to ACK (and in
// accordance with the send_data() docs).
us.read_paused = true;
// Further, to avoid any current pending read causing a `read_event` call, wake
// up the read_waker and restart its loop.
let _ = us.read_waker.try_send(());
return written_len;
},
}
Expand Down
2 changes: 2 additions & 0 deletions lightning/src/debug_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,5 @@ fn read_write_lockorder_fail() {
let _a = a.write().unwrap();
}
}

pub type FairRwLock<T> = RwLock<T>;
2 changes: 2 additions & 0 deletions lightning/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ mod sync {
pub use debug_sync::*;
#[cfg(not(test))]
pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard};
#[cfg(not(test))]
pub use crate::util::fairrwlock::FairRwLock;
}

#[cfg(not(feature = "std"))]
Expand Down
928 changes: 533 additions & 395 deletions lightning/src/ln/peer_handler.rs

Large diffs are not rendered by default.

44 changes: 35 additions & 9 deletions lightning/src/ln/wire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,26 @@ pub trait CustomMessageReader {
fn read<R: io::Read>(&self, message_type: u16, buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError>;
}

// TestEq is a dummy trait which requires PartialEq when built in testing, and otherwise is
// blanket-implemented for all types.

#[cfg(test)]
pub trait TestEq : PartialEq {}
#[cfg(test)]
impl<T: PartialEq> TestEq for T {}

#[cfg(not(test))]
pub(crate) trait TestEq {}
#[cfg(not(test))]
impl<T> TestEq for T {}


/// A Lightning message returned by [`read()`] when decoding bytes received over the wire. Each
/// variant contains a message from [`msgs`] or otherwise the message type if unknown.
#[allow(missing_docs)]
#[derive(Debug)]
pub(crate) enum Message<T> where T: core::fmt::Debug + Type {
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum Message<T> where T: core::fmt::Debug + Type + TestEq {
Init(msgs::Init),
Error(msgs::ErrorMessage),
Warning(msgs::WarningMessage),
Expand Down Expand Up @@ -69,7 +84,7 @@ pub(crate) enum Message<T> where T: core::fmt::Debug + Type {
Custom(T),
}

impl<T> Message<T> where T: core::fmt::Debug + Type {
impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
/// Returns the type that was used to decode the message payload.
pub fn type_id(&self) -> u16 {
match self {
Expand Down Expand Up @@ -252,6 +267,7 @@ mod encode {

pub(crate) use self::encode::Encode;

#[cfg(not(test))]
/// Defines a type identifier for sending messages over the wire.
///
/// Messages implementing this trait specify a type and must be [`Writeable`].
Expand All @@ -260,10 +276,24 @@ pub trait Type: core::fmt::Debug + Writeable {
fn type_id(&self) -> u16;
}

#[cfg(test)]
pub trait Type: core::fmt::Debug + Writeable + PartialEq {
fn type_id(&self) -> u16;
}

#[cfg(any(feature = "_test_utils", fuzzing, test))]
impl Type for () {
fn type_id(&self) -> u16 { unreachable!(); }
}

#[cfg(test)]
impl<T: core::fmt::Debug + Writeable + PartialEq> Type for T where T: Encode {
fn type_id(&self) -> u16 { T::TYPE }
}

#[cfg(not(test))]
impl<T: core::fmt::Debug + Writeable> Type for T where T: Encode {
fn type_id(&self) -> u16 {
T::TYPE
}
fn type_id(&self) -> u16 { T::TYPE }
}

impl Encode for msgs::Init {
Expand Down Expand Up @@ -471,10 +501,6 @@ mod tests {
}
}

impl Type for () {
fn type_id(&self) -> u16 { unreachable!(); }
}

#[test]
fn is_even_message_type() {
let message = Message::<()>::Unknown(42);
Expand Down
2 changes: 2 additions & 0 deletions lightning/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,5 @@ impl<T> RwLock<T> {
Err(())
}
}

pub type FairRwLock<T> = RwLock<T>;
50 changes: 50 additions & 0 deletions lightning/src/util/fairrwlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::sync::{TryLockResult, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Rust libstd's RwLock does not provide any fairness guarantees (and, in fact, when used on
/// Linux with pthreads under the hood, readers trivially and completely starve writers).
/// Because we often hold read locks while doing message processing in multiple threads which
/// can use significant CPU time, with write locks being time-sensitive but relatively small in
/// CPU time, we can end up with starvation completely blocking incoming connections or pings,
/// especially during initial graph sync.
///
/// Thus, we need to block readers when a writer is pending, which we do with a trivial RwLock
/// wrapper here. Its not particularly optimized, but provides some reasonable fairness by
/// blocking readers (by taking the write lock) if there are writers pending when we go to take
/// a read lock.
pub struct FairRwLock<T> {
lock: RwLock<T>,
waiting_writers: AtomicUsize,
}

impl<T> FairRwLock<T> {
pub fn new(t: T) -> Self {
Self { lock: RwLock::new(t), waiting_writers: AtomicUsize::new(0) }
}

// Note that all atomic accesses are relaxed, as we do not rely on the atomics here for any
// ordering at all, instead relying on the underlying RwLock to provide ordering of unrelated
// memory.
pub fn write(&self) -> LockResult<RwLockWriteGuard<T>> {
self.waiting_writers.fetch_add(1, Ordering::Relaxed);
let res = self.lock.write();
self.waiting_writers.fetch_sub(1, Ordering::Relaxed);
res
}

pub fn try_write(&self) -> TryLockResult<RwLockWriteGuard<T>> {
self.lock.try_write()
}

pub fn read(&self) -> LockResult<RwLockReadGuard<T>> {
if self.waiting_writers.load(Ordering::Relaxed) != 0 {
let _write_queue_lock = self.lock.write();
}
// Note that we don't consider ensuring that an underlying RwLock allowing writers to
// starve readers doesn't exhibit the same behavior here. I'm not aware of any
// libstd-backing RwLock which exhibits this behavior, and as documented in the
// struct-level documentation, it shouldn't pose a significant issue for our current
// codebase.
self.lock.read()
}
}
2 changes: 2 additions & 0 deletions lightning/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub mod persist;
pub(crate) mod atomic_counter;
pub(crate) mod byte_utils;
pub(crate) mod chacha20;
#[cfg(feature = "std")]
pub(crate) mod fairrwlock;
#[cfg(fuzzing)]
pub mod zbase32;
#[cfg(not(fuzzing))]
Expand Down
109 changes: 89 additions & 20 deletions lightning/src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use chain::channelmonitor::MonitorEvent;
use chain::transaction::OutPoint;
use chain::keysinterface;
use ln::features::{ChannelFeatures, InitFeatures};
use ln::msgs;
use ln::{msgs, wire};
use ln::msgs::OptionalField;
use ln::script::ShutdownScript;
use routing::scoring::FixedPenaltyScorer;
Expand Down Expand Up @@ -249,37 +249,106 @@ impl chaininterface::BroadcasterInterface for TestBroadcaster {

pub struct TestChannelMessageHandler {
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
expected_recv_msgs: Mutex<Option<Vec<wire::Message<()>>>>,
}

impl TestChannelMessageHandler {
pub fn new() -> Self {
TestChannelMessageHandler {
pending_events: Mutex::new(Vec::new()),
expected_recv_msgs: Mutex::new(None),
}
}

#[cfg(test)]
pub(crate) fn expect_receive_msg(&self, ev: wire::Message<()>) {
let mut expected_msgs = self.expected_recv_msgs.lock().unwrap();
if expected_msgs.is_none() { *expected_msgs = Some(Vec::new()); }
expected_msgs.as_mut().unwrap().push(ev);
}

fn received_msg(&self, ev: wire::Message<()>) {
let mut msgs = self.expected_recv_msgs.lock().unwrap();
if msgs.is_none() { return; }
assert!(!msgs.as_ref().unwrap().is_empty(), "Received message when we weren't expecting one");
#[cfg(test)]
assert_eq!(msgs.as_ref().unwrap()[0], ev);
msgs.as_mut().unwrap().remove(0);
}
}

impl Drop for TestChannelMessageHandler {
fn drop(&mut self) {
let l = self.expected_recv_msgs.lock().unwrap();
#[cfg(feature = "std")]
{
if !std::thread::panicking() {
assert!(l.is_none() || l.as_ref().unwrap().is_empty());
}
}
}
}

impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &msgs::OpenChannel) {}
fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &msgs::AcceptChannel) {}
fn handle_funding_created(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingCreated) {}
fn handle_funding_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingSigned) {}
fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingLocked) {}
fn handle_shutdown(&self, _their_node_id: &PublicKey, _their_features: &InitFeatures, _msg: &msgs::Shutdown) {}
fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) {}
fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateAddHTLC) {}
fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFulfillHTLC) {}
fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFailHTLC) {}
fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFailMalformedHTLC) {}
fn handle_commitment_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::CommitmentSigned) {}
fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &msgs::RevokeAndACK) {}
fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFee) {}
fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {}
fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &msgs::AnnouncementSignatures) {}
fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelReestablish) {}
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::OpenChannel) {
self.received_msg(wire::Message::OpenChannel(msg.clone()));
}
fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::AcceptChannel) {
self.received_msg(wire::Message::AcceptChannel(msg.clone()));
}
fn handle_funding_created(&self, _their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
self.received_msg(wire::Message::FundingCreated(msg.clone()));
}
fn handle_funding_signed(&self, _their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
self.received_msg(wire::Message::FundingSigned(msg.clone()));
}
fn handle_funding_locked(&self, _their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
self.received_msg(wire::Message::FundingLocked(msg.clone()));
}
fn handle_shutdown(&self, _their_node_id: &PublicKey, _their_features: &InitFeatures, msg: &msgs::Shutdown) {
self.received_msg(wire::Message::Shutdown(msg.clone()));
}
fn handle_closing_signed(&self, _their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
self.received_msg(wire::Message::ClosingSigned(msg.clone()));
}
fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
self.received_msg(wire::Message::UpdateAddHTLC(msg.clone()));
}
fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
self.received_msg(wire::Message::UpdateFulfillHTLC(msg.clone()));
}
fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
self.received_msg(wire::Message::UpdateFailHTLC(msg.clone()));
}
fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
self.received_msg(wire::Message::UpdateFailMalformedHTLC(msg.clone()));
}
fn handle_commitment_signed(&self, _their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
self.received_msg(wire::Message::CommitmentSigned(msg.clone()));
}
fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
self.received_msg(wire::Message::RevokeAndACK(msg.clone()));
}
fn handle_update_fee(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
self.received_msg(wire::Message::UpdateFee(msg.clone()));
}
fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {
// Don't call `received_msg` here as `TestRoutingMessageHandler` generates these sometimes
}
fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
self.received_msg(wire::Message::AnnouncementSignatures(msg.clone()));
}
fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
self.received_msg(wire::Message::ChannelReestablish(msg.clone()));
}
fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {}
fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {
// Don't bother with `received_msg` for Init as its auto-generated and we don't want to
// bother re-generating the expected Init message in all tests.
}
fn handle_error(&self, _their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
self.received_msg(wire::Message::Error(msg.clone()));
}
}

impl events::MessageSendEventsProvider for TestChannelMessageHandler {
Expand Down

0 comments on commit b5a6307

Please sign in to comment.