Skip to content

Commit

Permalink
NodeBuilder::start_full_node
Browse files Browse the repository at this point in the history
  • Loading branch information
nanocryk committed Nov 7, 2023
1 parent 0355595 commit 83b963a
Show file tree
Hide file tree
Showing 2 changed files with 369 additions and 230 deletions.
177 changes: 140 additions & 37 deletions client/node-common/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,25 @@ use {
core::time::Duration,
core_extensions::TypeIdentity,
cumulus_client_cli::CollatorOptions,
cumulus_client_service::{build_relay_chain_interface, CollatorSybilResistance},
cumulus_client_service::{
build_relay_chain_interface, CollatorSybilResistance, StartFullNodeParams,
},
cumulus_primitives_core::ParaId,
cumulus_relay_chain_interface::RelayChainInterface,
frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE,
futures::{channel::mpsc, FutureExt, Stream, StreamExt},
jsonrpsee::RpcModule,
polkadot_primitives::CollatorPair,
sc_client_api::Backend,
sc_consensus::{block_import, BlockImport, ImportQueue},
sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue},
sc_consensus_manual_seal::{
run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams,
},
sc_executor::{
HeapAllocStrategy, NativeElseWasmExecutor, NativeExecutionDispatch, WasmExecutor,
DEFAULT_HEAP_ALLOC_STRATEGY,
},
sc_network::{config::FullNetworkConfiguration, NetworkService},
sc_network::{config::FullNetworkConfiguration, NetworkBlock, NetworkService},
sc_network_sync::SyncingService,
sc_network_transactions::TransactionsHandlerController,
sc_rpc::{DenyUnsafe, SubscriptionTaskExecutor},
Expand Down Expand Up @@ -67,19 +69,7 @@ macro_rules! T {
[ConstructedRuntimeApi] => {
<RuntimeApi as ConstructRuntimeApi<Block, T![Client]>>::RuntimeApi
};
[Where] => {
Block: cumulus_primitives_core::BlockT,
ParachainNativeExecutor: NativeExecutionDispatch + 'static,
RuntimeApi: ConstructRuntimeApi<Block, T![Client]> + Sync + Send + 'static,
T![ConstructedRuntimeApi]: TaggedTransactionQueue<Block> + BlockBuilder<Block>,
}
}

pub struct Network<Block: cumulus_primitives_core::BlockT> {
pub network: Arc<NetworkService<Block, Block::Hash>>,
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
pub start_network: NetworkStarter,
pub sync_service: Arc<SyncingService<Block>>,
[ImportQueueService] => { Box<dyn ImportQueueService<Block>> }
}

// `Cumulus` and `TxHandler` are types that will change during the life of
Expand All @@ -104,10 +94,16 @@ pub struct NodeBuilder<
// each node. For that reason it is a `()` when calling `new`, then the
// caller create the `import_queue` using systems contained in `NodeBuilder`,
// then call `build_cumulus_network` with it to generate the cumulus systems.
Network = (),
SNetwork = (),
// The `TxHandler` is constructed in `build_X_network`
// and is then consumed when calling `spawn_common_tasks`.
TxHandler = (),
STxHandler = (),
// The import queue service is obtained from the import queue in
// `build_cumulus_network` or `build_substrate_network`, which also
// consumes the import queue. Neither of them are clonable, so we need to
// to store the service here to be able to consume it later in
// `start_full_node`.
SImportQueueService = (),
> where
Block: cumulus_primitives_core::BlockT,
ParachainNativeExecutor: NativeExecutionDispatch + 'static,
Expand All @@ -125,15 +121,23 @@ pub struct NodeBuilder<
pub hwbench: Option<sc_sysinfo::HwBench>,
pub prometheus_registry: Option<substrate_prometheus_endpoint::Registry>,

pub network: Network,
pub tx_handler_controller: TxHandler,
pub network: SNetwork,
pub tx_handler_controller: STxHandler,
pub import_queue_service: SImportQueueService,
}

pub struct Network<Block: cumulus_primitives_core::BlockT> {
pub network: Arc<NetworkService<Block, Block::Hash>>,
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
pub start_network: NetworkStarter,
pub sync_service: Arc<SyncingService<Block>>,
}

