-
Notifications
You must be signed in to change notification settings - Fork 622
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: state witness struct #10934
Changes from all commits
46a410a
e1a843f
6c7325a
017902d
8a51329
680d7ef
d2c7a99
815be9d
d9c170c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -27,7 +27,7 @@ use near_primitives::merkle::merklize; | |||
use near_primitives::receipt::Receipt; | ||||
use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader}; | ||||
use near_primitives::stateless_validation::{ | ||||
ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessAck, ChunkStateWitnessInner, | ||||
ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessAck, SignedEncodedChunkStateWitness, | ||||
}; | ||||
use near_primitives::transaction::SignedTransaction; | ||||
use near_primitives::types::chunk_extra::ChunkExtra; | ||||
|
@@ -93,38 +93,25 @@ impl ChunkValidator { | |||
chain: &Chain, | ||||
processing_done_tracker: Option<ProcessingDoneTracker>, | ||||
) -> Result<(), Error> { | ||||
if !self.epoch_manager.verify_chunk_state_witness_signature(&state_witness)? { | ||||
return Err(Error::InvalidChunkStateWitness("Invalid signature".to_string())); | ||||
} | ||||
|
||||
let state_witness_inner = state_witness.inner; | ||||
let chunk_header = state_witness_inner.chunk_header.clone(); | ||||
let Some(my_signer) = self.my_signer.as_ref() else { | ||||
return Err(Error::NotAValidator); | ||||
}; | ||||
let epoch_id = | ||||
self.epoch_manager.get_epoch_id_from_prev_block(chunk_header.prev_block_hash())?; | ||||
// We will only validate something if we are a chunk validator for this chunk. | ||||
// Note this also covers the case before the protocol upgrade for chunk validators, | ||||
// because the chunk validators will be empty. | ||||
let chunk_validator_assignments = self.epoch_manager.get_chunk_validator_assignments( | ||||
&epoch_id, | ||||
chunk_header.shard_id(), | ||||
chunk_header.height_created(), | ||||
)?; | ||||
if !chunk_validator_assignments.contains(my_signer.validator_id()) { | ||||
return Err(Error::NotAChunkValidator); | ||||
let prev_block_hash = state_witness.chunk_header.prev_block_hash(); | ||||
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?; | ||||
if epoch_id != state_witness.epoch_id { | ||||
return Err(Error::InvalidChunkStateWitness(format!( | ||||
"Invalid EpochId {:?} for previous block {}, expected {:?}", | ||||
state_witness.epoch_id, prev_block_hash, epoch_id | ||||
))); | ||||
} | ||||
|
||||
let pre_validation_result = pre_validate_chunk_state_witness( | ||||
&state_witness_inner, | ||||
&state_witness, | ||||
chain, | ||||
self.epoch_manager.as_ref(), | ||||
self.runtime_adapter.as_ref(), | ||||
)?; | ||||
|
||||
let chunk_header = state_witness.chunk_header.clone(); | ||||
let network_sender = self.network_sender.clone(); | ||||
let signer = my_signer.clone(); | ||||
let signer = self.my_signer.as_ref().ok_or(Error::NotAValidator)?.clone(); | ||||
let epoch_manager = self.epoch_manager.clone(); | ||||
let runtime_adapter = self.runtime_adapter.clone(); | ||||
let chunk_endorsement_tracker = self.chunk_endorsement_tracker.clone(); | ||||
|
@@ -133,7 +120,7 @@ impl ChunkValidator { | |||
let _processing_done_tracker_capture = processing_done_tracker; | ||||
|
||||
match validate_chunk_state_witness( | ||||
state_witness_inner, | ||||
state_witness, | ||||
pre_validation_result, | ||||
epoch_manager.as_ref(), | ||||
runtime_adapter.as_ref(), | ||||
|
@@ -182,7 +169,7 @@ pub(crate) fn validate_prepared_transactions( | |||
/// We do this before handing off the computationally intensive part to a | ||||
/// validation thread. | ||||
pub(crate) fn pre_validate_chunk_state_witness( | ||||
state_witness: &ChunkStateWitnessInner, | ||||
state_witness: &ChunkStateWitness, | ||||
chain: &Chain, | ||||
epoch_manager: &dyn EpochManagerAdapter, | ||||
runtime_adapter: &dyn RuntimeAdapter, | ||||
|
@@ -467,7 +454,7 @@ pub(crate) struct PreValidationOutput { | |||
} | ||||
|
||||
pub(crate) fn validate_chunk_state_witness( | ||||
state_witness: ChunkStateWitnessInner, | ||||
state_witness: ChunkStateWitness, | ||||
pre_validation_output: PreValidationOutput, | ||||
epoch_manager: &dyn EpochManagerAdapter, | ||||
runtime_adapter: &dyn RuntimeAdapter, | ||||
|
@@ -634,54 +621,41 @@ impl Client { | |||
/// you can use the `processing_done_tracker` argument (but it's optional, it's safe to pass None there). | ||||
pub fn process_chunk_state_witness( | ||||
&mut self, | ||||
witness: ChunkStateWitness, | ||||
signed_witness: SignedEncodedChunkStateWitness, | ||||
processing_done_tracker: Option<ProcessingDoneTracker>, | ||||
) -> Result<(), Error> { | ||||
let witness = self.partially_validate_state_witness(&signed_witness)?; | ||||
|
||||
// Send the acknowledgement for the state witness back to the chunk producer. | ||||
// This is currently used for network roundtrip time measurement, so we do not need to | ||||
// wait for validation to finish. | ||||
if let Err(err) = self.send_state_witness_ack(&witness) { | ||||
tracing::warn!(target: "stateless_validation", error = &err as &dyn std::error::Error, | ||||
"Error sending chunk state witness acknowledgement"); | ||||
} | ||||
self.send_state_witness_ack(&witness); | ||||
|
||||
let prev_block_hash = witness.inner.chunk_header.prev_block_hash(); | ||||
let prev_block = match self.chain.get_block(prev_block_hash) { | ||||
Ok(block) => block, | ||||
match self.chain.get_block(witness.chunk_header.prev_block_hash()) { | ||||
Ok(block) => self.process_chunk_state_witness_with_prev_block( | ||||
witness, | ||||
&block, | ||||
processing_done_tracker, | ||||
), | ||||
Err(Error::DBNotFoundErr(_)) => { | ||||
// Previous block isn't available at the moment, add this witness to the orphan pool. | ||||
self.handle_orphan_state_witness(witness)?; | ||||
return Ok(()); | ||||
self.handle_orphan_state_witness( | ||||
witness, | ||||
signed_witness.witness_bytes.size_bytes(), | ||||
)?; | ||||
Ok(()) | ||||
} | ||||
Err(err) => return Err(err), | ||||
}; | ||||
|
||||
self.process_chunk_state_witness_with_prev_block( | ||||
witness, | ||||
&prev_block, | ||||
processing_done_tracker, | ||||
) | ||||
Err(err) => Err(err), | ||||
} | ||||
} | ||||
|
||||
fn send_state_witness_ack(&self, witness: &ChunkStateWitness) -> Result<(), Error> { | ||||
// First find the AccountId for the chunk producer and then send the ack to that account. | ||||
let chunk_header = &witness.inner.chunk_header; | ||||
let prev_block_hash = chunk_header.prev_block_hash(); | ||||
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?; | ||||
let chunk_producer = self.epoch_manager.get_chunk_producer( | ||||
&epoch_id, | ||||
chunk_header.height_created(), | ||||
chunk_header.shard_id(), | ||||
)?; | ||||
|
||||
fn send_state_witness_ack(&self, witness: &ChunkStateWitness) { | ||||
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( | ||||
NetworkRequests::ChunkStateWitnessAck( | ||||
chunk_producer, | ||||
ChunkStateWitnessAck::new(&witness), | ||||
witness.chunk_producer.clone(), | ||||
ChunkStateWitnessAck::new(witness), | ||||
), | ||||
)); | ||||
Comment on lines
-667
to
658
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I agree that it is redundant, but it makes code a bit simpler since it is directly available as part of |
||||
|
||||
Ok(()) | ||||
} | ||||
|
||||
pub fn process_chunk_state_witness_with_prev_block( | ||||
|
@@ -690,14 +664,73 @@ impl Client { | |||
prev_block: &Block, | ||||
processing_done_tracker: Option<ProcessingDoneTracker>, | ||||
) -> Result<(), Error> { | ||||
if witness.inner.chunk_header.prev_block_hash() != prev_block.hash() { | ||||
if witness.chunk_header.prev_block_hash() != prev_block.hash() { | ||||
return Err(Error::Other(format!( | ||||
"process_chunk_state_witness_with_prev_block - prev_block doesn't match ({} != {})", | ||||
witness.inner.chunk_header.prev_block_hash(), | ||||
witness.chunk_header.prev_block_hash(), | ||||
prev_block.hash() | ||||
))); | ||||
} | ||||
|
||||
self.chunk_validator.start_validating_chunk(witness, &self.chain, processing_done_tracker) | ||||
} | ||||
|
||||
/// Performs state witness decoding and partial validation without requiring the previous block. | ||||
/// Here we rely on epoch_id provided as part of the state witness. Later we verify that this | ||||
/// epoch_id actually corresponds to the chunk's previous block. | ||||
fn partially_validate_state_witness( | ||||
&self, | ||||
signed_witness: &SignedEncodedChunkStateWitness, | ||||
) -> Result<ChunkStateWitness, Error> { | ||||
let witness = signed_witness.witness_bytes.decode()?; | ||||
let chunk_header = &witness.chunk_header; | ||||
let witness_height = chunk_header.height_created(); | ||||
let witness_shard = chunk_header.shard_id(); | ||||
|
||||
if !self | ||||
.epoch_manager | ||||
.get_shard_layout(&witness.epoch_id)? | ||||
.shard_ids() | ||||
.contains(&witness_shard) | ||||
{ | ||||
return Err(Error::InvalidChunkStateWitness(format!( | ||||
"Invalid shard_id in ChunkStateWitness: {}", | ||||
witness_shard | ||||
))); | ||||
} | ||||
|
||||
let chunk_producer = self.epoch_manager.get_chunk_producer( | ||||
&witness.epoch_id, | ||||
witness_height, | ||||
witness_shard, | ||||
)?; | ||||
if witness.chunk_producer != chunk_producer { | ||||
tayfunelmas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
return Err(Error::InvalidChunkStateWitness(format!( | ||||
"Incorrect chunk producer for epoch {:?} at height {}: expected {}, got {}", | ||||
witness.epoch_id, witness_height, chunk_producer, witness.chunk_producer, | ||||
))); | ||||
} | ||||
|
||||
// Reject witnesses for chunks for which this node isn't a validator. | ||||
// It's an error, as chunk producer shouldn't send the witness to a non-validator node. | ||||
let my_signer = self.chunk_validator.my_signer.as_ref().ok_or(Error::NotAValidator)?; | ||||
let chunk_validator_assignments = self.epoch_manager.get_chunk_validator_assignments( | ||||
&witness.epoch_id, | ||||
witness_shard, | ||||
witness_height, | ||||
)?; | ||||
if !chunk_validator_assignments.contains(my_signer.validator_id()) { | ||||
return Err(Error::NotAChunkValidator); | ||||
} | ||||
|
||||
if !self.epoch_manager.verify_chunk_state_witness_signature( | ||||
&signed_witness, | ||||
&witness.chunk_producer, | ||||
&witness.epoch_id, | ||||
)? { | ||||
return Err(Error::InvalidChunkStateWitness("Invalid signature".to_string())); | ||||
} | ||||
|
||||
Ok(witness) | ||||
} | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,6 @@ | |
//! arrives, all witnesses that were waiting for it can be processed. | ||
|
||
use crate::Client; | ||
use itertools::Itertools; | ||
use near_chain::Block; | ||
use near_chain_primitives::Error; | ||
use near_primitives::stateless_validation::ChunkStateWitness; | ||
|
@@ -24,8 +23,9 @@ impl Client { | |
pub fn handle_orphan_state_witness( | ||
&mut self, | ||
witness: ChunkStateWitness, | ||
witness_size: usize, | ||
) -> Result<HandleOrphanWitnessOutcome, Error> { | ||
let chunk_header = &witness.inner.chunk_header; | ||
let chunk_header = &witness.chunk_header; | ||
let witness_height = chunk_header.height_created(); | ||
let witness_shard = chunk_header.shard_id(); | ||
|
||
|
@@ -53,7 +53,6 @@ impl Client { | |
} | ||
|
||
// Don't save orphaned state witnesses which are bigger than the allowed limit. | ||
let witness_size = borsh::to_vec(&witness)?.len(); | ||
let witness_size_u64: u64 = witness_size.try_into().map_err(|_| { | ||
Error::Other(format!("Cannot convert witness size to u64: {}", witness_size)) | ||
})?; | ||
|
@@ -77,31 +76,8 @@ impl Client { | |
let possible_epochs = | ||
self.epoch_manager.possible_epochs_of_height_around_tip(&chain_head, witness_height)?; | ||
|
||
// Try to validate the witness assuming that it resides in one of the possible epochs. | ||
// The witness must pass validation in one of these epochs before it can be admitted to the pool. | ||
let mut epoch_validation_result: Option<Result<(), Error>> = None; | ||
for epoch_id in possible_epochs { | ||
match self.partially_validate_orphan_witness_in_epoch(&witness, &epoch_id) { | ||
Ok(()) => { | ||
epoch_validation_result = Some(Ok(())); | ||
break; | ||
} | ||
Err(err) => epoch_validation_result = Some(Err(err)), | ||
} | ||
} | ||
match epoch_validation_result { | ||
Some(Ok(())) => {} // Validation passed in one of the possible epochs, witness can be added to the pool. | ||
Some(Err(err)) => { | ||
// Validation failed in all possible epochs, reject the witness | ||
return Err(err); | ||
} | ||
None => { | ||
// possible_epochs was empty. This shouldn't happen as all epochs around the chain head are known. | ||
return Err(Error::Other(format!( | ||
"Couldn't find any matching EpochId for orphan chunk state witness with height {}", | ||
witness_height | ||
))); | ||
} | ||
if !possible_epochs.contains(&witness.epoch_id) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we are doing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about that as well, but there is one nasty scenario where this could backfire: state witness has random non-existing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's really nice that we don't have to validate in multiple epochs now, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But don't we have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
But I guess it's fine to leave it here for now and think about it later There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, we can consider moving There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We do, but I think we also need to check that the epoch is within some distance from the tip. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As for why it's good to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will go through the comments and would have to think harder. It's still not super clear to me why we need this but will spend some time :) Thanks @jancionear for the context! Meanwhile this definitely doesn't need to be a blocker. |
||
return Ok(HandleOrphanWitnessOutcome::UnsupportedEpochId(witness.epoch_id)); | ||
} | ||
|
||
// Orphan witness is OK, save it to the pool | ||
|
@@ -110,45 +86,6 @@ impl Client { | |
Ok(HandleOrphanWitnessOutcome::SavedToPool) | ||
} | ||
|
||
fn partially_validate_orphan_witness_in_epoch( | ||
&self, | ||
witness: &ChunkStateWitness, | ||
epoch_id: &EpochId, | ||
) -> Result<(), Error> { | ||
let chunk_header = &witness.inner.chunk_header; | ||
let witness_height = chunk_header.height_created(); | ||
let witness_shard = chunk_header.shard_id(); | ||
|
||
// Validate shard_id | ||
if !self.epoch_manager.get_shard_layout(&epoch_id)?.shard_ids().contains(&witness_shard) { | ||
return Err(Error::InvalidChunkStateWitness(format!( | ||
"Invalid shard_id in ChunkStateWitness: {}", | ||
witness_shard | ||
))); | ||
} | ||
|
||
// Reject witnesses for chunks for which which this node isn't a validator. | ||
// It's an error, as the sender shouldn't send the witness to a non-validator node. | ||
let Some(my_signer) = self.chunk_validator.my_signer.as_ref() else { | ||
return Err(Error::NotAValidator); | ||
}; | ||
let chunk_validator_assignments = self.epoch_manager.get_chunk_validator_assignments( | ||
&epoch_id, | ||
witness_shard, | ||
witness_height, | ||
)?; | ||
if !chunk_validator_assignments.contains(my_signer.validator_id()) { | ||
return Err(Error::NotAChunkValidator); | ||
} | ||
|
||
// Verify signature | ||
if !self.epoch_manager.verify_chunk_state_witness_signature_in_epoch(&witness, &epoch_id)? { | ||
return Err(Error::InvalidChunkStateWitness("Invalid signature".to_string())); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Once a new block arrives, we can process the orphaned chunk state witnesses that were waiting | ||
/// for this block. This function takes the ready witnesses out of the orhan pool and process them. | ||
/// It also removes old witnesses (below final height) from the orphan pool to save memory. | ||
|
@@ -158,7 +95,7 @@ impl Client { | |
.orphan_witness_pool | ||
.take_state_witnesses_waiting_for_block(new_block.hash()); | ||
for witness in ready_witnesses { | ||
let header = &witness.inner.chunk_header; | ||
let header = &witness.chunk_header; | ||
tracing::debug!( | ||
target: "client", | ||
witness_height = header.height_created(), | ||
|
@@ -201,9 +138,10 @@ impl Client { | |
/// of other reasons. In such cases the handler function returns Ok(outcome) to let the caller | ||
/// know what happened with the witness. | ||
/// It's useful in tests. | ||
#[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
#[derive(Debug, Clone, PartialEq, Eq)] | ||
pub enum HandleOrphanWitnessOutcome { | ||
SavedToPool, | ||
TooBig(usize), | ||
TooFarFromHead { head_height: BlockHeight, witness_height: BlockHeight }, | ||
UnsupportedEpochId(EpochId), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be the size of a decoded
ChunkStateWitness
. For now that's the same size as encoded, so it's ok, but later when compression is introduced it'll have to be changed.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, good point, I will keep that in mind when adding compression