Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Implement database temp states to reduce memory usage #1798

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let block_root = fully_verified_block.block_root;
let mut state = fully_verified_block.state;
let current_slot = self.slot()?;
let mut ops = fully_verified_block.intermediate_states;
let mut ops = fully_verified_block.confirmation_db_batch;

let attestation_observation_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION);
Expand Down Expand Up @@ -1623,13 +1623,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);

// Store all the states between the parent block state and this block's slot, the block and state.
ops.push(StoreOp::PutBlock(block_root.into(), signed_block.clone()));
ops.push(StoreOp::PutState(
block.state_root.into(),
Cow::Borrowed(&state),
// Store the block and its state, and execute the confirmation batch for the intermediate
// states, which will delete their temporary flags.
ops.push(StoreOp::PutBlock(
block_root,
Box::new(signed_block.clone()),
));
ops.push(StoreOp::PutState(block.state_root, &state));
let txn_lock = self.store.hot_db.begin_rw_transaction();
self.store.do_atomically(ops)?;
drop(txn_lock);

// The fork choice write-lock is dropped *after* the on-disk database has been updated.
// This prevents inconsistency between the two at the expense of concurrency.
Expand Down
50 changes: 34 additions & 16 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use std::borrow::Cow;
use std::convert::TryFrom;
use std::fs;
use std::io::Write;
use store::{Error as DBError, HotColdDB, HotStateSummary, StoreOp};
use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp};
use tree_hash::TreeHash;
use types::{
BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256,
Expand Down Expand Up @@ -363,7 +363,7 @@ pub struct FullyVerifiedBlock<'a, T: BeaconChainTypes> {
pub block_root: Hash256,
pub state: BeaconState<T::EthSpec>,
pub parent_block: SignedBeaconBlock<T::EthSpec>,
pub intermediate_states: Vec<StoreOp<'a, T::EthSpec>>,
pub confirmation_db_batch: Vec<StoreOp<'a, T::EthSpec>>,
}

/// Implemented on types that can be converted into a `FullyVerifiedBlock`.
Expand Down Expand Up @@ -676,9 +676,9 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {

let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE);

// Keep a batch of any states that were "skipped" (block-less) in between the parent state
// slot and the block slot. These will be stored in the database.
let mut intermediate_states: Vec<StoreOp<T::EthSpec>> = Vec::new();
// Stage a batch of operations to be completed atomically if this block is imported
// successfully.
let mut confirmation_db_batch = vec![];

// The block must have a higher slot than its parent.
if block.slot() <= parent.beacon_state.slot {
Expand All @@ -702,18 +702,36 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;

let op = if state.slot % T::EthSpec::slots_per_epoch() == 0 {
StoreOp::PutState(
state_root.into(),
Cow::Owned(state.clone_with(CloneConfig::committee_caches_only())),
)
// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
let txn_lock = chain.store.hot_db.begin_rw_transaction();
let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();

let state_batch = if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
vec![]
} else {
StoreOp::PutStateSummary(
state_root.into(),
HotStateSummary::new(&state_root, &state)?,
)
vec![
if state.slot % T::EthSpec::slots_per_epoch() == 0 {
StoreOp::PutState(state_root, &state)
} else {
StoreOp::PutStateSummary(
state_root,
HotStateSummary::new(&state_root, &state)?,
)
},
StoreOp::PutStateTemporaryFlag(state_root),
]
};
intermediate_states.push(op);
chain.store.do_atomically(state_batch)?;
drop(txn_lock);

confirmation_db_batch.push(StoreOp::DeleteStateTemporaryFlag(state_root));

state_root
};

Expand Down Expand Up @@ -801,7 +819,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
block_root,
state,
parent_block: parent.beacon_block,
intermediate_states,
confirmation_db_batch,
})
}
}
Expand Down
5 changes: 1 addition & 4 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use serde_derive::{Deserialize, Serialize};
use types::Checkpoint;

