Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry chain swap server lockup tx verification #557

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 111 additions & 43 deletions lib/core/src/chain_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,24 @@ impl ChainSwapHandler {
pub(crate) async fn start(self: Arc<Self>, mut shutdown: watch::Receiver<()>) {
let cloned = self.clone();
tokio::spawn(async move {
let mut rescan_interval = tokio::time::interval(Duration::from_secs(60 * 10));
rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut bitcoin_rescan_interval = tokio::time::interval(Duration::from_secs(60 * 10));
let mut liquid_rescan_interval = tokio::time::interval(Duration::from_secs(60));
bitcoin_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
liquid_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
tokio::select! {
_ = rescan_interval.tick() => {
if let Err(e) = cloned.rescan_incoming_chain_swaps(false).await {
error!("Error checking incoming chain swaps: {e:?}");
_ = bitcoin_rescan_interval.tick() => {
if let Err(e) = cloned.rescan_incoming_user_lockup_txs(false).await {
error!("Error checking incoming user txs: {e:?}");
}
if let Err(e) = cloned.rescan_outgoing_chain_swaps().await {
error!("Error checking outgoing chain swaps: {e:?}");
if let Err(e) = cloned.rescan_outgoing_claim_txs().await {
error!("Error checking outgoing server txs: {e:?}");
}
},
_ = liquid_rescan_interval.tick() => {
if let Err(e) = cloned.rescan_incoming_server_lockup_txs().await {
error!("Error checking incoming server txs: {e:?}");
}
},
_ = shutdown.changed() => {
Expand Down Expand Up @@ -110,7 +117,7 @@ impl ChainSwapHandler {
}
}

pub(crate) async fn rescan_incoming_chain_swaps(
pub(crate) async fn rescan_incoming_user_lockup_txs(
&self,
ignore_monitoring_block_height: bool,
) -> Result<()> {
Expand All @@ -122,16 +129,23 @@ impl ChainSwapHandler {
.filter(|s| s.direction == Direction::Incoming)
.collect();
info!(
"Rescanning {} incoming Chain Swap(s) at height {}",
"Rescanning {} incoming Chain Swap(s) user lockup txs at height {}",
chain_swaps.len(),
current_height
);
for swap in chain_swaps {
if let Err(e) = self
.rescan_incoming_chain_swap(&swap, current_height, ignore_monitoring_block_height)
.rescan_incoming_chain_swap_user_lockup_tx(
&swap,
current_height,
ignore_monitoring_block_height,
)
.await
{
error!("Error rescanning incoming Chain Swap {}: {e:?}", swap.id);
error!(
"Error rescanning user lockup of incoming Chain Swap {}: {e:?}",
swap.id
);
}
}
Ok(())
Expand All @@ -142,7 +156,7 @@ impl ChainSwapHandler {
/// - `current_height`: the tip
/// - `ignore_monitoring_block_height`: if true, it rescans an expired swap even after the
/// cutoff monitoring block height
async fn rescan_incoming_chain_swap(
async fn rescan_incoming_chain_swap_user_lockup_tx(
&self,
swap: &ChainSwap,
current_height: u32,
Expand Down Expand Up @@ -201,7 +215,59 @@ impl ChainSwapHandler {
Ok(())
}

pub(crate) async fn rescan_outgoing_chain_swaps(&self) -> Result<()> {
pub(crate) async fn rescan_incoming_server_lockup_txs(&self) -> Result<()> {
let current_height = self.liquid_chain_service.lock().await.tip().await?;
let chain_swaps: Vec<ChainSwap> = self
.persister
.list_chain_swaps()?
.into_iter()
.filter(|s| {
s.direction == Direction::Incoming && s.state == Pending && s.claim_tx_id.is_none()
})
.collect();
info!(
"Rescanning {} incoming Chain Swap(s) server lockup txs at height {}",
chain_swaps.len(),
current_height
);
for swap in chain_swaps {
if let Err(e) = self
.rescan_incoming_chain_swap_server_lockup_tx(&swap)
.await
{
error!(
"Error rescanning server lockup of incoming Chain Swap {}: {e:?}",
swap.id
);
}
}
Ok(())
}

async fn rescan_incoming_chain_swap_server_lockup_tx(&self, swap: &ChainSwap) -> Result<()> {
let Some(tx_id) = swap.server_lockup_tx_id.clone() else {
// Skip the rescan if there is no server_lockup_tx_id yet
return Ok(());
};
let swap_id = &swap.id;
let swap_script = swap.get_claim_swap_script()?;
let script_history = self.fetch_liquid_script_history(&swap_script).await?;
let tx_history = script_history
.iter()
.find(|h| h.txid.to_hex().eq(&tx_id))
.ok_or(anyhow!(
"Server lockup tx for incoming Chain Swap {swap_id} was not found, txid={tx_id}"
))?;
if tx_history.height > 0 {
info!("Incoming Chain Swap {swap_id} server lockup tx is confirmed");
self.claim(swap_id)
.await
.map_err(|e| anyhow!("Could not claim Chain Swap {swap_id}: {e:?}"))?;
}
Ok(())
}

pub(crate) async fn rescan_outgoing_claim_txs(&self) -> Result<()> {
let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32;
let chain_swaps: Vec<ChainSwap> = self
.persister
Expand All @@ -212,19 +278,19 @@ impl ChainSwapHandler {
})
.collect();
info!(
"Rescanning {} outgoing Chain Swap(s) at height {}",
"Rescanning {} outgoing Chain Swap(s) claim txs at height {}",
chain_swaps.len(),
current_height
);
for swap in chain_swaps {
if let Err(e) = self.rescan_outgoing_chain_swap(&swap).await {
if let Err(e) = self.rescan_outgoing_chain_swap_claim_tx(&swap).await {
error!("Error rescanning outgoing Chain Swap {}: {e:?}", swap.id);
}
}
Ok(())
}

async fn rescan_outgoing_chain_swap(&self, swap: &ChainSwap) -> Result<()> {
async fn rescan_outgoing_chain_swap_claim_tx(&self, swap: &ChainSwap) -> Result<()> {
if let Some(claim_address) = &swap.claim_address {
let address = Address::from_str(claim_address)?;
let claim_tx_id = swap.claim_tx_id.clone().ok_or(anyhow!("No claim tx id"))?;
Expand Down Expand Up @@ -327,28 +393,31 @@ impl ChainSwapHandler {
return Err(anyhow!("Could not verify user lockup transaction: {e}",));
}

if let Err(e) = self.verify_server_lockup_tx(swap, &transaction, true).await
{
warn!("Server lockup transaction for incoming Chain Swap {} could not be verified. txid: {}, err: {}",
swap.id,
transaction.id,
e);
return Err(anyhow!(
"Could not verify server lockup transaction {}: {e}",
transaction.id
));
}
let verify_res =
self.verify_server_lockup_tx(swap, &transaction, true).await;

info!(
"Server lockup transaction was verified for incoming Chain Swap {}",
swap.id
);
// Set the server_lockup_tx_id if it is verified or not.
// If it is not yet confirmed, then it will be claimed after confirmation
// in rescan_incoming_chain_swap_server_lockup_tx()
self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None)
.await?;
self.claim(id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;

match verify_res {
Ok(_) => {
info!("Server lockup transaction was verified for incoming Chain Swap {}", swap.id);
self.claim(id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
}
Err(e) => {
warn!("Server lockup transaction for incoming Chain Swap {} could not be verified. txid: {}, err: {}", swap.id, transaction.id, e);
return Err(anyhow!(
"Could not verify server lockup transaction {}: {e}",
transaction.id
));
}
}
}
Some(claim_tx_id) => {
warn!("Claim tx for Chain Swap {id} was already broadcast: txid {claim_tx_id}")
Expand Down Expand Up @@ -1238,9 +1307,10 @@ impl ChainSwapHandler {
}

async fn verify_user_lockup_tx(&self, chain_swap: &ChainSwap) -> Result<String> {
let swap_script = chain_swap.get_lockup_swap_script()?;
let script_history = match chain_swap.direction {
Direction::Incoming => self.fetch_incoming_user_script_history(chain_swap).await,
Direction::Outgoing => self.fetch_outgoing_user_script_history(chain_swap).await,
Direction::Incoming => self.fetch_bitcoin_script_history(&swap_script).await,
Direction::Outgoing => self.fetch_liquid_script_history(&swap_script).await,
}?;

match chain_swap.user_lockup_tx_id.clone() {
Expand All @@ -1264,11 +1334,10 @@ impl ChainSwapHandler {
}
}

async fn fetch_incoming_user_script_history(
async fn fetch_bitcoin_script_history(
&self,
chain_swap: &ChainSwap,
swap_script: &SwapScriptV2,
) -> Result<Vec<History>> {
let swap_script = chain_swap.get_lockup_swap_script()?;
let address = swap_script
.as_bitcoin_script()?
.to_address(self.config.network.as_bitcoin_chain())
Expand All @@ -1282,11 +1351,10 @@ impl ChainSwapHandler {
.await
}

async fn fetch_outgoing_user_script_history(
async fn fetch_liquid_script_history(
&self,
chain_swap: &ChainSwap,
swap_script: &SwapScriptV2,
) -> Result<Vec<History>> {
let swap_script = chain_swap.get_lockup_swap_script()?;
let address = swap_script
.as_liquid_script()?
.to_address(self.config.network.into())
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1895,7 +1895,7 @@ impl LiquidSdk {
/// is not necessary as it happens automatically in the background.
pub async fn rescan_onchain_swaps(&self) -> SdkResult<()> {
self.chain_swap_handler
.rescan_incoming_chain_swaps(true)
.rescan_incoming_user_lockup_txs(true)
.await?;
Ok(())
}
Expand Down
Loading