Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process messages from different peers in parallel in PeerManager. #1023

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,28 @@ struct Connection {
id: u64,
}
impl Connection {
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
loop {
if event_receiver.recv().await.is_none() {
return;
}
peer_manager.process_events();
}
}

async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
CMH: ChannelMessageHandler + 'static,
RMH: RoutingMessageHandler + 'static,
L: Logger + 'static + ?Sized,
UMH: CustomMessageHandler + 'static {
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
// Create a waker to wake up poll_event_process, above
let (event_waker, event_receiver) = mpsc::channel(1);
tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));

// 8KB is nice and big but also should never cause any issues with stack overflowing.
let mut buf = [0; 8192];

Expand Down Expand Up @@ -176,7 +193,14 @@ impl Connection {
Err(_) => break Disconnect::PeerDisconnected,
},
}
peer_manager.process_events();
let _ = event_waker.try_send(());

// At this point we've processed a message or two, and reset the ping timer for this
// peer, at least in the "are we still receiving messages" context, if we don't give up
// our timeslice to another task we may just spin on this peer, starving other peers
// and eventually disconnecting them for ping timeouts. Instead, we explicitly yield
// here.
tokio::task::yield_now().await;
};
let writer_option = us.lock().unwrap().writer.take();
if let Some(mut writer) = writer_option {
Expand Down Expand Up @@ -448,6 +472,9 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
// pause read given we're now waiting on the remote end to ACK (and in
// accordance with the send_data() docs).
us.read_paused = true;
// Further, to avoid any current pending read causing a `read_event` call, wake
// up the read_waker and restart its loop.
let _ = us.read_waker.try_send(());
return written_len;
},
}
Expand Down
2 changes: 2 additions & 0 deletions lightning/src/debug_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,5 @@ fn read_write_lockorder_fail() {
let _a = a.write().unwrap();
}
}

pub type FairRwLock<T> = RwLock<T>;
2 changes: 2 additions & 0 deletions lightning/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ mod sync {
pub use debug_sync::*;
#[cfg(not(test))]
pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard};
#[cfg(not(test))]
pub use crate::util::fairrwlock::FairRwLock;
}

#[cfg(not(feature = "std"))]
Expand Down
928 changes: 533 additions & 395 deletions lightning/src/ln/peer_handler.rs

Large diffs are not rendered by default.

