diff --git a/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs b/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs index 872d4ae605b9f..4dad2e2dc1246 100644 --- a/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs +++ b/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs @@ -777,20 +777,23 @@ pub async fn create_fragments( where A: AuthorityAPI + Send + Sync + 'static + Clone, { + let next_cp_seq = checkpoint_db.lock().next_checkpoint(); + let mut available_authorities = committee.shuffle_by_stake(None, None); // Remove ourselves and all validators that we have already diffed with. - let already_fragmented = checkpoint_db.lock().validators_already_fragmented_with(); + let already_fragmented = checkpoint_db + .lock() + .validators_already_fragmented_with(next_cp_seq); // TODO: We can also use AuthorityHealth to pick healthy authorities first. available_authorities .retain(|name| name != &active_authority.state.name && !already_fragmented.contains(name)); debug!( + ?next_cp_seq, fragmented_count=?already_fragmented.len(), to_be_fragmented_count=?available_authorities.len(), "Going through remaining validators to generate fragments", ); - let next_cp_seq = checkpoint_db.lock().next_checkpoint(); - let result = checkpoint_db .lock() .attempt_to_construct_checkpoint(committee); diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs index 93ff4e06368e7..29eea907dcda5 100644 --- a/crates/sui-core/src/checkpoints/mod.rs +++ b/crates/sui-core/src/checkpoints/mod.rs @@ -12,7 +12,7 @@ use narwhal_executor::ExecutionIndices; use rocksdb::Options; use serde::{Deserialize, Serialize}; use std::collections::BTreeSet; -use std::{collections::HashSet, path::Path, sync::Arc}; +use std::{path::Path, sync::Arc}; use sui_storage::default_db_options; use sui_types::messages_checkpoint::{CheckpointProposal, CheckpointProposalContents}; use sui_types::{ @@ -110,7 +110,7 @@ pub struct CheckpointStoreTables { // to. These are used for the local node to potentially reconstruct the full // transaction set. #[default_options_override_fn = "local_fragments_table_default_config"] - pub local_fragments: DBMap, + pub local_fragments: DBMap<(CheckpointSequenceNumber, AuthorityName), CheckpointFragment>, /// Store the fragments received in order, the counter is purely internal, /// to allow us to provide a list in order they were received. We only store @@ -419,7 +419,17 @@ impl CheckpointStore { )? .delete_batch( &self.tables.local_fragments, - self.tables.local_fragments.keys(), + self.tables + .local_fragments + .iter() + .filter_map(|((seq, name), _)| { + // Delete all keys for checkpoints smaller than what we are committing now. + if seq <= checkpoint_sequence_number { + Some((seq, name)) + } else { + None + } + }), )?; // Update the transactions databases. @@ -480,14 +490,19 @@ impl CheckpointStore { } else { fragment.proposer.authority() }; - if !self.tables.local_fragments.contains_key(other_name)? { - self.tables.local_fragments.insert(other_name, fragment)?; - } else { - // We already have this fragment, so we can ignore it. + if self + .tables + .local_fragments + .contains_key(&(next_checkpoint_seq, *other_name))? + { + // If we already have this fragment, we can ignore it. return Err(SuiError::GenericAuthorityError { error: format!("Already processed fragment with {:?}", other_name), }); } + self.tables + .local_fragments + .insert(&(next_checkpoint_seq, *other_name), fragment)?; // Send to consensus for sequencing. if let Some(sender) = &self.sender { @@ -535,18 +550,16 @@ impl CheckpointStore { self.tables.fragments.insert(&seq, &fragment)?; // If the fragment contains us also save it in the list of local fragments - let next_sequence_number = self.next_checkpoint(); - if fragment.proposer.summary.sequence_number == next_sequence_number { - if fragment.proposer.authority() == &self.name { - self.tables - .local_fragments - .insert(fragment.other.authority(), &fragment)?; - } - if fragment.other.authority() == &self.name { - self.tables - .local_fragments - .insert(fragment.proposer.authority(), &fragment)?; - } + let fragment_seq = fragment.proposer.summary.sequence_number; + if fragment.proposer.authority() == &self.name { + self.tables + .local_fragments + .insert(&(fragment_seq, *fragment.other.authority()), &fragment)?; + } + if fragment.other.authority() == &self.name { + self.tables + .local_fragments + .insert(&(fragment_seq, *fragment.proposer.authority()), &fragment)?; } Ok(()) @@ -644,9 +657,8 @@ impl CheckpointStore { } // Strategy 2 to reconstruct checkpoint -- There is a link between us and the checkpoint set - - let local_links: HashSet<_> = self.tables.local_fragments.keys().collect(); - let checkpoint_keys: HashSet<_> = reconstructed + let local_links = self.validators_already_fragmented_with(next_sequence_number); + let checkpoint_keys: BTreeSet<_> = reconstructed .global .authority_waypoints .keys() @@ -654,7 +666,11 @@ impl CheckpointStore { .collect(); if let Some(auth) = local_links.intersection(&checkpoint_keys).next() { - let fragment = self.tables.local_fragments.get(auth)?.unwrap(); + let fragment = self + .tables + .local_fragments + .get(&(next_sequence_number, *auth))? + .unwrap(); // Extract the diff let diff = if fragment.proposer.authority() == &self.name { @@ -788,11 +804,14 @@ impl CheckpointStore { next_seq % CHECKPOINT_COUNT_PER_EPOCH == 1 && next_seq != 1 } - pub fn validators_already_fragmented_with(&mut self) -> BTreeSet { + pub fn validators_already_fragmented_with( + &mut self, + next_seq: CheckpointSequenceNumber, + ) -> BTreeSet { self.tables .local_fragments - .iter() - .map(|(name, _)| name) + .keys() + .filter_map(|(seq, name)| if seq == next_seq { Some(name) } else { None }) .collect() }