Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Read extrinsic dispatch result for mined transaction (#1582)
Browse files Browse the repository at this point in the history
* read extrinsic dispatch result for mined transaction

* commit for the history

* Revert "commit for the history"

This reverts commit 99341b04750639db296172cc1432bd70e458ef4b.

* Revert "read extrinsic dispatch result for mined transaction"

This reverts commit 662b776cbf992be9f1637e52f023b782e8c441d1.

* check for successfult transaction in finality relay

* check for successful transaction in parachains relay

* TrackedTransactionStatus ->TrackedTransactionStatus<HeaderId>

* check for successful transaction in messages relay

* fix compilation

* message_lane_loop_is_able_to_recover_from_unsuccessful_transaction

* fixed too-complex-type clippy error

* aaand compilation
  • Loading branch information
svyatonik authored Oct 3, 2022
1 parent 8b56d7c commit 63b51d9
Show file tree
Hide file tree
Showing 12 changed files with 462 additions and 114 deletions.
4 changes: 3 additions & 1 deletion relays/client-substrate/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ impl<C: Chain> Client<C> {
prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, C::Index) -> Result<UnsignedTransaction<C>>
+ Send
+ 'static,
) -> Result<TransactionTracker<C>> {
) -> Result<TransactionTracker<C, Self>> {
let self_clone = self.clone();
let _guard = self.submit_signed_extrinsic_lock.lock().await;
let transaction_nonce = self.next_account_index(extrinsic_signer).await?;
let best_header = self.best_header().await?;
Expand All @@ -494,6 +495,7 @@ impl<C: Chain> Client<C> {
})?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
let tracker = TransactionTracker::new(
self_clone,
stall_timeout,
tx_hash,
Subscription(Mutex::new(receiver)),
Expand Down
124 changes: 101 additions & 23 deletions relays/client-substrate/src/transaction_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,28 @@

//! Helper for tracking transaction invalidation events.

use crate::{Chain, HashOf, Subscription, TransactionStatusOf};
use crate::{Chain, Client, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf};

use async_trait::async_trait;
use futures::{future::Either, Future, FutureExt, Stream, StreamExt};
use relay_utils::TrackedTransactionStatus;
use relay_utils::{HeaderId, TrackedTransactionStatus};
use sp_runtime::traits::Header as _;
use std::time::Duration;

/// Transaction tracker environment.
#[async_trait]
pub trait Environment<C: Chain>: Send + Sync {
/// Returns header id by its hash.
async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error>;
}

#[async_trait]
impl<C: Chain> Environment<C> for Client<C> {
async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error> {
self.header_by_hash(hash).await.map(|h| HeaderId(*h.number(), hash))
}
}

/// Substrate transaction tracker implementation.
///
/// Substrate node provides RPC API to submit and watch for transaction events. This way
Expand All @@ -43,20 +58,22 @@ use std::time::Duration;
/// it is lost.
///
/// This struct implements third option as it seems to be the most optimal.
pub struct TransactionTracker<C: Chain> {
pub struct TransactionTracker<C: Chain, E> {
environment: E,
transaction_hash: HashOf<C>,
stall_timeout: Duration,
subscription: Subscription<TransactionStatusOf<C>>,
}

impl<C: Chain> TransactionTracker<C> {
impl<C: Chain, E: Environment<C>> TransactionTracker<C, E> {
/// Create transaction tracker.
pub fn new(
environment: E,
stall_timeout: Duration,
transaction_hash: HashOf<C>,
subscription: Subscription<TransactionStatusOf<C>>,
) -> Self {
Self { stall_timeout, transaction_hash, subscription }
Self { environment, stall_timeout, transaction_hash, subscription }
}

/// Wait for final transaction status and return it along with last known internal invalidation
Expand All @@ -65,10 +82,11 @@ impl<C: Chain> TransactionTracker<C> {
self,
wait_for_stall_timeout: impl Future<Output = ()>,
wait_for_stall_timeout_rest: impl Future<Output = ()>,
) -> (TrackedTransactionStatus, Option<InvalidationStatus>) {
) -> (TrackedTransactionStatus<HeaderIdOf<C>>, Option<InvalidationStatus<HeaderIdOf<C>>>) {
// sometimes we want to wait for the rest of the stall timeout even if
// `wait_for_invalidation` has been "select"ed first => it is shared
let wait_for_invalidation = watch_transaction_status::<C, _>(
let wait_for_invalidation = watch_transaction_status::<_, C, _>(
self.environment,
self.transaction_hash,
self.subscription.into_stream(),
);
Expand All @@ -86,8 +104,8 @@ impl<C: Chain> TransactionTracker<C> {
(TrackedTransactionStatus::Lost, None)
},
Either::Right((invalidation_status, _)) => match invalidation_status {
InvalidationStatus::Finalized =>
(TrackedTransactionStatus::Finalized, Some(invalidation_status)),
InvalidationStatus::Finalized(at_block) =>
(TrackedTransactionStatus::Finalized(at_block), Some(invalidation_status)),
InvalidationStatus::Invalid =>
(TrackedTransactionStatus::Lost, Some(invalidation_status)),
InvalidationStatus::Lost => {
Expand All @@ -111,8 +129,10 @@ impl<C: Chain> TransactionTracker<C> {
}

#[async_trait]
impl<C: Chain> relay_utils::TransactionTracker for TransactionTracker<C> {
async fn wait(self) -> TrackedTransactionStatus {
impl<C: Chain, E: Environment<C>> relay_utils::TransactionTracker for TransactionTracker<C, E> {
type HeaderId = HeaderIdOf<C>;

async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<C>> {
let wait_for_stall_timeout = async_std::task::sleep(self.stall_timeout).shared();
let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone();
self.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await.0
Expand All @@ -125,20 +145,25 @@ impl<C: Chain> relay_utils::TransactionTracker for TransactionTracker<C> {
/// ignored - relay loops are detecting the mining/finalization using their own
/// techniques. That's why we're using `InvalidationStatus` here.
#[derive(Debug, PartialEq)]
enum InvalidationStatus {
/// Transaction has been included into block and finalized.
Finalized,
enum InvalidationStatus<BlockId> {
/// Transaction has been included into block and finalized at given block.
Finalized(BlockId),
/// Transaction has been invalidated.
Invalid,
/// We have lost track of transaction status.
Lost,
}

/// Watch for transaction status until transaction is finalized or we lose track of its status.
async fn watch_transaction_status<C: Chain, S: Stream<Item = TransactionStatusOf<C>>>(
async fn watch_transaction_status<
E: Environment<C>,
C: Chain,
S: Stream<Item = TransactionStatusOf<C>>,
>(
environment: E,
transaction_hash: HashOf<C>,
subscription: S,
) -> InvalidationStatus {
) -> InvalidationStatus<HeaderIdOf<C>> {
futures::pin_mut!(subscription);

loop {
Expand All @@ -153,7 +178,23 @@ async fn watch_transaction_status<C: Chain, S: Stream<Item = TransactionStatusOf
transaction_hash,
block_hash,
);
return InvalidationStatus::Finalized

let header_id = match environment.header_id_by_hash(block_hash).await {
Ok(header_id) => header_id,
Err(e) => {
log::error!(
target: "bridge",
"Failed to read header {:?} when watching for {} transaction {:?}: {:?}",
block_hash,
C::NAME,
transaction_hash,
e,
);
// that's the best option we have here
return InvalidationStatus::Lost
},
};
return InvalidationStatus::Finalized(header_id)
},
Some(TransactionStatusOf::<C>::Invalid) => {
// if node says that the transaction is invalid, there are still chances that
Expand Down Expand Up @@ -247,11 +288,27 @@ mod tests {
use futures::{FutureExt, SinkExt};
use sc_transaction_pool_api::TransactionStatus;

struct TestEnvironment(Result<HeaderIdOf<TestChain>, Error>);

#[async_trait]
impl Environment<TestChain> for TestEnvironment {
async fn header_id_by_hash(
&self,
_hash: HashOf<TestChain>,
) -> Result<HeaderIdOf<TestChain>, Error> {
self.0.as_ref().map_err(|_| Error::UninitializedBridgePallet).cloned()
}
}

async fn on_transaction_status(
status: TransactionStatus<HashOf<TestChain>, HashOf<TestChain>>,
) -> Option<(TrackedTransactionStatus, InvalidationStatus)> {
) -> Option<(
TrackedTransactionStatus<HeaderIdOf<TestChain>>,
InvalidationStatus<HeaderIdOf<TestChain>>,
)> {
let (mut sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain>::new(
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription(async_std::sync::Mutex::new(receiver)),
Expand All @@ -270,7 +327,23 @@ mod tests {
async fn returns_finalized_on_finalized() {
assert_eq!(
on_transaction_status(TransactionStatus::Finalized(Default::default())).await,
Some((TrackedTransactionStatus::Finalized, InvalidationStatus::Finalized)),
Some((
TrackedTransactionStatus::Finalized(Default::default()),
InvalidationStatus::Finalized(Default::default())
)),
);
}

#[async_std::test]
async fn returns_lost_on_finalized_and_environment_error() {
assert_eq!(
watch_transaction_status::<_, TestChain, _>(
TestEnvironment(Err(Error::UninitializedBridgePallet)),
Default::default(),
futures::stream::iter([TransactionStatus::Finalized(Default::default())])
)
.now_or_never(),
Some(InvalidationStatus::Lost),
);
}

Expand Down Expand Up @@ -343,16 +416,21 @@ mod tests {
#[async_std::test]
async fn lost_on_subscription_error() {
assert_eq!(
watch_transaction_status::<TestChain, _>(Default::default(), futures::stream::iter([]))
.now_or_never(),
watch_transaction_status::<_, TestChain, _>(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Default::default(),
futures::stream::iter([])
)
.now_or_never(),
Some(InvalidationStatus::Lost),
);
}

#[async_std::test]
async fn lost_on_timeout_when_waiting_for_invalidation_status() {
let (_sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain>::new(
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription(async_std::sync::Mutex::new(receiver)),
Expand Down
58 changes: 49 additions & 9 deletions relays/finality/src/finality_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,55 @@ pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(
// wait till exit signal, or new source block
select! {
transaction_status = last_transaction_tracker => {
if transaction_status == TrackedTransactionStatus::Lost {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
);

return Err(FailedClient::Both);
match transaction_status {
TrackedTransactionStatus::Finalized(_) => {
// transaction has been finalized, but it may have been finalized in the "failed" state. So
// let's check if the block number has been actually updated. If it is not, then we are stalled.
//
// please also note that we're restarting the loop if we have failed to read required data
// from the target client - that's the best we can do here to avoid actual stall.
target_client
.best_finalized_source_block_id()
.await
.map_err(|e| format!("failed to read best block from target node: {:?}", e))
.and_then(|best_id_at_target| {
let last_submitted_header_number = last_submitted_header_number
.expect("always Some when last_transaction_tracker is set;\
last_transaction_tracker is set;\
qed");
if last_submitted_header_number > best_id_at_target.0 {
Err(format!(
"best block at target after tx is {:?} and we've submitted {:?}",
best_id_at_target,
last_submitted_header_number,
))
} else {
Ok(())
}
})
.map_err(|e| {
log::error!(
target: "bridge",
"Failed Finality synchronization from {} to {} has stalled. Transaction failed: {}. \
Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
e,
);

FailedClient::Both
})?;
},
TrackedTransactionStatus::Lost => {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
);

return Err(FailedClient::Both);
},
}
},
_ = async_std::task::sleep(next_tick).fuse() => {},
Expand Down
22 changes: 18 additions & 4 deletions relays/finality/src/finality_loop_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,19 @@ type TestNumber = u64;
type TestHash = u64;

#[derive(Clone, Debug)]
struct TestTransactionTracker(TrackedTransactionStatus);
struct TestTransactionTracker(TrackedTransactionStatus<HeaderId<TestHash, TestNumber>>);

impl Default for TestTransactionTracker {
fn default() -> TestTransactionTracker {
TestTransactionTracker(TrackedTransactionStatus::Finalized)
TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
}
}

#[async_trait]
impl TransactionTracker for TestTransactionTracker {
async fn wait(self) -> TrackedTransactionStatus {
type HeaderId = HeaderId<TestHash, TestNumber>;

async fn wait(self) -> TrackedTransactionStatus<HeaderId<TestHash, TestNumber>> {
self.0
}
}
Expand Down Expand Up @@ -224,7 +226,9 @@ fn prepare_test_clients(

target_best_block_id: HeaderId(5, 5),
target_headers: vec![],
target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized),
target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized(
Default::default(),
)),
}));
(
TestSourceClient {
Expand Down Expand Up @@ -581,3 +585,13 @@ fn stalls_when_transaction_tracker_returns_error() {

assert_eq!(result, Err(FailedClient::Both));
}

#[test]
fn stalls_when_transaction_tracker_returns_finalized_but_transaction_fails() {
let (_, result) = run_sync_loop(|data| {
data.target_best_block_id = HeaderId(5, 5);
data.target_best_block_id.0 == 16
});

assert_eq!(result, Err(FailedClient::Both));
}
2 changes: 1 addition & 1 deletion relays/lib-substrate-relay/src/finality/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ where
AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TransactionSignScheme> as Pair>::Public>,
P::TransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>,
{
type TransactionTracker = TransactionTracker<P::TargetChain>;
type TransactionTracker = TransactionTracker<P::TargetChain, Client<P::TargetChain>>;

async fn best_finalized_source_block_id(&self) -> Result<HeaderIdOf<P::SourceChain>, Error> {
// we can't continue to relay finality if target node is out of sync, because
Expand Down
2 changes: 1 addition & 1 deletion relays/lib-substrate-relay/src/messages_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ where
From<<AccountKeyPairOf<P::SourceTransactionSignScheme> as Pair>::Public>,
P::SourceTransactionSignScheme: TransactionSignScheme<Chain = P::SourceChain>,
{
type TransactionTracker = TransactionTracker<P::SourceChain>;
type TransactionTracker = TransactionTracker<P::SourceChain, Client<P::SourceChain>>;

async fn state(&self) -> Result<SourceClientState<MessageLaneAdapter<P>>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because
Expand Down
2 changes: 1 addition & 1 deletion relays/lib-substrate-relay/src/messages_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where
P::TargetTransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>,
BalanceOf<P::SourceChain>: TryFrom<BalanceOf<P::TargetChain>>,
{
type TransactionTracker = TransactionTracker<P::TargetChain>;
type TransactionTracker = TransactionTracker<P::TargetChain, Client<P::TargetChain>>;

async fn state(&self) -> Result<TargetClientState<MessageLaneAdapter<P>>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because
Expand Down
2 changes: 1 addition & 1 deletion relays/lib-substrate-relay/src/parachains/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ where
P::TransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>,
AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TransactionSignScheme> as Pair>::Public>,
{
type TransactionTracker = TransactionTracker<P::TargetChain>;
type TransactionTracker = TransactionTracker<P::TargetChain, Client<P::TargetChain>>;

async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error> {
let best_header = self.client.best_header().await?;
Expand Down
Loading

0 comments on commit 63b51d9

Please sign in to comment.