diff --git a/mutiny-core/src/federation.rs b/mutiny-core/src/federation.rs index a3ab9c3fa..fb4be88ea 100644 --- a/mutiny-core/src/federation.rs +++ b/mutiny-core/src/federation.rs @@ -1,13 +1,11 @@ -use crate::utils::{convert_from_fedimint_invoice, convert_to_fedimint_invoice, spawn}; +use crate::utils::{convert_from_fedimint_invoice, convert_to_fedimint_invoice, now, spawn}; use crate::{ error::{MutinyError, MutinyStorageError}, event::PaymentInfo, key::{create_root_child_key, ChildKey}, logging::MutinyLogger, onchain::coin_type_from_network, - storage::{ - get_payment_info, list_payment_info, persist_payment_info, MutinyStorage, VersionedValue, - }, + storage::{list_payment_info, persist_payment_info, MutinyStorage, VersionedValue}, utils::sleep, HTLCStatus, MutinyInvoice, DEFAULT_PAYMENT_TIMEOUT, }; @@ -17,14 +15,12 @@ use bip39::Mnemonic; use bitcoin::secp256k1::{SecretKey, ThirtyTwoByteHash}; use bitcoin::{ bip32::{ChildNumber, DerivationPath, ExtendedPrivKey}, - hashes::sha256, secp256k1::Secp256k1, Network, }; use core::fmt; use fedimint_bip39::Bip39RootSecretStrategy; use fedimint_client::{ - db::ChronologicalOperationLogKey, derivable_secret::DerivableSecret, oplog::{OperationLogEntry, UpdateStreamOrOutcome}, secret::{get_default_client_secret, RootSecretStrategy}, @@ -55,17 +51,19 @@ use fedimint_ln_common::lightning_invoice::{Bolt11InvoiceDescription, Descriptio use fedimint_ln_common::{LightningCommonInit, LightningGateway}; use fedimint_mint_client::MintClientInit; use fedimint_wallet_client::{WalletClientInit, WalletClientModule}; -use futures::future::{self}; +use futures::{select, FutureExt}; use futures_util::{pin_mut, StreamExt}; use hex_conservative::{DisplayHex, FromHex}; -use lightning::{ - ln::PaymentHash, log_debug, log_error, log_info, log_trace, log_warn, util::logger::Logger, -}; +use lightning::{log_debug, log_error, log_info, log_trace, log_warn, util::logger::Logger}; use lightning_invoice::Bolt11Invoice; use serde::{de::DeserializeOwned, Deserialize, Serialize}; #[cfg(not(target_arch = "wasm32"))] use std::time::Instant; -use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + sync::{atomic::AtomicBool, Arc}, +}; use std::{ str::FromStr, sync::atomic::{AtomicU32, Ordering}, @@ -73,12 +71,6 @@ use std::{ #[cfg(target_arch = "wasm32")] use web_time::Instant; -// The amount of time in milliseconds to wait for -// checking the status of a fedimint payment. This -// is to work around their stream status checking -// when wanting just the current status. -const FEDIMINT_STATUS_TIMEOUT_CHECK_MS: u64 = 30; - // The maximum amount of operations we try to pull // from fedimint when we need to search through // their internal list. @@ -187,6 +179,7 @@ pub(crate) struct FederationClient { #[allow(dead_code)] fedimint_storage: FedimintStorage, gateway: Arc>>, + stop: Arc, pub(crate) logger: Arc, } @@ -198,6 +191,7 @@ impl FederationClient { xprivkey: ExtendedPrivKey, storage: S, network: Network, + stop: Arc, logger: Arc, ) -> Result { log_info!(logger, "initializing a new federation client: {uuid}"); @@ -317,15 +311,97 @@ impl FederationClient { }); log_debug!(logger, "Built fedimint client"); - Ok(FederationClient { + + let federation_client = FederationClient { uuid, fedimint_client, fedimint_storage, storage, logger, invite_code: federation_code, + stop, gateway, - }) + }; + + Ok(federation_client) + } + + pub(crate) async fn process_previous_operations(&self) -> Result<(), MutinyError> { + // look for our internal state pending transactions + let mut pending_invoices: HashSet<[u8; 32]> = HashSet::new(); + + pending_invoices.extend( + list_payment_info(&self.storage, true)? + .into_iter() + .filter(|(_h, i)| matches!(i.status, HTLCStatus::InFlight | HTLCStatus::Pending)) + .map(|(h, _i)| h.0), + ); + + pending_invoices.extend( + list_payment_info(&self.storage, false)? + .into_iter() + .filter(|(_h, i)| matches!(i.status, HTLCStatus::InFlight | HTLCStatus::Pending)) + .map(|(h, _i)| h.0), + ); + + // go through last 100 operations + let operations = self + .fedimint_client + .operation_log() + .list_operations(FEDIMINT_OPERATIONS_LIST_MAX, None) + .await; + + // find all of the pending ones + for (key, entry) in operations { + if entry.operation_module_kind() == LightningCommonInit::KIND.as_str() { + let lightning_meta: LightningOperationMeta = entry.meta(); + match lightning_meta.variant { + LightningOperationMetaVariant::Pay(pay_meta) => { + let hash = pay_meta.invoice.payment_hash().into_inner(); + if pending_invoices.contains(&hash) { + self.subscribe_operation( + entry, + hash, + key.operation_id, + self.fedimint_client.clone(), + ); + } + } + LightningOperationMetaVariant::Receive { invoice, .. } => { + let hash = invoice.payment_hash().into_inner(); + if pending_invoices.contains(&hash) { + self.subscribe_operation( + entry, + hash, + key.operation_id, + self.fedimint_client.clone(), + ); + } + } + LightningOperationMetaVariant::Claim { .. } => {} + } + } + } + + Ok(()) + } + + fn subscribe_operation( + &self, + entry: OperationLogEntry, + hash: [u8; 32], + operation_id: OperationId, + fedimint_client: ClientHandleArc, + ) { + subscribe_operation_ext( + entry, + hash, + operation_id, + fedimint_client, + self.logger.clone(), + self.stop.clone(), + self.storage.clone(), + ); } pub(crate) async fn gateway_fee(&self) -> Result { @@ -346,7 +422,7 @@ impl FederationClient { let desc = Description::new(String::new()).expect("empty string is valid"); let gateway = self.gateway.read().await; - let (_id, invoice, _) = lightning_module + let (id, invoice, _) = lightning_module .create_bolt11_invoice( Amount::from_sats(amount), Bolt11InvoiceDescription::Direct(&desc), @@ -368,198 +444,67 @@ impl FederationClient { persist_payment_info(&self.storage, &hash, &payment_info, inbound)?; log_trace!(self.logger, "Persisted payment"); - Ok(invoice.into()) - } - - /// Get the balance of this federation client in sats - pub(crate) async fn get_balance(&self) -> Result { - Ok(self.fedimint_client.get_balance().await.msats / 1_000) - } - - pub async fn check_activity(&self) -> Result<(), MutinyError> { - log_trace!(self.logger, "Getting activity"); - - let mut pending_invoices = Vec::new(); - // inbound - pending_invoices.extend( - list_payment_info(&self.storage, true)? - .into_iter() - .filter_map(|(h, i)| { - let mutiny_invoice = MutinyInvoice::from(i.clone(), h, true, vec![]).ok(); - - // filter out finalized invoices - mutiny_invoice.filter(|invoice| { - matches!(invoice.status, HTLCStatus::InFlight | HTLCStatus::Pending) - }) - }), - ); - // outbound - pending_invoices.extend( - list_payment_info(&self.storage, false)? - .into_iter() - .filter_map(|(h, i)| { - let mutiny_invoice = MutinyInvoice::from(i.clone(), h, false, vec![]).ok(); - - // filter out finalized invoices - mutiny_invoice.filter(|invoice| { - matches!(invoice.status, HTLCStatus::InFlight | HTLCStatus::Pending) - }) - }), - ); + // subscribe to updates for it + let fedimint_client_clone = self.fedimint_client.clone(); + let logger_clone = self.logger.clone(); + let storage_clone = self.storage.clone(); + let stop = self.stop.clone(); + spawn(async move { + let lightning_module = + Arc::new(fedimint_client_clone.get_first_module::()); - let operations = if !pending_invoices.is_empty() { - log_trace!(self.logger, "pending invoices, going to list operations"); - self.fedimint_client + let operations = fedimint_client_clone .operation_log() - .list_operations(FEDIMINT_OPERATIONS_LIST_MAX, None) + .get_operation(id) .await - } else { - vec![] - }; - - let lightning_module = Arc::new( - self.fedimint_client - .get_first_module::(), - ); - - let mut operation_map: HashMap< - [u8; 32], - (ChronologicalOperationLogKey, OperationLogEntry), - > = HashMap::with_capacity(operations.len()); - log_trace!( - self.logger, - "About to go through {} operations", - operations.len() - ); - for (key, entry) in operations { - if entry.operation_module_kind() == LightningCommonInit::KIND.as_str() { - let lightning_meta: LightningOperationMeta = entry.meta(); - match lightning_meta.variant { - LightningOperationMetaVariant::Pay(pay_meta) => { - let hash = pay_meta.invoice.payment_hash().into_inner(); - operation_map.insert(hash, (key, entry)); + .expect("just created it"); + + if let Some(updated_invoice) = process_operation_until_timeout( + logger_clone.clone(), + operations.meta(), + hash, + id, + &lightning_module, + None, + stop, + ) + .await + { + match maybe_update_after_checking_fedimint( + updated_invoice.clone(), + logger_clone.clone(), + storage_clone, + ) { + Ok(_) => { + log_info!(logger_clone, "updated invoice"); } - LightningOperationMetaVariant::Receive { invoice, .. } => { - let hash = invoice.payment_hash().into_inner(); - operation_map.insert(hash, (key, entry)); + Err(e) => { + log_error!(logger_clone, "could not check update invoice: {e}"); } - LightningOperationMetaVariant::Claim { .. } => {} } } - } + }); - log_trace!( - self.logger, - "Going through {} pending invoices to extract status", - pending_invoices.len() - ); - for invoice in pending_invoices { - let hash = invoice.payment_hash.into_32(); - if let Some((key, entry)) = operation_map.get(&hash) { - if let Some(updated_invoice) = extract_invoice_from_entry( - self.logger.clone(), - entry, - hash, - key.operation_id, - &lightning_module, - ) - .await - { - self.maybe_update_after_checking_fedimint(updated_invoice.clone()) - .await?; - } - } - } + Ok(invoice.into()) + } - Ok(()) + /// Get the balance of this federation client in sats + pub(crate) async fn get_balance(&self) -> Result { + Ok(self.fedimint_client.get_balance().await.msats / 1_000) } - async fn maybe_update_after_checking_fedimint( + fn maybe_update_after_checking_fedimint( &self, updated_invoice: MutinyInvoice, ) -> Result<(), MutinyError> { - if matches!( - updated_invoice.status, - HTLCStatus::Succeeded | HTLCStatus::Failed - ) { - log_debug!(self.logger, "Saving updated payment"); - let hash = updated_invoice.payment_hash.into_32(); - let inbound = updated_invoice.inbound; - let payment_info = PaymentInfo::from(updated_invoice); - persist_payment_info(&self.storage, &hash, &payment_info, inbound)?; - } + maybe_update_after_checking_fedimint( + updated_invoice, + self.logger.clone(), + self.storage.clone(), + )?; Ok(()) } - pub async fn get_invoice_by_hash( - &self, - hash: &sha256::Hash, - ) -> Result { - log_trace!(self.logger, "get_invoice_by_hash"); - - // Try to get the invoice from storage first - let (invoice, inbound) = match get_payment_info(&self.storage, hash, &self.logger) { - Ok(i) => i, - Err(e) => { - log_error!(self.logger, "could not get invoice by hash: {e}"); - return Err(e); - } - }; - - log_trace!(self.logger, "retrieved invoice by hash"); - - if matches!(invoice.status, HTLCStatus::InFlight | HTLCStatus::Pending) { - log_trace!(self.logger, "invoice still in flight, getting operations"); - // If the invoice is InFlight or Pending, check the operation log for updates - let lightning_module = self - .fedimint_client - .get_first_module::(); - - let operations = self - .fedimint_client - .operation_log() - .list_operations(FEDIMINT_OPERATIONS_LIST_MAX, None) - .await; - - log_trace!( - self.logger, - "going to go through {} operations", - operations.len() - ); - for (key, entry) in operations { - if entry.operation_module_kind() == LightningCommonInit::KIND.as_str() { - if let Some(updated_invoice) = extract_invoice_from_entry( - self.logger.clone(), - &entry, - hash.into_32(), - key.operation_id, - &lightning_module, - ) - .await - { - self.maybe_update_after_checking_fedimint(updated_invoice.clone()) - .await?; - return Ok(updated_invoice); - } - } else { - log_warn!( - self.logger, - "Unsupported module: {}", - entry.operation_module_kind() - ); - } - } - } else { - // If the invoice is not InFlight or Pending, return it directly - log_trace!(self.logger, "returning final invoice"); - // TODO labels - return MutinyInvoice::from(invoice, PaymentHash(hash.into_32()), inbound, vec![]); - } - - log_debug!(self.logger, "could not find invoice"); - Err(MutinyError::NotFound) - } - pub(crate) async fn pay_invoice( &self, invoice: Bolt11Invoice, @@ -587,50 +532,77 @@ impl FederationClient { persist_payment_info(&self.storage, &hash, &payment_info, inbound)?; // Subscribe and process outcome based on payment type - let mut inv = match outgoing_payment.payment_type { + let (mut inv, id) = match outgoing_payment.payment_type { fedimint_ln_client::PayType::Internal(pay_id) => { match lightning_module.subscribe_internal_pay(pay_id).await { Ok(o) => { - process_outcome( + let o = process_outcome( o, process_pay_state_internal, invoice.clone(), inbound, - DEFAULT_PAYMENT_TIMEOUT * 1_000, + Some(DEFAULT_PAYMENT_TIMEOUT * 1_000), + self.stop.clone(), Arc::clone(&self.logger), ) - .await + .await; + (o, pay_id) } - Err(_) => invoice.clone().into(), + Err(_) => (invoice.clone().into(), pay_id), } } fedimint_ln_client::PayType::Lightning(pay_id) => { match lightning_module.subscribe_ln_pay(pay_id).await { Ok(o) => { - process_outcome( + let o = process_outcome( o, process_pay_state_ln, invoice.clone(), inbound, - DEFAULT_PAYMENT_TIMEOUT * 1_000, + Some(DEFAULT_PAYMENT_TIMEOUT * 1_000), + self.stop.clone(), Arc::clone(&self.logger), ) - .await + .await; + (o, pay_id) } - Err(_) => invoice.clone().into(), + Err(_) => (invoice.clone().into(), pay_id), } } }; inv.fees_paid = Some(sats_round_up(&outgoing_payment.fee)); - self.maybe_update_after_checking_fedimint(inv.clone()) - .await?; + self.maybe_update_after_checking_fedimint(inv.clone())?; match inv.status { HTLCStatus::Succeeded => Ok(inv), HTLCStatus::Failed => Err(MutinyError::RoutingFailed), - HTLCStatus::Pending => Err(MutinyError::PaymentTimeout), - HTLCStatus::InFlight => Err(MutinyError::PaymentTimeout), + _ => { + // keep streaming after timeout happens + let fedimint_client_clone = self.fedimint_client.clone(); + let logger_clone = self.logger.clone(); + let storage_clone = self.storage.clone(); + let stop = self.stop.clone(); + spawn(async move { + let operation = fedimint_client_clone + .operation_log() + .get_operation(id) + .await + .expect("just created it"); + + subscribe_operation_ext( + operation, + hash, + id, + fedimint_client_clone, + logger_clone, + stop, + storage_clone, + ); + }); + + Err(MutinyError::PaymentTimeout) + } } } @@ -707,6 +679,67 @@ impl FederationClient { } } +fn subscribe_operation_ext( + entry: OperationLogEntry, + hash: [u8; 32], + operation_id: OperationId, + fedimint_client: ClientHandleArc, + logger: Arc, + stop: Arc, + storage: S, +) { + let lightning_meta: LightningOperationMeta = entry.meta(); + spawn(async move { + let lightning_module = + Arc::new(fedimint_client.get_first_module::()); + + if let Some(updated_invoice) = process_operation_until_timeout( + logger.clone(), + lightning_meta, + hash, + operation_id, + &lightning_module, + None, + stop, + ) + .await + { + match maybe_update_after_checking_fedimint( + updated_invoice.clone(), + logger.clone(), + storage, + ) { + Ok(_) => { + log_debug!(logger, "subscribed and updated federation operation") + } + Err(e) => { + log_error!(logger, "could not update federation operation: {e}") + } + } + } + }); +} + +fn maybe_update_after_checking_fedimint( + updated_invoice: MutinyInvoice, + logger: Arc, + storage: S, +) -> Result<(), MutinyError> { + match updated_invoice.status { + HTLCStatus::Succeeded | HTLCStatus::Failed => { + log_debug!(logger, "Saving updated payment"); + let hash = updated_invoice.payment_hash.into_32(); + let inbound = updated_invoice.inbound; + let mut payment_info = PaymentInfo::from(updated_invoice); + payment_info.last_update = now().as_secs(); + persist_payment_info(&storage, &hash, &payment_info, inbound)?; + } + HTLCStatus::Pending | HTLCStatus::InFlight => (), + } + + Ok(()) +} + fn sats_round_up(amount: &Amount) -> u64 { Amount::from_msats(amount.msats + 999).sats_round_down() } @@ -794,15 +827,15 @@ pub(crate) fn mnemonic_from_xpriv(xpriv: ExtendedPrivKey) -> Result, - entry: &OperationLogEntry, + lightning_meta: LightningOperationMeta, hash: [u8; 32], operation_id: OperationId, - lightning_module: &LightningClientModule, + lightning_module: &Arc>, + timeout: Option, + stop: Arc, ) -> Option { - let lightning_meta: LightningOperationMeta = entry.meta(); - match lightning_meta.variant { LightningOperationMetaVariant::Pay(pay_meta) => { let invoice = convert_from_fedimint_invoice(&pay_meta.invoice); @@ -814,12 +847,18 @@ async fn extract_invoice_from_entry( process_pay_state_ln, invoice, false, - FEDIMINT_STATUS_TIMEOUT_CHECK_MS, + timeout, + stop, logger, ) .await, ), - Err(_) => Some(invoice.into()), + Err(e) => { + log_error!(logger, "Error trying to process stream outcome: {e}"); + + // return the latest status of the invoice even if it fails + Some(invoice.into()) + } } } else { None @@ -835,12 +874,18 @@ async fn extract_invoice_from_entry( process_receive_state, invoice, true, - FEDIMINT_STATUS_TIMEOUT_CHECK_MS, + timeout, + stop, logger, ) .await, ), - Err(_) => Some(invoice.into()), + Err(e) => { + log_error!(logger, "Error trying to process stream outcome: {e}"); + + // return the latest status of the invoice even if it fails + Some(invoice.into()) + } } } else { None @@ -879,7 +924,8 @@ async fn process_outcome( process_fn: F, invoice: Bolt11Invoice, inbound: bool, - timeout: u64, + timeout: Option, + stop: Arc, logger: Arc, ) -> MutinyInvoice where @@ -902,28 +948,45 @@ where log_trace!(logger, "Outcome received: {}", invoice.status); } UpdateStreamOrOutcome::UpdateStream(mut s) => { - let timeout_future = sleep(timeout as i32); - pin_mut!(timeout_future); - + // break out after sleep time or check stop signal log_trace!(logger, "start timeout stream futures"); - while let future::Either::Left((outcome_option, _)) = - future::select(s.next(), &mut timeout_future).await - { - if let Some(outcome) = outcome_option { - log_trace!(logger, "Streamed Outcome received: {:?}", outcome); - process_fn(outcome, &mut invoice); - - if matches!(invoice.status, HTLCStatus::Succeeded | HTLCStatus::Failed) { - log_trace!(logger, "Streamed Outcome final, returning"); - break; - } + loop { + let timeout_future = if let Some(t) = timeout { + sleep(t as i32) } else { - log_debug!( - logger, - "Timeout reached, exiting loop for payment {}", - invoice.payment_hash - ); - break; + sleep(1_000_i32) + }; + + let mut stream_fut = Box::pin(s.next()).fuse(); + let delay_fut = Box::pin(timeout_future).fuse(); + pin_mut!(delay_fut); + + select! { + outcome_option = stream_fut => { + if let Some(outcome) = outcome_option { + log_trace!(logger, "Streamed Outcome received: {:?}", outcome); + process_fn(outcome, &mut invoice); + + if matches!(invoice.status, HTLCStatus::Succeeded | HTLCStatus::Failed) { + log_trace!(logger, "Streamed Outcome final, returning"); + break; + } + } + } + _ = delay_fut => { + if timeout.is_none() { + if stop.load(Ordering::Relaxed) { + break; + } + } else { + log_debug!( + logger, + "Timeout reached, exiting loop for payment {}", + invoice.payment_hash + ); + break; + } + } } } log_trace!( diff --git a/mutiny-core/src/lib.rs b/mutiny-core/src/lib.rs index 104096b06..54155f6b5 100644 --- a/mutiny-core/src/lib.rs +++ b/mutiny-core/src/lib.rs @@ -46,7 +46,6 @@ mod test_utils; pub use crate::gossip::{GOSSIP_SYNC_TIME_KEY, NETWORK_GRAPH_KEY, PROB_SCORER_KEY}; pub use crate::keymanager::generate_seed; pub use crate::ldkstorage::{CHANNEL_CLOSURE_PREFIX, CHANNEL_MANAGER_KEY, MONITORS_PREFIX_KEY}; -use crate::nostr::primal::{PrimalApi, PrimalClient}; use crate::storage::{ get_payment_hash_from_key, list_payment_info, persist_payment_info, update_nostr_contact_list, IndexItem, MutinyStorage, DEVICE_ID_KEY, EXPECTED_NETWORK_KEY, NEED_FULL_SYNC_KEY, @@ -79,6 +78,10 @@ use crate::{ nostr::nwc::{BudgetPeriod, BudgetedSpendingConditions, NwcProfileTag, SpendingConditions}, subscription::MutinySubscriptionClient, }; +use crate::{ + nostr::primal::{PrimalApi, PrimalClient}, + storage::get_invoice_by_hash, +}; use crate::{nostr::NostrManager, utils::sleep}; use ::nostr::nips::nip47::Method; use ::nostr::nips::nip57; @@ -919,6 +922,7 @@ impl MutinyWalletBuilder { federation_storage.clone(), &config, self.storage.clone(), + stop.clone(), &logger, ) .await?; @@ -1967,20 +1971,7 @@ impl MutinyWallet { &self, hash: &sha256::Hash, ) -> Result { - // First, try to find the invoice in the node manager - if let Ok(invoice) = self.node_manager.get_invoice_by_hash(hash).await { - return Ok(invoice); - } - - // If not found in node manager, search in federations - let federations = self.federations.read().await; - for (_fed_id, federation) in federations.iter() { - if let Ok(invoice) = federation.get_invoice_by_hash(hash).await { - return Ok(invoice); - } - } - - Err(MutinyError::NotFound) + get_invoice_by_hash(hash, &self.storage, &self.logger) } /// Checks whether or not the user is subscribed to Mutiny+. @@ -2468,6 +2459,7 @@ impl MutinyWallet { self.federations.clone(), self.hermes_client.clone(), federation_code, + self.stop.clone(), ) .await } @@ -2570,39 +2562,37 @@ impl MutinyWallet { /// Starts a background process that will check pending fedimint operations pub(crate) async fn start_fedimint_background_checker(&self) { let logger = self.logger.clone(); - let stop = self.stop.clone(); let self_clone = self.clone(); utils::spawn(async move { - loop { - if stop.load(Ordering::Relaxed) { - break; - }; - - sleep(1000).await; - let federation_lock = self_clone.federations.read().await; - - match self_clone.list_federation_ids().await { - Ok(federation_ids) => { - for fed_id in federation_ids { - match federation_lock.get(&fed_id) { - Some(fedimint_client) => { - let _ = fedimint_client.check_activity().await.map_err(|e| { - log_error!(logger, "error checking activity: {e}") + let federation_lock = self_clone.federations.read().await; + + match self_clone.list_federation_ids().await { + Ok(federation_ids) => { + for fed_id in federation_ids { + match federation_lock.get(&fed_id) { + Some(fedimint_client) => { + let _ = fedimint_client + .process_previous_operations() + .await + .map_err(|e| { + log_error!( + logger, + "error checking previous operations: {e}" + ) }); - } - None => { - log_error!( - logger, - "could not get a federation from the lock: {}", - fed_id - ) - } + } + None => { + log_error!( + logger, + "could not get a federation from the lock: {}", + fed_id + ) } } } - Err(e) => { - log_error!(logger, "could not list federations: {e}") - } + } + Err(e) => { + log_error!(logger, "could not list federations: {e}") } } }); @@ -3125,6 +3115,7 @@ async fn create_federations( federation_storage: FederationStorage, c: &MutinyWalletConfig, storage: S, + stop: Arc, logger: &Arc, ) -> Result>>>>, MutinyError> { let mut federation_map = HashMap::with_capacity(federation_storage.federations.len()); @@ -3135,6 +3126,7 @@ async fn create_federations( c.xprivkey, storage.clone(), c.network, + stop.clone(), logger.clone(), ) .await?; @@ -3158,6 +3150,7 @@ pub(crate) async fn create_new_federation( federations: Arc>>>>, hermes_client: Option>>, federation_code: InviteCode, + stop: Arc, ) -> Result { // Begin with a mutex lock so that nothing else can // save or alter the federation list while it is about to @@ -3186,6 +3179,7 @@ pub(crate) async fn create_new_federation( xprivkey, storage.clone(), network, + stop.clone(), logger.clone(), ) .await?; diff --git a/mutiny-core/src/node.rs b/mutiny-core/src/node.rs index b4404e6b9..7e641aeff 100644 --- a/mutiny-core/src/node.rs +++ b/mutiny-core/src/node.rs @@ -1336,23 +1336,6 @@ impl Node { Ok(()) } - pub fn get_invoice_by_hash(&self, payment_hash: &Sha256) -> Result { - let (payment_info, inbound) = self.get_payment_info_from_persisters(payment_hash)?; - let labels_map = self.persister.storage.get_invoice_labels()?; - let labels = payment_info - .bolt11 - .as_ref() - .and_then(|inv| labels_map.get(inv).cloned()) - .unwrap_or_default(); - - MutinyInvoice::from( - payment_info, - PaymentHash(payment_hash.into_32()), - inbound, - labels, - ) - } - /// Gets all the closed channels for this node pub fn get_channel_closure( &self, @@ -1366,32 +1349,6 @@ impl Node { self.persister.list_channel_closures() } - pub fn get_payment_info_from_persisters( - &self, - payment_hash: &bitcoin::hashes::sha256::Hash, - ) -> Result<(PaymentInfo, bool), MutinyError> { - // try inbound first - if let Some(payment_info) = read_payment_info( - &self.persister.storage, - &payment_hash.into_32(), - true, - &self.logger, - ) { - return Ok((payment_info, true)); - } - - // if no inbound check outbound - match read_payment_info( - &self.persister.storage, - &payment_hash.into_32(), - false, - &self.logger, - ) { - Some(payment_info) => Ok((payment_info, false)), - None => Err(MutinyError::NotFound), - } - } - fn retry_strategy() -> Retry { Retry::Attempts(15) } @@ -2431,6 +2388,7 @@ pub(crate) fn default_user_config(accept_underpaying_htlcs: bool) -> UserConfig #[cfg(not(target_arch = "wasm32"))] mod tests { use super::*; + use crate::get_invoice_by_hash; use crate::node::{map_sending_failure, parse_peer_info}; use crate::storage::MemoryStorage; use crate::test_utils::*; @@ -2589,6 +2547,7 @@ mod tests { async fn test_create_invoice() { let storage = MemoryStorage::default(); let node = create_node(storage.clone()).await; + let logger = Arc::new(MutinyLogger::default()); let now = crate::utils::now().as_secs(); @@ -2607,8 +2566,8 @@ mod tests { _ => panic!("unexpected invoice description"), } - let from_storage = node.get_invoice_by_hash(invoice.payment_hash()).unwrap(); - let by_hash = node.get_invoice_by_hash(invoice.payment_hash()).unwrap(); + let from_storage = get_invoice_by_hash(invoice.payment_hash(), &storage, &logger).unwrap(); + let by_hash = get_invoice_by_hash(invoice.payment_hash(), &storage, &logger).unwrap(); assert_eq!(from_storage, by_hash); assert_eq!(from_storage.bolt11, Some(invoice.clone())); @@ -2723,16 +2682,20 @@ mod tests { #[cfg(test)] #[cfg(target_arch = "wasm32")] mod wasm_test { - use crate::event::{MillisatAmount, PaymentInfo}; - use crate::labels::LabelStorage; use crate::storage::MemoryStorage; use crate::test_utils::create_node; use crate::{error::MutinyError, storage::persist_payment_info}; + use crate::{ + event::{MillisatAmount, PaymentInfo}, + storage::get_invoice_by_hash, + }; + use crate::{labels::LabelStorage, logging::MutinyLogger}; use crate::{HTLCStatus, PrivacyLevel}; use itertools::Itertools; use lightning::ln::channelmanager::PaymentId; use lightning::ln::PaymentHash; use lightning_invoice::Bolt11InvoiceDescription; + use std::sync::Arc; use wasm_bindgen_test::{wasm_bindgen_test as test, wasm_bindgen_test_configure}; wasm_bindgen_test_configure!(run_in_browser); @@ -2748,6 +2711,7 @@ mod wasm_test { async fn test_create_invoice() { let storage = MemoryStorage::default(); let node = create_node(storage.clone()).await; + let logger = Arc::new(MutinyLogger::default()); let now = crate::utils::now().as_secs(); @@ -2768,8 +2732,8 @@ mod wasm_test { _ => panic!("unexpected invoice description"), } - let from_storage = node.get_invoice_by_hash(invoice.payment_hash()).unwrap(); - let by_hash = node.get_invoice_by_hash(invoice.payment_hash()).unwrap(); + let from_storage = get_invoice_by_hash(invoice.payment_hash(), &storage, &logger).unwrap(); + let by_hash = get_invoice_by_hash(invoice.payment_hash(), &storage, &logger).unwrap(); assert_eq!(from_storage, by_hash); assert_eq!(from_storage.bolt11, Some(invoice.clone())); diff --git a/mutiny-core/src/nodemanager.rs b/mutiny-core/src/nodemanager.rs index d723c1889..767c8a408 100644 --- a/mutiny-core/src/nodemanager.rs +++ b/mutiny-core/src/nodemanager.rs @@ -31,7 +31,6 @@ use bitcoin::address::NetworkUnchecked; use bitcoin::bip32::ExtendedPrivKey; use bitcoin::blockdata::script; use bitcoin::hashes::hex::FromHex; -use bitcoin::hashes::sha256; use bitcoin::psbt::PartiallySignedTransaction; use bitcoin::secp256k1::PublicKey; use bitcoin::{Address, Network, OutPoint, Transaction, Txid}; @@ -1415,22 +1414,6 @@ impl NodeManager { .await } - /// Gets an invoice from the node manager. - /// This includes sent and received invoices. - pub(crate) async fn get_invoice_by_hash( - &self, - hash: &sha256::Hash, - ) -> Result { - let nodes = self.nodes.read().await; - for (_, node) in nodes.iter() { - if let Ok(inv) = node.get_invoice_by_hash(hash) { - return Ok(inv); - } - } - - Err(MutinyError::NotFound) - } - pub async fn get_channel_closure( &self, user_channel_id: u128, diff --git a/mutiny-core/src/storage.rs b/mutiny-core/src/storage.rs index b3ea94573..1884ee4cf 100644 --- a/mutiny-core/src/storage.rs +++ b/mutiny-core/src/storage.rs @@ -1,4 +1,4 @@ -use crate::event::HTLCStatus; +use crate::labels::LabelStorage; use crate::nodemanager::{ChannelClosure, NodeStorage}; use crate::utils::{now, spawn}; use crate::vss::{MutinyVssClient, VssKeyValueItem}; @@ -12,6 +12,7 @@ use crate::{ error::{MutinyError, MutinyStorageError}, event::PaymentInfo, }; +use crate::{event::HTLCStatus, MutinyInvoice}; use crate::{ldkstorage::CHANNEL_MANAGER_KEY, utils::sleep}; use async_trait::async_trait; use bdk::chain::{Append, PersistBackend}; @@ -934,6 +935,22 @@ pub(crate) fn persist_payment_info( Ok(()) } +pub(crate) fn get_invoice_by_hash( + hash: &bitcoin::hashes::sha256::Hash, + storage: &S, + logger: &MutinyLogger, +) -> Result { + let (payment_info, inbound) = get_payment_info(storage, hash, logger)?; + let labels_map = storage.get_invoice_labels()?; + let labels = payment_info + .bolt11 + .as_ref() + .and_then(|inv| labels_map.get(inv).cloned()) + .unwrap_or_default(); + + MutinyInvoice::from(payment_info, PaymentHash(hash.into_32()), inbound, labels) +} + pub(crate) fn get_payment_info( storage: &S, payment_hash: &bitcoin::hashes::sha256::Hash,