Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stateless_validation] fix: Pending chunk endorsement cache #10568

Merged
merged 6 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1373,8 +1373,20 @@ impl Client {
self.chain.blocks_delay_tracker.mark_chunk_completed(&chunk_header, StaticClock::utc());
self.block_production_info
.record_chunk_collected(partial_chunk.height_created(), partial_chunk.shard_id());

persist_chunk(partial_chunk, shard_chunk, self.chain.mut_chain_store())
.expect("Could not persist chunk");

// We process chunk endorsements that were blocked by not having chunk complete.
self.chunk_endorsement_tracker.process_pending_endorsements(&chunk_header).unwrap_or_else(
|_| {
panic!(
staffik marked this conversation as resolved.
Show resolved Hide resolved
"Could not process pending endorsements for chunk {:?}",
&chunk_header.chunk_hash()
)
},
);

// We're marking chunk as accepted.
self.chain.blocks_with_missing_chunks.accept_chunk(&chunk_header.chunk_hash());
// If this was the last chunk that was missing for a block, it will be processed now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,29 @@ pub struct ChunkEndorsementTracker {
/// This is keyed on chunk_hash and account_id of validator to avoid duplicates.
/// Chunk endorsements would later be used as a part of block production.
chunk_endorsements: SyncLruCache<ChunkHash, HashMap<AccountId, ChunkEndorsement>>,
/// We store chunk endorsements to be processed later because we did not have
/// chunks ready at the time we received that endorsements from validators.
/// This is keyed on chunk_hash and account_id of validator to avoid duplicates.
pending_chunk_endorsements: SyncLruCache<ChunkHash, HashMap<AccountId, ChunkEndorsement>>,
}

impl Client {
pub fn process_chunk_endorsement(
&mut self,
endorsement: ChunkEndorsement,
) -> Result<(), Error> {
let chunk_header = self.chain.get_chunk(endorsement.chunk_hash())?.cloned_header();
self.chunk_endorsement_tracker.process_chunk_endorsement(&chunk_header, endorsement)
// We should not need whole chunk ready here, we only need chunk header.
match self.chain.get_chunk(endorsement.chunk_hash()) {
Ok(chunk) => {
let chunk_header = &chunk.cloned_header();
self.chunk_endorsement_tracker
.process_chunk_endorsement(endorsement, Some(chunk_header))
}
Err(_) => {
tracing::debug!(target: "stateless_validation", ?endorsement, "Endorsement arrived before chunk.");
staffik marked this conversation as resolved.
Show resolved Hide resolved
self.chunk_endorsement_tracker.process_chunk_endorsement(endorsement, None)
}
}
}
}

Expand All @@ -40,33 +54,64 @@ impl ChunkEndorsementTracker {
Self {
epoch_manager,
chunk_endorsements: SyncLruCache::new(NUM_CHUNKS_IN_CHUNK_ENDORSEMENTS_CACHE),
// We can use a different cache size if needed, it does not have to be the same as for `chunk_endorsements`.
pending_chunk_endorsements: SyncLruCache::new(NUM_CHUNKS_IN_CHUNK_ENDORSEMENTS_CACHE),
}
}

pub fn process_pending_endorsements(
staffik marked this conversation as resolved.
Show resolved Hide resolved
&self,
chunk_header: &ShardChunkHeader,
) -> Result<(), Error> {
let chunk_hash = &chunk_header.chunk_hash();
let chunk_endorsements = {
let mut guard = self.pending_chunk_endorsements.lock();
guard.pop(chunk_hash)
};
let chunk_endorsements = match chunk_endorsements {
staffik marked this conversation as resolved.
Show resolved Hide resolved
Some(chunk_endorsements) => chunk_endorsements,
None => {
tracing::debug!(target: "stateless_validation", ?chunk_hash, "No pending chunk endorsements.");
staffik marked this conversation as resolved.
Show resolved Hide resolved
return Ok(());
}
};
chunk_endorsements.values().try_for_each(|endorsement| {
staffik marked this conversation as resolved.
Show resolved Hide resolved
self.process_chunk_endorsement(endorsement.clone(), Some(&chunk_header))
})
staffik marked this conversation as resolved.
Show resolved Hide resolved
}

/// Function to process an incoming chunk endorsement from chunk validators.
/// We first verify the chunk endorsement and then store it in a cache.
/// If the chunk header is available, we will verify the chunk endorsement and then store it in a cache.
/// Otherwise, we store the endorsement in a separate cache of endorsements to be processed when the chunk is ready.
/// We would later include the endorsements in the block production.
pub(crate) fn process_chunk_endorsement(
&self,
chunk_header: &ShardChunkHeader,
endorsement: ChunkEndorsement,
chunk_header: Option<&ShardChunkHeader>,
staffik marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<(), Error> {
let chunk_hash = endorsement.chunk_hash();
let account_id = &endorsement.account_id;

let endorsement_cache = if chunk_header.is_some() {
&self.chunk_endorsements
} else {
&self.pending_chunk_endorsements
};
staffik marked this conversation as resolved.
Show resolved Hide resolved

// If we have already processed this chunk endorsement, return early.
if self
.chunk_endorsements
if endorsement_cache
.get(chunk_hash)
.is_some_and(|existing_endorsements| existing_endorsements.get(account_id).is_some())
{
tracing::debug!(target: "stateless_validation", ?endorsement, "Already received chunk endorsement.");
return Ok(());
}

if !self.epoch_manager.verify_chunk_endorsement(chunk_header, &endorsement)? {
tracing::error!(target: "stateless_validation", ?endorsement, "Invalid chunk endorsement.");
return Err(Error::InvalidChunkEndorsement);
if let Some(chunk_header) = chunk_header {
if !self.epoch_manager.verify_chunk_endorsement(&chunk_header, &endorsement)? {
tracing::error!(target: "stateless_validation", ?endorsement, "Invalid chunk endorsement.");
wacban marked this conversation as resolved.
Show resolved Hide resolved
return Err(Error::InvalidChunkEndorsement);
}
}

// If we are the current block producer, we store the chunk endorsement for each chunk which
Expand All @@ -76,7 +121,7 @@ impl ChunkEndorsementTracker {
// Maybe add check to ensure we don't accept endorsements from chunks already included in some block?
// Maybe add check to ensure we don't accept endorsements from chunks that have too old height_created?
tracing::debug!(target: "stateless_validation", ?endorsement, "Received and saved chunk endorsement.");
let mut guard = self.chunk_endorsements.lock();
let mut guard = endorsement_cache.lock();
guard.get_or_insert(chunk_hash.clone(), || HashMap::new());
let chunk_endorsements = guard.get_mut(chunk_hash).unwrap();
chunk_endorsements.insert(account_id.clone(), endorsement);
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/stateless_validation/chunk_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ pub(crate) fn send_chunk_endorsement_to_block_producers(
if signer.validator_id() == &block_producer {
// Unwrap here as we always expect our own endorsements to be valid
chunk_endorsement_tracker
.process_chunk_endorsement(chunk_header, endorsement.clone())
.process_chunk_endorsement(endorsement.clone(), Some(chunk_header))
.unwrap();
} else {
network_sender.send(PeerManagerMessageRequest::NetworkRequests(
Expand Down
Loading