Skip to content

Commit

Permalink
impl
Browse files Browse the repository at this point in the history
  • Loading branch information
pugachAG committed Apr 4, 2024
1 parent efd0b83 commit cd40791
Show file tree
Hide file tree
Showing 17 changed files with 250 additions and 279 deletions.
12 changes: 3 additions & 9 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::sharding::{ChunkHash, ShardChunkHeader};
use near_primitives::state_part::PartId;
use near_primitives::stateless_validation::{
ChunkEndorsement, ChunkStateWitness, ChunkValidatorAssignments,
ChunkEndorsement, SignedChunkStateWitness, ChunkValidatorAssignments,
};
use near_primitives::transaction::{
Action, ExecutionMetadata, ExecutionOutcome, ExecutionOutcomeWithId, ExecutionStatus,
Expand Down Expand Up @@ -950,14 +950,8 @@ impl EpochManagerAdapter for MockEpochManager {

fn verify_chunk_state_witness_signature(
&self,
_state_witness: &ChunkStateWitness,
) -> Result<bool, Error> {
Ok(true)
}

fn verify_chunk_state_witness_signature_in_epoch(
&self,
_state_witness: &ChunkStateWitness,
_signed_witness: &SignedChunkStateWitness,
_chunk_producer: &AccountId,
_epoch_id: &EpochId,
) -> Result<bool, Error> {
Ok(true)
Expand Down
151 changes: 89 additions & 62 deletions chain/client/src/stateless_validation/chunk_validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use near_primitives::merkle::merklize;
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader};
use near_primitives::stateless_validation::{
ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessAck, ChunkStateWitnessInner,
ChunkEndorsement, ChunkStateWitnessAck, ChunkStateWitnessInner, SignedChunkStateWitness,
};
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
Expand Down Expand Up @@ -89,42 +89,29 @@ impl ChunkValidator {
/// you can use the `processing_done_tracker` argument (but it's optional, it's safe to pass None there).
pub fn start_validating_chunk(
&self,
state_witness: ChunkStateWitness,
state_witness: ChunkStateWitnessInner,
chain: &Chain,
processing_done_tracker: Option<ProcessingDoneTracker>,
) -> Result<(), Error> {
if !self.epoch_manager.verify_chunk_state_witness_signature(&state_witness)? {
return Err(Error::InvalidChunkStateWitness("Invalid signature".to_string()));
}

let state_witness_inner = state_witness.inner;
let chunk_header = state_witness_inner.chunk_header.clone();
let Some(my_signer) = self.my_signer.as_ref() else {
return Err(Error::NotAValidator);
};
let epoch_id =
self.epoch_manager.get_epoch_id_from_prev_block(chunk_header.prev_block_hash())?;
// We will only validate something if we are a chunk validator for this chunk.
// Note this also covers the case before the protocol upgrade for chunk validators,
// because the chunk validators will be empty.
let chunk_validator_assignments = self.epoch_manager.get_chunk_validator_assignments(
&epoch_id,
chunk_header.shard_id(),
chunk_header.height_created(),
)?;
if !chunk_validator_assignments.contains(my_signer.validator_id()) {
return Err(Error::NotAChunkValidator);
let prev_block_hash = state_witness.chunk_header.prev_block_hash();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?;
if epoch_id != state_witness.epoch_id {
return Err(Error::InvalidChunkStateWitness(format!(
"Invalid EpochId {:?} for previous block {}, expected {:?}",
state_witness.epoch_id, prev_block_hash, epoch_id
)));
}

let pre_validation_result = pre_validate_chunk_state_witness(
&state_witness_inner,
&state_witness,
chain,
self.epoch_manager.as_ref(),
self.runtime_adapter.as_ref(),
)?;

let chunk_header = state_witness.chunk_header.clone();
let network_sender = self.network_sender.clone();
let signer = my_signer.clone();
let signer = self.my_signer.as_ref().ok_or(Error::NotAValidator)?.clone();
let epoch_manager = self.epoch_manager.clone();
let runtime_adapter = self.runtime_adapter.clone();
let chunk_endorsement_tracker = self.chunk_endorsement_tracker.clone();
Expand All @@ -133,7 +120,7 @@ impl ChunkValidator {
let _processing_done_tracker_capture = processing_done_tracker;

match validate_chunk_state_witness(
state_witness_inner,
state_witness,
pre_validation_result,
epoch_manager.as_ref(),
runtime_adapter.as_ref(),
Expand Down Expand Up @@ -634,70 +621,110 @@ impl Client {
/// you can use the `processing_done_tracker` argument (but it's optional, it's safe to pass None there).
pub fn process_chunk_state_witness(
&mut self,
witness: ChunkStateWitness,
signed_witness: SignedChunkStateWitness,
processing_done_tracker: Option<ProcessingDoneTracker>,
) -> Result<(), Error> {
let witness = self.partially_validate_state_witness_in_epoch(&signed_witness)?;

// Send the acknowledgement for the state witness back to the chunk producer.
// This is currently used for network roundtrip time measurement, so we do not need to
// wait for validation to finish.
if let Err(err) = self.send_state_witness_ack(&witness) {
tracing::warn!(target: "stateless_validation", error = &err as &dyn std::error::Error,
"Error sending chunk state witness acknowledgement");
}
self.send_state_witness_ack(&witness);

let prev_block_hash = witness.inner.chunk_header.prev_block_hash();
let prev_block = match self.chain.get_block(prev_block_hash) {
Ok(block) => block,
match self.chain.get_block(witness.chunk_header.prev_block_hash()) {
Ok(block) => self.process_chunk_state_witness_with_prev_block(
witness,
&block,
processing_done_tracker,
),
Err(Error::DBNotFoundErr(_)) => {
// Previous block isn't available at the moment, add this witness to the orphan pool.
self.handle_orphan_state_witness(witness)?;
return Ok(());
self.handle_orphan_state_witness(witness, signed_witness.witness_bytes.size_bytes())?;
Ok(())
}
Err(err) => return Err(err),
};

self.process_chunk_state_witness_with_prev_block(
witness,
&prev_block,
processing_done_tracker,
)
Err(err) => Err(err),
}
}

fn send_state_witness_ack(&self, witness: &ChunkStateWitness) -> Result<(), Error> {
// First find the AccountId for the chunk producer and then send the ack to that account.
let chunk_header = &witness.inner.chunk_header;
let prev_block_hash = chunk_header.prev_block_hash();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?;
let chunk_producer = self.epoch_manager.get_chunk_producer(
&epoch_id,
chunk_header.height_created(),
chunk_header.shard_id(),
)?;

fn send_state_witness_ack(&self, witness: &ChunkStateWitnessInner) {
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitnessAck(
chunk_producer,
ChunkStateWitnessAck::new(&witness),
witness.chunk_producer.clone(),
ChunkStateWitnessAck::new(witness),
),
));

Ok(())
}

pub fn process_chunk_state_witness_with_prev_block(
&mut self,
witness: ChunkStateWitness,
witness: ChunkStateWitnessInner,
prev_block: &Block,
processing_done_tracker: Option<ProcessingDoneTracker>,
) -> Result<(), Error> {
if witness.inner.chunk_header.prev_block_hash() != prev_block.hash() {
if witness.chunk_header.prev_block_hash() != prev_block.hash() {
return Err(Error::Other(format!(
"process_chunk_state_witness_with_prev_block - prev_block doesn't match ({} != {})",
witness.inner.chunk_header.prev_block_hash(),
witness.chunk_header.prev_block_hash(),
prev_block.hash()
)));
}

self.chunk_validator.start_validating_chunk(witness, &self.chain, processing_done_tracker)
}

fn partially_validate_state_witness_in_epoch(
&self,
signed_witness: &SignedChunkStateWitness,
) -> Result<ChunkStateWitnessInner, Error> {
let witness = signed_witness.witness_bytes.decode()?;
let chunk_header = &witness.chunk_header;
let witness_height = chunk_header.height_created();
let witness_shard = chunk_header.shard_id();

if !self
.epoch_manager
.get_shard_layout(&witness.epoch_id)?
.shard_ids()
.contains(&witness_shard)
{
return Err(Error::InvalidChunkStateWitness(format!(
"Invalid shard_id in ChunkStateWitness: {}",
witness_shard
)));
}

let chunk_producer = self.epoch_manager.get_chunk_producer(
&witness.epoch_id,
witness_height,
witness_shard,
)?;
if witness.chunk_producer != chunk_producer {
return Err(Error::InvalidChunkStateWitness(format!(
"Incorrect chunk producer for epoch {:?} at height {}: expected {}, got {}",
witness.epoch_id, witness_height, chunk_producer, witness.chunk_producer,
)));
}

// Reject witnesses for chunks for which which this node isn't a validator.
// It's an error, as the sender shouldn't send the witness to a non-validator node.
let my_signer = self.chunk_validator.my_signer.as_ref().ok_or(Error::NotAValidator)?;
let chunk_validator_assignments = self.epoch_manager.get_chunk_validator_assignments(
&witness.epoch_id,
witness_shard,
witness_height,
)?;
if !chunk_validator_assignments.contains(my_signer.validator_id()) {
return Err(Error::NotAChunkValidator);
}

if !self.epoch_manager.verify_chunk_state_witness_signature(
&signed_witness,
&witness.chunk_producer,
&witness.epoch_id,
)? {
return Err(Error::InvalidChunkStateWitness("Invalid signature".to_string()));
}

Ok(witness)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
//! arrives, all witnesses that were waiting for it can be processed.

use crate::Client;
use itertools::Itertools;
use near_chain::Block;
use near_chain_primitives::Error;
use near_primitives::stateless_validation::ChunkStateWitness;
use near_primitives::stateless_validation::ChunkStateWitnessInner;
use near_primitives::types::{BlockHeight, EpochId};
use std::ops::Range;

Expand All @@ -23,9 +22,10 @@ pub const ALLOWED_ORPHAN_WITNESS_DISTANCE_FROM_HEAD: Range<BlockHeight> = 2..6;
impl Client {
pub fn handle_orphan_state_witness(
&mut self,
witness: ChunkStateWitness,
witness: ChunkStateWitnessInner,
witness_size: usize,
) -> Result<HandleOrphanWitnessOutcome, Error> {
let chunk_header = &witness.inner.chunk_header;
let chunk_header = &witness.chunk_header;
let witness_height = chunk_header.height_created();
let witness_shard = chunk_header.shard_id();

Expand Down Expand Up @@ -53,7 +53,6 @@ impl Client {
}

// Don't save orphaned state witnesses which are bigger than the allowed limit.
let witness_size = borsh::to_vec(&witness)?.len();
let witness_size_u64: u64 = witness_size.try_into().map_err(|_| {
Error::Other(format!("Cannot convert witness size to u64: {}", witness_size))
})?;
Expand All @@ -76,32 +75,9 @@ impl Client {
// Let's collect the epoch_ids in which the witness might possibly be.
let possible_epochs =
self.epoch_manager.possible_epochs_of_height_around_tip(&chain_head, witness_height)?;

// Try to validate the witness assuming that it resides in one of the possible epochs.
// The witness must pass validation in one of these epochs before it can be admitted to the pool.
let mut epoch_validation_result: Option<Result<(), Error>> = None;
for epoch_id in possible_epochs {
match self.partially_validate_orphan_witness_in_epoch(&witness, &epoch_id) {
Ok(()) => {
epoch_validation_result = Some(Ok(()));
break;
}
Err(err) => epoch_validation_result = Some(Err(err)),
}
}
match epoch_validation_result {
Some(Ok(())) => {} // Validation passed in one of the possible epochs, witness can be added to the pool.
Some(Err(err)) => {
// Validation failed in all possible epochs, reject the witness
return Err(err);
}
None => {
// possible_epochs was empty. This shouldn't happen as all epochs around the chain head are known.
return Err(Error::Other(format!(
"Couldn't find any matching EpochId for orphan chunk state witness with height {}",
witness_height
)));
}

if possible_epochs.contains(&witness.epoch_id) {
return Ok(HandleOrphanWitnessOutcome::UnsupportedEpochId(witness.epoch_id));
}

// Orphan witness is OK, save it to the pool
Expand All @@ -110,45 +86,6 @@ impl Client {
Ok(HandleOrphanWitnessOutcome::SavedToPool)
}

fn partially_validate_orphan_witness_in_epoch(
&self,
witness: &ChunkStateWitness,
epoch_id: &EpochId,
) -> Result<(), Error> {
let chunk_header = &witness.inner.chunk_header;
let witness_height = chunk_header.height_created();
let witness_shard = chunk_header.shard_id();

// Validate shard_id
if !self.epoch_manager.get_shard_layout(&epoch_id)?.shard_ids().contains(&witness_shard) {
return Err(Error::InvalidChunkStateWitness(format!(
"Invalid shard_id in ChunkStateWitness: {}",
witness_shard
)));
}

// Reject witnesses for chunks for which which this node isn't a validator.
// It's an error, as the sender shouldn't send the witness to a non-validator node.
let Some(my_signer) = self.chunk_validator.my_signer.as_ref() else {
return Err(Error::NotAValidator);
};
let chunk_validator_assignments = self.epoch_manager.get_chunk_validator_assignments(
&epoch_id,
witness_shard,
witness_height,
)?;
if !chunk_validator_assignments.contains(my_signer.validator_id()) {
return Err(Error::NotAChunkValidator);
}

// Verify signature
if !self.epoch_manager.verify_chunk_state_witness_signature_in_epoch(&witness, &epoch_id)? {
return Err(Error::InvalidChunkStateWitness("Invalid signature".to_string()));
}

Ok(())
}

/// Once a new block arrives, we can process the orphaned chunk state witnesses that were waiting
/// for this block. This function takes the ready witnesses out of the orhan pool and process them.
/// It also removes old witnesses (below final height) from the orphan pool to save memory.
Expand All @@ -158,7 +95,7 @@ impl Client {
.orphan_witness_pool
.take_state_witnesses_waiting_for_block(new_block.hash());
for witness in ready_witnesses {
let header = &witness.inner.chunk_header;
let header = &witness.chunk_header;
tracing::debug!(
target: "client",
witness_height = header.height_created(),
Expand Down Expand Up @@ -201,9 +138,10 @@ impl Client {
/// of other reasons. In such cases the handler function returns Ok(outcome) to let the caller
/// know what happened with the witness.
/// It's useful in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HandleOrphanWitnessOutcome {
SavedToPool,
TooBig(usize),
TooFarFromHead { head_height: BlockHeight, witness_height: BlockHeight },
UnsupportedEpochId(EpochId),
}
Loading

0 comments on commit cd40791

Please sign in to comment.