Skip to content

Commit

Permalink
Flushes append vec mmaps only if dirty (anza-xyz#1676)
Browse files Browse the repository at this point in the history
  • Loading branch information
brooksprumo authored and samkim-crypto committed Jul 31, 2024
1 parent 48a4a5e commit 14be179
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 13 deletions.
10 changes: 9 additions & 1 deletion accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ use {
ancient_append_vecs::{
get_ancient_append_vec_capacity, is_ancient, AccountsToStore, StorageSelector,
},
append_vec::{aligned_stored_size, APPEND_VEC_MMAPPED_FILES_OPEN, STORE_META_OVERHEAD},
append_vec::{
aligned_stored_size, APPEND_VEC_MMAPPED_FILES_DIRTY, APPEND_VEC_MMAPPED_FILES_OPEN,
STORE_META_OVERHEAD,
},
cache_hash_data::{
CacheHashData, CacheHashDataFileReference, DeletionPolicy as CacheHashDeletionPolicy,
},
Expand Down Expand Up @@ -1905,6 +1908,11 @@ impl LatestAccountsIndexRootsStats {
"append_vecs_open",
APPEND_VEC_MMAPPED_FILES_OPEN.load(Ordering::Relaxed) as i64,
i64
),
(
"append_vecs_dirty",
APPEND_VEC_MMAPPED_FILES_DIRTY.load(Ordering::Relaxed),
i64
)
);

Expand Down
76 changes: 64 additions & 12 deletions accounts-db/src/append_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use {
path::{Path, PathBuf},
ptr,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Mutex,
},
},
Expand Down Expand Up @@ -212,7 +212,19 @@ struct AccountOffsets {
#[derive(Debug)]
enum AppendVecFileBacking {
/// A file-backed block of memory that is used to store the data for each appended item.
MmapOnly(MmapMut),
MmapOnly(MmapOnly),
}

/// Mmap-backed storage for an AppendVec, together with a dirty flag used to
/// skip redundant flushes.
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Debug)]
struct MmapOnly {
    /// The file-backed memory map that stores the data for each appended item.
    mmap: MmapMut,
    /// Flags if the mmap is dirty or not.
    /// Since fastboot requires that all mmaps are flushed to disk, be smart about it.
    /// AppendVecs are (almost) always write-once. The common case is that an AppendVec
    /// will only need to be flushed once. This avoids unnecessary syscalls/kernel work
    /// when nothing in the AppendVec has changed.
    is_dirty: AtomicBool,
}

