Skip to content

Commit

Permalink
Finality loop - cosmetic changes (paritytech#1609)
Browse files Browse the repository at this point in the history
* Move some logic to RestartableFinalityProofsStream

* Move some logic to `Transaction`

* Avoid unnecessary split_off
  • Loading branch information
serban300 committed Apr 8, 2024
1 parent 553ec4e commit 0267ba6
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 117 deletions.
246 changes: 133 additions & 113 deletions bridges/relays/finality/src/finality_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use relay_utils::{
HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker,
};
use std::{
fmt::Debug,
pin::Pin,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -182,15 +183,114 @@ pub(crate) struct Transaction<Tracker, Number> {
pub submitted_header_number: Number,
}

impl<Tracker: TransactionTracker, Number: Debug + PartialOrd> Transaction<Tracker, Number> {
pub async fn submit<
C: TargetClient<P, TransactionTracker = Tracker>,
P: FinalitySyncPipeline<Number = Number>,
>(
target_client: &C,
header: P::Header,
justification: P::FinalityProof,
) -> Result<Self, C::Error> {
let submitted_header_number = header.number();
log::debug!(
target: "bridge",
"Going to submit finality proof of {} header #{:?} to {}",
P::SOURCE_NAME,
submitted_header_number,
P::TARGET_NAME,
);

let tracker = target_client.submit_finality_proof(header, justification).await?;
Ok(Transaction { tracker, submitted_header_number })
}

pub async fn track<C: TargetClient<P>, P: FinalitySyncPipeline<Number = Number>>(
self,
target_client: &C,
) -> Result<(), String> {
match self.tracker.wait().await {
TrackedTransactionStatus::Finalized(_) => {
// The transaction has been finalized, but it may have been finalized in the
// "failed" state. So let's check if the block number was actually updated.
// If it wasn't then we are stalled.
//
// Please also note that we're returning an error if we fail to read required data
// from the target client - that's the best we can do here to avoid actual stall.
target_client
.best_finalized_source_block_id()
.await
.map_err(|e| format!("failed to read best block from target node: {:?}", e))
.and_then(|best_id_at_target| {
if self.submitted_header_number > best_id_at_target.0 {
return Err(format!(
"best block at target after tx is {:?} and we've submitted {:?}",
best_id_at_target.0, self.submitted_header_number,
))
}
Ok(())
})
},
TrackedTransactionStatus::Lost => Err("transaction failed".to_string()),
}
}
}

/// Finality proofs stream that may be restarted.
pub(crate) struct RestartableFinalityProofsStream<S> {
/// Flag that the stream needs to be restarted.
pub(crate) needs_restart: bool,
/// The stream itself.
pub(crate) stream: Pin<Box<S>>,
stream: Pin<Box<S>>,
}

impl<S: Stream> RestartableFinalityProofsStream<S> {
pub async fn create_raw_stream<
C: SourceClient<P, FinalityProofsStream = S>,
P: FinalitySyncPipeline,
>(
source_client: &C,
) -> Result<S, FailedClient> {
source_client.finality_proofs().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}. Going to reconnect",
P::SOURCE_NAME,
error,
);

FailedClient::Source
})
}

pub async fn restart_if_scheduled<
C: SourceClient<P, FinalityProofsStream = S>,
P: FinalitySyncPipeline,
>(
&mut self,
source_client: &C,
) -> Result<(), FailedClient> {
if self.needs_restart {
log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME);

self.needs_restart = false;
self.stream = Box::pin(Self::create_raw_stream(source_client).await?);
}
Ok(())
}

pub fn next(&mut self) -> Option<S::Item> {
match self.stream.next().now_or_never() {
Some(Some(finality_proof)) => Some(finality_proof),
Some(None) => {
self.needs_restart = true;
None
},
None => None,
}
}
}

#[cfg(test)]
impl<S> From<S> for RestartableFinalityProofsStream<S> {
fn from(stream: S) -> Self {
RestartableFinalityProofsStream { needs_restart: false, stream: Box::pin(stream) }
Expand Down Expand Up @@ -218,27 +318,12 @@ pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(
metrics_sync: Option<SyncLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
let restart_finality_proofs_stream = || async {
source_client.finality_proofs().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}. Going to reconnect",
P::SOURCE_NAME,
error,
);

FailedClient::Source
})
};

let last_transaction_tracker = futures::future::Fuse::terminated();
let exit_signal = exit_signal.fuse();
futures::pin_mut!(last_transaction_tracker, exit_signal);

let mut finality_proofs_stream = RestartableFinalityProofsStream {
needs_restart: false,
stream: Box::pin(restart_finality_proofs_stream().await?),
};
let mut finality_proofs_stream =
RestartableFinalityProofsStream::create_raw_stream(&source_client).await?.into();
let mut recent_finality_proofs = Vec::new();

