From 33d47e0682c7b2f11c7ea009b48e6c3895478b17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 14 Nov 2024 14:04:39 +0100 Subject: [PATCH 1/5] slot-based-collator: Implement dedicated block import The `SlotBasedBlockImport` job is to collect the storage proofs of all blocks getting imported. These storage proofs alongside the block are being forwarded to the collation task. Right now they are just being thrown away. More logic will follow later. Basically this will be required to include multiple blocks into one `PoV` which will then be done by the collation task. --- Cargo.lock | 2 + cumulus/client/consensus/aura/Cargo.toml | 1 + .../src/collators/slot_based/block_import.rs | 144 ++++++++++++++++++ .../collators/slot_based/collation_task.rs | 40 +++-- .../aura/src/collators/slot_based/mod.rs | 9 +- cumulus/polkadot-omni-node/lib/Cargo.toml | 1 + .../polkadot-omni-node/lib/src/common/spec.rs | 78 ++++++++-- .../lib/src/common/types.rs | 14 +- .../polkadot-omni-node/lib/src/nodes/aura.rs | 119 ++++++++++++--- .../lib/src/nodes/manual_seal.rs | 18 ++- cumulus/test/service/src/lib.rs | 20 ++- 11 files changed, 386 insertions(+), 60 deletions(-) create mode 100644 cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs diff --git a/Cargo.lock b/Cargo.lock index 1e1c902df0e1..7d3b0e9955e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4151,6 +4151,7 @@ dependencies = [ "sp-runtime 31.0.1", "sp-state-machine 0.35.0", "sp-timestamp", + "sp-trie 29.0.0", "substrate-prometheus-endpoint", "tokio", "tracing", @@ -15071,6 +15072,7 @@ dependencies = [ "serde_json", "sp-api 26.0.0", "sp-block-builder", + "sp-consensus", "sp-consensus-aura", "sp-core 28.0.0", "sp-genesis-builder", diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml index 0bb2de6bb9b8..4d513fa0f937 100644 --- a/cumulus/client/consensus/aura/Cargo.toml +++ b/cumulus/client/consensus/aura/Cargo.toml @@ -33,6 +33,7 @@ sp-blockchain = { workspace = true, default-features = true } sp-consensus = { workspace = true, default-features = true } sp-consensus-aura = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } +sp-trie = { workspace = true, default-features = true } sp-inherents = { workspace = true, default-features = true } sp-keystore = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs new file mode 100644 index 000000000000..9c53da6a6b7d --- /dev/null +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs @@ -0,0 +1,144 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use futures::{stream::FusedStream, StreamExt}; +use sc_consensus::{BlockImport, StateAction}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use sp_api::{ApiExt, CallApiAt, CallContext, Core, ProvideRuntimeApi, StorageProof}; +use sp_runtime::traits::{Block as BlockT, Header as _}; +use sp_trie::proof_size_extension::ProofSizeExt; +use std::sync::Arc; + +/// Handle for receiving the block and the storage proof from the [`SlotBasedBlockImport`]. +/// +/// This handle should be passed to [`Params`](super::Params) or can also be dropped if the node is +/// not running as collator. +pub struct SlotBasedBlockImportHandle { + receiver: TracingUnboundedReceiver<(Block, StorageProof)>, +} + +impl SlotBasedBlockImportHandle { + /// Returns the next item. + /// + /// The future will never return when the internal channel is closed. + pub async fn next(&mut self) -> (Block, StorageProof) { + loop { + if self.receiver.is_terminated() { + futures::pending!() + } else if let Some(res) = self.receiver.next().await { + return res + } + } + } +} + +/// Special block import for the slot based collator. +pub struct SlotBasedBlockImport { + inner: BI, + client: Arc, + sender: TracingUnboundedSender<(Block, StorageProof)>, +} + +impl SlotBasedBlockImport { + /// Create a new instance. + /// + /// The returned [`SlotBasedBlockImportHandle`] needs to be passed to the + /// [`Params`](super::Params), so that this block import instance can communicate with the + /// collation task. If the node is not running as a collator, just dropping the handle is fine. + pub fn new(inner: BI, client: Arc) -> (Self, SlotBasedBlockImportHandle) { + let (sender, receiver) = tracing_unbounded("SlotBasedBlockImportChannel", 1000); + + (Self { sender, client, inner }, SlotBasedBlockImportHandle { receiver }) + } +} + +impl Clone for SlotBasedBlockImport { + fn clone(&self) -> Self { + Self { inner: self.inner.clone(), client: self.client.clone(), sender: self.sender.clone() } + } +} + +#[async_trait::async_trait] +impl BlockImport for SlotBasedBlockImport +where + Block: BlockT, + BI: BlockImport + Send + Sync, + BI::Error: Into, + Client: ProvideRuntimeApi + CallApiAt + Send + Sync, + Client::StateBackend: Send, + Client::Api: Core, +{ + type Error = sp_consensus::Error; + + async fn check_block( + &self, + block: sc_consensus::BlockCheckParams, + ) -> Result { + self.inner.check_block(block).await.map_err(Into::into) + } + + async fn import_block( + &self, + mut params: sc_consensus::BlockImportParams, + ) -> Result { + // If the channel exists and it is required to execute the block, we will execute the block + // here. This is done to collect the storage proof and to prevent re-execution, we push + // downwards the state changes. `StateAction::ApplyChanges` is ignored, because it either + // means that the node produced the block itself or the block was imported via state sync. + if !self.sender.is_closed() && !matches!(params.state_action, StateAction::ApplyChanges(_)) + { + let mut runtime_api = self.client.runtime_api(); + + runtime_api.set_call_context(CallContext::Onchain); + + runtime_api.record_proof(); + let recorder = runtime_api + .proof_recorder() + .expect("Proof recording is enabled in the line above; qed."); + runtime_api.register_extension(ProofSizeExt::new(recorder)); + + let parent_hash = *params.header.parent_hash(); + + let block = Block::new(params.header.clone(), params.body.clone().unwrap_or_default()); + + runtime_api + .execute_block(parent_hash, block.clone()) + .map_err(|e| Box::new(e) as Box<_>)?; + + let storage_proof = + runtime_api.extract_proof().expect("Proof recording was enabled above; qed"); + + let state = self.client.state_at(parent_hash).map_err(|e| Box::new(e) as Box<_>)?; + let gen_storage_changes = runtime_api + .into_storage_changes(&state, parent_hash) + .map_err(sp_consensus::Error::ChainLookup)?; + + if params.header.state_root() != &gen_storage_changes.transaction_storage_root { + return Err(sp_consensus::Error::Other(Box::new( + sp_blockchain::Error::InvalidStateRoot, + ))) + } + + params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes( + gen_storage_changes, + )); + + let _ = self.sender.unbounded_send((block, storage_proof)); + } + + self.inner.import_block(params).await.map_err(Into::into) + } +} diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs index 5b8151f6302c..62e6f820c45c 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs @@ -47,6 +47,8 @@ pub struct Params { pub collator_service: CS, /// Receiver channel for communication with the block builder task. pub collator_receiver: TracingUnboundedReceiver>, + /// The handle from the special slot based block import. + pub block_import_handle: super::SlotBasedBlockImportHandle, } /// Asynchronously executes the collation task for a parachain. @@ -55,28 +57,48 @@ pub struct Params { /// collations to the relay chain. It listens for new best relay chain block notifications and /// handles collator messages. If our parachain is scheduled on a core and we have a candidate, /// the task will build a collation and send it to the relay chain. -pub async fn run_collation_task(mut params: Params) -where +pub async fn run_collation_task( + Params { + relay_client, + collator_key, + para_id, + reinitialize, + collator_service, + mut collator_receiver, + mut block_import_handle, + }: Params, +) where Block: BlockT, CS: CollatorServiceInterface + Send + Sync + 'static, RClient: RelayChainInterface + Clone + 'static, { - let Ok(mut overseer_handle) = params.relay_client.overseer_handle() else { + let Ok(mut overseer_handle) = relay_client.overseer_handle() else { tracing::error!(target: LOG_TARGET, "Failed to get overseer handle."); return }; cumulus_client_collator::initialize_collator_subsystems( &mut overseer_handle, - params.collator_key, - params.para_id, - params.reinitialize, + collator_key, + para_id, + reinitialize, ) .await; - let collator_service = params.collator_service; - while let Some(collator_message) = params.collator_receiver.next().await { - handle_collation_message(collator_message, &collator_service, &mut overseer_handle).await; + loop { + futures::select! { + collator_message = collator_receiver.next() => { + let Some(message) = collator_message else { + return; + }; + + handle_collation_message(message, &collator_service, &mut overseer_handle).await; + }, + block_import_msg = block_import_handle.next().fuse() => { + // TODO: Implement me. + let _ = block_import_msg; + } + } } } diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs index 7453d3c89d08..411433ae1c1e 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs @@ -56,12 +56,14 @@ use sp_runtime::traits::{Block as BlockT, Member}; use std::{sync::Arc, time::Duration}; use self::{block_builder_task::run_block_builder, collation_task::run_collation_task}; +pub use block_import::{SlotBasedBlockImport, SlotBasedBlockImportHandle}; mod block_builder_task; +mod block_import; mod collation_task; /// Parameters for [`run`]. -pub struct Params { +pub struct Params { /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. /// the timestamp, slot, and paras inherents should be omitted, as they are set by this /// collator. @@ -93,11 +95,13 @@ pub struct Params { /// Drift slots by a fixed duration. This can be used to create more preferrable authoring /// timings. pub slot_drift: Duration, + /// The handle returned by [`SlotBasedBlockImport`]. + pub block_import_handle: SlotBasedBlockImportHandle, } /// Run aura-based block building and collation task. pub fn run( - params: Params, + params: Params, ) -> (impl futures::Future, impl futures::Future) where Block: BlockT, @@ -132,6 +136,7 @@ where reinitialize: params.reinitialize, collator_service: params.collator_service.clone(), collator_receiver: rx, + block_import_handle: params.block_import_handle, }; let collation_task_fut = run_collation_task::(collator_task_params); diff --git a/cumulus/polkadot-omni-node/lib/Cargo.toml b/cumulus/polkadot-omni-node/lib/Cargo.toml index a690229f1695..bd1f83d3a8e6 100644 --- a/cumulus/polkadot-omni-node/lib/Cargo.toml +++ b/cumulus/polkadot-omni-node/lib/Cargo.toml @@ -60,6 +60,7 @@ pallet-transaction-payment = { workspace = true, default-features = true } pallet-transaction-payment-rpc-runtime-api = { workspace = true, default-features = true } sp-inherents = { workspace = true, default-features = true } sp-api = { workspace = true, default-features = true } +sp-consensus = { workspace = true, default-features = true } sp-consensus-aura = { workspace = true, default-features = true } sc-consensus-manual-seal = { workspace = true, default-features = true } sc-sysinfo = { workspace = true, default-features = true } diff --git a/cumulus/polkadot-omni-node/lib/src/common/spec.rs b/cumulus/polkadot-omni-node/lib/src/common/spec.rs index 259f89049c92..d49dcf64b945 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/spec.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/spec.rs @@ -44,23 +44,28 @@ use sc_transaction_pool::TransactionPoolHandle; use sp_keystore::KeystorePtr; use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; -pub(crate) trait BuildImportQueue { +pub(crate) trait BuildImportQueue< + Block: BlockT, + RuntimeApi, + BlockImport: sc_consensus::BlockImport, +> +{ fn build_import_queue( client: Arc>, - block_import: ParachainBlockImport, + block_import: ParachainBlockImport, config: &Configuration, telemetry_handle: Option, task_manager: &TaskManager, ) -> sc_service::error::Result>; } -pub(crate) trait StartConsensus +pub(crate) trait StartConsensus where RuntimeApi: ConstructNodeRuntimeApi>, { fn start_consensus( client: Arc>, - block_import: ParachainBlockImport, + block_import: ParachainBlockImport, prometheus_registry: Option<&Registry>, telemetry: Option, task_manager: &TaskManager, @@ -74,6 +79,7 @@ where announce_block: Arc>) + Send + Sync>, backend: Arc>, node_extra_args: NodeExtraArgs, + block_import_extra_return_value: BIExtraReturnValue, ) -> Result<(), sc_service::Error>; } @@ -92,6 +98,31 @@ fn warn_if_slow_hardware(hwbench: &sc_sysinfo::HwBench) { } } +pub(crate) trait InitBlockImport { + type BlockImport: sc_consensus::BlockImport + Clone + Send + Sync; + type ExtraReturnValue; + + fn init_block_import( + client: Arc>, + ) -> sc_service::error::Result<(Self::BlockImport, Self::ExtraReturnValue)>; +} + +pub(crate) struct ClientBlockImport; + +impl InitBlockImport for ClientBlockImport +where + RuntimeApi: Send + ConstructNodeRuntimeApi>, +{ + type BlockImport = Arc>; + type ExtraReturnValue = (); + + fn init_block_import( + client: Arc>, + ) -> sc_service::error::Result<(Self::BlockImport, Self::ExtraReturnValue)> { + Ok((client.clone(), ())) + } +} + pub(crate) trait BaseNodeSpec { type Block: NodeBlock; @@ -100,7 +131,13 @@ pub(crate) trait BaseNodeSpec { ParachainClient, >; - type BuildImportQueue: BuildImportQueue; + type BuildImportQueue: BuildImportQueue< + Self::Block, + Self::RuntimeApi, + >::BlockImport, + >; + + type WrapBlockImport: InitBlockImport; /// Starts a `ServiceBuilder` for a full service. /// @@ -108,7 +145,14 @@ pub(crate) trait BaseNodeSpec { /// be able to perform chain operations. fn new_partial( config: &Configuration, - ) -> sc_service::error::Result> { + ) -> sc_service::error::Result< + ParachainService< + Self::Block, + Self::RuntimeApi, + >::BlockImport, + >::ExtraReturnValue + > + >{ let telemetry = config .telemetry_endpoints .clone() @@ -160,7 +204,10 @@ pub(crate) trait BaseNodeSpec { .build(), ); - let block_import = ParachainBlockImport::new(client.clone(), backend.clone()); + let (block_import, extra_return_value) = + Self::WrapBlockImport::init_block_import(client.clone())?; + + let block_import = ParachainBlockImport::new(block_import, backend.clone()); let import_queue = Self::BuildImportQueue::build_import_queue( client.clone(), @@ -178,7 +225,7 @@ pub(crate) trait BaseNodeSpec { task_manager, transaction_pool, select_chain: (), - other: (block_import, telemetry, telemetry_worker_handle), + other: (block_import, telemetry, telemetry_worker_handle, extra_return_value), }) } } @@ -190,7 +237,12 @@ pub(crate) trait NodeSpec: BaseNodeSpec { TransactionPoolHandle>, >; - type StartConsensus: StartConsensus; + type StartConsensus: StartConsensus< + Self::Block, + Self::RuntimeApi, + >::BlockImport, + >::ExtraReturnValue, + >; const SYBIL_RESISTANCE: CollatorSybilResistance; @@ -213,7 +265,12 @@ pub(crate) trait NodeSpec: BaseNodeSpec { let parachain_config = prepare_node_config(parachain_config); let params = Self::new_partial(¶chain_config)?; - let (block_import, mut telemetry, telemetry_worker_handle) = params.other; + let ( + block_import, + mut telemetry, + telemetry_worker_handle, + block_import_extra_return_value, + ) = params.other; let client = params.client.clone(); let backend = params.backend.clone(); @@ -343,6 +400,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec { announce_block, backend.clone(), node_extra_args, + block_import_extra_return_value, )?; } diff --git a/cumulus/polkadot-omni-node/lib/src/common/types.rs b/cumulus/polkadot-omni-node/lib/src/common/types.rs index 4bc58dc9db7e..978368be2584 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/types.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/types.rs @@ -22,7 +22,6 @@ use sc_service::{PartialComponents, TFullBackend, TFullClient}; use sc_telemetry::{Telemetry, TelemetryWorkerHandle}; use sc_transaction_pool::TransactionPoolHandle; use sp_runtime::{generic, traits::BlakeTwo256}; -use std::sync::Arc; pub use parachains_common::{AccountId, Balance, Hash, Nonce}; @@ -42,15 +41,20 @@ pub type ParachainClient = pub type ParachainBackend = TFullBackend; -pub type ParachainBlockImport = - TParachainBlockImport>, ParachainBackend>; +pub type ParachainBlockImport = + TParachainBlockImport>; /// Assembly of PartialComponents (enough to run chain ops subcommands) -pub type ParachainService = PartialComponents< +pub type ParachainService = PartialComponents< ParachainClient, ParachainBackend, (), DefaultImportQueue, TransactionPoolHandle>, - (ParachainBlockImport, Option, Option), + ( + ParachainBlockImport, + Option, + Option, + BIExtraReturnValue, + ), >; diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs index ec5d0a439ec4..35bd78667a89 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs @@ -18,7 +18,10 @@ use crate::{ common::{ aura::{AuraIdT, AuraRuntimeApi}, rpc::BuildParachainRpcExtensions, - spec::{BaseNodeSpec, BuildImportQueue, NodeSpec, StartConsensus}, + spec::{ + BaseNodeSpec, BuildImportQueue, ClientBlockImport, InitBlockImport, NodeSpec, + StartConsensus, + }, types::{ AccountId, Balance, Hash, Nonce, ParachainBackend, ParachainBlockImport, ParachainClient, @@ -30,11 +33,14 @@ use crate::{ use cumulus_client_collator::service::{ CollatorService, ServiceInterface as CollatorServiceInterface, }; -use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams}; #[docify::export(slot_based_colator_import)] use cumulus_client_consensus_aura::collators::slot_based::{ self as slot_based, Params as SlotBasedParams, }; +use cumulus_client_consensus_aura::collators::{ + lookahead::{self as aura, Params as AuraParams}, + slot_based::{SlotBasedBlockImport, SlotBasedBlockImportHandle}, +}; use cumulus_client_consensus_proposer::{Proposer, ProposerInterface}; use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier; #[allow(deprecated)] @@ -90,20 +96,23 @@ where /// Build the import queue for parachain runtimes that started with relay chain consensus and /// switched to aura. -pub(crate) struct BuildRelayToAuraImportQueue( - PhantomData<(Block, RuntimeApi, AuraId)>, +pub(crate) struct BuildRelayToAuraImportQueue( + PhantomData<(Block, RuntimeApi, AuraId, BlockImport)>, ); -impl BuildImportQueue - for BuildRelayToAuraImportQueue +impl + BuildImportQueue + for BuildRelayToAuraImportQueue where RuntimeApi: ConstructNodeRuntimeApi>, RuntimeApi::RuntimeApi: AuraRuntimeApi, AuraId: AuraIdT + Sync, + BlockImport: + sc_consensus::BlockImport + Send + Sync + 'static, { fn build_import_queue( client: Arc>, - block_import: ParachainBlockImport, + block_import: ParachainBlockImport, config: &Configuration, telemetry_handle: Option, task_manager: &TaskManager, @@ -158,20 +167,20 @@ where /// Uses the lookahead collator to support async backing. /// /// Start an aura powered parachain node. Some system chains use this. -pub(crate) struct AuraNode( - pub PhantomData<(Block, RuntimeApi, AuraId, StartConsensus)>, +pub(crate) struct AuraNode( + pub PhantomData<(Block, RuntimeApi, AuraId, StartConsensus, WrapBlockImport)>, ); -impl Default - for AuraNode +impl Default + for AuraNode { fn default() -> Self { Self(Default::default()) } } -impl BaseNodeSpec - for AuraNode +impl BaseNodeSpec + for AuraNode where Block: NodeBlock, RuntimeApi: ConstructNodeRuntimeApi>, @@ -179,14 +188,19 @@ where + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi + substrate_frame_rpc_system::AccountNonceApi, AuraId: AuraIdT + Sync, + InitBlockImport: self::InitBlockImport + Send, + InitBlockImport::BlockImport: + sc_consensus::BlockImport + 'static, { type Block = Block; type RuntimeApi = RuntimeApi; - type BuildImportQueue = BuildRelayToAuraImportQueue; + type BuildImportQueue = + BuildRelayToAuraImportQueue; + type WrapBlockImport = InitBlockImport; } -impl NodeSpec - for AuraNode +impl NodeSpec + for AuraNode where Block: NodeBlock, RuntimeApi: ConstructNodeRuntimeApi>, @@ -194,7 +208,15 @@ where + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi + substrate_frame_rpc_system::AccountNonceApi, AuraId: AuraIdT + Sync, - StartConsensus: self::StartConsensus + 'static, + StartConsensus: self::StartConsensus< + Block, + RuntimeApi, + InitBlockImport::BlockImport, + InitBlockImport::ExtraReturnValue, + > + 'static, + InitBlockImport: self::InitBlockImport + Send, + InitBlockImport::BlockImport: + sc_consensus::BlockImport + 'static, { type BuildRpcExtensions = BuildParachainRpcExtensions; type StartConsensus = StartConsensus; @@ -218,6 +240,7 @@ where RuntimeApi, AuraId, StartSlotBasedAuraConsensus, + StartSlotBasedAuraConsensus, >::default()) } else { Box::new(AuraNode::< @@ -225,6 +248,7 @@ where RuntimeApi, AuraId, StartLookaheadAuraConsensus, + ClientBlockImport, >::default()) } } @@ -244,7 +268,15 @@ where #[docify::export_content] fn launch_slot_based_collator( params: SlotBasedParams< - ParachainBlockImport, + Block, + ParachainBlockImport< + Block, + SlotBasedBlockImport< + Block, + Arc>, + ParachainClient, + >, + >, CIDP, ParachainClient, ParachainBackend, @@ -277,8 +309,17 @@ where } } -impl, RuntimeApi, AuraId> StartConsensus - for StartSlotBasedAuraConsensus +impl, RuntimeApi, AuraId> + StartConsensus< + Block, + RuntimeApi, + SlotBasedBlockImport< + Block, + Arc>, + ParachainClient, + >, + SlotBasedBlockImportHandle, + > for StartSlotBasedAuraConsensus where RuntimeApi: ConstructNodeRuntimeApi>, RuntimeApi::RuntimeApi: AuraRuntimeApi, @@ -286,7 +327,14 @@ where { fn start_consensus( client: Arc>, - block_import: ParachainBlockImport, + block_import: ParachainBlockImport< + Block, + SlotBasedBlockImport< + Block, + Arc>, + ParachainClient, + >, + >, prometheus_registry: Option<&Registry>, telemetry: Option, task_manager: &TaskManager, @@ -300,6 +348,7 @@ where announce_block: Arc>) + Send + Sync>, backend: Arc>, _node_extra_args: NodeExtraArgs, + block_import_handle: SlotBasedBlockImportHandle, ) -> Result<(), Error> { let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( task_manager.spawn_handle(), @@ -335,6 +384,7 @@ where authoring_duration: Duration::from_millis(2000), reinitialize: false, slot_drift: Duration::from_secs(1), + block_import_handle, }; // We have a separate function only to be able to use `docify::export` on this piece of @@ -345,6 +395,27 @@ where } } +impl, RuntimeApi, AuraId> InitBlockImport + for StartSlotBasedAuraConsensus +where + RuntimeApi: ConstructNodeRuntimeApi>, + RuntimeApi::RuntimeApi: AuraRuntimeApi, + AuraId: AuraIdT + Sync, +{ + type BlockImport = SlotBasedBlockImport< + Block, + Arc>, + ParachainClient, + >; + type ExtraReturnValue = SlotBasedBlockImportHandle; + + fn init_block_import( + client: Arc>, + ) -> sc_service::error::Result<(Self::BlockImport, Self::ExtraReturnValue)> { + Ok(SlotBasedBlockImport::new(client.clone(), client)) + } +} + /// Wait for the Aura runtime API to appear on chain. /// This is useful for chains that started out without Aura. Components that /// are depending on Aura functionality will wait until Aura appears in the runtime. @@ -373,7 +444,8 @@ pub(crate) struct StartLookaheadAuraConsensus( PhantomData<(Block, RuntimeApi, AuraId)>, ); -impl, RuntimeApi, AuraId> StartConsensus +impl, RuntimeApi, AuraId> + StartConsensus>, ()> for StartLookaheadAuraConsensus where RuntimeApi: ConstructNodeRuntimeApi>, @@ -382,7 +454,7 @@ where { fn start_consensus( client: Arc>, - block_import: ParachainBlockImport, + block_import: ParachainBlockImport>>, prometheus_registry: Option<&Registry>, telemetry: Option, task_manager: &TaskManager, @@ -396,6 +468,7 @@ where announce_block: Arc>) + Send + Sync>, backend: Arc>, node_extra_args: NodeExtraArgs, + _: (), ) -> Result<(), Error> { let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( task_manager.spawn_handle(), diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs b/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs index 7e36ce735af3..70255bd2bc86 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs @@ -16,7 +16,7 @@ use crate::common::{ rpc::BuildRpcExtensions as BuildRpcExtensionsT, - spec::{BaseNodeSpec, BuildImportQueue, NodeSpec as NodeSpecT}, + spec::{BaseNodeSpec, BuildImportQueue, ClientBlockImport, NodeSpec as NodeSpecT}, types::{Hash, ParachainBlockImport, ParachainClient}, }; use codec::Encode; @@ -32,12 +32,19 @@ use std::{marker::PhantomData, sync::Arc}; pub struct ManualSealNode(PhantomData); -impl BuildImportQueue - for ManualSealNode +impl + BuildImportQueue< + NodeSpec::Block, + NodeSpec::RuntimeApi, + Arc>, + > for ManualSealNode { fn build_import_queue( client: Arc>, - _block_import: ParachainBlockImport, + _block_import: ParachainBlockImport< + NodeSpec::Block, + Arc>, + >, config: &Configuration, _telemetry_handle: Option, task_manager: &TaskManager, @@ -54,6 +61,7 @@ impl BaseNodeSpec for ManualSealNode { type Block = NodeSpec::Block; type RuntimeApi = NodeSpec::RuntimeApi; type BuildImportQueue = Self; + type WrapBlockImport = ClientBlockImport; } impl ManualSealNode { @@ -78,7 +86,7 @@ impl ManualSealNode { keystore_container, select_chain: _, transaction_pool, - other: (_, mut telemetry, _), + other: (_, mut telemetry, _, _), } = Self::new_partial(&config)?; let select_chain = LongestChain::new(backend.clone()); diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index 9234442d399c..fe4648996875 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -27,7 +27,10 @@ use cumulus_client_collator::service::CollatorService; use cumulus_client_consensus_aura::{ collators::{ lookahead::{self as aura, Params as AuraParams}, - slot_based::{self as slot_based, Params as SlotBasedParams}, + slot_based::{ + self as slot_based, Params as SlotBasedParams, SlotBasedBlockImport, + SlotBasedBlockImportHandle, + }, }, ImportQueueParams, }; @@ -131,7 +134,8 @@ pub type Client = TFullClient; /// The block-import type being used by the test service. -pub type ParachainBlockImport = TParachainBlockImport, Backend>; +pub type ParachainBlockImport = + TParachainBlockImport, Client>, Backend>; /// Transaction pool type used by the test service pub type TransactionPool = Arc>; @@ -184,7 +188,7 @@ pub type Service = PartialComponents< (), sc_consensus::import_queue::BasicQueue, sc_transaction_pool::TransactionPoolHandle, - ParachainBlockImport, + (ParachainBlockImport, SlotBasedBlockImportHandle), >; /// Starts a `ServiceBuilder` for a full service. @@ -217,7 +221,9 @@ pub fn new_partial( )?; let client = Arc::new(client); - let block_import = ParachainBlockImport::new(client.clone(), backend.clone()); + let (block_import, slot_based_handle) = + SlotBasedBlockImport::new(client.clone(), client.clone()); + let block_import = ParachainBlockImport::new(block_import, backend.clone()); let transaction_pool = Arc::from( sc_transaction_pool::Builder::new( @@ -260,7 +266,7 @@ pub fn new_partial( task_manager, transaction_pool, select_chain: (), - other: block_import, + other: (block_import, slot_based_handle), }; Ok(params) @@ -349,7 +355,8 @@ where let client = params.client.clone(); let backend = params.backend.clone(); - let block_import = params.other; + let block_import = params.other.0; + let slot_based_handle = params.other.1; let relay_chain_interface = build_relay_chain_interface( relay_chain_config, parachain_config.prometheus_registry(), @@ -497,6 +504,7 @@ where authoring_duration: Duration::from_millis(2000), reinitialize: false, slot_drift: Duration::from_secs(1), + block_import_handle: slot_based_handle, }; let (collation_future, block_builder_future) = From 444dad2c22275bb0ab401bb0e1d20cef5c4ceee6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 15 Nov 2024 10:28:32 +0100 Subject: [PATCH 2/5] Review comments --- Cargo.lock | 5 ++++ .../polkadot-omni-node/lib/src/common/spec.rs | 26 +++++++++---------- .../polkadot-omni-node/lib/src/nodes/aura.rs | 16 ++++++------ .../lib/src/nodes/manual_seal.rs | 2 +- 4 files changed, 27 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3d6262bfefb9..d35a93519757 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4496,6 +4496,8 @@ dependencies = [ "sp-keystore 0.34.0", "sp-runtime 31.0.1", "sp-state-machine 0.35.0", + "sp-timestamp 26.0.0", + "sp-trie 29.0.0", "substrate-prometheus-endpoint", "tokio", "tracing", @@ -17916,6 +17918,9 @@ dependencies = [ "serde", "serde_json", "sp-api 26.0.0", + "sp-block-builder 26.0.0", + "sp-consensus", + "sp-consensus-aura 0.32.0", "sp-core 28.0.0", "sp-genesis-builder 0.8.0", "sp-inherents 26.0.0", diff --git a/cumulus/polkadot-omni-node/lib/src/common/spec.rs b/cumulus/polkadot-omni-node/lib/src/common/spec.rs index d49dcf64b945..5a09e7dc77f9 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/spec.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/spec.rs @@ -59,7 +59,7 @@ pub(crate) trait BuildImportQueue< ) -> sc_service::error::Result>; } -pub(crate) trait StartConsensus +pub(crate) trait StartConsensus where RuntimeApi: ConstructNodeRuntimeApi>, { @@ -79,7 +79,7 @@ where announce_block: Arc>) + Send + Sync>, backend: Arc>, node_extra_args: NodeExtraArgs, - block_import_extra_return_value: BIExtraReturnValue, + block_import_extra_return_value: BIAuxiliaryData, ) -> Result<(), sc_service::Error>; } @@ -100,11 +100,11 @@ fn warn_if_slow_hardware(hwbench: &sc_sysinfo::HwBench) { pub(crate) trait InitBlockImport { type BlockImport: sc_consensus::BlockImport + Clone + Send + Sync; - type ExtraReturnValue; + type BlockImportAuxiliaryData; fn init_block_import( client: Arc>, - ) -> sc_service::error::Result<(Self::BlockImport, Self::ExtraReturnValue)>; + ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)>; } pub(crate) struct ClientBlockImport; @@ -114,11 +114,11 @@ where RuntimeApi: Send + ConstructNodeRuntimeApi>, { type BlockImport = Arc>; - type ExtraReturnValue = (); + type BlockImportAuxiliaryData = (); fn init_block_import( client: Arc>, - ) -> sc_service::error::Result<(Self::BlockImport, Self::ExtraReturnValue)> { + ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)> { Ok((client.clone(), ())) } } @@ -134,10 +134,10 @@ pub(crate) trait BaseNodeSpec { type BuildImportQueue: BuildImportQueue< Self::Block, Self::RuntimeApi, - >::BlockImport, + >::BlockImport, >; - type WrapBlockImport: InitBlockImport; + type InitBlockImport: self::InitBlockImport; /// Starts a `ServiceBuilder` for a full service. /// @@ -149,8 +149,8 @@ pub(crate) trait BaseNodeSpec { ParachainService< Self::Block, Self::RuntimeApi, - >::BlockImport, - >::ExtraReturnValue + >::BlockImport, + >::BlockImportAuxiliaryData > >{ let telemetry = config @@ -205,7 +205,7 @@ pub(crate) trait BaseNodeSpec { ); let (block_import, extra_return_value) = - Self::WrapBlockImport::init_block_import(client.clone())?; + Self::InitBlockImport::init_block_import(client.clone())?; let block_import = ParachainBlockImport::new(block_import, backend.clone()); @@ -240,8 +240,8 @@ pub(crate) trait NodeSpec: BaseNodeSpec { type StartConsensus: StartConsensus< Self::Block, Self::RuntimeApi, - >::BlockImport, - >::ExtraReturnValue, + >::BlockImport, + >::BlockImportAuxiliaryData, >; const SYBIL_RESISTANCE: CollatorSybilResistance; diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs index 35bd78667a89..cec2c1c41947 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs @@ -167,12 +167,12 @@ where /// Uses the lookahead collator to support async backing. /// /// Start an aura powered parachain node. Some system chains use this. -pub(crate) struct AuraNode( - pub PhantomData<(Block, RuntimeApi, AuraId, StartConsensus, WrapBlockImport)>, +pub(crate) struct AuraNode( + pub PhantomData<(Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport)>, ); -impl Default - for AuraNode +impl Default + for AuraNode { fn default() -> Self { Self(Default::default()) @@ -196,7 +196,7 @@ where type RuntimeApi = RuntimeApi; type BuildImportQueue = BuildRelayToAuraImportQueue; - type WrapBlockImport = InitBlockImport; + type InitBlockImport = InitBlockImport; } impl NodeSpec @@ -212,7 +212,7 @@ where Block, RuntimeApi, InitBlockImport::BlockImport, - InitBlockImport::ExtraReturnValue, + InitBlockImport::BlockImportAuxiliaryData, > + 'static, InitBlockImport: self::InitBlockImport + Send, InitBlockImport::BlockImport: @@ -407,11 +407,11 @@ where Arc>, ParachainClient, >; - type ExtraReturnValue = SlotBasedBlockImportHandle; + type BlockImportAuxiliaryData = SlotBasedBlockImportHandle; fn init_block_import( client: Arc>, - ) -> sc_service::error::Result<(Self::BlockImport, Self::ExtraReturnValue)> { + ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)> { Ok(SlotBasedBlockImport::new(client.clone(), client)) } } diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs b/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs index 70255bd2bc86..8b7921da30c0 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs @@ -61,7 +61,7 @@ impl BaseNodeSpec for ManualSealNode { type Block = NodeSpec::Block; type RuntimeApi = NodeSpec::RuntimeApi; type BuildImportQueue = Self; - type WrapBlockImport = ClientBlockImport; + type InitBlockImport = ClientBlockImport; } impl ManualSealNode { From b7c57a1a10bf48ddc561ee868fa744d035c4f793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 15 Nov 2024 14:09:44 +0100 Subject: [PATCH 3/5] Apply suggestions from code review Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> --- cumulus/polkadot-omni-node/lib/src/common/spec.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cumulus/polkadot-omni-node/lib/src/common/spec.rs b/cumulus/polkadot-omni-node/lib/src/common/spec.rs index 5a09e7dc77f9..a21e6c90653a 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/spec.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/spec.rs @@ -204,7 +204,7 @@ pub(crate) trait BaseNodeSpec { .build(), ); - let (block_import, extra_return_value) = + let (block_import, block_import_auxiliary_data) = Self::InitBlockImport::init_block_import(client.clone())?; let block_import = ParachainBlockImport::new(block_import, backend.clone()); @@ -225,7 +225,7 @@ pub(crate) trait BaseNodeSpec { task_manager, transaction_pool, select_chain: (), - other: (block_import, telemetry, telemetry_worker_handle, extra_return_value), + other: (block_import, telemetry, telemetry_worker_handle, block_import_auxiliary_data), }) } } @@ -269,7 +269,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec { block_import, mut telemetry, telemetry_worker_handle, - block_import_extra_return_value, + block_import_auxiliary_data, ) = params.other; let client = params.client.clone(); @@ -400,7 +400,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec { announce_block, backend.clone(), node_extra_args, - block_import_extra_return_value, + block_import_auxiliary_data, )?; } From 1447ca79f4e73d86fe54ad76b5a76bdc66300ad7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 15 Nov 2024 15:19:57 +0100 Subject: [PATCH 4/5] Mention the issue --- .../consensus/aura/src/collators/slot_based/collation_task.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs index 62e6f820c45c..abaeb8319a40 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs @@ -96,6 +96,7 @@ pub async fn run_collation_task( }, block_import_msg = block_import_handle.next().fuse() => { // TODO: Implement me. + // Issue: https://github.com/paritytech/polkadot-sdk/issues/6495 let _ = block_import_msg; } } From e0a12549c0f06627e0036e62333d6c64cfddb9e2 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Fri, 15 Nov 2024 19:50:25 +0000 Subject: [PATCH 5/5] Update from bkchr running command 'prdoc --audience node_dev --bump major' --- prdoc/pr_6481.prdoc | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 prdoc/pr_6481.prdoc diff --git a/prdoc/pr_6481.prdoc b/prdoc/pr_6481.prdoc new file mode 100644 index 000000000000..83ba0a32eb24 --- /dev/null +++ b/prdoc/pr_6481.prdoc @@ -0,0 +1,10 @@ +title: 'slot-based-collator: Implement dedicated block import' +doc: +- audience: Node Dev + description: |- + The `SlotBasedBlockImport` job is to collect the storage proofs of all blocks getting imported. These storage proofs alongside the block are being forwarded to the collation task. Right now they are just being thrown away. More logic will follow later. Basically this will be required to include multiple blocks into one `PoV` which will then be done by the collation task. +crates: +- name: cumulus-client-consensus-aura + bump: major +- name: polkadot-omni-node-lib + bump: major