From 739c37bfd6df30fac0ffb9b491ee2495e1753054 Mon Sep 17 00:00:00 2001
From: Tsvetomir Dimitrov
Date: Wed, 19 Jun 2024 12:58:29 +0300
Subject: [PATCH] Fix core sharing and make use of scheduling_lookahead (#4724)

Implements most of https://github.com/paritytech/polkadot-sdk/issues/1797

Core sharing (two or more parachains scheduled on the same core with the
same `PartsOf57600` value) was not working correctly. The expected
behaviour is to have a Backed and an Included event in each block for the
paras sharing the core, with the paras taking turns. E.g. for two paras
sharing a core we expect: Backed(a); Included(a)+Backed(b);
Included(b)+Backed(a); etc. Instead, each block contains just one event
and there are a lot of gaps (blocks w/o events) during the session.

Core sharing should also work when collators are building collations
ahead of time.

TODOs:

- [x] Add a zombienet test verifying that the behaviour mentioned above works.
- [x] prdoc

---------

Co-authored-by: alindima
---
 .gitlab/pipeline/zombienet/polkadot.yml       |  17 ++
 Cargo.lock                                    |   1 +
 polkadot/node/core/backing/src/lib.rs         |  88 ++++++-----
 polkadot/node/core/backing/src/tests/mod.rs   |  45 +++++-
 .../src/tests/prospective_parachains.rs       |  20 +++
 .../core/prospective-parachains/Cargo.toml    |   1 +
 .../core/prospective-parachains/src/lib.rs    |  73 +++++----
 .../core/prospective-parachains/src/tests.rs  | 145 +++++++++++++-----
 .../src/collator_side/mod.rs                  |  30 ++--
 .../src/collator_side/tests/mod.rs            |  57 +++++--
 .../tests/prospective_parachains.rs           |  89 +----------
 .../src/validator_side/collation.rs           |   6 +-
 .../src/validator_side/mod.rs                 |  74 ++++-----
 .../src/validator_side/tests/mod.rs           |  38 ++++-
 .../tests/prospective_parachains.rs           |  20 +++
 .../statement-distribution/src/v2/mod.rs      |  42 ++---
 polkadot/node/subsystem-util/src/vstaging.rs  |  15 +-
 .../src/runtime_api_impl/vstaging.rs          |   8 +-
 polkadot/runtime/parachains/src/scheduler.rs  |   3 +
 polkadot/zombienet_tests/assign-core.js       |  48 ++++++
 .../0001-basic-3cores-6s-blocks.zndsl         |   4 +-
 ...stic-scaling-doesnt-break-parachains.zndsl |   4 +-
 .../elastic_scaling/assign-core.js            |  40 +----
 .../functional/0015-coretime-shared-core.toml |  44 ++++++
 .../0015-coretime-shared-core.zndsl           |  16 ++
 .../functional/0015-force-register-paras.js   |  63 ++++++++
 .../zombienet_tests/functional/assign-core.js |   1 +
 prdoc/pr_4724.prdoc                           |  24 +++
 28 files changed, 675 insertions(+), 341 deletions(-)
 create mode 100644 polkadot/zombienet_tests/assign-core.js
 mode change 100644 => 120000 polkadot/zombienet_tests/elastic_scaling/assign-core.js
 create mode 100644 polkadot/zombienet_tests/functional/0015-coretime-shared-core.toml
 create mode 100644 polkadot/zombienet_tests/functional/0015-coretime-shared-core.zndsl
 create mode 100644 polkadot/zombienet_tests/functional/0015-force-register-paras.js
 create mode 120000 polkadot/zombienet_tests/functional/assign-core.js
 create mode 100644 prdoc/pr_4724.prdoc

diff --git a/.gitlab/pipeline/zombienet/polkadot.yml b/.gitlab/pipeline/zombienet/polkadot.yml
index b158cbe0b5aa..90251082077c 100644
--- a/.gitlab/pipeline/zombienet/polkadot.yml
+++ b/.gitlab/pipeline/zombienet/polkadot.yml
@@ -162,6 +162,9 @@ zombienet-polkadot-elastic-scaling-0001-basic-3cores-6s-blocks:
     - .zombienet-polkadot-common
   variables:
     FORCED_INFRA_INSTANCE: "spot-iops"
+  before_script:
+    - !reference [.zombienet-polkadot-common, before_script]
+    - cp --remove-destination ${LOCAL_DIR}/assign-core.js ${LOCAL_DIR}/elastic_scaling
   script:
     - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
--local-dir="${LOCAL_DIR}/elastic_scaling" @@ -170,6 +173,9 @@ zombienet-polkadot-elastic-scaling-0001-basic-3cores-6s-blocks: zombienet-polkadot-elastic-scaling-0002-elastic-scaling-doesnt-break-parachains: extends: - .zombienet-polkadot-common + before_script: + - !reference [.zombienet-polkadot-common, before_script] + - cp --remove-destination ${LOCAL_DIR}/assign-core.js ${LOCAL_DIR}/elastic_scaling script: - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh --local-dir="${LOCAL_DIR}/elastic_scaling" @@ -199,6 +205,17 @@ zombienet-polkadot-functional-0014-chunk-fetching-network-compatibility: --local-dir="${LOCAL_DIR}/functional" --test="0014-chunk-fetching-network-compatibility.zndsl" +zombienet-polkadot-functional-0015-coretime-shared-core: + extends: + - .zombienet-polkadot-common + before_script: + - !reference [.zombienet-polkadot-common, before_script] + - cp --remove-destination ${LOCAL_DIR}/assign-core.js ${LOCAL_DIR}/functional + script: + - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh + --local-dir="${LOCAL_DIR}/functional" + --test="0015-coretime-shared-core.zndsl" + zombienet-polkadot-smoke-0001-parachains-smoke-test: extends: - .zombienet-polkadot-common diff --git a/Cargo.lock b/Cargo.lock index 113cfa06a84a..bbb785a618a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13209,6 +13209,7 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-primitives", "polkadot-primitives-test-helpers", + "rstest", "sc-keystore", "sp-application-crypto", "sp-core", diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 38e8a93bb048..1bda81c5197e 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -102,6 +102,7 @@ use polkadot_node_subsystem_util::{ runtime::{ self, prospective_parachains_mode, request_min_backing_votes, ProspectiveParachainsMode, }, + vstaging::{fetch_claim_queue, ClaimQueueSnapshot}, Validator, }; use polkadot_primitives::{ @@ -212,8 +213,6 @@ struct PerRelayParentState { parent: Hash, /// Session index. session_index: SessionIndex, - /// The `ParaId` assigned to the local validator at this relay parent. - assigned_para: Option, /// The `CoreIndex` assigned to the local validator at this relay parent. assigned_core: Option, /// The candidates that are backed by enough validators in their group, by hash. @@ -233,8 +232,11 @@ struct PerRelayParentState { /// If true, we're appending extra bits in the BackedCandidate validator indices bitfield, /// which represent the assigned core index. True if ElasticScalingMVP is enabled. inject_core_index: bool, - /// The core states for all cores. - cores: Vec, + /// The number of cores. + n_cores: u32, + /// Claim queue state. If the runtime API is not available, it'll be populated with info from + /// availability cores. + claim_queue: ClaimQueueSnapshot, /// The validator index -> group mapping at this relay parent. validator_to_group: Arc>>, /// The associated group rotation information. @@ -1004,20 +1006,19 @@ macro_rules! 
try_runtime_api { fn core_index_from_statement( validator_to_group: &IndexedVec>, group_rotation_info: &GroupRotationInfo, - cores: &[CoreState], + n_cores: u32, + claim_queue: &ClaimQueueSnapshot, statement: &SignedFullStatementWithPVD, ) -> Option { let compact_statement = statement.as_unchecked(); let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash()); - let n_cores = cores.len(); - gum::trace!( target:LOG_TARGET, ?group_rotation_info, ?statement, ?validator_to_group, - n_cores = ?cores.len(), + n_cores, ?candidate_hash, "Extracting core index from statement" ); @@ -1029,7 +1030,7 @@ fn core_index_from_statement( ?group_rotation_info, ?statement, ?validator_to_group, - n_cores = ?cores.len() , + n_cores, ?candidate_hash, "Invalid validator index: {:?}", statement_validator_index @@ -1038,37 +1039,25 @@ fn core_index_from_statement( }; // First check if the statement para id matches the core assignment. - let core_index = group_rotation_info.core_for_group(*group_index, n_cores); + let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _); - if core_index.0 as usize > n_cores { + if core_index.0 > n_cores { gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex"); return None } if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() { let candidate_para_id = candidate.descriptor.para_id; - let assigned_para_id = match &cores[core_index.0 as usize] { - CoreState::Free => { - gum::debug!(target: LOG_TARGET, ?candidate_hash, "Invalid CoreIndex, core is not assigned to any para_id"); - return None - }, - CoreState::Occupied(occupied) => - if let Some(next) = &occupied.next_up_on_available { - next.para_id - } else { - return None - }, - CoreState::Scheduled(scheduled) => scheduled.para_id, - }; + let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index); - if assigned_para_id != candidate_para_id { + if !assigned_paras.any(|id| id == &candidate_para_id) { gum::debug!( target: LOG_TARGET, ?candidate_hash, ?core_index, - ?assigned_para_id, + assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::>(), ?candidate_para_id, - "Invalid CoreIndex, core is assigned to a different para_id" + "Invalid CoreIndex, core is not assigned to this para_id" ); return None } @@ -1129,6 +1118,8 @@ async fn construct_per_relay_parent_state( Error::UtilError(TryFrom::try_from(e).expect("the conversion is infallible; qed")) })?; + let maybe_claim_queue = try_runtime_api!(fetch_claim_queue(ctx.sender(), parent).await); + let signing_context = SigningContext { parent_hash: parent, session_index }; let validator = match Validator::construct( &validators, @@ -1153,31 +1144,35 @@ async fn construct_per_relay_parent_state( let mut groups = HashMap::>::new(); let mut assigned_core = None; - let mut assigned_para = None; + + let has_claim_queue = maybe_claim_queue.is_some(); + let mut claim_queue = maybe_claim_queue.unwrap_or_default().0; for (idx, core) in cores.iter().enumerate() { - let core_para_id = match core { - CoreState::Scheduled(scheduled) => scheduled.para_id, - CoreState::Occupied(occupied) => - if mode.is_enabled() { + let core_index = CoreIndex(idx as _); + + if !has_claim_queue { + match core { + CoreState::Scheduled(scheduled) => + claim_queue.insert(core_index, [scheduled.para_id].into_iter().collect()), + CoreState::Occupied(occupied) if mode.is_enabled() => { // Async backing makes it legal to build on top of // occupied core. 
if let Some(next) = &occupied.next_up_on_available { - next.para_id + claim_queue.insert(core_index, [next.para_id].into_iter().collect()) } else { continue } - } else { - continue }, - CoreState::Free => continue, - }; + _ => continue, + }; + } else if !claim_queue.contains_key(&core_index) { + continue + } - let core_index = CoreIndex(idx as _); let group_index = group_rotation_info.group_for_core(core_index, n_cores); if let Some(g) = validator_groups.get(group_index.0 as usize) { if validator.as_ref().map_or(false, |v| g.contains(&v.index())) { - assigned_para = Some(core_para_id); assigned_core = Some(core_index); } groups.insert(core_index, g.clone()); @@ -1212,7 +1207,6 @@ async fn construct_per_relay_parent_state( parent, session_index, assigned_core, - assigned_para, backed: HashSet::new(), table: Table::new(table_config), table_context, @@ -1221,7 +1215,8 @@ async fn construct_per_relay_parent_state( fallbacks: HashMap::new(), minimum_backing_votes, inject_core_index, - cores, + n_cores: cores.len() as u32, + claim_queue: ClaimQueueSnapshot::from(claim_queue), validator_to_group: validator_to_group.clone(), group_rotation_info, })) @@ -1674,7 +1669,8 @@ async fn import_statement( let core = core_index_from_statement( &rp_state.validator_to_group, &rp_state.group_rotation_info, - &rp_state.cores, + rp_state.n_cores, + &rp_state.claim_queue, statement, ) .ok_or(Error::CoreIndexUnavailable)?; @@ -2098,12 +2094,14 @@ async fn handle_second_message( return Ok(()) } + let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core)); + // Sanity check that candidate is from our assignment. - if Some(candidate.descriptor().para_id) != rp_state.assigned_para { + if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id)) { gum::debug!( target: LOG_TARGET, our_assignment_core = ?rp_state.assigned_core, - our_assignment_para = ?rp_state.assigned_para, + our_assignment_paras = ?assigned_paras, collation = ?candidate.descriptor().para_id, "Subsystem asked to second for para outside of our assignment", ); @@ -2113,7 +2111,7 @@ async fn handle_second_message( gum::debug!( target: LOG_TARGET, our_assignment_core = ?rp_state.assigned_core, - our_assignment_para = ?rp_state.assigned_para, + our_assignment_paras = ?assigned_paras, collation = ?candidate.descriptor().para_id, "Current assignments vs collation", ); diff --git a/polkadot/node/core/backing/src/tests/mod.rs b/polkadot/node/core/backing/src/tests/mod.rs index bb23c7fbeb24..5f2bc7e18424 100644 --- a/polkadot/node/core/backing/src/tests/mod.rs +++ b/polkadot/node/core/backing/src/tests/mod.rs @@ -42,7 +42,10 @@ use sp_application_crypto::AppCrypto; use sp_keyring::Sr25519Keyring; use sp_keystore::Keystore; use sp_tracing as _; -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{BTreeMap, HashMap, VecDeque}, + time::Duration, +}; mod prospective_parachains; @@ -75,6 +78,7 @@ pub(crate) struct TestState { validator_groups: (Vec>, GroupRotationInfo), validator_to_group: IndexedVec>, availability_cores: Vec, + claim_queue: BTreeMap>, head_data: HashMap, signing_context: SigningContext, relay_parent: Hash, @@ -130,6 +134,10 @@ impl Default for TestState { CoreState::Scheduled(ScheduledCore { para_id: chain_b, collator: None }), ]; + let mut claim_queue = BTreeMap::new(); + claim_queue.insert(CoreIndex(0), [chain_a].into_iter().collect()); + claim_queue.insert(CoreIndex(1), [chain_b].into_iter().collect()); + let mut head_data = HashMap::new(); 
head_data.insert(chain_a, HeadData(vec![4, 5, 6])); head_data.insert(chain_b, HeadData(vec![5, 6, 7])); @@ -153,6 +161,7 @@ impl Default for TestState { validator_groups: (validator_groups, group_rotation_info), validator_to_group, availability_cores, + claim_queue, head_data, validation_data, signing_context, @@ -338,6 +347,26 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS tx.send(Ok(test_state.disabled_validators.clone())).unwrap(); } ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx)) + ) if parent == test_state.relay_parent => { + tx.send(Ok(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::ClaimQueue(tx)) + ) if parent == test_state.relay_parent => { + tx.send(Ok( + test_state.claim_queue.clone() + )).unwrap(); + } + ); } async fn assert_validation_requests( @@ -730,11 +759,16 @@ fn get_backed_candidate_preserves_order() { // Assign the second core to the same para as the first one. test_state.availability_cores[1] = CoreState::Scheduled(ScheduledCore { para_id: test_state.chain_ids[0], collator: None }); + *test_state.claim_queue.get_mut(&CoreIndex(1)).unwrap() = + [test_state.chain_ids[0]].into_iter().collect(); // Add another availability core for paraid 2. test_state.availability_cores.push(CoreState::Scheduled(ScheduledCore { para_id: test_state.chain_ids[1], collator: None, })); + test_state + .claim_queue + .insert(CoreIndex(2), [test_state.chain_ids[1]].into_iter().collect()); test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { test_startup(&mut virtual_overseer, &test_state).await; @@ -1103,7 +1137,8 @@ fn extract_core_index_from_statement_works() { let core_index_1 = core_index_from_statement( &test_state.validator_to_group, &test_state.validator_groups.1, - &test_state.availability_cores, + test_state.availability_cores.len() as _, + &test_state.claim_queue.clone().into(), &signed_statement_1, ) .unwrap(); @@ -1113,7 +1148,8 @@ fn extract_core_index_from_statement_works() { let core_index_2 = core_index_from_statement( &test_state.validator_to_group, &test_state.validator_groups.1, - &test_state.availability_cores, + test_state.availability_cores.len() as _, + &test_state.claim_queue.clone().into(), &signed_statement_2, ); @@ -1123,7 +1159,8 @@ fn extract_core_index_from_statement_works() { let core_index_3 = core_index_from_statement( &test_state.validator_to_group, &test_state.validator_groups.1, - &test_state.availability_cores, + test_state.availability_cores.len() as _, + &test_state.claim_queue.clone().into(), &signed_statement_3, ) .unwrap(); diff --git a/polkadot/node/core/backing/src/tests/prospective_parachains.rs b/polkadot/node/core/backing/src/tests/prospective_parachains.rs index 74490c84eb18..15bc0b4a1139 100644 --- a/polkadot/node/core/backing/src/tests/prospective_parachains.rs +++ b/polkadot/node/core/backing/src/tests/prospective_parachains.rs @@ -212,6 +212,26 @@ async fn activate_leaf( tx.send(Ok(Vec::new())).unwrap(); } ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx)) + ) if parent == hash => { + tx.send(Ok(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + 
AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::ClaimQueue(tx)) + ) if parent == hash => { + tx.send(Ok( + test_state.claim_queue.clone() + )).unwrap(); + } + ); } } diff --git a/polkadot/node/core/prospective-parachains/Cargo.toml b/polkadot/node/core/prospective-parachains/Cargo.toml index f3193153be89..b9573ee98519 100644 --- a/polkadot/node/core/prospective-parachains/Cargo.toml +++ b/polkadot/node/core/prospective-parachains/Cargo.toml @@ -32,3 +32,4 @@ sc-keystore = { path = "../../../../substrate/client/keystore" } sp-application-crypto = { path = "../../../../substrate/primitives/application-crypto" } sp-keyring = { path = "../../../../substrate/primitives/keyring" } sp-keystore = { path = "../../../../substrate/primitives/keystore" } +rstest = "0.18.2" diff --git a/polkadot/node/core/prospective-parachains/src/lib.rs b/polkadot/node/core/prospective-parachains/src/lib.rs index d5bb5ff76ba8..e4b6deffdf4a 100644 --- a/polkadot/node/core/prospective-parachains/src/lib.rs +++ b/polkadot/node/core/prospective-parachains/src/lib.rs @@ -44,6 +44,7 @@ use polkadot_node_subsystem_util::{ inclusion_emulator::{Constraints, RelayChainBlockInfo}, request_session_index_for_child, runtime::{prospective_parachains_mode, ProspectiveParachainsMode}, + vstaging::fetch_claim_queue, }; use polkadot_primitives::{ async_backing::CandidatePendingAvailability, BlockNumber, CandidateHash, @@ -870,37 +871,51 @@ async fn fetch_backing_state( async fn fetch_upcoming_paras( ctx: &mut Context, relay_parent: Hash, -) -> JfyiErrorResult> { - let (tx, rx) = oneshot::channel(); - - // This'll have to get more sophisticated with parathreads, - // but for now we can just use the `AvailabilityCores`. - ctx.send_message(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::AvailabilityCores(tx), - )) - .await; - - let cores = rx.await.map_err(JfyiError::RuntimeApiRequestCanceled)??; - let mut upcoming = HashSet::new(); - for core in cores { - match core { - CoreState::Occupied(occupied) => { - if let Some(next_up_on_available) = occupied.next_up_on_available { - upcoming.insert(next_up_on_available.para_id); - } - if let Some(next_up_on_time_out) = occupied.next_up_on_time_out { - upcoming.insert(next_up_on_time_out.para_id); +) -> JfyiErrorResult> { + Ok(match fetch_claim_queue(ctx.sender(), relay_parent).await? { + Some(claim_queue) => { + // Runtime supports claim queue - use it + claim_queue + .iter_all_claims() + .flat_map(|(_, paras)| paras.into_iter()) + .copied() + .collect() + }, + None => { + // fallback to availability cores - remove this branch once claim queue is released + // everywhere + let (tx, rx) = oneshot::channel(); + ctx.send_message(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx), + )) + .await; + + let cores = rx.await.map_err(JfyiError::RuntimeApiRequestCanceled)??; + + let mut upcoming = HashSet::with_capacity(cores.len()); + for core in cores { + match core { + CoreState::Occupied(occupied) => { + // core sharing won't work optimally with this branch because the collations + // can't be prepared in advance. 
+ if let Some(next_up_on_available) = occupied.next_up_on_available { + upcoming.insert(next_up_on_available.para_id); + } + if let Some(next_up_on_time_out) = occupied.next_up_on_time_out { + upcoming.insert(next_up_on_time_out.para_id); + } + }, + CoreState::Scheduled(scheduled) => { + upcoming.insert(scheduled.para_id); + }, + CoreState::Free => {}, } - }, - CoreState::Scheduled(scheduled) => { - upcoming.insert(scheduled.para_id); - }, - CoreState::Free => {}, - } - } + } - Ok(upcoming.into_iter().collect()) + upcoming + }, + }) } // Fetch ancestors in descending order, up to the amount requested. diff --git a/polkadot/node/core/prospective-parachains/src/tests.rs b/polkadot/node/core/prospective-parachains/src/tests.rs index d2fc3cbd3623..221fbf4c4e60 100644 --- a/polkadot/node/core/prospective-parachains/src/tests.rs +++ b/polkadot/node/core/prospective-parachains/src/tests.rs @@ -26,11 +26,15 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_primitives::{ async_backing::{AsyncBackingParams, BackingState, Constraints, InboundHrmpLimitations}, - CommittedCandidateReceipt, HeadData, Header, PersistedValidationData, ScheduledCore, + CommittedCandidateReceipt, CoreIndex, HeadData, Header, PersistedValidationData, ScheduledCore, ValidationCodeHash, }; use polkadot_primitives_test_helpers::make_candidate; -use std::sync::Arc; +use rstest::rstest; +use std::{ + collections::{BTreeMap, VecDeque}, + sync::Arc, +}; use test_helpers::mock::new_leaf; const ALLOWED_ANCESTRY_LEN: u32 = 3; @@ -70,7 +74,8 @@ fn dummy_constraints( } struct TestState { - availability_cores: Vec, + claim_queue: BTreeMap>, + runtime_api_version: u32, validation_code_hash: ValidationCodeHash, } @@ -79,13 +84,23 @@ impl Default for TestState { let chain_a = ParaId::from(1); let chain_b = ParaId::from(2); - let availability_cores = vec![ - CoreState::Scheduled(ScheduledCore { para_id: chain_a, collator: None }), - CoreState::Scheduled(ScheduledCore { para_id: chain_b, collator: None }), - ]; + let mut claim_queue = BTreeMap::new(); + claim_queue.insert(CoreIndex(0), [chain_a].into_iter().collect()); + claim_queue.insert(CoreIndex(1), [chain_b].into_iter().collect()); + let validation_code_hash = Hash::repeat_byte(42).into(); - Self { availability_cores, validation_code_hash } + Self { + validation_code_hash, + claim_queue, + runtime_api_version: RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT, + } + } +} + +impl TestState { + fn set_runtime_api_version(&mut self, version: u32) { + self.runtime_api_version = version; } } @@ -227,12 +242,39 @@ async fn handle_leaf_activation( assert_matches!( virtual_overseer.recv().await, AllMessages::RuntimeApi( - RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx)) + RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx)) ) if parent == *hash => { - tx.send(Ok(test_state.availability_cores.clone())).unwrap(); + tx.send( + Ok(test_state.runtime_api_version) + ).unwrap(); } ); + if test_state.runtime_api_version < RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx)) + ) if parent == *hash => { + tx.send(Ok(test_state.claim_queue.values().map(|paras| CoreState::Scheduled( + ScheduledCore { + para_id: *paras.front().unwrap(), + collator: None + } + )).collect())).unwrap(); + } + ); + } else { + assert_matches!( + virtual_overseer.recv().await, + 
AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::ClaimQueue(tx)) + ) if parent == *hash => { + tx.send(Ok(test_state.claim_queue.clone())).unwrap(); + } + ); + } + send_block_header(virtual_overseer, *hash, *number).await; // Check that subsystem job issues a request for ancestors. @@ -277,14 +319,16 @@ async fn handle_leaf_activation( ); } - for _ in 0..test_state.availability_cores.len() { + let paras: HashSet<_> = test_state.claim_queue.values().flatten().collect(); + + for _ in 0..paras.len() { let message = virtual_overseer.recv().await; // Get the para we are working with since the order is not deterministic. - let para_id = match message { + let para_id = match &message { AllMessages::RuntimeApi(RuntimeApiMessage::Request( _, RuntimeApiRequest::ParaBackingState(p_id, _), - )) => p_id, + )) => *p_id, _ => panic!("received unexpected message {:?}", message), }; @@ -505,9 +549,18 @@ fn should_do_no_work_if_async_backing_disabled_for_leaf() { // - Two for the same leaf A (one for parachain 1 and one for parachain 2) // - One for leaf B on parachain 1 // - One for leaf C on parachain 2 +// Also tests a claim queue size larger than 1. #[test] fn introduce_candidates_basic() { - let test_state = TestState::default(); + let mut test_state = TestState::default(); + + let chain_a = ParaId::from(1); + let chain_b = ParaId::from(2); + let mut claim_queue = BTreeMap::new(); + claim_queue.insert(CoreIndex(0), [chain_a, chain_b].into_iter().collect()); + + test_state.claim_queue = claim_queue; + let view = test_harness(|mut virtual_overseer| async move { // Leaf A let leaf_a = TestLeaf { @@ -2032,9 +2085,15 @@ fn check_pvd_query() { // Test simultaneously activating and deactivating leaves, and simultaneously deactivating // multiple leaves. -#[test] -fn correctly_updates_leaves() { - let test_state = TestState::default(); +// This test is parametrised with the runtime api version. For versions that don't support the claim +// queue API, we check that av-cores are used. 
+#[rstest] +#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)] +#[case(8)] +fn correctly_updates_leaves(#[case] runtime_api_version: u32) { + let mut test_state = TestState::default(); + test_state.set_runtime_api_version(runtime_api_version); + let view = test_harness(|mut virtual_overseer| async move { // Leaf A let leaf_a = TestLeaf { @@ -2140,15 +2199,12 @@ fn correctly_updates_leaves() { fn persists_pending_availability_candidate() { let mut test_state = TestState::default(); let para_id = ParaId::from(1); - test_state.availability_cores = test_state - .availability_cores + test_state.claim_queue = test_state + .claim_queue .into_iter() - .filter(|core| match core { - CoreState::Scheduled(scheduled_core) => scheduled_core.para_id == para_id, - _ => false, - }) + .filter(|(_, paras)| matches!(paras.front(), Some(para) if para == ¶_id)) .collect(); - assert_eq!(test_state.availability_cores.len(), 1); + assert_eq!(test_state.claim_queue.len(), 1); test_harness(|mut virtual_overseer| async move { let para_head = HeadData(vec![1, 2, 3]); @@ -2237,18 +2293,15 @@ fn persists_pending_availability_candidate() { } #[test] -fn backwards_compatible() { +fn backwards_compatible_with_non_async_backing_params() { let mut test_state = TestState::default(); let para_id = ParaId::from(1); - test_state.availability_cores = test_state - .availability_cores + test_state.claim_queue = test_state + .claim_queue .into_iter() - .filter(|core| match core { - CoreState::Scheduled(scheduled_core) => scheduled_core.para_id == para_id, - _ => false, - }) + .filter(|(_, paras)| matches!(paras.front(), Some(para) if para == ¶_id)) .collect(); - assert_eq!(test_state.availability_cores.len(), 1); + assert_eq!(test_state.claim_queue.len(), 1); test_harness(|mut virtual_overseer| async move { let para_head = HeadData(vec![1, 2, 3]); @@ -2350,20 +2403,30 @@ fn uses_ancestry_only_within_session() { .await; assert_matches!( - virtual_overseer.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(parent, RuntimeApiRequest::AsyncBackingParams(tx)) - ) if parent == hash => { - tx.send(Ok(AsyncBackingParams { max_candidate_depth: 0, allowed_ancestry_len: ancestry_len - })).unwrap(); } - ); + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + parent, + RuntimeApiRequest::AsyncBackingParams(tx) + )) if parent == hash => { + tx.send(Ok(AsyncBackingParams { max_candidate_depth: 0, allowed_ancestry_len: ancestry_len})).unwrap(); + }); assert_matches!( virtual_overseer.recv().await, AllMessages::RuntimeApi( - RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx)) + RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx)) + ) if parent == hash => { + tx.send(Ok(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::ClaimQueue(tx)) ) if parent == hash => { - tx.send(Ok(Vec::new())).unwrap(); + tx.send(Ok(BTreeMap::new())).unwrap(); } ); diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 80a85420b392..5c201542eb56 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -51,6 +51,7 @@ use polkadot_node_subsystem_util::{ get_availability_cores, get_group_rotation_info, prospective_parachains_mode, 
ProspectiveParachainsMode, RuntimeInfo, }, + vstaging::fetch_claim_queue, TimeoutExt, }; use polkadot_primitives::{ @@ -579,22 +580,27 @@ async fn determine_cores( let cores = get_availability_cores(sender, relay_parent).await?; let n_cores = cores.len(); let mut assigned_cores = Vec::new(); + let maybe_claim_queue = fetch_claim_queue(sender, relay_parent).await?; for (idx, core) in cores.iter().enumerate() { - let core_para_id = match core { - CoreState::Scheduled(scheduled) => Some(scheduled.para_id), - CoreState::Occupied(occupied) => - if relay_parent_mode.is_enabled() { - // With async backing we don't care about the core state, - // it is only needed for figuring our validators group. - Some(occupied.candidate_descriptor.para_id) - } else { - None - }, - CoreState::Free => None, + let core_is_scheduled = match maybe_claim_queue { + Some(ref claim_queue) => { + // Runtime supports claim queue - use it. + claim_queue + .iter_claims_for_core(&CoreIndex(idx as u32)) + .any(|para| para == ¶_id) + }, + None => match core { + CoreState::Scheduled(scheduled) if scheduled.para_id == para_id => true, + CoreState::Occupied(occupied) if relay_parent_mode.is_enabled() => + // With async backing we don't care about the core state, + // it is only needed for figuring our validators group. + occupied.next_up_on_available.as_ref().map(|c| c.para_id) == Some(para_id), + _ => false, + }, }; - if core_para_id == Some(para_id) { + if core_is_scheduled { assigned_cores.push(CoreIndex::from(idx as u32)); } } diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index a13e99df4ab4..13601ca7a005 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -16,7 +16,11 @@ use super::*; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{ + collections::{BTreeMap, HashSet, VecDeque}, + sync::Arc, + time::Duration, +}; use assert_matches::assert_matches; use futures::{executor, future, Future}; @@ -66,7 +70,7 @@ struct TestState { group_rotation_info: GroupRotationInfo, validator_peer_id: Vec, relay_parent: Hash, - availability_cores: Vec, + claim_queue: BTreeMap>, local_peer_id: PeerId, collator_pair: CollatorPair, session_index: SessionIndex, @@ -105,8 +109,9 @@ impl Default for TestState { let group_rotation_info = GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, now: 1 }; - let availability_cores = - vec![CoreState::Scheduled(ScheduledCore { para_id, collator: None }), CoreState::Free]; + let mut claim_queue = BTreeMap::new(); + claim_queue.insert(CoreIndex(0), [para_id].into_iter().collect()); + claim_queue.insert(CoreIndex(1), VecDeque::new()); let relay_parent = Hash::random(); @@ -133,7 +138,7 @@ impl Default for TestState { group_rotation_info, validator_peer_id, relay_parent, - availability_cores, + claim_queue, local_peer_id, collator_pair, session_index: 1, @@ -147,17 +152,14 @@ impl TestState { pub fn with_elastic_scaling() -> Self { let mut state = Self::default(); let para_id = state.para_id; - state - .availability_cores - .push(CoreState::Scheduled(ScheduledCore { para_id, collator: None })); - state - .availability_cores - .push(CoreState::Scheduled(ScheduledCore { para_id, collator: None })); + + state.claim_queue.insert(CoreIndex(2), [para_id].into_iter().collect()); + state.claim_queue.insert(CoreIndex(3), [para_id].into_iter().collect()); 
state } fn current_group_validator_indices(&self) -> &[ValidatorIndex] { - let core_num = self.availability_cores.len(); + let core_num = self.claim_queue.len(); let GroupIndex(group_idx) = self.group_rotation_info.group_for_core(CoreIndex(0), core_num); &self.session_info.validator_groups.get(GroupIndex::from(group_idx)).unwrap() } @@ -395,7 +397,36 @@ async fn distribute_collation_with_receipt( RuntimeApiRequest::AvailabilityCores(tx) )) => { assert_eq!(relay_parent, _relay_parent); - tx.send(Ok(test_state.availability_cores.clone())).unwrap(); + tx.send(Ok(test_state.claim_queue.values().map(|paras| + if let Some(para) = paras.front() { + CoreState::Scheduled(ScheduledCore { para_id: *para, collator: None }) + } else { + CoreState::Free + } + ).collect())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _relay_parent, + RuntimeApiRequest::Version(tx) + )) => { + assert_eq!(relay_parent, _relay_parent); + tx.send(Ok(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)).unwrap(); + } + ); + + // obtain the claim queue schedule. + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _relay_parent, + RuntimeApiRequest::ClaimQueue(tx) + )) => { + assert_eq!(relay_parent, _relay_parent); + tx.send(Ok(test_state.claim_queue.clone())).unwrap(); } ); diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs index 0a0a85fb1f27..ea8fdb0e04fb 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs @@ -19,7 +19,7 @@ use super::*; use polkadot_node_subsystem::messages::ChainApiMessage; -use polkadot_primitives::{AsyncBackingParams, Header, OccupiedCore}; +use polkadot_primitives::{AsyncBackingParams, Header}; const ASYNC_BACKING_PARAMETERS: AsyncBackingParams = AsyncBackingParams { max_candidate_depth: 4, allowed_ancestry_len: 3 }; @@ -665,90 +665,3 @@ fn advertise_and_send_collation_by_hash() { }, ) } - -/// Tests that collator distributes collation built on top of occupied core. -#[test] -fn advertise_core_occupied() { - let mut test_state = TestState::default(); - let candidate = - TestCandidateBuilder { para_id: test_state.para_id, ..Default::default() }.build(); - test_state.availability_cores[0] = CoreState::Occupied(OccupiedCore { - next_up_on_available: None, - occupied_since: 0, - time_out_at: 0, - next_up_on_time_out: None, - availability: BitVec::default(), - group_responsible: GroupIndex(0), - candidate_hash: candidate.hash(), - candidate_descriptor: candidate.descriptor, - }); - - let local_peer_id = test_state.local_peer_id; - let collator_pair = test_state.collator_pair.clone(); - - test_harness( - local_peer_id, - collator_pair, - ReputationAggregator::new(|_| true), - |mut test_harness| async move { - let virtual_overseer = &mut test_harness.virtual_overseer; - - let head_a = Hash::from_low_u64_be(128); - let head_a_num: u32 = 64; - - // Grandparent of head `a`. - let head_b = Hash::from_low_u64_be(130); - - // Set collating para id. - overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) - .await; - // Activated leaf is `a`, but the collation will be based on `b`. 
- update_view(virtual_overseer, vec![(head_a, head_a_num)], 1).await; - - let pov = PoV { block_data: BlockData(vec![1, 2, 3]) }; - let candidate = TestCandidateBuilder { - para_id: test_state.para_id, - relay_parent: head_b, - pov_hash: pov.hash(), - ..Default::default() - } - .build(); - let candidate_hash = candidate.hash(); - distribute_collation_with_receipt( - virtual_overseer, - &test_state, - head_b, - true, - candidate, - pov, - Hash::zero(), - ) - .await; - - let validators = test_state.current_group_validator_authority_ids(); - let peer_ids = test_state.current_group_validator_peer_ids(); - - connect_peer( - virtual_overseer, - peer_ids[0], - CollationVersion::V2, - Some(validators[0].clone()), - ) - .await; - expect_declare_msg_v2(virtual_overseer, &test_state, &peer_ids[0]).await; - // Peer is aware of the leaf. - send_peer_view_change(virtual_overseer, &peer_ids[0], vec![head_a]).await; - - // Collation is advertised. - expect_advertise_collation_msg( - virtual_overseer, - &peer_ids[0], - head_b, - Some(vec![candidate_hash]), - ) - .await; - - test_harness - }, - ) -} diff --git a/polkadot/node/network/collator-protocol/src/validator_side/collation.rs b/polkadot/node/network/collator-protocol/src/validator_side/collation.rs index 001df1fb3da9..96ffe9f13db3 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/collation.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/collation.rs @@ -270,7 +270,7 @@ impl Collations { // We don't need to fetch any other collation when we already have seconded one. CollationStatus::Seconded => None, CollationStatus::Waiting => - if !self.is_seconded_limit_reached(relay_parent_mode) { + if self.is_seconded_limit_reached(relay_parent_mode) { None } else { self.waiting_queue.pop_front() @@ -280,7 +280,7 @@ impl Collations { } } - /// Checks the limit of seconded candidates for a given para. + /// Checks the limit of seconded candidates. pub(super) fn is_seconded_limit_reached( &self, relay_parent_mode: ProspectiveParachainsMode, @@ -293,7 +293,7 @@ impl Collations { } else { 1 }; - self.seconded_count < seconded_limit + self.seconded_count >= seconded_limit } } diff --git a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs index 9f037a983e51..f5c9726f3f6a 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs @@ -19,7 +19,7 @@ use futures::{ }; use futures_timer::Delay; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, future::Future, time::{Duration, Instant}, }; @@ -51,6 +51,7 @@ use polkadot_node_subsystem_util::{ backing_implicit_view::View as ImplicitView, reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL}, runtime::{prospective_parachains_mode, ProspectiveParachainsMode}, + vstaging::fetch_claim_queue, }; use polkadot_primitives::{ CandidateHash, CollatorId, CoreState, Hash, HeadData, Id as ParaId, OccupiedCoreAssumption, @@ -362,8 +363,8 @@ impl PeerData { #[derive(Debug)] struct GroupAssignments { - /// Current assignment. - current: Option, + /// Current assignments. 
+ current: Vec, } struct PerRelayParent { @@ -376,7 +377,7 @@ impl PerRelayParent { fn new(mode: ProspectiveParachainsMode) -> Self { Self { prospective_parachains_mode: mode, - assignment: GroupAssignments { current: None }, + assignment: GroupAssignments { current: vec![] }, collations: Collations::default(), } } @@ -491,34 +492,34 @@ where .await .map_err(Error::CancelledAvailabilityCores)??; - let para_now = match polkadot_node_subsystem_util::signing_key_and_index(&validators, keystore) - .and_then(|(_, index)| polkadot_node_subsystem_util::find_validator_group(&groups, index)) - { - Some(group) => { - let core_now = rotation_info.core_for_group(group, cores.len()); - - cores.get(core_now.0 as usize).and_then(|c| match c { - CoreState::Occupied(core) if relay_parent_mode.is_enabled() => Some(core.para_id()), - CoreState::Scheduled(core) => Some(core.para_id), - CoreState::Occupied(_) | CoreState::Free => None, - }) - }, - None => { - gum::trace!(target: LOG_TARGET, ?relay_parent, "Not a validator"); - - return Ok(()) - }, + let core_now = if let Some(group) = + polkadot_node_subsystem_util::signing_key_and_index(&validators, keystore).and_then( + |(_, index)| polkadot_node_subsystem_util::find_validator_group(&groups, index), + ) { + rotation_info.core_for_group(group, cores.len()) + } else { + gum::trace!(target: LOG_TARGET, ?relay_parent, "Not a validator"); + return Ok(()) }; - // This code won't work well, if at all for on-demand parachains. For on-demand we'll - // have to be aware of which core the on-demand claim is going to be multiplexed - // onto. The on-demand claim will also have a known collator, and we should always - // allow an incoming connection from that collator. If not even connecting to them - // directly. - // - // However, this'll work fine for parachains, as each parachain gets a dedicated - // core. - if let Some(para_id) = para_now.as_ref() { + let paras_now = match fetch_claim_queue(sender, relay_parent).await.map_err(Error::Runtime)? { + // Runtime supports claim queue - use it + // + // `relay_parent_mode` is not examined here because if the runtime supports claim queue + // then it supports async backing params too (`ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT` + // < `CLAIM_QUEUE_RUNTIME_REQUIREMENT`). + Some(mut claim_queue) => claim_queue.0.remove(&core_now), + // Claim queue is not supported by the runtime - use availability cores instead. + None => cores.get(core_now.0 as usize).and_then(|c| match c { + CoreState::Occupied(core) if relay_parent_mode.is_enabled() => + core.next_up_on_available.as_ref().map(|c| [c.para_id].into_iter().collect()), + CoreState::Scheduled(core) => Some([core.para_id].into_iter().collect()), + CoreState::Occupied(_) | CoreState::Free => None, + }), + } + .unwrap_or_else(|| VecDeque::new()); + + for para_id in paras_now.iter() { let entry = current_assignments.entry(*para_id).or_default(); *entry += 1; if *entry == 1 { @@ -531,7 +532,7 @@ where } } - *group_assignment = GroupAssignments { current: para_now }; + *group_assignment = GroupAssignments { current: paras_now.into_iter().collect() }; Ok(()) } @@ -542,7 +543,7 @@ fn remove_outgoing( ) { let GroupAssignments { current, .. 
} = per_relay_parent.assignment; - if let Some(cur) = current { + for cur in current { if let Entry::Occupied(mut occupied) = current_assignments.entry(cur) { *occupied.get_mut() -= 1; if *occupied.get() == 0 { @@ -857,7 +858,8 @@ async fn process_incoming_peer_message( peer_id = ?origin, ?collator_id, ?para_id, - "Declared as collator for unneeded para", + "Declared as collator for unneeded para. Current assignments: {:?}", + &state.current_assignments ); modify_reputation( @@ -1089,7 +1091,7 @@ where peer_data.collating_para().ok_or(AdvertisementError::UndeclaredCollator)?; // Check if this is assigned to us. - if assignment.current.map_or(true, |id| id != collator_para_id) { + if !assignment.current.contains(&collator_para_id) { return Err(AdvertisementError::InvalidAssignment) } @@ -1105,7 +1107,7 @@ where ) .map_err(AdvertisementError::Invalid)?; - if !per_relay_parent.collations.is_seconded_limit_reached(relay_parent_mode) { + if per_relay_parent.collations.is_seconded_limit_reached(relay_parent_mode) { return Err(AdvertisementError::SecondedLimitReached) } @@ -1197,7 +1199,7 @@ where }); let collations = &mut per_relay_parent.collations; - if !collations.is_seconded_limit_reached(relay_parent_mode) { + if collations.is_seconded_limit_reached(relay_parent_mode) { gum::trace!( target: LOG_TARGET, peer_id = ?peer_id, diff --git a/polkadot/node/network/collator-protocol/src/validator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/tests/mod.rs index 3f4459d8e65d..44e25efd4dfc 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/tests/mod.rs @@ -21,7 +21,12 @@ use sc_network::ProtocolName; use sp_core::{crypto::Pair, Encode}; use sp_keyring::Sr25519Keyring; use sp_keystore::Keystore; -use std::{iter, sync::Arc, time::Duration}; +use std::{ + collections::{BTreeMap, VecDeque}, + iter, + sync::Arc, + time::Duration, +}; use polkadot_node_network_protocol::{ our_view, @@ -37,7 +42,7 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::{reputation::add_reputation, TimeoutExt}; use polkadot_primitives::{ - CandidateReceipt, CollatorPair, CoreState, GroupIndex, GroupRotationInfo, HeadData, + CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, GroupRotationInfo, HeadData, OccupiedCore, PersistedValidationData, ScheduledCore, ValidatorId, ValidatorIndex, }; use polkadot_primitives_test_helpers::{ @@ -71,6 +76,7 @@ struct TestState { validator_groups: Vec>, group_rotation_info: GroupRotationInfo, cores: Vec, + claim_queue: BTreeMap>, } impl Default for TestState { @@ -104,7 +110,7 @@ impl Default for TestState { CoreState::Scheduled(ScheduledCore { para_id: chain_ids[0], collator: None }), CoreState::Free, CoreState::Occupied(OccupiedCore { - next_up_on_available: None, + next_up_on_available: Some(ScheduledCore { para_id: chain_ids[1], collator: None }), occupied_since: 0, time_out_at: 1, next_up_on_time_out: None, @@ -120,6 +126,11 @@ impl Default for TestState { }), ]; + let mut claim_queue = BTreeMap::new(); + claim_queue.insert(CoreIndex(0), [chain_ids[0]].into_iter().collect()); + claim_queue.insert(CoreIndex(1), VecDeque::new()); + claim_queue.insert(CoreIndex(2), [chain_ids[1]].into_iter().collect()); + Self { chain_ids, relay_parent, @@ -128,6 +139,7 @@ impl Default for TestState { validator_groups, group_rotation_info, cores, + claim_queue, } } } @@ -264,6 +276,26 @@ async fn 
respond_to_core_info_queries( let _ = tx.send(Ok(test_state.cores.clone())); } ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::Version(tx), + )) => { + let _ = tx.send(Ok(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::ClaimQueue(tx), + )) => { + let _ = tx.send(Ok(test_state.claim_queue.clone())); + } + ); } /// Assert that the next message is a `CandidateBacking(Second())`. diff --git a/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs index 178dcb85e035..472731b506ab 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs @@ -72,6 +72,26 @@ async fn assert_assign_incoming( tx.send(Ok(test_state.cores.clone())).unwrap(); } ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + parent, + RuntimeApiRequest::Version(tx), + )) if parent == hash => { + let _ = tx.send(Ok(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + parent, + RuntimeApiRequest::ClaimQueue(tx), + )) if parent == hash => { + let _ = tx.send(Ok(test_state.claim_queue.clone())); + } + ); } /// Handle a view update. diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index 73416b193bbe..2bb9c82c6a6f 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -195,8 +195,8 @@ struct ActiveValidatorState { index: ValidatorIndex, // our validator group group: GroupIndex, - // the assignment of our validator group, if any. - assignment: Option, + // the assignments of our validator group, if any. + assignments: Vec, // the 'direct-in-group' communication at this relay-parent. 
cluster_tracker: ClusterTracker, } @@ -740,8 +740,8 @@ fn find_active_validator_state( let our_group = groups.by_validator_index(validator_index)?; let core_index = group_rotation_info.core_for_group(our_group, availability_cores.len()); - let para_assigned_to_core = if let Some(claim_queue) = maybe_claim_queue { - claim_queue.get_claim_for(core_index, 0) + let paras_assigned_to_core = if let Some(claim_queue) = maybe_claim_queue { + claim_queue.iter_claims_for_core(&core_index).copied().collect() } else { availability_cores .get(core_index.0 as usize) @@ -753,6 +753,8 @@ fn find_active_validator_state( .map(|scheduled_core| scheduled_core.para_id), CoreState::Free | CoreState::Occupied(_) => None, }) + .into_iter() + .collect() }; let group_validators = groups.get(our_group)?.to_owned(); @@ -760,7 +762,7 @@ fn find_active_validator_state( active: Some(ActiveValidatorState { index: validator_index, group: our_group, - assignment: para_assigned_to_core, + assignments: paras_assigned_to_core, cluster_tracker: ClusterTracker::new(group_validators, seconding_limit) .expect("group is non-empty because we are in it; qed"), }), @@ -1162,10 +1164,10 @@ pub(crate) async fn share_local_statement( None => return Ok(()), }; - let (local_index, local_assignment, local_group) = + let (local_index, local_assignments, local_group) = match per_relay_parent.active_validator_state() { None => return Err(JfyiError::InvalidShare), - Some(l) => (l.index, l.assignment, l.group), + Some(l) => (l.index, &l.assignments, l.group), }; // Two possibilities: either the statement is `Seconded` or we already @@ -1203,7 +1205,7 @@ pub(crate) async fn share_local_statement( return Err(JfyiError::InvalidShare) } - if local_assignment != Some(expected_para) || relay_parent != expected_relay_parent { + if !local_assignments.contains(&expected_para) || relay_parent != expected_relay_parent { return Err(JfyiError::InvalidShare) } @@ -2144,12 +2146,11 @@ async fn determine_groups_per_para( let n_cores = availability_cores.len(); // Determine the core indices occupied by each para at the current relay parent. To support - // on-demand parachains we also consider the core indices at next block if core has a candidate - // pending availability. - let para_core_indices: Vec<_> = if let Some(claim_queue) = maybe_claim_queue { + // on-demand parachains we also consider the core indices at next blocks. + let schedule: HashMap> = if let Some(claim_queue) = maybe_claim_queue { claim_queue - .iter_claims_at_depth(0) - .map(|(core_index, para)| (para, core_index)) + .iter_all_claims() + .map(|(core_index, paras)| (*core_index, paras.iter().copied().collect())) .collect() } else { availability_cores @@ -2157,12 +2158,12 @@ async fn determine_groups_per_para( .enumerate() .filter_map(|(index, core)| match core { CoreState::Scheduled(scheduled_core) => - Some((scheduled_core.para_id, CoreIndex(index as u32))), + Some((CoreIndex(index as u32), vec![scheduled_core.para_id])), CoreState::Occupied(occupied_core) => if max_candidate_depth >= 1 { - occupied_core - .next_up_on_available - .map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32))) + occupied_core.next_up_on_available.map(|scheduled_core| { + (CoreIndex(index as u32), vec![scheduled_core.para_id]) + }) } else { None }, @@ -2173,9 +2174,12 @@ async fn determine_groups_per_para( let mut groups_per_para = HashMap::new(); // Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`. 
- for (para, core_index) in para_core_indices { + for (core_index, paras) in schedule { let group_index = group_rotation_info.group_for_core(core_index, n_cores); - groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index) + + for para in paras { + groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index); + } } groups_per_para diff --git a/polkadot/node/subsystem-util/src/vstaging.rs b/polkadot/node/subsystem-util/src/vstaging.rs index b166a54f75c4..b6cd73f412b3 100644 --- a/polkadot/node/subsystem-util/src/vstaging.rs +++ b/polkadot/node/subsystem-util/src/vstaging.rs @@ -31,7 +31,7 @@ const LOG_TARGET: &'static str = "parachain::subsystem-util-vstaging"; /// A snapshot of the runtime claim queue at an arbitrary relay chain block. #[derive(Default)] -pub struct ClaimQueueSnapshot(BTreeMap>); +pub struct ClaimQueueSnapshot(pub BTreeMap>); impl From>> for ClaimQueueSnapshot { fn from(claim_queue_snapshot: BTreeMap>) -> Self { @@ -56,6 +56,19 @@ impl ClaimQueueSnapshot { .iter() .filter_map(move |(core_index, paras)| Some((*core_index, *paras.get(depth)?))) } + + /// Returns an iterator over all claims on the given core. + pub fn iter_claims_for_core( + &self, + core_index: &CoreIndex, + ) -> impl Iterator + '_ { + self.0.get(core_index).map(|c| c.iter()).into_iter().flatten() + } + + /// Returns an iterator over the whole claim queue. + pub fn iter_all_claims(&self) -> impl Iterator)> + '_ { + self.0.iter() + } } // TODO: https://github.com/paritytech/polkadot-sdk/issues/1940 diff --git a/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs b/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs index 62e96e9fbb05..f4e3db185fea 100644 --- a/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs +++ b/polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs @@ -28,10 +28,10 @@ use sp_std::{ pub fn claim_queue() -> BTreeMap> { let now = >::block_number() + One::one(); - // This explicit update is only strictly required for session boundaries: - // - // At the end of a session we clear the claim queues: Without this update call, nothing would be - // scheduled to the client. + // This is needed so that the claim queue always has the right size (equal to + // scheduling_lookahead). Otherwise, if a candidate is backed in the same block where the + // previous candidate is included, the claim queue will have already pop()-ed the next item + // from the queue and the length would be `scheduling_lookahead - 1`. >::free_cores_and_fill_claim_queue(Vec::new(), now); let config = configuration::ActiveConfig::::get(); // Extra sanity, config should already never be smaller than 1: diff --git a/polkadot/runtime/parachains/src/scheduler.rs b/polkadot/runtime/parachains/src/scheduler.rs index 33b4d849c490..d7fe5c06863c 100644 --- a/polkadot/runtime/parachains/src/scheduler.rs +++ b/polkadot/runtime/parachains/src/scheduler.rs @@ -351,6 +351,9 @@ impl Pallet { } /// Note that the given cores have become occupied. Update the claim queue accordingly. + /// This will not push a new entry onto the claim queue, so the length after this call will be + /// the expected length - 1. The claim_queue runtime API will take care of adding another entry + /// here, to ensure the right lookahead. 
pub(crate) fn occupied( now_occupied: BTreeMap, ) -> BTreeMap { diff --git a/polkadot/zombienet_tests/assign-core.js b/polkadot/zombienet_tests/assign-core.js new file mode 100644 index 000000000000..5ddb86930f5a --- /dev/null +++ b/polkadot/zombienet_tests/assign-core.js @@ -0,0 +1,48 @@ +async function run(nodeName, networkInfo, args) { + const wsUri = networkInfo.nodesByName[nodeName].wsUri; + const api = await zombie.connect(wsUri); + + let core = Number(args[0]); + + let assignments = []; + + for (let i = 1; i < args.length; i += 2) { + let [para, parts] = [args[i], args[i + 1]]; + + console.log(`Assigning para ${para} to core ${core}`); + + assignments.push( + [{ task: para }, parts] + ); + } + await zombie.util.cryptoWaitReady(); + + // account to submit tx + const keyring = new zombie.Keyring({ type: "sr25519" }); + const alice = keyring.addFromUri("//Alice"); + + await new Promise(async (resolve, reject) => { + const unsub = await api.tx.sudo + .sudo(api.tx.coretime.assignCore(core, 0, assignments, null)) + .signAndSend(alice, ({ status, isError }) => { + if (status.isInBlock) { + console.log( + `Transaction included at blockhash ${status.asInBlock}`, + ); + } else if (status.isFinalized) { + console.log( + `Transaction finalized at blockHash ${status.asFinalized}`, + ); + unsub(); + return resolve(); + } else if (isError) { + console.log(`Transaction error`); + reject(`Transaction error`); + } + }); + }); + + return 0; +} + +module.exports = { run }; diff --git a/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.zndsl b/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.zndsl index d624cbaf9df6..d47ef8f415f7 100644 --- a/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.zndsl +++ b/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.zndsl @@ -11,8 +11,8 @@ elastic-validator-4: reports node_roles is 4 # Register 2 extra cores to this some-parachain. -elastic-validator-0: js-script ./assign-core.js with "2000,0" return is 0 within 600 seconds -elastic-validator-0: js-script ./assign-core.js with "2000,1" return is 0 within 600 seconds +elastic-validator-0: js-script ./assign-core.js with "0,2000,57600" return is 0 within 600 seconds +elastic-validator-0: js-script ./assign-core.js with "1,2000,57600" return is 0 within 600 seconds # Wait for 20 relay chain blocks elastic-validator-0: reports substrate_block_height{status="best"} is at least 20 within 600 seconds diff --git a/polkadot/zombienet_tests/elastic_scaling/0002-elastic-scaling-doesnt-break-parachains.zndsl b/polkadot/zombienet_tests/elastic_scaling/0002-elastic-scaling-doesnt-break-parachains.zndsl index 900a3befbc6f..7ba896e1c903 100644 --- a/polkadot/zombienet_tests/elastic_scaling/0002-elastic-scaling-doesnt-break-parachains.zndsl +++ b/polkadot/zombienet_tests/elastic_scaling/0002-elastic-scaling-doesnt-break-parachains.zndsl @@ -11,8 +11,8 @@ validator: reports substrate_block_height{status="finalized"} is at least 10 wit validator: parachain 2000 block height is at least 10 within 200 seconds # Register the second core assigned to this parachain. 
-alice: js-script ./assign-core.js with "2000,0" return is 0 within 600 seconds
-alice: js-script ./assign-core.js with "2000,1" return is 0 within 600 seconds
+alice: js-script ./assign-core.js with "0,2000,57600" return is 0 within 600 seconds
+alice: js-script ./assign-core.js with "1,2000,57600" return is 0 within 600 seconds
 
 validator: reports substrate_block_height{status="finalized"} is at least 35 within 100 seconds
diff --git a/polkadot/zombienet_tests/elastic_scaling/assign-core.js b/polkadot/zombienet_tests/elastic_scaling/assign-core.js
deleted file mode 100644
index add63b6d3085..000000000000
--- a/polkadot/zombienet_tests/elastic_scaling/assign-core.js
+++ /dev/null
@@ -1,39 +0,0 @@
-async function run(nodeName, networkInfo, args) {
-  const wsUri = networkInfo.nodesByName[nodeName].wsUri;
-  const api = await zombie.connect(wsUri);
-
-  let para = Number(args[0]);
-  let core = Number(args[1]);
-  console.log(`Assigning para ${para} to core ${core}`);
-
-  await zombie.util.cryptoWaitReady();
-
-  // account to submit tx
-  const keyring = new zombie.Keyring({ type: "sr25519" });
-  const alice = keyring.addFromUri("//Alice");
-
-  await new Promise(async (resolve, reject) => {
-    const unsub = await api.tx.sudo
-      .sudo(api.tx.coretime.assignCore(core, 0, [[{ task: para }, 57600]], null))
-      .signAndSend(alice, ({ status, isError }) => {
-        if (status.isInBlock) {
-          console.log(
-            `Transaction included at blockhash ${status.asInBlock}`,
-          );
-        } else if (status.isFinalized) {
-          console.log(
-            `Transaction finalized at blockHash ${status.asFinalized}`,
-          );
-          unsub();
-          return resolve();
-        } else if (isError) {
-          console.log(`Transaction error`);
-          reject(`Transaction error`);
-        }
-      });
-  });
-
-  return 0;
-}
-
-module.exports = { run };
diff --git a/polkadot/zombienet_tests/elastic_scaling/assign-core.js b/polkadot/zombienet_tests/elastic_scaling/assign-core.js
new file mode 120000
index 000000000000..eeb6402c06f5
--- /dev/null
+++ b/polkadot/zombienet_tests/elastic_scaling/assign-core.js
@@ -0,0 +1 @@
+../assign-core.js
\ No newline at end of file
diff --git a/polkadot/zombienet_tests/functional/0015-coretime-shared-core.toml b/polkadot/zombienet_tests/functional/0015-coretime-shared-core.toml
new file mode 100644
index 000000000000..fed30e0db053
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0015-coretime-shared-core.toml
@@ -0,0 +1,44 @@
+[settings]
+timeout = 1000
+
+[relaychain.genesis.runtimeGenesis.patch.configuration.config.async_backing_params]
+  max_candidate_depth = 3
+  allowed_ancestry_len = 2
+
+[relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params]
+  max_validators_per_core = 1
+  lookahead = 2
+  num_cores = 4
+
+[relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params]
+  needed_approvals = 3
+
+[relaychain]
+default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}"
+chain = "rococo-local"
+command = "polkadot"
+
+  [[relaychain.node_groups]]
+    name = "validator"
+    args = ["-lruntime=debug,parachain=debug,parachain::backing=trace,parachain::collator-protocol=trace,parachain::prospective-parachains=trace,runtime::parachains::scheduler=trace,runtime::inclusion-inherent=trace,runtime::inclusion=trace" ]
+    count = 4
+
+{% for id in range(2000,2004) %}
+[[parachains]]
+id = {{id}}
+register_para = false
+onboard_as_parachain = false
+add_to_genesis = false
+chain = "glutton-westend-local-{{id}}"
+  [parachains.genesis.runtimeGenesis.patch.glutton]
+    compute = "50000000"
+    storage = "2500000000"
+    trashDataCount = 5120
+
+  [parachains.collator]
+    name = "collator-{{id}}"
+    image = "{{CUMULUS_IMAGE}}"
+    command = "polkadot-parachain"
+    args = ["-lparachain=debug"]
+
+{% endfor %}
diff --git a/polkadot/zombienet_tests/functional/0015-coretime-shared-core.zndsl b/polkadot/zombienet_tests/functional/0015-coretime-shared-core.zndsl
new file mode 100644
index 000000000000..b8b8887df857
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0015-coretime-shared-core.zndsl
@@ -0,0 +1,16 @@
+Description: Coretime shared core test
+Network: ./0015-coretime-shared-core.toml
+Creds: config
+
+validator: reports node_roles is 4
+
+# Register paras 2 by 2 to speed up the test. Registering all at once would exceed the weight limit.
+validator-0: js-script ./0015-force-register-paras.js with "2000,2001" return is 0 within 600 seconds
+validator-0: js-script ./0015-force-register-paras.js with "2002,2003" return is 0 within 600 seconds
+# Assign core 0 to be shared by all four paras.
+validator-0: js-script ./assign-core.js with "0,2000,14400,2001,14400,2002,14400,2003,14400" return is 0 within 600 seconds
+
+collator-2000: reports block height is at least 6 within 200 seconds
+collator-2001: reports block height is at least 6 within 50 seconds
+collator-2002: reports block height is at least 6 within 50 seconds
+collator-2003: reports block height is at least 6 within 50 seconds
diff --git a/polkadot/zombienet_tests/functional/0015-force-register-paras.js b/polkadot/zombienet_tests/functional/0015-force-register-paras.js
new file mode 100644
index 000000000000..f82163b01105
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0015-force-register-paras.js
@@ -0,0 +1,63 @@
+async function run(nodeName, networkInfo, args) {
+  const init = networkInfo.nodesByName[nodeName];
+  let wsUri = init.wsUri;
+  let userDefinedTypes = init.userDefinedTypes;
+  const api = await zombie.connect(wsUri, userDefinedTypes);
+
+  // account to submit tx
+  const keyring = new zombie.Keyring({ type: "sr25519" });
+  const alice = keyring.addFromUri("//Alice");
+
+  let calls = [];
+
+  for (let i = 0; i < args.length; i++) {
+    let para = args[i];
+    const sec = networkInfo.nodesByName["collator-" + para];
+    const api_collator = await zombie.connect(sec.wsUri, sec.userDefinedTypes);
+
+    await zombie.util.cryptoWaitReady();
+
+    // Get the genesis header and the validation code of the parachain
+    const genesis_header = await api_collator.rpc.chain.getHeader();
+    const validation_code = await api_collator.rpc.state.getStorage("0x3A636F6465");
+
+    calls.push(
+      api.tx.paras.addTrustedValidationCode(validation_code.toHex())
+    );
+    calls.push(
+      api.tx.registrar.forceRegister(
+        alice.address,
+        0,
+        Number(para),
+        genesis_header.toHex(),
+        validation_code.toHex(),
+      )
+    );
+  }
+
+  const sudo_batch = api.tx.sudo.sudo(api.tx.utility.batch(calls));
+
+  await new Promise(async (resolve, reject) => {
+    const unsub = await sudo_batch
+      .signAndSend(alice, ({ status, isError }) => {
+        if (status.isInBlock) {
+          console.log(
+            `Transaction included at blockhash ${status.asInBlock}`,
+          );
+        } else if (status.isFinalized) {
+          console.log(
+            `Transaction finalized at blockHash ${status.asFinalized}`,
+          );
+          unsub();
+          return resolve();
+        } else if (isError) {
+          console.log(`Transaction error`);
+          reject(`Transaction error`);
+        }
+      });
+  });
+
+  return 0;
+}
+
+module.exports = { run };
diff --git a/polkadot/zombienet_tests/functional/assign-core.js b/polkadot/zombienet_tests/functional/assign-core.js
new file mode 120000
index 000000000000..eeb6402c06f5
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/assign-core.js
@@ -0,0 +1 @@
+../assign-core.js
\ No newline at end of file
diff --git a/prdoc/pr_4724.prdoc b/prdoc/pr_4724.prdoc
new file mode 100644
index 000000000000..3723c2a70246
--- /dev/null
+++ b/prdoc/pr_4724.prdoc
@@ -0,0 +1,24 @@
+title: Fix core sharing and make use of scheduling_lookahead during backing
+
+doc:
+  - audience: Node Dev
+    description: |
+      Core sharing (two or more parachains scheduled on the same core with interlaced assignments) was not working correctly.
+      Adds the necessary fixes to the backing subsystems. Moreover, adds support for backing collations which are built
+      and advertised ahead of time (up to `scheduling_lookahead` relay chain blocks in advance).
+
+crates:
+  - name: polkadot-node-core-backing
+    bump: patch
+  - name: polkadot-node-core-prospective-parachains
+    bump: patch
+  - name: polkadot-collator-protocol
+    bump: patch
+  - name: polkadot-statement-distribution
+    bump: patch
+  - name: polkadot-node-subsystem-util
+    bump: minor
+  - name: polkadot-runtime-parachains
+    bump: none
+  - name: polkadot
+    bump: none
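
As an aside for reviewers, the schedule-driven group mapping in the backing change above can be illustrated with a small self-contained Rust sketch. The `CoreIndex`, `ParaId`, `GroupIndex` and `ClaimQueueSnapshot` types below are simplified stand-ins for the real primitives, and the identity core-to-group closure is a hypothetical placeholder for `GroupRotationInfo::group_for_core`:

use std::collections::{BTreeMap, HashMap, VecDeque};

// Simplified stand-ins for the polkadot-primitives types; only the traits
// needed for map keys and assertions are derived.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
struct CoreIndex(u32);
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
struct ParaId(u32);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct GroupIndex(u32);

// Stand-in mirroring the `iter_all_claims` accessor added in this patch.
struct ClaimQueueSnapshot(BTreeMap<CoreIndex, VecDeque<ParaId>>);

impl ClaimQueueSnapshot {
	fn iter_all_claims(&self) -> impl Iterator<Item = (&CoreIndex, &VecDeque<ParaId>)> + '_ {
		self.0.iter()
	}
}

fn main() {
	// Paras 2000 and 2001 share core 0 (lookahead = 2); para 2002 owns core 1.
	let snapshot = ClaimQueueSnapshot(BTreeMap::from([
		(CoreIndex(0), VecDeque::from([ParaId(2000), ParaId(2001)])),
		(CoreIndex(1), VecDeque::from([ParaId(2002), ParaId(2002)])),
	]));

	// Hypothetical identity rotation; the real mapping comes from
	// `GroupRotationInfo::group_for_core`.
	let group_for_core = |core: CoreIndex| GroupIndex(core.0);

	// Same shape as the backing-subsystem change: walk every claim on every
	// core, so each para scheduled on a core maps to that core's group.
	let mut groups_per_para: HashMap<ParaId, Vec<GroupIndex>> = HashMap::new();
	for (core_index, paras) in snapshot.iter_all_claims() {
		let group_index = group_for_core(*core_index);
		for para in paras {
			groups_per_para.entry(*para).or_insert_with(Vec::new).push(group_index);
		}
	}

	// Both paras sharing core 0 resolve to the same backing group.
	assert_eq!(groups_per_para[&ParaId(2000)], vec![GroupIndex(0)]);
	assert_eq!(groups_per_para[&ParaId(2001)], vec![GroupIndex(0)]);
}

Because the mapping is built from the whole claim queue rather than from a single para-per-core assignment, the validators of one group can back candidates from either para sharing the core in successive blocks.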
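
The pop-then-refill invariant described in the `scheduler.rs` and `claim_queue` comments can likewise be sketched with a plain `VecDeque` standing in for one core's claim queue, assuming a single shared core and a lookahead of 2:

use std::collections::VecDeque;

fn main() {
	let lookahead = 2usize;
	// Claim queue for one core shared by paras 2000 and 2001.
	let mut queue: VecDeque<u32> = VecDeque::from([2000, 2001]);

	// `occupied` pops the claim that was just backed and does not refill,
	// leaving the queue one short of the configured lookahead.
	let backed = queue.pop_front();
	assert_eq!(backed, Some(2000));
	assert_eq!(queue.len(), lookahead - 1);

	// The `claim_queue` runtime API first calls
	// `free_cores_and_fill_claim_queue`, topping the queue back up so callers
	// always observe `scheduling_lookahead` entries per core.
	queue.push_back(2000);
	assert_eq!(queue.len(), lookahead);
	assert_eq!(queue, VecDeque::from([2001, 2000]));
}

The refill step is what keeps every caller of the runtime API seeing exactly `scheduling_lookahead` claims per core, even in blocks where one candidate is backed while the previous one is included.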