Skip to content

Commit

Permalink
serialize incremental_snapshot_hash (backport #26839) (#27212)
Browse files Browse the repository at this point in the history
serialize incremental_snapshot_hash (#26839)

* serialize incremental_snapshot_hash

* pr feedback

(cherry picked from commit 225cddc)

Co-authored-by: Jeff Washington (jwash) <[email protected]>
  • Loading branch information
mergify[bot] and jeffwashington authored Aug 23, 2022
1 parent f7959af commit c6ea14c
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 16 deletions.
1 change: 1 addition & 0 deletions core/src/accounts_hash_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl AccountsHashVerifier {
accounts_package.snapshot_links.path(),
accounts_package.slot,
&accounts_hash,
None,
);
datapoint_info!(
"accounts_hash_verifier",
Expand Down
3 changes: 3 additions & 0 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ fn run_bank_forks_snapshot_n<F>(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&last_bank.get_accounts_hash(),
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, last_bank.get_accounts_hash());
snapshot_utils::archive_snapshot_package(
Expand Down Expand Up @@ -492,6 +493,7 @@ fn test_concurrent_snapshot_packaging(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&Hash::default(),
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, Hash::default());
pending_snapshot_package
Expand Down Expand Up @@ -535,6 +537,7 @@ fn test_concurrent_snapshot_packaging(
saved_snapshots_dir.path(),
saved_slot,
&Hash::default(),
None,
);

snapshot_utils::verify_snapshot_archive(
Expand Down
26 changes: 26 additions & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,25 @@ impl RentDebit {
}
}

/// Incremental snapshots only calculate their accounts hash based on the account changes WITHIN the incremental slot range.
/// So, we need to keep track of the full snapshot expected accounts hash results.
/// We also need to keep track of the hash and capitalization specific to the incremental snapshot slot range.
/// The capitalization we calculate for the incremental slot will NOT be consistent with the bank's capitalization.
/// It is not feasible to calculate a capitalization delta that is correct given just incremental slots account data and the full snapshot's capitalization.
#[derive(Serialize, Deserialize, AbiExample, Clone, Debug, Default, PartialEq, Eq)]
pub struct BankIncrementalSnapshotPersistence {
/// slot of full snapshot
pub full_slot: Slot,
/// accounts hash from the full snapshot
pub full_hash: Hash,
/// capitalization from the full snapshot
pub full_capitalization: u64,
/// hash of the accounts in the incremental snapshot slot range, including zero-lamport accounts
pub incremental_hash: Hash,
/// capitalization of the accounts in the incremental snapshot slot range
pub incremental_capitalization: u64,
}

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RentDebits(HashMap<Pubkey, RentDebit>);
impl RentDebits {
Expand Down Expand Up @@ -976,6 +995,7 @@ pub struct BankFieldsToDeserialize {
pub(crate) epoch_stakes: HashMap<Epoch, EpochStakes>,
pub(crate) is_delta: bool,
pub(crate) accounts_data_len: u64,
pub(crate) incremental_snapshot_persistence: Option<BankIncrementalSnapshotPersistence>,
}

// Bank's common fields shared by all supported snapshot versions for serialization.
Expand Down Expand Up @@ -1083,6 +1103,7 @@ impl PartialEq for Bank {
accounts_data_size_delta_on_chain: _,
accounts_data_size_delta_off_chain: _,
fee_structure: _,
incremental_snapshot_persistence: _,
// Ignore new fields explicitly if they do not impact PartialEq.
// Adding ".." will remove compile-time checks that if a new field
// is added to the struct, this ParitalEq is accordingly updated.
Expand Down Expand Up @@ -1335,6 +1356,8 @@ pub struct Bank {

/// Transaction fee structure
pub fee_structure: FeeStructure,

pub incremental_snapshot_persistence: Option<BankIncrementalSnapshotPersistence>,
}

struct VoteWithStakeDelegations {
Expand Down Expand Up @@ -1463,6 +1486,7 @@ impl Bank {

fn default_with_accounts(accounts: Accounts) -> Self {
let mut bank = Self {
incremental_snapshot_persistence: None,
rewrites_skipped_this_slot: Rewrites::default(),
rc: BankRc::new(accounts, Slot::default()),
status_cache: Arc::<RwLock<BankStatusCache>>::default(),
Expand Down Expand Up @@ -1770,6 +1794,7 @@ impl Bank {

let accounts_data_size_initial = parent.load_accounts_data_size();
let mut new = Bank {
incremental_snapshot_persistence: None,
rewrites_skipped_this_slot: Rewrites::default(),
rc,
status_cache,
Expand Down Expand Up @@ -2130,6 +2155,7 @@ impl Bank {
}
let feature_set = new();
let mut bank = Self {
incremental_snapshot_persistence: fields.incremental_snapshot_persistence,
rewrites_skipped_this_slot: Rewrites::default(),
rc: bank_rc,
status_cache: new(),
Expand Down
14 changes: 12 additions & 2 deletions runtime/src/serde_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
append_vec::{AppendVec, StoredMetaWriteVersion},
bank::{Bank, BankFieldsToDeserialize, BankRc},
bank::{Bank, BankFieldsToDeserialize, BankIncrementalSnapshotPersistence, BankRc},
blockhash_queue::BlockhashQueue,
builtins::Builtins,
epoch_stakes::EpochStakes,
Expand Down Expand Up @@ -76,6 +76,7 @@ pub struct AccountsDbFields<T>(
/// slots that were roots within the last epoch for which we care about the hash value
#[serde(deserialize_with = "default_on_eof")]
Vec<(Slot, Hash)>,
// here?
);

/// Helper type to wrap BufReader streams when deserializing and reconstructing from either just a
Expand Down Expand Up @@ -192,6 +193,7 @@ trait TypeContext<'a>: PartialEq {
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> std::result::Result<(), Box<bincode::ErrorKind>>
where
R: Read,
Expand Down Expand Up @@ -367,12 +369,18 @@ fn reserialize_bank_fields_with_new_hash<W, R>(
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> Result<(), Error>
where
W: Write,
R: Read,
{
newer::Context::reserialize_bank_fields_with_hash(stream_reader, stream_writer, accounts_hash)
newer::Context::reserialize_bank_fields_with_hash(
stream_reader,
stream_writer,
accounts_hash,
incremental_snapshot_persistence,
)
}

/// effectively updates the accounts hash in the serialized bank file on disk
Expand All @@ -384,6 +392,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
bank_snapshots_dir: impl AsRef<Path>,
slot: Slot,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> bool {
let bank_post = snapshot_utils::get_bank_snapshots_dir(bank_snapshots_dir, slot);
let bank_post = bank_post.join(snapshot_utils::get_snapshot_file_name(slot));
Expand All @@ -401,6 +410,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
&mut BufReader::new(file),
&mut BufWriter::new(file_out),
accounts_hash,
incremental_snapshot_persistence,
)
.unwrap();
}
Expand Down
17 changes: 15 additions & 2 deletions runtime/src/serde_snapshot/newer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl From<DeserializableVersionedBank> for BankFieldsToDeserialize {
stakes: dvb.stakes,
epoch_stakes: dvb.epoch_stakes,
is_delta: dvb.is_delta,
incremental_snapshot_persistence: None,
}
}
}
Expand Down Expand Up @@ -209,6 +210,7 @@ impl<'a> TypeContext<'a> for Context {
// we can grab it on restart.
// TODO: if we do a snapshot version bump, consider moving this out.
lamports_per_signature,
None::<BankIncrementalSnapshotPersistence>,
)
.serialize(serializer)
}
Expand Down Expand Up @@ -314,6 +316,10 @@ impl<'a> TypeContext<'a> for Context {
bank_fields.fee_rate_governor = bank_fields
.fee_rate_governor
.clone_with_lamports_per_signature(lamports_per_signature);

let incremental_snapshot_persistence = ignore_eof_error(deserialize_from(stream))?;
bank_fields.incremental_snapshot_persistence = incremental_snapshot_persistence;

Ok((bank_fields, accounts_db_fields))
}

Expand All @@ -327,12 +333,13 @@ impl<'a> TypeContext<'a> for Context {
}

/// deserialize the bank from 'stream_reader'
/// modify the accounts_hash
/// modify the accounts_hash and incremental_snapshot_persistence
/// reserialize the bank to 'stream_writer'
fn reserialize_bank_fields_with_hash<R, W>(
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> std::result::Result<(), Box<bincode::ErrorKind>>
where
R: Read,
Expand All @@ -345,6 +352,7 @@ impl<'a> TypeContext<'a> for Context {
let blockhash_queue = RwLock::new(rhs.blockhash_queue.clone());
let hard_forks = RwLock::new(rhs.hard_forks.clone());
let lamports_per_signature = rhs.fee_rate_governor.lamports_per_signature;

let bank = SerializableVersionedBank {
blockhash_queue: &blockhash_queue,
ancestors: &rhs.ancestors,
Expand Down Expand Up @@ -382,7 +390,12 @@ impl<'a> TypeContext<'a> for Context {

bincode::serialize_into(
stream_writer,
&(bank, accounts_db_fields, lamports_per_signature),
&(
bank,
accounts_db_fields,
lamports_per_signature,
incremental_snapshot_persistence,
),
)
}
}
54 changes: 42 additions & 12 deletions runtime/src/serde_snapshot/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ fn test_bank_serialize_style(
serde_style: SerdeStyle,
reserialize_accounts_hash: bool,
update_accounts_hash: bool,
incremental_snapshot_persistence: bool,
) {
solana_logger::setup();
let (genesis_config, _) = create_genesis_config(500);
Expand Down Expand Up @@ -236,8 +237,18 @@ fn test_bank_serialize_style(
} else {
bank2.get_accounts_hash()
};
if reserialize_accounts_hash {
let slot = bank2.slot();

let slot = bank2.slot();
let incremental =
incremental_snapshot_persistence.then(|| BankIncrementalSnapshotPersistence {
full_slot: slot + 1,
full_hash: Hash::new(&[1; 32]),
full_capitalization: 31,
incremental_hash: Hash::new(&[2; 32]),
incremental_capitalization: 32,
});

if reserialize_accounts_hash || incremental_snapshot_persistence {
let temp_dir = TempDir::new().unwrap();
let slot_dir = temp_dir.path().join(slot.to_string());
let post_path = slot_dir.join(slot.to_string());
Expand All @@ -248,21 +259,32 @@ fn test_bank_serialize_style(
let mut f = std::fs::File::create(&pre_path).unwrap();
f.write_all(&buf).unwrap();
}

assert!(reserialize_bank_with_new_accounts_hash(
temp_dir.path(),
slot,
&accounts_hash
&accounts_hash,
incremental.as_ref(),
));
let previous_len = buf.len();
// larger buffer than expected to make sure the file isn't larger than expected
let mut buf_reserialized = vec![0; previous_len + 1];
let sizeof_none = std::mem::size_of::<u64>();
let sizeof_incremental_snapshot_persistence =
std::mem::size_of::<Option<BankIncrementalSnapshotPersistence>>();
let mut buf_reserialized =
vec![0; previous_len + sizeof_incremental_snapshot_persistence + 1];
{
let mut f = std::fs::File::open(post_path).unwrap();
let size = f.read(&mut buf_reserialized).unwrap();
assert_eq!(size, previous_len);
let expected = if !incremental_snapshot_persistence {
previous_len
} else {
previous_len + sizeof_incremental_snapshot_persistence - sizeof_none
};
assert_eq!(size, expected);
buf_reserialized.truncate(size);
}
if update_accounts_hash {
if update_accounts_hash || incremental_snapshot_persistence {
// We cannot guarantee buffer contents are exactly the same if hash is the same.
// Things like hashsets/maps have randomness in their in-mem representations.
// This make serialized bytes not deterministic.
Expand Down Expand Up @@ -310,6 +332,7 @@ fn test_bank_serialize_style(
assert_eq!(dbank.get_balance(&key3.pubkey()), 0);
assert_eq!(dbank.get_accounts_hash(), accounts_hash);
assert!(bank2 == dbank);
assert_eq!(dbank.incremental_snapshot_persistence, incremental);
}

pub(crate) fn reconstruct_accounts_db_via_serialization(
Expand Down Expand Up @@ -358,11 +381,18 @@ fn test_bank_serialize_newer() {
for (reserialize_accounts_hash, update_accounts_hash) in
[(false, false), (true, false), (true, true)]
{
test_bank_serialize_style(
SerdeStyle::Newer,
reserialize_accounts_hash,
update_accounts_hash,
)
for incremental_snapshot_persistence in if reserialize_accounts_hash {
[false, true].to_vec()
} else {
[false].to_vec()
} {
test_bank_serialize_style(
SerdeStyle::Newer,
reserialize_accounts_hash,
update_accounts_hash,
incremental_snapshot_persistence,
)
}
}
}

Expand Down Expand Up @@ -551,7 +581,7 @@ mod test_bank_serialize {

// This some what long test harness is required to freeze the ABI of
// Bank's serialization due to versioned nature
#[frozen_abi(digest = "9vGBt7YfymKUTPWLHVVpQbDtPD7dFDwXRMFkCzwujNqJ")]
#[frozen_abi(digest = "5py4Wkuj5fV2sLyA1MrPg4pGNwMEaygQLnpLyY8MMLGC")]
#[derive(Serialize, AbiExample)]
pub struct BankAbiTestWrapperNewer {
#[serde(serialize_with = "wrapper_newer")]
Expand Down
2 changes: 2 additions & 0 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2038,6 +2038,7 @@ pub fn package_and_archive_full_snapshot(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&bank.get_accounts_hash(),
None,
);

let snapshot_package = SnapshotPackage::new(accounts_package, bank.get_accounts_hash());
Expand Down Expand Up @@ -2090,6 +2091,7 @@ pub fn package_and_archive_incremental_snapshot(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&bank.get_accounts_hash(),
None,
);

let snapshot_package = SnapshotPackage::new(accounts_package, bank.get_accounts_hash());
Expand Down

0 comments on commit c6ea14c

Please sign in to comment.