Skip to content

Commit

Permalink
Compact database on finalization (#1871)
Browse files Browse the repository at this point in the history
## Issue Addressed

Closes #1866

## Proposed Changes

* Compact the database on finalization. This removes the deleted states from disk completely. Because it happens in the background migrator, it doesn't block other database operations while it runs. On my Medalla node it took about 1 minute and shrank the database from 90GB to 9GB.
* Fix an inefficiency in the pruning algorithm where it would always use the genesis checkpoint as the `old_finalized_checkpoint` when running for the first time after start-up. This would result in loading lots of states one-at-a-time back to genesis, and storing a lot of block roots in memory. The new code stores the old finalized checkpoint on disk and only uses genesis if no checkpoint is already stored. This makes it both backwards compatible _and_ forwards compatible -- no schema change required!
* Introduce two new `INFO` logs to indicate when pruning has started and completed. Users seem to want to know this information without enabling debug logs!
  • Loading branch information
michaelsproul committed Nov 9, 2020
1 parent b711cfe commit 556190f
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 27 deletions.
59 changes: 35 additions & 24 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::errors::BeaconChainError;
use crate::head_tracker::{HeadTracker, SszHeadTracker};
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use parking_lot::Mutex;
use slog::{debug, warn, Logger};
use slog::{debug, error, info, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::mpsc;
Expand All @@ -29,7 +29,6 @@ pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
thread::JoinHandle<()>,
)>,
>,
latest_checkpoint: Arc<Mutex<Checkpoint>>,
/// Genesis block root, for persisting the `PersistedBeaconChain`.
genesis_block_root: Hash256,
log: Logger,
Expand Down Expand Up @@ -74,7 +73,6 @@ pub struct MigrationNotification<E: EthSpec> {
finalized_state: BeaconState<E>,
finalized_checkpoint: Checkpoint,
head_tracker: Arc<HeadTracker>,
latest_checkpoint: Arc<Mutex<Checkpoint>>,
genesis_block_root: Hash256,
}

Expand All @@ -91,14 +89,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
} else {
Some(Mutex::new(Self::spawn_thread(db.clone(), log.clone())))
};
let latest_checkpoint = Arc::new(Mutex::new(Checkpoint {
root: Hash256::zero(),
epoch: Epoch::new(0),
}));
Self {
db,
tx_thread,
latest_checkpoint,
genesis_block_root,
log,
}
Expand All @@ -121,7 +114,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
finalized_state,
finalized_checkpoint,
head_tracker,
latest_checkpoint: self.latest_checkpoint.clone(),
genesis_block_root: self.genesis_block_root,
};

Expand Down Expand Up @@ -164,7 +156,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
notif: MigrationNotification<E>,
log: &Logger,
) {
let mut latest_checkpoint = notif.latest_checkpoint.lock();
let finalized_state_root = notif.finalized_state_root;
let finalized_state = notif.finalized_state;

Expand All @@ -173,11 +164,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
notif.head_tracker,
finalized_state_root,
&finalized_state,
*latest_checkpoint,
notif.finalized_checkpoint,
notif.genesis_block_root,
log,
) {
Ok(PruningOutcome::Successful) => {}
Ok(PruningOutcome::DeferredConcurrentMutation) => {
warn!(
log,
Expand All @@ -186,18 +177,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
);
return;
}
Ok(PruningOutcome::Successful) => {
// Update the migrator's idea of the latest checkpoint only if the
// pruning process was successful.
*latest_checkpoint = notif.finalized_checkpoint;
}
Err(e) => {
warn!(log, "Block pruning failed"; "error" => format!("{:?}", e));
return;
}
};

match migrate_database(db, finalized_state_root.into(), &finalized_state) {
match migrate_database(db.clone(), finalized_state_root.into(), &finalized_state) {
Ok(()) => {}
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(
Expand All @@ -212,8 +198,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
"Database migration failed";
"error" => format!("{:?}", e)
);
return;
}
};

// Finally, compact the database so that new free space is properly reclaimed.
debug!(log, "Starting database compaction");
if let Err(e) = db.compact() {
error!(
log,
"Database compaction failed";
"error" => format!("{:?}", e)
);
}
debug!(log, "Database compaction complete");
}

/// Spawn a new child thread to run the migration process.
Expand Down Expand Up @@ -244,11 +242,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
head_tracker: Arc<HeadTracker>,
new_finalized_state_hash: BeaconStateHash,
new_finalized_state: &BeaconState<E>,
old_finalized_checkpoint: Checkpoint,
new_finalized_checkpoint: Checkpoint,
genesis_block_root: Hash256,
log: &Logger,
) -> Result<PruningOutcome, BeaconChainError> {
let old_finalized_checkpoint =
store
.load_pruning_checkpoint()?
.unwrap_or_else(|| Checkpoint {
epoch: Epoch::new(0),
root: Hash256::zero(),
});

let old_finalized_slot = old_finalized_checkpoint
.epoch
.start_slot(E::slots_per_epoch());
Expand All @@ -267,15 +272,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.into());
}

