diff --git a/Cargo.lock b/Cargo.lock index a6f5b664972..e8c61c91b47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1359,9 +1359,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.1" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469" +checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320" dependencies = [ "libc", ] @@ -1517,12 +1517,12 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.5" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc" dependencies = [ "cfg-if 1.0.0", - "lazy_static", + "once_cell", ] [[package]] @@ -4184,7 +4184,7 @@ dependencies = [ "rand 0.8.5", "ring", "rw-stream-sink", - "sha2 0.10.2", + "sha2 0.10.5", "smallvec", "thiserror", "unsigned-varint", @@ -4258,7 +4258,7 @@ dependencies = [ "prost-build", "rand 0.7.3", "regex", - "sha2 0.10.2", + "sha2 0.10.5", "smallvec", "unsigned-varint", "wasm-timer", @@ -4305,7 +4305,7 @@ dependencies = [ "prost", "prost-build", "rand 0.7.3", - "sha2 0.10.2", + "sha2 0.10.5", "smallvec", "thiserror", "uint", @@ -4383,7 +4383,7 @@ dependencies = [ "prost", "prost-build", "rand 0.8.5", - "sha2 0.10.2", + "sha2 0.10.5", "snow", "static_assertions", "x25519-dalek", @@ -4480,7 +4480,7 @@ dependencies = [ "prost", "prost-build", "rand 0.8.5", - "sha2 0.10.2", + "sha2 0.10.5", "thiserror", "unsigned-varint", "void", @@ -4769,9 +4769,9 @@ dependencies = [ [[package]] name = "lz4" -version = "1.23.2" +version = "1.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aac20ed6991e01bf6a2e68cc73df2b389707403662a8ba89f68511fb340f724c" +checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1" dependencies = [ 
"libc", "lz4-sys", @@ -4779,9 +4779,9 @@ dependencies = [ [[package]] name = "lz4-sys" -version = "1.9.2" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dca79aa95d8b3226213ad454d328369853be3a1382d89532a854f4d69640acae" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" dependencies = [ "cc", "libc", @@ -4857,15 +4857,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "memmap2" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "723e3ebdcdc5c023db1df315364573789f8857c11b631a2fdfad7c00f5c046b4" -dependencies = [ - "libc", -] - [[package]] name = "memmap2" version = "0.5.0" @@ -5152,7 +5143,7 @@ dependencies = [ "core2", "digest 0.10.3", "multihash-derive", - "sha2 0.10.2", + "sha2 0.10.5", "sha3 0.10.1", "unsigned-varint", ] @@ -6462,9 +6453,9 @@ dependencies = [ [[package]] name = "parity-db" -version = "0.3.16" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bb474d0ed0836e185cb998a6b140ed1073d1fbf27d690ecf9ede8030289382c" +checksum = "2c8fdb726a43661fa54b43e7114e6b88b2289cae388eb3ad766d9d1754d83fce" dependencies = [ "blake2-rfc", "crc32fast", @@ -6473,8 +6464,8 @@ dependencies = [ "libc", "log", "lz4", - "memmap2 0.2.3", - "parking_lot 0.11.2", + "memmap2", + "parking_lot 0.12.1", "rand 0.8.5", "snap", ] @@ -9271,7 +9262,7 @@ version = "4.0.0-dev" source = "git+https://github.com/paritytech//substrate?branch=sv-locked-for-gav-xcm-v3-and-bridges#57e3486d9c7bb4deaef33cf9ba2da083b4e40314" dependencies = [ "impl-trait-for-tuples", - "memmap2 0.5.0", + "memmap2", "parity-scale-codec", "sc-chain-spec-derive", "sc-network-common", @@ -10468,9 +10459,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.2" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676" +checksum = 
"cf9db03534dff993187064c4e0c05a5708d2a9728ace9a8959b77bedf415dac5" dependencies = [ "cfg-if 1.0.0", "cpufeatures", @@ -10624,7 +10615,7 @@ dependencies = [ "rand_core 0.6.3", "ring", "rustc_version 0.4.0", - "sha2 0.10.2", + "sha2 0.10.5", "subtle", ] @@ -10908,7 +10899,7 @@ dependencies = [ "blake2", "byteorder", "digest 0.10.3", - "sha2 0.10.2", + "sha2 0.10.5", "sha3 0.10.1", "sp-std", "twox-hash", @@ -12343,7 +12334,7 @@ checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if 0.1.10", "digest 0.10.3", - "rand 0.8.5", + "rand 0.7.3", "static_assertions", ] diff --git a/bin/millau/runtime/src/lib.rs b/bin/millau/runtime/src/lib.rs index 66f454a0ecc..2fea97cea8d 100644 --- a/bin/millau/runtime/src/lib.rs +++ b/bin/millau/runtime/src/lib.rs @@ -540,6 +540,7 @@ parameter_types! { pub type WithRialtoParachainsInstance = (); impl pallet_bridge_parachains::Config for Runtime { + type Event = Event; type WeightInfo = pallet_bridge_parachains::weights::MillauWeight; type BridgesGrandpaPalletInstance = RialtoGrandpaInstance; type ParasPalletName = RialtoParasPalletName; @@ -551,6 +552,7 @@ impl pallet_bridge_parachains::Config for Runtime pub type WithWestendParachainsInstance = pallet_bridge_parachains::Instance1; impl pallet_bridge_parachains::Config for Runtime { + type Event = Event; type WeightInfo = pallet_bridge_parachains::weights::MillauWeight; type BridgesGrandpaPalletInstance = WestendGrandpaInstance; type ParasPalletName = WestendParasPalletName; @@ -592,10 +594,10 @@ construct_runtime!( // Westend bridge modules. BridgeWestendGrandpa: pallet_bridge_grandpa::::{Pallet, Call, Config, Storage}, - BridgeWestendParachains: pallet_bridge_parachains::::{Pallet, Call, Storage}, + BridgeWestendParachains: pallet_bridge_parachains::::{Pallet, Call, Storage, Event}, // RialtoParachain bridge modules. 
- BridgeRialtoParachains: pallet_bridge_parachains::{Pallet, Call, Storage}, + BridgeRialtoParachains: pallet_bridge_parachains::{Pallet, Call, Storage, Event}, BridgeRialtoParachainMessages: pallet_bridge_messages::::{Pallet, Call, Storage, Event, Config}, // Pallet for sending XCM. diff --git a/deny.toml b/deny.toml index e5281e0e849..3fa007bbe0a 100644 --- a/deny.toml +++ b/deny.toml @@ -48,17 +48,21 @@ notice = "warn" # A list of advisory IDs to ignore. Note that ignored advisories will still # output a note when they are encountered. ignore = [ - "RUSTSEC-2020-0070", # Comes from honggfuzz via storage-proof-fuzzer: 'memmap' "RUSTSEC-2020-0077", - # net2 (origin: Substrate RPC crates) - "RUSTSEC-2020-0016", # time (origin: Substrate RPC + benchmarking crates) "RUSTSEC-2020-0071", # chrono (origin: Substrate benchmarking + cli + ...) "RUSTSEC-2020-0159", # lru 0.6.6 (origin: libp2p) "RUSTSEC-2021-0130", + # ansi_term (The maintainer has advised that this crate is deprecated and will not receive any maintenance. + # Once other crates move to some alternative, we'll do that too) + "RUSTSEC-2021-0139", + # rocksdb (origin: Substrate and Polkadot kvdb-rocksdb - we need to upgrade soon) + "RUSTSEC-2022-0046", + # owning_ref (origin: Substrate, libp2p) + "RUSTSEC-2022-0040", ] # Threshold for security vulnerabilities, any vulnerability with a CVSS score # lower than the range specified will be ignored. 
Note that ignored advisories diff --git a/modules/parachains/src/extension.rs b/modules/parachains/src/extension.rs index fb93e671e22..05500cf41dd 100644 --- a/modules/parachains/src/extension.rs +++ b/modules/parachains/src/extension.rs @@ -17,7 +17,7 @@ use crate::{Config, Pallet, RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; use bp_runtime::FilterCall; use frame_support::{dispatch::CallableCallFor, traits::IsSubType}; -use sp_runtime::transaction_validity::{TransactionValidity, ValidTransaction}; +use sp_runtime::transaction_validity::{InvalidTransaction, TransactionValidity, ValidTransaction}; /// Validate parachain heads in order to avoid "mining" transactions that provide /// outdated bridged parachain heads. Without this validation, even honest relayers @@ -57,13 +57,19 @@ where }; let maybe_stored_best_head = crate::ParasInfo::::get(parachain); - Self::validate_updated_parachain_head( + let is_valid = Self::validate_updated_parachain_head( parachain, &maybe_stored_best_head, updated_at_relay_block_number, parachain_head_hash, "Rejecting obsolete parachain-head transaction", - ) + ); + + if is_valid { + Ok(ValidTransaction::default()) + } else { + InvalidTransaction::Stale.into() + } } } diff --git a/modules/parachains/src/lib.rs b/modules/parachains/src/lib.rs index bf07cdc9e67..290bdadf04a 100644 --- a/modules/parachains/src/lib.rs +++ b/modules/parachains/src/lib.rs @@ -77,6 +77,27 @@ pub mod pallet { /// Weight info of the given parachains pallet. pub type WeightInfoOf = >::WeightInfo; + #[pallet::event] + #[pallet::generate_deposit(pub(super) fn deposit_event)] + pub enum Event, I: 'static = ()> { + /// The caller has provided head of parachain that the pallet is not configured to track. + UntrackedParachainRejected { parachain: ParaId }, + /// The caller has declared that he has provided given parachain head, but it is missing + /// from the storage proof. 
+ MissingParachainHead { parachain: ParaId }, + /// The caller has provided parachain head hash that is not matching the hash read from the + /// storage proof. + IncorrectParachainHeadHash { + parachain: ParaId, + parachain_head_hash: ParaHash, + actual_parachain_head_hash: ParaHash, + }, + /// The caller has provided obsolete parachain head, which is already known to the pallet. + RejectedObsoleteParachainHead { parachain: ParaId, parachain_head_hash: ParaHash }, + /// Parachain head has been updated. + UpdatedParachainHead { parachain: ParaId, parachain_head_hash: ParaHash }, + } + #[pallet::error] pub enum Error { /// Relay chain block hash is unknown to us. @@ -100,6 +121,8 @@ pub mod pallet { pub trait Config: pallet_bridge_grandpa::Config { + /// The overarching event type. + type Event: From> + IsType<::Event>; /// Benchmarks results from runtime we're plugged into. type WeightInfo: WeightInfoExt; @@ -109,7 +132,11 @@ pub mod pallet { /// we're interested in. type BridgesGrandpaPalletInstance: 'static; - /// Name of the `paras` pallet in the `construct_runtime!()` call at the bridged chain. + /// Name of the original `paras` pallet in the `construct_runtime!()` call at the bridged + /// chain. + /// + /// Please keep in mind that this should be the name of the `runtime_parachains::paras` + /// pallet from polkadot repository, not the `pallet-bridge-parachains`. 
#[pallet::constant] type ParasPalletName: Get<&'static str>; @@ -248,6 +275,7 @@ pub mod pallet { "The head of parachain {:?} has been provided, but it is not tracked by the pallet", parachain, ); + Self::deposit_event(Event::UntrackedParachainRejected { parachain }); continue; } @@ -264,6 +292,7 @@ pub mod pallet { "Looks like it has been deregistered from the source relay chain" }, ); + Self::deposit_event(Event::MissingParachainHead { parachain }); continue; }, Err(e) => { @@ -273,6 +302,7 @@ pub mod pallet { parachain, e, ); + Self::deposit_event(Event::MissingParachainHead { parachain }); continue; }, }; @@ -288,6 +318,11 @@ pub mod pallet { parachain_head_hash, actual_parachain_head_hash, ); + Self::deposit_event(Event::IncorrectParachainHeadHash { + parachain, + parachain_head_hash, + actual_parachain_head_hash, + }); continue; } @@ -387,16 +422,20 @@ pub mod pallet { /// Check if para head has been already updated at better relay chain block. /// Without this check, we may import heads in random order. + /// + /// Returns `true` if the pallet is ready to import given parachain head. + /// Returns `false` if the pallet already knows the same or better parachain head. 
+ #[must_use] pub fn validate_updated_parachain_head( parachain: ParaId, maybe_stored_best_head: &Option, updated_at_relay_block_number: RelayBlockNumber, updated_head_hash: ParaHash, err_log_prefix: &str, - ) -> TransactionValidity { + ) -> bool { let stored_best_head = match maybe_stored_best_head { Some(stored_best_head) => stored_best_head, - None => return Ok(ValidTransaction::default()), + None => return true, }; if stored_best_head.best_head_hash.at_relay_block_number >= @@ -410,7 +449,7 @@ pub mod pallet { stored_best_head.best_head_hash.at_relay_block_number, updated_at_relay_block_number ); - return InvalidTransaction::Stale.into() + return false } if stored_best_head.best_head_hash.head_hash == updated_head_hash { @@ -423,10 +462,10 @@ pub mod pallet { stored_best_head.best_head_hash.at_relay_block_number, updated_at_relay_block_number ); - return InvalidTransaction::Stale.into() + return false } - Ok(ValidTransaction::default()) + true } /// Try to update parachain head. @@ -439,14 +478,20 @@ pub mod pallet { ) -> Result { // check if head has been already updated at better relay chain block. 
Without this // check, we may import heads in random order - Self::validate_updated_parachain_head( + let is_valid = Self::validate_updated_parachain_head( parachain, &stored_best_head, updated_at_relay_block_number, updated_head_hash, "The parachain head can't be updated", - ) - .map_err(|_| ())?; + ); + if !is_valid { + Self::deposit_event(Event::RejectedObsoleteParachainHead { + parachain, + parachain_head_hash: updated_head_hash, + }); + return Err(()) + } let next_imported_hash_position = stored_best_head .map_or(0, |stored_best_head| stored_best_head.next_imported_hash_position); @@ -485,6 +530,10 @@ pub mod pallet { ); ImportedParaHeads::::remove(parachain, head_hash_to_prune); } + Self::deposit_event(Event::UpdatedParachainHead { + parachain, + parachain_head_hash: updated_head_hash, + }); Ok(UpdateParachainHeadArtifacts { best_head: updated_best_para_head, prune_happened }) } @@ -526,7 +575,8 @@ pub mod pallet { mod tests { use super::*; use crate::mock::{ - run_test, test_relay_header, Origin, TestRuntime, PARAS_PALLET_NAME, UNTRACKED_PARACHAIN_ID, + run_test, test_relay_header, Event as TestEvent, Origin, TestRuntime, PARAS_PALLET_NAME, + UNTRACKED_PARACHAIN_ID, }; use codec::Encode; @@ -545,6 +595,7 @@ mod tests { traits::{Get, OnInitialize}, weights::Weight, }; + use frame_system::{EventRecord, Pallet as System, Phase}; use sp_runtime::DispatchError; use sp_trie::{trie_types::TrieDBMutBuilderV1, LayoutV1, MemoryDB, Recorder, TrieMut}; @@ -733,6 +784,28 @@ mod tests { ImportedParaHeads::::get(ParaId(3), head_hash(3, 10)), Some(head_data(3, 10)) ); + + assert_eq!( + System::::events(), + vec![ + EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(1), + parachain_head_hash: initial_best_head(1).best_head_hash.head_hash, + }), + topics: vec![], + }, + EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(3), + 
parachain_head_hash: head_data(3, 10).hash(), + }), + topics: vec![], + } + ], + ); }); } @@ -764,6 +837,17 @@ mod tests { ImportedParaHeads::::get(ParaId(1), head_data(1, 10).hash()), None ); + assert_eq!( + System::::events(), + vec![EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(1), + parachain_head_hash: head_data(1, 5).hash(), + }), + topics: vec![], + }], + ); // import head#10 of parachain#1 at relay block #1 proceed(1, state_root_10); @@ -786,6 +870,27 @@ mod tests { ImportedParaHeads::::get(ParaId(1), head_data(1, 10).hash()), Some(head_data(1, 10)) ); + assert_eq!( + System::::events(), + vec![ + EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(1), + parachain_head_hash: head_data(1, 5).hash(), + }), + topics: vec![], + }, + EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(1), + parachain_head_hash: head_data(1, 10).hash(), + }), + topics: vec![], + } + ], + ); }); } @@ -834,6 +939,34 @@ mod tests { next_imported_hash_position: 1, }) ); + assert_eq!( + System::::events(), + vec![ + EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(1), + parachain_head_hash: head_data(1, 5).hash(), + }), + topics: vec![], + }, + EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UntrackedParachainRejected { + parachain: ParaId(UNTRACKED_PARACHAIN_ID), + }), + topics: vec![], + }, + EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(2), + parachain_head_hash: head_data(1, 5).hash(), + }), + topics: vec![], + } + ], + ); }); } @@ -846,12 +979,44 @@ mod tests { initialize(state_root); assert_ok!(import_parachain_1_head(0, state_root, parachains.clone(), 
proof.clone())); assert_eq!(ParasInfo::::get(ParaId(1)), Some(initial_best_head(1))); + assert_eq!( + System::::events(), + vec![EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(1), + parachain_head_hash: initial_best_head(1).best_head_hash.head_hash, + }), + topics: vec![], + }], + ); // try to import head#0 of parachain#1 at relay block#1 // => call succeeds, but nothing is changed proceed(1, state_root); assert_ok!(import_parachain_1_head(1, state_root, parachains, proof)); assert_eq!(ParasInfo::::get(ParaId(1)), Some(initial_best_head(1))); + assert_eq!( + System::::events(), + vec![ + EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(1), + parachain_head_hash: initial_best_head(1).best_head_hash.head_hash, + }), + topics: vec![], + }, + EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::RejectedObsoleteParachainHead { + parachain: ParaId(1), + parachain_head_hash: initial_best_head(1).best_head_hash.head_hash, + }), + topics: vec![], + } + ], + ); }); } @@ -878,6 +1043,17 @@ mod tests { next_imported_hash_position: 1, }) ); + assert_eq!( + System::::events(), + vec![EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(1), + parachain_head_hash: head_data(1, 10).hash(), + }), + topics: vec![], + }], + ); // now try to import head#5 at relay block#0 // => nothing is changed, because better head has already been imported @@ -892,6 +1068,27 @@ mod tests { next_imported_hash_position: 1, }) ); + assert_eq!( + System::::events(), + vec![ + EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::UpdatedParachainHead { + parachain: ParaId(1), + parachain_head_hash: head_data(1, 10).hash(), + }), + topics: vec![], + }, + EventRecord { + phase: Phase::Initialization, + event: 
TestEvent::Parachains(Event::RejectedObsoleteParachainHead { + parachain: ParaId(1), + parachain_head_hash: head_data(1, 5).hash(), + }), + topics: vec![], + } + ], + ); }); } @@ -1047,5 +1244,57 @@ mod tests { ); } + #[test] + fn ignores_parachain_head_if_it_is_missing_from_storage_proof() { + let (state_root, proof, _) = prepare_parachain_heads_proof(vec![(1, head_data(1, 0))]); + let parachains = vec![(ParaId(2), Default::default())]; + run_test(|| { + initialize(state_root); + assert_ok!(Pallet::::submit_parachain_heads( + Origin::signed(1), + (0, test_relay_header(0, state_root).hash()), + parachains, + proof, + )); + assert_eq!( + System::::events(), + vec![EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::MissingParachainHead { + parachain: ParaId(2), + }), + topics: vec![], + }], + ); + }); + } + + #[test] + fn ignores_parachain_head_if_parachain_head_hash_is_wrong() { + let (state_root, proof, _) = prepare_parachain_heads_proof(vec![(1, head_data(1, 0))]); + let parachains = vec![(ParaId(1), head_data(1, 10).hash())]; + run_test(|| { + initialize(state_root); + assert_ok!(Pallet::::submit_parachain_heads( + Origin::signed(1), + (0, test_relay_header(0, state_root).hash()), + parachains, + proof, + )); + assert_eq!( + System::::events(), + vec![EventRecord { + phase: Phase::Initialization, + event: TestEvent::Parachains(Event::IncorrectParachainHeadHash { + parachain: ParaId(1), + parachain_head_hash: head_data(1, 10).hash(), + actual_parachain_head_hash: head_data(1, 0).hash(), + }), + topics: vec![], + }], + ); + }); + } + generate_owned_bridge_module_tests!(BasicOperatingMode::Normal, BasicOperatingMode::Halted); } diff --git a/modules/parachains/src/mock.rs b/modules/parachains/src/mock.rs index eaf5d3b4fdd..292856f35a7 100644 --- a/modules/parachains/src/mock.rs +++ b/modules/parachains/src/mock.rs @@ -46,7 +46,7 @@ construct_runtime! 
{ System: frame_system::{Pallet, Call, Config, Storage, Event}, Grandpa1: pallet_bridge_grandpa::::{Pallet}, Grandpa2: pallet_bridge_grandpa::::{Pallet}, - Parachains: pallet_bridge_parachains::{Call, Pallet}, + Parachains: pallet_bridge_parachains::{Call, Pallet, Event}, } } @@ -67,7 +67,7 @@ impl frame_system::Config for TestRuntime { type AccountId = AccountId; type Lookup = IdentityLookup; type Header = Header; - type Event = (); + type Event = Event; type BlockHashCount = BlockHashCount; type Version = (); type PalletInfo = PalletInfo; @@ -112,6 +112,7 @@ parameter_types! { } impl pallet_bridge_parachains::Config for TestRuntime { + type Event = Event; type WeightInfo = (); type BridgesGrandpaPalletInstance = pallet_bridge_grandpa::Instance1; type ParasPalletName = ParasPalletName; @@ -166,7 +167,11 @@ impl Chain for OtherBridgedChain { } pub fn run_test(test: impl FnOnce() -> T) -> T { - sp_io::TestExternalities::new(Default::default()).execute_with(test) + sp_io::TestExternalities::new(Default::default()).execute_with(|| { + System::set_block_number(1); + System::reset_events(); + test() + }) } pub fn test_relay_header( diff --git a/relays/bin-substrate/src/cli/register_parachain.rs b/relays/bin-substrate/src/cli/register_parachain.rs index 89043653779..4febc4b04a8 100644 --- a/relays/bin-substrate/src/cli/register_parachain.rs +++ b/relays/bin-substrate/src/cli/register_parachain.rs @@ -26,10 +26,8 @@ use polkadot_runtime_common::{ paras_registrar::Call as ParaRegistrarCall, slots::Call as ParaSlotsCall, }; use polkadot_runtime_parachains::paras::ParaLifecycle; -use relay_substrate_client::{ - AccountIdOf, CallOf, Chain, Client, HashOf, SignParam, Subscription, TransactionStatusOf, - UnsignedTransaction, -}; +use relay_substrate_client::{AccountIdOf, CallOf, Chain, Client, SignParam, UnsignedTransaction}; +use relay_utils::{TrackedTransactionStatus, TransactionTracker}; use rialto_runtime::SudoCall; use sp_core::{ storage::{well_known_keys::CODE, 
StorageKey}, @@ -116,26 +114,30 @@ impl RegisterParachain { ParaRegistrarCall::reserve {}.into(); let reserve_parachain_signer = relay_sign.clone(); let (spec_version, transaction_version) = relay_client.simple_runtime_version().await?; - wait_until_transaction_is_finalized::( - relay_client - .submit_and_watch_signed_extrinsic( - relay_sudo_account.clone(), - SignParam:: { - spec_version, - transaction_version, - genesis_hash: relay_genesis_hash, - signer: reserve_parachain_signer, - }, - move |_, transaction_nonce| { - Ok(UnsignedTransaction::new( - reserve_parachain_id_call.into(), - transaction_nonce, - )) - }, - ) - .await?, - ) - .await?; + let reserve_result = relay_client + .submit_and_watch_signed_extrinsic( + relay_sudo_account.clone(), + SignParam:: { + spec_version, + transaction_version, + genesis_hash: relay_genesis_hash, + signer: reserve_parachain_signer, + }, + move |_, transaction_nonce| { + Ok(UnsignedTransaction::new( + reserve_parachain_id_call.into(), + transaction_nonce, + )) + }, + ) + .await? 
+ .wait() + .await; + if reserve_result == TrackedTransactionStatus::Lost { + return Err(anyhow::format_err!( + "Failed to finalize `reserve-parachain-id` transaction" + )) + } log::info!(target: "bridge", "Reserved parachain id: {:?}", para_id); // step 2: register parathread @@ -161,26 +163,30 @@ impl RegisterParachain { } .into(); let register_parathread_signer = relay_sign.clone(); - wait_until_transaction_is_finalized::( - relay_client - .submit_and_watch_signed_extrinsic( - relay_sudo_account.clone(), - SignParam:: { - spec_version, - transaction_version, - genesis_hash: relay_genesis_hash, - signer: register_parathread_signer, - }, - move |_, transaction_nonce| { - Ok(UnsignedTransaction::new( - register_parathread_call.into(), - transaction_nonce, - )) - }, - ) - .await?, - ) - .await?; + let register_result = relay_client + .submit_and_watch_signed_extrinsic( + relay_sudo_account.clone(), + SignParam:: { + spec_version, + transaction_version, + genesis_hash: relay_genesis_hash, + signer: register_parathread_signer, + }, + move |_, transaction_nonce| { + Ok(UnsignedTransaction::new( + register_parathread_call.into(), + transaction_nonce, + )) + }, + ) + .await? + .wait() + .await; + if register_result == TrackedTransactionStatus::Lost { + return Err(anyhow::format_err!( + "Failed to finalize `register-parathread` transaction" + )) + } log::info!(target: "bridge", "Registered parachain: {:?}. Waiting for onboarding", para_id); // wait until parathread is onboarded @@ -256,46 +262,6 @@ impl RegisterParachain { } } -/// Wait until transaction is included into finalized block. -/// -/// Returns the hash of the finalized block with transaction. 
-pub(crate) async fn wait_until_transaction_is_finalized( - subscription: Subscription>, -) -> anyhow::Result> { - loop { - let transaction_status = subscription.next().await?; - match transaction_status { - Some(TransactionStatusOf::::FinalityTimeout(_)) | - Some(TransactionStatusOf::::Usurped(_)) | - Some(TransactionStatusOf::::Dropped) | - Some(TransactionStatusOf::::Invalid) | - None => - return Err(anyhow::format_err!( - "We've been waiting for finalization of {} transaction, but it now has the {:?} status", - C::NAME, - transaction_status, - )), - Some(TransactionStatusOf::::Finalized(block_hash)) => { - log::trace!( - target: "bridge", - "{} transaction has been finalized at block {}", - C::NAME, - block_hash, - ); - return Ok(block_hash) - }, - _ => { - log::trace!( - target: "bridge", - "Received intermediate status of {} transaction: {:?}", - C::NAME, - transaction_status, - ); - }, - } - } -} - /// Wait until parachain state is changed. async fn wait_para_state( relay_client: &Client, diff --git a/relays/client-substrate/src/client.rs b/relays/client-substrate/src/client.rs index b1f0eb85873..ed327e167b0 100644 --- a/relays/client-substrate/src/client.rs +++ b/relays/client-substrate/src/client.rs @@ -23,8 +23,8 @@ use crate::{ SubstrateGrandpaClient, SubstrateStateClient, SubstrateSystemClient, SubstrateTransactionPaymentClient, }, - ConnectionParams, Error, HashOf, HeaderIdOf, Result, SignParam, TransactionSignScheme, - TransactionStatusOf, UnsignedTransaction, + transaction_stall_timeout, ConnectionParams, Error, HashOf, HeaderIdOf, Result, SignParam, + TransactionSignScheme, TransactionTracker, UnsignedTransaction, }; use async_std::sync::{Arc, Mutex}; @@ -40,7 +40,7 @@ use jsonrpsee::{ use num_traits::{Bounded, Zero}; use pallet_balances::AccountData; use pallet_transaction_payment::InclusionFee; -use relay_utils::relay_loop::RECONNECT_DELAY; +use relay_utils::{relay_loop::RECONNECT_DELAY, STALL_TIMEOUT}; use sp_core::{ storage::{StorageData, 
StorageKey}, Bytes, Hasher, @@ -58,7 +58,7 @@ const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_valida const MAX_SUBSCRIPTION_CAPACITY: usize = 4096; /// Opaque justifications subscription type. -pub struct Subscription(Mutex>>); +pub struct Subscription(pub(crate) Mutex>>); /// Opaque GRANDPA authorities set. pub type OpaqueGrandpaAuthoritiesSet = Vec; @@ -467,14 +467,20 @@ impl Client { prepare_extrinsic: impl FnOnce(HeaderIdOf, C::Index) -> Result> + Send + 'static, - ) -> Result>> { + ) -> Result> { let _guard = self.submit_signed_extrinsic_lock.lock().await; let transaction_nonce = self.next_account_index(extrinsic_signer).await?; let best_header = self.best_header().await?; let best_header_id = best_header.id(); - let subscription = self + let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); + let (tracker, subscription) = self .jsonrpsee_execute(move |client| async move { let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?; + let stall_timeout = transaction_stall_timeout( + extrinsic.era.mortality_period(), + C::AVERAGE_BLOCK_INTERVAL, + STALL_TIMEOUT, + ); let signed_extrinsic = S::sign_transaction(signing_data, extrinsic)?.encode(); let tx_hash = C::Hasher::hash(&signed_extrinsic); let subscription = SubstrateAuthorClient::::submit_and_watch_extrinsic( @@ -487,17 +493,21 @@ impl Client { e })?; log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); - Ok(subscription) + let tracker = TransactionTracker::new( + stall_timeout, + tx_hash, + Subscription(Mutex::new(receiver)), + ); + Ok((tracker, subscription)) }) .await?; - let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); self.tokio.spawn(Subscription::background_worker( C::NAME.into(), "extrinsic".into(), subscription, sender, )); - Ok(Subscription(Mutex::new(receiver))) + Ok(tracker) } /// Returns pending extrinsics from transaction pool. 
@@ -669,6 +679,14 @@ impl Client { } impl Subscription { + /// Consumes subscription and returns future statuses stream. + pub fn into_stream(self) -> impl futures::Stream { + futures::stream::unfold(self, |this| async { + let item = this.0.lock().await.next().await.unwrap_or(None); + item.map(|i| (i, this)) + }) + } + /// Return next item from the subscription. pub async fn next(&self) -> Result> { let mut receiver = self.0.lock().await; diff --git a/relays/client-substrate/src/lib.rs b/relays/client-substrate/src/lib.rs index 0234459f9d7..b9e489688d3 100644 --- a/relays/client-substrate/src/lib.rs +++ b/relays/client-substrate/src/lib.rs @@ -23,6 +23,7 @@ mod client; mod error; mod rpc; mod sync_header; +mod transaction_tracker; pub mod guard; pub mod metrics; @@ -39,6 +40,7 @@ pub use crate::{ client::{ChainRuntimeVersion, Client, OpaqueGrandpaAuthoritiesSet, Subscription}, error::{Error, Result}, sync_header::SyncHeader, + transaction_tracker::TransactionTracker, }; pub use bp_runtime::{ AccountIdOf, AccountPublicOf, BalanceOf, BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf, diff --git a/relays/client-substrate/src/transaction_tracker.rs b/relays/client-substrate/src/transaction_tracker.rs new file mode 100644 index 00000000000..b85e859017f --- /dev/null +++ b/relays/client-substrate/src/transaction_tracker.rs @@ -0,0 +1,322 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Helper for tracking transaction invalidation events. + +use crate::{Chain, HashOf, Subscription, TransactionStatusOf}; + +use async_trait::async_trait; +use futures::{Stream, StreamExt}; +use relay_utils::TrackedTransactionStatus; +use std::time::Duration; + +/// Substrate transaction tracker implementation. +/// +/// Substrate node provides RPC API to submit and watch for transaction events. This way +/// we may know when transaction is included into block, finalized or rejected. There are +/// some edge cases, when we can't fully trust this mechanism - e.g. transaction may broadcasted +/// and then dropped out of node transaction pool (some other cases are also possible - node +/// restarts, connection lost, ...). Then we can't know for sure - what is currently happening +/// with our transaction. Is the transaction really lost? Is it still alive on the chain network? +/// +/// We have several options to handle such cases: +/// +/// 1) hope that the transaction is still alive and wait for its mining until it is spoiled; +/// +/// 2) assume that the transaction is lost and resubmit another transaction instantly; +/// +/// 3) wait for some time (if transaction is mortal - then until block where it dies; if it is +/// immortal - then for some time that we assume is long enough to mine it) and assume that +/// it is lost. +/// +/// This struct implements third option as it seems to be the most optimal. +pub struct TransactionTracker { + transaction_hash: HashOf, + stall_timeout: Duration, + subscription: Subscription>, +} + +impl TransactionTracker { + /// Create transaction tracker. 
+ pub fn new( + stall_timeout: Duration, + transaction_hash: HashOf, + subscription: Subscription>, + ) -> Self { + Self { stall_timeout, transaction_hash, subscription } + } + + /// Wait for final transaction status and return it along with last known internal invalidation + /// status. + async fn do_wait(self) -> (TrackedTransactionStatus, InvalidationStatus) { + let invalidation_status = watch_transaction_status::( + self.transaction_hash, + self.subscription.into_stream(), + ) + .await; + match invalidation_status { + InvalidationStatus::Finalized => + (TrackedTransactionStatus::Finalized, invalidation_status), + InvalidationStatus::Invalid => (TrackedTransactionStatus::Lost, invalidation_status), + InvalidationStatus::Lost => { + async_std::task::sleep(self.stall_timeout).await; + // if someone is still watching for our transaction, then we're reporting + // an error here (which is treated as "transaction lost") + log::trace!( + target: "bridge", + "{} transaction {:?} is considered lost after timeout", + C::NAME, + self.transaction_hash, + ); + + (TrackedTransactionStatus::Lost, invalidation_status) + }, + } + } +} + +#[async_trait] +impl relay_utils::TransactionTracker for TransactionTracker { + async fn wait(self) -> TrackedTransactionStatus { + self.do_wait().await.0 + } +} + +/// Transaction invalidation status. +/// +/// Note that in places where the `TransactionTracker` is used, the finalization event will be +/// ignored - relay loops are detecting the mining/finalization using their own +/// techniques. That's why we're using `InvalidationStatus` here. +#[derive(Debug, PartialEq)] +enum InvalidationStatus { + /// Transaction has been included into block and finalized. + Finalized, + /// Transaction has been invalidated. + Invalid, + /// We have lost track of transaction status. + Lost, +} + +/// Watch for transaction status until transaction is finalized or we lose track of its status. 
+async fn watch_transaction_status>>( + transaction_hash: HashOf, + subscription: S, +) -> InvalidationStatus { + futures::pin_mut!(subscription); + + loop { + match subscription.next().await { + Some(TransactionStatusOf::::Finalized(block_hash)) => { + // the only "successful" outcome of this method is when the block with transaction + // has been finalized + log::trace!( + target: "bridge", + "{} transaction {:?} has been finalized at block: {:?}", + C::NAME, + transaction_hash, + block_hash, + ); + return InvalidationStatus::Finalized + }, + Some(TransactionStatusOf::::Invalid) => { + // if node says that the transaction is invalid, there are still chances that + // it is not actually invalid - e.g. if the block where transaction has been + // revalidated is retracted and transaction (at some other node pool) becomes + // valid again on other fork. But let's assume that the chances of this event + // are almost zero - there's a lot of things that must happen for this to be the + // case. 
+ log::trace!( + target: "bridge", + "{} transaction {:?} has been invalidated", + C::NAME, + transaction_hash, + ); + return InvalidationStatus::Invalid + }, + Some(TransactionStatusOf::::Future) | + Some(TransactionStatusOf::::Ready) | + Some(TransactionStatusOf::::Broadcast(_)) => { + // nothing important (for us) has happened + }, + Some(TransactionStatusOf::::InBlock(block_hash)) => { + // TODO: read matching system event (ExtrinsicSuccess or ExtrinsicFailed), log it + // here and use it later (on finality) for reporting invalid transaction + // https://github.com/paritytech/parity-bridges-common/issues/1464 + log::trace!( + target: "bridge", + "{} transaction {:?} has been included in block: {:?}", + C::NAME, + transaction_hash, + block_hash, + ); + }, + Some(TransactionStatusOf::::Retracted(block_hash)) => { + log::trace!( + target: "bridge", + "{} transaction {:?} at block {:?} has been retracted", + C::NAME, + transaction_hash, + block_hash, + ); + }, + Some(TransactionStatusOf::::FinalityTimeout(block_hash)) => { + // finality is lagging? let's wait a bit more and report a stall + log::trace!( + target: "bridge", + "{} transaction {:?} block {:?} has not been finalized for too long", + C::NAME, + transaction_hash, + block_hash, + ); + return InvalidationStatus::Lost + }, + Some(TransactionStatusOf::::Usurped(new_transaction_hash)) => { + // this may be result of our transaction resubmitter work or some manual + // intervention. In both cases - let's start stall timeout, because the meaning + // of transaction may have changed + log::trace!( + target: "bridge", + "{} transaction {:?} has been usurped by new transaction: {:?}", + C::NAME, + transaction_hash, + new_transaction_hash, + ); + return InvalidationStatus::Lost + }, + Some(TransactionStatusOf::::Dropped) => { + // the transaction has been removed from the pool because of its limits. 
Let's wait + // a bit and report a stall + log::trace!( + target: "bridge", + "{} transaction {:?} has been dropped from the pool", + C::NAME, + transaction_hash, + ); + return InvalidationStatus::Lost + }, + None => { + // the status of transaction is unknown to us (the subscription has been closed?). + // Let's wait a bit and report a stall + return InvalidationStatus::Lost + }, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_chain::TestChain; + use futures::{FutureExt, SinkExt}; + use sc_transaction_pool_api::TransactionStatus; + + async fn on_transaction_status( + status: TransactionStatus, HashOf>, + ) -> Option<(TrackedTransactionStatus, InvalidationStatus)> { + let (mut sender, receiver) = futures::channel::mpsc::channel(1); + let tx_tracker = TransactionTracker::::new( + Duration::from_secs(0), + Default::default(), + Subscription(async_std::sync::Mutex::new(receiver)), + ); + + sender.send(Some(status)).await.unwrap(); + tx_tracker.do_wait().now_or_never() + } + + #[async_std::test] + async fn returns_finalized_on_finalized() { + assert_eq!( + on_transaction_status(TransactionStatus::Finalized(Default::default())).await, + Some((TrackedTransactionStatus::Finalized, InvalidationStatus::Finalized)), + ); + } + + #[async_std::test] + async fn returns_invalid_on_invalid() { + assert_eq!( + on_transaction_status(TransactionStatus::Invalid).await, + Some((TrackedTransactionStatus::Lost, InvalidationStatus::Invalid)), + ); + } + + #[async_std::test] + async fn waits_on_future() { + assert_eq!(on_transaction_status(TransactionStatus::Future).await, None,); + } + + #[async_std::test] + async fn waits_on_ready() { + assert_eq!(on_transaction_status(TransactionStatus::Ready).await, None,); + } + + #[async_std::test] + async fn waits_on_broadcast() { + assert_eq!( + on_transaction_status(TransactionStatus::Broadcast(Default::default())).await, + None, + ); + } + + #[async_std::test] + async fn waits_on_in_block() { + assert_eq!( + 
on_transaction_status(TransactionStatus::InBlock(Default::default())).await, + None, + ); + } + + #[async_std::test] + async fn waits_on_retracted() { + assert_eq!( + on_transaction_status(TransactionStatus::Retracted(Default::default())).await, + None, + ); + } + + #[async_std::test] + async fn lost_on_finality_timeout() { + assert_eq!( + on_transaction_status(TransactionStatus::FinalityTimeout(Default::default())).await, + Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)), + ); + } + + #[async_std::test] + async fn lost_on_usurped() { + assert_eq!( + on_transaction_status(TransactionStatus::Usurped(Default::default())).await, + Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)), + ); + } + + #[async_std::test] + async fn lost_on_dropped() { + assert_eq!( + on_transaction_status(TransactionStatus::Dropped).await, + Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)), + ); + } + + #[async_std::test] + async fn lost_on_subscription_error() { + assert_eq!( + watch_transaction_status::(Default::default(), futures::stream::iter([])) + .now_or_never(), + Some(InvalidationStatus::Lost), + ); + } +} diff --git a/relays/finality/src/finality_loop.rs b/relays/finality/src/finality_loop.rs index c29a5d5fec2..951edfdde94 100644 --- a/relays/finality/src/finality_loop.rs +++ b/relays/finality/src/finality_loop.rs @@ -29,7 +29,7 @@ use futures::{select, Future, FutureExt, Stream, StreamExt}; use num_traits::{One, Saturating}; use relay_utils::{ metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient, - HeaderId, MaybeConnectionError, + HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker, }; use std::{ pin::Pin, @@ -86,6 +86,9 @@ pub trait SourceClient: RelayClient { /// Target client used in finality synchronization loop. #[async_trait] pub trait TargetClient: RelayClient { + /// Transaction tracker to track submitted transactions. 
+ type TransactionTracker: TransactionTracker; + /// Get best finalized source block number. async fn best_finalized_source_block_id( &self, @@ -96,7 +99,7 @@ pub trait TargetClient: RelayClient { &self, header: P::Header, proof: P::FinalityProof, - ) -> Result<(), Self::Error>; + ) -> Result; } /// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs @@ -153,8 +156,6 @@ pub(crate) enum Error { Target(TargetError), /// Finality proof for mandatory header is missing from the source node. MissingMandatoryFinalityProof(P::Number), - /// The synchronization has stalled. - Stalled, } impl Error @@ -167,7 +168,6 @@ where match *self { Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source), Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target), - Error::Stalled => Err(FailedClient::Both), _ => Ok(()), } } @@ -175,9 +175,9 @@ where /// Information about transaction that we have submitted. #[derive(Debug, Clone)] -pub(crate) struct Transaction { - /// Time when we have submitted this transaction. - pub time: Instant, +pub(crate) struct Transaction { + /// Submitted transaction tracker. + pub tracker: Tracker, /// The number of the header we have submitted. pub submitted_header_number: Number, } @@ -206,11 +206,12 @@ pub(crate) struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsS &'a mut RestartableFinalityProofsStream, /// Recent finality proofs that we have read from the stream. pub(crate) recent_finality_proofs: &'a mut FinalityProofs

, - /// Last transaction that we have submitted to the target node. - pub(crate) last_transaction: Option>, + /// Number of the last header, submitted to the target node. + pub(crate) submitted_header_number: Option, } -async fn run_until_connection_lost( +/// Run finality relay loop until connection to one of nodes is lost. +pub(crate) async fn run_until_connection_lost( source_client: impl SourceClient

, target_client: impl TargetClient

, sync_params: FinalitySyncParams, @@ -230,8 +231,9 @@ async fn run_until_connection_lost( }) }; + let last_transaction_tracker = futures::future::Fuse::terminated(); let exit_signal = exit_signal.fuse(); - futures::pin_mut!(exit_signal); + futures::pin_mut!(last_transaction_tracker, exit_signal); let mut finality_proofs_stream = RestartableFinalityProofsStream { needs_restart: false, @@ -241,7 +243,7 @@ async fn run_until_connection_lost( let mut progress = (Instant::now(), None); let mut retry_backoff = retry_backoff(); - let mut last_transaction = None; + let mut last_submitted_header_number = None; loop { // run loop iteration @@ -252,7 +254,7 @@ async fn run_until_connection_lost( progress: &mut progress, finality_proofs_stream: &mut finality_proofs_stream, recent_finality_proofs: &mut recent_finality_proofs, - last_transaction: last_transaction.clone(), + submitted_header_number: last_submitted_header_number, }, &sync_params, &metrics_sync, @@ -261,8 +263,14 @@ async fn run_until_connection_lost( // deal with errors let next_tick = match iteration_result { - Ok(updated_last_transaction) => { - last_transaction = updated_last_transaction; + Ok(Some(updated_last_transaction)) => { + last_transaction_tracker.set(updated_last_transaction.tracker.wait().fuse()); + last_submitted_header_number = + Some(updated_last_transaction.submitted_header_number); + retry_backoff.reset(); + sync_params.tick + }, + Ok(None) => { retry_backoff.reset(); sync_params.tick }, @@ -281,6 +289,18 @@ async fn run_until_connection_lost( // wait till exit signal, or new source block select! { + transaction_status = last_transaction_tracker => { + if transaction_status == TrackedTransactionStatus::Lost { + log::error!( + target: "bridge", + "Finality synchronization from {} to {} has stalled. 
Going to restart", + P::SOURCE_NAME, + P::TARGET_NAME, + ); + + return Err(FailedClient::Both); + } + }, _ = async_std::task::sleep(next_tick).fuse() => {}, _ = exit_signal => return Ok(()), } @@ -293,7 +313,7 @@ pub(crate) async fn run_loop_iteration( state: FinalityLoopState<'_, P, SC::FinalityProofsStream>, sync_params: &FinalitySyncParams, metrics_sync: &Option, -) -> Result>, Error> +) -> Result>, Error> where P: FinalitySyncPipeline, SC: SourceClient

, @@ -333,20 +353,11 @@ where // if we have already submitted header, then we just need to wait for it // if we're waiting too much, then we believe our transaction has been lost and restart sync - if let Some(last_transaction) = state.last_transaction { - if best_number_at_target >= last_transaction.submitted_header_number { + if let Some(submitted_header_number) = state.submitted_header_number { + if best_number_at_target >= submitted_header_number { // transaction has been mined && we can continue - } else if last_transaction.time.elapsed() > sync_params.stall_timeout { - log::error!( - target: "bridge", - "Finality synchronization from {} to {} has stalled. Going to restart", - P::SOURCE_NAME, - P::TARGET_NAME, - ); - - return Err(Error::Stalled) } else { - return Ok(Some(last_transaction)) + return Ok(None) } } @@ -363,22 +374,20 @@ where .await? { Some((header, justification)) => { - let new_transaction = - Transaction { time: Instant::now(), submitted_header_number: header.number() }; - + let submitted_header_number = header.number(); log::debug!( target: "bridge", "Going to submit finality proof of {} header #{:?} to {}", P::SOURCE_NAME, - new_transaction.submitted_header_number, + submitted_header_number, P::TARGET_NAME, ); - target_client + let tracker = target_client .submit_finality_proof(header, justification) .await .map_err(Error::Target)?; - Ok(Some(new_transaction)) + Ok(Some(Transaction { tracker, submitted_header_number })) }, None => Ok(None), } diff --git a/relays/finality/src/finality_loop_tests.rs b/relays/finality/src/finality_loop_tests.rs index b7f7bc80029..7144ccb0c48 100644 --- a/relays/finality/src/finality_loop_tests.rs +++ b/relays/finality/src/finality_loop_tests.rs @@ -20,10 +20,10 @@ use crate::{ finality_loop::{ - prune_recent_finality_proofs, read_finality_proofs_from_stream, run, run_loop_iteration, - select_better_recent_finality_proof, select_header_to_submit, FinalityLoopState, - FinalityProofs, FinalitySyncParams, 
RestartableFinalityProofsStream, SourceClient, - TargetClient, + prune_recent_finality_proofs, read_finality_proofs_from_stream, run_loop_iteration, + run_until_connection_lost, select_better_recent_finality_proof, select_header_to_submit, + FinalityLoopState, FinalityProofs, FinalitySyncParams, RestartableFinalityProofsStream, + SourceClient, TargetClient, }, sync_loop_metrics::SyncLoopMetrics, FinalityProof, FinalitySyncPipeline, SourceHeader, @@ -33,7 +33,8 @@ use async_trait::async_trait; use futures::{FutureExt, Stream, StreamExt}; use parking_lot::Mutex; use relay_utils::{ - metrics::MetricsParams, relay_loop::Client as RelayClient, HeaderId, MaybeConnectionError, + relay_loop::Client as RelayClient, FailedClient, HeaderId, MaybeConnectionError, + TrackedTransactionStatus, TransactionTracker, }; use std::{ collections::HashMap, @@ -46,6 +47,22 @@ type IsMandatory = bool; type TestNumber = u64; type TestHash = u64; +#[derive(Clone, Debug)] +struct TestTransactionTracker(TrackedTransactionStatus); + +impl Default for TestTransactionTracker { + fn default() -> TestTransactionTracker { + TestTransactionTracker(TrackedTransactionStatus::Finalized) + } +} + +#[async_trait] +impl TransactionTracker for TestTransactionTracker { + async fn wait(self) -> TrackedTransactionStatus { + self.0 + } +} + #[derive(Debug, Clone)] enum TestError { NonConnection, @@ -104,6 +121,7 @@ struct ClientsData { target_best_block_id: HeaderId, target_headers: Vec<(TestSourceHeader, TestFinalityProof)>, + target_transaction_tracker: TestTransactionTracker, } #[derive(Clone)] @@ -164,6 +182,8 @@ impl RelayClient for TestTargetClient { #[async_trait] impl TargetClient for TestTargetClient { + type TransactionTracker = TestTransactionTracker; + async fn best_finalized_source_block_id( &self, ) -> Result, TestError> { @@ -176,12 +196,13 @@ impl TargetClient for TestTargetClient { &self, header: TestSourceHeader, proof: TestFinalityProof, - ) -> Result<(), TestError> { + ) -> Result { let mut 
data = self.data.lock(); (self.on_method_call)(&mut data); data.target_best_block_id = HeaderId(header.number(), header.hash()); data.target_headers.push((header, proof)); - Ok(()) + (self.on_method_call)(&mut data); + Ok(data.target_transaction_tracker.clone()) } } @@ -203,6 +224,7 @@ fn prepare_test_clients( target_best_block_id: HeaderId(5, 5), target_headers: vec![], + target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized), })); ( TestSourceClient { @@ -224,7 +246,7 @@ fn test_sync_params() -> FinalitySyncParams { fn run_sync_loop( state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, -) -> ClientsData { +) -> (ClientsData, Result<(), FailedClient>) { let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); let (source_client, target_client) = prepare_test_clients( exit_sender, @@ -243,21 +265,21 @@ fn run_sync_loop( let sync_params = test_sync_params(); let clients_data = source_client.data.clone(); - let _ = async_std::task::block_on(run( + let result = async_std::task::block_on(run_until_connection_lost( source_client, target_client, sync_params, - MetricsParams::disabled(), + None, exit_receiver.into_future().map(|(_, _)| ()), )); let clients_data = clients_data.lock().clone(); - clients_data + (clients_data, result) } #[test] fn finality_sync_loop_works() { - let client_data = run_sync_loop(|data| { + let (client_data, result) = run_sync_loop(|data| { // header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted, // because header#8 has persistent finality proof && it is mandatory => it is submitted // header#9 has persistent finality proof, but it isn't mandatory => it is submitted, @@ -286,6 +308,7 @@ fn finality_sync_loop_works() { data.target_best_block_id.0 == 16 }); + assert_eq!(result, Ok(())); assert_eq!( client_data.target_headers, vec![ @@ -538,7 +561,7 @@ fn different_forks_at_source_and_at_target_are_detected() { progress: &mut progress, 
finality_proofs_stream: &mut finality_proofs_stream, recent_finality_proofs: &mut recent_finality_proofs, - last_transaction: None, + submitted_header_number: None, }, &test_sync_params(), &Some(metrics_sync.clone()), @@ -547,3 +570,14 @@ fn different_forks_at_source_and_at_target_are_detected() { assert!(!metrics_sync.is_using_same_fork()); } + +#[test] +fn stalls_when_transaction_tracker_returns_error() { + let (_, result) = run_sync_loop(|data| { + data.target_transaction_tracker = TestTransactionTracker(TrackedTransactionStatus::Lost); + data.target_best_block_id = HeaderId(5, 5); + data.target_best_block_id.0 == 16 + }); + + assert_eq!(result, Err(FailedClient::Both)); +} diff --git a/relays/lib-substrate-relay/src/finality/target.rs b/relays/lib-substrate-relay/src/finality/target.rs index 351f21cec80..7bdb77d4ee0 100644 --- a/relays/lib-substrate-relay/src/finality/target.rs +++ b/relays/lib-substrate-relay/src/finality/target.rs @@ -28,7 +28,7 @@ use async_trait::async_trait; use finality_relay::TargetClient; use relay_substrate_client::{ AccountIdOf, AccountKeyPairOf, Chain, Client, Error, HeaderIdOf, HeaderOf, SignParam, - SyncHeader, TransactionEra, TransactionSignScheme, UnsignedTransaction, + SyncHeader, TransactionEra, TransactionSignScheme, TransactionTracker, UnsignedTransaction, }; use relay_utils::relay_loop::Client as RelayClient; use sp_core::Pair; @@ -89,6 +89,8 @@ where AccountIdOf: From< as Pair>::Public>, P::TransactionSignScheme: TransactionSignScheme, { + type TransactionTracker = TransactionTracker; + async fn best_finalized_source_block_id(&self) -> Result, Error> { // we can't continue to relay finality if target node is out of sync, because // it may have already received (some of) headers that we're going to relay @@ -109,14 +111,14 @@ where &self, header: SyncHeader>, proof: SubstrateFinalityProof

, - ) -> Result<(), Error> { + ) -> Result { let genesis_hash = *self.client.genesis_hash(); let transaction_params = self.transaction_params.clone(); let call = P::SubmitFinalityProofCallBuilder::build_submit_finality_proof_call(header, proof); let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; self.client - .submit_signed_extrinsic( + .submit_and_watch_signed_extrinsic( self.transaction_params.signer.public().into(), SignParam:: { spec_version, @@ -130,6 +132,5 @@ where }, ) .await - .map(drop) } } diff --git a/relays/utils/src/lib.rs b/relays/utils/src/lib.rs index 8e8870ac188..dbc8e5df821 100644 --- a/relays/utils/src/lib.rs +++ b/relays/utils/src/lib.rs @@ -20,6 +20,7 @@ pub use bp_runtime::HeaderId; pub use error::Error; pub use relay_loop::{relay_loop, relay_metrics}; +use async_trait::async_trait; use backoff::{backoff::Backoff, ExponentialBackoff}; use futures::future::FutureExt; use std::time::Duration; @@ -119,6 +120,22 @@ pub trait MaybeConnectionError { fn is_connection_error(&self) -> bool; } +/// Final status of the tracked transaction. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum TrackedTransactionStatus { + /// Transaction has been lost. + Lost, + /// Transaction has been mined and finalized. + Finalized, +} + +/// Transaction tracker. +#[async_trait] +pub trait TransactionTracker: Send { + /// Wait until transaction is either finalized or invalidated/lost. + async fn wait(self) -> TrackedTransactionStatus; +} + /// Stringified error that may be either connection-related or not. #[derive(Error, Debug)] pub enum StringifiedMaybeConnectionError {