From fd654c45c3c7a3d9146955013942165e6102e513 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Mon, 5 Feb 2024 15:20:02 +0100 Subject: [PATCH 1/6] Pending chunk endorsement cache --- chain/client/src/client.rs | 11 ++++ .../chunk_endorsement_tracker.rs | 65 ++++++++++++++++--- .../stateless_validation/chunk_validator.rs | 2 +- 3 files changed, 67 insertions(+), 11 deletions(-) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index bdfa922bfd3..46db5f6c707 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -1373,8 +1373,19 @@ impl Client { self.chain.blocks_delay_tracker.mark_chunk_completed(&chunk_header, StaticClock::utc()); self.block_production_info .record_chunk_collected(partial_chunk.height_created(), partial_chunk.shard_id()); + persist_chunk(partial_chunk, shard_chunk, self.chain.mut_chain_store()) .expect("Could not persist chunk"); + + // We process chunk endorsements that were blocked by not having chunk complete. + self.chunk_endorsement_tracker.process_pending_endorsements(&chunk_header).expect( + &format!( + "Could not process pending endorsements for chunk {:?}", + &chunk_header.chunk_hash() + ) + .to_string(), + ); + // We're marking chunk as accepted. self.chain.blocks_with_missing_chunks.accept_chunk(&chunk_header.chunk_hash()); // If this was the last chunk that was missing for a block, it will be processed now. diff --git a/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs b/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs index e28dcbe5e72..9f8b9f6bc69 100644 --- a/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs +++ b/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs @@ -23,6 +23,10 @@ pub struct ChunkEndorsementTracker { /// This is keyed on chunk_hash and account_id of validator to avoid duplicates. /// Chunk endorsements would later be used as a part of block production. chunk_endorsements: SyncLruCache>, + /// We store chunk endorsements to be processed later because we did not have + /// chunks ready at the time we received that endorsements from validators. + /// This is keyed on chunk_hash and account_id of validator to avoid duplicates. + pending_chunk_endorsements: SyncLruCache>, } impl Client { @@ -30,8 +34,18 @@ impl Client { &mut self, endorsement: ChunkEndorsement, ) -> Result<(), Error> { - let chunk_header = self.chain.get_chunk(endorsement.chunk_hash())?.cloned_header(); - self.chunk_endorsement_tracker.process_chunk_endorsement(&chunk_header, endorsement) + // We should not need whole chunk ready here, we only need chunk header. + match self.chain.get_chunk(endorsement.chunk_hash()) { + Ok(chunk) => { + let chunk_header = &chunk.cloned_header(); + self.chunk_endorsement_tracker + .process_chunk_endorsement(endorsement, Some(chunk_header)) + } + Err(_) => { + tracing::debug!(target: "stateless_validation", ?endorsement, "Endorsement arrived before chunk."); + self.chunk_endorsement_tracker.process_chunk_endorsement(endorsement, None) + } + } } } @@ -40,23 +54,52 @@ impl ChunkEndorsementTracker { Self { epoch_manager, chunk_endorsements: SyncLruCache::new(NUM_CHUNKS_IN_CHUNK_ENDORSEMENTS_CACHE), + // We can use a different cache size if needed, it does not have to be the same as for `chunk_endorsements`. + pending_chunk_endorsements: SyncLruCache::new(NUM_CHUNKS_IN_CHUNK_ENDORSEMENTS_CACHE), } } + pub fn process_pending_endorsements( + &self, + chunk_header: &ShardChunkHeader, + ) -> Result<(), Error> { + let chunk_hash = &chunk_header.chunk_hash(); + let chunk_endorsements = { + let mut guard = self.pending_chunk_endorsements.lock(); + guard.pop(chunk_hash) + }; + let chunk_endorsements = match chunk_endorsements { + Some(chunk_endorsements) => chunk_endorsements, + None => { + tracing::debug!(target: "stateless_validation", ?chunk_hash, "No pending chunk endorsements."); + return Ok(()); + } + }; + chunk_endorsements.values().try_for_each(|endorsement| { + self.process_chunk_endorsement(endorsement.clone(), Some(&chunk_header)) + }) + } + /// Function to process an incoming chunk endorsement from chunk validators. - /// We first verify the chunk endorsement and then store it in a cache. + /// If the chunk header is available, we will verify the chunk endorsement and then store it in a cache. + /// Otherwise, we store the endorsement in a separate cache of endorsements to be processed when the chunk is ready. /// We would later include the endorsements in the block production. pub(crate) fn process_chunk_endorsement( &self, - chunk_header: &ShardChunkHeader, endorsement: ChunkEndorsement, + chunk_header: Option<&ShardChunkHeader>, ) -> Result<(), Error> { let chunk_hash = endorsement.chunk_hash(); let account_id = &endorsement.account_id; + let endorsement_cache = if chunk_header.is_some() { + &self.chunk_endorsements + } else { + &self.pending_chunk_endorsements + }; + // If we have already processed this chunk endorsement, return early. - if self - .chunk_endorsements + if endorsement_cache .get(chunk_hash) .is_some_and(|existing_endorsements| existing_endorsements.get(account_id).is_some()) { @@ -64,9 +107,11 @@ impl ChunkEndorsementTracker { return Ok(()); } - if !self.epoch_manager.verify_chunk_endorsement(chunk_header, &endorsement)? { - tracing::error!(target: "stateless_validation", ?endorsement, "Invalid chunk endorsement."); - return Err(Error::InvalidChunkEndorsement); + if let Some(chunk_header) = chunk_header { + if !self.epoch_manager.verify_chunk_endorsement(&chunk_header, &endorsement)? { + tracing::error!(target: "stateless_validation", ?endorsement, "Invalid chunk endorsement."); + return Err(Error::InvalidChunkEndorsement); + } } // If we are the current block producer, we store the chunk endorsement for each chunk which @@ -76,7 +121,7 @@ impl ChunkEndorsementTracker { // Maybe add check to ensure we don't accept endorsements from chunks already included in some block? // Maybe add check to ensure we don't accept endorsements from chunks that have too old height_created? tracing::debug!(target: "stateless_validation", ?endorsement, "Received and saved chunk endorsement."); - let mut guard = self.chunk_endorsements.lock(); + let mut guard = endorsement_cache.lock(); guard.get_or_insert(chunk_hash.clone(), || HashMap::new()); let chunk_endorsements = guard.get_mut(chunk_hash).unwrap(); chunk_endorsements.insert(account_id.clone(), endorsement); diff --git a/chain/client/src/stateless_validation/chunk_validator.rs b/chain/client/src/stateless_validation/chunk_validator.rs index 46b184558f0..adf3c1ba25e 100644 --- a/chain/client/src/stateless_validation/chunk_validator.rs +++ b/chain/client/src/stateless_validation/chunk_validator.rs @@ -568,7 +568,7 @@ pub(crate) fn send_chunk_endorsement_to_block_producers( if signer.validator_id() == &block_producer { // Unwrap here as we always expect our own endorsements to be valid chunk_endorsement_tracker - .process_chunk_endorsement(chunk_header, endorsement.clone()) + .process_chunk_endorsement(endorsement.clone(), Some(chunk_header)) .unwrap(); } else { network_sender.send(PeerManagerMessageRequest::NetworkRequests( From d99220624f8cd731c6992319ee2ad8ebe551baeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Mon, 5 Feb 2024 15:52:56 +0100 Subject: [PATCH 2/6] Clippy fix --- chain/client/src/client.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 46db5f6c707..51415c09d93 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -1378,12 +1378,13 @@ impl Client { .expect("Could not persist chunk"); // We process chunk endorsements that were blocked by not having chunk complete. - self.chunk_endorsement_tracker.process_pending_endorsements(&chunk_header).expect( - &format!( - "Could not process pending endorsements for chunk {:?}", - &chunk_header.chunk_hash() - ) - .to_string(), + self.chunk_endorsement_tracker.process_pending_endorsements(&chunk_header).unwrap_or_else( + |_| { + panic!( + "Could not process pending endorsements for chunk {:?}", + &chunk_header.chunk_hash() + ) + }, ); // We're marking chunk as accepted. From 1c531114f90172a9a32d13b12edd9dfe7f1f46ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Mon, 5 Feb 2024 17:12:25 +0100 Subject: [PATCH 3/6] Build statelessnet protocol version --- chain/client/src/client.rs | 2 ++ .../src/stateless_validation/chunk_endorsement_tracker.rs | 5 ++++- core/primitives-core/src/version.rs | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 51415c09d93..ac7f8f05b76 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -1374,10 +1374,12 @@ impl Client { self.block_production_info .record_chunk_collected(partial_chunk.height_created(), partial_chunk.shard_id()); + // TODO(stateless_validation) We would like a proper error handling here instead of `expect`. persist_chunk(partial_chunk, shard_chunk, self.chain.mut_chain_store()) .expect("Could not persist chunk"); // We process chunk endorsements that were blocked by not having chunk complete. + // TODO(stateless_validation) We would like a proper error handling here instead of `panic`. self.chunk_endorsement_tracker.process_pending_endorsements(&chunk_header).unwrap_or_else( |_| { panic!( diff --git a/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs b/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs index 9f8b9f6bc69..22beeb4d429 100644 --- a/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs +++ b/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs @@ -41,10 +41,11 @@ impl Client { self.chunk_endorsement_tracker .process_chunk_endorsement(endorsement, Some(chunk_header)) } - Err(_) => { + Err(Error::ChunkMissing(_)) => { tracing::debug!(target: "stateless_validation", ?endorsement, "Endorsement arrived before chunk."); self.chunk_endorsement_tracker.process_chunk_endorsement(endorsement, None) } + Err(error) => return Err(error), } } } @@ -59,6 +60,8 @@ impl ChunkEndorsementTracker { } } + /// Process pending endorsements for the given chunk header. + /// It removes these endorsements from the `pending_chunk_endorsements` cache. pub fn process_pending_endorsements( &self, chunk_header: &ShardChunkHeader, diff --git a/core/primitives-core/src/version.rs b/core/primitives-core/src/version.rs index a37ba910603..d5316a1aa83 100644 --- a/core/primitives-core/src/version.rs +++ b/core/primitives-core/src/version.rs @@ -198,7 +198,7 @@ impl ProtocolFeature { /// Current protocol version used on the mainnet. /// Some features (e. g. FixStorageUsage) require that there is at least one epoch with exactly /// the corresponding version -const STABLE_PROTOCOL_VERSION: ProtocolVersion = 64; +const STABLE_PROTOCOL_VERSION: ProtocolVersion = 81; /// Largest protocol version supported by the current binary. pub const PROTOCOL_VERSION: ProtocolVersion = if cfg!(feature = "statelessnet_protocol") { From 3d992a9196086c2357cdfc36c693f961e088986e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Tue, 6 Feb 2024 09:43:11 +0100 Subject: [PATCH 4/6] PR fixes --- chain/client/src/client.rs | 14 +----- .../chunk_endorsement_tracker.rs | 50 ++++++++++++------- .../stateless_validation/chunk_validator.rs | 2 +- core/primitives-core/src/version.rs | 2 +- 4 files changed, 37 insertions(+), 31 deletions(-) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index ac7f8f05b76..b8eaea25409 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -1374,21 +1374,11 @@ impl Client { self.block_production_info .record_chunk_collected(partial_chunk.height_created(), partial_chunk.shard_id()); - // TODO(stateless_validation) We would like a proper error handling here instead of `expect`. + // TODO We would like a proper error handling here instead of `expect`. persist_chunk(partial_chunk, shard_chunk, self.chain.mut_chain_store()) .expect("Could not persist chunk"); - // We process chunk endorsements that were blocked by not having chunk complete. - // TODO(stateless_validation) We would like a proper error handling here instead of `panic`. - self.chunk_endorsement_tracker.process_pending_endorsements(&chunk_header).unwrap_or_else( - |_| { - panic!( - "Could not process pending endorsements for chunk {:?}", - &chunk_header.chunk_hash() - ) - }, - ); - + self.chunk_endorsement_tracker.process_pending_endorsements(&chunk_header); // We're marking chunk as accepted. self.chain.blocks_with_missing_chunks.accept_chunk(&chunk_header.chunk_hash()); // If this was the last chunk that was missing for a block, it will be processed now. diff --git a/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs b/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs index 22beeb4d429..92ca32ff47f 100644 --- a/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs +++ b/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs @@ -36,14 +36,12 @@ impl Client { ) -> Result<(), Error> { // We should not need whole chunk ready here, we only need chunk header. match self.chain.get_chunk(endorsement.chunk_hash()) { - Ok(chunk) => { - let chunk_header = &chunk.cloned_header(); - self.chunk_endorsement_tracker - .process_chunk_endorsement(endorsement, Some(chunk_header)) - } + Ok(chunk) => self + .chunk_endorsement_tracker + .process_chunk_endorsement(&chunk.cloned_header(), endorsement), Err(Error::ChunkMissing(_)) => { tracing::debug!(target: "stateless_validation", ?endorsement, "Endorsement arrived before chunk."); - self.chunk_endorsement_tracker.process_chunk_endorsement(endorsement, None) + self.chunk_endorsement_tracker.add_chunk_endorsement_to_pending_cache(endorsement) } Err(error) => return Err(error), } @@ -62,10 +60,7 @@ impl ChunkEndorsementTracker { /// Process pending endorsements for the given chunk header. /// It removes these endorsements from the `pending_chunk_endorsements` cache. - pub fn process_pending_endorsements( - &self, - chunk_header: &ShardChunkHeader, - ) -> Result<(), Error> { + pub fn process_pending_endorsements(&self, chunk_header: &ShardChunkHeader) { let chunk_hash = &chunk_header.chunk_hash(); let chunk_endorsements = { let mut guard = self.pending_chunk_endorsements.lock(); @@ -74,20 +69,41 @@ impl ChunkEndorsementTracker { let chunk_endorsements = match chunk_endorsements { Some(chunk_endorsements) => chunk_endorsements, None => { - tracing::debug!(target: "stateless_validation", ?chunk_hash, "No pending chunk endorsements."); - return Ok(()); + return; } }; - chunk_endorsements.values().try_for_each(|endorsement| { - self.process_chunk_endorsement(endorsement.clone(), Some(&chunk_header)) - }) + tracing::debug!(target: "stateless_validation", ?chunk_hash, "Processing pending chunk endorsements."); + for endorsement in chunk_endorsements.values() { + let _ = self.process_chunk_endorsement(chunk_header, endorsement.clone()) + .map_err(|error| { + tracing::debug!(target: "stateless_validation", ?endorsement, "Error processing pending chunk endorsement: {:?}", error); + error + }); + } + } + + /// Add the chunk endorsement to a cache of pending chunk endorsements (if not yet there). + pub(crate) fn add_chunk_endorsement_to_pending_cache( + &self, + endorsement: ChunkEndorsement, + ) -> Result<(), Error> { + self.process_chunk_endorsement_impl(endorsement, None) } /// Function to process an incoming chunk endorsement from chunk validators. - /// If the chunk header is available, we will verify the chunk endorsement and then store it in a cache. - /// Otherwise, we store the endorsement in a separate cache of endorsements to be processed when the chunk is ready. + /// We first verify the chunk endorsement and then store it in a cache. /// We would later include the endorsements in the block production. pub(crate) fn process_chunk_endorsement( + &self, + chunk_header: &ShardChunkHeader, + endorsement: ChunkEndorsement, + ) -> Result<(), Error> { + self.process_chunk_endorsement_impl(endorsement, Some(chunk_header)) + } + + /// If the chunk header is available, we will verify the chunk endorsement and then store it in a cache. + /// Otherwise, we store the endorsement in a separate cache of endorsements to be processed when the chunk is ready. + fn process_chunk_endorsement_impl( &self, endorsement: ChunkEndorsement, chunk_header: Option<&ShardChunkHeader>, diff --git a/chain/client/src/stateless_validation/chunk_validator.rs b/chain/client/src/stateless_validation/chunk_validator.rs index adf3c1ba25e..46b184558f0 100644 --- a/chain/client/src/stateless_validation/chunk_validator.rs +++ b/chain/client/src/stateless_validation/chunk_validator.rs @@ -568,7 +568,7 @@ pub(crate) fn send_chunk_endorsement_to_block_producers( if signer.validator_id() == &block_producer { // Unwrap here as we always expect our own endorsements to be valid chunk_endorsement_tracker - .process_chunk_endorsement(endorsement.clone(), Some(chunk_header)) + .process_chunk_endorsement(chunk_header, endorsement.clone()) .unwrap(); } else { network_sender.send(PeerManagerMessageRequest::NetworkRequests( diff --git a/core/primitives-core/src/version.rs b/core/primitives-core/src/version.rs index d5316a1aa83..a37ba910603 100644 --- a/core/primitives-core/src/version.rs +++ b/core/primitives-core/src/version.rs @@ -198,7 +198,7 @@ impl ProtocolFeature { /// Current protocol version used on the mainnet. /// Some features (e. g. FixStorageUsage) require that there is at least one epoch with exactly /// the corresponding version -const STABLE_PROTOCOL_VERSION: ProtocolVersion = 81; +const STABLE_PROTOCOL_VERSION: ProtocolVersion = 64; /// Largest protocol version supported by the current binary. pub const PROTOCOL_VERSION: ProtocolVersion = if cfg!(feature = "statelessnet_protocol") { From e1bd8fceafa6ceec214c31fd556e189771fcd0df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Tue, 6 Feb 2024 09:45:23 +0100 Subject: [PATCH 5/6] Update comment --- chain/client/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index b8eaea25409..a30419741db 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -1374,7 +1374,7 @@ impl Client { self.block_production_info .record_chunk_collected(partial_chunk.height_created(), partial_chunk.shard_id()); - // TODO We would like a proper error handling here instead of `expect`. + // TODO(#10569) We would like a proper error handling here instead of `expect`. persist_chunk(partial_chunk, shard_chunk, self.chain.mut_chain_store()) .expect("Could not persist chunk"); From 9d42db3ea7ccec04e75a9787d13b57da7b8b6729 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= Date: Tue, 6 Feb 2024 11:35:07 +0100 Subject: [PATCH 6/6] fixes --- .../chunk_endorsement_tracker.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs b/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs index 92ca32ff47f..47472560177 100644 --- a/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs +++ b/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs @@ -66,19 +66,14 @@ impl ChunkEndorsementTracker { let mut guard = self.pending_chunk_endorsements.lock(); guard.pop(chunk_hash) }; - let chunk_endorsements = match chunk_endorsements { - Some(chunk_endorsements) => chunk_endorsements, - None => { - return; - } + let Some(chunk_endorsements) = chunk_endorsements else { + return; }; tracing::debug!(target: "stateless_validation", ?chunk_hash, "Processing pending chunk endorsements."); for endorsement in chunk_endorsements.values() { - let _ = self.process_chunk_endorsement(chunk_header, endorsement.clone()) - .map_err(|error| { - tracing::debug!(target: "stateless_validation", ?endorsement, "Error processing pending chunk endorsement: {:?}", error); - error - }); + if let Err(error) = self.process_chunk_endorsement(chunk_header, endorsement.clone()) { + tracing::debug!(target: "stateless_validation", ?endorsement, "Error processing pending chunk endorsement: {:?}", error); + } } }