Fix occupied core handling #4691

Merged · 14 commits · Jun 7, 2024
2 changes: 1 addition & 1 deletion polkadot/runtime/parachains/src/assigner_coretime/tests.rs
@@ -75,7 +75,7 @@ fn run_to_block(
Scheduler::initializer_initialize(b + 1);

// In the real runtime this is expected to be called by the `InclusionInherent` pallet.
Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1);
Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), b + 1);
}
}

@@ -77,7 +77,7 @@ fn run_to_block(
OnDemandAssigner::on_initialize(b + 1);

// In the real runtime this is expected to be called by the `InclusionInherent` pallet.
Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1);
Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), b + 1);
}
}

@@ -71,7 +71,7 @@ fn run_to_block(
Scheduler::initializer_initialize(b + 1);

// In the real runtime this is expected to be called by the `InclusionInherent` pallet.
Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1);
Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), b + 1);
}
}

49 changes: 33 additions & 16 deletions polkadot/runtime/parachains/src/builder.rs
@@ -92,11 +92,17 @@ pub(crate) struct BenchBuilder<T: paras_inherent::Config> {
/// will correspond to core index 3. There must be one entry for each core with a dispute
/// statement set.
dispute_sessions: Vec<u32>,
/// Paras here will both be backed in the inherent data and already occupying a core (which is
/// freed via bitfields).
///
/// Map from para id to number of validity votes. Core indices are generated based on
/// the `elastic_paras` configuration. Each para id in `elastic_paras` gets the
/// specified number of consecutive cores assigned to it. If a para id is not present
/// in `elastic_paras` it gets assigned to a single core.
backed_and_concluding_paras: BTreeMap<u32, u32>,

/// Paras which don't yet occupy a core, but will after the inherent has been processed.
backed_in_inherent_paras: BTreeMap<u32, u32>,
/// Map from para id (seed) to number of chained candidates.
elastic_paras: BTreeMap<u32, u8>,
/// Make every candidate include a code upgrade by setting this to `Some` where the interior
@@ -132,6 +138,7 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
dispute_statements: BTreeMap::new(),
dispute_sessions: Default::default(),
backed_and_concluding_paras: Default::default(),
backed_in_inherent_paras: Default::default(),
elastic_paras: Default::default(),
code_upgrade: None,
fill_claimqueue: true,
@@ -167,6 +174,12 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
self
}

/// Set a map from para id seed to number of validity votes for candidates backed in the inherent data.
pub(crate) fn set_backed_in_inherent_paras(mut self, backed: BTreeMap<u32, u32>) -> Self {
self.backed_in_inherent_paras = backed;
self
}

/// Set a map from para id seed to number of cores assigned to it.
pub(crate) fn set_elastic_paras(mut self, elastic_paras: BTreeMap<u32, u8>) -> Self {
self.elastic_paras = elastic_paras;
@@ -753,8 +766,8 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
///
/// Note that this API only allows building scenarios where the `backed_and_concluding_paras`
/// are mutually exclusive with the cores for disputes. So
/// `backed_and_concluding_paras.len() + dispute_sessions.len()` must be less than the max
/// number of cores.
/// `backed_and_concluding_paras.len() + dispute_sessions.len() + backed_in_inherent_paras.len()`
/// must be less than the max number of cores.
pub(crate) fn build(self) -> Bench<T> {
// Make sure relevant storage is cleared. This is just to get the asserts to work when
// running tests because it seems the storage is not cleared in between.
@@ -771,8 +784,10 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
.sum::<usize>()
.saturating_sub(self.elastic_paras.len() as usize);

let used_cores =
self.dispute_sessions.len() + self.backed_and_concluding_paras.len() + extra_cores;
let used_cores = self.dispute_sessions.len() +
self.backed_and_concluding_paras.len() +
self.backed_in_inherent_paras.len() +
extra_cores;

assert!(used_cores <= max_cores);
let fill_claimqueue = self.fill_claimqueue;
@@ -793,8 +808,12 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
&builder.elastic_paras,
used_cores,
);

let mut backed_in_inherent = BTreeMap::new();
backed_in_inherent.append(&mut builder.backed_and_concluding_paras.clone());
backed_in_inherent.append(&mut builder.backed_in_inherent_paras.clone());
let backed_candidates = builder.create_backed_candidates(
&builder.backed_and_concluding_paras,
&backed_in_inherent,
&builder.elastic_paras,
builder.code_upgrade,
);
@@ -845,12 +864,16 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
scheduler::AvailabilityCores::<T>::set(cores);

core_idx = 0u32;

// We need entries in the claim queue for those:
all_cores.append(&mut builder.backed_in_inherent_paras.clone());

if fill_claimqueue {
let cores = all_cores
.keys()
.flat_map(|para_id| {
(0..elastic_paras.get(&para_id).cloned().unwrap_or(1))
.filter_map(|_para_local_core_idx| {
.map(|_para_local_core_idx| {
let ttl = configuration::ActiveConfig::<T>::get().scheduler_params.ttl;
// Load an assignment into provider so that one is present to pop
let assignment =
Expand All @@ -859,17 +882,11 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
ParaId::from(*para_id),
);

let entry = (
CoreIndex(core_idx),
[ParasEntry::new(assignment, now + ttl)].into(),
);
let res = if builder.unavailable_cores.contains(&core_idx) {
None
} else {
Some(entry)
};
core_idx += 1;
res
(
CoreIndex(core_idx - 1),
[ParasEntry::new(assignment, now + ttl)].into(),
)
})
.collect::<Vec<(CoreIndex, VecDeque<ParasEntry<_>>)>>()
})
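For orientation, here is a small self-contained sketch of the core-budget accounting that `build()` now performs: disputes, already-occupying paras, newly backed paras, and the extra cores claimed by elastic paras all count against the same limit. The para ids, vote counts, and `max_cores` value below are made up for illustration and are not part of this PR:

```rust
use std::collections::BTreeMap;

fn main() {
    // Hypothetical scenario sizes, mirroring the `used_cores` computation in `build()`.
    let dispute_sessions: Vec<u32> = vec![2, 2]; // two cores carry dispute statement sets
    let backed_and_concluding: BTreeMap<u32, u32> = BTreeMap::from([(1, 3), (2, 3)]);
    let backed_in_inherent: BTreeMap<u32, u32> = BTreeMap::from([(3, 3)]);
    // Para 2 is elastic and occupies 3 consecutive cores, i.e. 2 cores beyond its first one.
    let elastic_paras: BTreeMap<u32, u8> = BTreeMap::from([(2, 3)]);

    let extra_cores = elastic_paras
        .values()
        .map(|cores| *cores as usize)
        .sum::<usize>()
        .saturating_sub(elastic_paras.len());

    let used_cores = dispute_sessions.len() +
        backed_and_concluding.len() +
        backed_in_inherent.len() +
        extra_cores;

    let max_cores = 10;
    assert!(used_cores <= max_cores, "scenario exceeds the core budget");
    println!("used {used_cores} of {max_cores} cores");
}
```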
6 changes: 6 additions & 0 deletions polkadot/runtime/parachains/src/configuration.rs
@@ -335,6 +335,8 @@ pub enum InconsistentError<BlockNumber> {
InconsistentExecutorParams { inner: ExecutorParamError },
/// TTL should be bigger than lookahead
LookaheadExceedsTTL,
/// Lookahead is zero, while it must be at least 1 for parachains to work.
LookaheadZero,
/// Passed in queue size for on-demand was too large.
OnDemandQueueSizeTooLarge,
/// Number of delay tranches cannot be 0.
@@ -432,6 +434,10 @@
return Err(LookaheadExceedsTTL)
}

if self.scheduler_params.lookahead == 0 {
return Err(LookaheadZero)
}

if self.scheduler_params.on_demand_queue_max_size > ON_DEMAND_MAX_QUEUE_MAX_SIZE {
return Err(OnDemandQueueSizeTooLarge)
}
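The added rule follows the same pattern as the other consistency checks: a configuration whose scheduler lookahead is zero is rejected, since nothing would ever be scheduled. A minimal stand-alone sketch of that rule, using simplified types rather than the pallet's actual configuration structs:

```rust
#[derive(Debug, PartialEq)]
enum InconsistentError {
    LookaheadExceedsTtl,
    LookaheadZero,
}

struct SchedulerParams {
    lookahead: u32,
    ttl: u32,
}

fn check_consistency(params: &SchedulerParams) -> Result<(), InconsistentError> {
    if params.lookahead > params.ttl {
        return Err(InconsistentError::LookaheadExceedsTtl);
    }
    // Lookahead must be at least 1, otherwise no assignments ever enter the claim queue.
    if params.lookahead == 0 {
        return Err(InconsistentError::LookaheadZero);
    }
    Ok(())
}

fn main() {
    assert_eq!(
        check_consistency(&SchedulerParams { lookahead: 0, ttl: 5 }),
        Err(InconsistentError::LookaheadZero)
    );
    assert!(check_consistency(&SchedulerParams { lookahead: 2, ttl: 5 }).is_ok());
}
```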
@@ -110,7 +110,7 @@ benchmarks! {
.collect();

let scenario = BenchBuilder::<T>::new()
.set_backed_and_concluding_paras(cores_with_backed.clone())
.set_backed_in_inherent_paras(cores_with_backed.clone())
.build();

let mut benchmark = scenario.data.clone();
@@ -161,7 +161,7 @@ benchmarks! {
.collect();

let scenario = BenchBuilder::<T>::new()
.set_backed_and_concluding_paras(cores_with_backed.clone())
.set_backed_in_inherent_paras(cores_with_backed.clone())
.set_code_upgrade(v)
.build();

19 changes: 10 additions & 9 deletions polkadot/runtime/parachains/src/paras_inherent/mod.rs
@@ -560,7 +560,7 @@ impl<T: Config> Pallet<T> {
.chain(freed_disputed.into_iter().map(|core| (core, FreedReason::Concluded)))
.chain(freed_timeout.into_iter().map(|c| (c, FreedReason::TimedOut)))
.collect::<BTreeMap<CoreIndex, FreedReason>>();
scheduler::Pallet::<T>::free_cores_and_fill_claimqueue(freed, now);
scheduler::Pallet::<T>::free_cores_and_fill_claim_queue(freed, now);

METRICS.on_candidates_processed_total(backed_candidates.len() as u64);

@@ -570,25 +570,26 @@
.map(|b| *b)
.unwrap_or(false);

let mut scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
let mut total_scheduled_cores = 0;
let mut eligible: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
let mut total_eligible_cores = 0;

for (core_idx, para_id) in scheduler::Pallet::<T>::scheduled_paras() {
total_scheduled_cores += 1;
scheduled.entry(para_id).or_default().insert(core_idx);
for (core_idx, para_id) in scheduler::Pallet::<T>::eligible_paras() {
total_eligible_cores += 1;
log::trace!(target: LOG_TARGET, "Found eligible para {:?} on core {:?}", para_id, core_idx);
eligible.entry(para_id).or_default().insert(core_idx);
}

let initial_candidate_count = backed_candidates.len();
let backed_candidates_with_core = sanitize_backed_candidates::<T>(
backed_candidates,
&allowed_relay_parents,
concluded_invalid_hashes,
scheduled,
eligible,
core_index_enabled,
);
let count = count_backed_candidates(&backed_candidates_with_core);

ensure!(count <= total_scheduled_cores, Error::<T>::UnscheduledCandidate);
ensure!(count <= total_eligible_cores, Error::<T>::UnscheduledCandidate);

METRICS.on_candidates_sanitized(count as u64);

@@ -1422,7 +1423,7 @@ fn map_candidates_to_cores<T: configuration::Config + scheduler::Config + inclus
} else {
log::debug!(
target: LOG_TARGET,
"Paraid: {:?} has no scheduled cores but {} candidates were supplied.",
"Paraid: {:?} has no entry in scheduled cores but {} candidates were supplied.",
para_id,
backed_candidates.len()
);
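The bookkeeping change above can be summarised as: the inherent no longer validates candidates against only the next-up (scheduled) paras, but against every para that currently has a claim queue entry (eligible), grouped per para with the set of cores it may use, and the number of sanitized candidates is bounded by the number of eligible cores. A simplified stand-alone illustration of that grouping and bound, with plain integers in place of `ParaId`/`CoreIndex` and made-up data:

```rust
use std::collections::{BTreeMap, BTreeSet};

fn main() {
    // (core index, para id) pairs as the scheduler might report them; made-up data.
    let eligible_pairs = vec![(0u32, 100u32), (1, 100), (2, 200)];

    let mut eligible: BTreeMap<u32, BTreeSet<u32>> = BTreeMap::new();
    let mut total_eligible_cores = 0usize;
    for (core_idx, para_id) in eligible_pairs {
        total_eligible_cores += 1;
        eligible.entry(para_id).or_default().insert(core_idx);
    }

    // Suppose sanitization kept two backed candidates for para 100 and one for para 200.
    let backed_per_para: BTreeMap<u32, usize> = BTreeMap::from([(100, 2), (200, 1)]);
    let count: usize = backed_per_para.values().sum();

    // Mirrors `ensure!(count <= total_eligible_cores, Error::<T>::UnscheduledCandidate)`.
    assert!(count <= total_eligible_cores);
    assert_eq!(eligible[&100], BTreeSet::from([0, 1]));
}
```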
20 changes: 10 additions & 10 deletions polkadot/runtime/parachains/src/paras_inherent/tests.rs
@@ -166,7 +166,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

// Nothing is filtered out (including the backed candidates.)
assert_eq!(
@@ -257,7 +257,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

assert!(pallet::OnChainVotes::<Test>::get().is_none());

@@ -372,7 +372,7 @@ mod enter {
let mut inherent_data = InherentData::new();
inherent_data.put_data(PARACHAINS_INHERENT_IDENTIFIER, &scenario.data).unwrap();

assert!(!scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(!scheduler::Pallet::<Test>::claim_queue_is_empty());

// The right candidates have been filtered out (the ones for cores 0,4,5)
assert_eq!(
@@ -618,7 +618,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

let multi_dispute_inherent_data =
Pallet::<Test>::create_inherent_inner(&inherent_data.clone()).unwrap();
@@ -690,7 +690,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

let limit_inherent_data =
Pallet::<Test>::create_inherent_inner(&inherent_data.clone()).unwrap();
@@ -762,7 +762,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

// Nothing is filtered out (including the backed candidates.)
let limit_inherent_data =
@@ -849,7 +849,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

// Nothing is filtered out (including the backed candidates.)
let limit_inherent_data =
@@ -1818,7 +1818,7 @@ mod sanitizers {
]);

// Update scheduler's claimqueue with the parachains
scheduler::Pallet::<Test>::set_claimqueue(BTreeMap::from([
scheduler::Pallet::<Test>::set_claim_queue(BTreeMap::from([
(
CoreIndex::from(0),
VecDeque::from([ParasEntry::new(
@@ -2001,7 +2001,7 @@ mod sanitizers {
]);

// Update scheduler's claimqueue with the parachains
scheduler::Pallet::<Test>::set_claimqueue(BTreeMap::from([
scheduler::Pallet::<Test>::set_claim_queue(BTreeMap::from([
(
CoreIndex::from(0),
VecDeque::from([ParasEntry::new(
@@ -2542,7 +2542,7 @@ mod sanitizers {
]);

// Update scheduler's claimqueue with the parachains
scheduler::Pallet::<Test>::set_claimqueue(BTreeMap::from([
scheduler::Pallet::<Test>::set_claim_queue(BTreeMap::from([
(
CoreIndex::from(0),
VecDeque::from([ParasEntry::new(
4 changes: 2 additions & 2 deletions polkadot/runtime/parachains/src/runtime_api_impl/v10.rs
@@ -66,7 +66,7 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, Bl
//
// At the end of a session we clear the claim queues: Without this update call, nothing would be
// scheduled to the client.
scheduler::Pallet::<T>::free_cores_and_fill_claimqueue(Vec::new(), now);
scheduler::Pallet::<T>::free_cores_and_fill_claim_queue(Vec::new(), now);

let time_out_for = scheduler::Pallet::<T>::availability_timeout_predicate();

@@ -305,7 +305,7 @@ pub fn validation_code<T: initializer::Config>(

/// Implementation for the `candidate_pending_availability` function of the runtime API.
#[deprecated(
note = "`candidate_pending_availability` will be removed. Use `candidates_pending_availability` to query
note = "`candidate_pending_availability` will be removed. Use `candidates_pending_availability` to query
all candidates pending availability"
)]
pub fn candidate_pending_availability<T: initializer::Config>(
15 changes: 12 additions & 3 deletions polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs
@@ -16,7 +16,7 @@

//! Put implementations of functions from staging APIs here.

use crate::{inclusion, initializer, scheduler};
use crate::{configuration, inclusion, initializer, scheduler};
use polkadot_primitives::{CommittedCandidateReceipt, CoreIndex, Id as ParaId};
use sp_runtime::traits::One;
use sp_std::{
@@ -32,12 +32,21 @@ pub fn claim_queue<T: scheduler::Config>() -> BTreeMap<CoreIndex, VecDeque<ParaI
//
// At the end of a session we clear the claim queues: Without this update call, nothing would be
// scheduled to the client.
<scheduler::Pallet<T>>::free_cores_and_fill_claimqueue(Vec::new(), now);
<scheduler::Pallet<T>>::free_cores_and_fill_claim_queue(Vec::new(), now);
let config = configuration::ActiveConfig::<T>::get();
// Extra sanity check: the configured lookahead should already never be smaller than 1:
let n_lookahead = config.scheduler_params.lookahead.max(1);

scheduler::ClaimQueue::<T>::get()
.into_iter()
.map(|(core_index, entries)| {
(core_index, entries.into_iter().map(|e| e.para_id()).collect())
// On cores timing out, the internal claim queue size may temporarily be longer than it
// should be, as the timed-out assignment might get pushed back to an already full claim
// queue:
(
core_index,
entries.into_iter().map(|e| e.para_id()).take(n_lookahead as usize).collect(),
)
})
.collect()
}
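Because a timed-out assignment can be pushed back onto an already full claim queue, a per-core queue may momentarily hold more than `lookahead` entries, and the runtime API now truncates each queue before returning it. A simplified sketch of that truncation, with plain integers in place of `ParaId` and made-up data:

```rust
use std::collections::{BTreeMap, VecDeque};

/// Keep at most `n_lookahead` entries per core, dropping any temporary overflow.
fn truncate_claim_queue(
    queue: BTreeMap<u32, VecDeque<u32>>,
    n_lookahead: usize,
) -> BTreeMap<u32, VecDeque<u32>> {
    queue
        .into_iter()
        .map(|(core, entries)| (core, entries.into_iter().take(n_lookahead).collect()))
        .collect()
}

fn main() {
    // Core 0 temporarily holds 4 entries although lookahead is 3 (e.g. after a timeout push-back).
    let queue = BTreeMap::from([
        (0u32, VecDeque::from([100u32, 101, 102, 103])),
        (1, VecDeque::from([200])),
    ]);

    let truncated = truncate_claim_queue(queue, 3);
    assert_eq!(truncated[&0], VecDeque::from([100, 101, 102]));
    assert_eq!(truncated[&1].len(), 1);
}
```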