Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slot-based-collator: Implement dedicated block import #6481

Merged
merged 9 commits into from
Dec 13, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/consensus/aura/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ sp-blockchain = { workspace = true, default-features = true }
sp-consensus = { workspace = true, default-features = true }
sp-consensus-aura = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
sp-trie = { workspace = true, default-features = true }
sp-inherents = { workspace = true, default-features = true }
sp-keystore = { workspace = true, default-features = true }
sp-runtime = { workspace = true, default-features = true }
Expand Down
144 changes: 144 additions & 0 deletions cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.

// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use futures::{stream::FusedStream, StreamExt};
use sc_consensus::{BlockImport, StateAction};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::{ApiExt, CallApiAt, CallContext, Core, ProvideRuntimeApi, StorageProof};
use sp_runtime::traits::{Block as BlockT, Header as _};
use sp_trie::proof_size_extension::ProofSizeExt;
use std::sync::Arc;

/// Handle for receiving the block and the storage proof from the [`SlotBasedBlockImport`].
///
/// This handle should be passed to [`Params`](super::Params) or can also be dropped if the node is
/// not running as collator.
pub struct SlotBasedBlockImportHandle<Block> {
receiver: TracingUnboundedReceiver<(Block, StorageProof)>,
}

impl<Block> SlotBasedBlockImportHandle<Block> {
/// Returns the next item.
///
/// The future will never return when the internal channel is closed.
pub async fn next(&mut self) -> (Block, StorageProof) {
loop {
if self.receiver.is_terminated() {
futures::pending!()
} else if let Some(res) = self.receiver.next().await {
return res
}
}
}
}

/// Special block import for the slot based collator.
pub struct SlotBasedBlockImport<Block, BI, Client> {
inner: BI,
client: Arc<Client>,
sender: TracingUnboundedSender<(Block, StorageProof)>,
}

impl<Block, BI, Client> SlotBasedBlockImport<Block, BI, Client> {
/// Create a new instance.
///
/// The returned [`SlotBasedBlockImportHandle`] needs to be passed to the
/// [`Params`](super::Params), so that this block import instance can communicate with the
/// collation task. If the node is not running as a collator, just dropping the handle is fine.
pub fn new(inner: BI, client: Arc<Client>) -> (Self, SlotBasedBlockImportHandle<Block>) {
let (sender, receiver) = tracing_unbounded("SlotBasedBlockImportChannel", 1000);

(Self { sender, client, inner }, SlotBasedBlockImportHandle { receiver })
}
}

impl<Block, BI: Clone, Client> Clone for SlotBasedBlockImport<Block, BI, Client> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone(), client: self.client.clone(), sender: self.sender.clone() }
}
}

