Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve DB validation tool #200

Merged
merged 2 commits
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ pub fn run() -> Result<(), String> {
check.range_end,
check.display,
check.display_value_max,
check.fast,
true,
);
db.dump(check_param).map_err(|e| format!("Check error: {e:?}"))?;
},
Expand Down Expand Up @@ -287,4 +289,9 @@ pub struct Check {
/// Max length for value to display (when using --display).
#[clap(long)]
pub display_value_max: Option<u64>,

/// Sort index to optimize disk access. Requires the largest DB index to be able to fit in
/// memory.
#[clap(long)]
pub fast: bool,
}
140 changes: 121 additions & 19 deletions src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ pub struct CorruptedIndexEntryInfo {

// Only used for DB validation and migration.
pub struct IterState {
pub chunk_index: u64,
pub item_index: u64,
pub total_items: u64,
pub key: Key,
pub rc: u32,
pub value: Vec<u8>,
Expand Down Expand Up @@ -782,8 +783,9 @@ impl HashColumn {
) -> Result<()> {
let tables = self.tables.read();
let source = &tables.index;
let total_chunks = source.id.total_chunks();

for c in start_chunk..source.id.total_chunks() {
for c in start_chunk..total_chunks {
let entries = source.entries(c, log.overlays())?;
for (sub_index, entry) in entries.iter().enumerate() {
if entry.is_empty() {
Expand Down Expand Up @@ -840,8 +842,13 @@ impl HashColumn {
hex(&key),
hex(&pk),
);
let state =
IterStateOrCorrupted::Item(IterState { chunk_index: c, key, rc, value });
let state = IterStateOrCorrupted::Item(IterState {
item_index: c,
total_items: total_chunks,
key,
rc,
value,
});
if !f(state)? {
return Ok(())
}
Expand All @@ -850,29 +857,112 @@ impl HashColumn {
Ok(())
}

fn dump(&self, log: &Log, check_param: &crate::CheckOptions, col: ColId) -> Result<()> {
let start_chunk = check_param.from.unwrap_or(0);
let end_chunk = check_param.bound;
fn iter_index_fast(
&self,
log: &Log,
mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
_start_chunk: u64,
) -> Result<()> {
let tables = self.tables.read();
let index = &tables.index;

let entries = index.sorted_entries()?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could probably use start_chunk as a restart parameter, but would still need to rebuild sorted entries at all restart so probably something that only make sense to put in place if really needed at some point.

let total = entries.len();
for (sub_index, entry) in entries.into_iter().enumerate() {
let (size_tier, offset) = {
let address = entry.address(index.id.index_bits());
(address.size_tier(), address.offset())
};

let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
let (value, rc, pk, compressed) = match value {
Ok(Some(v)) => v,
Ok(None) => {
let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
chunk_index: sub_index as u64,
sub_index: sub_index as u32,
value_entry,
entry,
error: None,
}))? {
return Ok(())
}
continue
},
Err(e) => {
let value_entry = if let Error::Corruption(_) = &e {
tables.value[size_tier as usize].dump_entry(offset).ok()
} else {
None
};
if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
chunk_index: sub_index as u64,
sub_index: sub_index as u32,
value_entry,
entry,
error: Some(e),
}))? {
return Ok(())
}
continue
},
};
let value = if compressed { self.compression.decompress(&value)? } else { value };
log::debug!(
target: "parity-db",
"{}: Iterating at {}/{}, pk={:?}",
index.id,
sub_index,
total,
hex(&pk),
);
let state = IterStateOrCorrupted::Item(IterState {
item_index: sub_index as u64,
total_items: total as u64,
key: Default::default(),
rc,
value,
});
if !f(state)? {
return Ok(())
}
}
Ok(())
}

fn dump(&self, log: &Log, check_params: &crate::CheckOptions, col: ColId) -> Result<()> {
let start_chunk = check_params.from.unwrap_or(0);
let end_chunk = check_params.bound;

let step = 10000;
let mut next_info_chunk = step;
let step = if check_params.fast { 1_000_000 } else { 10_000 };
let (denom, suffix) = if check_params.fast { (1_000_000, "m") } else { (1_000, "k") };
let mut next_info_at = step;
let start_time = std::time::Instant::now();
let total_chunks = self.tables.read().index.id.total_chunks();
let index_id = self.tables.read().index.id;
log::info!(target: "parity-db", "Column {} (hash): Starting index validation", col);
self.iter_index_internal(
let iter_fn =
if check_params.fast { Self::iter_index_fast } else { Self::iter_index_internal };
iter_fn(
self,
log,
|state| match state {
IterStateOrCorrupted::Item(IterState { chunk_index, key, rc, value }) => {
if Some(chunk_index) == end_chunk {
IterStateOrCorrupted::Item(IterState {
item_index,
total_items,
key,
rc,
value,
}) => {
if Some(item_index) == end_chunk {
return Ok(false)
}
if chunk_index >= next_info_chunk {
next_info_chunk += step;
log::info!(target: "parity-db", "Validated {} / {} chunks", chunk_index, total_chunks);
if item_index >= next_info_at {
next_info_at += step;
log::info!(target: "parity-db", "Validated {}{} / {}{} entries", item_index / denom, suffix, total_items / denom, suffix);
}

match check_param.display {
match check_params.display {
CheckDisplay::Full => {
log::info!(
"Index key: {:x?}\n \
Expand Down Expand Up @@ -913,6 +1003,18 @@ impl HashColumn {
)?;

log::info!(target: "parity-db", "Index validation complete successfully, elapsed {:?}", start_time.elapsed());
if check_params.validate_free_refs {
log::info!(target: "parity-db", "Validating free refs");
let tables = self.tables.read();
let mut total = 0;
for t in &tables.value {
match t.check_free_refs() {
Err(e) => log::warn!(target: "parity-db", "{}: Error: {:?}", t.id, e),
Ok(n) => total += n,
}
}
log::info!(target: "parity-db", "{} Total free refs", total);
}
Ok(())
}

Expand Down Expand Up @@ -1159,9 +1261,9 @@ impl Column {
}
}

pub fn dump(&self, log: &Log, check_param: &crate::CheckOptions, col: ColId) -> Result<()> {
pub fn dump(&self, log: &Log, check_params: &crate::CheckOptions, col: ColId) -> Result<()> {
match self {
Column::Hash(column) => column.dump(log, check_param, col),
Column::Hash(column) => column.dump(log, check_params, col),
Column::Tree(_column) => Ok(()),
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,10 @@ pub mod check {
pub bound: Option<u64>,
/// Verbosity.
pub display: CheckDisplay,
/// Ordered validation.
pub fast: bool,
/// Make sure free lists are correct.
pub validate_free_refs: bool,
}

impl CheckOptions {
Expand All @@ -1472,6 +1476,8 @@ pub mod check {
bound: Option<u64>,
display_content: bool,
truncate_value_display: Option<u64>,
fast: bool,
validate_free_refs: bool,
) -> Self {
let display = if display_content {
match truncate_value_display {
Expand All @@ -1481,7 +1487,7 @@ pub mod check {
} else {
CheckDisplay::None
};
CheckOptions { column, from, bound, display }
CheckOptions { column, from, bound, display, fast, validate_free_refs }
}
}
}
Expand Down
29 changes: 29 additions & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ impl IndexTable {
Ok(try_io!(Ok(ptr)))
}

/// Borrow the chunk at `index` directly out of the mmapped table file as a
/// fixed-size array of entries, without copying.
fn chunk_entries_at(index: u64, map: &memmap2::MmapMut) -> Result<&[Entry; CHUNK_ENTRIES]> {
// Chunks are laid out back to back right after the table metadata header.
let offset = META_SIZE + index as usize * CHUNK_LEN;
// SAFETY: the slice indexing bounds-checks the CHUNK_LEN byte range before the
// cast; reinterpreting it as [Entry; CHUNK_ENTRIES] assumes Entry is plain old
// data with CHUNK_ENTRIES entries filling exactly CHUNK_LEN bytes and no
// alignment requirement stricter than the map's — TODO(review) confirm.
let ptr = unsafe {
&*(map[offset..offset + CHUNK_LEN].as_ptr() as *const [Entry; CHUNK_ENTRIES])
};
Ok(try_io!(Ok(ptr)))
}
#[cfg(target_arch = "x86_64")]
fn find_entry(
&self,
Expand Down Expand Up @@ -365,6 +372,28 @@ impl IndexTable {
Ok(Self::transmute_chunk(EMPTY_CHUNK))
}

pub fn sorted_entries(&self) -> Result<Vec<Entry>> {
log::info!(target: "parity-db", "{}: Loading into memory", self.id);
let mut target = Vec::with_capacity(self.id.total_entries() as usize / 2);
if let Some(map) = &*self.map.read() {
for chunk_index in 0..self.id.total_chunks() {
let source = Self::chunk_entries_at(chunk_index, map)?;
for e in source {
if !e.is_empty() {
target.push(*e);
}
}
}
}
log::info!(target: "parity-db", "{}: Sorting index", self.id);
target.sort_unstable_by(|a, b| {
let a = a.address(self.id.index_bits());
let b = b.address(self.id.index_bits());
a.size_tier().cmp(&b.size_tier()).then_with(|| a.offset().cmp(&b.offset()))
});
Ok(target)
}

#[inline(always)]
fn transmute_chunk(chunk: [u8; CHUNK_LEN]) -> [Entry; CHUNK_ENTRIES] {
let mut result: [Entry; CHUNK_ENTRIES] = unsafe { std::mem::transmute(chunk) };
Expand Down
2 changes: 1 addition & 1 deletion src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub fn migrate(from: &Path, mut to: Options, overwrite: bool, force_migrate: &[u
log::info!("Migrating col {}", c);
source.iter_column_index_while(
c,
|IterState { chunk_index: index, key, rc, mut value }| {
|IterState { item_index: index, key, rc, mut value, .. }| {
//TODO: more efficient ref migration
for _ in 0..rc {
let value = std::mem::take(&mut value);
Expand Down
39 changes: 37 additions & 2 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,13 @@ impl ValueTable {
if filled == 0 {
filled = 1;
}
log::debug!(target: "parity-db", "Opened value table {} with {} entries, entry_size={}", id, filled, entry_size);
if last_removed >= filled {
return Err(crate::error::Error::Corruption(format!(
"Bad removed ref {} out of {}",
last_removed, filled
)))
}
log::debug!(target: "parity-db", "Opened value table {} with {} entries, entry_size={}, removed={}", id, filled, entry_size, last_removed);
}

Ok(ValueTable {
Expand Down Expand Up @@ -629,11 +635,19 @@ impl ValueTable {

/// Read the free-list link stored in the removed entry at `index`,
/// validating that it stays within the filled region of the table.
pub fn read_next_free(&self, index: u64, log: &LogWriter) -> Result<u64> {
	// Prefer the pending copy of the entry from the log overlay; otherwise
	// read it straight from the table file.
	let mut entry_buf = PartialEntry::new_uninit();
	if !log.value(self.id, index, entry_buf.as_mut()) {
		self.file.read_at(entry_buf.as_mut(), index * self.entry_size as u64)?;
	}
	entry_buf.skip_size();
	let next = entry_buf.read_next();
	// A link pointing past the filled region means the table is corrupted;
	// fail here rather than follow a bogus pointer later.
	let filled = self.filled.load(Ordering::Relaxed);
	if next >= filled {
		return Err(crate::error::Error::Corruption(format!(
			"Bad removed ref {} out of {}",
			next, filled
		)))
	}
	Ok(next)
}

pub fn read_next_part(&self, index: u64, log: &LogWriter) -> Result<Option<u64>> {
Expand Down Expand Up @@ -1046,6 +1060,27 @@ impl ValueTable {
}
Ok(())
}

/// Validate the free-records sequence.
///
/// Walks the linked list of removed entries starting at `last_removed`,
/// verifying that every link stays inside the filled region of the table and
/// that the chain terminates. Returns the length of the free list on success.
pub fn check_free_refs(&self) -> Result<u64> {
	let filled = self.filled.load(Ordering::Relaxed);
	let mut next = self.last_removed.load(Ordering::Relaxed);
	let mut len = 0;
	while next != 0 {
		if next >= filled {
			return Err(crate::error::Error::Corruption(format!(
				"Bad removed ref {} out of {}",
				next, filled
			)))
		}
		// A valid free list holds at most `filled` entries; exceeding that
		// means the chain loops back on itself, so report corruption instead
		// of iterating forever on a cyclic list.
		if len >= filled {
			return Err(crate::error::Error::Corruption(format!(
				"Cycle in free list around {}",
				next
			)))
		}
		let mut buf = PartialEntry::new_uninit();
		self.file.read_at(buf.as_mut(), next * self.entry_size as u64)?;
		buf.skip_size();
		next = buf.read_next();
		len += 1;
	}
	Ok(len)
}
}

pub mod key {
Expand Down