/// A thread-safe, file-backed block of memory used to store `Account` instances. Append operations
Expand Down Expand Up @@ -240,11 +252,19 @@ pub struct AppendVec {

/// Metric: number of AppendVec mmaps currently open, process-wide.
// `AtomicU64::new` is a `const fn`, so these can be ordinary statics;
// `lazy_static!` adds macro + lazy-init + Deref indirection for no benefit here.
// Call sites (`.load()`, `.fetch_add()`, ...) are unchanged.
pub static APPEND_VEC_MMAPPED_FILES_OPEN: AtomicU64 = AtomicU64::new(0);
/// Metric: number of open AppendVec mmaps with unflushed writes, process-wide.
pub static APPEND_VEC_MMAPPED_FILES_DIRTY: AtomicU64 = AtomicU64::new(0);

impl Drop for AppendVec {
fn drop(&mut self) {
APPEND_VEC_MMAPPED_FILES_OPEN.fetch_sub(1, Ordering::Relaxed);
match &self.backing {
AppendVecFileBacking::MmapOnly(mmap_only) => {
if mmap_only.is_dirty.load(Ordering::Acquire) {
APPEND_VEC_MMAPPED_FILES_DIRTY.fetch_sub(1, Ordering::Relaxed);
}
}
}
if let Err(_err) = remove_file(&self.path) {
// promote this to panic soon.
// disabled due to many false positive warnings while running tests.
Expand Down Expand Up @@ -303,7 +323,10 @@ impl AppendVec {

AppendVec {
path: file,
backing: AppendVecFileBacking::MmapOnly(mmap),
backing: AppendVecFileBacking::MmapOnly(MmapOnly {
mmap,
is_dirty: AtomicBool::new(false),
}),
// This mutex forces append to be single threaded, but concurrent with reads
// See UNSAFE usage in `append_ptr`
append_lock: Mutex::new(()),
Expand Down Expand Up @@ -335,7 +358,15 @@ impl AppendVec {

/// Flushes the mmap to disk, but only if it has unflushed writes.
///
/// This avoids unnecessary syscalls/kernel work in the common case where the
/// AppendVec has already been flushed and nothing has changed since.
///
/// # Errors
/// Returns the underlying I/O error if the mmap flush fails; the dirty state
/// is restored so a later call will retry the flush.
pub fn flush(&self) -> Result<()> {
    match &self.backing {
        AppendVecFileBacking::MmapOnly(mmap_only) => {
            // Check to see if the mmap is actually dirty before flushing.
            //
            // Clear the dirty flag *before* flushing (atomically, via `swap`):
            // an append racing with this flush will set the flag again, so its
            // data is guaranteed to be picked up by a later flush. Clearing
            // the flag *after* the flush could wipe out that concurrent
            // re-marking, leaving data unflushed yet considered clean.
            if mmap_only.is_dirty.swap(false, Ordering::AcqRel) {
                APPEND_VEC_MMAPPED_FILES_DIRTY.fetch_sub(1, Ordering::Relaxed);
                if let Err(err) = mmap_only.mmap.flush() {
                    // The flush failed, so the data is still unflushed.
                    // Restore the dirty flag (and metric) so a subsequent
                    // flush will retry — unless an append re-marked it
                    // dirty in the meantime, in which case it is already
                    // accounted for.
                    if !mmap_only.is_dirty.swap(true, Ordering::AcqRel) {
                        APPEND_VEC_MMAPPED_FILES_DIRTY.fetch_add(1, Ordering::Relaxed);
                    }
                    return Err(err.into());
                }
            }
            Ok(())
        }
    }
}

Expand Down Expand Up @@ -419,7 +450,10 @@ impl AppendVec {

Ok(AppendVec {
path,
backing: AppendVecFileBacking::MmapOnly(mmap),
backing: AppendVecFileBacking::MmapOnly(MmapOnly {
mmap,
is_dirty: AtomicBool::new(false),
}),
append_lock: Mutex::new(()),
current_len: AtomicUsize::new(current_len),
file_size,
Expand Down Expand Up @@ -484,8 +518,8 @@ impl AppendVec {
fn append_ptr(&self, offset: &mut usize, src: *const u8, len: usize) {
let pos = u64_align!(*offset);
match &self.backing {
AppendVecFileBacking::MmapOnly(mmap) => {
let data = &mmap[pos..(pos + len)];
AppendVecFileBacking::MmapOnly(mmap_only) => {
let data = &mmap_only.mmap[pos..(pos + len)];
//UNSAFE: This mut append is safe because only 1 thread can append at a time
//Mutex<()> guarantees exclusive write access to the memory occupied in
//the range.
Expand Down Expand Up @@ -547,7 +581,7 @@ impl AppendVec {
mut callback: impl for<'local> FnMut(StoredAccountMeta<'local>) -> Ret,
) -> Option<Ret> {
match &self.backing {
AppendVecFileBacking::MmapOnly(mmap) => {
AppendVecFileBacking::MmapOnly(MmapOnly { mmap, .. }) => {
let slice = self.get_valid_slice_from_mmap(mmap);
let (meta, next): (&StoredMeta, _) = Self::get_type(slice, offset)?;
let (account_meta, next): (&AccountMeta, _) = Self::get_type(slice, next)?;
Expand Down Expand Up @@ -647,7 +681,7 @@ impl AppendVec {
pub(crate) fn scan_index(&self, mut callback: impl FnMut(IndexInfo)) {
let mut offset = 0;
match &self.backing {
AppendVecFileBacking::MmapOnly(mmap) => {
AppendVecFileBacking::MmapOnly(MmapOnly { mmap, .. }) => {
let slice = self.get_valid_slice_from_mmap(mmap);
loop {
let Some((stored_meta, next)) = Self::get_type::<StoredMeta>(slice, offset)
Expand Down Expand Up @@ -710,7 +744,7 @@ impl AppendVec {
pub(crate) fn get_account_sizes(&self, sorted_offsets: &[usize]) -> Vec<usize> {
let mut result = Vec::with_capacity(sorted_offsets.len());
match &self.backing {
AppendVecFileBacking::MmapOnly(mmap) => {
AppendVecFileBacking::MmapOnly(MmapOnly { mmap, .. }) => {
let slice = self.get_valid_slice_from_mmap(mmap);
for &offset in sorted_offsets {
let Some((stored_meta, _)) = Self::get_type::<StoredMeta>(slice, offset) else {
Expand All @@ -736,7 +770,7 @@ impl AppendVec {
pub(crate) fn scan_pubkeys(&self, mut callback: impl FnMut(&Pubkey)) {
let mut offset = 0;
match &self.backing {
AppendVecFileBacking::MmapOnly(mmap) => {
AppendVecFileBacking::MmapOnly(MmapOnly { mmap, .. }) => {
let slice = self.get_valid_slice_from_mmap(mmap);
loop {
let Some((stored_meta, _)) = Self::get_type::<StoredMeta>(slice, offset) else {
Expand Down Expand Up @@ -813,6 +847,22 @@ impl AppendVec {
});
}

match &self.backing {
AppendVecFileBacking::MmapOnly(mmap_only) => {
if !offsets.is_empty() {
// If we've actually written to the AppendVec, make sure we mark it as dirty.
// This ensures we properly flush it later.
// As an optimization to reduce unnecessary cache line invalidations,
// only write the `is_dirty` atomic if currently *not* dirty.
// (This also ensures the 'dirty counter' datapoint is correct.)
if !mmap_only.is_dirty.load(Ordering::Acquire) {
mmap_only.is_dirty.store(true, Ordering::Release);
APPEND_VEC_MMAPPED_FILES_DIRTY.fetch_add(1, Ordering::Relaxed);
}
}
}
}

(!offsets.is_empty()).then(|| {
// The last entry in the offsets needs to be the u64 aligned `offset`, because that's
// where the *next* entry will begin to be stored.
Expand All @@ -836,7 +886,9 @@ impl AppendVec {
/// Returns a view of this AppendVec's backing storage for archiving (e.g. snapshots).
pub(crate) fn internals_for_archive(&self) -> InternalsForArchive {
    match &self.backing {
        // note this returns the entire mmap slice, even bytes that we consider invalid
        AppendVecFileBacking::MmapOnly(MmapOnly { mmap, .. }) => {
            InternalsForArchive::Mmap(mmap)
        }
    }
}
}
Expand Down

0 comments on commit 14be179

Please sign in to comment.