diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index a89068604709..302a038b81eb 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -32,6 +32,7 @@ use relay_utils::{ HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker, }; use std::{ + fmt::Debug, pin::Pin, time::{Duration, Instant}, }; @@ -182,15 +183,114 @@ pub(crate) struct Transaction { pub submitted_header_number: Number, } +impl Transaction { + pub async fn submit< + C: TargetClient, + P: FinalitySyncPipeline, + >( + target_client: &C, + header: P::Header, + justification: P::FinalityProof, + ) -> Result { + let submitted_header_number = header.number(); + log::debug!( + target: "bridge", + "Going to submit finality proof of {} header #{:?} to {}", + P::SOURCE_NAME, + submitted_header_number, + P::TARGET_NAME, + ); + + let tracker = target_client.submit_finality_proof(header, justification).await?; + Ok(Transaction { tracker, submitted_header_number }) + } + + pub async fn track, P: FinalitySyncPipeline>( + self, + target_client: &C, + ) -> Result<(), String> { + match self.tracker.wait().await { + TrackedTransactionStatus::Finalized(_) => { + // The transaction has been finalized, but it may have been finalized in the + // "failed" state. So let's check if the block number was actually updated. + // If it wasn't then we are stalled. + // + // Please also note that we're returning an error if we fail to read required data + // from the target client - that's the best we can do here to avoid actual stall. + target_client + .best_finalized_source_block_id() + .await + .map_err(|e| format!("failed to read best block from target node: {:?}", e)) + .and_then(|best_id_at_target| { + if self.submitted_header_number > best_id_at_target.0 { + return Err(format!( + "best block at target after tx is {:?} and we've submitted {:?}", + best_id_at_target.0, self.submitted_header_number, + )) + } + Ok(()) + }) + }, + TrackedTransactionStatus::Lost => Err("transaction failed".to_string()), + } + } +} + /// Finality proofs stream that may be restarted. pub(crate) struct RestartableFinalityProofsStream { /// Flag that the stream needs to be restarted. pub(crate) needs_restart: bool, /// The stream itself. - pub(crate) stream: Pin>, + stream: Pin>, +} + +impl RestartableFinalityProofsStream { + pub async fn create_raw_stream< + C: SourceClient, + P: FinalitySyncPipeline, + >( + source_client: &C, + ) -> Result { + source_client.finality_proofs().await.map_err(|error| { + log::error!( + target: "bridge", + "Failed to subscribe to {} justifications: {:?}. Going to reconnect", + P::SOURCE_NAME, + error, + ); + + FailedClient::Source + }) + } + + pub async fn restart_if_scheduled< + C: SourceClient, + P: FinalitySyncPipeline, + >( + &mut self, + source_client: &C, + ) -> Result<(), FailedClient> { + if self.needs_restart { + log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME); + + self.needs_restart = false; + self.stream = Box::pin(Self::create_raw_stream(source_client).await?); + } + Ok(()) + } + + pub fn next(&mut self) -> Option { + match self.stream.next().now_or_never() { + Some(Some(finality_proof)) => Some(finality_proof), + Some(None) => { + self.needs_restart = true; + None + }, + None => None, + } + } } -#[cfg(test)] impl From for RestartableFinalityProofsStream { fn from(stream: S) -> Self { RestartableFinalityProofsStream { needs_restart: false, stream: Box::pin(stream) } @@ -218,27 +318,12 @@ pub(crate) async fn run_until_connection_lost( metrics_sync: Option, exit_signal: impl Future, ) -> Result<(), FailedClient> { - let restart_finality_proofs_stream = || async { - source_client.finality_proofs().await.map_err(|error| { - log::error!( - target: "bridge", - "Failed to subscribe to {} justifications: {:?}. Going to reconnect", - P::SOURCE_NAME, - error, - ); - - FailedClient::Source - }) - }; - let last_transaction_tracker = futures::future::Fuse::terminated(); let exit_signal = exit_signal.fuse(); futures::pin_mut!(last_transaction_tracker, exit_signal); - let mut finality_proofs_stream = RestartableFinalityProofsStream { - needs_restart: false, - stream: Box::pin(restart_finality_proofs_stream().await?), - }; + let mut finality_proofs_stream = + RestartableFinalityProofsStream::create_raw_stream(&source_client).await?.into(); let mut recent_finality_proofs = Vec::new(); let mut progress = (Instant::now(), None); @@ -263,10 +348,9 @@ pub(crate) async fn run_until_connection_lost( // deal with errors let next_tick = match iteration_result { - Ok(Some(updated_last_transaction)) => { - last_transaction_tracker.set(updated_last_transaction.tracker.wait().fuse()); - last_submitted_header_number = - Some(updated_last_transaction.submitted_header_number); + Ok(Some(updated_transaction)) => { + last_submitted_header_number = Some(updated_transaction.submitted_header_number); + last_transaction_tracker.set(updated_transaction.track(&target_client).fuse()); retry_backoff.reset(); sync_params.tick }, @@ -280,66 +364,23 @@ pub(crate) async fn run_until_connection_lost( retry_backoff.next_backoff().unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY) }, }; - if finality_proofs_stream.needs_restart { - log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME); - - finality_proofs_stream.needs_restart = false; - finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?); - } + finality_proofs_stream.restart_if_scheduled(&source_client).await?; // wait till exit signal, or new source block select! { - transaction_status = last_transaction_tracker => { - match transaction_status { - TrackedTransactionStatus::Finalized(_) => { - // transaction has been finalized, but it may have been finalized in the "failed" state. So - // let's check if the block number has been actually updated. If it is not, then we are stalled. - // - // please also note that we're restarting the loop if we have failed to read required data - // from the target client - that's the best we can do here to avoid actual stall. - target_client - .best_finalized_source_block_id() - .await - .map_err(|e| format!("failed to read best block from target node: {:?}", e)) - .and_then(|best_id_at_target| { - let last_submitted_header_number = last_submitted_header_number - .expect("always Some when last_transaction_tracker is set;\ - last_transaction_tracker is set;\ - qed"); - if last_submitted_header_number > best_id_at_target.0 { - Err(format!( - "best block at target after tx is {:?} and we've submitted {:?}", - best_id_at_target, - last_submitted_header_number, - )) - } else { - Ok(()) - } - }) - .map_err(|e| { - log::error!( - target: "bridge", - "Failed Finality synchronization from {} to {} has stalled. Transaction failed: {}. \ - Going to restart", - P::SOURCE_NAME, - P::TARGET_NAME, - e, - ); - - FailedClient::Both - })?; - }, - TrackedTransactionStatus::Lost => { - log::error!( - target: "bridge", - "Finality synchronization from {} to {} has stalled. Going to restart", - P::SOURCE_NAME, - P::TARGET_NAME, - ); - - return Err(FailedClient::Both); - }, - } + transaction_result = last_transaction_tracker => { + transaction_result.map_err(|e| { + log::error!( + target: "bridge", + "Finality synchronization from {} to {} has stalled with error: {}. Going to restart", + P::SOURCE_NAME, + P::TARGET_NAME, + e, + ); + + // Restart the loop if we're stalled. + FailedClient::Both + })? }, _ = async_std::task::sleep(next_tick).fuse() => {}, _ = exit_signal => return Ok(()), @@ -414,20 +455,10 @@ where .await? { Some((header, justification)) => { - let submitted_header_number = header.number(); - log::debug!( - target: "bridge", - "Going to submit finality proof of {} header #{:?} to {}", - P::SOURCE_NAME, - submitted_header_number, - P::TARGET_NAME, - ); - - let tracker = target_client - .submit_finality_proof(header, justification) + let transaction = Transaction::submit(target_client, header, justification) .await .map_err(Error::Target)?; - Ok(Some(Transaction { tracker, submitted_header_number })) + Ok(Some(transaction)) }, None => Ok(None), } @@ -598,17 +629,7 @@ pub(crate) fn read_finality_proofs_from_stream< let mut proofs_count = 0; let mut first_header_number = None; let mut last_header_number = None; - loop { - let next_proof = finality_proofs_stream.stream.next(); - let finality_proof = match next_proof.now_or_never() { - Some(Some(finality_proof)) => finality_proof, - Some(None) => { - finality_proofs_stream.needs_restart = true; - break - }, - None => break, - }; - + while let Some(finality_proof) = finality_proofs_stream.next() { let target_header_number = finality_proof.target_header_number(); if first_header_number.is_none() { first_header_number = Some(target_header_number); @@ -700,16 +721,15 @@ pub(crate) fn prune_recent_finality_proofs( recent_finality_proofs: &mut FinalityProofs

, recent_finality_proofs_limit: usize, ) { - let position = recent_finality_proofs - .binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number); - - // remove all obsolete elements - *recent_finality_proofs = recent_finality_proofs - .split_off(position.map(|position| position + 1).unwrap_or_else(|position| position)); - - // now - limit vec by size - let split_index = recent_finality_proofs.len().saturating_sub(recent_finality_proofs_limit); - *recent_finality_proofs = recent_finality_proofs.split_off(split_index); + let justified_header_idx = recent_finality_proofs + .binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number) + .map(|idx| idx + 1) + .unwrap_or_else(|idx| idx); + let proofs_limit_idx = + recent_finality_proofs.len().saturating_sub(recent_finality_proofs_limit); + + *recent_finality_proofs = + recent_finality_proofs.split_off(std::cmp::max(justified_header_idx, proofs_limit_idx)); } fn print_sync_progress( diff --git a/bridges/relays/finality/src/finality_loop_tests.rs b/bridges/relays/finality/src/finality_loop_tests.rs index c8d5cefc2277..a6e97c0770f9 100644 --- a/bridges/relays/finality/src/finality_loop_tests.rs +++ b/bridges/relays/finality/src/finality_loop_tests.rs @@ -552,10 +552,7 @@ fn different_forks_at_source_and_at_target_are_detected() { ); let mut progress = (Instant::now(), None); - let mut finality_proofs_stream = RestartableFinalityProofsStream { - needs_restart: false, - stream: Box::pin(futures::stream::iter(vec![]).boxed()), - }; + let mut finality_proofs_stream = futures::stream::iter(vec![]).boxed().into(); let mut recent_finality_proofs = Vec::new(); let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap(); async_std::task::block_on(run_loop_iteration::(