/// There is a 693 block skip in the current canonical Medalla chain, we use 700 to be safe.
pub const DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS: u64 = 700;

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct ChainConfig {
/// Maximum number of slots to skip when importing a consensus message (e.g., block,
Expand All @@ -20,7 +17,7 @@ pub struct ChainConfig {
impl Default for ChainConfig {
fn default() -> Self {
Self {
import_max_skip_slots: Some(DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS),
import_max_skip_slots: None,
weak_subjectivity_checkpoint: None,
}
}
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho

let batch: Vec<StoreOp<E>> = abandoned_blocks
.into_iter()
.map(Into::into)
.map(StoreOp::DeleteBlock)
.chain(
abandoned_states
.into_iter()
.map(|(slot, state_hash)| StoreOp::DeleteState(state_hash, slot)),
.map(|(slot, state_hash)| StoreOp::DeleteState(state_hash.into(), Some(slot))),
)
.collect();

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ where
}
}

fn set_current_slot(&self, slot: Slot) {
pub fn set_current_slot(&self, slot: Slot) {
let current_slot = self.chain.slot().unwrap();
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let epoch = slot.epoch(E::slots_per_epoch());
Expand Down
38 changes: 38 additions & 0 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,44 @@ fn pruning_test(
check_no_blocks_exist(&harness, stray_blocks.values());
}

#[test]
fn garbage_collect_temp_states_from_failed_block() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let slots_per_epoch = E::slots_per_epoch();

let genesis_state = harness.get_current_state();
let block_slot = Slot::new(2 * slots_per_epoch);
let (mut block, state) = harness.make_block(genesis_state, block_slot);

// Mutate the block to make it invalid, and re-sign it.
block.message.state_root = Hash256::repeat_byte(0xff);
let proposer_index = block.message.proposer_index as usize;
let block = block.message.sign(
&harness.validator_keypairs[proposer_index].sk,
&state.fork,
state.genesis_validators_root,
&harness.spec,
);

// The block should be rejected, but should store a bunch of temporary states.
harness.set_current_slot(block_slot);
harness.process_block_result(block).unwrap_err();

assert_eq!(
store.iter_temporary_state_roots().count(),
block_slot.as_usize() - 1
);

drop(harness);
drop(store);

// On startup, the store should garbage collect all the temporary states.
let store = get_store(&db_path);
assert_eq!(store.iter_temporary_state_roots().count(), 0);
}

/// Check that the head state's slot matches `expected_slot`.
fn check_slot(harness: &TestHarness, expected_slot: u64) {
let state = &harness.chain.head().expect("should get head").beacon_state;
Expand Down
6 changes: 2 additions & 4 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,11 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.long("max-skip-slots")
.help(
"Refuse to skip more than this many slots when processing a block or attestation. \
This prevents nodes on minority forks from wasting our time and RAM, \
but might need to be raised or set to 'none' in times of extreme network \
outage."
This prevents nodes on minority forks from wasting our time and disk space, \
but could also cause unnecessary consensus failures, so is disabled by default."
)
.value_name("NUM_SLOTS")
.takes_value(true)
.default_value("700")
)
.arg(
Arg::with_name("wss-checkpoint")
Expand Down
38 changes: 38 additions & 0 deletions beacon_node/store/src/garbage_collection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//! Garbage collection process that runs at start-up to clean up the database.
use crate::hot_cold_store::HotColdDB;
use crate::{Error, LevelDB, StoreOp};
use slog::debug;
use types::EthSpec;

impl<E> HotColdDB<E, LevelDB<E>, LevelDB<E>>
where
E: EthSpec,
{
/// Clean up the database by performing one-off maintenance at start-up.
pub fn remove_garbage(&self) -> Result<(), Error> {
self.delete_temp_states()
}

/// Delete the temporary states that were leftover by failed block imports.
pub fn delete_temp_states(&self) -> Result<(), Error> {
let delete_ops =
self.iter_temporary_state_roots()
.try_fold(vec![], |mut ops, state_root| {
let state_root = state_root?;
ops.push(StoreOp::DeleteState(state_root, None));
ops.push(StoreOp::DeleteStateTemporaryFlag(state_root));
Result::<_, Error>::Ok(ops)
})?;

if !delete_ops.is_empty() {
debug!(
self.log,
"Garbage collecting {} temporary states",
delete_ops.len() / 2
);
self.do_atomically(delete_ops)?;
}

Ok(())
}
}
Loading