diff --git a/Cargo.lock b/Cargo.lock index e8582c36f74..1deccecba57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1588,6 +1588,7 @@ dependencies = [ "polkadot-primitives", "sc-client-api", "sc-consensus", + "sc-service", "sp-blockchain", "sp-consensus", "sp-runtime", diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml index bcaa7ba73b2..b64db352d81 100644 --- a/client/consensus/common/Cargo.toml +++ b/client/consensus/common/Cargo.toml @@ -16,6 +16,7 @@ tracing = "0.1.37" # Substrate sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/client/consensus/common/src/level_monitor.rs b/client/consensus/common/src/level_monitor.rs index f959de4e73c..4e545e409df 100644 --- a/client/consensus/common/src/level_monitor.rs +++ b/client/consensus/common/src/level_monitor.rs @@ -204,7 +204,10 @@ where let level = self.levels.get(&number)?; - for blk_hash in level.iter().filter(|hash| **hash != best_hash) { + for blk_hash in level + .iter() + .filter(|hash| **hash != best_hash && self.freshness.get(*hash) != Some(&0u32.into())) + { // Search for the fresher leaf information for this block let candidate_info = leaves .iter() diff --git a/client/consensus/common/src/lib.rs b/client/consensus/common/src/lib.rs index e901a0e4e8a..120f3bd7f96 100644 --- a/client/consensus/common/src/lib.rs +++ b/client/consensus/common/src/lib.rs @@ -14,23 +14,25 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use polkadot_primitives::v2::{Hash as PHash, PersistedValidationData}; +use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; +use polkadot_primitives::v2::{Hash as RelayHash, Id as ParaId, PersistedValidationData}; use sc_client_api::Backend; use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult}; -use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use sc_service::SpawnTaskHandle; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero}; use std::sync::Arc; mod level_monitor; mod parachain_consensus; + #[cfg(test)] mod tests; -pub use parachain_consensus::run_parachain_consensus; - use level_monitor::LevelMonitor; pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT}; +pub use parachain_consensus::run_parachain_consensus; /// The result of [`ParachainConsensus::produce_candidate`]. pub struct ParachainCandidate { @@ -59,7 +61,7 @@ pub trait ParachainConsensus: Send + Sync + dyn_clone::DynClone { async fn produce_candidate( &mut self, parent: &B::Header, - relay_parent: PHash, + relay_parent: RelayHash, validation_data: &PersistedValidationData, ) -> Option>; } @@ -71,7 +73,7 @@ impl ParachainConsensus for Box + Send + async fn produce_candidate( &mut self, parent: &B::Header, - relay_parent: PHash, + relay_parent: RelayHash, validation_data: &PersistedValidationData, ) -> Option> { (*self).produce_candidate(parent, relay_parent, validation_data).await @@ -112,6 +114,94 @@ impl> ParachainBlockImport Self { inner, monitor } } + + /// EXPERIMENTAL - AND NOT THE DEFINITIVE INTERFACE + /// + /// Spawns an observer task preventing the level monitor from removing the blocks that + /// were already available on the relay chain. + pub fn spawn_relay_chain_observer( + &self, + relay_chain: RC, + para_id: ParaId, + spawn_handle: SpawnTaskHandle, + ) where + RC: RelayChainInterface + Clone + 'static, + BE: 'static, + { + let monitor = match self.monitor.as_ref() { + Some(m) => m.clone(), + None => { + eprintln!("YYY Monitor not instanced without level limit"); + return + }, + }; + + let relay_clone = relay_chain.clone(); + let task = async move { + // TODO: is observer exit a FATAL error? + match run_relay_chain_observer::(relay_clone, para_id, monitor).await { + Ok(_) => eprintln!("YYY EXITED"), + Err(e) => eprintln!("YYY Relay chain observer is DEAD: {:?}", e.to_string()), + } + }; + + spawn_handle.spawn("relay chain listener", None, task); + } +} + +// EXPERIMENTAL +// +// The following code aims to keep the monitor more resilient and should +// prevent the removal of blocks marked as available by the relay chain. + +async fn run_relay_chain_observer>( + relay_chain: impl RelayChainInterface, + para_id: ParaId, + monitor: SharedData>, +) -> RelayChainResult<()> { + use futures::StreamExt; + + eprintln!("YYY START IMPORT LISTENER"); + let mut imports = relay_chain.import_notification_stream().await?; + loop { + let pheader = match imports.next().await { + Some(h) => h, + None => { + eprintln!("YYY spurious notificaiton"); + continue + }, + }; + let hash = pheader.hash(); + eprintln!("YYY: imported {:?}", hash); + + let validation_data = relay_chain + .persisted_validation_data( + hash, + para_id, + polkadot_primitives::v2::OccupiedCoreAssumption::TimedOut, + ) + .await?; + let validation_data = match validation_data { + Some(v) => v, + None => { + eprintln!("YYY no state associated"); + continue + }, + }; + + use codec::Decode; + let para_head = Block::Header::decode(&mut &validation_data.parent_head.0[..])?; + let para_hash = para_head.hash(); + + eprintln!("YYY: setting monitor availability for {:?}", para_hash); + + if let Some(entry) = monitor.shared_data().freshness.get_mut(¶_hash) { + eprintln!("YYY: Updated"); + *entry = Zero::zero(); + } + + eprintln!("YYY: {:#?}", para_head); + } } impl Clone for ParachainBlockImport { diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index 028dc34203f..f3fa7618397 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -536,6 +536,12 @@ where s => s.to_string().into(), })?; + block_import.spawn_relay_chain_observer( + relay_chain_interface.clone(), + para_id, + task_manager.spawn_handle(), + ); + let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id);