Skip to content

Commit

Permalink
Add custom message handler to peer manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Tibo-lg committed Aug 5, 2021
1 parent 69ee486 commit 70ea041
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 259 deletions.
6 changes: 3 additions & 3 deletions fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use lightning::chain::transaction::OutPoint;
use lightning::chain::keysinterface::{InMemorySigner, KeysInterface};
use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
use lightning::ln::channelmanager::{ChainParameters, ChannelManager};
use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor};
use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor,IgnoringCustomMessageHandler};
use lightning::ln::msgs::DecodeError;
use lightning::routing::router::get_route;
use lightning::routing::network_graph::NetGraphMsgHandler;
Expand Down Expand Up @@ -159,7 +159,7 @@ type ChannelMan = ChannelManager<
EnforcingSigner,
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<NetGraphMsgHandler<Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>>;
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<NetGraphMsgHandler<Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, Arc<IgnoringCustomMessageHandler>>;

struct MoneyLossDetector<'a> {
manager: Arc<ChannelMan>,
Expand Down Expand Up @@ -374,7 +374,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
chan_handler: channelmanager.clone(),
route_handler: net_graph_msg_handler.clone(),
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger)));
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), Arc::new(IgnoringCustomMessageHandler{})));

let mut should_forward = false;
let mut payments_received: Vec<PaymentHash> = Vec::new();
Expand Down
12 changes: 7 additions & 5 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use lightning::chain::channelmonitor;
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{PeerManager, SocketDescriptor};
use lightning::ln::peer_handler::{PeerManager, SocketDescriptor, CustomMessageHandler};
use lightning::util::events::{EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use std::sync::Arc;
Expand Down Expand Up @@ -120,7 +120,8 @@ impl BackgroundProcessor {
CMP: 'static + Send + ChannelManagerPersister<Signer, CW, T, K, F, L>,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L>> + Send + Sync,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
>
(persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, peer_manager: PM, logger: L) -> Self
where
Expand All @@ -133,6 +134,7 @@ impl BackgroundProcessor {
P::Target: 'static + channelmonitor::Persist<Signer>,
CMH::Target: 'static + ChannelMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
Expand Down Expand Up @@ -224,7 +226,7 @@ mod tests {
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
use lightning::ln::features::InitFeatures;
use lightning::ln::msgs::ChannelMessageHandler;
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringCustomMessageHandler};
use lightning::util::config::UserConfig;
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
use lightning::util::ser::Writeable;
Expand Down Expand Up @@ -252,7 +254,7 @@ mod tests {

struct Node {
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>>>,
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, Arc<IgnoringCustomMessageHandler>>>,
chain_monitor: Arc<ChainMonitor>,
persister: Arc<FilesystemPersister>,
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
Expand Down Expand Up @@ -293,7 +295,7 @@ mod tests {
let params = ChainParameters { network, best_block };
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone()));
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone(), Arc::new(IgnoringCustomMessageHandler{})));
let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block };
nodes.push(node);
}
Expand Down
25 changes: 15 additions & 10 deletions lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};

use lightning::ln::peer_handler;
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
use lightning::ln::peer_handler::CustomMessageHandler;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::util::logger::Logger;

Expand Down Expand Up @@ -119,10 +120,11 @@ struct Connection {
id: u64,
}
impl Connection {
async fn schedule_read<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
CMH: ChannelMessageHandler + 'static,
RMH: RoutingMessageHandler + 'static,
L: Logger + 'static + ?Sized {
L: Logger + 'static + ?Sized,
UMH: CustomMessageHandler + 'static {
// 8KB is nice and big but also should never cause any issues with stack overflowing.
let mut buf = [0; 8192];

Expand Down Expand Up @@ -222,10 +224,11 @@ impl Connection {
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync {
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(debug_assertions)]
let last_us = Arc::clone(&us);
Expand Down Expand Up @@ -262,10 +265,11 @@ pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<So
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync {
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(debug_assertions)]
let last_us = Arc::clone(&us);
Expand Down Expand Up @@ -332,10 +336,11 @@ pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<S
/// disconnected and associated handling futures are freed, though, because all processing in said
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
/// make progress.
pub async fn connect_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync {
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
Some(setup_outbound(peer_manager, their_node_id, stream))
} else { None }
Expand Down Expand Up @@ -563,7 +568,7 @@ mod tests {
let a_manager = Arc::new(PeerManager::new(MessageHandler {
chan_handler: Arc::clone(&a_handler),
route_handler: Arc::clone(&a_handler),
}, a_key.clone(), &[1; 32], Arc::new(TestLogger())));
}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringCustomMessageHandler{})));

let (b_connected_sender, mut b_connected) = mpsc::channel(1);
let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
Expand All @@ -577,7 +582,7 @@ mod tests {
let b_manager = Arc::new(PeerManager::new(MessageHandler {
chan_handler: Arc::clone(&b_handler),
route_handler: Arc::clone(&b_handler),
}, b_key.clone(), &[2; 32], Arc::new(TestLogger())));
}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringCustomMessageHandler{})));

// We bind on localhost, hoping the environment is properly configured with a local
// address. This may not always be the case in containers and the like, so if this test is
Expand Down
2 changes: 1 addition & 1 deletion lightning/src/ln/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(crate) mod peer_channel_encryptor;

mod channel;
mod onion_utils;
mod wire;
pub mod wire;

// Older rustc (which we support) refuses to let us call the get_payment_preimage_hash!() macro
// without the node parameter being mut. This is incorrect, and thus newer rustcs will complain
Expand Down
Loading

0 comments on commit 70ea041

Please sign in to comment.