diff --git a/client/node-common/src/service.rs b/client/node-common/src/service.rs index 3c214252d..8a000e084 100644 --- a/client/node-common/src/service.rs +++ b/client/node-common/src/service.rs @@ -19,7 +19,9 @@ use { core::time::Duration, core_extensions::TypeIdentity, cumulus_client_cli::CollatorOptions, - cumulus_client_service::{build_relay_chain_interface, CollatorSybilResistance}, + cumulus_client_service::{ + build_relay_chain_interface, CollatorSybilResistance, StartFullNodeParams, + }, cumulus_primitives_core::ParaId, cumulus_relay_chain_interface::RelayChainInterface, frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE, @@ -27,7 +29,7 @@ use { jsonrpsee::RpcModule, polkadot_primitives::CollatorPair, sc_client_api::Backend, - sc_consensus::{block_import, BlockImport, ImportQueue}, + sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue}, sc_consensus_manual_seal::{ run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams, }, @@ -35,7 +37,7 @@ use { HeapAllocStrategy, NativeElseWasmExecutor, NativeExecutionDispatch, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY, }, - sc_network::{config::FullNetworkConfiguration, NetworkService}, + sc_network::{config::FullNetworkConfiguration, NetworkBlock, NetworkService}, sc_network_sync::SyncingService, sc_network_transactions::TransactionsHandlerController, sc_rpc::{DenyUnsafe, SubscriptionTaskExecutor}, @@ -67,19 +69,7 @@ macro_rules! T { [ConstructedRuntimeApi] => { >::RuntimeApi }; - [Where] => { - Block: cumulus_primitives_core::BlockT, - ParachainNativeExecutor: NativeExecutionDispatch + 'static, - RuntimeApi: ConstructRuntimeApi + Sync + Send + 'static, - T![ConstructedRuntimeApi]: TaggedTransactionQueue + BlockBuilder, - } -} - -pub struct Network { - pub network: Arc>, - pub system_rpc_tx: TracingUnboundedSender>, - pub start_network: NetworkStarter, - pub sync_service: Arc>, + [ImportQueueService] => { Box> } } // `Cumulus` and `TxHandler` are types that will change during the life of @@ -104,10 +94,16 @@ pub struct NodeBuilder< // each node. For that reason it is a `()` when calling `new`, then the // caller create the `import_queue` using systems contained in `NodeBuilder`, // then call `build_cumulus_network` with it to generate the cumulus systems. - Network = (), + SNetwork = (), // The `TxHandler` is constructed in `build_X_network` // and is then consumed when calling `spawn_common_tasks`. - TxHandler = (), + STxHandler = (), + // The import queue service is obtained from the import queue in + // `build_cumulus_network` or `build_substrate_network`, which also + // consumes the import queue. Neither of them are clonable, so we need to + // to store the service here to be able to consume it later in + // `start_full_node`. + SImportQueueService = (), > where Block: cumulus_primitives_core::BlockT, ParachainNativeExecutor: NativeExecutionDispatch + 'static, @@ -125,15 +121,23 @@ pub struct NodeBuilder< pub hwbench: Option, pub prometheus_registry: Option, - pub network: Network, - pub tx_handler_controller: TxHandler, + pub network: SNetwork, + pub tx_handler_controller: STxHandler, + pub import_queue_service: SImportQueueService, +} + +pub struct Network { + pub network: Arc>, + pub system_rpc_tx: TracingUnboundedSender>, + pub start_network: NetworkStarter, + pub sync_service: Arc>, } // `new` function doesn't take self, and the Rust compiler cannot infer that // only one type T implements `TypeIdentity`. With thus need a separate impl // block with concrete types `()`. impl - NodeBuilder + NodeBuilder where Block: cumulus_primitives_core::BlockT, ParachainNativeExecutor: NativeExecutionDispatch + 'static, @@ -220,12 +224,20 @@ where prometheus_registry: parachain_config.prometheus_registry().cloned(), network: TypeIdentity::from_type(()), tx_handler_controller: TypeIdentity::from_type(()), + import_queue_service: TypeIdentity::from_type(()), }) } } -impl - NodeBuilder +impl + NodeBuilder< + Block, + RuntimeApi, + ParachainNativeExecutor, + SNetwork, + STxHandler, + SImportQueueService, + > where Block: cumulus_primitives_core::BlockT, ParachainNativeExecutor: NativeExecutionDispatch + 'static, @@ -275,11 +287,13 @@ where ParachainNativeExecutor, Network, TransactionsHandlerController, + T![ImportQueueService], >, > where - NetworkT: TypeIdentity, - TxHandler: TypeIdentity, + SNetwork: TypeIdentity, + STxHandler: TypeIdentity, + SImportQueueService: TypeIdentity, RCInterface: RelayChainInterface + Clone + 'static, { let Self { @@ -294,9 +308,11 @@ where prometheus_registry, network: _, tx_handler_controller: _, + import_queue_service: _, } = self; let net_config = FullNetworkConfiguration::new(¶chain_config.network); + let import_queue_service = import_queue.service(); let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) = cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams { @@ -329,6 +345,7 @@ where sync_service, }, tx_handler_controller, + import_queue_service, }) } @@ -349,11 +366,13 @@ where ParachainNativeExecutor, Network, TransactionsHandlerController, + T![ImportQueueService], >, > where - NetworkT: TypeIdentity, - TxHandler: TypeIdentity, + SNetwork: TypeIdentity, + STxHandler: TypeIdentity, + SImportQueueService: TypeIdentity, { let Self { client, @@ -367,9 +386,11 @@ where prometheus_registry, network: _, tx_handler_controller: _, + import_queue_service: _, } = self; let net_config = FullNetworkConfiguration::new(¶chain_config.network); + let import_queue_service = import_queue.service(); let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { @@ -400,6 +421,7 @@ where sync_service, }, tx_handler_controller, + import_queue_service, }) } @@ -418,11 +440,18 @@ where ) -> Result, sc_service::Error>, >, ) -> sc_service::error::Result< - NodeBuilder, ()>, + NodeBuilder< + Block, + RuntimeApi, + ParachainNativeExecutor, + Network, + (), + SImportQueueService, + >, > where - NetworkT: TypeIdentity>, - TxHandler: TypeIdentity>, + SNetwork: TypeIdentity>, + STxHandler: TypeIdentity>, Block::Hash: Unpin, Block::Header: Unpin, T![ConstructedRuntimeApi]: TaggedTransactionQueue @@ -441,11 +470,12 @@ where keystore_container, hwbench, prometheus_registry, - network: cumulus, + network, tx_handler_controller, + import_queue_service, } = self; - let cumulus = TypeIdentity::into_type(cumulus); + let network = TypeIdentity::into_type(network); let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller); let collator = parachain_config.role.is_authority(); @@ -461,7 +491,7 @@ where transaction_pool: Some(OffchainTransactionPoolFactory::new( transaction_pool.clone(), )), - network_provider: cumulus.network.clone(), + network_provider: network.network.clone(), is_validator: parachain_config.role.is_authority(), enable_http_requests: false, custom_extensions: move |_| vec![], @@ -479,11 +509,11 @@ where config: parachain_config, keystore: keystore_container.keystore(), backend: backend.clone(), - network: cumulus.network.clone(), - system_rpc_tx: cumulus.system_rpc_tx.clone(), + network: network.network.clone(), + system_rpc_tx: network.system_rpc_tx.clone(), tx_handler_controller, telemetry: telemetry.as_mut(), - sync_service: cumulus.sync_service.clone(), + sync_service: network.sync_service.clone(), })?; if let Some(hwbench) = &hwbench { @@ -517,8 +547,9 @@ where keystore_container, hwbench, prometheus_registry, - network: TypeIdentity::from_type(cumulus), + network: TypeIdentity::from_type(network), tx_handler_controller: TypeIdentity::from_type(()), + import_queue_service, }) } @@ -612,6 +643,78 @@ where Ok(command_sink) } + + pub fn start_full_node<'a, RCInterface>( + self, + para_id: ParaId, + relay_chain_interface: RCInterface, + relay_chain_slot_duration: Duration, + ) -> sc_service::error::Result< + NodeBuilder, + > + where + SNetwork: TypeIdentity>, + SImportQueueService: TypeIdentity, + RCInterface: RelayChainInterface + Clone + 'static, + { + let NodeBuilder { + client, + backend, + transaction_pool, + telemetry, + telemetry_worker_handle, + mut task_manager, + keystore_container, + hwbench, + prometheus_registry, + network, + tx_handler_controller, + import_queue_service, + } = self; + + let network = TypeIdentity::into_type(network); + let import_queue_service = TypeIdentity::into_type(import_queue_service); + + let announce_block = { + let sync_service = network.sync_service.clone(); + Arc::new(move |hash, data| sync_service.announce_block(hash, data)) + }; + + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + + let params = StartFullNodeParams { + client: client.clone(), + announce_block, + task_manager: &mut task_manager, + para_id, + relay_chain_interface: relay_chain_interface.clone(), + relay_chain_slot_duration, + import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), + sync_service: network.sync_service.clone(), + }; + + // TODO: change for async backing + #[allow(deprecated)] + cumulus_client_service::start_full_node(params)?; + + Ok(NodeBuilder { + client, + backend, + transaction_pool, + telemetry, + telemetry_worker_handle, + task_manager, + keystore_container, + hwbench, + prometheus_registry, + network: TypeIdentity::from_type(network), + tx_handler_controller, + import_queue_service: (), + }) + } } /// Block authoring scheme to be used by the dev service. diff --git a/node/src/service.rs b/node/src/service.rs index 20fb9836b..fb2679e20 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -293,69 +293,73 @@ async fn start_node_impl( // Channel to send messages to start/stop container chains let (cc_spawn_tx, cc_spawn_rx) = unbounded_channel(); - let params = new_partial(¶chain_config)?; - let (block_import, mut telemetry, telemetry_worker_handle) = params.other; - let client = params.client.clone(); - let backend = params.backend.clone(); - let mut task_manager = params.task_manager; + // Create a `NodeBuilder` which helps setup parachain nodes common systems. + let mut node_builder = + node_common::service::NodeBuilder::new(¶chain_config, hwbench.clone())?; + + // The nimbus import queue ONLY checks the signature correctness + // Any other checks corresponding to the author-correctness should be done + // in the runtime + let block_import = + ParachainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone()); + let import_queue = nimbus_consensus::import_queue( + node_builder.client.clone(), + block_import.clone(), + move |_, _| async move { + let time = sp_timestamp::InherentDataProvider::from_system_time(); - let (relay_chain_interface, collator_key) = build_relay_chain_interface( - polkadot_config, - ¶chain_config, - telemetry_worker_handle, - &mut task_manager, - collator_options.clone(), - hwbench.clone(), - ) - .await - .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?; + Ok((time,)) + }, + &node_builder.task_manager.spawn_essential_handle(), + parachain_config.prometheus_registry(), + false, + )?; - let force_authoring = parachain_config.force_authoring; - let validator = parachain_config.role.is_authority(); - let prometheus_registry = parachain_config.prometheus_registry().cloned(); - let transaction_pool = params.transaction_pool.clone(); - let import_queue_service = params.import_queue.service(); - let net_config = FullNetworkConfiguration::new(¶chain_config.network); + // let params = new_partial(¶chain_config)?; + // let (block_import, mut telemetry, telemetry_worker_handle) = params.other; - let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) = - cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams { - parachain_config: ¶chain_config, - client: client.clone(), - transaction_pool: transaction_pool.clone(), - spawn_handle: task_manager.spawn_handle(), - import_queue: params.import_queue, - para_id, - relay_chain_interface: relay_chain_interface.clone(), - net_config, - sybil_resistance_level: CollatorSybilResistance::Resistant, - }) + // let client = params.client.clone(); + // let backend = params.backend.clone(); + // let mut task_manager = params.task_manager; + + let (relay_chain_interface, collator_key) = node_builder + .build_relay_chain_interface(¶chain_config, polkadot_config, collator_options.clone()) .await?; - if parachain_config.offchain_worker.enabled { - task_manager.spawn_handle().spawn( - "offchain-workers-runner", - "offchain-work", - sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions { - runtime_api_provider: client.clone(), - keystore: Some(params.keystore_container.keystore()), - offchain_db: backend.offchain_storage(), - transaction_pool: Some(OffchainTransactionPoolFactory::new( - transaction_pool.clone(), - )), - network_provider: network.clone(), - is_validator: parachain_config.role.is_authority(), - enable_http_requests: false, - custom_extensions: move |_| vec![], - }) - .run(client.clone(), task_manager.spawn_handle()) - .boxed(), - ); - } + // let force_authoring = parachain_config.force_authoring; + let validator = parachain_config.role.is_authority(); + // let prometheus_registry = parachain_config.prometheus_registry().cloned(); + // let transaction_pool = params.transaction_pool.clone(); + // let import_queue_service = params.import_queue.service(); + // let net_config = FullNetworkConfiguration::new(¶chain_config.network); + + // let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) = + // cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams { + // parachain_config: ¶chain_config, + // client: client.clone(), + // transaction_pool: transaction_pool.clone(), + // spawn_handle: task_manager.spawn_handle(), + // import_queue: params.import_queue, + // para_id, + // relay_chain_interface: relay_chain_interface.clone(), + // net_config, + // sybil_resistance_level: CollatorSybilResistance::Resistant, + // }) + // .await?; + + let node_builder = node_builder + .build_cumulus_network( + ¶chain_config, + para_id, + import_queue, + relay_chain_interface.clone(), + ) + .await?; let rpc_builder = { - let client = client.clone(); - let transaction_pool = transaction_pool.clone(); + let client = node_builder.client.clone(); + let transaction_pool = node_builder.transaction_pool.clone(); Box::new(move |deny_unsafe, _| { let deps = crate::rpc::FullDeps { @@ -370,46 +374,69 @@ async fn start_node_impl( }) }; - sc_service::spawn_tasks(sc_service::SpawnTasksParams { - rpc_builder, - client: client.clone(), - transaction_pool: transaction_pool.clone(), - task_manager: &mut task_manager, - config: parachain_config, - keystore: params.keystore_container.keystore(), - backend: backend.clone(), - network: network.clone(), - system_rpc_tx, - tx_handler_controller, - telemetry: telemetry.as_mut(), - sync_service: sync_service.clone(), - })?; - - if let Some(hwbench) = hwbench { - sc_sysinfo::print_hwbench(&hwbench); - // Here you can check whether the hardware meets your chains' requirements. Putting a link - // in there and swapping out the requirements for your own are probably a good idea. The - // requirements for a para-chain are dictated by its relay-chain. - if !SUBSTRATE_REFERENCE_HARDWARE.check_hardware(&hwbench) && validator { - log::warn!( - "⚠️ The hardware does not meet the minimal requirements for role 'Authority'." - ); - } - - if let Some(ref mut telemetry) = telemetry { - let telemetry_handle = telemetry.handle(); - task_manager.spawn_handle().spawn( - "telemetry_hwbench", - None, - sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench), - ); - } - } + let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?; - let announce_block = { - let sync_service = sync_service.clone(); - Arc::new(move |hash, data| sync_service.announce_block(hash, data)) - }; + // if parachain_config.offchain_worker.enabled { + // node_builder.task_manager.spawn_handle().spawn( + // "offchain-workers-runner", + // "offchain-work", + // sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions { + // runtime_api_provider: node_builder.client.clone(), + // keystore: Some(node_builder.keystore_container.keystore()), + // offchain_db: node_builder.backend.offchain_storage(), + // transaction_pool: Some(OffchainTransactionPoolFactory::new( + // node_builder.transaction_pool.clone(), + // )), + // network_provider: node_builder.network.network.clone(), + // is_validator: parachain_config.role.is_authority(), + // enable_http_requests: false, + // custom_extensions: move |_| vec![], + // }) + // .run(node_builder.client.clone(), node_builder.task_manager.spawn_handle()) + // .boxed(), + // ); + // } + + // sc_service::spawn_tasks(sc_service::SpawnTasksParams { + // rpc_builder, + // client: client.clone(), + // transaction_pool: transaction_pool.clone(), + // task_manager: &mut task_manager, + // config: parachain_config, + // keystore: params.keystore_container.keystore(), + // backend: backend.clone(), + // network: network.clone(), + // system_rpc_tx, + // tx_handler_controller, + // telemetry: telemetry.as_mut(), + // sync_service: sync_service.clone(), + // })?; + + // if let Some(hwbench) = hwbench { + // sc_sysinfo::print_hwbench(&hwbench); + // // Here you can check whether the hardware meets your chains' requirements. Putting a link + // // in there and swapping out the requirements for your own are probably a good idea. The + // // requirements for a para-chain are dictated by its relay-chain. + // if !SUBSTRATE_REFERENCE_HARDWARE.check_hardware(&hwbench) && validator { + // log::warn!( + // "⚠️ The hardware does not meet the minimal requirements for role 'Authority'." + // ); + // } + + // if let Some(ref mut telemetry) = telemetry { + // let telemetry_handle = telemetry.handle(); + // task_manager.spawn_handle().spawn( + // "telemetry_hwbench", + // None, + // sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench), + // ); + // } + // } + + // let announce_block = { + // let sync_service = sync_service.clone(); + // Arc::new(move |hash, data| sync_service.announce_block(hash, data)) + // }; let relay_chain_slot_duration = Duration::from_secs(6); @@ -417,104 +444,112 @@ async fn start_node_impl( .overseer_handle() .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - let orchestrator_chain_interface_builder = OrchestratorChainInProcessInterfaceBuilder { - client: client.clone(), - backend: backend.clone(), - sync_oracle: sync_service.clone(), - overseer_handle: overseer_handle.clone(), - }; - - let sync_keystore = params.keystore_container.keystore(); - let mut collate_on_tanssi = None; - - if validator { - let parachain_consensus = build_consensus_orchestrator( - client.clone(), - block_import, - prometheus_registry.as_ref(), - telemetry.as_ref().map(|t| t.handle()), - &task_manager, - relay_chain_interface.clone(), - transaction_pool, - sync_service.clone(), - params.keystore_container.keystore(), - force_authoring, - para_id, - )?; - - // Start task which detects para id assignment, and starts/stops container chains. - // Note that if this node was started without a `container_chain_config`, we don't - // support collation on container chains, so there is no need to detect changes to assignment - if container_chain_config.is_some() { - build_check_assigned_para_id( - client.clone(), - sync_keystore.clone(), - cc_spawn_tx.clone(), - task_manager.spawn_essential_handle(), - ); - } - - let spawner = task_manager.spawn_handle(); - let params = StartCollatorParams { - para_id, - block_status: client.clone(), - announce_block: announce_block.clone(), - client: client.clone(), - task_manager: &mut task_manager, - relay_chain_interface: relay_chain_interface.clone(), - spawner: spawner.clone(), - parachain_consensus: parachain_consensus.clone(), - import_queue: import_queue_service, - collator_key: collator_key - .clone() - .expect("Command line arguments do not allow this. qed"), - relay_chain_slot_duration, - recovery_handle: Box::new(overseer_handle.clone()), - sync_service, - }; - - let client = client.clone(); - let collator_key = collator_key.clone(); - // TODO: change for async backing - collate_on_tanssi = Some(move || async move { - #[allow(deprecated)] - cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams { - runtime_api: client.clone(), - block_status: client.clone(), - announce_block, - overseer_handle, - spawner, - para_id, - key: collator_key - .clone() - .expect("Command line arguments do not allow this. qed"), - parachain_consensus, - }) - .await; - }); - - // TODO: change for async backing - #[allow(deprecated)] - start_collator(params).await?; + // let sync_keystore = node_builder.keystore_container.keystore(); + // let mut collate_on_tanssi = None; + + let mut node_builder = if validator { + todo!("node_builder.start_collator"); + + // let parachain_consensus = build_consensus_orchestrator( + // node_builder.client.clone(), + // block_import, + // node_builder.prometheus_registry.as_ref(), + // node_builder.telemetry.as_ref().map(|t| t.handle()), + // &node_builder.task_manager, + // relay_chain_interface.clone(), + // node_builder.transaction_pool, + // node_builder.network.sync_service.clone(), + // node_builder.keystore_container.keystore(), + // force_authoring, + // para_id, + // )?; + + // // Start task which detects para id assignment, and starts/stops container chains. + // // Note that if this node was started without a `container_chain_config`, we don't + // // support collation on container chains, so there is no need to detect changes to assignment + // if container_chain_config.is_some() { + // build_check_assigned_para_id( + // node_builder.client.clone(), + // sync_keystore.clone(), + // cc_spawn_tx.clone(), + // node_builder.ask_manager.spawn_essential_handle(), + // ); + // } + + // let spawner = task_manager.spawn_handle(); + // let params = StartCollatorParams { + // para_id, + // block_status: node_builder.client.clone(), + // announce_block: announce_block.clone(), + // client: node_builder.client.clone(), + // task_manager: &mut node_builder.task_manager, + // relay_chain_interface: relay_chain_interface.clone(), + // spawner: spawner.clone(), + // parachain_consensus: parachain_consensus.clone(), + // import_queue: import_queue_service, + // collator_key: collator_key + // .clone() + // .expect("Command line arguments do not allow this. qed"), + // relay_chain_slot_duration, + // recovery_handle: Box::new(overseer_handle.clone()), + // sync_service, + // }; + + // let client = client.clone(); + // let collator_key = collator_key.clone(); + // // TODO: change for async backing + // collate_on_tanssi = Some(move || async move { + // #[allow(deprecated)] + // cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams { + // runtime_api: client.clone(), + // block_status: client.clone(), + // announce_block, + // overseer_handle, + // spawner, + // para_id, + // key: collator_key + // .clone() + // .expect("Command line arguments do not allow this. qed"), + // parachain_consensus, + // }) + // .await; + // }); + + // // TODO: change for async backing + // #[allow(deprecated)] + // start_collator(params).await?; } else { - let params = StartFullNodeParams { - client: client.clone(), - announce_block, - task_manager: &mut task_manager, + node_builder.start_full_node( para_id, - relay_chain_interface: relay_chain_interface.clone(), + relay_chain_interface.clone(), relay_chain_slot_duration, - import_queue: import_queue_service, - recovery_handle: Box::new(overseer_handle), - sync_service, - }; + )? + // let params = StartFullNodeParams { + // client: client.clone(), + // announce_block, + // task_manager: &mut task_manager, + // para_id, + // relay_chain_interface: relay_chain_interface.clone(), + // relay_chain_slot_duration, + // import_queue: import_queue_service, + // recovery_handle: Box::new(overseer_handle), + // sync_service, + // }; + + // // TODO: change for async backing + // #[allow(deprecated)] + // start_full_node(params)?; + }; - // TODO: change for async backing - #[allow(deprecated)] - start_full_node(params)?; - } + node_builder.network.start_network.start_network(); - start_network.start_network(); + let sync_keystore = node_builder.keystore_container.keystore(); + let orchestrator_chain_interface_builder = OrchestratorChainInProcessInterfaceBuilder { + client: node_builder.client.clone(), + backend: node_builder.backend.clone(), + sync_oracle: node_builder.network.sync_service.clone(), + overseer_handle: overseer_handle.clone(), + }; if let Some((container_chain_cli, tokio_handle)) = container_chain_config { // If the orchestrator chain is running as a full-node, we start a full node for the @@ -533,8 +568,8 @@ async fn start_node_impl( } // Start container chain spawner task. This will start and stop container chains on demand. - let orchestrator_client = client.clone(); - let spawn_handle = task_manager.spawn_handle(); + let orchestrator_client = node_builder.client.clone(); + let spawn_handle = node_builder.task_manager.spawn_handle(); let container_chain_spawner = ContainerChainSpawner { orchestrator_chain_interface: orchestrator_chain_interface_builder.build(), orchestrator_client, @@ -549,24 +584,25 @@ async fn start_node_impl( validator, spawn_handle, state: Default::default(), - collate_on_tanssi: Arc::new(move || Box::pin((collate_on_tanssi.clone().unwrap())())), + collate_on_tanssi: todo!(), + // collate_on_tanssi: Arc::new(move || Box::pin((collate_on_tanssi.clone().unwrap())())), }; let state = container_chain_spawner.state.clone(); - task_manager.spawn_essential_handle().spawn( + node_builder.task_manager.spawn_essential_handle().spawn( "container-chain-spawner-rx-loop", None, container_chain_spawner.rx_loop(cc_spawn_rx), ); - task_manager.spawn_essential_handle().spawn( + node_builder.task_manager.spawn_essential_handle().spawn( "container-chain-spawner-debug-state", None, crate::container_chain_monitor::monitor_task(state), ) } - Ok((task_manager, client)) + Ok((node_builder.task_manager, node_builder.client)) } // Log string that will be shown for the container chain: `[Container-2000]`.