Skip to content

Commit

Permalink
fix(state-sync): save to BlockMisc:STATE_SNAPSHOT_KEY the latest snap…
Browse files Browse the repository at this point in the history
…shot's hash (#9343)

and a few other fixes:
- retry setting and deleting snapshots from rocksdb and file system until success
- replace every unwrap() on state_snapshot locks, which could leave the node unable to start, with returned errors
  • Loading branch information
ppca authored Jul 31, 2023
1 parent 2f53a5d commit 8b0541e
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 83 deletions.
1 change: 1 addition & 0 deletions core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub const GENESIS_JSON_HASH_KEY: &[u8; 17] = b"GENESIS_JSON_HASH";
pub const GENESIS_STATE_ROOTS_KEY: &[u8; 19] = b"GENESIS_STATE_ROOTS";
pub const COLD_HEAD_KEY: &[u8; 9] = b"COLD_HEAD";
pub const STATE_SYNC_DUMP_KEY: &[u8; 15] = b"STATE_SYNC_DUMP";
pub const STATE_SNAPSHOT_KEY: &[u8; 18] = b"STATE_SNAPSHOT_KEY";

// `DBCol::Misc` keys
pub const FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY: &[u8] =
Expand Down
13 changes: 12 additions & 1 deletion core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use columns::DBCol;
pub use db::{
CHUNK_TAIL_KEY, COLD_HEAD_KEY, FINAL_HEAD_KEY, FORK_TAIL_KEY, GENESIS_JSON_HASH_KEY,
GENESIS_STATE_ROOTS_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY,
LATEST_KNOWN_KEY, STATE_SYNC_DUMP_KEY, TAIL_KEY,
LATEST_KNOWN_KEY, STATE_SNAPSHOT_KEY, STATE_SYNC_DUMP_KEY, TAIL_KEY,
};
use near_crypto::PublicKey;
use near_fmt::{AbbrBytes, StorageKey};
Expand Down Expand Up @@ -856,6 +856,17 @@ pub fn set_genesis_state_roots(store_update: &mut StoreUpdate, genesis_roots: &[
.expect("Borsh cannot fail");
}

/// Flattens the result of an optional lookup, treating a missing value as an error.
///
/// * `Ok(Some(v))` becomes `Ok(v)`.
/// * `Ok(None)` becomes an [`io::ErrorKind::NotFound`] error whose message is
///   `field_name.to_string()`.
/// * `Err(e)` is passed through unchanged.
fn option_to_not_found<T, F>(res: io::Result<Option<T>>, field_name: F) -> io::Result<T>
where
    F: std::string::ToString,
{
    // `?` forwards an existing I/O error as-is; only the `None` case builds a new error,
    // and the message is rendered lazily so the success path never allocates.
    res?.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, field_name.to_string()))
}

pub struct StoreCompiledContractCache {
db: Arc<dyn Database>,
}
Expand Down
208 changes: 126 additions & 82 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::db::STATE_SNAPSHOT_KEY;
use crate::flat::FlatStorageManager;
use crate::option_to_not_found;
use crate::trie::config::TrieConfig;
use crate::trie::prefetching_trie_storage::PrefetchingThreadsHandle;
use crate::trie::trie_storage::{TrieCache, TrieCachingStorage};
Expand All @@ -18,9 +20,9 @@ use near_primitives::trie_key::TrieKey;
use near_primitives::types::{
NumShards, RawStateChange, RawStateChangesWithTrieKey, StateChangeCause, StateRoot,
};
use std::io;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::str::FromStr;
use std::sync::{Arc, RwLock, TryLockError};

