Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

serialize incremental_snapshot_hash (backport #26839) #27212

Merged
merged 1 commit into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/accounts_hash_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl AccountsHashVerifier {
accounts_package.snapshot_links.path(),
accounts_package.slot,
&accounts_hash,
None,
);
datapoint_info!(
"accounts_hash_verifier",
Expand Down
3 changes: 3 additions & 0 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ fn run_bank_forks_snapshot_n<F>(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&last_bank.get_accounts_hash(),
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, last_bank.get_accounts_hash());
snapshot_utils::archive_snapshot_package(
Expand Down Expand Up @@ -492,6 +493,7 @@ fn test_concurrent_snapshot_packaging(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&Hash::default(),
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, Hash::default());
pending_snapshot_package
Expand Down Expand Up @@ -535,6 +537,7 @@ fn test_concurrent_snapshot_packaging(
saved_snapshots_dir.path(),
saved_slot,
&Hash::default(),
None,
);

snapshot_utils::verify_snapshot_archive(
Expand Down
26 changes: 26 additions & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,25 @@ impl RentDebit {
}
}

/// Incremental snapshots only calculate their accounts hash based on the account changes WITHIN the incremental slot range.
/// So, we need to keep track of the full snapshot expected accounts hash results.
/// We also need to keep track of the hash and capitalization specific to the incremental snapshot slot range.
/// The capitalization we calculate for the incremental slot will NOT be consistent with the bank's capitalization.
/// It is not feasible to calculate a capitalization delta that is correct given just incremental slots account data and the full snapshot's capitalization.
#[derive(Serialize, Deserialize, AbiExample, Clone, Debug, Default, PartialEq, Eq)]
pub struct BankIncrementalSnapshotPersistence {
/// slot of full snapshot
pub full_slot: Slot,
/// accounts hash from the full snapshot
pub full_hash: Hash,
/// capitalization from the full snapshot
pub full_capitalization: u64,
/// hash of the accounts in the incremental snapshot slot range, including zero-lamport accounts
pub incremental_hash: Hash,
/// capitalization of the accounts in the incremental snapshot slot range
pub incremental_capitalization: u64,
}

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RentDebits(HashMap<Pubkey, RentDebit>);
impl RentDebits {
Expand Down Expand Up @@ -976,6 +995,7 @@ pub struct BankFieldsToDeserialize {
pub(crate) epoch_stakes: HashMap<Epoch, EpochStakes>,
pub(crate) is_delta: bool,
pub(crate) accounts_data_len: u64,
pub(crate) incremental_snapshot_persistence: Option<BankIncrementalSnapshotPersistence>,
}

// Bank's common fields shared by all supported snapshot versions for serialization.
Expand Down Expand Up @@ -1083,6 +1103,7 @@ impl PartialEq for Bank {
accounts_data_size_delta_on_chain: _,
accounts_data_size_delta_off_chain: _,
fee_structure: _,
incremental_snapshot_persistence: _,
// Ignore new fields explicitly if they do not impact PartialEq.
// Adding ".." will remove compile-time checks that if a new field
// is added to the struct, this ParitalEq is accordingly updated.
Expand Down Expand Up @@ -1335,6 +1356,8 @@ pub struct Bank {

/// Transaction fee structure
pub fee_structure: FeeStructure,

pub incremental_snapshot_persistence: Option<BankIncrementalSnapshotPersistence>,
}

struct VoteWithStakeDelegations {
Expand Down Expand Up @@ -1463,6 +1486,7 @@ impl Bank {

fn default_with_accounts(accounts: Accounts) -> Self {
let mut bank = Self {
incremental_snapshot_persistence: None,
rewrites_skipped_this_slot: Rewrites::default(),
rc: BankRc::new(accounts, Slot::default()),
status_cache: Arc::<RwLock<BankStatusCache>>::default(),
Expand Down Expand Up @@ -1770,6 +1794,7 @@ impl Bank {

let accounts_data_size_initial = parent.load_accounts_data_size();
let mut new = Bank {
incremental_snapshot_persistence: None,
rewrites_skipped_this_slot: Rewrites::default(),
rc,
status_cache,
Expand Down Expand Up @@ -2130,6 +2155,7 @@ impl Bank {
}
let feature_set = new();
let mut bank = Self {
incremental_snapshot_persistence: fields.incremental_snapshot_persistence,
rewrites_skipped_this_slot: Rewrites::default(),
rc: bank_rc,
status_cache: new(),
Expand Down
14 changes: 12 additions & 2 deletions runtime/src/serde_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
append_vec::{AppendVec, StoredMetaWriteVersion},
bank::{Bank, BankFieldsToDeserialize, BankRc},
bank::{Bank, BankFieldsToDeserialize, BankIncrementalSnapshotPersistence, BankRc},
blockhash_queue::BlockhashQueue,
builtins::Builtins,
epoch_stakes::EpochStakes,
Expand Down Expand Up @@ -76,6 +76,7 @@ pub struct AccountsDbFields<T>(
/// slots that were roots within the last epoch for which we care about the hash value
#[serde(deserialize_with = "default_on_eof")]
Vec<(Slot, Hash)>,
// here?
);

/// Helper type to wrap BufReader streams when deserializing and reconstructing from either just a
Expand Down Expand Up @@ -192,6 +193,7 @@ trait TypeContext<'a>: PartialEq {
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> std::result::Result<(), Box<bincode::ErrorKind>>
where
R: Read,
Expand Down Expand Up @@ -367,12 +369,18 @@ fn reserialize_bank_fields_with_new_hash<W, R>(
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> Result<(), Error>
where
W: Write,
R: Read,
{
newer::Context::reserialize_bank_fields_with_hash(stream_reader, stream_writer, accounts_hash)
newer::Context::reserialize_bank_fields_with_hash(
stream_reader,
stream_writer,
accounts_hash,
incremental_snapshot_persistence,
)
}

/// effectively updates the accounts hash in the serialized bank file on disk
Expand All @@ -384,6 +392,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
bank_snapshots_dir: impl AsRef<Path>,
slot: Slot,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> bool {
let bank_post = snapshot_utils::get_bank_snapshots_dir(bank_snapshots_dir, slot);
let bank_post = bank_post.join(snapshot_utils::get_snapshot_file_name(slot));
Expand All @@ -401,6 +410,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
&mut BufReader::new(file),
&mut BufWriter::new(file_out),
accounts_hash,
incremental_snapshot_persistence,
)
.unwrap();
}
Expand Down
17 changes: 15 additions & 2 deletions runtime/src/serde_snapshot/newer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl From<DeserializableVersionedBank> for BankFieldsToDeserialize {
stakes: dvb.stakes,
epoch_stakes: dvb.epoch_stakes,
is_delta: dvb.is_delta,
incremental_snapshot_persistence: None,
}
}
}
Expand Down Expand Up @@ -209,6 +210,7 @@ impl<'a> TypeContext<'a> for Context {
// we can grab it on restart.
// TODO: if we do a snapshot version bump, consider moving this out.
lamports_per_signature,
None::<BankIncrementalSnapshotPersistence>,
)
.serialize(serializer)
}
Expand Down Expand Up @@ -314,6 +316,10 @@ impl<'a> TypeContext<'a> for Context {
bank_fields.fee_rate_governor = bank_fields
.fee_rate_governor
.clone_with_lamports_per_signature(lamports_per_signature);

let incremental_snapshot_persistence = ignore_eof_error(deserialize_from(stream))?;
bank_fields.incremental_snapshot_persistence = incremental_snapshot_persistence;

Ok((bank_fields, accounts_db_fields))
}

Expand All @@ -327,12 +333,13 @@ impl<'a> TypeContext<'a> for Context {
}

/// deserialize the bank from 'stream_reader'
/// modify the accounts_hash
/// modify the accounts_hash and incremental_snapshot_persistence
/// reserialize the bank to 'stream_writer'
fn reserialize_bank_fields_with_hash<R, W>(
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> std::result::Result<(), Box<bincode::ErrorKind>>
where
R: Read,
Expand All @@ -345,6 +352,7 @@ impl<'a> TypeContext<'a> for Context {
let blockhash_queue = RwLock::new(rhs.blockhash_queue.clone());
let hard_forks = RwLock::new(rhs.hard_forks.clone());
let lamports_per_signature = rhs.fee_rate_governor.lamports_per_signature;

let bank = SerializableVersionedBank {
blockhash_queue: &blockhash_queue,
ancestors: &rhs.ancestors,
Expand Down Expand Up @@ -382,7 +390,12 @@ impl<'a> TypeContext<'a> for Context {

bincode::serialize_into(
stream_writer,
&(bank, accounts_db_fields, lamports_per_signature),
&(
bank,
accounts_db_fields,
lamports_per_signature,
incremental_snapshot_persistence,
),
)
}
}
54 changes: 42 additions & 12 deletions runtime/src/serde_snapshot/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ fn test_bank_serialize_style(
serde_style: SerdeStyle,
reserialize_accounts_hash: bool,
update_accounts_hash: bool,
incremental_snapshot_persistence: bool,
) {
solana_logger::setup();
let (genesis_config, _) = create_genesis_config(500);
Expand Down Expand Up @@ -236,8 +237,18 @@ fn test_bank_serialize_style(
} else {
bank2.get_accounts_hash()
};
if reserialize_accounts_hash {
let slot = bank2.slot();

let slot = bank2.slot();
let incremental =
incremental_snapshot_persistence.then(|| BankIncrementalSnapshotPersistence {
full_slot: slot + 1,
full_hash: Hash::new(&[1; 32]),
full_capitalization: 31,
incremental_hash: Hash::new(&[2; 32]),
incremental_capitalization: 32,
});

if reserialize_accounts_hash || incremental_snapshot_persistence {
let temp_dir = TempDir::new().unwrap();
let slot_dir = temp_dir.path().join(slot.to_string());
let post_path = slot_dir.join(slot.to_string());
Expand All @@ -248,21 +259,32 @@ fn test_bank_serialize_style(
let mut f = std::fs::File::create(&pre_path).unwrap();
f.write_all(&buf).unwrap();
}

assert!(reserialize_bank_with_new_accounts_hash(
temp_dir.path(),
slot,
&accounts_hash
&accounts_hash,
incremental.as_ref(),
));
let previous_len = buf.len();
// larger buffer than expected to make sure the file isn't larger than expected
let mut buf_reserialized = vec![0; previous_len + 1];
let sizeof_none = std::mem::size_of::<u64>();
let sizeof_incremental_snapshot_persistence =
std::mem::size_of::<Option<BankIncrementalSnapshotPersistence>>();
let mut buf_reserialized =
vec![0; previous_len + sizeof_incremental_snapshot_persistence + 1];
{
let mut f = std::fs::File::open(post_path).unwrap();
let size = f.read(&mut buf_reserialized).unwrap();
assert_eq!(size, previous_len);
let expected = if !incremental_snapshot_persistence {
previous_len
} else {
previous_len + sizeof_incremental_snapshot_persistence - sizeof_none
};
assert_eq!(size, expected);
buf_reserialized.truncate(size);
}
if update_accounts_hash {
if update_accounts_hash || incremental_snapshot_persistence {
// We cannot guarantee buffer contents are exactly the same if hash is the same.
// Things like hashsets/maps have randomness in their in-mem representations.
// This make serialized bytes not deterministic.
Expand Down Expand Up @@ -310,6 +332,7 @@ fn test_bank_serialize_style(
assert_eq!(dbank.get_balance(&key3.pubkey()), 0);
assert_eq!(dbank.get_accounts_hash(), accounts_hash);
assert!(bank2 == dbank);
assert_eq!(dbank.incremental_snapshot_persistence, incremental);
}

pub(crate) fn reconstruct_accounts_db_via_serialization(
Expand Down Expand Up @@ -358,11 +381,18 @@ fn test_bank_serialize_newer() {
for (reserialize_accounts_hash, update_accounts_hash) in
[(false, false), (true, false), (true, true)]
{
test_bank_serialize_style(
SerdeStyle::Newer,
reserialize_accounts_hash,
update_accounts_hash,
)
for incremental_snapshot_persistence in if reserialize_accounts_hash {
[false, true].to_vec()
} else {
[false].to_vec()
} {
test_bank_serialize_style(
SerdeStyle::Newer,
reserialize_accounts_hash,
update_accounts_hash,
incremental_snapshot_persistence,
)
}
}
}

Expand Down Expand Up @@ -551,7 +581,7 @@ mod test_bank_serialize {

// This some what long test harness is required to freeze the ABI of
// Bank's serialization due to versioned nature
#[frozen_abi(digest = "9vGBt7YfymKUTPWLHVVpQbDtPD7dFDwXRMFkCzwujNqJ")]
#[frozen_abi(digest = "5py4Wkuj5fV2sLyA1MrPg4pGNwMEaygQLnpLyY8MMLGC")]
#[derive(Serialize, AbiExample)]
pub struct BankAbiTestWrapperNewer {
#[serde(serialize_with = "wrapper_newer")]
Expand Down
2 changes: 2 additions & 0 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2036,6 +2036,7 @@ pub fn package_and_archive_full_snapshot(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&bank.get_accounts_hash(),
None,
);

let snapshot_package = SnapshotPackage::new(accounts_package, bank.get_accounts_hash());
Expand Down Expand Up @@ -2088,6 +2089,7 @@ pub fn package_and_archive_incremental_snapshot(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&bank.get_accounts_hash(),
None,
);

let snapshot_package = SnapshotPackage::new(accounts_package, bank.get_accounts_hash());
Expand Down