From 60ca9546d6d105bc52511a2abcb0f746f280a5a5 Mon Sep 17 00:00:00 2001 From: nikurt <86772482+nikurt@users.noreply.github.com> Date: Sat, 29 Jul 2023 00:34:06 +0200 Subject: [PATCH] fix(flat-storage and state-sync): Delay updating Flat Head to let us obtain state parts from a state snapshot (#9338) When requested to move flat head to block X, set flat head to the second-to-last predecessor block with flat changes. This way blocks without flat state changes are effectively ignored. This is done by keeping a bit of metadata in `FlatStateDeltaMetadata`, it keeps a pointer to a predecessor block with flat state change. The metadata bit is trivially constructed from the current and the prev block. Getting flat values remains to be O(1). Block head and flat head can have arbitrarily large height differences, but the number of blocks with flat state changes is limited to 2 (explained above) + 2 (gap between the final block and the block head). The PR lets us obtain state parts when the last chunk of an epoch is not in the last block. --- chain/chain/src/chain.rs | 39 +- chain/chain/src/state_snapshot_actor.rs | 19 +- chain/client/src/test_utils.rs | 4 +- core/o11y/src/metrics.rs | 1 + core/store/src/flat/delta.rs | 12 +- core/store/src/flat/metrics.rs | 10 +- core/store/src/flat/storage.rs | 654 ++++++++++++++++-- core/store/src/flat/store_helper.rs | 65 +- core/store/src/flat/types.rs | 4 +- core/store/src/metadata.rs | 2 +- core/store/src/metrics.rs | 10 +- core/store/src/migrations.rs | 24 + core/store/src/trie/shard_tries.rs | 31 +- .../src/tests/client/flat_storage.rs | 63 +- .../src/tests/nearcore/sync_state_nodes.rs | 72 +- nearcore/src/migrations.rs | 1 + nearcore/src/runtime/mod.rs | 1 + neard/src/cli.rs | 2 +- .../src/estimator_context.rs | 2 +- tools/flat-storage/src/commands.rs | 16 +- 20 files changed, 938 insertions(+), 94 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index fb7ccbe8b76..2dad88bb7b5 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -2274,7 +2274,7 @@ impl Chain { new_flat_head = *self.genesis.hash(); } // Try to update flat head. - flat_storage.update_flat_head(&new_flat_head).unwrap_or_else(|err| { + flat_storage.update_flat_head(&new_flat_head, false).unwrap_or_else(|err| { match &err { FlatStorageError::BlockNotSupported(_) => { // It's possible that new head is not a child of current flat head, e.g. when we have a @@ -3385,15 +3385,22 @@ impl Chain { // Flat storage must not exist at this point because leftover keys corrupt its state. assert!(flat_storage_manager.get_flat_storage_for_shard(shard_uid).is_none()); + let flat_head_hash = *chunk.prev_block(); + let flat_head_header = self.get_block_header(&flat_head_hash)?; + let flat_head_prev_hash = *flat_head_header.prev_hash(); + let flat_head_height = flat_head_header.height(); + + tracing::debug!(target: "store", ?shard_uid, ?flat_head_hash, flat_head_height, "set_state_finalize - initialized flat storage"); + let mut store_update = self.runtime_adapter.store().store_update(); store_helper::set_flat_storage_status( &mut store_update, shard_uid, FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: near_store::flat::BlockInfo { - hash: *block_hash, - prev_hash: *block_header.prev_hash(), - height: block_header.height(), + hash: flat_head_hash, + prev_hash: flat_head_prev_hash, + height: flat_head_height, }, }), ); @@ -4188,7 +4195,12 @@ impl Chain { let head = self.head()?; let epoch_id = self.epoch_manager.get_epoch_id(&head.prev_block_hash)?; let shard_layout = self.epoch_manager.get_shard_layout(&epoch_id)?; - (helper.make_snapshot_callback)(head.prev_block_hash, shard_layout.get_shard_uids()) + let last_block = self.get_block(&head.last_block_hash)?; + (helper.make_snapshot_callback)( + head.prev_block_hash, + shard_layout.get_shard_uids(), + last_block, + ) } } Ok(()) @@ -5077,10 +5089,27 @@ impl<'a> ChainUpdate<'a> { shard_uid: ShardUId, trie_changes: &WrappedTrieChanges, ) -> Result<(), Error> { + let prev_block_with_changes = if trie_changes.state_changes().is_empty() { + // The current block has no flat state changes. + // Find the last block with flat state changes by looking it up in + // the prev block. + store_helper::get_prev_block_with_changes( + self.chain_store_update.store(), + shard_uid, + block_hash, + prev_hash, + ) + .map_err(|e| StorageError::from(e))? + } else { + // The current block has flat state changes. + None + }; + let delta = FlatStateDelta { changes: FlatStateChanges::from_state_changes(&trie_changes.state_changes()), metadata: FlatStateDeltaMetadata { block: near_store::flat::BlockInfo { hash: block_hash, height, prev_hash }, + prev_block_with_changes, }, }; diff --git a/chain/chain/src/state_snapshot_actor.rs b/chain/chain/src/state_snapshot_actor.rs index 6035b77c43b..144ac99ea84 100644 --- a/chain/chain/src/state_snapshot_actor.rs +++ b/chain/chain/src/state_snapshot_actor.rs @@ -1,5 +1,6 @@ use actix::AsyncContext; use near_o11y::{handler_debug_span, OpenTelemetrySpanExt, WithSpanContext, WithSpanContextExt}; +use near_primitives::block::Block; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use near_store::flat::FlatStorageManager; @@ -29,6 +30,8 @@ struct MakeSnapshotRequest { prev_block_hash: CryptoHash, /// Shards that need to be present in the snapshot. shard_uids: Vec, + /// Last block of the prev epoch. + block: Block, /// Whether to perform compaction. compaction_enabled: bool, } @@ -47,9 +50,9 @@ impl actix::Handler> for StateSnapshotActor _ctx: &mut actix::Context, ) -> Self::Result { let (_span, msg) = handler_debug_span!(target: "state_snapshot", msg); - let MakeSnapshotRequest { prev_block_hash, shard_uids, compaction_enabled } = msg; + let MakeSnapshotRequest { prev_block_hash, shard_uids, block, compaction_enabled } = msg; - let res = self.tries.make_state_snapshot(&prev_block_hash, &shard_uids); + let res = self.tries.make_state_snapshot(&prev_block_hash, &shard_uids, &block); if !self.flat_storage_manager.set_flat_state_updates_mode(true) { tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_uids, "Failed to unlock flat state updates"); } @@ -88,7 +91,7 @@ impl actix::Handler> for StateSnapshotAc } pub type MakeSnapshotCallback = - Arc) -> () + Send + Sync + 'static>; + Arc, Block) -> () + Send + Sync + 'static>; /// Sends a request to make a state snapshot. pub fn get_make_snapshot_callback( @@ -96,11 +99,15 @@ pub fn get_make_snapshot_callback( flat_storage_manager: FlatStorageManager, compaction_enabled: bool, ) -> MakeSnapshotCallback { - Arc::new(move |prev_block_hash, shard_uids| { - tracing::info!(target: "state_snapshot", ?prev_block_hash, ?shard_uids, "start_snapshot_callback sends `MakeSnapshotCallback` to state_snapshot_addr"); + Arc::new(move |prev_block_hash, shard_uids, block| { + tracing::info!( + target: "state_snapshot", + ?prev_block_hash, + ?shard_uids, + "start_snapshot_callback sends `MakeSnapshotCallback` to state_snapshot_addr"); if flat_storage_manager.set_flat_state_updates_mode(false) { state_snapshot_addr.do_send( - MakeSnapshotRequest { prev_block_hash, shard_uids, compaction_enabled } + MakeSnapshotRequest { prev_block_hash, shard_uids, block, compaction_enabled } .with_span_context(), ); } diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs index 2fe2960828c..51011567e98 100644 --- a/chain/client/src/test_utils.rs +++ b/chain/client/src/test_utils.rs @@ -1765,9 +1765,9 @@ impl TestEnvBuilder { }; let make_state_snapshot_callback : Option = if self.add_state_snapshots { let runtime = runtime.clone(); - let snapshot : MakeSnapshotCallback = Arc::new(move |prev_block_hash, shard_uids| { + let snapshot : MakeSnapshotCallback = Arc::new(move |prev_block_hash, shard_uids, block| { tracing::info!(target: "state_snapshot", ?prev_block_hash, "make_snapshot_callback"); - runtime.get_tries().make_state_snapshot(&prev_block_hash, &shard_uids).unwrap(); + runtime.get_tries().make_state_snapshot(&prev_block_hash, &shard_uids, &block).unwrap(); }); Some(snapshot) } else { diff --git a/core/o11y/src/metrics.rs b/core/o11y/src/metrics.rs index 5be4c10602e..db019c9fbb1 100644 --- a/core/o11y/src/metrics.rs +++ b/core/o11y/src/metrics.rs @@ -208,6 +208,7 @@ static EXCEPTIONS: Lazy> = Lazy::new(|| { "flat_storage_creation_threads_used", "flat_storage_distance_to_head", "flat_storage_head_height", + "flat_storage_hops_to_head", ]) }); diff --git a/core/store/src/flat/delta.rs b/core/store/src/flat/delta.rs index d7bffeea840..8ef7544e8ff 100644 --- a/core/store/src/flat/delta.rs +++ b/core/store/src/flat/delta.rs @@ -3,7 +3,7 @@ use borsh::{BorshDeserialize, BorshSerialize}; use near_primitives::hash::hash; use near_primitives::shard_layout::ShardUId; use near_primitives::state::{FlatStateValue, ValueRef}; -use near_primitives::types::RawStateChangesWithTrieKey; +use near_primitives::types::{BlockHeight, RawStateChangesWithTrieKey}; use std::collections::HashMap; use std::sync::Arc; @@ -16,9 +16,19 @@ pub struct FlatStateDelta { pub changes: FlatStateChanges, } +#[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Copy, serde::Serialize)] +pub struct BlockWithChangesInfo { + pub(crate) hash: CryptoHash, + pub(crate) height: BlockHeight, +} + #[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Copy, serde::Serialize)] pub struct FlatStateDeltaMetadata { pub block: BlockInfo, + /// `None` if the block itself has flat state changes. + /// `Some` if the block has no flat state changes, and contains + /// info of the last block with some flat state changes. + pub prev_block_with_changes: Option, } #[derive(BorshSerialize, BorshDeserialize, Debug)] diff --git a/core/store/src/flat/metrics.rs b/core/store/src/flat/metrics.rs index dcbec0c70f8..10605c6f169 100644 --- a/core/store/src/flat/metrics.rs +++ b/core/store/src/flat/metrics.rs @@ -1,12 +1,13 @@ use crate::metrics::flat_state_metrics; use near_o11y::metrics::{IntCounter, IntGauge}; -use near_primitives::types::ShardId; +use near_primitives::types::{BlockHeight, ShardId}; use super::FlatStorageStatus; pub(crate) struct FlatStorageMetrics { flat_head_height: IntGauge, distance_to_head: IntGauge, + hops_to_head: IntGauge, cached_deltas: IntGauge, cached_changes_num_items: IntGauge, cached_changes_size: IntGauge, @@ -20,6 +21,8 @@ impl FlatStorageMetrics { .with_label_values(&[&shard_id_label]), distance_to_head: flat_state_metrics::FLAT_STORAGE_DISTANCE_TO_HEAD .with_label_values(&[&shard_id_label]), + hops_to_head: flat_state_metrics::FLAT_STORAGE_HOPS_TO_HEAD + .with_label_values(&[&shard_id_label]), cached_deltas: flat_state_metrics::FLAT_STORAGE_CACHED_DELTAS .with_label_values(&[&shard_id_label]), cached_changes_num_items: flat_state_metrics::FLAT_STORAGE_CACHED_CHANGES_NUM_ITEMS @@ -29,8 +32,9 @@ impl FlatStorageMetrics { } } - pub(crate) fn set_distance_to_head(&self, distance: usize) { - self.distance_to_head.set(distance as i64); + pub(crate) fn set_distance_to_head(&self, distance: usize, height: Option) { + self.distance_to_head.set(height.unwrap_or(0) as i64); + self.hops_to_head.set(distance as i64); } pub(crate) fn set_flat_head_height(&self, height: u64) { diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index 23c491dcf2a..df27bac47c7 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -5,16 +5,18 @@ use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use near_primitives::state::FlatStateValue; +use near_primitives::types::BlockHeight; use tracing::{debug, warn}; -use crate::flat::delta::CachedFlatStateChanges; +use crate::flat::delta::{BlockWithChangesInfo, CachedFlatStateChanges}; +use crate::flat::BlockInfo; use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus}; use crate::{Store, StoreUpdate}; use super::delta::{CachedFlatStateDelta, FlatStateDelta}; use super::metrics::FlatStorageMetrics; +use super::store_helper; use super::types::FlatStorageError; -use super::{store_helper, BlockInfo}; /// FlatStorage stores information on which blocks flat storage current supports key lookups on. /// Note that this struct is shared by multiple threads, the chain thread, threads that apply chunks, @@ -60,6 +62,8 @@ impl FlatStorageInner { /// means 150 MiB per shards. const CACHED_CHANGES_SIZE_LIMIT: bytesize::ByteSize = bytesize::ByteSize(150 * bytesize::MIB); + const BLOCKS_WITH_CHANGES_FLAT_HEAD_GAP: BlockHeight = 2; + /// Creates `BlockNotSupported` error for the given block. fn create_block_not_supported_error(&self, block_hash: &CryptoHash) -> FlatStorageError { FlatStorageError::BlockNotSupported((self.flat_head.hash, *block_hash)) @@ -87,17 +91,37 @@ impl FlatStorageInner { let flat_head = &self.flat_head; let mut block_hash = *target_block_hash; let mut blocks = vec![]; + let mut first_height = None; while block_hash != flat_head.hash { - blocks.push(block_hash); - block_hash = self + let metadata = self .deltas .get(&block_hash) .ok_or_else(|| self.create_block_not_supported_error(target_block_hash))? - .metadata - .block - .prev_hash; + .metadata; + if first_height.is_none() { + // Keep track of the height of the initial block. + first_height = Some(metadata.block.height); + } + + // Maybe skip to the previous block with changes. + block_hash = match metadata.prev_block_with_changes { + None => { + blocks.push(block_hash); + metadata.block.prev_hash + } + Some(prev_block_with_changes) => { + if prev_block_with_changes.height > flat_head.height { + prev_block_with_changes.hash + } else { + flat_head.hash + } + } + }; } - self.metrics.set_distance_to_head(blocks.len()); + self.metrics.set_distance_to_head( + blocks.len(), + first_height.map(|height| height - flat_head.height), + ); Ok(blocks) } @@ -127,6 +151,63 @@ impl FlatStorageInner { warn!(target: "chain", %shard_id, %flat_head_height, %cached_deltas, %cached_changes_size_bytes, "Flat storage cached deltas exceeded expected limits"); } } + + // Determine a block to make the new flat head. + // If `strict`, uses `block_hash` as the new flat head. + // If not `strict`, uses the second most recent block with flat state + // changes from `block_hash`, if it exists. + fn get_new_flat_head( + &self, + block_hash: CryptoHash, + strict: bool, + ) -> Result { + if strict { + return Ok(block_hash); + } + + let current_flat_head_hash = self.flat_head.hash; + let current_flat_head_height = self.flat_head.height; + + let mut new_head = block_hash; + let mut blocks_with_changes = 0; + // Delays updating flat head, keeps this many blocks with non-empty flat + // state changes between the requested flat head and the chosen head to + // make flat state snapshots function properly. + while blocks_with_changes < Self::BLOCKS_WITH_CHANGES_FLAT_HEAD_GAP { + if new_head == current_flat_head_hash { + return Ok(current_flat_head_hash); + } + let metadata = + self.deltas.get(&new_head).ok_or(missing_delta_error(&new_head))?.metadata; + new_head = match metadata.prev_block_with_changes { + None => { + // The block has flat state changes. + blocks_with_changes += 1; + if blocks_with_changes == Self::BLOCKS_WITH_CHANGES_FLAT_HEAD_GAP { + break; + } + metadata.block.prev_hash + } + Some(BlockWithChangesInfo { hash, height, .. }) => { + // The block has no flat state changes. + if height <= current_flat_head_height { + return Ok(current_flat_head_hash); + } + hash + } + }; + } + Ok(new_head) + } + + #[cfg(test)] + pub fn test_get_new_flat_head( + &self, + block_hash: CryptoHash, + strict: bool, + ) -> Result { + self.get_new_flat_head(block_hash, strict) + } } impl FlatStorage { @@ -213,17 +294,50 @@ impl FlatStorage { Ok(value) } - /// Update the head of the flat storage, including updating the flat state in memory and on disk - /// and updating the flat state to reflect the state at the new head. If updating to given head is not possible, - /// returns an error. - pub fn update_flat_head(&self, new_head: &CryptoHash) -> Result<(), FlatStorageError> { + /// Update the head of the flat storage, including updating the flat state + /// in memory and on disk and updating the flat state to reflect the state + /// at the new head. If updating to given head is not possible, returns an + /// error. + /// If `strict`, then unconditionally sets flat head to the given block. + /// If not `strict`, then it updates the flat head to the latest block X, + /// such that [X, block_hash] contains 2 blocks with flat state changes. If possible. + /// + /// The function respects the current flat head and will never try to + /// set flat head to a block older than the current flat head. + // + // Let's denote blocks with flat state changes as X, and blocks without + // flat state changes as O. + // + // block_hash + // | + // v + // ...-O-O-X-O-O-O-X-O-O-O-X-O-O-O-X-....->future + // ^ + // | + // new_head + // + // The segment [new_head, block_hash] contains two blocks with flat state changes. + pub fn update_flat_head( + &self, + block_hash: &CryptoHash, + strict: bool, + ) -> Result<(), FlatStorageError> { let mut guard = self.0.write().expect(crate::flat::POISONED_LOCK_ERR); if !guard.move_head_enabled { return Ok(()); } + + let new_head = guard.get_new_flat_head(*block_hash, strict)?; + if new_head == guard.flat_head.hash { + return Ok(()); + } + let shard_uid = guard.shard_uid; let shard_id = shard_uid.shard_id(); - let blocks = guard.get_blocks_to_head(new_head)?; + + tracing::debug!(target: "store", flat_head = ?guard.flat_head.hash, ?new_head, shard_id, "Moving flat head"); + let blocks = guard.get_blocks_to_head(&new_head)?; + for block_hash in blocks.into_iter().rev() { let mut store_update = StoreUpdate::new(guard.store.storage.clone()); // Delta must exist because flat storage is locked and we could retrieve @@ -231,29 +345,28 @@ impl FlatStorage { let changes = store_helper::get_delta_changes(&guard.store, shard_uid, block_hash)? .ok_or_else(|| missing_delta_error(&block_hash))?; changes.apply_to_flat_state(&mut store_update, guard.shard_uid); - let block = &guard.deltas[&block_hash].metadata.block; + let metadata = guard + .deltas + .get(&block_hash) + .ok_or_else(|| missing_delta_error(&block_hash))? + .metadata; + let block = metadata.block; let block_height = block.height; store_helper::set_flat_storage_status( &mut store_update, shard_uid, - FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: *block }), + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: block }), ); guard.metrics.set_flat_head_height(block.height); - guard.flat_head = *block; + guard.flat_head = block; // Remove old deltas from disk and memory. // Do it for each head update separately to ensure that old data is removed properly if node was // interrupted in the middle. // TODO (#7327): in case of long forks it can take a while and delay processing of some chunk. // Consider avoid iterating over all blocks and make removals lazy. - let gc_height = guard - .deltas - .get(&block_hash) - .ok_or_else(|| missing_delta_error(&block_hash))? - .metadata - .block - .height; + let gc_height = metadata.block.height; let hashes_to_remove: Vec<_> = guard .deltas .iter() @@ -349,19 +462,23 @@ fn missing_delta_error(block_hash: &CryptoHash) -> FlatStorageError { #[cfg(test)] mod tests { - use crate::flat::delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata}; + use crate::flat::delta::{ + BlockWithChangesInfo, FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata, + }; use crate::flat::manager::FlatStorageManager; + use crate::flat::storage::FlatStorageInner; use crate::flat::types::{BlockInfo, FlatStorageError}; use crate::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus}; use crate::test_utils::create_test_store; use crate::StorageError; + use assert_matches::assert_matches; use borsh::BorshSerialize; + use near_o11y::testonly::init_test_logger; use near_primitives::hash::{hash, CryptoHash}; - use near_primitives::types::BlockHeight; - - use assert_matches::assert_matches; use near_primitives::shard_layout::ShardUId; use near_primitives::state::FlatStateValue; + use near_primitives::types::BlockHeight; + use rand::{thread_rng, Rng}; use std::collections::HashMap; struct MockChain { @@ -478,7 +595,10 @@ mod tests { for i in 1..5 { let delta = FlatStateDelta { changes: FlatStateChanges::default(), - metadata: FlatStateDeltaMetadata { block: chain.get_block(i) }, + metadata: FlatStateDeltaMetadata { + block: chain.get_block(i), + prev_block_with_changes: None, + }, }; store_helper::set_delta(&mut store_update, shard_uid, &delta); } @@ -491,23 +611,23 @@ mod tests { // Check `BlockNotSupported` errors which are fine to occur during regular block processing. // First, check that flat head can be moved to block 1. let flat_head_hash = chain.get_block_hash(1); - assert_eq!(flat_storage.update_flat_head(&flat_head_hash), Ok(())); + assert_eq!(flat_storage.update_flat_head(&flat_head_hash, true), Ok(())); // Check that attempt to move flat head to block 2 results in error because it lays in unreachable fork. let fork_block_hash = chain.get_block_hash(2); assert_eq!( - flat_storage.update_flat_head(&fork_block_hash), + flat_storage.update_flat_head(&fork_block_hash, true), Err(FlatStorageError::BlockNotSupported((flat_head_hash, fork_block_hash))) ); // Check that attempt to move flat head to block 0 results in error because it is an unreachable parent. let parent_block_hash = chain.get_block_hash(0); assert_eq!( - flat_storage.update_flat_head(&parent_block_hash), + flat_storage.update_flat_head(&parent_block_hash, true), Err(FlatStorageError::BlockNotSupported((flat_head_hash, parent_block_hash))) ); // Check that attempt to move flat head to non-existent block results in the same error. let not_existing_hash = hash(&[1, 2, 3]); assert_eq!( - flat_storage.update_flat_head(¬_existing_hash), + flat_storage.update_flat_head(¬_existing_hash, true), Err(FlatStorageError::BlockNotSupported((flat_head_hash, not_existing_hash))) ); // Corrupt DB state for block 3 and try moving flat head to it. @@ -516,7 +636,7 @@ mod tests { store_helper::remove_delta(&mut store_update, shard_uid, chain.get_block_hash(3)); store_update.commit().unwrap(); assert_matches!( - flat_storage.update_flat_head(&chain.get_block_hash(3)), + flat_storage.update_flat_head(&chain.get_block_hash(3), true), Err(FlatStorageError::StorageInternalError(_)) ); } @@ -536,7 +656,10 @@ mod tests { for i in 1..5 { let delta = FlatStateDelta { changes: FlatStateChanges::default(), - metadata: FlatStateDeltaMetadata { block: chain.get_block(i * 2) }, + metadata: FlatStateDeltaMetadata { + block: chain.get_block(i * 2), + prev_block_with_changes: None, + }, }; store_helper::set_delta(&mut store_update, shard_uid, &delta); } @@ -549,7 +672,7 @@ mod tests { // Check that flat head can be moved to block 8. let flat_head_hash = chain.get_block_hash(8); - assert_eq!(flat_storage.update_flat_head(&flat_head_hash), Ok(())); + assert_eq!(flat_storage.update_flat_head(&flat_head_hash, false), Ok(())); } // This tests basic use cases for FlatStorageChunkView and FlatStorage. @@ -581,7 +704,10 @@ mod tests { vec![1], Some(FlatStateValue::value_ref(&[i as u8])), )]), - metadata: FlatStateDeltaMetadata { block: chain.get_block(i) }, + metadata: FlatStateDeltaMetadata { + block: chain.get_block(i), + prev_block_with_changes: None, + }, }; store_helper::set_delta(&mut store_update, shard_uid, &delta); } @@ -612,7 +738,10 @@ mod tests { (vec![1], None), (vec![2], Some(FlatStateValue::value_ref(&[1]))), ]), - metadata: FlatStateDeltaMetadata { block: chain.get_block_info(&hash) }, + metadata: FlatStateDeltaMetadata { + block: chain.get_block_info(&hash), + prev_block_with_changes: None, + }, }) .unwrap(); store_update.commit().unwrap(); @@ -640,7 +769,7 @@ mod tests { // 5. Move the flat head to block 5, verify that chunk_view0 still returns the same values // and chunk_view1 returns an error. Also check that DBCol::FlatState is updated correctly - flat_storage.update_flat_head(&chain.get_block_hash(5)).unwrap(); + flat_storage.update_flat_head(&chain.get_block_hash(5), true).unwrap(); assert_eq!( store_helper::get_flat_state_value(&store, shard_uid, &[1]).unwrap(), Some(FlatStateValue::value_ref(&[5])) @@ -664,7 +793,7 @@ mod tests { // 6. Move the flat head to block 10, verify that chunk_view0 still returns the same values // Also checks that DBCol::FlatState is updated correctly. - flat_storage.update_flat_head(&chain.get_block_hash(10)).unwrap(); + flat_storage.update_flat_head(&chain.get_block_hash(10), true).unwrap(); let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap(); assert_eq!(blocks.len(), 0); assert_eq!(store_helper::get_flat_state_value(&store, shard_uid, &[1]).unwrap(), None); @@ -679,4 +808,451 @@ mod tests { None ); } + + #[test] + fn flat_storage_with_hops() { + init_test_logger(); + // 1. Create a chain with no forks. Set flat head to be at block 0. + let num_blocks = 15; + let chain = MockChain::linear_chain(num_blocks); + let shard_uid = ShardUId::single_shard(); + let store = create_test_store(); + let mut store_update = store.store_update(); + store_helper::set_flat_storage_status( + &mut store_update, + shard_uid, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), + ); + store_helper::set_flat_state_value( + &mut store_update, + shard_uid, + vec![1], + Some(FlatStateValue::value_ref(&[0])), + ); + store_update.commit().unwrap(); + + for i in 1..num_blocks as BlockHeight { + let mut store_update = store.store_update(); + let changes = if i % 3 == 0 { + // Add a change. + FlatStateChanges::from([(vec![1], Some(FlatStateValue::value_ref(&[i as u8])))]) + } else { + // No changes. + FlatStateChanges::from([]) + }; + + // Simulates `Chain::save_flat_state_changes()`. + let prev_block_with_changes = if changes.0.is_empty() { + store_helper::get_prev_block_with_changes( + &store, + shard_uid, + chain.get_block(i).hash, + chain.get_block(i).prev_hash, + ) + .unwrap() + } else { + None + }; + let delta = FlatStateDelta { + changes, + metadata: FlatStateDeltaMetadata { + block: chain.get_block(i), + prev_block_with_changes, + }, + }; + tracing::info!(?i, ?delta); + store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.commit().unwrap(); + } + + let flat_storage_manager = FlatStorageManager::new(store.clone()); + flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap(); + let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_uid).unwrap(); + + // 2. Check that the chunk_view at block i reads the value of key &[1] as &[round_down_to_a_multiple_of_3(i)] + for i in 0..num_blocks as BlockHeight { + let block_hash = chain.get_block_hash(i); + let blocks = flat_storage.get_blocks_to_head(&block_hash).unwrap(); + let chunk_view = flat_storage_manager.chunk_view(shard_uid, block_hash).unwrap(); + let value = chunk_view.get_value(&[1]).unwrap(); + tracing::info!(?i, ?block_hash, ?value, blocks_to_head = ?blocks); + assert_eq!(value, Some(FlatStateValue::value_ref(&[((i / 3) * 3) as u8]))); + + for block_hash in blocks { + let delta = store_helper::get_delta_changes(&store.clone(), shard_uid, block_hash) + .unwrap() + .unwrap(); + assert!( + !delta.0.is_empty(), + "i: {i}, block_hash: {block_hash:?}, delta: {delta:?}" + ); + } + } + + // 3. Simulate moving head forward with a delay of two from the tip. + // flat head is chosen to keep 2 blocks between the suggested head and the chosen head, + // resulting in <=4 blocks on the way from the tip to the chosen head. + for i in 2..num_blocks as BlockHeight { + let final_block_hash = chain.get_block_hash(i - 2); + flat_storage.update_flat_head(&final_block_hash, false).unwrap(); + + let block_hash = chain.get_block_hash(i); + let blocks = flat_storage.get_blocks_to_head(&block_hash).unwrap(); + + assert!( + blocks.len() <= 2 + FlatStorageInner::BLOCKS_WITH_CHANGES_FLAT_HEAD_GAP as usize + ); + } + } + + #[test] + fn flat_storage_with_hops_random() { + init_test_logger(); + // 1. Create a long chain with no forks. Set flat head to be at block 0. + let num_blocks = 1000; + let mut rng = thread_rng(); + let chain = MockChain::linear_chain(num_blocks); + let shard_uid = ShardUId::single_shard(); + let store = create_test_store(); + let mut store_update = store.store_update(); + store_helper::set_flat_storage_status( + &mut store_update, + shard_uid, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), + ); + store_helper::set_flat_state_value( + &mut store_update, + shard_uid, + vec![1], + Some(FlatStateValue::value_ref(&[0])), + ); + store_update.commit().unwrap(); + + for i in 1..num_blocks as BlockHeight { + let mut store_update = store.store_update(); + let changes = if rng.gen_bool(0.3) { + // Add a change. + FlatStateChanges::from([(vec![1], Some(FlatStateValue::value_ref(&[i as u8])))]) + } else { + // No changes. + FlatStateChanges::default() + }; + + // Simulates `Chain::save_flat_state_changes()`. + let prev_block_with_changes = if changes.0.is_empty() { + store_helper::get_prev_block_with_changes( + &store, + shard_uid, + chain.get_block(i).hash, + chain.get_block(i).prev_hash, + ) + .unwrap() + } else { + None + }; + let delta = FlatStateDelta { + changes, + metadata: FlatStateDeltaMetadata { + block: chain.get_block(i), + prev_block_with_changes, + }, + }; + tracing::info!(?i, ?delta); + store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.commit().unwrap(); + } + + let flat_storage_manager = FlatStorageManager::new(store.clone()); + flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap(); + let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_uid).unwrap(); + + let hashes = (0..num_blocks as BlockHeight) + .map(|height| (chain.get_block_hash(height), height)) + .collect::>(); + + // 2. Simulate moving head, with the suggested head lagging by 2 blocks behind the tip. + let mut max_lag = None; + for i in 2..num_blocks as BlockHeight { + let final_block_hash = chain.get_block_hash(i - 2); + flat_storage.update_flat_head(&final_block_hash, false).unwrap(); + + let block_hash = chain.get_block_hash(i); + let blocks = flat_storage.get_blocks_to_head(&block_hash).unwrap(); + assert!( + blocks.len() <= 2 + FlatStorageInner::BLOCKS_WITH_CHANGES_FLAT_HEAD_GAP as usize + ); + for block_hash in blocks { + let delta = store_helper::get_delta_changes(&store.clone(), shard_uid, block_hash) + .unwrap() + .unwrap(); + assert!( + !delta.0.is_empty(), + "i: {i}, block_hash: {block_hash:?}, delta: {delta:?}" + ); + } + + let flat_head_hash = flat_storage.get_head_hash(); + let flat_head_height = hashes.get(&flat_head_hash).unwrap(); + + let flat_head_lag = i - flat_head_height; + let delta = store_helper::get_delta_changes(&store.clone(), shard_uid, block_hash) + .unwrap() + .unwrap(); + let has_changes = !delta.0.is_empty(); + tracing::info!(?i, has_changes, ?flat_head_lag); + max_lag = max_lag.max(Some(flat_head_lag)); + } + tracing::info!(?max_lag); + } + + #[test] + fn test_new_flat_head() { + init_test_logger(); + + let shard_uid = ShardUId::single_shard(); + + // Case 1. Each block has flat state changes. + { + tracing::info!("Case 1"); + let num_blocks = 10; + let chain = MockChain::linear_chain(num_blocks); + let store = create_test_store(); + let mut store_update = store.store_update(); + store_helper::set_flat_storage_status( + &mut store_update, + shard_uid, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), + ); + store_helper::set_flat_state_value( + &mut store_update, + shard_uid, + vec![1], + Some(FlatStateValue::value_ref(&[0])), + ); + store_update.commit().unwrap(); + + for i in 1..num_blocks as BlockHeight { + let mut store_update = store.store_update(); + // Add a change. + let changes = FlatStateChanges::from([( + vec![1], + Some(FlatStateValue::value_ref(&[i as u8])), + )]); + let delta = FlatStateDelta { + changes, + metadata: FlatStateDeltaMetadata { + block: chain.get_block(i), + prev_block_with_changes: None, + }, + }; + tracing::info!(?i, ?delta); + store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.commit().unwrap(); + } + + let flat_storage_manager = FlatStorageManager::new(store); + flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap(); + let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_uid).unwrap(); + let guard = flat_storage.0.write().expect(crate::flat::POISONED_LOCK_ERR); + + for i in 0..num_blocks as BlockHeight { + let block_hash = chain.get_block_hash(i); + let new_head = guard.test_get_new_flat_head(block_hash, false).unwrap(); + tracing::info!(i, ?block_hash, ?new_head); + if i == 0 { + assert_eq!(new_head, chain.get_block_hash(0)); + } else { + assert_eq!(new_head, chain.get_block_hash(i - 1)); + } + } + } + + // Case 2. Even-numbered blocks have flat changes + { + tracing::info!("Case 2"); + let num_blocks = 20; + let chain = MockChain::linear_chain(num_blocks); + let store = create_test_store(); + let mut store_update = store.store_update(); + store_helper::set_flat_storage_status( + &mut store_update, + shard_uid, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), + ); + store_helper::set_flat_state_value( + &mut store_update, + shard_uid, + vec![1], + Some(FlatStateValue::value_ref(&[0])), + ); + store_update.commit().unwrap(); + + for i in 1..num_blocks as BlockHeight { + let mut store_update = store.store_update(); + let (changes, prev_block_with_changes) = if i % 2 == 0 { + // Add a change. + ( + FlatStateChanges::from([( + vec![1], + Some(FlatStateValue::value_ref(&[i as u8])), + )]), + None, + ) + } else { + // No changes. + ( + FlatStateChanges::default(), + Some(BlockWithChangesInfo { + hash: chain.get_block_hash(i - 1), + height: i - 1, + }), + ) + }; + + let delta = FlatStateDelta { + changes, + metadata: FlatStateDeltaMetadata { + block: chain.get_block(i), + prev_block_with_changes, + }, + }; + tracing::info!(?i, ?delta); + store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.commit().unwrap(); + } + + let flat_storage_manager = FlatStorageManager::new(store); + flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap(); + let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_uid).unwrap(); + let guard = flat_storage.0.write().expect(crate::flat::POISONED_LOCK_ERR); + + // A chain looks like this: + // X-O-X-O-X-O-... + // where X is a block with flat state changes and O is a block without flat state changes. + for i in 0..num_blocks as BlockHeight { + let new_head = guard.get_new_flat_head(chain.get_block_hash(i), false).unwrap(); + if i <= 3 { + assert_eq!(new_head, chain.get_block_hash(0)); + } else { + // if i is odd, then it's pointing at an O, and the head is expected to be 3 blocks ago. + // i + // | + // v + // X-O-X-O-X-O-X-O + // ^ + // | + // new_head + // + // if i is even, then it's pointing at an X, and the head is expected to be the previous X: + // i + // | + // v + // X-O-X-O-X-O-X-O + // ^ + // | + // new_head + + // Both of these cases can be computed as rounding (i-2) down to a multiple of 2. + assert_eq!(new_head, chain.get_block_hash(((i - 2) / 2) * 2)); + } + } + } + + // Case 3. Triplets of blocks: HasChanges, NoChanges, HasChanges. + { + tracing::info!("Case 3"); + let num_blocks = 20; + let chain = MockChain::linear_chain(num_blocks); + let store = create_test_store(); + let mut store_update = store.store_update(); + store_helper::set_flat_storage_status( + &mut store_update, + shard_uid, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), + ); + store_helper::set_flat_state_value( + &mut store_update, + shard_uid, + vec![1], + Some(FlatStateValue::value_ref(&[0])), + ); + store_update.commit().unwrap(); + + for i in 1..num_blocks as BlockHeight { + let mut store_update = store.store_update(); + let (changes, prev_block_with_changes) = if i % 3 == 1 { + // No changes. + ( + FlatStateChanges::default(), + Some(BlockWithChangesInfo { + hash: chain.get_block_hash(i - 1), + height: i - 1, + }), + ) + } else { + // Add a change. + ( + FlatStateChanges::from([( + vec![1], + Some(FlatStateValue::value_ref(&[i as u8])), + )]), + None, + ) + }; + + let delta = FlatStateDelta { + changes, + metadata: FlatStateDeltaMetadata { + block: chain.get_block(i), + prev_block_with_changes, + }, + }; + tracing::info!(?i, ?delta); + store_helper::set_delta(&mut store_update, shard_uid, &delta); + store_update.commit().unwrap(); + } + + let flat_storage_manager = FlatStorageManager::new(store); + flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap(); + let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_uid).unwrap(); + let guard = flat_storage.0.write().expect(crate::flat::POISONED_LOCK_ERR); + + for i in 0..num_blocks as BlockHeight { + let new_head = guard.get_new_flat_head(chain.get_block_hash(i), false).unwrap(); + // if i%3 == 0 + // i + // | + // v + // X-O-X-X-O-X-X-O-X + // ^ + // | + // new_head + // + // if i%3 == 1 + // i + // | + // v + // X-O-X-X-O-X-X-O-X + // ^ + // | + // new_head + // + // if i%3 == 2 + // i + // | + // v + // X-O-X-X-O-X-X-O-X + // ^ + // | + // new_head + if i <= 2 { + assert_eq!(new_head, chain.get_block_hash(0)); + } else if i % 3 == 0 { + assert_eq!(new_head, chain.get_block_hash(i - 1)); + } else { + assert_eq!(new_head, chain.get_block_hash(i - 2)); + } + } + } + } } diff --git a/core/store/src/flat/store_helper.rs b/core/store/src/flat/store_helper.rs index f0405a6be09..d0b4a015d38 100644 --- a/core/store/src/flat/store_helper.rs +++ b/core/store/src/flat/store_helper.rs @@ -1,21 +1,20 @@ //! This file contains helper functions for accessing flat storage data in DB //! TODO(#8577): remove this file and move functions to the corresponding structs -use std::io; - +use super::delta::{FlatStateDelta, FlatStateDeltaMetadata}; +use super::types::{ + FlatStateIterator, FlatStateValuesInliningMigrationStatus, FlatStorageResult, FlatStorageStatus, +}; use crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY; -use crate::flat::delta::{FlatStateChanges, KeyForFlatStateDelta}; +use crate::flat::delta::{BlockWithChangesInfo, FlatStateChanges, KeyForFlatStateDelta}; use crate::flat::types::FlatStorageError; +use crate::flat::FlatStorageReadyStatus; use crate::{DBCol, Store, StoreUpdate}; use borsh::BorshDeserialize; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use near_primitives::state::FlatStateValue; - -use super::delta::{FlatStateDelta, FlatStateDeltaMetadata}; -use super::types::{ - FlatStateIterator, FlatStateValuesInliningMigrationStatus, FlatStorageResult, FlatStorageStatus, -}; +use std::io; pub fn get_delta_changes( store: &Store, @@ -46,6 +45,56 @@ pub fn get_all_deltas_metadata( .collect() } +/// Retrieves a row of `FlatStateDeltaMetadata` for the given key. +fn get_delta_metadata( + store: &Store, + shard_uid: ShardUId, + block_hash: CryptoHash, +) -> FlatStorageResult> { + let key = KeyForFlatStateDelta { shard_uid, block_hash }.to_bytes(); + store.get_ser(DBCol::FlatStateDeltaMetadata, &key).map_err(|err| { + FlatStorageError::StorageInternalError(format!( + "failed to read delta metadata for {key:?}: {err}" + )) + }) +} + +pub fn get_prev_block_with_changes( + store: &Store, + shard_uid: ShardUId, + block_hash: CryptoHash, + prev_hash: CryptoHash, +) -> FlatStorageResult> { + let prev_delta_metadata = get_delta_metadata(store, shard_uid, prev_hash)?; + let prev_block_with_changes = match prev_delta_metadata { + None => { + // DeltaMetadata not found, which means the prev block is the flat head. + let flat_storage_status = get_flat_storage_status(store, shard_uid)?; + match flat_storage_status { + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }) => { + if flat_head.hash == prev_hash { + Some(BlockWithChangesInfo { hash: prev_hash, height: flat_head.height }) + } else { + tracing::error!(target: "store", ?block_hash, ?prev_hash, "Missing delta metadata"); + None + } + } + // Don't do any performance optimizations while flat storage is not ready. + _ => None, + } + } + Some(metadata) => { + // If the prev block contains `prev_block_with_changes`, then use that value. + // Otherwise reference the prev block. + Some(metadata.prev_block_with_changes.unwrap_or(BlockWithChangesInfo { + hash: metadata.block.hash, + height: metadata.block.height, + })) + } + }; + Ok(prev_block_with_changes) +} + pub fn set_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, delta: &FlatStateDelta) { let key = KeyForFlatStateDelta { shard_uid, block_hash: delta.metadata.block.hash }.to_bytes(); store_update diff --git a/core/store/src/flat/types.rs b/core/store/src/flat/types.rs index e7148702a60..0cce82be461 100644 --- a/core/store/src/flat/types.rs +++ b/core/store/src/flat/types.rs @@ -12,8 +12,8 @@ pub struct BlockInfo { } impl BlockInfo { - pub fn genesis(hash: CryptoHash, height: BlockHeight) -> BlockInfo { - BlockInfo { hash, height, prev_hash: CryptoHash::default() } + pub fn genesis(hash: CryptoHash, height: BlockHeight) -> Self { + Self { hash, height, prev_hash: CryptoHash::default() } } } diff --git a/core/store/src/metadata.rs b/core/store/src/metadata.rs index b516296a5c7..eab2c05041c 100644 --- a/core/store/src/metadata.rs +++ b/core/store/src/metadata.rs @@ -2,7 +2,7 @@ pub type DbVersion = u32; /// Current version of the database. -pub const DB_VERSION: DbVersion = 37; +pub const DB_VERSION: DbVersion = 38; /// Database version at which point DbKind was introduced. const DB_VERSION_WITH_KIND: DbVersion = 34; diff --git a/core/store/src/metrics.rs b/core/store/src/metrics.rs index f480561e17f..0820027f9c2 100644 --- a/core/store/src/metrics.rs +++ b/core/store/src/metrics.rs @@ -464,7 +464,15 @@ pub mod flat_state_metrics { pub static FLAT_STORAGE_DISTANCE_TO_HEAD: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "flat_storage_distance_to_head", - "Distance between processed block and flat storage head", + "Height distance between processed block and flat storage head", + &["shard_id"], + ) + .unwrap() + }); + pub static FLAT_STORAGE_HOPS_TO_HEAD: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "flat_storage_hops_to_head", + "Number of blocks visited to flat storage head", &["shard_id"], ) .unwrap() diff --git a/core/store/src/migrations.rs b/core/store/src/migrations.rs index 8e137c60441..a08475dc4e0 100644 --- a/core/store/src/migrations.rs +++ b/core/store/src/migrations.rs @@ -206,3 +206,27 @@ pub fn migrate_36_to_37(store: &Store) -> anyhow::Result<()> { update.commit()?; Ok(()) } + +/// Migrates the database from version 37 to 38. +/// +/// Rewrites FlatStateDeltaMetadata to add a bit to Metadata, `prev_block_with_changes`. +/// That bit is initialized with a `None` regardless of the corresponding flat state changes. +pub fn migrate_37_to_38(store: &Store) -> anyhow::Result<()> { + #[derive(borsh::BorshDeserialize)] + struct LegacyFlatStateDeltaMetadata { + block: crate::flat::BlockInfo, + } + + let mut update = store.store_update(); + update.delete_all(DBCol::FlatStateDeltaMetadata); + for result in store.iter(DBCol::FlatStateDeltaMetadata) { + let (key, old_value) = result?; + let LegacyFlatStateDeltaMetadata { block } = + LegacyFlatStateDeltaMetadata::try_from_slice(&old_value)?; + let new_value = + crate::flat::FlatStateDeltaMetadata { block, prev_block_with_changes: None }; + update.set(DBCol::FlatStateDeltaMetadata, &key, &new_value.try_to_vec()?); + } + update.commit()?; + Ok(()) +} diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 45d8c24142b..0c74dd4992a 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -3,9 +3,11 @@ use crate::trie::config::TrieConfig; use crate::trie::prefetching_trie_storage::PrefetchingThreadsHandle; use crate::trie::trie_storage::{TrieCache, TrieCachingStorage}; use crate::trie::{TrieRefcountChange, POISONED_LOCK_ERR}; +use crate::Mode; use crate::{checkpoint_hot_storage_and_cleanup_columns, metrics, DBCol, NodeStorage, PrefetchApi}; use crate::{Store, StoreConfig, StoreUpdate, Trie, TrieChanges, TrieUpdate}; use borsh::BorshSerialize; +use near_primitives::block::Block; use near_primitives::borsh::maybestd::collections::HashMap; use near_primitives::errors::EpochError; use near_primitives::errors::StorageError; @@ -60,22 +62,36 @@ impl StateSnapshot { prev_block_hash: CryptoHash, flat_storage_manager: FlatStorageManager, shard_uids: &[ShardUId], + block: Option<&Block>, ) -> Self { tracing::debug!(target: "state_snapshot", ?shard_uids, ?prev_block_hash, "new StateSnapshot"); for shard_uid in shard_uids { if let Err(err) = flat_storage_manager.create_flat_storage_for_shard(*shard_uid) { tracing::warn!(target: "state_snapshot", ?err, ?shard_uid, "Failed to create a flat storage for snapshot shard"); - } else { + continue; + } + if let Some(block) = block { let flat_storage = flat_storage_manager.get_flat_storage_for_shard(*shard_uid).unwrap(); - tracing::debug!(target: "state_snapshot", ?shard_uid, current_flat_head = ?flat_storage.get_head_hash(), desired_flat_head = ?prev_block_hash, "Moving FlatStorage head of the snapshot"); + let current_flat_head = flat_storage.get_head_hash(); + tracing::debug!(target: "state_snapshot", ?shard_uid, ?current_flat_head, block_hash = ?block.header().hash(), block_height = block.header().height(), "Moving FlatStorage head of the snapshot"); let _timer = metrics::MOVE_STATE_SNAPSHOT_FLAT_HEAD_ELAPSED .with_label_values(&[&shard_uid.shard_id.to_string()]) .start_timer(); - if let Err(err) = flat_storage.update_flat_head(&prev_block_hash) { - tracing::error!(target: "state_snapshot", ?err, ?shard_uid, current_flat_head = ?flat_storage.get_head_hash(), ?prev_block_hash, "Failed to Move FlatStorage head of the snapshot"); + if let Some(chunk) = block.chunks().get(shard_uid.shard_id as usize) { + // Flat state snapshot needs to be at a height that lets it + // replay the last chunk of the shard. + let desired_flat_head = chunk.prev_block_hash(); + match flat_storage.update_flat_head(desired_flat_head, true) { + Ok(_) => { + tracing::debug!(target: "state_snapshot", ?shard_uid, ?current_flat_head, ?desired_flat_head, "Successfully moved FlatStorage head of the snapshot"); + } + Err(err) => { + tracing::error!(target: "state_snapshot", ?shard_uid, ?err, ?current_flat_head, ?desired_flat_head, "Failed to move FlatStorage head of the snapshot"); + } + } } else { - tracing::debug!(target: "state_snapshot", ?shard_uid, new_flat_head = ?flat_storage.get_head_hash(), desired_flat_head = ?prev_block_hash, "Successfully moved FlatStorage head of the snapshot"); + tracing::error!(target: "state_snapshot", ?shard_uid, current_flat_head = ?flat_storage.get_head_hash(), ?prev_block_hash, "Failed to move FlatStorage head of the snapshot, no chunk"); } } } @@ -437,6 +453,7 @@ impl ShardTries { &self, prev_block_hash: &CryptoHash, shard_uids: &[ShardUId], + block: &Block, ) -> Result<(), anyhow::Error> { metrics::HAS_STATE_SNAPSHOT.set(0); // The function returns an `anyhow::Error`, because no special handling of errors is done yet. The errors are logged and ignored. @@ -507,6 +524,7 @@ impl ShardTries { *prev_block_hash, flat_storage_manager, shard_uids, + Some(block), )); metrics::HAS_STATE_SNAPSHOT.set(1); tracing::info!(target: "state_snapshot", ?prev_block_hash, "Made a checkpoint"); @@ -631,7 +649,7 @@ impl ShardTries { let store_config = StoreConfig::default(); let opener = NodeStorage::opener(&snapshot_dir, false, &store_config, None); - let storage = opener.open()?; + let storage = opener.open_in_mode(Mode::ReadOnly)?; let store = storage.get_hot_store(); let flat_storage_manager = FlatStorageManager::new(store.clone()); @@ -642,6 +660,7 @@ impl ShardTries { *prev_block_hash, flat_storage_manager, &shard_uids, + None, )); metrics::HAS_STATE_SNAPSHOT.set(1); tracing::info!(target: "runtime", ?prev_block_hash, ?snapshot_dir, "Detected and opened a state snapshot."); diff --git a/integration-tests/src/tests/client/flat_storage.rs b/integration-tests/src/tests/client/flat_storage.rs index 2f371e79d80..789e0861cab 100644 --- a/integration-tests/src/tests/client/flat_storage.rs +++ b/integration-tests/src/tests/client/flat_storage.rs @@ -3,9 +3,12 @@ use assert_matches::assert_matches; use near_chain::{ChainGenesis, Provenance}; use near_chain_configs::Genesis; use near_client::test_utils::TestEnv; +use near_client::ProcessTxResponse; +use near_crypto::{InMemorySigner, KeyType}; use near_o11y::testonly::init_test_logger; use near_primitives::errors::StorageError; use near_primitives::shard_layout::{ShardLayout, ShardUId}; +use near_primitives::transaction::SignedTransaction; use near_primitives::trie_key::TrieKey; use near_primitives::types::AccountId; use near_primitives_core::types::BlockHeight; @@ -24,7 +27,7 @@ use std::time::Duration; use super::utils::TestEnvNightshadeSetupExt; /// Height on which we start flat storage background creation. -const START_HEIGHT: BlockHeight = 4; +const START_HEIGHT: BlockHeight = 7; /// Number of steps which should be enough to create flat storage. const CREATION_TIMEOUT: BlockHeight = 30; @@ -130,13 +133,25 @@ fn test_flat_storage_creation_sanity() { // Process some blocks with flat storage. Then remove flat storage data from disk. { let mut env = setup_env(&genesis, store.clone()); + let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0"); + let genesis_hash = *env.clients[0].chain.genesis().hash(); for height in 1..START_HEIGHT { env.produce_block(0, height); + + let tx = SignedTransaction::send_money( + height, + "test0".parse().unwrap(), + "test0".parse().unwrap(), + &signer, + 1, + genesis_hash, + ); + assert_eq!(env.clients[0].process_tx(tx, false, false), ProcessTxResponse::ValidTx); } // If chain was initialized from scratch, flat storage state should be created. During block processing, flat - // storage head should be moved to block `START_HEIGHT - 3`. - let flat_head_height = START_HEIGHT - 3; + // storage head should be moved to block `START_HEIGHT - 4`. + let flat_head_height = START_HEIGHT - 4; let expected_flat_storage_head = env.clients[0].chain.get_block_hash_by_height(flat_head_height).unwrap(); let status = store_helper::get_flat_storage_status(&store, shard_uid); @@ -147,18 +162,19 @@ fn test_flat_storage_creation_sanity() { panic!("expected FlatStorageStatus::Ready status, got {status:?}"); } - // Deltas for blocks until `START_HEIGHT - 2` should not exist. - for height in 0..START_HEIGHT - 2 { + // Deltas for blocks until `flat_head_height` should not exist. + for height in 0..=flat_head_height { let block_hash = env.clients[0].chain.get_block_hash_by_height(height).unwrap(); assert_eq!(store_helper::get_delta_changes(&store, shard_uid, block_hash), Ok(None)); } // Deltas for blocks until `START_HEIGHT` should still exist, // because they come after flat storage head. - for height in START_HEIGHT - 2..START_HEIGHT { + for height in flat_head_height + 1..START_HEIGHT { let block_hash = env.clients[0].chain.get_block_hash_by_height(height).unwrap(); assert_matches!( store_helper::get_delta_changes(&store, shard_uid, block_hash), - Ok(Some(_)) + Ok(Some(_)), + "height: {height}" ); } @@ -236,8 +252,20 @@ fn test_flat_storage_creation_two_shards() { // Process some blocks with flat storages for two shards. Then remove flat storage data from disk for shard 0. { let mut env = setup_env(&genesis, store.clone()); + let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0"); + let genesis_hash = *env.clients[0].chain.genesis().hash(); for height in 1..START_HEIGHT { env.produce_block(0, height); + + let tx = SignedTransaction::send_money( + height, + "test0".parse().unwrap(), + "test0".parse().unwrap(), + &signer, + 1, + genesis_hash, + ); + assert_eq!(env.clients[0].process_tx(tx, false, false), ProcessTxResponse::ValidTx); } for &shard_uid in &shard_uids { @@ -461,6 +489,10 @@ fn test_flat_storage_iter() { } #[test] +/// Initializes flat storage, then creates a Trie to read the flat storage +/// exactly at the flat head block. +/// Add another block to the flat state, which moves flat head and makes the +/// state of the previous flat head inaccessible. fn test_not_supported_block() { init_test_logger(); let genesis = Genesis::test(vec!["test0".parse().unwrap()], 1); @@ -469,11 +501,24 @@ fn test_not_supported_block() { let store = create_test_store(); let mut env = setup_env(&genesis, store); + let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0"); + let genesis_hash = *env.clients[0].chain.genesis().hash(); + // Produce blocks up to `START_HEIGHT`. for height in 1..START_HEIGHT { env.produce_block(0, height); + let tx = SignedTransaction::send_money( + height, + "test0".parse().unwrap(), + "test0".parse().unwrap(), + &signer, + 1, + genesis_hash, + ); + assert_eq!(env.clients[0].process_tx(tx, false, false), ProcessTxResponse::ValidTx); } + let flat_head_height = START_HEIGHT - 4; // Trie key which must exist in the storage. let trie_key_bytes = near_primitives::trie_key::TrieKey::Account { account_id: "test0".parse().unwrap() } @@ -483,7 +528,7 @@ fn test_not_supported_block() { // After creating the first trie, produce block `START_HEIGHT` which moves flat storage // head 1 block further and invalidates it. let mut get_ref_results = vec![]; - for height in START_HEIGHT - 3..START_HEIGHT - 1 { + for height in flat_head_height..START_HEIGHT - 1 { let block_hash = env.clients[0].chain.get_block_hash_by_height(height).unwrap(); let state_root = *env.clients[0] .chain @@ -495,7 +540,7 @@ fn test_not_supported_block() { .runtime_adapter .get_trie_for_shard(shard_uid.shard_id(), &block_hash, state_root, true) .unwrap(); - if height == START_HEIGHT - 3 { + if height == flat_head_height { env.produce_block(0, START_HEIGHT); } get_ref_results.push(trie.get_ref(&trie_key_bytes, KeyLookupMode::FlatStorage)); diff --git a/integration-tests/src/tests/nearcore/sync_state_nodes.rs b/integration-tests/src/tests/nearcore/sync_state_nodes.rs index 47954f8805d..755ba7ff431 100644 --- a/integration-tests/src/tests/nearcore/sync_state_nodes.rs +++ b/integration-tests/src/tests/nearcore/sync_state_nodes.rs @@ -2,6 +2,7 @@ use crate::test_helpers::heavy_test; use actix::{Actor, System}; use futures::{future, FutureExt}; use near_actix_test_utils::run_actix; +use near_chain::chain::ApplyStatePartsRequest; use near_chain::types::RuntimeAdapter; use near_chain::{ChainGenesis, Provenance}; use near_chain_configs::ExternalStorageLocation::Filesystem; @@ -14,12 +15,14 @@ use near_network::tcp; use near_network::test_utils::{convert_boot_nodes, wait_or_timeout, WaitOrTimeoutActor}; use near_o11y::testonly::{init_integration_logger, init_test_logger}; use near_o11y::WithSpanContextExt; +use near_primitives::shard_layout::ShardUId; use near_primitives::state_part::PartId; -use near_primitives::syncing::get_num_state_parts; +use near_primitives::syncing::{get_num_state_parts, StatePartKey}; use near_primitives::transaction::SignedTransaction; use near_primitives::utils::MaybeValidated; +use near_primitives_core::types::ShardId; use near_store::genesis::initialize_genesis_state; -use near_store::{NodeStorage, Store}; +use near_store::{DBCol, NodeStorage, Store}; use nearcore::{config::GenesisExt, load_test_config, start_with_config, NightshadeRuntime}; use std::ops::ControlFlow; use std::sync::{Arc, RwLock}; @@ -544,7 +547,6 @@ fn sync_state_dump() { } #[test] -#[ignore] // Test that state sync behaves well when the chunks are absent at the end of the epoch. // The test actually fails and the code needs fixing. fn test_dump_epoch_missing_chunk_in_last_block() { @@ -552,7 +554,7 @@ fn test_dump_epoch_missing_chunk_in_last_block() { init_test_logger(); let epoch_length = 10; - for num_last_chunks_missing in 0..5 { + for num_last_chunks_missing in 0..6 { assert!(num_last_chunks_missing < epoch_length); let mut genesis = Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1); @@ -659,7 +661,7 @@ fn test_dump_epoch_missing_chunk_in_last_block() { let state_root_node = state_sync_header.state_root_node(); let num_parts = get_num_state_parts(state_root_node.memory_usage); // Check that state parts can be obtained. - let state_parts: Vec<_> = (0..num_parts) + let state_sync_parts: Vec<_> = (0..num_parts) .map(|i| { // This should obviously not fail, aka succeed. env.clients[0].chain.get_state_response_part(0, i, sync_hash).unwrap() @@ -675,11 +677,69 @@ fn test_dump_epoch_missing_chunk_in_last_block() { 0, &state_root, PartId::new(i, num_parts), - &state_parts[i as usize], + &state_sync_parts[i as usize], &epoch_id, ) .unwrap(); } + + env.clients[1].chain.set_state_header(0, sync_hash, state_sync_header).unwrap(); + for i in 0..num_parts { + env.clients[1] + .chain + .set_state_part( + 0, + sync_hash, + PartId::new(i, num_parts), + &state_sync_parts[i as usize], + ) + .unwrap(); + } + let rt = Arc::clone(&env.clients[1].runtime_adapter); + let f = move |msg: ApplyStatePartsRequest| { + use borsh::BorshSerialize; + let store = rt.store(); + + let shard_id = msg.shard_uid.shard_id as ShardId; + for part_id in 0..msg.num_parts { + let key = StatePartKey(msg.sync_hash, shard_id, part_id).try_to_vec().unwrap(); + let part = store.get(DBCol::StateParts, &key).unwrap().unwrap(); + + rt.apply_state_part( + shard_id, + &msg.state_root, + PartId::new(part_id, msg.num_parts), + &part, + &msg.epoch_id, + ) + .unwrap(); + } + }; + env.clients[1].chain.schedule_apply_state_parts(0, sync_hash, num_parts, &f).unwrap(); + env.clients[1].chain.set_state_finalize(0, sync_hash, Ok(())).unwrap(); + let last_chunk_height = epoch_length - num_last_chunks_missing; + for height in 1..epoch_length { + if height < last_chunk_height { + assert!(env.clients[1] + .chain + .get_chunk_extra(blocks[height as usize].hash(), &ShardUId::single_shard()) + .is_err()); + } else { + let chunk_extra = env.clients[1] + .chain + .get_chunk_extra(blocks[height as usize].hash(), &ShardUId::single_shard()) + .unwrap(); + let expected_chunk_extra = env.clients[0] + .chain + .get_chunk_extra( + blocks[last_chunk_height as usize].hash(), + &ShardUId::single_shard(), + ) + .unwrap(); + // The chunk extra of the prev block of sync block should be the same as the node that it is syncing from + assert_eq!(chunk_extra, expected_chunk_extra); + } + } } }); } diff --git a/nearcore/src/migrations.rs b/nearcore/src/migrations.rs index 43812efec3e..367f14fee74 100644 --- a/nearcore/src/migrations.rs +++ b/nearcore/src/migrations.rs @@ -112,6 +112,7 @@ impl<'a> near_store::StoreMigrator for Migrator<'a> { Ok(()) } 36 => near_store::migrations::migrate_36_to_37(store), + 37 => near_store::migrations::migrate_37_to_38(store), DB_VERSION.. => unreachable!(), } } diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index f313a0b352b..a0316fabe86 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -1389,6 +1389,7 @@ mod test { height, prev_hash: *prev_block_hash, }, + prev_block_with_changes: None, }, }; let new_store_update = flat_storage.add_delta(delta).unwrap(); diff --git a/neard/src/cli.rs b/neard/src/cli.rs index 5eab29ea758..a89f67da8b3 100644 --- a/neard/src/cli.rs +++ b/neard/src/cli.rs @@ -121,7 +121,7 @@ impl NeardCmd { cmd.run()?; } NeardSubCommand::FlatStorage(cmd) => { - cmd.run(&home_dir)?; + cmd.run(&home_dir, genesis_validation)?; } NeardSubCommand::ValidateConfig(cmd) => { cmd.run(&home_dir)?; diff --git a/runtime/runtime-params-estimator/src/estimator_context.rs b/runtime/runtime-params-estimator/src/estimator_context.rs index 1885726e74a..d6cefee1f48 100644 --- a/runtime/runtime-params-estimator/src/estimator_context.rs +++ b/runtime/runtime-params-estimator/src/estimator_context.rs @@ -178,7 +178,7 @@ impl<'c> EstimatorContext<'c> { flat_storage .add_delta(FlatStateDelta { changes: FlatStateChanges::from(random_data), - metadata: FlatStateDeltaMetadata { block }, + metadata: FlatStateDeltaMetadata { block, prev_block_with_changes: None }, }) .unwrap(); } diff --git a/tools/flat-storage/src/commands.rs b/tools/flat-storage/src/commands.rs index d667115fff8..f0633389e0f 100644 --- a/tools/flat-storage/src/commands.rs +++ b/tools/flat-storage/src/commands.rs @@ -4,6 +4,7 @@ use clap::Parser; use near_chain::flat_storage_creator::FlatStorageShardCreator; use near_chain::types::RuntimeAdapter; use near_chain::{ChainStore, ChainStoreAccess}; +use near_chain_configs::GenesisValidationMode; use near_epoch_manager::{EpochManager, EpochManagerAdapter, EpochManagerHandle}; use near_store::flat::{ inline_flat_state_values, store_helper, FlatStateDelta, FlatStateDeltaMetadata, @@ -142,9 +143,18 @@ impl FlatStorageCommand { (node_storage, epoch_manager, hot_runtime, chain_store, hot_store) } - pub fn run(&self, home_dir: &PathBuf) -> anyhow::Result<()> { - let near_config = load_config(home_dir, near_chain_configs::GenesisValidationMode::Full)?; - let opener = NodeStorage::opener(home_dir, false, &near_config.config.store, None); + pub fn run( + &self, + home_dir: &PathBuf, + genesis_validation: GenesisValidationMode, + ) -> anyhow::Result<()> { + let near_config = load_config(home_dir, genesis_validation)?; + let opener = NodeStorage::opener( + home_dir, + near_config.config.archive, + &near_config.config.store, + None, + ); match &self.subcmd { SubCommand::View => {