#[async_trait::async_trait]
impl<Block, BI, Client> BlockImport<Block> for SlotBasedBlockImport<Block, BI, Client>
where
Block: BlockT,
BI: BlockImport<Block> + Send + Sync,
BI::Error: Into<sp_consensus::Error>,
Client: ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync,
Client::StateBackend: Send,
Client::Api: Core<Block>,
{
type Error = sp_consensus::Error;

async fn check_block(
&self,
block: sc_consensus::BlockCheckParams<Block>,
) -> Result<sc_consensus::ImportResult, Self::Error> {
self.inner.check_block(block).await.map_err(Into::into)
}

async fn import_block(
&self,
mut params: sc_consensus::BlockImportParams<Block>,
) -> Result<sc_consensus::ImportResult, Self::Error> {
// If the channel exists and it is required to execute the block, we will execute the block
// here. This is done to collect the storage proof and to prevent re-execution, we push
// downwards the state changes. `StateAction::ApplyChanges` is ignored, because it either
// means that the node produced the block itself or the block was imported via state sync.
if !self.sender.is_closed() && !matches!(params.state_action, StateAction::ApplyChanges(_))
{
let mut runtime_api = self.client.runtime_api();

runtime_api.set_call_context(CallContext::Onchain);

runtime_api.record_proof();
let recorder = runtime_api
.proof_recorder()
.expect("Proof recording is enabled in the line above; qed.");
runtime_api.register_extension(ProofSizeExt::new(recorder));
bkchr marked this conversation as resolved.
Show resolved Hide resolved

let parent_hash = *params.header.parent_hash();

let block = Block::new(params.header.clone(), params.body.clone().unwrap_or_default());

runtime_api
.execute_block(parent_hash, block.clone())
.map_err(|e| Box::new(e) as Box<_>)?;

let storage_proof =
runtime_api.extract_proof().expect("Proof recording was enabled above; qed");

let state = self.client.state_at(parent_hash).map_err(|e| Box::new(e) as Box<_>)?;
let gen_storage_changes = runtime_api
.into_storage_changes(&state, parent_hash)
.map_err(sp_consensus::Error::ChainLookup)?;

if params.header.state_root() != &gen_storage_changes.transaction_storage_root {
return Err(sp_consensus::Error::Other(Box::new(
sp_blockchain::Error::InvalidStateRoot,
)))
}

params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(
gen_storage_changes,
));

let _ = self.sender.unbounded_send((block, storage_proof));
}

self.inner.import_block(params).await.map_err(Into::into)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct Params<Block: BlockT, RClient, CS> {
pub collator_service: CS,
/// Receiver channel for communication with the block builder task.
pub collator_receiver: TracingUnboundedReceiver<CollatorMessage<Block>>,
/// The handle from the special slot based block import.
pub block_import_handle: super::SlotBasedBlockImportHandle<Block>,
}

/// Asynchronously executes the collation task for a parachain.
Expand All @@ -55,28 +57,49 @@ pub struct Params<Block: BlockT, RClient, CS> {
/// collations to the relay chain. It listens for new best relay chain block notifications and
/// handles collator messages. If our parachain is scheduled on a core and we have a candidate,
/// the task will build a collation and send it to the relay chain.
pub async fn run_collation_task<Block, RClient, CS>(mut params: Params<Block, RClient, CS>)
where
pub async fn run_collation_task<Block, RClient, CS>(
Params {
relay_client,
collator_key,
para_id,
reinitialize,
collator_service,
mut collator_receiver,
mut block_import_handle,
}: Params<Block, RClient, CS>,
) where
Block: BlockT,
CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
RClient: RelayChainInterface + Clone + 'static,
{
let Ok(mut overseer_handle) = params.relay_client.overseer_handle() else {
let Ok(mut overseer_handle) = relay_client.overseer_handle() else {
tracing::error!(target: LOG_TARGET, "Failed to get overseer handle.");
return
};

cumulus_client_collator::initialize_collator_subsystems(
&mut overseer_handle,
params.collator_key,
params.para_id,
params.reinitialize,
collator_key,
para_id,
reinitialize,
)
.await;

let collator_service = params.collator_service;
while let Some(collator_message) = params.collator_receiver.next().await {
handle_collation_message(collator_message, &collator_service, &mut overseer_handle).await;
loop {
futures::select! {
collator_message = collator_receiver.next() => {
let Some(message) = collator_message else {
return;
};

handle_collation_message(message, &collator_service, &mut overseer_handle).await;
},
block_import_msg = block_import_handle.next().fuse() => {
// TODO: Implement me.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is there some ticket number to refer here?

// Issue: https://github.com/paritytech/polkadot-sdk/issues/6495
let _ = block_import_msg;
}
}
}
}

Expand Down
11 changes: 9 additions & 2 deletions cumulus/client/consensus/aura/src/collators/slot_based/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Member};
use std::{sync::Arc, time::Duration};

pub use block_import::{SlotBasedBlockImport, SlotBasedBlockImportHandle};

mod block_builder_task;
mod block_import;
mod collation_task;

/// Parameters for [`run`].
pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> {
pub struct Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> {
/// Inherent data providers. Only non-consensus inherent data should be provided, i.e.
/// the timestamp, slot, and paras inherents should be omitted, as they are set by this
/// collator.
Expand Down Expand Up @@ -90,6 +93,8 @@ pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner
/// Drift slots by a fixed duration. This can be used to create more preferrable authoring
/// timings.
pub slot_drift: Duration,
/// The handle returned by [`SlotBasedBlockImport`].
pub block_import_handle: SlotBasedBlockImportHandle<Block>,
/// Spawner for spawning futures.
pub spawner: Spawner,
}
Expand All @@ -111,8 +116,9 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw
authoring_duration,
reinitialize,
slot_drift,
block_import_handle,
spawner,
}: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>,
}: Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>,
) where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
Expand Down Expand Up @@ -147,6 +153,7 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw
reinitialize,
collator_service: collator_service.clone(),
collator_receiver: rx,
block_import_handle,
};

let collation_task_fut = run_collation_task::<Block, _, _>(collator_task_params);
Expand Down
1 change: 1 addition & 0 deletions cumulus/polkadot-omni-node/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pallet-transaction-payment = { workspace = true, default-features = true }
pallet-transaction-payment-rpc-runtime-api = { workspace = true, default-features = true }
sp-inherents = { workspace = true, default-features = true }
sp-api = { workspace = true, default-features = true }
sp-consensus = { workspace = true, default-features = true }
sp-consensus-aura = { workspace = true, default-features = true }
sc-consensus-manual-seal = { workspace = true, default-features = true }
sc-sysinfo = { workspace = true, default-features = true }
Expand Down
Loading
Loading