Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Provisioner changes for async backing
Browse files Browse the repository at this point in the history
  • Loading branch information
slumber committed Jun 22, 2022
1 parent 36268ec commit fc07dd2
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 42 deletions.
3 changes: 3 additions & 0 deletions node/core/provisioner/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub enum Error {
#[error("failed to get votes on dispute")]
CanceledCandidateVotes(#[source] oneshot::Canceled),

#[error("failed to get backable candidate")]
CanceledProspectiveCandidateChild(#[source] oneshot::Canceled),

#[error(transparent)]
ChainApi(#[from] ChainApiError),

Expand Down
114 changes: 72 additions & 42 deletions node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,26 @@

use bitvec::vec::BitVec;
use futures::{
channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered, FutureExt,
channel::oneshot, future::BoxFuture, lock::Mutex, prelude::*, stream::FuturesUnordered,
FutureExt,
};
use futures_timer::Delay;

use polkadot_node_primitives::CandidateVotes;
use polkadot_node_subsystem::{
jaeger,
messages::{
CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData,
ProvisionerInherentData, ProvisionerMessage,
CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage,
ProspectiveParachainsMessage, ProvisionableData, ProvisionerInherentData,
ProvisionerMessage,
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal,
PerLeafSpan, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::{request_availability_cores, request_persisted_validation_data};
use polkadot_node_subsystem_util::request_availability_cores;
use polkadot_primitives::v2::{
BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeState,
DisputeStatement, DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption,
DisputeStatement, DisputeStatementSet, Hash, Id as ParaId, MultiDisputeStatementSet,
SessionIndex, SignedAvailabilityBitfield, ValidatorIndex,
};
use std::collections::{BTreeMap, HashMap, HashSet};
Expand Down Expand Up @@ -432,6 +434,12 @@ async fn select_candidates(
) -> Result<Vec<BackedCandidate>, Error> {
let block_number = get_block_number_under_construction(relay_parent, sender).await?;

// TODO [now]: `candidates` is unused since we request from prospective paras
// based on core state and bitfields. Should check whether returned hash is known?
// Does it make sense to track candidates at all?

// Wrapped sender is shared among concurrent prospective parachains requests.
let wrapped_sender = Mutex::new(sender);
let mut selected_candidates =
Vec::with_capacity(candidates.len().min(availability_cores.len()));

Expand All @@ -443,13 +451,23 @@ async fn select_candidates(
);

for (core_idx, core) in availability_cores.iter().enumerate() {
let (scheduled_core, assumption) = match core {
CoreState::Scheduled(scheduled_core) => (scheduled_core, OccupiedCoreAssumption::Free),
let (para_id, required_path) = match core {
CoreState::Scheduled(scheduled_core) => {
// The core is free, pick the first eligible candidate from
// the fragment tree.
(scheduled_core.para_id, Vec::new())
},
CoreState::Occupied(occupied_core) => {
if bitfields_indicate_availability(core_idx, bitfields, &occupied_core.availability)
{
if let Some(ref scheduled_core) = occupied_core.next_up_on_available {
(scheduled_core, OccupiedCoreAssumption::Included)
// The candidate occupying the core is available, choose its
// child in the fragment tree.
//
// TODO [now]: doesn't work for parathreads.
//
// ?? match scheduled para id == candidate_para_id?
(scheduled_core.para_id, vec![occupied_core.candidate_hash])
} else {
continue
}
Expand All @@ -458,7 +476,8 @@ async fn select_candidates(
continue
}
if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out {
(scheduled_core, OccupiedCoreAssumption::TimedOut)
// Candidate's availability timed out, practically same as scheduled.
(scheduled_core.para_id, Vec::new())
} else {
continue
}
Expand All @@ -467,47 +486,35 @@ async fn select_candidates(
CoreState::Free => continue,
};

let validation_data = match request_persisted_validation_data(
relay_parent,
scheduled_core.para_id,
assumption,
sender,
)
.await
.await
.map_err(|err| Error::CanceledPersistedValidationData(err))??
{
Some(v) => v,
None => continue,
};
let fut = get_backable_candidate(relay_parent, para_id, required_path, &wrapped_sender);

let computed_validation_data_hash = validation_data.hash();
selected_candidates.push(fut);
}

// we arbitrarily pick the first of the backed candidates which match the appropriate selection criteria
if let Some(candidate) = candidates.iter().find(|backed_candidate| {
let descriptor = &backed_candidate.descriptor;
descriptor.para_id == scheduled_core.para_id &&
descriptor.persisted_validation_data_hash == computed_validation_data_hash
}) {
let candidate_hash = candidate.hash();
gum::trace!(
target: LOG_TARGET,
leaf_hash=?relay_parent,
?candidate_hash,
para = ?candidate.descriptor.para_id,
core = core_idx,
"Selected candidate receipt",
);
let selected_candidates = futures::future::try_join_all(selected_candidates).await?;
let mut selected = Vec::with_capacity(selected_candidates.len());

selected_candidates.push(candidate_hash);
for (core_idx, candidate_hash) in selected_candidates.into_iter().enumerate() {
match candidate_hash {
Some(hash) => selected.push(hash),
None => {
gum::debug!(
target: LOG_TARGET,
leaf_hash = ?relay_parent,
core = core_idx,
"No backable candidate returned by prospective parachains",
);
},
}
}

let sender = wrapped_sender.into_inner();

// now get the backed candidates corresponding to these candidate receipts
let (tx, rx) = oneshot::channel();
sender.send_unbounded_message(CandidateBackingMessage::GetBackedCandidates(
relay_parent,
selected_candidates.clone(),
selected.clone(),
tx,
));
let mut candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?;
Expand All @@ -519,8 +526,8 @@ async fn select_candidates(
// maps to either 0 or 1 backed candidate, and the hashes correspond. Therefore, by checking them
// in order, we can ensure that the backed candidates are also in order.
let mut backed_idx = 0;
for selected in selected_candidates {
if selected ==
for selected_candidate in selected {
if selected_candidate ==
candidates.get(backed_idx).ok_or(Error::BackedCandidateOrderingProblem)?.hash()
{
backed_idx += 1;
Expand Down Expand Up @@ -571,6 +578,29 @@ async fn get_block_number_under_construction(
}
}

/// Requests backable candidate from Prospective Parachains based on
/// the given path in the fragment tree.
async fn get_backable_candidate(
relay_parent: Hash,
para_id: ParaId,
required_path: Vec<CandidateHash>,
sender: &Mutex<&mut impl overseer::ProvisionerSenderTrait>,
) -> Result<Option<CandidateHash>, Error> {
let (tx, rx) = oneshot::channel();
sender
.lock()
.await
.send_message(ProspectiveParachainsMessage::GetBackableCandidate(
relay_parent,
para_id,
required_path,
tx,
))
.await;

rx.await.map_err(Error::CanceledProspectiveCandidateChild)
}

/// The availability bitfield for a given core is the transpose
/// of a set of signed availability bitfields. It goes like this:
///
Expand Down
1 change: 1 addition & 0 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ pub struct Overseer<SupportsParachains> {
CandidateBackingMessage,
ChainApiMessage,
DisputeCoordinatorMessage,
ProspectiveParachainsMessage,
])]
provisioner: Provisioner,

Expand Down

0 comments on commit fc07dd2

Please sign in to comment.