diff --git a/mutiny-core/src/federation.rs b/mutiny-core/src/federation.rs index a3ab9c3fa..78adc55b8 100644 --- a/mutiny-core/src/federation.rs +++ b/mutiny-core/src/federation.rs @@ -55,7 +55,10 @@ use fedimint_ln_common::lightning_invoice::{Bolt11InvoiceDescription, Descriptio use fedimint_ln_common::{LightningCommonInit, LightningGateway}; use fedimint_mint_client::MintClientInit; use fedimint_wallet_client::{WalletClientInit, WalletClientModule}; -use futures::future::{self}; +use futures::{ + future::{self, Either}, + select, FutureExt, +}; use futures_util::{pin_mut, StreamExt}; use hex_conservative::{DisplayHex, FromHex}; use lightning::{ @@ -317,7 +320,8 @@ impl FederationClient { }); log_debug!(logger, "Built fedimint client"); - Ok(FederationClient { + + let federation_client = FederationClient { uuid, fedimint_client, fedimint_storage, @@ -325,7 +329,84 @@ impl FederationClient { logger, invite_code: federation_code, gateway, - }) + }; + + federation_client.process_previous_operations().await; + + Ok(federation_client) + } + + pub(crate) async fn process_previous_operations(&self) { + // TODO loop? + + // go through last 100 operations + let operations = self + .fedimint_client + .operation_log() + .list_operations(FEDIMINT_OPERATIONS_LIST_MAX, None) + .await; + + // find all of the pending ones + for (key, entry) in operations { + if entry.operation_module_kind() == LightningCommonInit::KIND.as_str() { + let lightning_meta: LightningOperationMeta = entry.meta(); + match lightning_meta.variant { + LightningOperationMetaVariant::Pay(pay_meta) => { + let hash = pay_meta.invoice.payment_hash().into_inner(); + self.subscribe_operation(entry, hash, key, self.fedimint_client.clone()) + .await; + } + LightningOperationMetaVariant::Receive { invoice, .. } => { + let hash = invoice.payment_hash().into_inner(); + self.subscribe_operation(entry, hash, key, self.fedimint_client.clone()) + .await; + } + LightningOperationMetaVariant::Claim { .. } => {} + } + } + } + + // TODO also look for our internal state pending transactions + } + + async fn subscribe_operation( + &self, + entry: OperationLogEntry, + hash: [u8; 32], + key: ChronologicalOperationLogKey, + fedimint_client: ClientHandleArc, + ) { + let logger = self.logger.clone(); + let storage = self.storage.clone(); + let lightning_meta: LightningOperationMeta = entry.meta(); + + spawn(async move { + let lightning_module = + Arc::new(fedimint_client.get_first_module::()); + + if let Some(updated_invoice) = process_operation_until_finished( + logger.clone(), + lightning_meta, + hash, + key.operation_id, + &lightning_module, + ) + .await + { + match maybe_update_after_checking_fedimint( + updated_invoice.clone(), + logger.clone(), + storage, + ) { + Ok(_) => { + log_debug!(logger, "subscribed and updated federation operation") + } + Err(e) => { + log_error!(logger, "could not update federation operation: {e}") + } + } + } + }); } pub(crate) async fn gateway_fee(&self) -> Result { @@ -346,7 +427,7 @@ impl FederationClient { let desc = Description::new(String::new()).expect("empty string is valid"); let gateway = self.gateway.read().await; - let (_id, invoice, _) = lightning_module + let (id, invoice, _) = lightning_module .create_bolt11_invoice( Amount::from_sats(amount), Bolt11InvoiceDescription::Direct(&desc), @@ -368,6 +449,44 @@ impl FederationClient { persist_payment_info(&self.storage, &hash, &payment_info, inbound)?; log_trace!(self.logger, "Persisted payment"); + // subscribe to updates for it + let fedimint_client_clone = self.fedimint_client.clone(); + let logger_clone = self.logger.clone(); + let storage_clone = self.storage.clone(); + spawn(async move { + let lightning_module = + Arc::new(fedimint_client_clone.get_first_module::()); + + let operations = fedimint_client_clone + .operation_log() + .get_operation(id) + .await + .expect("just created it"); + + if let Some(updated_invoice) = process_operation_until_finished( + logger_clone.clone(), + operations.meta(), + hash, + id, + &lightning_module, + ) + .await + { + match maybe_update_after_checking_fedimint( + updated_invoice.clone(), + logger_clone.clone(), + storage_clone, + ) { + Ok(_) => { + log_info!(logger_clone, "updated invoice"); + } + Err(e) => { + log_error!(logger_clone, "could not check update invoice: {e}"); + } + } + } + }); + Ok(invoice.into()) } @@ -465,8 +584,7 @@ impl FederationClient { ) .await { - self.maybe_update_after_checking_fedimint(updated_invoice.clone()) - .await?; + self.maybe_update_after_checking_fedimint(updated_invoice.clone())?; } } } @@ -474,20 +592,15 @@ impl FederationClient { Ok(()) } - async fn maybe_update_after_checking_fedimint( + fn maybe_update_after_checking_fedimint( &self, updated_invoice: MutinyInvoice, ) -> Result<(), MutinyError> { - if matches!( - updated_invoice.status, - HTLCStatus::Succeeded | HTLCStatus::Failed - ) { - log_debug!(self.logger, "Saving updated payment"); - let hash = updated_invoice.payment_hash.into_32(); - let inbound = updated_invoice.inbound; - let payment_info = PaymentInfo::from(updated_invoice); - persist_payment_info(&self.storage, &hash, &payment_info, inbound)?; - } + maybe_update_after_checking_fedimint( + updated_invoice, + self.logger.clone(), + self.storage.clone(), + )?; Ok(()) } @@ -537,8 +650,7 @@ impl FederationClient { ) .await { - self.maybe_update_after_checking_fedimint(updated_invoice.clone()) - .await?; + self.maybe_update_after_checking_fedimint(updated_invoice.clone())?; return Ok(updated_invoice); } } else { @@ -596,7 +708,7 @@ impl FederationClient { process_pay_state_internal, invoice.clone(), inbound, - DEFAULT_PAYMENT_TIMEOUT * 1_000, + Some(DEFAULT_PAYMENT_TIMEOUT * 1_000), Arc::clone(&self.logger), ) .await @@ -612,7 +724,7 @@ impl FederationClient { process_pay_state_ln, invoice.clone(), inbound, - DEFAULT_PAYMENT_TIMEOUT * 1_000, + Some(DEFAULT_PAYMENT_TIMEOUT * 1_000), Arc::clone(&self.logger), ) .await @@ -623,14 +735,15 @@ impl FederationClient { }; inv.fees_paid = Some(sats_round_up(&outgoing_payment.fee)); - self.maybe_update_after_checking_fedimint(inv.clone()) - .await?; + self.maybe_update_after_checking_fedimint(inv.clone())?; match inv.status { HTLCStatus::Succeeded => Ok(inv), HTLCStatus::Failed => Err(MutinyError::RoutingFailed), - HTLCStatus::Pending => Err(MutinyError::PaymentTimeout), - HTLCStatus::InFlight => Err(MutinyError::PaymentTimeout), + _ => { + // TODO keep streaming after timeout happens + Err(MutinyError::PaymentTimeout) + } } } @@ -707,6 +820,25 @@ impl FederationClient { } } +fn maybe_update_after_checking_fedimint( + updated_invoice: MutinyInvoice, + logger: Arc, + storage: S, +) -> Result<(), MutinyError> { + Ok( + if matches!( + updated_invoice.status, + HTLCStatus::Succeeded | HTLCStatus::Failed + ) { + log_debug!(logger, "Saving updated payment"); + let hash = updated_invoice.payment_hash.into_32(); + let inbound = updated_invoice.inbound; + let payment_info = PaymentInfo::from(updated_invoice); + persist_payment_info(&storage, &hash, &payment_info, inbound)?; + }, + ) +} + fn sats_round_up(amount: &Amount) -> u64 { Amount::from_msats(amount.msats + 999).sats_round_down() } @@ -794,6 +926,46 @@ pub(crate) fn mnemonic_from_xpriv(xpriv: ExtendedPrivKey) -> Result, + lightning_meta: LightningOperationMeta, + hash: [u8; 32], + operation_id: OperationId, + lightning_module: &Arc>, +) -> Option { + match lightning_meta.variant { + LightningOperationMetaVariant::Pay(pay_meta) => { + let invoice = convert_from_fedimint_invoice(&pay_meta.invoice); + if invoice.payment_hash().into_32() == hash { + match lightning_module.subscribe_ln_pay(operation_id).await { + Ok(o) => Some( + process_outcome(o, process_pay_state_ln, invoice, false, None, logger) + .await, + ), + Err(_) => Some(invoice.into()), + } + } else { + None + } + } + LightningOperationMetaVariant::Receive { invoice, .. } => { + let invoice = convert_from_fedimint_invoice(&invoice); + if invoice.payment_hash().into_32() == hash { + match lightning_module.subscribe_ln_receive(operation_id).await { + Ok(o) => Some( + process_outcome(o, process_receive_state, invoice, true, None, logger) + .await, + ), + Err(_) => Some(invoice.into()), + } + } else { + None + } + } + LightningOperationMetaVariant::Claim { .. } => None, + } +} + async fn extract_invoice_from_entry( logger: Arc, entry: &OperationLogEntry, @@ -814,7 +986,7 @@ async fn extract_invoice_from_entry( process_pay_state_ln, invoice, false, - FEDIMINT_STATUS_TIMEOUT_CHECK_MS, + Some(FEDIMINT_STATUS_TIMEOUT_CHECK_MS), logger, ) .await, @@ -835,7 +1007,7 @@ async fn extract_invoice_from_entry( process_receive_state, invoice, true, - FEDIMINT_STATUS_TIMEOUT_CHECK_MS, + Some(FEDIMINT_STATUS_TIMEOUT_CHECK_MS), logger, ) .await, @@ -879,7 +1051,7 @@ async fn process_outcome( process_fn: F, invoice: Bolt11Invoice, inbound: bool, - timeout: u64, + timeout: Option, logger: Arc, ) -> MutinyInvoice where @@ -902,13 +1074,21 @@ where log_trace!(logger, "Outcome received: {}", invoice.status); } UpdateStreamOrOutcome::UpdateStream(mut s) => { - let timeout_future = sleep(timeout as i32); - pin_mut!(timeout_future); - + // break out after sleep time or check stop signal log_trace!(logger, "start timeout stream futures"); - while let future::Either::Left((outcome_option, _)) = - future::select(s.next(), &mut timeout_future).await - { + loop { + let timeout_future = if let Some(t) = timeout { + sleep(t as i32) + } else { + sleep(1_000 as i32) + }; + + let mut stream_fut = Box::pin(s.next()).fuse(); + let delay_fut = Box::pin(timeout_future).fuse(); + pin_mut!(delay_fut); + + select! { + outcome_option = stream_fut => { if let Some(outcome) = outcome_option { log_trace!(logger, "Streamed Outcome received: {:?}", outcome); process_fn(outcome, &mut invoice); @@ -917,13 +1097,25 @@ where log_trace!(logger, "Streamed Outcome final, returning"); break; } - } else { - log_debug!( - logger, - "Timeout reached, exiting loop for payment {}", - invoice.payment_hash - ); - break; + } + } + _ = delay_fut => { + if timeout.is_none() { + // TODO check sleep time to determine break + log_debug!( + logger, + "Still waiting for stream to finish {}", + invoice.payment_hash + ); + } else { + log_debug!( + logger, + "Timeout reached, exiting loop for payment {}", + invoice.payment_hash + ); + break; + } + } } } log_trace!( diff --git a/mutiny-core/src/lib.rs b/mutiny-core/src/lib.rs index 104096b06..de34314a7 100644 --- a/mutiny-core/src/lib.rs +++ b/mutiny-core/src/lib.rs @@ -1121,9 +1121,6 @@ impl MutinyWalletBuilder { // start the nostr background process mw.start_nostr().await; - // start the federation background processor - mw.start_fedimint_background_checker().await; - // start the blind auth fetching process mw.check_blind_tokens(); @@ -2567,47 +2564,6 @@ impl MutinyWallet { Ok(FederationBalances { balances }) } - /// Starts a background process that will check pending fedimint operations - pub(crate) async fn start_fedimint_background_checker(&self) { - let logger = self.logger.clone(); - let stop = self.stop.clone(); - let self_clone = self.clone(); - utils::spawn(async move { - loop { - if stop.load(Ordering::Relaxed) { - break; - }; - - sleep(1000).await; - let federation_lock = self_clone.federations.read().await; - - match self_clone.list_federation_ids().await { - Ok(federation_ids) => { - for fed_id in federation_ids { - match federation_lock.get(&fed_id) { - Some(fedimint_client) => { - let _ = fedimint_client.check_activity().await.map_err(|e| { - log_error!(logger, "error checking activity: {e}") - }); - } - None => { - log_error!( - logger, - "could not get a federation from the lock: {}", - fed_id - ) - } - } - } - } - Err(e) => { - log_error!(logger, "could not list federations: {e}") - } - } - } - }); - } - /// Calls upon a LNURL to get the parameters for it. /// This contains what kind of LNURL it is (pay, withdrawal, auth, etc). // todo revamp LnUrlParams to be well designed