[Merged by Bors] - Remove DB migrations for legacy database schemas #3181

Closed · wants to merge 3 commits
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/errors.rs
@@ -91,7 +91,7 @@ pub enum BeaconChainError {
BlockSignatureVerifierError(state_processing::block_signature_verifier::Error),
BlockReplayError(BlockReplayError),
DuplicateValidatorPublicKey,
ValidatorPubkeyCacheFileError(String),
ValidatorPubkeyCacheError(String),
ValidatorIndexUnknown(usize),
ValidatorPubkeyUnknown(PublicKeyBytes),
OpPoolError(OpPoolError),
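The only change in this file is the variant rename: with the file-backed cache gone, pubkey-cache failures are reported through a general string-carrying variant rather than a file-specific one. A minimal sketch of the error-mapping pattern this enables, mirroring the updated `load_from_store` further down; the enum is trimmed to the single variant this diff touches, and the `types` crate (with `TryInto` on `&PublicKeyBytes`, as used in the diff) is assumed:

```rust
use std::convert::TryInto;
use types::{PublicKey, PublicKeyBytes};

// Trimmed to the one variant this PR renames.
#[derive(Debug)]
pub enum BeaconChainError {
    ValidatorPubkeyCacheError(String),
}

// A BLS decode failure is stringified into the renamed variant, exactly as
// the store-backed `load_from_store` does in the diff below.
fn decode_pubkey(pubkey: &PublicKeyBytes) -> Result<PublicKey, BeaconChainError> {
    pubkey
        .try_into()
        .map_err(|e| BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e)))
}
```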
99 changes: 6 additions & 93 deletions beacon_node/beacon_chain/src/schema_change.rs
@@ -5,22 +5,14 @@ mod migration_schema_v8;
mod migration_schema_v9;
mod types;

use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY};
use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY};
use crate::persisted_fork_choice::{PersistedForkChoiceV1, PersistedForkChoiceV7};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use operation_pool::{PersistedOperationPool, PersistedOperationPoolBase};
use slog::{warn, Logger};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::fs;
use std::path::Path;
use std::sync::Arc;
use store::config::OnDiskStoreConfig;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION};
use store::{DBColumn, Error as StoreError, ItemStore, StoreItem};

const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz";
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::{Error as StoreError, StoreItem};

/// Migrate the database from one schema version to another, applying all requisite mutations.
pub fn migrate_schema<T: BeaconChainTypes>(
@@ -39,69 +31,11 @@ pub fn migrate_schema<T: BeaconChainTypes>(
migrate_schema::<T>(db.clone(), datadir, from, next, log.clone())?;
migrate_schema::<T>(db, datadir, next, to, log)
}
// Migration from v0.3.0 to v0.3.x, adding the temporary states column.
// Nothing actually needs to be done, but once a DB uses v2 it shouldn't go back.
(SchemaVersion(1), SchemaVersion(2)) => {
db.store_schema_version(to)?;
Ok(())
}
// Migration for removing the pubkey cache.
(SchemaVersion(2), SchemaVersion(3)) => {
let pk_cache_path = datadir.join(PUBKEY_CACHE_FILENAME);

// Load from file, store to DB.
ValidatorPubkeyCache::<T>::load_from_file(&pk_cache_path)
.and_then(|cache| ValidatorPubkeyCache::convert(cache, db.clone()))
.map_err(|e| StoreError::SchemaMigrationError(format!("{:?}", e)))?;

db.store_schema_version(to)?;

// Delete cache file now that keys are stored in the DB.
fs::remove_file(&pk_cache_path).map_err(|e| {
StoreError::SchemaMigrationError(format!(
"unable to delete {}: {:?}",
pk_cache_path.display(),
e
))
})?;

Ok(())
}
// Migration for adding sync committee contributions to the persisted op pool.
(SchemaVersion(3), SchemaVersion(4)) => {
// Deserialize from what exists in the database using the `PersistedOperationPoolBase`
// variant and convert it to the Altair variant.
let pool_opt = db
.get_item::<PersistedOperationPoolBase<T::EthSpec>>(&OP_POOL_DB_KEY)?
.map(PersistedOperationPool::Base)
.map(PersistedOperationPool::base_to_altair);

if let Some(pool) = pool_opt {
// Store the converted pool under the same key.
db.put_item::<PersistedOperationPool<T::EthSpec>>(&OP_POOL_DB_KEY, &pool)?;
}

db.store_schema_version(to)?;

Ok(())
}
// Migration for weak subjectivity sync support and clean up of `OnDiskStoreConfig` (#1784).
(SchemaVersion(4), SchemaVersion(5)) => {
if let Some(OnDiskStoreConfigV4 {
slots_per_restore_point,
..
}) = db.hot_db.get(&CONFIG_KEY)?
{
let new_config = OnDiskStoreConfig {
slots_per_restore_point,
};
db.hot_db.put(&CONFIG_KEY, &new_config)?;
}

db.store_schema_version(to)?;
//
// Migrations from before SchemaVersion(5) are deprecated.
//

Ok(())
}
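For the record, each deleted arm followed the same read-convert-rewrite shape: load the old on-disk representation, convert it in memory, write it back under the same key, then bump the schema version. A self-contained toy of that shape; a `HashMap` stands in for the hot database, whereas the real code goes through the `StoreItem` trait:

```rust
use std::collections::HashMap;

// Toy version of the read-convert-rewrite shape shared by the deleted arms.
fn upgrade_item(
    db: &mut HashMap<&'static str, String>,
    key: &'static str,
    convert: impl FnOnce(String) -> String,
) {
    // Only rewrite if an item exists in the old format (cf. the op-pool arm).
    if let Some(old) = db.remove(key) {
        db.insert(key, convert(old));
    }
}

fn main() {
    let mut db = HashMap::new();
    db.insert("op_pool", "Base".to_string());
    // Mirrors the removed v3 -> v4 arm: re-store the base pool in Altair form.
    upgrade_item(&mut db, "op_pool", |base| format!("Altair({base})"));
    assert_eq!(db["op_pool"], "Altair(Base)");
}
```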
// Migration for adding `execution_status` field to the fork choice store.
(SchemaVersion(5), SchemaVersion(6)) => {
// Database operations to be done atomically
@@ -201,24 +135,3 @@ pub fn migrate_schema<T: BeaconChainTypes>(
.into()),
}
}

// Store config used in v4 schema and earlier.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct OnDiskStoreConfigV4 {
pub slots_per_restore_point: u64,
pub _block_cache_size: usize,
}

impl StoreItem for OnDiskStoreConfigV4 {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}

fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
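The surviving dispatch (elided in the hunk above) decomposes multi-version upgrades into consecutive single steps and now rejects anything starting below v5. A self-contained toy of that control flow; `MIN_SUPPORTED` and the `String` error are illustrative stand-ins for the real store types:

```rust
#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct SchemaVersion(u64);

// Stand-in for the v5 cutoff introduced by this PR.
const MIN_SUPPORTED: SchemaVersion = SchemaVersion(5);

fn migrate(from: SchemaVersion, to: SchemaVersion) -> Result<(), String> {
    match (from, to) {
        // Migrating from a version to itself is a no-op.
        (f, t) if f == t => Ok(()),
        // Decompose x -> z into x -> y and y -> z, one version at a time.
        (f, t) if f.0 + 1 < t.0 => {
            let next = SchemaVersion(f.0 + 1);
            migrate(f, next)?;
            migrate(next, t)
        }
        // Single steps below the cutoff correspond to the deleted arms.
        (f, _) if f < MIN_SUPPORTED => {
            Err(format!("migration from deprecated schema {f:?} is unsupported"))
        }
        // Remaining single steps would apply a real migration here.
        (f, t) => {
            println!("applying migration {f:?} -> {t:?}");
            Ok(())
        }
    }
}

fn main() {
    // v5 -> v9 runs four single-step migrations in order.
    migrate(SchemaVersion(5), SchemaVersion(9)).unwrap();
    // v2 -> v6 fails: its first step (v2 -> v3) was removed by this PR.
    assert!(migrate(SchemaVersion(2), SchemaVersion(6)).is_err());
}
```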
190 changes: 12 additions & 178 deletions beacon_node/beacon_chain/src/validator_pubkey_cache.rs
@@ -1,11 +1,8 @@
use crate::errors::BeaconChainError;
use crate::{BeaconChainTypes, BeaconStore};
use ssz::{Decode, DecodeError, Encode};
use ssz::{Decode, Encode};
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
use std::io::{self, Read, Write};
use std::path::Path;
use store::{DBColumn, Error as StoreError, StoreItem};
use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};

@@ -24,15 +21,7 @@ pub struct ValidatorPubkeyCache<T: BeaconChainTypes> {
pubkeys: Vec<PublicKey>,
indices: HashMap<PublicKeyBytes, usize>,
pubkey_bytes: Vec<PublicKeyBytes>,
backing: PubkeyCacheBacking<T>,
}

/// Abstraction over on-disk backing.
///
/// `File` backing is legacy, `Database` is current.
enum PubkeyCacheBacking<T: BeaconChainTypes> {
File(ValidatorPubkeyCacheFile),
Database(BeaconStore<T>),
store: BeaconStore<T>,
}

impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
@@ -48,7 +37,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
pubkeys: vec![],
indices: HashMap::new(),
pubkey_bytes: vec![],
backing: PubkeyCacheBacking::Database(store),
store,
};

cache.import_new_pubkeys(state)?;
@@ -66,7 +55,9 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
if let Some(DatabasePubkey(pubkey)) =
store.get_item(&DatabasePubkey::key_for_index(validator_index))?
{
pubkeys.push((&pubkey).try_into().map_err(Error::PubkeyDecode)?);
pubkeys.push((&pubkey).try_into().map_err(|e| {
BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e))
})?);
pubkey_bytes.push(pubkey);
indices.insert(pubkey, validator_index);
} else {
@@ -78,31 +69,10 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
pubkeys,
indices,
pubkey_bytes,
backing: PubkeyCacheBacking::Database(store),
store,
})
}

/// DEPRECATED: used only for migration
pub fn load_from_file<P: AsRef<Path>>(path: P) -> Result<Self, BeaconChainError> {
ValidatorPubkeyCacheFile::open(&path)
.and_then(ValidatorPubkeyCacheFile::into_cache)
.map_err(Into::into)
}

/// Convert a cache using `File` backing to one using `Database` backing.
///
/// This will write all of the keys from `existing_cache` to `store`.
pub fn convert(existing_cache: Self, store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
let mut result = ValidatorPubkeyCache {
pubkeys: Vec::with_capacity(existing_cache.pubkeys.len()),
indices: HashMap::with_capacity(existing_cache.indices.len()),
pubkey_bytes: Vec::with_capacity(existing_cache.indices.len()),
backing: PubkeyCacheBacking::Database(store),
};
result.import(existing_cache.pubkeys.iter().map(PublicKeyBytes::from))?;
Ok(result)
}

/// Scan the given `state` and add any new validator public keys.
///
/// Does not delete any keys from `self` if they don't appear in `state`.
@@ -146,14 +116,8 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
// The motivation behind this ordering is that we do not want to have states that
// reference a pubkey that is not in our cache. However, it's fine to have pubkeys
// that are never referenced in a state.
match &mut self.backing {
PubkeyCacheBacking::File(persistence_file) => {
persistence_file.append(i, &pubkey)?;
}
PubkeyCacheBacking::Database(store) => {
store.put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?;
}
}
self.store
.put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?;

self.pubkeys.push(
(&pubkey)
@@ -219,116 +183,14 @@ impl DatabasePubkey {
}
}

/// Allows for maintaining an on-disk copy of the `ValidatorPubkeyCache`. The file is raw SSZ bytes
/// (not ASCII encoded).
///
/// ## Writes
///
/// Each entry is simply appended to the file.
///
/// ## Reads
///
/// The whole file is parsed as an SSZ "variable list" of objects.
///
/// This parsing method is possible because the items in the list are fixed-length SSZ objects.
struct ValidatorPubkeyCacheFile(File);

#[derive(Debug)]
enum Error {
Io(io::Error),
Ssz(DecodeError),
PubkeyDecode(bls::Error),
/// The file read from disk does not have a contiguous list of validator public keys. The file
/// has become corrupted.
InconsistentIndex {
_expected: Option<usize>,
_found: usize,
},
}

impl From<Error> for BeaconChainError {
fn from(e: Error) -> BeaconChainError {
BeaconChainError::ValidatorPubkeyCacheFileError(format!("{:?}", e))
}
}

impl ValidatorPubkeyCacheFile {
/// Opens an existing file for reading and writing.
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
File::options()
.read(true)
.write(true)
.create(false)
.append(true)
.open(path)
.map(Self)
.map_err(Error::Io)
}

/// Append a public key to file.
///
/// Each provided `index` must be one greater than the previous, starting at 0.
/// Otherwise, the file will become corrupted and unable to be converted into a cache.
pub fn append(&mut self, index: usize, pubkey: &PublicKeyBytes) -> Result<(), Error> {
append_to_file(&mut self.0, index, pubkey)
}

/// Creates a `ValidatorPubkeyCache` by reading and parsing the underlying file.
pub fn into_cache<T: BeaconChainTypes>(mut self) -> Result<ValidatorPubkeyCache<T>, Error> {
let mut bytes = vec![];
self.0.read_to_end(&mut bytes).map_err(Error::Io)?;

let list: Vec<(usize, PublicKeyBytes)> = Vec::from_ssz_bytes(&bytes).map_err(Error::Ssz)?;

let mut last = None;
let mut pubkeys = Vec::with_capacity(list.len());
let mut indices = HashMap::with_capacity(list.len());
let mut pubkey_bytes = Vec::with_capacity(list.len());

for (index, pubkey) in list {
let expected = last.map(|n| n + 1);
if expected.map_or(true, |expected| index == expected) {
last = Some(index);
pubkeys.push((&pubkey).try_into().map_err(Error::PubkeyDecode)?);
pubkey_bytes.push(pubkey);
indices.insert(pubkey, index);
} else {
return Err(Error::InconsistentIndex {
_expected: expected,
_found: index,
});
}
}

Ok(ValidatorPubkeyCache {
pubkeys,
indices,
pubkey_bytes,
backing: PubkeyCacheBacking::File(self),
})
}
}

fn append_to_file(file: &mut File, index: usize, pubkey: &PublicKeyBytes) -> Result<(), Error> {
let mut line = Vec::with_capacity(index.ssz_bytes_len() + pubkey.ssz_bytes_len());

index.ssz_append(&mut line);
pubkey.ssz_append(&mut line);

file.write_all(&line).map_err(Error::Io)
}
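For reference, the deleted backing file is a raw concatenation of fixed-length SSZ `(index, pubkey)` entries, which is why it can be re-read as a single SSZ list. A self-contained toy decoder under that assumption; sizes assume SSZ's 8-byte little-endian integers and 48-byte compressed BLS pubkeys, with `[u8; 48]` standing in for `PublicKeyBytes`:

```rust
// Each entry is a u64 index followed by a compressed pubkey.
const ENTRY_LEN: usize = 8 + 48;

fn parse_legacy_cache(bytes: &[u8]) -> Result<Vec<(u64, [u8; 48])>, String> {
    if bytes.len() % ENTRY_LEN != 0 {
        return Err("file length is not a whole number of entries".into());
    }
    let mut entries = Vec::with_capacity(bytes.len() / ENTRY_LEN);
    for (i, chunk) in bytes.chunks_exact(ENTRY_LEN).enumerate() {
        let index = u64::from_le_bytes(chunk[..8].try_into().unwrap());
        // Enforce the contiguity invariant that `into_cache` checks via
        // `Error::InconsistentIndex`: indices count up from 0 with no gaps.
        if index != i as u64 {
            return Err(format!("inconsistent index: expected {i}, found {index}"));
        }
        entries.push((index, chunk[8..].try_into().unwrap()));
    }
    Ok(entries)
}

fn main() {
    let mut file = Vec::new();
    for i in 0u64..3 {
        file.extend_from_slice(&i.to_le_bytes()); // SSZ u64: little-endian
        file.extend_from_slice(&[i as u8; 48]); // dummy pubkey bytes
    }
    assert_eq!(parse_legacy_cache(&file).unwrap().len(), 3);
}
```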

#[cfg(test)]
mod test {
use super::*;
use crate::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use logging::test_logger;
use std::sync::Arc;
use store::HotColdDB;
use tempfile::tempdir;
use types::{
test_utils::generate_deterministic_keypair, BeaconState, EthSpec, Keypair, MainnetEthSpec,
};
use types::{BeaconState, EthSpec, Keypair, MainnetEthSpec};

type E = MainnetEthSpec;
type T = EphemeralHarnessType<E>;
@@ -422,7 +284,7 @@ mod test {
check_cache_get(&cache, &keypairs[..]);
drop(cache);

// Re-init the cache from the file.
// Re-init the cache from the store.
let mut cache =
ValidatorPubkeyCache::load_from_store(store.clone()).expect("should open cache");
check_cache_get(&cache, &keypairs[..]);
@@ -435,36 +297,8 @@ mod test {
check_cache_get(&cache, &keypairs[..]);
drop(cache);

// Re-init the cache from the file.
// Re-init the cache from the store.
let cache = ValidatorPubkeyCache::load_from_store(store).expect("should open cache");
check_cache_get(&cache, &keypairs[..]);
}

#[test]
fn invalid_persisted_file() {
let dir = tempdir().expect("should create tempdir");
let path = dir.path().join("cache.ssz");
let pubkey = generate_deterministic_keypair(0).pk.into();

let mut file = File::create(&path).expect("should create file");
append_to_file(&mut file, 0, &pubkey).expect("should write to file");
drop(file);

let cache = ValidatorPubkeyCache::<T>::load_from_file(&path).expect("should open cache");
drop(cache);

let mut file = File::options()
.write(true)
.append(true)
.open(&path)
.expect("should open file");

append_to_file(&mut file, 42, &pubkey).expect("should write bad data to file");
drop(file);

assert!(
ValidatorPubkeyCache::<T>::load_from_file(&path).is_err(),
"should not parse invalid file"
);
}
}
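After this PR the only persistence path is the store: one item per validator index, read back at startup by probing consecutive indices until the first miss, as `load_from_store` does above. A self-contained toy of that scheme; a `HashMap` stands in for the store column, and the real `DatabasePubkey::key_for_index` key derivation is elided:

```rust
use std::collections::HashMap;

type Pubkey = [u8; 48];

// Keys were written with contiguous indices, so the first gap marks the end.
fn load_pubkeys(store: &HashMap<usize, Pubkey>) -> Vec<Pubkey> {
    let mut pubkeys = Vec::new();
    for index in 0.. {
        match store.get(&index) {
            Some(pk) => pubkeys.push(*pk),
            None => break,
        }
    }
    pubkeys
}

fn main() {
    let mut store = HashMap::new();
    store.insert(0, [1u8; 48]);
    store.insert(1, [2u8; 48]);
    // Probing stops at index 2, recovering both stored pubkeys in order.
    assert_eq!(load_pubkeys(&store).len(), 2);
}
```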