// `new` function doesn't take self, and the Rust compiler cannot infer that
// only one type T implements `TypeIdentity`. With thus need a separate impl
// block with concrete types `()`.
impl<Block, RuntimeApi, ParachainNativeExecutor>
NodeBuilder<Block, RuntimeApi, ParachainNativeExecutor, (), ()>
NodeBuilder<Block, RuntimeApi, ParachainNativeExecutor, (), (), ()>
where
Block: cumulus_primitives_core::BlockT,
ParachainNativeExecutor: NativeExecutionDispatch + 'static,
Expand Down Expand Up @@ -220,12 +224,20 @@ where
prometheus_registry: parachain_config.prometheus_registry().cloned(),
network: TypeIdentity::from_type(()),
tx_handler_controller: TypeIdentity::from_type(()),
import_queue_service: TypeIdentity::from_type(()),
})
}
}

impl<Block, RuntimeApi, ParachainNativeExecutor, NetworkT, TxHandler>
NodeBuilder<Block, RuntimeApi, ParachainNativeExecutor, NetworkT, TxHandler>
impl<Block, RuntimeApi, ParachainNativeExecutor, SNetwork, STxHandler, SImportQueueService>
NodeBuilder<
Block,
RuntimeApi,
ParachainNativeExecutor,
SNetwork,
STxHandler,
SImportQueueService,
>
where
Block: cumulus_primitives_core::BlockT,
ParachainNativeExecutor: NativeExecutionDispatch + 'static,
Expand Down Expand Up @@ -275,11 +287,13 @@ where
ParachainNativeExecutor,
Network<Block>,
TransactionsHandlerController<Block::Hash>,
T![ImportQueueService],
>,
>
where
NetworkT: TypeIdentity<Type = ()>,
TxHandler: TypeIdentity<Type = ()>,
SNetwork: TypeIdentity<Type = ()>,
STxHandler: TypeIdentity<Type = ()>,
SImportQueueService: TypeIdentity<Type = ()>,
RCInterface: RelayChainInterface + Clone + 'static,
{
let Self {
Expand All @@ -294,9 +308,11 @@ where
prometheus_registry,
network: _,
tx_handler_controller: _,
import_queue_service: _,
} = self;

let net_config = FullNetworkConfiguration::new(&parachain_config.network);
let import_queue_service = import_queue.service();

let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
Expand Down Expand Up @@ -329,6 +345,7 @@ where
sync_service,
},
tx_handler_controller,
import_queue_service,
})
}

Expand All @@ -349,11 +366,13 @@ where
ParachainNativeExecutor,
Network<Block>,
TransactionsHandlerController<Block::Hash>,
T![ImportQueueService],
>,
>
where
NetworkT: TypeIdentity<Type = ()>,
TxHandler: TypeIdentity<Type = ()>,
SNetwork: TypeIdentity<Type = ()>,
STxHandler: TypeIdentity<Type = ()>,
SImportQueueService: TypeIdentity<Type = ()>,
{
let Self {
client,
Expand All @@ -367,9 +386,11 @@ where
prometheus_registry,
network: _,
tx_handler_controller: _,
import_queue_service: _,
} = self;

let net_config = FullNetworkConfiguration::new(&parachain_config.network);
let import_queue_service = import_queue.service();

let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
Expand Down Expand Up @@ -400,6 +421,7 @@ where
sync_service,
},
tx_handler_controller,
import_queue_service,
})
}

Expand All @@ -418,11 +440,18 @@ where
) -> Result<RpcModule<TRpc>, sc_service::Error>,
>,
) -> sc_service::error::Result<
NodeBuilder<Block, RuntimeApi, ParachainNativeExecutor, Network<Block>, ()>,
NodeBuilder<
Block,
RuntimeApi,
ParachainNativeExecutor,
Network<Block>,
(),
SImportQueueService,
>,
>
where
NetworkT: TypeIdentity<Type = Network<Block>>,
TxHandler: TypeIdentity<Type = TransactionsHandlerController<Block::Hash>>,
SNetwork: TypeIdentity<Type = Network<Block>>,
STxHandler: TypeIdentity<Type = TransactionsHandlerController<Block::Hash>>,
Block::Hash: Unpin,
Block::Header: Unpin,
T![ConstructedRuntimeApi]: TaggedTransactionQueue<Block>
Expand All @@ -441,11 +470,12 @@ where
keystore_container,
hwbench,
prometheus_registry,
network: cumulus,
network,
tx_handler_controller,
import_queue_service,
} = self;