let mut progress = (Instant::now(), None);
Expand All @@ -263,10 +348,9 @@ pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(

// deal with errors
let next_tick = match iteration_result {
Ok(Some(updated_last_transaction)) => {
last_transaction_tracker.set(updated_last_transaction.tracker.wait().fuse());
last_submitted_header_number =
Some(updated_last_transaction.submitted_header_number);
Ok(Some(updated_transaction)) => {
last_submitted_header_number = Some(updated_transaction.submitted_header_number);
last_transaction_tracker.set(updated_transaction.track(&target_client).fuse());
retry_backoff.reset();
sync_params.tick
},
Expand All @@ -280,66 +364,23 @@ pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(
retry_backoff.next_backoff().unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY)
},
};
if finality_proofs_stream.needs_restart {
log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME);

finality_proofs_stream.needs_restart = false;
finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?);
}
finality_proofs_stream.restart_if_scheduled(&source_client).await?;

// wait till exit signal, or new source block
select! {
transaction_status = last_transaction_tracker => {
match transaction_status {
TrackedTransactionStatus::Finalized(_) => {
// transaction has been finalized, but it may have been finalized in the "failed" state. So
// let's check if the block number has been actually updated. If it is not, then we are stalled.
//
// please also note that we're restarting the loop if we have failed to read required data
// from the target client - that's the best we can do here to avoid actual stall.
target_client
.best_finalized_source_block_id()
.await
.map_err(|e| format!("failed to read best block from target node: {:?}", e))
.and_then(|best_id_at_target| {
let last_submitted_header_number = last_submitted_header_number
.expect("always Some when last_transaction_tracker is set;\
last_transaction_tracker is set;\
qed");
if last_submitted_header_number > best_id_at_target.0 {
Err(format!(
"best block at target after tx is {:?} and we've submitted {:?}",
best_id_at_target,
last_submitted_header_number,
))
} else {
Ok(())
}
})
.map_err(|e| {
log::error!(
target: "bridge",
"Failed Finality synchronization from {} to {} has stalled. Transaction failed: {}. \
Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
e,
);

FailedClient::Both
})?;
},
TrackedTransactionStatus::Lost => {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
);

return Err(FailedClient::Both);
},
}
transaction_result = last_transaction_tracker => {
transaction_result.map_err(|e| {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled with error: {}. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
e,
);

// Restart the loop if we're stalled.
FailedClient::Both
})?
},
_ = async_std::task::sleep(next_tick).fuse() => {},
_ = exit_signal => return Ok(()),
Expand Down Expand Up @@ -414,20 +455,10 @@ where
.await?
{
Some((header, justification)) => {
let submitted_header_number = header.number();
log::debug!(
target: "bridge",
"Going to submit finality proof of {} header #{:?} to {}",
P::SOURCE_NAME,
submitted_header_number,
P::TARGET_NAME,
);

let tracker = target_client
.submit_finality_proof(header, justification)
let transaction = Transaction::submit(target_client, header, justification)
.await
.map_err(Error::Target)?;
Ok(Some(Transaction { tracker, submitted_header_number }))
Ok(Some(transaction))
},
None => Ok(None),
}
Expand Down Expand Up @@ -598,17 +629,7 @@ pub(crate) fn read_finality_proofs_from_stream<
let mut proofs_count = 0;
let mut first_header_number = None;
let mut last_header_number = None;
loop {
let next_proof = finality_proofs_stream.stream.next();
let finality_proof = match next_proof.now_or_never() {
Some(Some(finality_proof)) => finality_proof,
Some(None) => {
finality_proofs_stream.needs_restart = true;
break
},
None => break,
};

while let Some(finality_proof) = finality_proofs_stream.next() {
let target_header_number = finality_proof.target_header_number();
if first_header_number.is_none() {
first_header_number = Some(target_header_number);
Expand Down Expand Up @@ -700,16 +721,15 @@ pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
recent_finality_proofs: &mut FinalityProofs<P>,
recent_finality_proofs_limit: usize,
) {
let position = recent_finality_proofs
.binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number);

// remove all obsolete elements
*recent_finality_proofs = recent_finality_proofs
.split_off(position.map(|position| position + 1).unwrap_or_else(|position| position));

// now - limit vec by size
let split_index = recent_finality_proofs.len().saturating_sub(recent_finality_proofs_limit);
*recent_finality_proofs = recent_finality_proofs.split_off(split_index);
let justified_header_idx = recent_finality_proofs
.binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number)
.map(|idx| idx + 1)
.unwrap_or_else(|idx| idx);
let proofs_limit_idx =
recent_finality_proofs.len().saturating_sub(recent_finality_proofs_limit);

*recent_finality_proofs =
recent_finality_proofs.split_off(std::cmp::max(justified_header_idx, proofs_limit_idx));
}

fn print_sync_progress<P: FinalitySyncPipeline>(
Expand Down
5 changes: 1 addition & 4 deletions bridges/relays/finality/src/finality_loop_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,7 @@ fn different_forks_at_source_and_at_target_are_detected() {
);

let mut progress = (Instant::now(), None);
let mut finality_proofs_stream = RestartableFinalityProofsStream {
needs_restart: false,
stream: Box::pin(futures::stream::iter(vec![]).boxed()),
};
let mut finality_proofs_stream = futures::stream::iter(vec![]).boxed().into();
let mut recent_finality_proofs = Vec::new();
let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap();
async_std::task::block_on(run_loop_iteration::<TestFinalitySyncPipeline, _, _>(
Expand Down

0 comments on commit 0267ba6

Please sign in to comment.