Skip to content

Commit

Permalink
Use the database to persist the pubkey cache (#2234)
Browse files Browse the repository at this point in the history
## Issue Addressed

Closes #1787

## Proposed Changes

* Abstract the `ValidatorPubkeyCache` over a "backing" which is either a file (legacy), or the database.
* Implement a migration from schema v2 to schema v3, whereby the contents of the cache file are copied to the DB, and then the file is deleted. The next release to include this change must be a minor version bump, and we will need to warn users of the inability to downgrade (this is our first DB schema change since mainnet genesis).
* Move the schema migration code from the `store` crate into the `beacon_chain` crate so that it can access the datadir and the `ValidatorPubkeyCache`, etc. It gets injected back into the `store` via a closure (similar to what we do in fork choice).
  • Loading branch information
michaelsproul committed Mar 10, 2021
1 parent eea09d0 commit 20c395b
Show file tree
Hide file tree
Showing 17 changed files with 263 additions and 147 deletions.
16 changes: 11 additions & 5 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,22 @@ pub type BeaconForkChoice<T> = ForkChoice<
<T as BeaconChainTypes>::EthSpec,
>;

pub type BeaconStore<T> = Arc<
HotColdDB<
<T as BeaconChainTypes>::EthSpec,
<T as BeaconChainTypes>::HotStore,
<T as BeaconChainTypes>::ColdStore,
>,
>;

/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
/// operations and chooses a canonical head.
pub struct BeaconChain<T: BeaconChainTypes> {
pub spec: ChainSpec,
/// Configuration for `BeaconChain` runtime behaviour.
pub config: ChainConfig,
/// Persistent storage for blocks, states, etc. Typically an on-disk store, such as LevelDB.
pub store: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
pub store: BeaconStore<T>,
/// Database migrator for running background maintenance on the store.
pub store_migrator: BackgroundMigrator<T::EthSpec, T::HotStore, T::ColdStore>,
/// Reports the current slot, typically based upon the system clock.
Expand Down Expand Up @@ -237,7 +245,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub(crate) beacon_proposer_cache: Mutex<BeaconProposerCache>,
/// Caches a map of `validator_index -> validator_pubkey`.
pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache>,
pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache<T>>,
/// A list of any hard-coded forks that have been disabled.
pub disabled_forks: Vec<String>,
/// Sender given to tasks, so that if they encounter a state in which execution cannot
Expand Down Expand Up @@ -300,9 +308,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Load fork choice from disk, returning `None` if it isn't found.
pub fn load_fork_choice(
store: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Option<BeaconForkChoice<T>>, Error> {
pub fn load_fork_choice(store: BeaconStore<T>) -> Result<Option<BeaconForkChoice<T>>, Error> {
let persisted_fork_choice =
match store.get_item::<PersistedForkChoice>(&FORK_CHOICE_DB_KEY)? {
Some(fc) => fc,
Expand Down
10 changes: 5 additions & 5 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ fn cheap_state_advance_to_obtain_committees<'a, E: EthSpec>(
/// Obtains a read-locked `ValidatorPubkeyCache` from the `chain`.
fn get_validator_pubkey_cache<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<RwLockReadGuard<ValidatorPubkeyCache>, BlockError<T::EthSpec>> {
) -> Result<RwLockReadGuard<ValidatorPubkeyCache<T>>, BlockError<T::EthSpec>> {
chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
Expand All @@ -1348,11 +1348,11 @@ fn get_validator_pubkey_cache<T: BeaconChainTypes>(
///
/// The signature verifier is empty because it does not yet have any of this block's signatures
/// added to it. Use `Self::apply_to_signature_verifier` to apply the signatures.
fn get_signature_verifier<'a, E: EthSpec>(
state: &'a BeaconState<E>,
validator_pubkey_cache: &'a ValidatorPubkeyCache,
fn get_signature_verifier<'a, T: BeaconChainTypes>(
state: &'a BeaconState<T::EthSpec>,
validator_pubkey_cache: &'a ValidatorPubkeyCache<T>,
spec: &'a ChainSpec,
) -> BlockSignatureVerifier<'a, E, impl Fn(usize) -> Option<Cow<'a, PublicKey>> + Clone> {
) -> BlockSignatureVerifier<'a, T::EthSpec, impl Fn(usize) -> Option<Cow<'a, PublicKey>> + Clone> {
BlockSignatureVerifier::new(
state,
move |validator_index| {
Expand Down
20 changes: 3 additions & 17 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use types::{
SignedBeaconBlock, Slot,
};

pub const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz";

/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
/// functionality and only exists to satisfy the type system.
pub struct Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>(
Expand Down Expand Up @@ -81,8 +79,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
shutdown_sender: Option<Sender<&'static str>>,
head_tracker: Option<HeadTracker>,
data_dir: Option<PathBuf>,
pubkey_cache_path: Option<PathBuf>,
validator_pubkey_cache: Option<ValidatorPubkeyCache>,
validator_pubkey_cache: Option<ValidatorPubkeyCache<T>>,
spec: ChainSpec,
chain_config: ChainConfig,
disabled_forks: Vec<String>,
Expand Down Expand Up @@ -119,7 +116,6 @@ where
slot_clock: None,
shutdown_sender: None,
head_tracker: None,
pubkey_cache_path: None,
data_dir: None,
disabled_forks: Vec::new(),
validator_pubkey_cache: None,
Expand Down Expand Up @@ -182,7 +178,6 @@ where
///
/// Should generally be called early in the build chain.
pub fn data_dir(mut self, path: PathBuf) -> Self {
self.pubkey_cache_path = Some(path.join(PUBKEY_CACHE_FILENAME));
self.data_dir = Some(path);
self
}
Expand Down Expand Up @@ -224,11 +219,6 @@ where
pub fn resume_from_db(mut self) -> Result<Self, String> {
let log = self.log.as_ref().ok_or("resume_from_db requires a log")?;

let pubkey_cache_path = self
.pubkey_cache_path
.as_ref()
.ok_or("resume_from_db requires a data_dir")?;

info!(
log,
"Starting beacon chain";
Expand Down Expand Up @@ -274,7 +264,7 @@ where
.unwrap_or_else(OperationPool::new),
);

let pubkey_cache = ValidatorPubkeyCache::load_from_file(pubkey_cache_path)
let pubkey_cache = ValidatorPubkeyCache::load_from_store(store)
.map_err(|e| format!("Unable to open persisted pubkey cache: {:?}", e))?;

self.genesis_block_root = Some(chain.genesis_block_root);
Expand Down Expand Up @@ -496,12 +486,8 @@ where
}
}

let pubkey_cache_path = self
.pubkey_cache_path
.ok_or("Cannot build without a pubkey cache path")?;

let validator_pubkey_cache = self.validator_pubkey_cache.map(Ok).unwrap_or_else(|| {
ValidatorPubkeyCache::new(&canonical_head.beacon_state, pubkey_cache_path)
ValidatorPubkeyCache::new(&canonical_head.beacon_state, store.clone())
.map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e))
})?;

Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod observed_block_producers;
pub mod observed_operations;
mod persisted_beacon_chain;
mod persisted_fork_choice;
pub mod schema_change;
mod shuffling_cache;
mod snapshot_cache;
pub mod state_advance_timer;
Expand All @@ -29,7 +30,7 @@ pub mod validator_monitor;
mod validator_pubkey_cache;

pub use self::beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, ChainSegmentResult,
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
ForkChoiceError, StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
};
pub use self::beacon_snapshot::BeaconSnapshot;
Expand Down
64 changes: 64 additions & 0 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Utilities for managing database schema changes.
use crate::beacon_chain::BeaconChainTypes;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::Error as StoreError;

const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz";

/// Migrate the database from one schema version to another, applying all requisite mutations.
pub fn migrate_schema<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
datadir: &Path,
from: SchemaVersion,
to: SchemaVersion,
) -> Result<(), StoreError> {
match (from, to) {
// Migrating from the current schema version to iself is always OK, a no-op.
(_, _) if from == to && to == CURRENT_SCHEMA_VERSION => Ok(()),
// Migrate across multiple versions by recursively migrating one step at a time.
(_, _) if from.as_u64() + 1 < to.as_u64() => {
let next = SchemaVersion(from.as_u64() + 1);
migrate_schema::<T>(db.clone(), datadir, from, next)?;
migrate_schema::<T>(db, datadir, next, to)
}
// Migration from v0.3.0 to v0.3.x, adding the temporary states column.
// Nothing actually needs to be done, but once a DB uses v2 it shouldn't go back.
(SchemaVersion(1), SchemaVersion(2)) => {
db.store_schema_version(to)?;
Ok(())
}
// Migration for removing the pubkey cache.
(SchemaVersion(2), SchemaVersion(3)) => {
let pk_cache_path = datadir.join(PUBKEY_CACHE_FILENAME);

// Load from file, store to DB.
ValidatorPubkeyCache::<T>::load_from_file(&pk_cache_path)
.and_then(|cache| ValidatorPubkeyCache::convert(cache, db.clone()))
.map_err(|e| StoreError::SchemaMigrationError(format!("{:?}", e)))?;

db.store_schema_version(to)?;

// Delete cache file now that keys are stored in the DB.
fs::remove_file(&pk_cache_path).map_err(|e| {
StoreError::SchemaMigrationError(format!(
"unable to delete {}: {:?}",
pk_cache_path.display(),
e
))
})?;

Ok(())
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
current_version: from,
}
.into()),
}
}
Loading

0 comments on commit 20c395b

Please sign in to comment.