let cumulus = TypeIdentity::into_type(cumulus);
let network = TypeIdentity::into_type(network);
let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller);

let collator = parachain_config.role.is_authority();
Expand All @@ -461,7 +491,7 @@ where
transaction_pool: Some(OffchainTransactionPoolFactory::new(
transaction_pool.clone(),
)),
network_provider: cumulus.network.clone(),
network_provider: network.network.clone(),
is_validator: parachain_config.role.is_authority(),
enable_http_requests: false,
custom_extensions: move |_| vec![],
Expand All @@ -479,11 +509,11 @@ where
config: parachain_config,
keystore: keystore_container.keystore(),
backend: backend.clone(),
network: cumulus.network.clone(),
system_rpc_tx: cumulus.system_rpc_tx.clone(),
network: network.network.clone(),
system_rpc_tx: network.system_rpc_tx.clone(),
tx_handler_controller,
telemetry: telemetry.as_mut(),
sync_service: cumulus.sync_service.clone(),
sync_service: network.sync_service.clone(),
})?;

if let Some(hwbench) = &hwbench {
Expand Down Expand Up @@ -517,8 +547,9 @@ where
keystore_container,
hwbench,
prometheus_registry,
network: TypeIdentity::from_type(cumulus),
network: TypeIdentity::from_type(network),
tx_handler_controller: TypeIdentity::from_type(()),
import_queue_service,
})
}

Expand Down Expand Up @@ -612,6 +643,78 @@ where

Ok(command_sink)
}

pub fn start_full_node<'a, RCInterface>(
self,
para_id: ParaId,
relay_chain_interface: RCInterface,
relay_chain_slot_duration: Duration,
) -> sc_service::error::Result<
NodeBuilder<Block, RuntimeApi, ParachainNativeExecutor, SNetwork, STxHandler, ()>,
>
where
SNetwork: TypeIdentity<Type = Network<Block>>,
SImportQueueService: TypeIdentity<Type = T![ImportQueueService]>,
RCInterface: RelayChainInterface + Clone + 'static,
{
let NodeBuilder {
client,
backend,
transaction_pool,
telemetry,
telemetry_worker_handle,
mut task_manager,
keystore_container,
hwbench,
prometheus_registry,
network,
tx_handler_controller,
import_queue_service,
} = self;

let network = TypeIdentity::into_type(network);
let import_queue_service = TypeIdentity::into_type(import_queue_service);

let announce_block = {
let sync_service = network.sync_service.clone();
Arc::new(move |hash, data| sync_service.announce_block(hash, data))
};

let overseer_handle = relay_chain_interface
.overseer_handle()
.map_err(|e| sc_service::Error::Application(Box::new(e)))?;

let params = StartFullNodeParams {
client: client.clone(),
announce_block,
task_manager: &mut task_manager,
para_id,
relay_chain_interface: relay_chain_interface.clone(),
relay_chain_slot_duration,
import_queue: import_queue_service,
recovery_handle: Box::new(overseer_handle),
sync_service: network.sync_service.clone(),
};

// TODO: change for async backing
#[allow(deprecated)]
cumulus_client_service::start_full_node(params)?;

Ok(NodeBuilder {
client,
backend,
transaction_pool,
telemetry,
telemetry_worker_handle,
task_manager,
keystore_container,
hwbench,
prometheus_registry,
network: TypeIdentity::from_type(network),
tx_handler_controller,
import_queue_service: (),
})
}
}

/// Block authoring scheme to be used by the dev service.
Expand Down
Loading

0 comments on commit 83b963a

Please sign in to comment.