Skip to content

Commit

Permalink
Add database schema versioning
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Sep 29, 2020
1 parent 55107d0 commit c31cbb6
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 11 deletions.
3 changes: 2 additions & 1 deletion beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.long("slots-per-restore-point")
.value_name("SLOT_COUNT")
.help("Specifies how often a freezer DB restore point should be stored. \
DO NOT DECREASE AFTER INITIALIZATION. [default: 2048 (mainnet) or 64 (minimal)]")
Cannot be changed after initialization. \
[default: 2048 (mainnet) or 64 (minimal)]")
.takes_value(true)
)
.arg(
Expand Down
36 changes: 35 additions & 1 deletion beacon_node/store/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
use crate::{DBColumn, Error, StoreItem};
use serde_derive::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{EthSpec, MinimalEthSpec};

pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;

/// Database configuration parameters.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
pub struct StoreConfig {
/// Number of slots to wait between storing restore points in the freezer database.
pub slots_per_restore_point: u64,
/// Maximum number of blocks to store in the in-memory block cache.
pub block_cache_size: usize,
}

#[derive(Debug, Clone)]
pub enum StoreConfigError {
MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 },
}

impl Default for StoreConfig {
fn default() -> Self {
Self {
Expand All @@ -22,3 +30,29 @@ impl Default for StoreConfig {
}
}
}

impl StoreConfig {
pub fn check_compatibility(&self, on_disk_config: &Self) -> Result<(), StoreConfigError> {
if self.slots_per_restore_point != on_disk_config.slots_per_restore_point {
return Err(StoreConfigError::MismatchedSlotsPerRestorePoint {
config: self.slots_per_restore_point,
on_disk: on_disk_config.slots_per_restore_point,
});
}
Ok(())
}
}

impl StoreItem for StoreConfig {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}

fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
8 changes: 8 additions & 0 deletions beacon_node/store/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::chunked_vector::ChunkError;
use crate::config::StoreConfigError;
use crate::hot_cold_store::HotColdDBError;
use ssz::DecodeError;
use types::{BeaconStateError, Hash256, Slot};
Expand All @@ -17,6 +18,7 @@ pub enum Error {
BlockNotFound(Hash256),
NoContinuationData,
SplitPointModified(Slot, Slot),
ConfigError(StoreConfigError),
}

impl From<DecodeError> for Error {
Expand Down Expand Up @@ -49,6 +51,12 @@ impl From<DBError> for Error {
}
}

impl From<StoreConfigError> for Error {
fn from(e: StoreConfigError) -> Error {
Error::ConfigError(e)
}
}

#[derive(Debug)]
pub struct DBError {
pub message: String,
Expand Down
63 changes: 54 additions & 9 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore;
use crate::metadata::{
SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY,
};
use crate::metrics;
use crate::{
get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem,
Expand All @@ -27,9 +30,6 @@ use std::path::Path;
use std::sync::Arc;
use types::*;

/// 32-byte key for accessing the `split` of the freezer DB.
pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE";

/// Defines how blocks should be replayed on states.
#[derive(PartialEq)]
pub enum BlockReplay {
Expand All @@ -46,6 +46,8 @@ pub enum BlockReplay {
/// intermittent "restore point" states pre-finalization.
#[derive(Debug)]
pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The schema version. Loaded from disk on initialization.
schema_version: SchemaVersion,
/// The slot and state root at the point where the database is split between hot and cold.
///
/// States with slots less than `split.slot` are in the cold DB, while states with slots
Expand All @@ -70,6 +72,10 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {

#[derive(Debug, PartialEq)]
pub enum HotColdDBError {
UnsupportedSchemaVersion {
software_version: SchemaVersion,
disk_version: SchemaVersion,
},
/// Recoverable error indicating that the database freeze point couldn't be updated
/// due to the finalized block not lying on an epoch boundary (should be infrequent).
FreezeSlotUnaligned(Slot),
Expand Down Expand Up @@ -106,6 +112,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;

let db = HotColdDB {
schema_version: CURRENT_SCHEMA_VERSION,
split: RwLock::new(Split::default()),
cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
Expand Down Expand Up @@ -134,6 +141,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;

let db = HotColdDB {
schema_version: CURRENT_SCHEMA_VERSION,
split: RwLock::new(Split::default()),
cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?,
Expand All @@ -144,12 +152,33 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
_phantom: PhantomData,
};

// Ensure that the schema version of the on-disk database matches the software.
// In the future, this would be the spot to hook in auto-migration, etc.
if let Some(schema_version) = db.load_schema_version()? {
if schema_version != CURRENT_SCHEMA_VERSION {
return Err(HotColdDBError::UnsupportedSchemaVersion {
software_version: CURRENT_SCHEMA_VERSION,
disk_version: schema_version,
}
.into());
}
} else {
db.store_schema_version(CURRENT_SCHEMA_VERSION)?;
}

// Ensure that any on-disk config is compatible with the supplied config.
if let Some(disk_config) = db.load_config()? {
db.config.check_compatibility(&disk_config)?;
}
db.store_config()?;

// Load the previous split slot from the database (if any). This ensures we can
// stop and restart correctly.
if let Some(split) = db.load_split()? {
info!(
db.log,
"Hot-Cold DB initialized";
"version" => db.schema_version.0,
"split_slot" => split.slot,
"split_state" => format!("{:?}", split.state_root)
);
Expand Down Expand Up @@ -744,11 +773,29 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
* self.config.slots_per_restore_point
}

/// Load the database schema version from disk.
fn load_schema_version(&self) -> Result<Option<SchemaVersion>, Error> {
self.hot_db.get(&SCHEMA_VERSION_KEY)
}

/// Store the database schema version.
fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> {
self.hot_db.put(&SCHEMA_VERSION_KEY, &schema_version)
}

/// Load previously-stored config from disk.
fn load_config(&self) -> Result<Option<StoreConfig>, Error> {
self.hot_db.get(&CONFIG_KEY)
}

/// Write the config to disk.
fn store_config(&self) -> Result<(), Error> {
self.hot_db.put(&CONFIG_KEY, &self.config)
}

/// Load the split point from disk.
fn load_split(&self) -> Result<Option<Split>, Error> {
let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes());
let split: Option<Split> = self.hot_db.get(&key)?;
Ok(split)
self.hot_db.get(&SPLIT_KEY)
}

/// Load the state root of a restore point.
Expand Down Expand Up @@ -927,9 +974,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
slot: frozen_head.slot,
state_root: frozen_head_root,
};
store
.hot_db
.put_sync(&Hash256::from_slice(SPLIT_DB_KEY.as_bytes()), &split)?;
store.hot_db.put_sync(&SPLIT_KEY, &split)?;

// Split point is now persisted in the hot database on disk. The in-memory split point
// hasn't been modified elsewhere since we keep a write lock on it. It's safe to update
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod hot_cold_store;
mod impls;
mod leveldb_store;
mod memory_store;
mod metadata;
mod metrics;
mod partial_beacon_state;

Expand Down
29 changes: 29 additions & 0 deletions beacon_node/store/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::{DBColumn, Error, StoreItem};
use ssz::{Decode, Encode};
use types::Hash256;

pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(1);

// All the keys that get stored under the `BeaconMeta` column.
//
// We use `repeat_byte` because it's a const fn.
pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0);
pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1);
pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SchemaVersion(pub u64);

impl StoreItem for SchemaVersion {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}

fn as_store_bytes(&self) -> Vec<u8> {
self.0.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(SchemaVersion(u64::from_ssz_bytes(bytes)?))
}
}

0 comments on commit c31cbb6

Please sign in to comment.