Skip to content

Commit

Permalink
Merge pull request #1980 from TheBlueMatt/2023-01-async-utxo-lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
wpaulino authored Feb 11, 2023
2 parents 90bb3f9 + 1f05575 commit be4bb58
Show file tree
Hide file tree
Showing 18 changed files with 1,191 additions and 201 deletions.
6 changes: 3 additions & 3 deletions ARCH.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ At a high level, some of the common interfaces fit together as follows:
--------------- / (as EventsProvider) ^ | |
| PeerManager |- \ | | |
--------------- \ | (is-a) | |
| ----------------- \ _---------------- / /
| | chain::Access | \ / | ChainMonitor |---------------
| ----------------- \ / ----------------
| -------------- \ _---------------- / /
| | UtxoLookup | \ / | ChainMonitor |---------------
| -------------- \ / ----------------
| ^ \ / |
(as RoutingMessageHandler) | v v
\ ----------------- --------- -----------------
Expand Down
3 changes: 2 additions & 1 deletion fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor,Ig
use lightning::ln::msgs::{self, DecodeError};
use lightning::ln::script::ShutdownScript;
use lightning::routing::gossip::{P2PGossipSync, NetworkGraph};
use lightning::routing::utxo::UtxoLookup;
use lightning::routing::router::{find_route, InFlightHtlcs, PaymentParameters, Route, RouteHop, RouteParameters, Router};
use lightning::routing::scoring::FixedPenaltyScorer;
use lightning::util::config::UserConfig;
Expand Down Expand Up @@ -183,7 +184,7 @@ impl<'a> std::hash::Hash for Peer<'a> {
type ChannelMan<'a> = ChannelManager<
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<KeyProvider>, Arc<KeyProvider>, Arc<FuzzEstimator>, &'a FuzzRouter, Arc<dyn Logger>>;
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan<'a>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler, Arc<KeyProvider>>;
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan<'a>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn UtxoLookup>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler, Arc<KeyProvider>>;

struct MoneyLossDetector<'a> {
manager: Arc<ChannelMan<'a>>,
Expand Down
48 changes: 36 additions & 12 deletions fuzz/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use bitcoin::blockdata::script::Builder;
use bitcoin::blockdata::transaction::TxOut;
use bitcoin::hash_types::BlockHash;

use lightning::chain;
use lightning::chain::transaction::OutPoint;
use lightning::ln::channelmanager::{self, ChannelDetails, ChannelCounterparty};
use lightning::ln::msgs;
use lightning::routing::gossip::{NetworkGraph, RoutingFees};
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};
use lightning::routing::router::{find_route, PaymentParameters, RouteHint, RouteHintHop, RouteParameters};
use lightning::routing::scoring::FixedPenaltyScorer;
use lightning::util::config::UserConfig;
Expand Down Expand Up @@ -81,17 +81,36 @@ impl InputData {
}
}

struct FuzzChainSource {
struct FuzzChainSource<'a, 'b, Out: test_logger::Output> {
input: Arc<InputData>,
net_graph: &'a NetworkGraph<&'b test_logger::TestLogger<Out>>,
}
impl chain::Access for FuzzChainSource {
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, chain::AccessError> {
match self.input.get_slice(2) {
Some(&[0, _]) => Err(chain::AccessError::UnknownChain),
Some(&[1, _]) => Err(chain::AccessError::UnknownTx),
Some(&[_, x]) => Ok(TxOut { value: 0, script_pubkey: Builder::new().push_int(x as i64).into_script().to_v0_p2wsh() }),
None => Err(chain::AccessError::UnknownTx),
_ => unreachable!(),
impl<Out: test_logger::Output> UtxoLookup for FuzzChainSource<'_, '_, Out> {
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
let input_slice = self.input.get_slice(2);
if input_slice.is_none() { return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); }
let input_slice = input_slice.unwrap();
let txo_res = TxOut {
value: if input_slice[0] % 2 == 0 { 1_000_000 } else { 1_000 },
script_pubkey: Builder::new().push_int(input_slice[1] as i64).into_script().to_v0_p2wsh(),
};
match input_slice {
&[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)),
&[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)),
&[2, _] => {
let future = UtxoFuture::new();
future.resolve_without_forwarding(self.net_graph, Ok(txo_res));
UtxoResult::Async(future.clone())
},
&[3, _] => {
let future = UtxoFuture::new();
future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx));
UtxoResult::Async(future.clone())
},
&[4, _] => {
UtxoResult::Async(UtxoFuture::new()) // the future will never resolve
},
&[..] => UtxoResult::Sync(Ok(txo_res)),
}
}
}
Expand Down Expand Up @@ -171,6 +190,10 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {

let our_pubkey = get_pubkey!();
let net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), &logger);
let chain_source = FuzzChainSource {
input: Arc::clone(&input),
net_graph: &net_graph,
};