struct ShardTriesInner {
Expand Down Expand Up @@ -475,21 +477,55 @@ impl ShardTries {
let _timer = metrics::MAKE_STATE_SNAPSHOT_ELAPSED.start_timer();
// `write()` lock is held for the whole duration of this function.
// Accessing the snapshot in other parts of the system will fail.
let mut state_snapshot_lock = self.0.state_snapshot.write().unwrap();
let mut state_snapshot_lock = self.0.state_snapshot.write().map_err(|_| {
anyhow::Error::msg("error accessing write lock of state_snapshot")
})?;
let db_snapshot_hash = self.get_state_snapshot_hash();

if let Some(state_snapshot) = &*state_snapshot_lock {
if &state_snapshot.prev_block_hash == prev_block_hash {
// only return Ok() when the hash stored in STATE_SNAPSHOT_KEY and in state_snapshot_lock and prev_block_hash are the same
if db_snapshot_hash.is_ok()
&& db_snapshot_hash.unwrap() == *prev_block_hash
&& state_snapshot.prev_block_hash == *prev_block_hash
{
tracing::warn!(target: "state_snapshot", ?prev_block_hash, "Requested a state snapshot but that is already available");
return Ok(());
} else {
let prev_block_hash = state_snapshot.prev_block_hash;
// Drop Store before deleting the underlying data.
*state_snapshot_lock = None;
self.delete_state_snapshot(
&prev_block_hash,
home_dir,
hot_store_path,
state_snapshot_subdir,
);

// This will delete all existing snapshots from the file system. If it fails, it is attempted up to 3 times in total.
let mut delete_state_snapshots_from_file_system = false;
let mut file_system_delete_retries = 0;
while !delete_state_snapshots_from_file_system
&& file_system_delete_retries < 3
{
delete_state_snapshots_from_file_system = self
.delete_all_state_snapshots(
home_dir,
hot_store_path,
state_snapshot_subdir,
);
file_system_delete_retries += 1;
}

// This will delete the STATE_SNAPSHOT_KEY-value pair from the db. If it fails, it is attempted up to 3 times in total.
let mut delete_state_snapshot_from_db = false;
let mut db_delete_retries = 0;
while !delete_state_snapshot_from_db && db_delete_retries < 3 {
delete_state_snapshot_from_db = match self.set_state_snapshot_hash(None)
{
Ok(_) => true,
Err(err) => {
// This will be retried.
tracing::debug!(target: "state_snapshot", ?err, "Failed to delete the old state snapshot for BlockMisc::STATE_SNAPSHOT_KEY in rocksdb");
false
}
};
db_delete_retries += 1;
}

metrics::HAS_STATE_SNAPSHOT.set(0);
}
}

Expand Down Expand Up @@ -526,6 +562,22 @@ impl ShardTries {
shard_uids,
Some(block),
));

// this will set the new hash for state snapshot in rocksdb. will retry until success.
let mut set_state_snapshot_in_db = false;
while !set_state_snapshot_in_db {
set_state_snapshot_in_db = match self
.set_state_snapshot_hash(Some(*prev_block_hash))
{
Ok(_) => true,
Err(err) => {
// This will be retried.
tracing::debug!(target: "state_snapshot", ?err, "Failed to set the new state snapshot for BlockMisc::STATE_SNAPSHOT_KEY in rocksdb");
false
}
}
}

metrics::HAS_STATE_SNAPSHOT.set(1);
tracing::info!(target: "state_snapshot", ?prev_block_hash, "Made a checkpoint");
Ok(())
Expand All @@ -538,7 +590,11 @@ impl ShardTries {
let _span =
tracing::info_span!(target: "state_snapshot", "compact_state_snapshot").entered();
// It's fine if the access to state snapshot blocks.
let state_snapshot_lock = self.0.state_snapshot.read().unwrap();
let state_snapshot_lock = self
.0
.state_snapshot
.read()
.map_err(|_| anyhow::Error::msg("error accessing read lock of state_snapshot"))?;
if let Some(state_snapshot) = &*state_snapshot_lock {
let _timer = metrics::COMPACT_STATE_SNAPSHOT_ELAPSED.start_timer();
Ok(state_snapshot.store.compact()?)
Expand All @@ -548,30 +604,25 @@ impl ShardTries {
}
}

/// Deletes a previously open state snapshot.
/// Drops the Store and deletes the files.
fn delete_state_snapshot(
/// Deletes all existing state snapshots in the parent directory
fn delete_all_state_snapshots(
&self,
prev_block_hash: &CryptoHash,
home_dir: &Path,
hot_store_path: &Path,
state_snapshot_subdir: &Path,
) {
) -> bool {
let _timer = metrics::DELETE_STATE_SNAPSHOT_ELAPSED.start_timer();
let _span =
tracing::info_span!(target: "state_snapshot", "delete_state_snapshot").entered();
let path = Self::get_state_snapshot_base_dir(
prev_block_hash,
home_dir,
hot_store_path,
state_snapshot_subdir,
);
let path = home_dir.join(hot_store_path).join(state_snapshot_subdir);
match std::fs::remove_dir_all(&path) {
Ok(_) => {
tracing::info!(target: "state_snapshot", ?path, ?prev_block_hash, "Deleted a state snapshot");
tracing::info!(target: "state_snapshot", ?path, "Deleted all state snapshots");
true
}
Err(err) => {
tracing::warn!(target: "state_snapshot", ?err, ?path, ?prev_block_hash, "Failed to delete a state snapshot");
tracing::warn!(target: "state_snapshot", ?err, ?path, "Failed to delete all state snapshots");
false
}
}
}
Expand All @@ -588,8 +639,27 @@ impl ShardTries {
home_dir.join(hot_store_path).join(state_snapshot_subdir).join(format!("{prev_block_hash}"))
}

/// Looks for directories on disk with names that look like `prev_block_hash`.
/// Checks that there is at most one such directory. Opens it as a Store.
/// Reads the hash stored under `STATE_SNAPSHOT_KEY` in `DBCol::BlockMisc`.
///
/// # Errors
///
/// Returns an `io::Error` of kind `NotFound` (with message `"STATE_SNAPSHOT_KEY"`)
/// when no value is stored under the key, or forwards the error produced by the
/// store read.
pub fn get_state_snapshot_hash(&self) -> Result<CryptoHash, io::Error> {
    let stored: io::Result<Option<CryptoHash>> =
        self.0.store.get_ser(DBCol::BlockMisc, STATE_SNAPSHOT_KEY);
    option_to_not_found(stored, "STATE_SNAPSHOT_KEY")
}

/// Writes or clears the value stored under `STATE_SNAPSHOT_KEY` in `DBCol::BlockMisc`.
///
/// * `Some(hash)` serializes `hash` under the key.
/// * `None` deletes the key.
///
/// The change is committed before returning.
///
/// # Errors
///
/// Returns the error raised while serializing the value or committing the update.
pub fn set_state_snapshot_hash(&self, value: Option<CryptoHash>) -> Result<(), io::Error> {
    let mut update = self.0.store.store_update();
    if let Some(hash) = value {
        update.set_ser(DBCol::BlockMisc, STATE_SNAPSHOT_KEY, &hash)?;
    } else {
        update.delete(DBCol::BlockMisc, STATE_SNAPSHOT_KEY);
    }
    Ok(update.commit()?)
}

/// Reads the latest available snapshot hash from RocksDB and, if one is available, opens base_path+snapshot_hash as the state snapshot.
/// Multiple snapshots are not handled here, because they are dealt with whenever a new snapshot is created and saved to the file system.
pub fn maybe_open_state_snapshot(
&self,
get_shard_uids_fn: impl Fn(CryptoHash) -> Result<Vec<ShardUId>, EpochError>,
Expand All @@ -608,67 +678,41 @@ impl ShardTries {
state_snapshot_subdir,
compaction_enabled: _,
} => {
let path = Self::get_state_snapshot_base_dir(
&CryptoHash::new(),
// directly return error if no snapshot is found
let snapshot_hash: CryptoHash = self.get_state_snapshot_hash()?;

let snapshot_path = Self::get_state_snapshot_base_dir(
&snapshot_hash,
&home_dir,
&hot_store_path,
&state_snapshot_subdir,
);
let parent_path =
path.parent().ok_or(anyhow::anyhow!("{path:?} needs to have a parent dir"))?;
tracing::debug!(target: "state_snapshot", ?path, ?parent_path);

let snapshots = match std::fs::read_dir(parent_path) {
Err(err) => {
if err.kind() == std::io::ErrorKind::NotFound {
tracing::debug!(target: "state_snapshot", ?parent_path, "State Snapshot base directory doesn't exist.");
return Ok(());
} else {
return Err(err.into());
}
}
Ok(entries) => {
let mut snapshots = vec![];
for entry in entries.filter_map(Result::ok) {
let file_name = entry.file_name().into_string().map_err(|err| {
anyhow::anyhow!("Can't display file_name: {err:?}")
})?;
if let Ok(prev_block_hash) = CryptoHash::from_str(&file_name) {
snapshots.push((prev_block_hash, entry.path()));
}
}
snapshots
}
};
let parent_path = snapshot_path
.parent()
.ok_or(anyhow::anyhow!("{snapshot_path:?} needs to have a parent dir"))?;
tracing::debug!(target: "state_snapshot", ?snapshot_path, ?parent_path);

if snapshots.is_empty() {
Ok(())
} else if snapshots.len() == 1 {
let (prev_block_hash, snapshot_dir) = &snapshots[0];

let store_config = StoreConfig::default();

let opener = NodeStorage::opener(&snapshot_dir, false, &store_config, None);
let storage = opener.open_in_mode(Mode::ReadOnly)?;
let store = storage.get_hot_store();
let flat_storage_manager = FlatStorageManager::new(store.clone());

let shard_uids = get_shard_uids_fn(*prev_block_hash)?;
let mut guard = self.0.state_snapshot.write().unwrap();
*guard = Some(StateSnapshot::new(
store,
*prev_block_hash,
flat_storage_manager,
&shard_uids,
None,
));
metrics::HAS_STATE_SNAPSHOT.set(1);
tracing::info!(target: "runtime", ?prev_block_hash, ?snapshot_dir, "Detected and opened a state snapshot.");
Ok(())
} else {
tracing::error!(target: "runtime", ?snapshots, "Detected multiple state snapshots. Please keep at most one snapshot and delete others.");
Err(anyhow::anyhow!("More than one state snapshot detected {:?}", snapshots))
}
let store_config = StoreConfig::default();

let opener = NodeStorage::opener(&snapshot_path, false, &store_config, None);
let storage = opener.open_in_mode(Mode::ReadOnly)?;
let store = storage.get_hot_store();
let flat_storage_manager = FlatStorageManager::new(store.clone());

let shard_uids = get_shard_uids_fn(snapshot_hash)?;
let mut guard = self.0.state_snapshot.write().map_err(|_| {
anyhow::Error::msg("error accessing write lock of state_snapshot")
})?;
*guard = Some(StateSnapshot::new(
store,
snapshot_hash,
flat_storage_manager,
&shard_uids,
None,
));
metrics::HAS_STATE_SNAPSHOT.set(1);
tracing::info!(target: "runtime", ?snapshot_hash, ?snapshot_path, "Detected and opened a state snapshot.");
Ok(())
}
}
}
Expand Down
1 change: 1 addition & 0 deletions integration-tests/src/tests/nearcore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod rpc_error_structs;
mod rpc_nodes;
mod run_nodes;
mod stake_nodes;
mod state_snapshot;
mod sync_nodes;
mod sync_state_nodes;
mod track_shards;
Loading

0 comments on commit 8b0541e

Please sign in to comment.