Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Companion for paritytech/substrate#12764 #1930

Merged
merged 7 commits into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions client/pov-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
//! make sure that the blocks are imported in the correct order.

use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
use sc_consensus::import_queue::{ImportQueue, IncomingBlock};
use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
generic::BlockId,
Expand Down Expand Up @@ -103,7 +103,7 @@ impl RecoveryDelay {
}

/// Encapsulates the logic of the pov recovery.
pub struct PoVRecovery<Block: BlockT, PC, IQ, RC> {
pub struct PoVRecovery<Block: BlockT, PC, RC> {
/// All the pending candidates that we are waiting for to be imported or that need to be
/// recovered when `next_candidate_to_recover` tells us to do so.
pending_candidates: HashMap<Block::Hash, PendingCandidate<Block>>,
Expand All @@ -119,23 +119,22 @@ pub struct PoVRecovery<Block: BlockT, PC, IQ, RC> {
waiting_for_parent: HashMap<Block::Hash, Vec<Block>>,
recovery_delay: RecoveryDelay,
parachain_client: Arc<PC>,
parachain_import_queue: IQ,
parachain_import_queue: Box<dyn ImportQueueService<Block>>,
relay_chain_interface: RC,
para_id: ParaId,
}

impl<Block: BlockT, PC, IQ, RCInterface> PoVRecovery<Block, PC, IQ, RCInterface>
impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
where
PC: BlockBackend<Block> + BlockchainEvents<Block> + UsageProvider<Block>,
RCInterface: RelayChainInterface + Clone,
IQ: ImportQueue<Block>,
{
/// Create a new instance.
pub fn new(
overseer_handle: OverseerHandle,
recovery_delay: RecoveryDelay,
parachain_client: Arc<PC>,
parachain_import_queue: IQ,
parachain_import_queue: Box<dyn ImportQueueService<Block>>,
relay_chain_interface: RCInterface,
para_id: ParaId,
) -> Self {
Expand Down
59 changes: 27 additions & 32 deletions client/relay-chain-minimal-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use polkadot_node_network_protocol::PeerId;
use sc_network::{NetworkService, SyncState};

use sc_client_api::HeaderBackend;
use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link};
use sc_network_common::{
config::{
NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig,
Expand All @@ -29,14 +30,12 @@ use sc_network_common::{
service::NetworkSyncForkRequest,
sync::{
message::{BlockAnnouncesHandshake, BlockRequest},
Metrics, SyncStatus,
BadPeer, Metrics, OnBlockData, PollBlockAnnounceValidation, SyncStatus,
},
};
use sc_network_light::light_client_requests;
use sc_network_sync::{block_request_handler, state_request_handler};
use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle};
use sp_consensus::BlockOrigin;
use sp_runtime::Justifications;

use std::{iter, sync::Arc};

Expand Down Expand Up @@ -92,7 +91,6 @@ pub(crate) fn build_collator_network(
chain_sync: Box::new(chain_sync),
network_config: config.network.clone(),
chain: client.clone(),
import_queue: Box::new(DummyImportQueue),
protocol_id,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_announce_config,
Expand Down Expand Up @@ -324,12 +322,7 @@ impl<B: BlockT> sc_network_common::sync::ChainSync<B> for DummyChainSync {
std::task::Poll::Pending
}

fn peer_disconnected(
&mut self,
_who: &PeerId,
) -> Option<sc_network_common::sync::OnBlockData<B>> {
None
}
fn peer_disconnected(&mut self, _who: &PeerId) {}

fn metrics(&self) -> sc_network_common::sync::Metrics {
Metrics {
Expand All @@ -355,7 +348,7 @@ impl<B: BlockT> sc_network_common::sync::ChainSync<B> for DummyChainSync {
fn poll(
&mut self,
_cx: &mut std::task::Context,
) -> std::task::Poll<sc_network_common::sync::PollResult<B>> {
) -> std::task::Poll<PollBlockAnnounceValidation<B::Header>> {
std::task::Poll::Pending
}

Expand All @@ -366,37 +359,39 @@ impl<B: BlockT> sc_network_common::sync::ChainSync<B> for DummyChainSync {
fn num_active_peers(&self) -> usize {
0
}

fn process_block_response_data(&mut self, _blocks_to_import: Result<OnBlockData<B>, BadPeer>) {}
}

struct DummyImportQueue;
struct DummyChainSyncService<B>(std::marker::PhantomData<B>);

impl sc_service::ImportQueue<Block> for DummyImportQueue {
fn import_blocks(
&mut self,
_origin: BlockOrigin,
_blocks: Vec<sc_consensus::IncomingBlock<Block>>,
) {
}
impl<B: BlockT> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for DummyChainSyncService<B> {
fn set_sync_fork_request(&self, _peers: Vec<PeerId>, _hash: B::Hash, _number: NumberFor<B>) {}
}

impl<B: BlockT> JustificationSyncLink<B> for DummyChainSyncService<B> {
fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) {}

fn import_justifications(
fn clear_justification_requests(&self) {}
}

impl<B: BlockT> Link<B> for DummyChainSyncService<B> {
fn blocks_processed(
&mut self,
_who: PeerId,
_hash: Hash,
_number: NumberFor<Block>,
_justifications: Justifications,
_imported: usize,
_count: usize,
_results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
) {
}

fn poll_actions(
fn justification_imported(
&mut self,
_cx: &mut futures::task::Context,
_link: &mut dyn sc_consensus::import_queue::Link<Block>,
_who: PeerId,
_hash: &B::Hash,
_number: NumberFor<B>,
_success: bool,
) {
}
}

struct DummyChainSyncService<B>(std::marker::PhantomData<B>);

impl<B: BlockT> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for DummyChainSyncService<B> {
fn set_sync_fork_request(&self, _peers: Vec<PeerId>, _hash: B::Hash, _number: NumberFor<B>) {}
fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor<B>) {}
}
62 changes: 10 additions & 52 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,16 @@ use polkadot_primitives::v2::CollatorPair;
use sc_client_api::{
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, UsageProvider,
};
use sc_consensus::{
import_queue::{ImportQueue, IncomingBlock, Link, RuntimeOrigin},
BlockImport,
};
use sc_consensus::{import_queue::ImportQueueService, BlockImport};
use sc_service::{Configuration, TaskManager};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_core::traits::SpawnNamed;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Justifications,
};
use sp_runtime::traits::Block as BlockT;
use std::{sync::Arc, time::Duration};

/// Parameters given to [`start_collator`].
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner, IQ> {
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner> {
pub block_status: Arc<BS>,
pub client: Arc<Client>,
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
Expand All @@ -50,7 +43,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn
pub relay_chain_interface: RCInterface,
pub task_manager: &'a mut TaskManager,
pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
pub import_queue: IQ,
pub import_queue: Box<dyn ImportQueueService<Block>>,
pub collator_key: CollatorPair,
pub relay_chain_slot_duration: Duration,
}
Expand All @@ -60,7 +53,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn
/// A collator is similar to a validator in a normal blockchain.
/// It is responsible for producing blocks and sending the blocks to a
/// parachain validator for validation and inclusion into the relay chain.
pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner, IQ>(
pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner>(
StartCollatorParams {
block_status,
client,
Expand All @@ -73,7 +66,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner
import_queue,
collator_key,
relay_chain_slot_duration,
}: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner, IQ>,
}: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>,
) -> sc_service::error::Result<()>
where
Block: BlockT,
Expand All @@ -92,7 +85,6 @@ where
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
RCInterface: RelayChainInterface + Clone + 'static,
Backend: BackendT<Block> + 'static,
IQ: ImportQueue<Block> + 'static,
{
let consensus = cumulus_client_consensus_common::run_parachain_consensus(
para_id,
Expand Down Expand Up @@ -139,21 +131,21 @@ where
}

/// Parameters given to [`start_full_node`].
pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface, IQ> {
pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> {
pub para_id: ParaId,
pub client: Arc<Client>,
pub relay_chain_interface: RCInterface,
pub task_manager: &'a mut TaskManager,
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
pub relay_chain_slot_duration: Duration,
pub import_queue: IQ,
pub import_queue: Box<dyn ImportQueueService<Block>>,
}

/// Start a full node for a parachain.
///
/// A full node will only sync the given parachain and will follow the
/// tip of the chain.
pub fn start_full_node<Block, Client, Backend, RCInterface, IQ>(
pub fn start_full_node<Block, Client, Backend, RCInterface>(
StartFullNodeParams {
client,
announce_block,
Expand All @@ -162,7 +154,7 @@ pub fn start_full_node<Block, Client, Backend, RCInterface, IQ>(
para_id,
relay_chain_slot_duration,
import_queue,
}: StartFullNodeParams<Block, Client, RCInterface, IQ>,
}: StartFullNodeParams<Block, Client, RCInterface>,
) -> sc_service::error::Result<()>
where
Block: BlockT,
Expand All @@ -176,7 +168,6 @@ where
for<'a> &'a Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static,
RCInterface: RelayChainInterface + Clone + 'static,
IQ: ImportQueue<Block> + 'static,
{
let consensus = cumulus_client_consensus_common::run_parachain_consensus(
para_id,
Expand Down Expand Up @@ -226,36 +217,3 @@ pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration

parachain_config
}

/// A shared import queue
///
/// This is basically a hack until the Substrate side is implemented properly.
#[derive(Clone)]
pub struct SharedImportQueue<Block: BlockT>(Arc<parking_lot::Mutex<dyn ImportQueue<Block>>>);

impl<Block: BlockT> SharedImportQueue<Block> {
/// Create a new instance of the shared import queue.
pub fn new<IQ: ImportQueue<Block> + 'static>(import_queue: IQ) -> Self {
Self(Arc::new(parking_lot::Mutex::new(import_queue)))
}
}

impl<Block: BlockT> ImportQueue<Block> for SharedImportQueue<Block> {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<Block>>) {
self.0.lock().import_blocks(origin, blocks)
}

fn import_justifications(
&mut self,
who: RuntimeOrigin,
hash: Block::Hash,
number: NumberFor<Block>,
justifications: Justifications,
) {
self.0.lock().import_justifications(who, hash, number, justifications)
}

fn poll_actions(&mut self, cx: &mut std::task::Context, link: &mut dyn Link<Block>) {
self.0.lock().poll_actions(cx, link)
}
}
10 changes: 6 additions & 4 deletions parachain-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayC
use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node;

// Substrate Imports
use sc_consensus::ImportQueue;
use sc_executor::NativeElseWasmExecutor;
use sc_network::NetworkService;
use sc_network_common::service::NetworkBlock;
Expand Down Expand Up @@ -196,14 +197,15 @@ async fn start_node_impl(
let validator = parachain_config.role.is_authority();
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
let import_queue_service = params.import_queue.service();

let (network, system_rpc_tx, tx_handler_controller, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &parachain_config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue: import_queue.clone(),
import_queue: params.import_queue,
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
Expand Down Expand Up @@ -293,7 +295,7 @@ async fn start_node_impl(
relay_chain_interface,
spawner,
parachain_consensus,
import_queue,
import_queue: import_queue_service,
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
relay_chain_slot_duration,
};
Expand All @@ -307,7 +309,7 @@ async fn start_node_impl(
para_id: id,
relay_chain_interface,
relay_chain_slot_duration,
import_queue,
import_queue: import_queue_service,
};

start_full_node(params)?;
Expand Down
Loading