Skip to content

Commit

Permalink
PVF: drop backing jobs if it is too late (#5616)
Browse files Browse the repository at this point in the history
Fixes #5530

This PR introduces the removal of backing jobs that have been back
pressured for longer than `allowedAncestryLen`, as these candidates are
no longer viable.

It is reasonable to expect a result for a backing job execution within
`allowedAncestryLen` blocks. Therefore, we set the job TTL as a relay
block number and synchronize the validation host by sending activated
leaves.

---------

Co-authored-by: Andrei Sandu <[email protected]>
Co-authored-by: Branislav Kontur <[email protected]>
  • Loading branch information
3 people authored Nov 7, 2024
1 parent bb8c7a3 commit 6c8a347
Show file tree
Hide file tree
Showing 20 changed files with 513 additions and 129 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions polkadot/node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ async fn request_candidate_validation(
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();
let is_system = candidate_receipt.descriptor.para_id().is_system();
let relay_parent = candidate_receipt.descriptor.relay_parent();

sender
.send_message(CandidateValidationMessage::ValidateFromExhaustive {
Expand All @@ -641,9 +642,9 @@ async fn request_candidate_validation(
pov,
executor_params,
exec_kind: if is_system {
PvfExecKind::BackingSystemParas
PvfExecKind::BackingSystemParas(relay_parent)
} else {
PvfExecKind::Backing
PvfExecKind::Backing(relay_parent)
},
response_sender: tx,
})
Expand Down
20 changes: 10 additions & 10 deletions polkadot/node/core/backing/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ async fn assert_validate_from_exhaustive(
) if validation_data == *assert_pvd &&
validation_code == *assert_validation_code &&
*pov == *assert_pov && candidate_receipt.descriptor == assert_candidate.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate_receipt.commitments_hash == assert_candidate.commitments.hash() =>
{
response_sender.send(Ok(ValidationResult::Valid(
Expand Down Expand Up @@ -652,7 +652,7 @@ fn backing_works(#[case] elastic_scaling_mvp: bool) {
) if validation_data == pvd_ab &&
validation_code == validation_code_ab &&
*pov == pov_ab && candidate_receipt.descriptor == candidate_a.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate_receipt.commitments_hash == candidate_a_commitments_hash =>
{
response_sender.send(Ok(
Expand Down Expand Up @@ -1288,7 +1288,7 @@ fn backing_works_while_validation_ongoing() {
) if validation_data == pvd_abc &&
validation_code == validation_code_abc &&
*pov == pov_abc && candidate_receipt.descriptor == candidate_a.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate_a_commitments_hash == candidate_receipt.commitments_hash =>
{
// we never validate the candidate. our local node
Expand Down Expand Up @@ -1455,7 +1455,7 @@ fn backing_misbehavior_works() {
) if validation_data == pvd_a &&
validation_code == validation_code_a &&
*pov == pov_a && candidate_receipt.descriptor == candidate_a.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate_a_commitments_hash == candidate_receipt.commitments_hash =>
{
response_sender.send(Ok(
Expand Down Expand Up @@ -1622,7 +1622,7 @@ fn backing_dont_second_invalid() {
) if validation_data == pvd_a &&
validation_code == validation_code_a &&
*pov == pov_block_a && candidate_receipt.descriptor == candidate_a.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate_a.commitments.hash() == candidate_receipt.commitments_hash =>
{
response_sender.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap();
Expand Down Expand Up @@ -1662,7 +1662,7 @@ fn backing_dont_second_invalid() {
) if validation_data == pvd_b &&
validation_code == validation_code_b &&
*pov == pov_block_b && candidate_receipt.descriptor == candidate_b.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate_b.commitments.hash() == candidate_receipt.commitments_hash =>
{
response_sender.send(Ok(
Expand Down Expand Up @@ -1789,7 +1789,7 @@ fn backing_second_after_first_fails_works() {
) if validation_data == pvd_a &&
validation_code == validation_code_a &&
*pov == pov_a && candidate_receipt.descriptor == candidate.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate.commitments.hash() == candidate_receipt.commitments_hash =>
{
response_sender.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap();
Expand Down Expand Up @@ -1933,7 +1933,7 @@ fn backing_works_after_failed_validation() {
) if validation_data == pvd_a &&
validation_code == validation_code_a &&
*pov == pov_a && candidate_receipt.descriptor == candidate.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate.commitments.hash() == candidate_receipt.commitments_hash =>
{
response_sender.send(Err(ValidationFailed("Internal test error".into()))).unwrap();
Expand Down Expand Up @@ -2212,7 +2212,7 @@ fn retry_works() {
) if validation_data == pvd_a &&
validation_code == validation_code_a &&
*pov == pov_a && candidate_receipt.descriptor == candidate.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate.commitments.hash() == candidate_receipt.commitments_hash
);
virtual_overseer
Expand Down Expand Up @@ -2754,7 +2754,7 @@ fn validator_ignores_statements_from_disabled_validators() {
) if validation_data == pvd &&
validation_code == expected_validation_code &&
*pov == expected_pov && candidate_receipt.descriptor == candidate.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate_commitments_hash == candidate_receipt.commitments_hash =>
{
response_sender.send(Ok(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ async fn assert_validate_seconded_candidate(
&validation_code == assert_validation_code &&
&*pov == assert_pov &&
candidate_receipt.descriptor == candidate.descriptor &&
exec_kind == PvfExecKind::BackingSystemParas &&
matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) &&
candidate.commitments.hash() == candidate_receipt.commitments_hash =>
{
response_sender.send(Ok(ValidationResult::Valid(
Expand Down
106 changes: 99 additions & 7 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult};
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{
CandidateValidationMessage, PreCheckOutcome, PvfExecKind, RuntimeApiMessage,
RuntimeApiRequest, ValidationFailed,
CandidateValidationMessage, ChainApiMessage, PreCheckOutcome, PvfExecKind,
RuntimeApiMessage, RuntimeApiRequest, ValidationFailed,
},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
SubsystemSender,
};
use polkadot_node_subsystem_util::{self as util, runtime::ClaimQueueSnapshot};
use polkadot_node_subsystem_util::{
self as util,
runtime::{prospective_parachains_mode, ClaimQueueSnapshot, ProspectiveParachainsMode},
};
use polkadot_overseer::ActiveLeavesUpdate;
use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
use polkadot_primitives::{
Expand Down Expand Up @@ -279,6 +282,7 @@ async fn run<Context>(
comm = ctx.recv().fuse() => {
match comm {
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update))) => {
update_active_leaves(ctx.sender(), validation_host.clone(), update.clone()).await;
maybe_prepare_validation(ctx.sender(), keystore.clone(), validation_host.clone(), update, &mut prepare_state).await;
},
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
Expand Down Expand Up @@ -551,6 +555,66 @@ where
Some(processed_code_hashes)
}

async fn update_active_leaves<Sender>(
sender: &mut Sender,
mut validation_backend: impl ValidationBackend,
update: ActiveLeavesUpdate,
) where
Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
let ancestors = get_block_ancestors(sender, update.activated.as_ref().map(|x| x.hash)).await;
if let Err(err) = validation_backend.update_active_leaves(update, ancestors).await {
gum::warn!(
target: LOG_TARGET,
?err,
"cannot update active leaves in validation backend",
);
};
}

async fn get_allowed_ancestry_len<Sender>(sender: &mut Sender, relay_parent: Hash) -> Option<usize>
where
Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
match prospective_parachains_mode(sender, relay_parent).await {
Ok(ProspectiveParachainsMode::Enabled { allowed_ancestry_len, .. }) =>
Some(allowed_ancestry_len),
res => {
gum::warn!(target: LOG_TARGET, ?res, "async backing is disabled");
None
},
}
}

async fn get_block_ancestors<Sender>(
sender: &mut Sender,
maybe_relay_parent: Option<Hash>,
) -> Vec<Hash>
where
Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
let Some(relay_parent) = maybe_relay_parent else { return vec![] };
let Some(allowed_ancestry_len) = get_allowed_ancestry_len(sender, relay_parent).await else {
return vec![]
};

let (tx, rx) = oneshot::channel();
sender
.send_message(ChainApiMessage::Ancestors {
hash: relay_parent,
k: allowed_ancestry_len,
response_channel: tx,
})
.await;
match rx.await {
Ok(Ok(x)) => x,
res => {
gum::warn!(target: LOG_TARGET, ?res, "cannot request ancestors");
vec![]
},
}
}

struct RuntimeRequestFailed;

async fn runtime_api_request<T, Sender>(
Expand Down Expand Up @@ -698,7 +762,7 @@ async fn validate_candidate_exhaustive(

// We only check the session index for backing.
match (exec_kind, candidate_receipt.descriptor.session_index()) {
(PvfExecKind::Backing | PvfExecKind::BackingSystemParas, Some(session_index)) => {
(PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_), Some(session_index)) => {
let Some(expected_session_index) = maybe_expected_session_index else {
let error = "cannot fetch session index from the runtime";
gum::warn!(
Expand Down Expand Up @@ -731,7 +795,7 @@ async fn validate_candidate_exhaustive(
let result = match exec_kind {
// Retry is disabled to reduce the chance of nondeterministic blocks getting backed and
// honest backers getting slashed.
PvfExecKind::Backing | PvfExecKind::BackingSystemParas => {
PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => {
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind.into());
let pvf = PvfPrepData::from_code(
Expand Down Expand Up @@ -809,6 +873,15 @@ async fn validate_candidate_exhaustive(
);
Err(ValidationFailed(e.to_string()))
},
Err(e @ ValidationError::ExecutionDeadline) => {
gum::warn!(
target: LOG_TARGET,
?para_id,
?e,
"Job assigned too late, execution queue probably overloaded",
);
Err(ValidationFailed(e.to_string()))
},
Ok(res) =>
if res.head_data.hash() != candidate_receipt.descriptor.para_head() {
gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)");
Expand Down Expand Up @@ -846,7 +919,7 @@ async fn validate_candidate_exhaustive(
// descriptor core index.
(
Some(_core_index),
PvfExecKind::Backing | PvfExecKind::BackingSystemParas,
PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_),
) => {
let Some(claim_queue) = maybe_claim_queue else {
let error = "cannot fetch the claim queue from the runtime";
Expand Down Expand Up @@ -994,7 +1067,12 @@ trait ValidationBackend {
retry_immediately = true;
},

Ok(_) | Err(ValidationError::Invalid(_) | ValidationError::Preparation(_)) => break,
Ok(_) |
Err(
ValidationError::Invalid(_) |
ValidationError::Preparation(_) |
ValidationError::ExecutionDeadline,
) => break,
}

// If we got a possibly transient error, retry once after a brief delay, on the
Expand Down Expand Up @@ -1035,6 +1113,12 @@ trait ValidationBackend {
async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError>;

async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String>;

async fn update_active_leaves(
&mut self,
update: ActiveLeavesUpdate,
ancestors: Vec<Hash>,
) -> Result<(), String>;
}

#[async_trait]
Expand Down Expand Up @@ -1085,6 +1169,14 @@ impl ValidationBackend for ValidationHost {
async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
self.heads_up(active_pvfs).await
}

async fn update_active_leaves(
&mut self,
update: ActiveLeavesUpdate,
ancestors: Vec<Hash>,
) -> Result<(), String> {
self.update_active_leaves(update, ancestors).await
}
}

/// Does basic checks of a candidate. Provide the encoded PoV-block. Returns `Ok` if basic checks
Expand Down
Loading

0 comments on commit 6c8a347

Please sign in to comment.