44 changes: 35 additions & 9 deletions lightning/src/ln/wire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,26 @@ pub trait CustomMessageReader {
fn read<R: io::Read>(&self, message_type: u16, buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError>;
}

// TestEq is a dummy trait which requires PartialEq when built in testing, and otherwise is
// blanket-implemented for all types.

#[cfg(test)]
pub trait TestEq : PartialEq {}
#[cfg(test)]
impl<T: PartialEq> TestEq for T {}

#[cfg(not(test))]
pub(crate) trait TestEq {}
#[cfg(not(test))]
impl<T> TestEq for T {}


/// A Lightning message returned by [`read()`] when decoding bytes received over the wire. Each
/// variant contains a message from [`msgs`] or otherwise the message type if unknown.
#[allow(missing_docs)]
#[derive(Debug)]
pub(crate) enum Message<T> where T: core::fmt::Debug + Type {
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum Message<T> where T: core::fmt::Debug + Type + TestEq {
Init(msgs::Init),
Error(msgs::ErrorMessage),
Warning(msgs::WarningMessage),
Expand Down Expand Up @@ -69,7 +84,7 @@ pub(crate) enum Message<T> where T: core::fmt::Debug + Type {
Custom(T),
}

impl<T> Message<T> where T: core::fmt::Debug + Type {
impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
/// Returns the type that was used to decode the message payload.
pub fn type_id(&self) -> u16 {
match self {
Expand Down Expand Up @@ -252,6 +267,7 @@ mod encode {

pub(crate) use self::encode::Encode;

#[cfg(not(test))]
/// Defines a type identifier for sending messages over the wire.
///
/// Messages implementing this trait specify a type and must be [`Writeable`].
Expand All @@ -260,10 +276,24 @@ pub trait Type: core::fmt::Debug + Writeable {
fn type_id(&self) -> u16;
}

#[cfg(test)]
pub trait Type: core::fmt::Debug + Writeable + PartialEq {
fn type_id(&self) -> u16;
}

#[cfg(any(feature = "_test_utils", fuzzing, test))]
impl Type for () {
fn type_id(&self) -> u16 { unreachable!(); }
}

#[cfg(test)]
impl<T: core::fmt::Debug + Writeable + PartialEq> Type for T where T: Encode {
fn type_id(&self) -> u16 { T::TYPE }
}

#[cfg(not(test))]
impl<T: core::fmt::Debug + Writeable> Type for T where T: Encode {
fn type_id(&self) -> u16 {
T::TYPE
}
fn type_id(&self) -> u16 { T::TYPE }
}

impl Encode for msgs::Init {
Expand Down Expand Up @@ -471,10 +501,6 @@ mod tests {
}
}

impl Type for () {
fn type_id(&self) -> u16 { unreachable!(); }
}

#[test]
fn is_even_message_type() {
let message = Message::<()>::Unknown(42);
Expand Down
2 changes: 2 additions & 0 deletions lightning/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,5 @@ impl<T> RwLock<T> {
Err(())
}
}

pub type FairRwLock<T> = RwLock<T>;
50 changes: 50 additions & 0 deletions lightning/src/util/fairrwlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::sync::{TryLockResult, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Rust libstd's RwLock does not provide any fairness guarantees (and, in fact, when used on
/// Linux with pthreads under the hood, readers trivially and completely starve writers).
/// Because we often hold read locks while doing message processing in multiple threads which
/// can use significant CPU time, with write locks being time-sensitive but relatively small in
/// CPU time, we can end up with starvation completely blocking incoming connections or pings,
/// especially during initial graph sync.
///
/// Thus, we need to block readers when a writer is pending, which we do with a trivial RwLock
/// wrapper here. Its not particularly optimized, but provides some reasonable fairness by
/// blocking readers (by taking the write lock) if there are writers pending when we go to take
/// a read lock.
pub struct FairRwLock<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think fair is somewhat subjective. In this case, it's fair because it prioritizes writing. Should that perhaps be reflected in the name, though I don't really have good suggestions? Like WritePreferenceRwLock? WriterPriorityRwLock? Open to suggestions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean its fair in the traditional sense that there is no starvation. Writers won't starve readers, either, I believe, as long as the underlying native RwLock doesn't allow writers to starve other writers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that's not entirely true, if the underlying RwLock allows writers to starve readers we will still exhibit that behavior here, but I don't think any do, so for now its probably fine, will add a comment.

lock: RwLock<T>,
waiting_writers: AtomicUsize,
}

impl<T> FairRwLock<T> {
pub fn new(t: T) -> Self {
Self { lock: RwLock::new(t), waiting_writers: AtomicUsize::new(0) }
}

// Note that all atomic accesses are relaxed, as we do not rely on the atomics here for any
// ordering at all, instead relying on the underlying RwLock to provide ordering of unrelated
// memory.
pub fn write(&self) -> LockResult<RwLockWriteGuard<T>> {
self.waiting_writers.fetch_add(1, Ordering::Relaxed);
let res = self.lock.write();
self.waiting_writers.fetch_sub(1, Ordering::Relaxed);
res
}

pub fn try_write(&self) -> TryLockResult<RwLockWriteGuard<T>> {
self.lock.try_write()
}

pub fn read(&self) -> LockResult<RwLockReadGuard<T>> {
if self.waiting_writers.load(Ordering::Relaxed) != 0 {
let _write_queue_lock = self.lock.write();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a comment here would be nice explaining the idea of initiating a wait for a write lock before a new read lock is accessed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current mechanism seems to achieve that whenever there are pending write locks, a new read lock cannot be added until those pending write locks complete. However, if there are already pending read locks, and a new write lock gets added, it will have to wait. Have you considered ways to jump the line, so to speak, and would that be a desirable property?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jumping the line would break the fairness property - if there's waiting writers, and we allow new readers to take the read lock, then those new readers may cause the waiting writer to wait even longer, which is the issue we're trying to solve to begin with.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for a comment, I'm not sure what to add to the existing one that's in the struct docs:

/// Thus, we need to block readers when a writer is pending, which we do with a trivial RwLock
/// wrapper here. Its not particularly optimized, but provides some reasonable fairness by
/// blocking readers (by taking the write lock) if there are writers pending when we go to take
/// a read lock.

}
// Note that we don't consider ensuring that an underlying RwLock allowing writers to
// starve readers doesn't exhibit the same behavior here. I'm not aware of any
// libstd-backing RwLock which exhibits this behavior, and as documented in the
// struct-level documentation, it shouldn't pose a significant issue for our current
// codebase.
self.lock.read()
}
}
2 changes: 2 additions & 0 deletions lightning/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub mod persist;
pub(crate) mod atomic_counter;
pub(crate) mod byte_utils;
pub(crate) mod chacha20;
#[cfg(feature = "std")]
pub(crate) mod fairrwlock;
#[cfg(fuzzing)]
pub mod zbase32;
#[cfg(not(fuzzing))]
Expand Down
109 changes: 89 additions & 20 deletions lightning/src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use chain::channelmonitor::MonitorEvent;
use chain::transaction::OutPoint;
use chain::keysinterface;
use ln::features::{ChannelFeatures, InitFeatures};
use ln::msgs;
use ln::{msgs, wire};
use ln::msgs::OptionalField;
use ln::script::ShutdownScript;
use routing::scoring::FixedPenaltyScorer;
Expand Down Expand Up @@ -249,37 +249,106 @@ impl chaininterface::BroadcasterInterface for TestBroadcaster {

pub struct TestChannelMessageHandler {
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
expected_recv_msgs: Mutex<Option<Vec<wire::Message<()>>>>,
}

impl TestChannelMessageHandler {
pub fn new() -> Self {
TestChannelMessageHandler {
pending_events: Mutex::new(Vec::new()),
expected_recv_msgs: Mutex::new(None),
}
}

#[cfg(test)]
pub(crate) fn expect_receive_msg(&self, ev: wire::Message<()>) {
let mut expected_msgs = self.expected_recv_msgs.lock().unwrap();
if expected_msgs.is_none() { *expected_msgs = Some(Vec::new()); }
expected_msgs.as_mut().unwrap().push(ev);
}

fn received_msg(&self, ev: wire::Message<()>) {
let mut msgs = self.expected_recv_msgs.lock().unwrap();
if msgs.is_none() { return; }
assert!(!msgs.as_ref().unwrap().is_empty(), "Received message when we weren't expecting one");
#[cfg(test)]
assert_eq!(msgs.as_ref().unwrap()[0], ev);
TheBlueMatt marked this conversation as resolved.
Show resolved Hide resolved
msgs.as_mut().unwrap().remove(0);
}
}

impl Drop for TestChannelMessageHandler {
fn drop(&mut self) {
let l = self.expected_recv_msgs.lock().unwrap();
#[cfg(feature = "std")]
TheBlueMatt marked this conversation as resolved.
Show resolved Hide resolved
{
if !std::thread::panicking() {
assert!(l.is_none() || l.as_ref().unwrap().is_empty());
}
}
}
}

impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &msgs::OpenChannel) {}
fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &msgs::AcceptChannel) {}
fn handle_funding_created(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingCreated) {}
fn handle_funding_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingSigned) {}
fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingLocked) {}
fn handle_shutdown(&self, _their_node_id: &PublicKey, _their_features: &InitFeatures, _msg: &msgs::Shutdown) {}
fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) {}
fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateAddHTLC) {}
fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFulfillHTLC) {}
fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFailHTLC) {}
fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFailMalformedHTLC) {}
fn handle_commitment_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::CommitmentSigned) {}
fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &msgs::RevokeAndACK) {}
fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateFee) {}
fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {}
fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &msgs::AnnouncementSignatures) {}
fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelReestablish) {}
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::OpenChannel) {
self.received_msg(wire::Message::OpenChannel(msg.clone()));
}
fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::AcceptChannel) {
self.received_msg(wire::Message::AcceptChannel(msg.clone()));
}
fn handle_funding_created(&self, _their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
self.received_msg(wire::Message::FundingCreated(msg.clone()));
}
fn handle_funding_signed(&self, _their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
self.received_msg(wire::Message::FundingSigned(msg.clone()));
}
fn handle_funding_locked(&self, _their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
self.received_msg(wire::Message::FundingLocked(msg.clone()));
}
fn handle_shutdown(&self, _their_node_id: &PublicKey, _their_features: &InitFeatures, msg: &msgs::Shutdown) {
self.received_msg(wire::Message::Shutdown(msg.clone()));
}
fn handle_closing_signed(&self, _their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
self.received_msg(wire::Message::ClosingSigned(msg.clone()));
}
fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
self.received_msg(wire::Message::UpdateAddHTLC(msg.clone()));
}
fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
self.received_msg(wire::Message::UpdateFulfillHTLC(msg.clone()));
}
fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
self.received_msg(wire::Message::UpdateFailHTLC(msg.clone()));
}
fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
self.received_msg(wire::Message::UpdateFailMalformedHTLC(msg.clone()));
}
fn handle_commitment_signed(&self, _their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
self.received_msg(wire::Message::CommitmentSigned(msg.clone()));
}
fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
self.received_msg(wire::Message::RevokeAndACK(msg.clone()));
}
fn handle_update_fee(&self, _their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
self.received_msg(wire::Message::UpdateFee(msg.clone()));
}
fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {
// Don't call `received_msg` here as `TestRoutingMessageHandler` generates these sometimes
}
fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
self.received_msg(wire::Message::AnnouncementSignatures(msg.clone()));
}
fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
self.received_msg(wire::Message::ChannelReestablish(msg.clone()));
}
fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {}
fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {
// Don't bother with `received_msg` for Init as its auto-generated and we don't want to
// bother re-generating the expected Init message in all tests.
}
fn handle_error(&self, _their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
self.received_msg(wire::Message::Error(msg.clone()));
}
}

impl events::MessageSendEventsProvider for TestChannelMessageHandler {
Expand Down