let mut node_pks = HashSet::new();
let mut scid = 42;
Expand All @@ -191,13 +214,14 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1));
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2));
let _ = net_graph.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None);
let _ = net_graph.update_channel_from_unsigned_announcement::
<&FuzzChainSource<'_, '_, Out>>(&msg, &None);
},
2 => {
let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1));
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2));
let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&FuzzChainSource { input: Arc::clone(&input) }));
let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&chain_source));
},
3 => {
let _ = net_graph.update_channel_unsigned(&decode_msg!(msgs::UnsignedChannelUpdate, 72));
Expand Down
45 changes: 23 additions & 22 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::UtxoLookup;
use lightning::routing::router::Router;
use lightning::routing::scoring::{Score, WriteableScore};
use lightning::util::events::{Event, EventHandler, EventsProvider};
Expand Down Expand Up @@ -116,13 +117,13 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
pub enum GossipSync<
P: Deref<Target = P2PGossipSync<G, A, L>>,
P: Deref<Target = P2PGossipSync<G, U, L>>,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
A: Deref,
U: Deref,
L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
where U::Target: UtxoLookup, L::Target: Logger {
/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
P2P(P),
/// Rapid gossip sync from a trusted server.
Expand All @@ -132,13 +133,13 @@ where A::Target: chain::Access, L::Target: Logger {
}

impl<
P: Deref<Target = P2PGossipSync<G, A, L>>,
P: Deref<Target = P2PGossipSync<G, U, L>>,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
A: Deref,
U: Deref,
L: Deref,
> GossipSync<P, R, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
> GossipSync<P, R, G, U, L>
where U::Target: UtxoLookup, L::Target: Logger {
fn network_graph(&self) -> Option<&G> {
match self {
GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
Expand All @@ -163,10 +164,10 @@ where A::Target: chain::Access, L::Target: Logger {
}

/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
where
A::Target: chain::Access,
U::Target: UtxoLookup,
L::Target: Logger,
{
/// Initializes a new [`GossipSync::P2P`] variant.
Expand All @@ -178,10 +179,10 @@ where
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
GossipSync<
&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
R,
G,
&'a (dyn chain::Access + Send + Sync),
&'a (dyn UtxoLookup + Send + Sync),
L,
>
where
Expand All @@ -196,10 +197,10 @@ where
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
GossipSync<
&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
&RapidGossipSync<&'a NetworkGraph<L>, L>,
&'a NetworkGraph<L>,
&'a (dyn chain::Access + Send + Sync),
&'a (dyn UtxoLookup + Send + Sync),
L,
>
where
Expand Down Expand Up @@ -397,7 +398,7 @@ macro_rules! define_run_body {
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
CA: 'static + Deref + Send + Sync,
UL: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
Expand All @@ -418,7 +419,7 @@ pub async fn process_events_async<
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
Expand All @@ -428,11 +429,11 @@ pub async fn process_events_async<
Sleeper: Fn(Duration) -> SleepFuture
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
sleeper: Sleeper,
) -> Result<(), io::Error>
where
CA::Target: 'static + chain::Access,
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
T::Target: 'static + BroadcasterInterface,
Expand Down Expand Up @@ -531,7 +532,7 @@ impl BackgroundProcessor {
/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
pub fn start<
'a,
CA: 'static + Deref + Send + Sync,
UL: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
Expand All @@ -551,18 +552,18 @@ impl BackgroundProcessor {
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: for <'b> WriteableScore<'b>,
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
) -> Self
where
CA::Target: 'static + chain::Access,
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
T::Target: 'static + BroadcasterInterface,
Expand Down
10 changes: 6 additions & 4 deletions lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
//! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
//! type Logger = dyn lightning::util::logger::Logger + Send + Sync;
//! type NodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync;
//! type ChainAccess = dyn lightning::chain::Access + Send + Sync;
//! type UtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
//! type ChainFilter = dyn lightning::chain::Filter + Send + Sync;
//! type DataPersister = dyn lightning::chain::chainmonitor::Persist<lightning::chain::keysinterface::InMemorySigner> + Send + Sync;
//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>>;
//! type ChannelManager = Arc<lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>>;
//! type PeerManager = Arc<lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>>;
//! type PeerManager = Arc<lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, UtxoLookup, Logger>>;
//!
//! // Connect to node with pubkey their_node_id at addr:
//! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc<ChainMonitor>, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) {
Expand Down Expand Up @@ -176,8 +176,9 @@ impl Connection {
let (event_waker, event_receiver) = mpsc::channel(1);
tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver));

// 8KB is nice and big but also should never cause any issues with stack overflowing.
let mut buf = [0; 8192];
// 4KiB is nice and big without handling too many messages all at once, giving other peers
// a chance to do some work.
let mut buf = [0; 4096];

let mut our_descriptor = SocketDescriptor::new(us.clone());
// An enum describing why we did/are disconnecting:
Expand Down Expand Up @@ -623,6 +624,7 @@ mod tests {
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() }
fn processing_queue_high(&self) -> bool { false }
}
impl ChannelMessageHandler for MsgHandler {
fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {}
Expand Down
21 changes: 0 additions & 21 deletions lightning/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::blockdata::script::Script;
use bitcoin::blockdata::transaction::TxOut;
use bitcoin::hash_types::{BlockHash, Txid};
use bitcoin::network::constants::Network;
use bitcoin::secp256k1::PublicKey;
Expand Down Expand Up @@ -60,26 +59,6 @@ impl BestBlock {
pub fn height(&self) -> u32 { self.height }
}

/// An error when accessing the chain via [`Access`].
#[derive(Clone, Debug)]
pub enum AccessError {
/// The requested chain is unknown.
UnknownChain,

/// The requested transaction doesn't exist or hasn't confirmed.
UnknownTx,
}

/// The `Access` trait defines behavior for accessing chain data and state, such as blocks and
/// UTXOs.
pub trait Access {
/// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
/// Returns an error if `genesis_hash` is for a different chain or if such a transaction output
/// is unknown.
///
/// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, AccessError>;
}

/// The `Listen` trait is used to notify when blocks have been connected or disconnected from the
/// chain.
Expand Down
5 changes: 3 additions & 2 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5044,7 +5044,7 @@ where
), chan),
// Note that announcement_signatures fails if the channel cannot be announced,
// so get_channel_update_for_broadcast will never fail by the time we get here.
update_msg: self.get_channel_update_for_broadcast(chan.get()).unwrap(),
update_msg: Some(self.get_channel_update_for_broadcast(chan.get()).unwrap()),
});
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
Expand Down Expand Up @@ -5973,7 +5973,7 @@ where
msg: announcement,
// Note that announcement_signatures fails if the channel cannot be announced,
// so get_channel_update_for_broadcast will never fail by the time we get here.
update_msg: self.get_channel_update_for_broadcast(channel).unwrap(),
update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()),
});
}
}
Expand Down Expand Up @@ -6289,6 +6289,7 @@ where
&events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
&events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
&events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
&events::MessageSendEvent::SendChannelUpdate { .. } => false,
&events::MessageSendEvent::HandleError { .. } => false,
&events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
Expand Down
Loading

0 comments on commit be4bb58

Please sign in to comment.