From b88e4543e73ebd431698cf05d54b39bec4744061 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zoe=20Faltib=C3=A0?= Date: Wed, 10 May 2023 17:40:47 +0200 Subject: [PATCH] persist payments info to disk --- src/cli.rs | 52 ++++++++++++++------------- src/disk.rs | 36 +++++++++++++++++-- src/main.rs | 100 +++++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 140 insertions(+), 48 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index cba62559..9bd9ee9a 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -23,10 +23,9 @@ use std::env; use std::io; use std::io::Write; use std::net::{SocketAddr, ToSocketAddrs}; -use std::ops::Deref; use std::path::Path; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; pub(crate) struct LdkUserInfo { @@ -61,8 +60,8 @@ impl Writeable for UserOnionMessageContents { pub(crate) async fn poll_for_user_input( peer_manager: Arc, channel_manager: Arc, keys_manager: Arc, network_graph: Arc, - onion_messenger: Arc, inbound_payments: PaymentInfoStorage, - outbound_payments: PaymentInfoStorage, ldk_data_dir: String, network: Network, + onion_messenger: Arc, inbound_payments: Arc>, + outbound_payments: Arc>, ldk_data_dir: String, network: Network, logger: Arc, ) { println!( @@ -157,7 +156,10 @@ pub(crate) async fn poll_for_user_input( } }; - send_payment(&*channel_manager, &invoice, outbound_payments.clone()); + let mut outbound_payments = outbound_payments.lock().unwrap(); + send_payment(&*channel_manager, &invoice, &mut outbound_payments); + disk::persist_outbound_payments(ldk_data_dir.clone(), &outbound_payments) + .unwrap(); } "keysend" => { let dest_pubkey = match words.next() { @@ -187,13 +189,16 @@ pub(crate) async fn poll_for_user_input( continue; } }; + let mut outbound_payments = outbound_payments.lock().unwrap(); keysend( &*channel_manager, dest_pubkey, amt_msat, &*keys_manager, - outbound_payments.clone(), + &mut outbound_payments, ); + disk::persist_outbound_payments(ldk_data_dir.clone(), &outbound_payments) + .unwrap(); } "getinvoice" => { let amt_str = words.next(); @@ -220,15 +225,18 @@ pub(crate) async fn poll_for_user_input( continue; } + let mut inbound_payments = inbound_payments.lock().unwrap(); get_invoice( amt_msat.unwrap(), - Arc::clone(&inbound_payments), + &mut inbound_payments, &*channel_manager, Arc::clone(&keys_manager), network, expiry_secs.unwrap(), Arc::clone(&logger), ); + disk::persist_inbound_payments(ldk_data_dir.clone(), &inbound_payments) + .unwrap(); } "connectpeer" => { let peer_pubkey_and_ip_addr = words.next(); @@ -278,9 +286,10 @@ pub(crate) async fn poll_for_user_input( } } "listchannels" => list_channels(&channel_manager, &network_graph), - "listpayments" => { - list_payments(inbound_payments.clone(), outbound_payments.clone()) - } + "listpayments" => list_payments( + &inbound_payments.lock().unwrap(), + &outbound_payments.lock().unwrap(), + ), "closechannel" => { let channel_id_str = words.next(); if channel_id_str.is_none() { @@ -527,11 +536,9 @@ fn list_channels(channel_manager: &Arc, network_graph: &Arc( channel_manager: &ChannelManager, payee_pubkey: PublicKey, amt_msat: u64, entropy_source: &E, - payment_storage: PaymentInfoStorage, + outbound_payments: &mut PaymentInfoStorage, ) { let payment_preimage = PaymentPreimage(entropy_source.get_secure_random_bytes()); let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0[..]).into_inner()); @@ -720,8 +726,7 @@ fn keysend( } }; - let mut payments = payment_storage.lock().unwrap(); - payments.insert( + outbound_payments.payments.insert( payment_hash, PaymentInfo { preimage: None, @@ -733,11 +738,10 @@ fn keysend( } fn get_invoice( - amt_msat: u64, payment_storage: PaymentInfoStorage, channel_manager: &ChannelManager, + amt_msat: u64, inbound_payments: &mut PaymentInfoStorage, channel_manager: &ChannelManager, keys_manager: Arc, network: Network, expiry_secs: u32, logger: Arc, ) { - let mut payments = payment_storage.lock().unwrap(); let currency = match network { Network::Bitcoin => Currency::Bitcoin, Network::Testnet => Currency::BitcoinTestnet, @@ -765,7 +769,7 @@ fn get_invoice( }; let payment_hash = PaymentHash(invoice.payment_hash().clone().into_inner()); - payments.insert( + inbound_payments.payments.insert( payment_hash, PaymentInfo { preimage: None, diff --git a/src/disk.rs b/src/disk.rs index 0c308513..e28c3179 100644 --- a/src/disk.rs +++ b/src/disk.rs @@ -1,14 +1,14 @@ -use crate::{cli, NetworkGraph}; +use crate::{cli, NetworkGraph, PaymentInfoStorage}; use bitcoin::secp256k1::PublicKey; use bitcoin::Network; use chrono::Utc; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters}; use lightning::util::logger::{Logger, Record}; -use lightning::util::ser::{ReadableArgs, Writer}; +use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use std::collections::HashMap; use std::fs; use std::fs::File; -use std::io::{BufRead, BufReader}; +use std::io::{BufRead, BufReader, BufWriter}; use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; @@ -52,6 +52,27 @@ pub(crate) fn persist_channel_peer(path: &Path, peer_info: &str) -> std::io::Res file.write_all(format!("{}\n", peer_info).as_bytes()) } +pub(crate) fn persist_inbound_payments( + ldk_data_dir: String, inbound_payments: &PaymentInfoStorage, +) -> std::io::Result<()> { + persist_payment_info(Path::new(&format!("{}/inbound_payments", ldk_data_dir)), inbound_payments) +} + +pub(crate) fn persist_outbound_payments( + ldk_data_dir: String, outbound_payments: &PaymentInfoStorage, +) -> std::io::Result<()> { + persist_payment_info( + Path::new(&format!("{}/outbound_payments", ldk_data_dir)), + outbound_payments, + ) +} + +fn persist_payment_info(path: &Path, payment_info: &PaymentInfoStorage) -> std::io::Result<()> { + let mut buf = BufWriter::new(fs::File::create(&path)?); + payment_info.write(&mut buf)?; + buf.into_inner()?.sync_all() +} + pub(crate) fn read_channel_peer_data( path: &Path, ) -> Result, std::io::Error> { @@ -83,6 +104,15 @@ pub(crate) fn read_network( NetworkGraph::new(network, logger) } +pub(crate) fn read_payment_info(path: &Path) -> PaymentInfoStorage { + if let Ok(file) = File::open(path) { + if let Ok(info) = PaymentInfoStorage::read(&mut BufReader::new(file)) { + return info; + } + } + PaymentInfoStorage { payments: HashMap::new() } +} + pub(crate) fn read_scorer( path: &Path, graph: Arc, logger: Arc, ) -> ProbabilisticScorer, Arc> { diff --git a/src/main.rs b/src/main.rs index 159fa5b7..bc197f2d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,6 @@ use bitcoin::network::constants::Network; use bitcoin::secp256k1::Secp256k1; use bitcoin::BlockHash; use bitcoin_bech32::WitnessProgram; -use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::chain::keysinterface::{EntropySource, InMemorySigner, KeysManager}; use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus}; @@ -23,6 +22,7 @@ use lightning::ln::channelmanager; use lightning::ln::channelmanager::{ ChainParameters, ChannelManagerReadArgs, SimpleArcChannelManager, }; +use lightning::ln::msgs::DecodeError; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, SimpleArcPeerManager}; use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning::onion_message::SimpleArcOnionMessenger; @@ -30,7 +30,8 @@ use lightning::routing::gossip; use lightning::routing::gossip::{NodeId, P2PGossipSync}; use lightning::routing::router::DefaultRouter; use lightning::util::config::UserConfig; -use lightning::util::ser::ReadableArgs; +use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; +use lightning::{chain, impl_writeable_tlv_based}; use lightning_background_processor::{process_events_async, GossipSync}; use lightning_block_sync::init; use lightning_block_sync::poll; @@ -52,10 +53,29 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; +#[derive(Copy, Clone)] pub(crate) enum HTLCStatus { - Pending, - Succeeded, - Failed, + Pending = 1, + Succeeded = 2, + Failed = 3, +} + +impl Readable for HTLCStatus { + fn read(r: &mut R) -> Result { + let val: u8 = Readable::read(r)?; + Ok(match val { + 1 => HTLCStatus::Pending, + 2 => HTLCStatus::Succeeded, + 3 => HTLCStatus::Failed, + _ => panic!("unexpected value for HTLCStatus"), + }) + } +} + +impl Writeable for HTLCStatus { + fn write(&self, w: &mut W) -> Result<(), std::io::Error> { + (*self as u8).write(w) + } } pub(crate) struct MillisatAmount(Option); @@ -69,6 +89,19 @@ impl fmt::Display for MillisatAmount { } } +impl Readable for MillisatAmount { + fn read(r: &mut R) -> Result { + let amt: Option = Readable::read(r)?; + Ok(MillisatAmount(amt)) + } +} + +impl Writeable for MillisatAmount { + fn write(&self, w: &mut W) -> Result<(), std::io::Error> { + self.0.write(w) + } +} + pub(crate) struct PaymentInfo { preimage: Option, secret: Option, @@ -76,7 +109,20 @@ pub(crate) struct PaymentInfo { amt_msat: MillisatAmount, } -pub(crate) type PaymentInfoStorage = Arc>>; +impl_writeable_tlv_based!(PaymentInfo, { + (0, preimage, option), + (1, secret, option), + (2, status, required), + (3, amt_msat, required), +}); + +pub(crate) struct PaymentInfoStorage { + payments: HashMap, +} + +impl_writeable_tlv_based!(PaymentInfoStorage, { + (0, payments, required), +}); type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, @@ -106,8 +152,9 @@ type OnionMessenger = SimpleArcOnionMessenger; async fn handle_ldk_events( channel_manager: &Arc, bitcoind_client: &BitcoindClient, network_graph: &NetworkGraph, keys_manager: &KeysManager, - inbound_payments: &PaymentInfoStorage, outbound_payments: &PaymentInfoStorage, - network: Network, event: Event, + inbound_payments: Arc>, + outbound_payments: Arc>, network: Network, event: Event, + ldk_data_dir: String, ) { match event { Event::FundingGenerationReady { @@ -195,8 +242,8 @@ async fn handle_ldk_events( } PaymentPurpose::SpontaneousPayment(preimage) => (Some(preimage), None), }; - let mut payments = inbound_payments.lock().unwrap(); - match payments.entry(payment_hash) { + let mut inbound = inbound_payments.lock().unwrap(); + match inbound.payments.entry(payment_hash) { Entry::Occupied(mut e) => { let payment = e.get_mut(); payment.status = HTLCStatus::Succeeded; @@ -212,10 +259,11 @@ async fn handle_ldk_events( }); } } + disk::persist_inbound_payments(ldk_data_dir.clone(), &inbound).unwrap(); } Event::PaymentSent { payment_preimage, payment_hash, fee_paid_msat, .. } => { - let mut payments = outbound_payments.lock().unwrap(); - for (hash, payment) in payments.iter_mut() { + let mut outbound = outbound_payments.lock().unwrap(); + for (hash, payment) in outbound.payments.iter_mut() { if *hash == payment_hash { payment.preimage = Some(payment_preimage); payment.status = HTLCStatus::Succeeded; @@ -235,6 +283,7 @@ async fn handle_ldk_events( io::stdout().flush().unwrap(); } } + disk::persist_outbound_payments(ldk_data_dir, &outbound).unwrap(); } Event::OpenChannelRequest { .. } => { // Unreachable, we don't set manually_accept_inbound_channels @@ -252,11 +301,12 @@ async fn handle_ldk_events( print!("> "); io::stdout().flush().unwrap(); - let mut payments = outbound_payments.lock().unwrap(); - if payments.contains_key(&payment_hash) { - let payment = payments.get_mut(&payment_hash).unwrap(); + let mut outbound = outbound_payments.lock().unwrap(); + if outbound.payments.contains_key(&payment_hash) { + let payment = outbound.payments.get_mut(&payment_hash).unwrap(); payment.status = HTLCStatus::Failed; } + disk::persist_outbound_payments(ldk_data_dir, &outbound).unwrap(); } Event::PaymentForwarded { prev_channel_id, @@ -472,7 +522,7 @@ async fn start_ldk() { thread_rng().fill_bytes(&mut key); match File::create(keys_seed_path.clone()) { Ok(mut f) => { - f.write_all(&key).expect("Failed to write node keys seed to disk"); + Write::write_all(&mut f, &key).expect("Failed to write node keys seed to disk"); f.sync_all().expect("Failed to sync node keys seed to disk"); } Err(e) => { @@ -682,9 +732,14 @@ async fn start_ldk() { } }); - // TODO: persist payment info to disk - let inbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new())); - let outbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new())); + let inbound_payments = Arc::new(Mutex::new(disk::read_payment_info(Path::new(&format!( + "{}/inbound_payments", + ldk_data_dir + ))))); + let outbound_payments = Arc::new(Mutex::new(disk::read_payment_info(Path::new(&format!( + "{}/outbound_payments", + ldk_data_dir + ))))); // Step 18: Handle LDK Events let channel_manager_event_listener = Arc::clone(&channel_manager); @@ -694,6 +749,7 @@ async fn start_ldk() { let inbound_payments_event_listener = Arc::clone(&inbound_payments); let outbound_payments_event_listener = Arc::clone(&outbound_payments); let network = args.network; + let ldk_data_dir_copy = ldk_data_dir.clone(); let event_handler = move |event: Event| { let channel_manager_event_listener = Arc::clone(&channel_manager_event_listener); let bitcoind_client_event_listener = Arc::clone(&bitcoind_client_event_listener); @@ -701,16 +757,18 @@ async fn start_ldk() { let keys_manager_event_listener = Arc::clone(&keys_manager_event_listener); let inbound_payments_event_listener = Arc::clone(&inbound_payments_event_listener); let outbound_payments_event_listener = Arc::clone(&outbound_payments_event_listener); + let ldk_data_dir_copy = ldk_data_dir_copy.clone(); async move { handle_ldk_events( &channel_manager_event_listener, &bitcoind_client_event_listener, &network_graph_event_listener, &keys_manager_event_listener, - &inbound_payments_event_listener, - &outbound_payments_event_listener, + inbound_payments_event_listener, + outbound_payments_event_listener, network, event, + ldk_data_dir_copy, ) .await; }