debug!(
info!(
log,
"Starting database pruning";
"old_finalized_epoch" => old_finalized_checkpoint.epoch,
"old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
"new_finalized_epoch" => new_finalized_checkpoint.epoch,
"new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
);

// For each slot between the new finalized checkpoint and the old finalized checkpoint,
// collect the beacon block root and state root of the canonical chain.
let newly_finalized_chain: HashMap<Slot, (SignedBeaconBlockHash, BeaconStateHash)> =
Expand Down Expand Up @@ -303,7 +305,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
let mut abandoned_heads: HashSet<Hash256> = HashSet::new();

let heads = head_tracker.heads();
debug!(log, "Pruning {} heads", heads.len());
debug!(
log,
"Extra pruning information";
"old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
"new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
"head_count" => heads.len(),
);

for (head_hash, head_slot) in heads {
let mut potentially_abandoned_head = Some(head_hash);
Expand Down Expand Up @@ -457,8 +465,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
drop(head_tracker_lock);
kv_batch.push(persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY));

// Persist the new finalized checkpoint as the pruning checkpoint.
kv_batch.push(store.pruning_checkpoint_store_op(new_finalized_checkpoint));

store.hot_db.do_atomically(kv_batch)?;
debug!(log, "Database pruning complete");
info!(log, "Database pruning complete");

Ok(PruningOutcome::Successful)
}
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/store/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ where
{
/// Clean up the database by performing one-off maintenance at start-up.
pub fn remove_garbage(&self) -> Result<(), Error> {
self.delete_temp_states()
self.delete_temp_states()?;
Ok(())
}

/// Delete the temporary states that were leftover by failed block imports.
Expand Down
22 changes: 21 additions & 1 deletion beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::leveldb_store::BytesKey;
use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore;
use crate::metadata::{
SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY,
PruningCheckpoint, SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
SCHEMA_VERSION_KEY, SPLIT_KEY,
};
use crate::metrics;
use crate::{
Expand Down Expand Up @@ -924,6 +925,25 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})
}
}

/// Reclaim disk space left behind by deleted states.
///
/// Delegates to the hot database's own `compact` implementation and returns
/// its result unchanged.
pub fn compact(&self) -> Result<(), Error> {
    self.hot_db.compact()
}

/// Load the checkpoint to begin pruning from (the "old finalized checkpoint").
pub fn load_pruning_checkpoint(&self) -> Result<Option<Checkpoint>, Error> {
Ok(self
.hot_db
.get(&PRUNING_CHECKPOINT_KEY)?
.map(|pc: PruningCheckpoint| pc.checkpoint))
}

/// Build (but do not execute) the store operation that records `checkpoint` as the
/// pruning checkpoint under `PRUNING_CHECKPOINT_KEY`.
pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp {
    let item = PruningCheckpoint { checkpoint };
    item.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
}
}

/// Advance the split point of the store, moving new finalized states to the freezer.
Expand Down
22 changes: 22 additions & 0 deletions beacon_node/store/src/leveldb_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::*;
use crate::metrics;
use db_key::Key;
use leveldb::compaction::Compaction;
use leveldb::database::batch::{Batch, Writebatch};
use leveldb::database::kv::KV;
use leveldb::database::Database;
Expand Down Expand Up @@ -152,6 +153,27 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
fn begin_rw_transaction(&self) -> MutexGuard<()> {
self.transaction_mutex.lock()
}

/// Compact the key ranges of the `BeaconStateTemporary` and `BeaconState` columns.
fn compact(&self) -> Result<(), Error> {
    // Key pair spanning a column: from the all-zero hash to the all-0xff hash.
    let column_range = |column: DBColumn| {
        let lowest = get_key_for_col(column.as_str(), Hash256::zero().as_bytes());
        let highest = get_key_for_col(column.as_str(), Hash256::repeat_byte(0xff).as_bytes());
        (BytesKey::from_vec(lowest), BytesKey::from_vec(highest))
    };

    let ranges = [
        column_range(DBColumn::BeaconStateTemporary),
        column_range(DBColumn::BeaconState),
    ];

    for (start_key, end_key) in ranges.iter() {
        self.db.compact(start_key, end_key);
    }

    Ok(())
}
}

impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
/// This doesn't prevent other threads writing to the DB unless they also use
/// this method. In future we may implement a safer mandatory locking scheme.
fn begin_rw_transaction(&self) -> MutexGuard<()>;

/// Compact the database, freeing space used by deleted items.
fn compact(&self) -> Result<(), Error>;
}

pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
fn begin_rw_transaction(&self) -> MutexGuard<()> {
self.transaction_mutex.lock()
}

/// No-op: returns `Ok(())` without doing any work.
fn compact(&self) -> Result<(), Error> {
Ok(())
}
}

impl<E: EthSpec> ItemStore<E> for MemoryStore<E> {}
27 changes: 26 additions & 1 deletion beacon_node/store/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{DBColumn, Error, StoreItem};
use ssz::{Decode, Encode};
use types::Hash256;
use types::{Checkpoint, Hash256};

pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2);

Expand All @@ -10,6 +10,7 @@ pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2);
pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0);
pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1);
pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SchemaVersion(pub u64);
Expand All @@ -33,3 +34,27 @@ impl StoreItem for SchemaVersion {
Ok(SchemaVersion(u64::from_ssz_bytes(bytes)?))
}
}

/// The checkpoint used for pruning the database.
///
/// Updated whenever pruning is successful.
///
/// Serialized as the SSZ bytes of the inner `Checkpoint` and stored in the
/// `BeaconMeta` column (see the `StoreItem` impl).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PruningCheckpoint {
/// The wrapped checkpoint; this is the only data that gets persisted.
pub checkpoint: Checkpoint,
}

impl StoreItem for PruningCheckpoint {
    /// The pruning checkpoint lives in the `BeaconMeta` column.
    fn db_column() -> DBColumn {
        DBColumn::BeaconMeta
    }

    /// Encode as the SSZ bytes of the wrapped checkpoint.
    fn as_store_bytes(&self) -> Vec<u8> {
        let checkpoint = &self.checkpoint;
        checkpoint.as_ssz_bytes()
    }

    /// Decode from the SSZ bytes of a `Checkpoint`, propagating any decode error.
    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
        let checkpoint = Checkpoint::from_ssz_bytes(bytes)?;
        Ok(Self { checkpoint })
    }
}

0 comments on commit 556190f

Please sign in to comment.