Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use alternative retry method for async storage #797

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 34 additions & 72 deletions mutiny-core/src/ldkstorage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::nodemanager::ChannelClosure;
use crate::storage::{MutinyStorage, VersionedValue};
use crate::utils;
use crate::utils::{sleep, spawn};
use crate::vss::VssKeyValueItem;
use crate::{chain::MutinyChain, scorer::HubPreferentialScorer};
use anyhow::anyhow;
use bitcoin::hashes::hex::{FromHex, ToHex};
Expand Down Expand Up @@ -93,6 +92,15 @@ impl<S: MutinyStorage> MutinyNodePersister<S> {
format!("{}_{}", key, self.node_id)
}

pub(crate) fn get_monitor_key(&self, funding_txo: &OutPoint) -> String {
let key = format!(
"{MONITORS_PREFIX_KEY}{}_{}",
funding_txo.txid.to_hex(),
funding_txo.index
);
self.get_key(&key)
}

fn init_persist_monitor<W: Writeable>(
&self,
key: String,
Expand All @@ -105,47 +113,30 @@ impl<S: MutinyStorage> MutinyNodePersister<S> {
let logger = self.logger.clone();
let object = object.encode();

// currently we only retry storage to VSS because we don't have a way to detect
// for local storage if a higher version has been persisted. Without handling this
// we could end up with a previous state being persisted over a newer one.
// VSS does not have this problem because it verifies the version before storing
// and will not overwrite a newer version, so it is safe to retry.
spawn(async move {
let mut is_retry = false;
// Sleep before persisting to give chance for the manager to be persisted
sleep(50).await;
loop {
match persist_monitor(&storage, &key, &object, Some(version), is_retry, &logger)
.await
{
Ok(()) => {
log_debug!(logger, "Persisted channel monitor: {update_id:?}");

// unwrap is safe, we set it up immediately
let chain_monitor = chain_monitor.lock().await;
let chain_monitor = chain_monitor.as_ref().unwrap();

// these errors are not fatal, so we don't return them just log
if let Err(e) = chain_monitor.channel_monitor_updated(
update_id.funding_txo,
update_id.monitor_update_id,
) {
log_error!(
logger,
"Error notifying chain monitor of channel monitor update: {e:?}"
);
} else {
break; // successful storage, no more attempts
}
}
Err(e) => {
log_error!(logger, "Error persisting channel monitor: {e}");
match persist_monitor(&storage, &key, &object, Some(version), &logger).await {
Ok(()) => {
log_debug!(logger, "Persisted channel monitor: {update_id:?}");

// unwrap is safe, we set it up immediately
let chain_monitor = chain_monitor.lock().await;
let chain_monitor = chain_monitor.as_ref().unwrap();

// these errors are not fatal, so we don't return them just log
if let Err(e) = chain_monitor
.channel_monitor_updated(update_id.funding_txo, update_id.monitor_update_id)
{
log_error!(
logger,
"Error notifying chain monitor of channel monitor update: {e:?}"
);
}
}

// if we get here, we failed to persist, so we retry
is_retry = true;
sleep(1_000).await;
Err(e) => {
log_error!(logger, "Error persisting channel monitor: {e}");
}
}
});

Expand Down Expand Up @@ -675,15 +666,10 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, S: MutinyStorage> Persist<Chann
monitor: &ChannelMonitor<ChannelSigner>,
monitor_update_id: MonitorUpdateId,
) -> ChannelMonitorUpdateStatus {
let key = format!(
"{MONITORS_PREFIX_KEY}{}_{}",
funding_txo.txid.to_hex(),
funding_txo.index
);
let key = self.get_key(&key);
let key = self.get_monitor_key(&funding_txo);

let update_id = monitor.get_latest_update_id();
debug_assert!(update_id == utils::get_monitor_version(monitor.encode()));
debug_assert!(update_id == utils::get_monitor_version(&monitor.encode()));

// safely convert u64 to u32
let version = if update_id >= u32::MAX as u64 {
Expand All @@ -707,14 +693,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, S: MutinyStorage> Persist<Chann
monitor: &ChannelMonitor<ChannelSigner>,
monitor_update_id: MonitorUpdateId,
) -> ChannelMonitorUpdateStatus {
let key = format!(
"{MONITORS_PREFIX_KEY}{}_{}",
funding_txo.txid.to_hex(),
funding_txo.index
);
let key = self.get_key(&key);
let key = self.get_monitor_key(&funding_txo);
let update_id = monitor.get_latest_update_id();
debug_assert!(update_id == utils::get_monitor_version(monitor.encode()));
debug_assert!(update_id == utils::get_monitor_version(&monitor.encode()));

// safely convert u64 to u32
let version = if update_id >= u32::MAX as u64 {
Expand All @@ -738,33 +719,14 @@ pub struct MonitorUpdateIdentifier {
pub monitor_update_id: MonitorUpdateId,
}

async fn persist_monitor(
pub(crate) async fn persist_monitor(
storage: &impl MutinyStorage,
key: &str,
object: &Vec<u8>,
version: Option<u32>,
vss_only: bool,
logger: &MutinyLogger,
) -> Result<(), lightning::io::Error> {
let res = if vss_only {
// if we are only storing to VSS, we don't need to store to local storage
// just need to call put_objects on VSS
if let (Some(vss), Some(version)) = (storage.vss_client(), version) {
let value =
serde_json::to_value(object).map_err(|_| lightning::io::ErrorKind::Other)?;
let item = VssKeyValueItem {
key: key.to_string(),
value,
version,
};

vss.put_objects(vec![item]).await
} else {
Ok(())
}
} else {
storage.set_data_async(key, object, version).await
};
let res = storage.set_data_async(key, object, version).await;

res.map_err(|e| {
match e {
Expand Down
77 changes: 76 additions & 1 deletion mutiny-core/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::labels::LabelStorage;
use crate::ldkstorage::ChannelOpenParams;
use crate::ldkstorage::{persist_monitor, ChannelOpenParams};
use crate::nodemanager::ChannelClosure;
use crate::scb::StaticChannelBackup;
use crate::{
Expand Down Expand Up @@ -39,6 +39,7 @@ use lightning::{
};

use crate::multiesplora::MultiEsploraClient;
use crate::utils::get_monitor_version;
use bitcoin::util::bip32::ExtendedPrivKey;
use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet};
use lightning::ln::PaymentSecret;
Expand Down Expand Up @@ -548,6 +549,80 @@ impl<S: MutinyStorage> Node<S> {
keys_manager.get_node_id(Recipient::Node).unwrap()
);

// Here we re-attempt to persist any monitors that failed to persist previously.
let retry_logger = logger.clone();
let retry_persister = persister.clone();
let retry_stop = stop.clone();
let retry_chain_monitor = chain_monitor.clone();
utils::spawn(async move {
loop {
if retry_stop.load(Ordering::Relaxed) {
break;
}

let updates = retry_chain_monitor.list_pending_monitor_updates();
for (funding_txo, update_ids) in updates {
// if there are no updates, skip
if update_ids.is_empty() {
continue;
}

log_debug!(
retry_logger,
"Retrying to persist monitor for outpoint: {funding_txo:?}"
);

match retry_chain_monitor.get_monitor(funding_txo) {
Ok(monitor) => {
let key = retry_persister.get_monitor_key(&funding_txo);
let object = monitor.encode();
let update_id = monitor.get_latest_update_id();
debug_assert_eq!(update_id, get_monitor_version(&object));

// safely convert u64 to u32
let version = if update_id >= u32::MAX as u64 {
u32::MAX
} else {
update_id as u32
};

let res = persist_monitor(
&retry_persister.storage,
&key,
&object,
Some(version),
&retry_logger,
)
.await;

match res {
Ok(_) => {
for id in update_ids {
if let Err(e) = retry_chain_monitor
.channel_monitor_updated(funding_txo, id)
{
log_error!(retry_logger, "Error notifying chain monitor of channel monitor update: {e:?}");
}
}
}
Err(e) => log_error!(
retry_logger,
"Failed to persist monitor for outpoint: {funding_txo:?}, error: {e:?}",
),
}
}
Err(_) => log_error!(
retry_logger,
"Failed to get monitor for outpoint: {funding_txo:?}"
),
}
}

// sleep 3 seconds
sleep(3_000).await;
}
});

Ok(Node {
_uuid: uuid,
stopped_components,
Expand Down
2 changes: 1 addition & 1 deletion mutiny-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ where

/// Returns the version of a channel monitor from a serialized version
/// of a channel monitor.
pub fn get_monitor_version(bytes: Vec<u8>) -> u64 {
pub fn get_monitor_version(bytes: &[u8]) -> u64 {
// first two bytes are the version
// next 8 bytes are the version number
u64::from_be_bytes(bytes[2..10].try_into().unwrap())
Expand Down
6 changes: 3 additions & 3 deletions mutiny-wasm/src/indexed_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,12 @@ impl IndexedDbStorage {
match current.get::<Vec<u8>>(&key)? {
Some(bytes) => {
// check first byte is 1, then take u64 from next 8 bytes
let current_version = utils::get_monitor_version(bytes);
let current_version = utils::get_monitor_version(&bytes);

let obj: Value = LocalStorage::get(&key).unwrap();
let value = decrypt_value(&key, obj, current.password())?;
if let Ok(local_bytes) = serde_json::from_value::<Vec<u8>>(value.clone()) {
let local_version = utils::get_monitor_version(local_bytes);
let local_version = utils::get_monitor_version(&local_bytes);

// if the current version is less than the version from local storage
// then we want to use the local storage version
Expand Down Expand Up @@ -372,7 +372,7 @@ impl IndexedDbStorage {
// we can get versions from monitors, so we should compare
match current.get::<Vec<u8>>(&kv.key)? {
Some(bytes) => {
let current_version = utils::get_monitor_version(bytes);
let current_version = utils::get_monitor_version(&bytes);

// if the current version is less than the version from vss, then we want to use the vss version
if current_version < kv.version as u64 {
Expand Down
Loading