From 1df1f3491bada1d85087483b45da8ae12ba24982 Mon Sep 17 00:00:00 2001 From: Shreyan Gupta Date: Thu, 21 Dec 2023 10:57:59 +0530 Subject: [PATCH 1/2] [refactor] Move garbage collection related code into a separate file --- chain/chain/src/chain.rs | 306 +----- chain/chain/src/garbage_collection.rs | 1325 +++++++++++++++++++++++++ chain/chain/src/lib.rs | 1 + chain/chain/src/store.rs | 1099 +------------------- chain/chain/src/test_utils.rs | 24 + 5 files changed, 1390 insertions(+), 1365 deletions(-) create mode 100644 chain/chain/src/garbage_collection.rs diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 1a347e933fc..0c64f555375 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -10,7 +10,7 @@ use crate::missing_chunks::MissingChunksPool; use crate::orphan::{Orphan, OrphanBlockPool}; use crate::state_request_tracker::StateRequestTracker; use crate::state_snapshot_actor::SnapshotCallbacks; -use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode}; +use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate}; use crate::types::{ AcceptedBlock, ApplySplitStateResultOrStateChanges, ApplyTransactionResult, @@ -94,7 +94,7 @@ use near_primitives::views::{ use near_store::config::StateSnapshotType; use near_store::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus}; use near_store::get_genesis_state_roots; -use near_store::{DBCol, ShardTries}; +use near_store::DBCol; use once_cell::sync::OnceCell; use rand::seq::SliceRandom; use rand::SeedableRng; @@ -639,242 +639,6 @@ impl Chain { Ok(()) } - // GC CONTRACT - // === - // - // Prerequisites, guaranteed by the System: - // 1. Genesis block is available and should not be removed by GC. - // 2. No block in storage except Genesis has height lower or equal to `genesis_height`. - // 3. There is known lowest block height (Tail) came from Genesis or State Sync. - // a. Tail is always on the Canonical Chain. - // b. Only one Tail exists. - // c. Tail's height is higher than or equal to `genesis_height`, - // 4. There is a known highest block height (Head). - // a. Head is always on the Canonical Chain. - // 5. All blocks in the storage have heights in range [Tail; Head]. - // a. All forks end up on height of Head or lower. - // 6. If block A is ancestor of block B, height of A is strictly less then height of B. - // 7. (Property 1). A block with the lowest height among all the blocks at which the fork has started, - // i.e. all the blocks with the outgoing degree 2 or more, - // has the least height among all blocks on the fork. - // 8. (Property 2). The oldest block where the fork happened is never affected - // by Canonical Chain Switching and always stays on Canonical Chain. - // - // Overall: - // 1. GC procedure is handled by `clear_data()` function. - // 2. `clear_data()` runs GC process for all blocks from the Tail to GC Stop Height provided by Epoch Manager. - // 3. `clear_data()` executes separately: - // a. Forks Clearing runs for each height from Tail up to GC Stop Height. - // b. Canonical Chain Clearing from (Tail + 1) up to GC Stop Height. - // 4. Before actual clearing is started, Block Reference Map should be built. - // 5. `clear_data()` executes every time when block at new height is added. - // 6. In case of State Sync, State Sync Clearing happens. - // - // Forks Clearing: - // 1. 
Any fork which ends up on height `height` INCLUSIVELY and earlier will be completely deleted - // from the Store with all its ancestors up to the ancestor block where fork is happened - // EXCLUDING the ancestor block where fork is happened. - // 2. The oldest ancestor block always remains on the Canonical Chain by property 2. - // 3. All forks which end up on height `height + 1` and further are protected from deletion and - // no their ancestor will be deleted (even with lowest heights). - // 4. `clear_forks_data()` handles forks clearing for fixed height `height`. - // - // Canonical Chain Clearing: - // 1. Blocks on the Canonical Chain with the only descendant (if no forks started from them) - // are unlocked for Canonical Chain Clearing. - // 2. If Forks Clearing ended up on the Canonical Chain, the block may be unlocked - // for the Canonical Chain Clearing. There is no other reason to unlock the block exists. - // 3. All the unlocked blocks will be completely deleted - // from the Tail up to GC Stop Height EXCLUSIVELY. - // 4. (Property 3, GC invariant). Tail can be shifted safely to the height of the - // earliest existing block. There is always only one Tail (based on property 1) - // and it's always on the Canonical Chain (based on property 2). - // - // Example: - // - // height: 101 102 103 104 - // --------[A]---[B]---[C]---[D] - // \ \ - // \ \---[E] - // \ - // \-[F]---[G] - // - // 1. Let's define clearing height = 102. It this case fork A-F-G is protected from deletion - // because of G which is on height 103. Nothing will be deleted. - // 2. Let's define clearing height = 103. It this case Fork Clearing will be executed for A - // to delete blocks G and F, then Fork Clearing will be executed for B to delete block E. - // Then Canonical Chain Clearing will delete blocks A and B as unlocked. - // Block C is the only block of height 103 remains on the Canonical Chain (invariant). - // - // State Sync Clearing: - // 1. Executing State Sync means that no data in the storage is useful for block processing - // and should be removed completely. - // 2. The Tail should be set to the block preceding Sync Block. - // 3. All the data preceding new Tail is deleted in State Sync Clearing - // and the Trie is updated with having only Genesis data. - // 4. State Sync Clearing happens in `reset_data_pre_state_sync()`. - // - pub fn clear_data( - &mut self, - tries: ShardTries, - gc_config: &near_chain_configs::GCConfig, - ) -> Result<(), Error> { - let _span = tracing::debug_span!(target: "garbage_collection", "clear_data").entered(); - - let head = self.store.head()?; - let tail = self.store.tail()?; - let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash); - if gc_stop_height > head.height { - return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into())); - } - let prev_epoch_id = self.get_block_header(&head.prev_block_hash)?.epoch_id().clone(); - let epoch_change = prev_epoch_id != head.epoch_id; - let mut fork_tail = self.store.fork_tail()?; - metrics::TAIL_HEIGHT.set(tail as i64); - metrics::FORK_TAIL_HEIGHT.set(fork_tail as i64); - metrics::CHUNK_TAIL_HEIGHT.set(self.store.chunk_tail()? 
as i64); - metrics::GC_STOP_HEIGHT.set(gc_stop_height as i64); - if epoch_change && fork_tail < gc_stop_height { - // if head doesn't change on the epoch boundary, we may update fork tail several times - // but that is fine since it doesn't affect correctness and also we limit the number of - // heights that fork cleaning goes through so it doesn't slow down client either. - let mut chain_store_update = self.store.store_update(); - chain_store_update.update_fork_tail(gc_stop_height); - chain_store_update.commit()?; - fork_tail = gc_stop_height; - } - let mut gc_blocks_remaining = gc_config.gc_blocks_limit; - - // Forks Cleaning - let gc_fork_clean_step = gc_config.gc_fork_clean_step; - let stop_height = tail.max(fork_tail.saturating_sub(gc_fork_clean_step)); - for height in (stop_height..fork_tail).rev() { - self.clear_forks_data(tries.clone(), height, &mut gc_blocks_remaining)?; - if gc_blocks_remaining == 0 { - return Ok(()); - } - let mut chain_store_update = self.store.store_update(); - chain_store_update.update_fork_tail(height); - chain_store_update.commit()?; - } - - // Canonical Chain Clearing - for height in tail + 1..gc_stop_height { - if gc_blocks_remaining == 0 { - return Ok(()); - } - let blocks_current_height = self - .store - .get_all_block_hashes_by_height(height)? - .values() - .flatten() - .cloned() - .collect::>(); - let mut chain_store_update = self.store.store_update(); - if let Some(block_hash) = blocks_current_height.first() { - let prev_hash = *chain_store_update.get_block_header(block_hash)?.prev_hash(); - let prev_block_refcount = chain_store_update.get_block_refcount(&prev_hash)?; - if prev_block_refcount > 1 { - // Block of `prev_hash` starts a Fork, stopping - break; - } else if prev_block_refcount == 1 { - debug_assert_eq!(blocks_current_height.len(), 1); - chain_store_update.clear_block_data( - self.epoch_manager.as_ref(), - *block_hash, - GCMode::Canonical(tries.clone()), - )?; - chain_store_update.clear_resharding_data( - self.runtime_adapter.as_ref(), - self.epoch_manager.as_ref(), - *block_hash, - )?; - gc_blocks_remaining -= 1; - } else { - return Err(Error::GCError( - "block on canonical chain shouldn't have refcount 0".into(), - )); - } - } - chain_store_update.update_tail(height)?; - chain_store_update.commit()?; - } - Ok(()) - } - - /// Garbage collect data which archival node doesn’t need to keep. - /// - /// Normally, archival nodes keep all the data from the genesis block and - /// don’t run garbage collection. On the other hand, for better performance - /// the storage contains some data duplication, i.e. values in some of the - /// columns can be recomputed from data in different columns. To save on - /// storage, archival nodes do garbage collect that data. - /// - /// `gc_height_limit` limits how many heights will the function process. - pub fn clear_archive_data(&mut self, gc_height_limit: BlockHeightDelta) -> Result<(), Error> { - let _span = tracing::debug_span!(target: "chain", "clear_archive_data").entered(); - - let head = self.store.head()?; - let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash); - if gc_stop_height > head.height { - return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into())); - } - - let mut chain_store_update = self.store.store_update(); - chain_store_update.clear_redundant_chunk_data(gc_stop_height, gc_height_limit)?; - metrics::CHUNK_TAIL_HEIGHT.set(chain_store_update.chunk_tail()? 
as i64); - metrics::GC_STOP_HEIGHT.set(gc_stop_height as i64); - chain_store_update.commit() - } - - pub fn clear_forks_data( - &mut self, - tries: ShardTries, - height: BlockHeight, - gc_blocks_remaining: &mut NumBlocks, - ) -> Result<(), Error> { - let blocks_current_height = self - .store - .get_all_block_hashes_by_height(height)? - .values() - .flatten() - .cloned() - .collect::>(); - for block_hash in blocks_current_height.iter() { - let mut current_hash = *block_hash; - loop { - if *gc_blocks_remaining == 0 { - return Ok(()); - } - // Block `block_hash` is not on the Canonical Chain - // because shorter chain cannot be Canonical one - // and it may be safely deleted - // and all its ancestors while there are no other sibling blocks rely on it. - let mut chain_store_update = self.store.store_update(); - if chain_store_update.get_block_refcount(¤t_hash)? == 0 { - let prev_hash = - *chain_store_update.get_block_header(¤t_hash)?.prev_hash(); - - // It's safe to call `clear_block_data` for prev data because it clears fork only here - chain_store_update.clear_block_data( - self.epoch_manager.as_ref(), - current_hash, - GCMode::Fork(tries.clone()), - )?; - chain_store_update.commit()?; - *gc_blocks_remaining -= 1; - - current_hash = prev_hash; - } else { - // Block of `current_hash` is an ancestor for some other blocks, stopping - break; - } - } - } - - Ok(()) - } - fn maybe_mark_block_invalid(&mut self, block_hash: CryptoHash, error: &Error) { metrics::NUM_INVALID_BLOCKS.with_label_values(&[error.prometheus_label_value()]).inc(); // We only mark the block as invalid if the block has bad data (not for other errors that would @@ -1754,72 +1518,6 @@ impl Chain { } } - pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> { - let _span = tracing::debug_span!(target: "sync", "reset_data_pre_state_sync").entered(); - let head = self.head()?; - // Get header we were syncing into. - let header = self.get_block_header(&sync_hash)?; - let prev_hash = *header.prev_hash(); - let sync_height = header.height(); - let gc_height = std::cmp::min(head.height + 1, sync_height); - - // GC all the data from current tail up to `gc_height`. In case tail points to a height where - // there is no block, we need to make sure that the last block before tail is cleaned. - let tail = self.store.tail()?; - let mut tail_prev_block_cleaned = false; - for height in tail..gc_height { - let blocks_current_height = self - .store - .get_all_block_hashes_by_height(height)? 
- .values() - .flatten() - .cloned() - .collect::>(); - for block_hash in blocks_current_height { - let epoch_manager = self.epoch_manager.clone(); - let mut chain_store_update = self.mut_store().store_update(); - if !tail_prev_block_cleaned { - let prev_block_hash = - *chain_store_update.get_block_header(&block_hash)?.prev_hash(); - if chain_store_update.get_block(&prev_block_hash).is_ok() { - chain_store_update.clear_block_data( - epoch_manager.as_ref(), - prev_block_hash, - GCMode::StateSync { clear_block_info: true }, - )?; - } - tail_prev_block_cleaned = true; - } - chain_store_update.clear_block_data( - epoch_manager.as_ref(), - block_hash, - GCMode::StateSync { clear_block_info: block_hash != prev_hash }, - )?; - chain_store_update.commit()?; - } - } - - // Clear Chunks data - let mut chain_store_update = self.mut_store().store_update(); - // The largest height of chunk we have in storage is head.height + 1 - let chunk_height = std::cmp::min(head.height + 2, sync_height); - chain_store_update.clear_chunk_data_and_headers(chunk_height)?; - chain_store_update.commit()?; - - // clear all trie data - - let tries = self.runtime_adapter.get_tries(); - let mut chain_store_update = self.mut_store().store_update(); - let mut store_update = tries.store_update(); - store_update.delete_all(DBCol::State); - chain_store_update.merge(store_update); - - // The reason to reset tail here is not to allow Tail be greater than Head - chain_store_update.reset_tail(); - chain_store_update.commit()?; - Ok(()) - } - /// Set the new head after state sync was completed if it is indeed newer. /// Check for potentially unlocked orphans after this update. pub fn reset_heads_post_state_sync( diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs new file mode 100644 index 00000000000..554c5e23fef --- /dev/null +++ b/chain/chain/src/garbage_collection.rs @@ -0,0 +1,1325 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::{fmt, io}; + +use near_chain_configs::GCConfig; +use near_chain_primitives::Error; +use near_epoch_manager::EpochManagerAdapter; +use near_primitives::block::Block; +use near_primitives::hash::CryptoHash; +use near_primitives::shard_layout::get_block_shard_uid; +use near_primitives::state_sync::{StateHeaderKey, StatePartKey}; +use near_primitives::types::{BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId}; +use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash, index_to_bytes}; +use near_store::flat::store_helper; +use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId}; + +use crate::types::RuntimeAdapter; +use crate::{metrics, Chain, ChainStoreAccess, ChainStoreUpdate}; + +#[derive(Clone)] +enum GCMode { + Fork(ShardTries), + Canonical(ShardTries), + StateSync { clear_block_info: bool }, +} + +impl fmt::Debug for GCMode { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + GCMode::Fork(_) => write!(f, "GCMode::Fork"), + GCMode::Canonical(_) => write!(f, "GCMode::Canonical"), + GCMode::StateSync { .. } => write!(f, "GCMode::StateSync"), + } + } +} + +impl Chain { + // GC CONTRACT + // === + // + // Prerequisites, guaranteed by the System: + // 1. Genesis block is available and should not be removed by GC. + // 2. No block in storage except Genesis has height lower or equal to `genesis_height`. + // 3. There is known lowest block height (Tail) came from Genesis or State Sync. + // a. Tail is always on the Canonical Chain. + // b. Only one Tail exists. + // c. 
Tail's height is higher than or equal to `genesis_height`.
+ // 4. There is a known highest block height (Head).
+ // a. Head is always on the Canonical Chain.
+ // 5. All blocks in the storage have heights in range [Tail; Head].
+ // a. All forks end up on height of Head or lower.
+ // 6. If block A is an ancestor of block B, height of A is strictly less than height of B.
+ // 7. (Property 1). A block with the lowest height among all the blocks at which the fork has started,
+ // i.e. all the blocks with the outgoing degree 2 or more,
+ // has the least height among all blocks on the fork.
+ // 8. (Property 2). The oldest block where the fork happened is never affected
+ // by Canonical Chain Switching and always stays on the Canonical Chain.
+ //
+ // Overall:
+ // 1. The GC procedure is handled by the `clear_data()` function.
+ // 2. `clear_data()` runs the GC process for all blocks from the Tail to the GC Stop Height provided by the Epoch Manager.
+ // 3. `clear_data()` executes separately:
+ // a. Forks Clearing runs for each height from Tail up to GC Stop Height.
+ // b. Canonical Chain Clearing runs from (Tail + 1) up to GC Stop Height.
+ // 4. Before actual clearing is started, the Block Reference Map should be built.
+ // 5. `clear_data()` executes every time a block at a new height is added.
+ // 6. In case of State Sync, State Sync Clearing happens.
+ //
+ // Forks Clearing:
+ // 1. Any fork which ends up on height `height` INCLUSIVELY or earlier will be completely deleted
+ // from the Store with all its ancestors, up to but EXCLUDING the ancestor block where the fork happened.
+ // 2. The oldest ancestor block always remains on the Canonical Chain by property 2.
+ // 3. All forks which end up on height `height + 1` or later are protected from deletion and
+ // none of their ancestors will be deleted (even those with the lowest heights).
+ // 4. `clear_forks_data()` handles forks clearing for the fixed height `height`.
+ //
+ // Canonical Chain Clearing:
+ // 1. Blocks on the Canonical Chain with only one descendant (i.e. no forks started from them)
+ // are unlocked for Canonical Chain Clearing.
+ // 2. If Forks Clearing ended up on the Canonical Chain, the block may be unlocked
+ // for Canonical Chain Clearing. No other reason to unlock a block exists.
+ // 3. All the unlocked blocks will be completely deleted
+ // from the Tail up to GC Stop Height EXCLUSIVELY.
+ // 4. (Property 3, GC invariant). The Tail can be shifted safely to the height of the
+ // earliest existing block. There is always only one Tail (based on property 1)
+ // and it's always on the Canonical Chain (based on property 2).
+ //
+ // Example:
+ //
+ // height: 101   102   103   104
+ // --------[A]---[B]---[C]---[D]
+ //          \     \
+ //           \     \---[E]
+ //            \
+ //             \-[F]---[G]
+ //
+ // 1. Let's define clearing height = 102. In this case fork A-F-G is protected from deletion
+ // because of G which is on height 103. Nothing will be deleted.
+ // 2. Let's define clearing height = 103. In this case Fork Clearing will be executed for A
+ // to delete blocks G and F, then Fork Clearing will be executed for B to delete block E.
+ // Then Canonical Chain Clearing will delete blocks A and B as unlocked.
+ // Block C is the only block of height 103 that remains on the Canonical Chain (invariant).
+ //
+ // State Sync Clearing:
+ // 1. Executing State Sync means that no data in the storage is useful for block processing
+ // and should be removed completely.
+ // 2. The Tail should be set to the block preceding the Sync Block.
+ // 3.
All the data preceding new Tail is deleted in State Sync Clearing + // and the Trie is updated with having only Genesis data. + // 4. State Sync Clearing happens in `reset_data_pre_state_sync()`. + // + pub fn clear_data(&mut self, tries: ShardTries, gc_config: &GCConfig) -> Result<(), Error> { + let _span = tracing::debug_span!(target: "garbage_collection", "clear_data").entered(); + + let head = self.store().head()?; + let tail = self.store().tail()?; + let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash); + if gc_stop_height > head.height { + return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into())); + } + let prev_epoch_id = self.get_block_header(&head.prev_block_hash)?.epoch_id().clone(); + let epoch_change = prev_epoch_id != head.epoch_id; + let mut fork_tail = self.store().fork_tail()?; + metrics::TAIL_HEIGHT.set(tail as i64); + metrics::FORK_TAIL_HEIGHT.set(fork_tail as i64); + metrics::CHUNK_TAIL_HEIGHT.set(self.store().chunk_tail()? as i64); + metrics::GC_STOP_HEIGHT.set(gc_stop_height as i64); + if epoch_change && fork_tail < gc_stop_height { + // if head doesn't change on the epoch boundary, we may update fork tail several times + // but that is fine since it doesn't affect correctness and also we limit the number of + // heights that fork cleaning goes through so it doesn't slow down client either. + let mut chain_store_update = self.mut_store().store_update(); + chain_store_update.update_fork_tail(gc_stop_height); + chain_store_update.commit()?; + fork_tail = gc_stop_height; + } + let mut gc_blocks_remaining = gc_config.gc_blocks_limit; + + // Forks Cleaning + let gc_fork_clean_step = gc_config.gc_fork_clean_step; + let stop_height = tail.max(fork_tail.saturating_sub(gc_fork_clean_step)); + for height in (stop_height..fork_tail).rev() { + self.clear_forks_data(tries.clone(), height, &mut gc_blocks_remaining)?; + if gc_blocks_remaining == 0 { + return Ok(()); + } + let mut chain_store_update = self.mut_store().store_update(); + chain_store_update.update_fork_tail(height); + chain_store_update.commit()?; + } + + // Canonical Chain Clearing + for height in tail + 1..gc_stop_height { + if gc_blocks_remaining == 0 { + return Ok(()); + } + let blocks_current_height = self + .store() + .get_all_block_hashes_by_height(height)? + .values() + .flatten() + .cloned() + .collect::>(); + let epoch_manager = self.epoch_manager.clone(); + let runtime = self.runtime_adapter.clone(); + let mut chain_store_update = self.mut_store().store_update(); + if let Some(block_hash) = blocks_current_height.first() { + let prev_hash = *chain_store_update.get_block_header(block_hash)?.prev_hash(); + let prev_block_refcount = chain_store_update.get_block_refcount(&prev_hash)?; + if prev_block_refcount > 1 { + // Block of `prev_hash` starts a Fork, stopping + break; + } else if prev_block_refcount == 1 { + debug_assert_eq!(blocks_current_height.len(), 1); + chain_store_update.clear_block_data( + epoch_manager.as_ref(), + *block_hash, + GCMode::Canonical(tries.clone()), + )?; + chain_store_update.clear_resharding_data( + runtime.as_ref(), + epoch_manager.as_ref(), + *block_hash, + )?; + gc_blocks_remaining -= 1; + } else { + return Err(Error::GCError( + "block on canonical chain shouldn't have refcount 0".into(), + )); + } + } + chain_store_update.update_tail(height)?; + chain_store_update.commit()?; + } + Ok(()) + } + + /// Garbage collect data which archival node doesn’t need to keep. 
+ ///
+ /// Normally, archival nodes keep all the data from the genesis block and
+ /// don’t run garbage collection. On the other hand, for better performance
+ /// the storage contains some data duplication, i.e. values in some of the
+ /// columns can be recomputed from data in different columns. To save on
+ /// storage, archival nodes do garbage collect that data.
+ ///
+ /// `gc_height_limit` limits how many heights the function will process.
+ pub fn clear_archive_data(&mut self, gc_height_limit: BlockHeightDelta) -> Result<(), Error> {
+ let _span = tracing::debug_span!(target: "chain", "clear_archive_data").entered();
+
+ let head = self.store().head()?;
+ let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash);
+ if gc_stop_height > head.height {
+ return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into()));
+ }
+
+ let mut chain_store_update = self.mut_store().store_update();
+ chain_store_update.clear_redundant_chunk_data(gc_stop_height, gc_height_limit)?;
+ metrics::CHUNK_TAIL_HEIGHT.set(chain_store_update.chunk_tail()? as i64);
+ metrics::GC_STOP_HEIGHT.set(gc_stop_height as i64);
+ chain_store_update.commit()
+ }
+
+ fn clear_forks_data(
+ &mut self,
+ tries: ShardTries,
+ height: BlockHeight,
+ gc_blocks_remaining: &mut NumBlocks,
+ ) -> Result<(), Error> {
+ let blocks_current_height = self
+ .store()
+ .get_all_block_hashes_by_height(height)?
+ .values()
+ .flatten()
+ .cloned()
+ .collect::<Vec<_>>();
+ for block_hash in blocks_current_height.iter() {
+ let mut current_hash = *block_hash;
+ loop {
+ if *gc_blocks_remaining == 0 {
+ return Ok(());
+ }
+ // Block `block_hash` is not on the Canonical Chain,
+ // because a shorter chain cannot be the Canonical one,
+ // so it and all its ancestors may be safely deleted
+ // while no other sibling blocks rely on them.
+ let epoch_manager = self.epoch_manager.clone();
+ let mut chain_store_update = self.mut_store().store_update();
+ if chain_store_update.get_block_refcount(&current_hash)? == 0 {
+ let prev_hash =
+ *chain_store_update.get_block_header(&current_hash)?.prev_hash();
+
+ // It's safe to call `clear_block_data` for prev data because it clears fork data only here
+ chain_store_update.clear_block_data(
+ epoch_manager.as_ref(),
+ current_hash,
+ GCMode::Fork(tries.clone()),
+ )?;
+ chain_store_update.commit()?;
+ *gc_blocks_remaining -= 1;
+
+ current_hash = prev_hash;
+ } else {
+ // Block of `current_hash` is an ancestor for some other blocks, stopping
+ break;
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
+ let _span = tracing::debug_span!(target: "sync", "reset_data_pre_state_sync").entered();
+ let head = self.head()?;
+ // Get header we were syncing into.
+ let header = self.get_block_header(&sync_hash)?;
+ let prev_hash = *header.prev_hash();
+ let sync_height = header.height();
+ let gc_height = std::cmp::min(head.height + 1, sync_height);
+
+ // GC all the data from the current tail up to `gc_height`. In case the tail points to a height where
+ // there is no block, we need to make sure that the last block before the tail is cleaned.
+ let tail = self.store().tail()?;
+ let mut tail_prev_block_cleaned = false;
+ for height in tail..gc_height {
+ let blocks_current_height = self
+ .store()
+ .get_all_block_hashes_by_height(height)?
+ .values() + .flatten() + .cloned() + .collect::>(); + for block_hash in blocks_current_height { + let epoch_manager = self.epoch_manager.clone(); + let mut chain_store_update = self.mut_store().store_update(); + if !tail_prev_block_cleaned { + let prev_block_hash = + *chain_store_update.get_block_header(&block_hash)?.prev_hash(); + if chain_store_update.get_block(&prev_block_hash).is_ok() { + chain_store_update.clear_block_data( + epoch_manager.as_ref(), + prev_block_hash, + GCMode::StateSync { clear_block_info: true }, + )?; + } + tail_prev_block_cleaned = true; + } + chain_store_update.clear_block_data( + epoch_manager.as_ref(), + block_hash, + GCMode::StateSync { clear_block_info: block_hash != prev_hash }, + )?; + chain_store_update.commit()?; + } + } + + // Clear Chunks data + let mut chain_store_update = self.mut_store().store_update(); + // The largest height of chunk we have in storage is head.height + 1 + let chunk_height = std::cmp::min(head.height + 2, sync_height); + chain_store_update.clear_chunk_data_and_headers(chunk_height)?; + chain_store_update.commit()?; + + // clear all trie data + + let tries = self.runtime_adapter.get_tries(); + let mut chain_store_update = self.mut_store().store_update(); + let mut store_update = tries.store_update(); + store_update.delete_all(DBCol::State); + chain_store_update.merge(store_update); + + // The reason to reset tail here is not to allow Tail be greater than Head + chain_store_update.reset_tail(); + chain_store_update.commit()?; + Ok(()) + } +} + +impl<'a> ChainStoreUpdate<'a> { + fn clear_header_data_for_heights( + &mut self, + start: BlockHeight, + end: BlockHeight, + ) -> Result<(), Error> { + for height in start..=end { + let header_hashes = self.chain_store().get_all_header_hashes_by_height(height)?; + for header_hash in header_hashes { + // Delete header_hash-indexed data: block header + let mut store_update = self.store().store_update(); + let key: &[u8] = header_hash.as_bytes(); + store_update.delete(DBCol::BlockHeader, key); + self.chain_store().headers.pop(key); + self.merge(store_update); + } + let key = index_to_bytes(height); + self.gc_col(DBCol::HeaderHashesByHeight, &key); + } + Ok(()) + } + + fn clear_chunk_data_and_headers(&mut self, min_chunk_height: BlockHeight) -> Result<(), Error> { + let chunk_tail = self.chunk_tail()?; + for height in chunk_tail..min_chunk_height { + let chunk_hashes = self.chain_store().get_all_chunk_hashes_by_height(height)?; + for chunk_hash in chunk_hashes { + // 1. Delete chunk-related data + let chunk = self.get_chunk(&chunk_hash)?.clone(); + debug_assert_eq!(chunk.cloned_header().height_created(), height); + for transaction in chunk.transactions() { + self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes()); + } + for receipt in chunk.prev_outgoing_receipts() { + self.gc_col(DBCol::Receipts, receipt.get_hash().as_bytes()); + } + + // 2. Delete chunk_hash-indexed data + let chunk_hash = chunk_hash.as_bytes(); + self.gc_col(DBCol::Chunks, chunk_hash); + self.gc_col(DBCol::PartialChunks, chunk_hash); + self.gc_col(DBCol::InvalidChunks, chunk_hash); + } + + let header_hashes = self.chain_store().get_all_header_hashes_by_height(height)?; + for _header_hash in header_hashes { + // 3. Delete header_hash-indexed data + // TODO #3488: enable + //self.gc_col(DBCol::BlockHeader, header_hash.as_bytes()); + } + + // 4. 
Delete chunk_tail-related data
+ let key = index_to_bytes(height);
+ self.gc_col(DBCol::ChunkHashesByHeight, &key);
+ self.gc_col(DBCol::HeaderHashesByHeight, &key);
+ }
+ self.update_chunk_tail(min_chunk_height);
+ Ok(())
+ }
+
+ /// Clears chunk data which can be computed from other data in the storage.
+ ///
+ /// We are storing PartialEncodedChunk objects in the DBCol::PartialChunks in
+ /// the storage. However, those objects can be computed from data in
+ /// DBCol::Chunks and as such are redundant. For performance reasons we want to
+ /// keep that data when operating at the head of the chain, but the data can be
+ /// safely removed from archival storage.
+ ///
+ /// `gc_stop_height` indicates the height starting from which no data should be
+ /// garbage collected. Roughly speaking this represents the start of the ‘hot’
+ /// data that we want to keep.
+ ///
+ /// `gc_height_limit` limits how many non-empty heights to process. This limit
+ /// means that the method may stop garbage collection before reaching
+ /// `gc_stop_height`.
+ fn clear_redundant_chunk_data(
+ &mut self,
+ gc_stop_height: BlockHeight,
+ gc_height_limit: BlockHeightDelta,
+ ) -> Result<(), Error> {
+ let mut height = self.chunk_tail()?;
+ let mut remaining = gc_height_limit;
+ while height < gc_stop_height && remaining > 0 {
+ let chunk_hashes = self.chain_store().get_all_chunk_hashes_by_height(height)?;
+ height += 1;
+ if !chunk_hashes.is_empty() {
+ remaining -= 1;
+ for chunk_hash in chunk_hashes {
+ let chunk_hash = chunk_hash.as_bytes();
+ self.gc_col(DBCol::PartialChunks, chunk_hash);
+ // Data in DBCol::InvalidChunks isn’t technically redundant (it
+ // cannot be calculated from other data) but it is data we
+ // don’t need for anything so it can be deleted as well.
+ self.gc_col(DBCol::InvalidChunks, chunk_hash);
+ }
+ }
+ }
+ self.update_chunk_tail(height);
+ Ok(())
+ }
+
+ fn get_shard_uids_to_gc(
+ &mut self,
+ epoch_manager: &dyn EpochManagerAdapter,
+ block_hash: &CryptoHash,
+ ) -> Vec<ShardUId> {
+ let block_header = self.get_block_header(block_hash).expect("block header must exist");
+ let shard_layout =
+ epoch_manager.get_shard_layout(block_header.epoch_id()).expect("epoch info must exist");
+ // gc shards in this epoch
+ let mut shard_uids_to_gc: Vec<_> = shard_layout.shard_uids().collect();
+ // gc shards in the shard layout in the next epoch if shards will change in the next epoch
+ // Suppose shard changes at epoch T, we need to garbage collect the new shard layout
+ // from the last block in epoch T-2 to the last block in epoch T-1
+ // Because we need to gc the last block in epoch T-2, we can't simply use
+ // block_header.epoch_id() as next_epoch_id
+ let next_epoch_id = block_header.next_epoch_id();
+ let next_shard_layout =
+ epoch_manager.get_shard_layout(next_epoch_id).expect("epoch info must exist");
+ if shard_layout != next_shard_layout {
+ shard_uids_to_gc.extend(next_shard_layout.shard_uids());
+ }
+ shard_uids_to_gc
+ }
+
+ /// GC trie state and flat state data after a resharding event
+ /// Most of the work happens on the last block of the epoch when resharding is COMPLETED
+ /// During GC, when we detect a change in shard layout, we can clear off all entries from
+ /// the parent shards
+ /// TODO(resharding): Need to clean remaining columns after resharding
+ fn clear_resharding_data(
+ &mut self,
+ runtime: &dyn RuntimeAdapter,
+ epoch_manager: &dyn EpochManagerAdapter,
+ block_hash: CryptoHash,
+ ) -> Result<(), Error> {
+ // Need to check if this is the last block of the epoch where resharding is completed
+ // which means shard layout changed in the previous epoch
+ if !epoch_manager.is_last_block_in_finished_epoch(&block_hash)? {
+ return Ok(());
+ }
+
+ // Since this code is related to GC, we need to be careful about accessing block_infos. Note
+ // that the BlockInfo exists for the current block_hash as it's not been GC'd yet.
+ // However, we need to use the block header to get the epoch_id and shard_layout for
+ // first_block_epoch_header and last_block_prev_epoch_hash as BlockInfo for these blocks is
+ // already GC'd while BlockHeader isn't GC'd.
+ let block_info = epoch_manager.get_block_info(&block_hash)?; + let first_block_epoch_hash = block_info.epoch_first_block(); + if first_block_epoch_hash == &CryptoHash::default() { + return Ok(()); + } + let first_block_epoch_header = self.get_block_header(first_block_epoch_hash)?; + let last_block_prev_epoch_header = + self.get_block_header(first_block_epoch_header.prev_hash())?; + + let epoch_id = first_block_epoch_header.epoch_id(); + let shard_layout = epoch_manager.get_shard_layout(epoch_id)?; + let prev_epoch_id = last_block_prev_epoch_header.epoch_id(); + let prev_shard_layout = epoch_manager.get_shard_layout(prev_epoch_id)?; + if shard_layout == prev_shard_layout { + return Ok(()); + } + + // Now we can proceed to removing the trie state and flat state + let mut store_update = self.store().store_update(); + for shard_uid in prev_shard_layout.shard_uids() { + tracing::info!(target: "garbage_collection", ?block_hash, ?shard_uid, "GC resharding"); + runtime.get_tries().delete_trie_for_shard(shard_uid, &mut store_update); + runtime + .get_flat_storage_manager() + .remove_flat_storage_for_shard(shard_uid, &mut store_update)?; + } + + self.merge(store_update); + Ok(()) + } + + // Clearing block data of `block_hash`, if on a fork. + // Clearing block data of `block_hash.prev`, if on the Canonical Chain. + fn clear_block_data( + &mut self, + epoch_manager: &dyn EpochManagerAdapter, + mut block_hash: CryptoHash, + gc_mode: GCMode, + ) -> Result<(), Error> { + let mut store_update = self.store().store_update(); + + tracing::info!(target: "garbage_collection", ?gc_mode, ?block_hash, "GC block_hash"); + + // 1. Apply revert insertions or deletions from DBCol::TrieChanges for Trie + { + let shard_uids_to_gc: Vec<_> = self.get_shard_uids_to_gc(epoch_manager, &block_hash); + match gc_mode.clone() { + GCMode::Fork(tries) => { + // If the block is on a fork, we delete the state that's the result of applying this block + for shard_uid in shard_uids_to_gc { + let trie_changes = self.store().get_ser( + DBCol::TrieChanges, + &get_block_shard_uid(&block_hash, &shard_uid), + )?; + if let Some(trie_changes) = trie_changes { + tries.revert_insertions(&trie_changes, shard_uid, &mut store_update); + self.gc_col( + DBCol::TrieChanges, + &get_block_shard_uid(&block_hash, &shard_uid), + ); + } + } + } + GCMode::Canonical(tries) => { + // If the block is on canonical chain, we delete the state that's before applying this block + for shard_uid in shard_uids_to_gc { + let trie_changes = self.store().get_ser( + DBCol::TrieChanges, + &get_block_shard_uid(&block_hash, &shard_uid), + )?; + if let Some(trie_changes) = trie_changes { + tries.apply_deletions(&trie_changes, shard_uid, &mut store_update); + self.gc_col( + DBCol::TrieChanges, + &get_block_shard_uid(&block_hash, &shard_uid), + ); + } + } + // Set `block_hash` on previous one + block_hash = *self.get_block_header(&block_hash)?.prev_hash(); + } + GCMode::StateSync { .. } => { + // Not apply the data from DBCol::TrieChanges + for shard_uid in shard_uids_to_gc { + self.gc_col( + DBCol::TrieChanges, + &get_block_shard_uid(&block_hash, &shard_uid), + ); + } + } + } + } + + let block = + self.get_block(&block_hash).expect("block data is not expected to be already cleaned"); + let height = block.header().height(); + + // 2. Delete shard_id-indexed data (Receipts, State Headers and Parts, etc.) 
+ for shard_id in 0..block.header().chunk_mask().len() as ShardId {
+ let block_shard_id = get_block_shard_id(&block_hash, shard_id);
+ self.gc_outgoing_receipts(&block_hash, shard_id);
+ self.gc_col(DBCol::IncomingReceipts, &block_shard_id);
+
+ // For incoming State Parts it's done in chain.clear_downloaded_parts()
+ // The following code is mostly for outgoing State Parts.
+ // However, if node crashes while State Syncing, it may never clear
+ // downloaded State parts in `clear_downloaded_parts`.
+ // We need to make sure all State Parts are removed.
+ if let Ok(shard_state_header) =
+ self.chain_store().get_state_header(shard_id, block_hash)
+ {
+ let state_num_parts = shard_state_header.num_state_parts();
+ self.gc_col_state_parts(block_hash, shard_id, state_num_parts)?;
+ let key = borsh::to_vec(&StateHeaderKey(shard_id, block_hash))?;
+ self.gc_col(DBCol::StateHeaders, &key);
+ }
+ }
+ // gc DBCol::ChunkExtra based on shard_uid since it's indexed by shard_uid in the storage
+ for shard_uid in self.get_shard_uids_to_gc(epoch_manager, &block_hash) {
+ let block_shard_uid = get_block_shard_uid(&block_hash, &shard_uid);
+ self.gc_col(DBCol::ChunkExtra, &block_shard_uid);
+ }
+
+ // 3. Delete block_hash-indexed data
+ self.gc_col(DBCol::Block, block_hash.as_bytes());
+ self.gc_col(DBCol::BlockExtra, block_hash.as_bytes());
+ self.gc_col(DBCol::NextBlockHashes, block_hash.as_bytes());
+ self.gc_col(DBCol::ChallengedBlocks, block_hash.as_bytes());
+ self.gc_col(DBCol::BlocksToCatchup, block_hash.as_bytes());
+ let storage_key = KeyForStateChanges::for_block(&block_hash);
+ let stored_state_changes: Vec<Box<[u8]>> = self
+ .store()
+ .iter_prefix(DBCol::StateChanges, storage_key.as_ref())
+ .map(|item| item.map(|(key, _)| key))
+ .collect::<io::Result<Vec<_>>>()?;
+ for key in stored_state_changes {
+ self.gc_col(DBCol::StateChanges, &key);
+ }
+ self.gc_col(DBCol::BlockRefCount, block_hash.as_bytes());
+ self.gc_outcomes(&block)?;
+ match gc_mode {
+ GCMode::StateSync { clear_block_info: false } => {}
+ _ => self.gc_col(DBCol::BlockInfo, block_hash.as_bytes()),
+ }
+ self.gc_col(DBCol::StateDlInfos, block_hash.as_bytes());
+
+ // 4. Update or delete block_hash_per_height
+ self.gc_col_block_per_height(&block_hash, height, block.header().epoch_id())?;
+
+ match gc_mode {
+ GCMode::Fork(_) => {
+ // 5. Forks only clearing
+ self.dec_block_refcount(block.header().prev_hash())?;
+ }
+ GCMode::Canonical(_) => {
+ // 6. Canonical Chain only clearing
+ // Delete chunks, chunk-indexed data and block headers
+ let mut min_chunk_height = self.tail()?;
+ for chunk_header in block.chunks().iter() {
+ if min_chunk_height > chunk_header.height_created() {
+ min_chunk_height = chunk_header.height_created();
+ }
+ }
+ self.clear_chunk_data_and_headers(min_chunk_height)?;
+ }
+ GCMode::StateSync { .. } => {
+ // 7.
State Sync clearing + // Chunks deleted separately + } + }; + self.merge(store_update); + Ok(()) + } + + // Delete all data in rocksdb that are partially or wholly indexed and can be looked up by hash of the current head of the chain + // and that indicates a link between current head and its prev block + pub fn clear_head_block_data( + &mut self, + epoch_manager: &dyn EpochManagerAdapter, + ) -> Result<(), Error> { + let header_head = self.header_head().unwrap(); + let header_head_height = header_head.height; + let block_hash = self.head().unwrap().last_block_hash; + + let block = + self.get_block(&block_hash).expect("block data is not expected to be already cleaned"); + + let epoch_id = block.header().epoch_id(); + + let head_height = block.header().height(); + + // 1. Delete shard_id-indexed data (TrieChanges, Receipts, ChunkExtra, State Headers and Parts, FlatStorage data) + for shard_id in 0..block.header().chunk_mask().len() as ShardId { + let shard_uid = epoch_manager.shard_id_to_uid(shard_id, epoch_id).unwrap(); + let block_shard_id = get_block_shard_uid(&block_hash, &shard_uid); + + // delete TrieChanges + self.gc_col(DBCol::TrieChanges, &block_shard_id); + + // delete Receipts + self.gc_outgoing_receipts(&block_hash, shard_id); + self.gc_col(DBCol::IncomingReceipts, &block_shard_id); + + // delete DBCol::ChunkExtra based on shard_uid since it's indexed by shard_uid in the storage + self.gc_col(DBCol::ChunkExtra, &block_shard_id); + + // delete state parts and state headers + if let Ok(shard_state_header) = + self.chain_store().get_state_header(shard_id, block_hash) + { + let state_num_parts = shard_state_header.num_state_parts(); + self.gc_col_state_parts(block_hash, shard_id, state_num_parts)?; + let state_header_key = borsh::to_vec(&StateHeaderKey(shard_id, block_hash))?; + self.gc_col(DBCol::StateHeaders, &state_header_key); + } + + // delete flat storage columns: FlatStateChanges and FlatStateDeltaMetadata + let mut store_update = self.store().store_update(); + store_helper::remove_delta(&mut store_update, shard_uid, block_hash); + self.merge(store_update); + } + + // 2. Delete block_hash-indexed data + self.gc_col(DBCol::Block, block_hash.as_bytes()); + self.gc_col(DBCol::BlockExtra, block_hash.as_bytes()); + self.gc_col(DBCol::NextBlockHashes, block_hash.as_bytes()); + self.gc_col(DBCol::ChallengedBlocks, block_hash.as_bytes()); + self.gc_col(DBCol::BlocksToCatchup, block_hash.as_bytes()); + let storage_key = KeyForStateChanges::for_block(&block_hash); + let stored_state_changes: Vec> = self + .store() + .iter_prefix(DBCol::StateChanges, storage_key.as_ref()) + .map(|item| item.map(|(key, _)| key)) + .collect::>>()?; + for key in stored_state_changes { + self.gc_col(DBCol::StateChanges, &key); + } + self.gc_col(DBCol::BlockRefCount, block_hash.as_bytes()); + self.gc_outcomes(&block)?; + self.gc_col(DBCol::BlockInfo, block_hash.as_bytes()); + self.gc_col(DBCol::StateDlInfos, block_hash.as_bytes()); + + // 3. update columns related to prev block (block refcount and NextBlockHashes) + self.dec_block_refcount(block.header().prev_hash())?; + self.gc_col(DBCol::NextBlockHashes, block.header().prev_hash().as_bytes()); + + // 4. 
Update or delete block_hash_per_height + self.gc_col_block_per_height(&block_hash, head_height, block.header().epoch_id())?; + + self.clear_chunk_data_at_height(head_height)?; + + self.clear_header_data_for_heights(head_height, header_head_height)?; + + Ok(()) + } + + fn clear_chunk_data_at_height(&mut self, height: BlockHeight) -> Result<(), Error> { + let chunk_hashes = self.chain_store().get_all_chunk_hashes_by_height(height)?; + for chunk_hash in chunk_hashes { + // 1. Delete chunk-related data + let chunk = self.get_chunk(&chunk_hash)?.clone(); + debug_assert_eq!(chunk.cloned_header().height_created(), height); + for transaction in chunk.transactions() { + self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes()); + } + for receipt in chunk.prev_outgoing_receipts() { + self.gc_col(DBCol::Receipts, receipt.get_hash().as_bytes()); + } + + // 2. Delete chunk_hash-indexed data + let chunk_hash = chunk_hash.as_bytes(); + self.gc_col(DBCol::Chunks, chunk_hash); + self.gc_col(DBCol::PartialChunks, chunk_hash); + self.gc_col(DBCol::InvalidChunks, chunk_hash); + } + + // 4. Delete chunk hashes per height + let key = index_to_bytes(height); + self.gc_col(DBCol::ChunkHashesByHeight, &key); + + Ok(()) + } + + fn gc_col_block_per_height( + &mut self, + block_hash: &CryptoHash, + height: BlockHeight, + epoch_id: &EpochId, + ) -> Result<(), Error> { + let mut store_update = self.store().store_update(); + let mut epoch_to_hashes = + HashMap::clone(self.chain_store().get_all_block_hashes_by_height(height)?.as_ref()); + let hashes = epoch_to_hashes.get_mut(epoch_id).ok_or_else(|| { + near_chain_primitives::Error::Other("current epoch id should exist".into()) + })?; + hashes.remove(block_hash); + if hashes.is_empty() { + epoch_to_hashes.remove(epoch_id); + } + let key = &index_to_bytes(height)[..]; + if epoch_to_hashes.is_empty() { + store_update.delete(DBCol::BlockPerHeight, key); + self.chain_store().block_hash_per_height.pop(key); + } else { + store_update.set_ser(DBCol::BlockPerHeight, key, &epoch_to_hashes)?; + self.chain_store().block_hash_per_height.put(key.to_vec(), Arc::new(epoch_to_hashes)); + } + if self.is_height_processed(height)? { + self.gc_col(DBCol::ProcessedBlockHeights, key); + } + self.merge(store_update); + Ok(()) + } + + pub fn gc_col_state_parts( + &mut self, + sync_hash: CryptoHash, + shard_id: ShardId, + num_parts: u64, + ) -> Result<(), Error> { + for part_id in 0..num_parts { + let key = borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id))?; + self.gc_col(DBCol::StateParts, &key); + } + Ok(()) + } + + fn gc_outgoing_receipts(&mut self, block_hash: &CryptoHash, shard_id: ShardId) { + let mut store_update = self.store().store_update(); + match self + .get_outgoing_receipts(block_hash, shard_id) + .map(|receipts| receipts.iter().map(|receipt| receipt.receipt_id).collect::>()) + { + Ok(receipt_ids) => { + for receipt_id in receipt_ids { + let key: Vec = receipt_id.into(); + store_update.decrement_refcount(DBCol::ReceiptIdToShardId, &key); + self.chain_store().receipt_id_to_shard_id.pop(&key); + } + } + Err(error) => { + match error { + Error::DBNotFoundErr(_) => { + // Sometimes we don't save outgoing receipts. See the usages of save_outgoing_receipt. + // The invariant is that DBCol::OutgoingReceipts has same receipts as DBCol::ReceiptIdToShardId. 
+ } + _ => { + tracing::error!(target: "chain", "Error getting outgoing receipts for block {}, shard {}: {:?}", block_hash, shard_id, error); + } + } + } + } + + let key = get_block_shard_id(block_hash, shard_id); + store_update.delete(DBCol::OutgoingReceipts, &key); + self.chain_store().outgoing_receipts.pop(&key); + self.merge(store_update); + } + + fn gc_outcomes(&mut self, block: &Block) -> Result<(), Error> { + let block_hash = block.hash(); + let store_update = self.store().store_update(); + for chunk_header in + block.chunks().iter().filter(|h| h.height_included() == block.header().height()) + { + let shard_id = chunk_header.shard_id(); + let outcome_ids = + self.chain_store().get_outcomes_by_block_hash_and_shard_id(block_hash, shard_id)?; + for outcome_id in outcome_ids { + self.gc_col( + DBCol::TransactionResultForBlock, + &get_outcome_id_block_hash(&outcome_id, block_hash), + ); + } + self.gc_col(DBCol::OutcomeIds, &get_block_shard_id(block_hash, shard_id)); + } + self.merge(store_update); + Ok(()) + } + + fn gc_col(&mut self, col: DBCol, key: &[u8]) { + let mut store_update = self.store().store_update(); + match col { + DBCol::OutgoingReceipts => { + panic!("Must use gc_outgoing_receipts"); + } + DBCol::IncomingReceipts => { + store_update.delete(col, key); + self.chain_store().incoming_receipts.pop(key); + } + DBCol::StateHeaders => { + store_update.delete(col, key); + } + DBCol::BlockHeader => { + // TODO(#3488) At the moment header sync needs block headers. + // However, we want to eventually garbage collect headers. + // When that happens we should make sure that block headers is + // copied to the cold storage. + store_update.delete(col, key); + self.chain_store().headers.pop(key); + unreachable!(); + } + DBCol::Block => { + store_update.delete(col, key); + self.chain_store().blocks.pop(key); + } + DBCol::BlockExtra => { + store_update.delete(col, key); + self.chain_store().block_extras.pop(key); + } + DBCol::NextBlockHashes => { + store_update.delete(col, key); + self.chain_store().next_block_hashes.pop(key); + } + DBCol::ChallengedBlocks => { + store_update.delete(col, key); + } + DBCol::BlocksToCatchup => { + store_update.delete(col, key); + } + DBCol::StateChanges => { + store_update.delete(col, key); + } + DBCol::BlockRefCount => { + store_update.delete(col, key); + self.chain_store().block_refcounts.pop(key); + } + DBCol::ReceiptIdToShardId => { + panic!("Must use gc_outgoing_receipts"); + } + DBCol::Transactions => { + store_update.decrement_refcount(col, key); + self.chain_store().transactions.pop(key); + } + DBCol::Receipts => { + store_update.decrement_refcount(col, key); + self.chain_store().receipts.pop(key); + } + DBCol::Chunks => { + store_update.delete(col, key); + self.chain_store().chunks.pop(key); + } + DBCol::ChunkExtra => { + store_update.delete(col, key); + self.chain_store().chunk_extras.pop(key); + } + DBCol::PartialChunks => { + store_update.delete(col, key); + self.chain_store().partial_chunks.pop(key); + } + DBCol::InvalidChunks => { + store_update.delete(col, key); + self.chain_store().invalid_chunks.pop(key); + } + DBCol::ChunkHashesByHeight => { + store_update.delete(col, key); + } + DBCol::StateParts => { + store_update.delete(col, key); + } + DBCol::State => { + panic!("Actual gc happens elsewhere, call inc_gc_col_state to increase gc count"); + } + DBCol::TrieChanges => { + store_update.delete(col, key); + } + DBCol::BlockPerHeight => { + panic!("Must use gc_col_glock_per_height method to gc DBCol::BlockPerHeight"); + } + 
DBCol::TransactionResultForBlock => { + store_update.delete(col, key); + } + DBCol::OutcomeIds => { + store_update.delete(col, key); + } + DBCol::StateDlInfos => { + store_update.delete(col, key); + } + DBCol::BlockInfo => { + store_update.delete(col, key); + } + DBCol::ProcessedBlockHeights => { + store_update.delete(col, key); + self.chain_store().processed_block_heights.pop(key); + } + DBCol::HeaderHashesByHeight => { + store_update.delete(col, key); + } + DBCol::DbVersion + | DBCol::BlockMisc + | DBCol::_GCCount + | DBCol::BlockHeight // block sync needs it + genesis should be accessible + | DBCol::_Peers + | DBCol::RecentOutboundConnections + | DBCol::BlockMerkleTree + | DBCol::AccountAnnouncements + | DBCol::EpochLightClientBlocks + | DBCol::PeerComponent + | DBCol::LastComponentNonce + | DBCol::ComponentEdges + // https://github.com/nearprotocol/nearcore/pull/2952 + | DBCol::EpochInfo + | DBCol::EpochStart + | DBCol::EpochValidatorInfo + | DBCol::BlockOrdinal + | DBCol::_ChunkPerHeightShard + | DBCol::_NextBlockWithNewChunk + | DBCol::_LastBlockWithNewChunk + | DBCol::_TransactionRefCount + | DBCol::_TransactionResult + | DBCol::StateChangesForSplitStates + | DBCol::CachedContractCode + | DBCol::FlatState + | DBCol::FlatStateChanges + | DBCol::FlatStateDeltaMetadata + | DBCol::FlatStorageStatus + | DBCol::Misc + => unreachable!(), + #[cfg(feature = "new_epoch_sync")] + DBCol::EpochSyncInfo => unreachable!(), + } + self.merge(store_update); + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use near_chain_configs::{GCConfig, GenesisConfig}; + use near_epoch_manager::EpochManagerAdapter; + use near_primitives::block::{Block, Tip}; + use near_primitives::epoch_manager::block_info::BlockInfo; + use near_primitives::test_utils::{create_test_signer, TestBlockBuilder}; + use near_primitives::types::{BlockHeight, NumBlocks}; + use near_primitives::validator_signer::InMemoryValidatorSigner; + use near_store::DBCol; + + use crate::garbage_collection::GCMode; + use crate::test_utils::{get_chain, get_chain_with_epoch_length}; + use crate::{Chain, ChainStoreAccess, StoreValidator}; + + /// Test that garbage collection works properly. The blocks behind gc head should be garbage + /// collected while the blocks that are ahead of it should not. + #[test] + fn test_clear_old_data() { + let mut chain = get_chain_with_epoch_length(1); + let epoch_manager = chain.epoch_manager.clone(); + let genesis = chain.get_block_by_height(0).unwrap(); + let signer = Arc::new(create_test_signer("test1")); + let mut prev_block = genesis; + let mut blocks = vec![prev_block.clone()]; + for i in 1..15 { + add_block( + &mut chain, + epoch_manager.as_ref(), + &mut prev_block, + &mut blocks, + signer.clone(), + i, + ); + } + + let trie = chain.runtime_adapter.get_tries(); + chain.clear_data(trie, &GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }).unwrap(); + + // epoch didn't change so no data is garbage collected. + for i in 0..15 { + println!("height = {} hash = {}", i, blocks[i].hash()); + if i < 8 { + assert!(chain.get_block(blocks[i].hash()).is_err()); + assert!(chain + .mut_store() + .get_all_block_hashes_by_height(i as BlockHeight) + .unwrap() + .is_empty()); + } else { + assert!(chain.get_block(blocks[i].hash()).is_ok()); + assert!(!chain + .mut_store() + .get_all_block_hashes_by_height(i as BlockHeight) + .unwrap() + .is_empty()); + } + } + } + + // Adds block to the chain at given height after prev_block. 
+ fn add_block( + chain: &mut Chain, + epoch_manager: &dyn EpochManagerAdapter, + prev_block: &mut Block, + blocks: &mut Vec, + signer: Arc, + height: u64, + ) { + let next_epoch_id = epoch_manager + .get_next_epoch_id_from_prev_block(prev_block.hash()) + .expect("block must exist"); + let mut store_update = chain.mut_store().store_update(); + + let block = if next_epoch_id == *prev_block.header().next_epoch_id() { + TestBlockBuilder::new(&prev_block, signer).height(height).build() + } else { + let prev_hash = prev_block.hash(); + let epoch_id = prev_block.header().next_epoch_id().clone(); + let next_bp_hash = Chain::compute_bp_hash( + epoch_manager, + next_epoch_id.clone(), + epoch_id.clone(), + &prev_hash, + ) + .unwrap(); + TestBlockBuilder::new(&prev_block, signer) + .height(height) + .epoch_id(epoch_id) + .next_epoch_id(next_epoch_id) + .next_bp_hash(next_bp_hash) + .build() + }; + blocks.push(block.clone()); + store_update.save_block(block.clone()); + store_update.inc_block_refcount(block.header().prev_hash()).unwrap(); + store_update.save_block_header(block.header().clone()).unwrap(); + store_update.save_head(&Tip::from_header(block.header())).unwrap(); + store_update + .chain_store_cache_update + .height_to_hashes + .insert(height, Some(*block.header().hash())); + store_update.save_next_block_hash(prev_block.hash(), *block.hash()); + store_update.commit().unwrap(); + *prev_block = block.clone(); + } + + #[test] + fn test_clear_old_data_fixed_height() { + let mut chain = get_chain(); + let epoch_manager = chain.epoch_manager.clone(); + let genesis = chain.get_block_by_height(0).unwrap(); + let signer = Arc::new(create_test_signer("test1")); + let mut prev_block = genesis; + let mut blocks = vec![prev_block.clone()]; + for i in 1..10 { + add_block( + &mut chain, + epoch_manager.as_ref(), + &mut prev_block, + &mut blocks, + signer.clone(), + i, + ); + } + + assert!(chain.get_block(blocks[4].hash()).is_ok()); + assert!(chain.get_block(blocks[5].hash()).is_ok()); + assert!(chain.get_block(blocks[6].hash()).is_ok()); + assert!(chain.get_block_header(blocks[5].hash()).is_ok()); + assert_eq!( + chain + .mut_store() + .get_all_block_hashes_by_height(5) + .unwrap() + .values() + .flatten() + .collect::>(), + vec![blocks[5].hash()] + ); + assert!(chain.mut_store().get_next_block_hash(blocks[5].hash()).is_ok()); + + let trie = chain.runtime_adapter.get_tries(); + let mut store_update = chain.mut_store().store_update(); + assert!(store_update + .clear_block_data(epoch_manager.as_ref(), *blocks[5].hash(), GCMode::Canonical(trie)) + .is_ok()); + store_update.commit().unwrap(); + + assert!(chain.get_block(blocks[4].hash()).is_err()); + assert!(chain.get_block(blocks[5].hash()).is_ok()); + assert!(chain.get_block(blocks[6].hash()).is_ok()); + // block header should be available + assert!(chain.get_block_header(blocks[4].hash()).is_ok()); + assert!(chain.get_block_header(blocks[5].hash()).is_ok()); + assert!(chain.get_block_header(blocks[6].hash()).is_ok()); + assert!(chain.mut_store().get_all_block_hashes_by_height(4).unwrap().is_empty()); + assert!(!chain.mut_store().get_all_block_hashes_by_height(5).unwrap().is_empty()); + assert!(!chain.mut_store().get_all_block_hashes_by_height(6).unwrap().is_empty()); + assert!(chain.mut_store().get_next_block_hash(blocks[4].hash()).is_err()); + assert!(chain.mut_store().get_next_block_hash(blocks[5].hash()).is_ok()); + assert!(chain.mut_store().get_next_block_hash(blocks[6].hash()).is_ok()); + } + + /// Test that `gc_blocks_limit` works properly + #[test] + 
#[cfg_attr(not(feature = "expensive_tests"), ignore)] + fn test_clear_old_data_too_many_heights() { + for i in 1..5 { + println!("gc_blocks_limit == {:?}", i); + test_clear_old_data_too_many_heights_common(i); + } + test_clear_old_data_too_many_heights_common(25); + test_clear_old_data_too_many_heights_common(50); + test_clear_old_data_too_many_heights_common(87); + } + + fn test_clear_old_data_too_many_heights_common(gc_blocks_limit: NumBlocks) { + let mut chain = get_chain_with_epoch_length(1); + let genesis = chain.get_block_by_height(0).unwrap(); + let signer = Arc::new(create_test_signer("test1")); + let mut prev_block = genesis; + let mut blocks = vec![prev_block.clone()]; + { + let mut store_update = chain.store().store().store_update(); + let block_info = BlockInfo::default(); + store_update + .insert_ser(DBCol::BlockInfo, prev_block.hash().as_ref(), &block_info) + .unwrap(); + store_update.commit().unwrap(); + } + for i in 1..1000 { + let block = TestBlockBuilder::new(&prev_block, signer.clone()).height(i).build(); + blocks.push(block.clone()); + + let mut store_update = chain.mut_store().store_update(); + store_update.save_block(block.clone()); + store_update.inc_block_refcount(block.header().prev_hash()).unwrap(); + store_update.save_block_header(block.header().clone()).unwrap(); + store_update.save_head(&Tip::from_header(&block.header())).unwrap(); + { + let mut store_update = store_update.store().store_update(); + let block_info = BlockInfo::default(); + store_update + .insert_ser(DBCol::BlockInfo, block.hash().as_ref(), &block_info) + .unwrap(); + store_update.commit().unwrap(); + } + store_update + .chain_store_cache_update + .height_to_hashes + .insert(i, Some(*block.header().hash())); + store_update.save_next_block_hash(&prev_block.hash(), *block.hash()); + store_update.commit().unwrap(); + + prev_block = block.clone(); + } + + let trie = chain.runtime_adapter.get_tries(); + + for iter in 0..10 { + println!("ITERATION #{:?}", iter); + assert!(chain + .clear_data(trie.clone(), &GCConfig { gc_blocks_limit, ..GCConfig::default() }) + .is_ok()); + + // Each clear_data() call should garbage collect at most gc_blocks_limit heights.
+ for i in 0..1000 { + if i < (iter + 1) * gc_blocks_limit as usize { + assert!(chain.get_block(&blocks[i].hash()).is_err()); + assert!(chain + .mut_store() + .get_all_block_hashes_by_height(i as BlockHeight) + .unwrap() + .is_empty()); + } else { + assert!(chain.get_block(&blocks[i].hash()).is_ok()); + assert!(!chain + .mut_store() + .get_all_block_hashes_by_height(i as BlockHeight) + .unwrap() + .is_empty()); + } + } + let mut genesis = GenesisConfig::default(); + genesis.genesis_height = 0; + let mut store_validator = StoreValidator::new( + None, + genesis.clone(), + chain.epoch_manager.clone(), + chain.shard_tracker.clone(), + chain.runtime_adapter.clone(), + chain.store().store().clone(), + false, + ); + store_validator.validate(); + println!("errors = {:?}", store_validator.errors); + assert!(!store_validator.is_failed()); + } + } + + #[test] + fn test_fork_chunk_tail_updates() { + let mut chain = get_chain(); + let epoch_manager = chain.epoch_manager.clone(); + let genesis = chain.get_block_by_height(0).unwrap(); + let signer = Arc::new(create_test_signer("test1")); + let mut prev_block = genesis; + let mut blocks = vec![prev_block.clone()]; + for i in 1..10 { + add_block( + &mut chain, + epoch_manager.as_ref(), + &mut prev_block, + &mut blocks, + signer.clone(), + i, + ); + } + assert_eq!(chain.tail().unwrap(), 0); + + { + let mut store_update = chain.mut_store().store_update(); + assert_eq!(store_update.tail().unwrap(), 0); + store_update.update_tail(1).unwrap(); + store_update.commit().unwrap(); + } + // Chunk tail should be auto updated to genesis (if not set) and fork_tail to the tail. + { + let store_update = chain.mut_store().store_update(); + assert_eq!(store_update.tail().unwrap(), 1); + assert_eq!(store_update.fork_tail().unwrap(), 1); + assert_eq!(store_update.chunk_tail().unwrap(), 0); + } + { + let mut store_update = chain.mut_store().store_update(); + store_update.update_fork_tail(3); + store_update.commit().unwrap(); + } + { + let mut store_update = chain.mut_store().store_update(); + store_update.update_tail(2).unwrap(); + store_update.commit().unwrap(); + } + { + let store_update = chain.mut_store().store_update(); + assert_eq!(store_update.tail().unwrap(), 2); + assert_eq!(store_update.fork_tail().unwrap(), 3); + assert_eq!(store_update.chunk_tail().unwrap(), 0); + } + } +} diff --git a/chain/chain/src/lib.rs b/chain/chain/src/lib.rs index 5e506ff65e9..88395957c13 100644 --- a/chain/chain/src/lib.rs +++ b/chain/chain/src/lib.rs @@ -15,6 +15,7 @@ pub mod chunks_store; pub mod crypto_hash_timer; mod doomslug; pub mod flat_storage_creator; +mod garbage_collection; mod lightclient; mod metrics; pub mod migrations; diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 7ffa00f0bbf..27dca858e7a 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -1,6 +1,6 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; -use std::{fmt, io}; +use std::io; use borsh::{BorshDeserialize, BorshSerialize}; use chrono::Utc; @@ -23,8 +23,7 @@ use near_primitives::sharding::{ StateSyncInfo, }; use near_primitives::state_sync::{ - ReceiptProofResponse, ShardStateSyncResponseHeader, StateHeaderKey, StatePartKey, - StateSyncDumpProgress, + ReceiptProofResponse, ShardStateSyncResponseHeader, StateHeaderKey, StateSyncDumpProgress, }; use near_primitives::transaction::{ ExecutionOutcomeWithId, ExecutionOutcomeWithIdAndProof, ExecutionOutcomeWithProof, @@ -33,9 +32,8 @@ use near_primitives::transaction::{ use 
near_primitives::trie_key::{trie_key_parsers, TrieKey}; use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::{ - BlockExtra, BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId, StateChanges, - StateChangesExt, StateChangesForSplitStates, StateChangesKinds, StateChangesKindsExt, - StateChangesRequest, + BlockExtra, BlockHeight, EpochId, NumBlocks, ShardId, StateChanges, StateChangesExt, + StateChangesForSplitStates, StateChangesKinds, StateChangesKindsExt, StateChangesRequest, }; use near_primitives::utils::{ get_block_shard_id, get_outcome_id_block_hash, get_outcome_id_block_hash_rev, index_to_bytes, @@ -44,16 +42,15 @@ use near_primitives::utils::{ use near_primitives::version::ProtocolVersion; use near_primitives::views::LightClientBlockView; use near_store::{ - DBCol, KeyForStateChanges, ShardTries, Store, StoreUpdate, WrappedTrieChanges, CHUNK_TAIL_KEY, + DBCol, KeyForStateChanges, Store, StoreUpdate, WrappedTrieChanges, CHUNK_TAIL_KEY, FINAL_HEAD_KEY, FORK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY, LATEST_KNOWN_KEY, TAIL_KEY, }; use crate::byzantine_assert; use crate::chunks_store::ReadOnlyChunksStore; -use crate::types::{Block, BlockHeader, LatestKnown, RuntimeAdapter}; +use crate::types::{Block, BlockHeader, LatestKnown}; use near_store::db::{StoreStatistics, STATE_SYNC_DUMP_KEY}; -use near_store::flat::store_helper; use std::sync::Arc; /// lru cache size @@ -67,25 +64,10 @@ const CACHE_SIZE: usize = 1; #[cfg(feature = "no_cache")] const CHUNK_CACHE_SIZE: usize = 1; -#[derive(Clone)] -pub enum GCMode { - Fork(ShardTries), - Canonical(ShardTries), - StateSync { clear_block_info: bool }, -} - -impl fmt::Debug for GCMode { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - GCMode::Fork(_) => write!(f, "GCMode::Fork"), - GCMode::Canonical(_) => write!(f, "GCMode::Canonical"), - GCMode::StateSync { .. } => write!(f, "GCMode::StateSync"), - } - } -} - /// Accesses the chain store. Used to create atomic editable views that can be reverted. pub trait ChainStoreAccess { + /// Returns underlying chain store + fn chain_store(&self) -> &ChainStore; /// Returns underlaying store. fn store(&self) -> &Store; /// The chain head. @@ -422,45 +404,46 @@ pub struct ChainStore { /// Tail height of the chain, tail: Option, /// Cache with headers. - headers: CellLruCache, BlockHeader>, + pub(crate) headers: CellLruCache, BlockHeader>, /// Cache with blocks. - blocks: CellLruCache, Block>, + pub(crate) blocks: CellLruCache, Block>, /// Cache with chunks - chunks: CellLruCache, Arc>, + pub(crate) chunks: CellLruCache, Arc>, /// Cache with partial chunks - partial_chunks: CellLruCache, Arc>, + pub(crate) partial_chunks: CellLruCache, Arc>, /// Cache with block extra. - block_extras: CellLruCache, Arc>, + pub(crate) block_extras: CellLruCache, Arc>, /// Cache with chunk extra. - chunk_extras: CellLruCache, Arc>, + pub(crate) chunk_extras: CellLruCache, Arc>, /// Cache with height to hash on the main chain. height: CellLruCache, CryptoHash>, /// Cache with height to block hash on any chain. - block_hash_per_height: CellLruCache, Arc>>>, + pub(crate) block_hash_per_height: + CellLruCache, Arc>>>, /// Next block hashes for each block on the canonical chain - next_block_hashes: CellLruCache, CryptoHash>, + pub(crate) next_block_hashes: CellLruCache, CryptoHash>, /// Light client blocks corresponding to the last finalized block of each epoch epoch_light_client_blocks: CellLruCache, Arc>, /// Cache with outgoing receipts. 
- outgoing_receipts: CellLruCache, Arc>>, + pub(crate) outgoing_receipts: CellLruCache, Arc>>, /// Cache with incoming receipts. - incoming_receipts: CellLruCache, Arc>>, + pub(crate) incoming_receipts: CellLruCache, Arc>>, /// Invalid chunks. - invalid_chunks: CellLruCache, Arc>, + pub(crate) invalid_chunks: CellLruCache, Arc>, /// Mapping from receipt id to destination shard id - receipt_id_to_shard_id: CellLruCache, ShardId>, + pub(crate) receipt_id_to_shard_id: CellLruCache, ShardId>, /// Transactions - transactions: CellLruCache, Arc>, + pub(crate) transactions: CellLruCache, Arc>, /// Receipts - receipts: CellLruCache, Arc>, + pub(crate) receipts: CellLruCache, Arc>, /// Cache with Block Refcounts - block_refcounts: CellLruCache, u64>, + pub(crate) block_refcounts: CellLruCache, u64>, /// Cache of block hash -> block merkle tree at the current block block_merkle_tree: CellLruCache, Arc>, /// Cache of block ordinal to block hash. block_ordinal_to_hash: CellLruCache, CryptoHash>, /// Processed block heights. - processed_block_heights: CellLruCache, ()>, + pub(crate) processed_block_heights: CellLruCache, ()>, /// save_trie_changes should be set to true iff /// - archive if false - non-archival nodes need trie changes to perform garbage collection /// - archive is true, cold_store is configured and migration to split_storage is finished - node @@ -1114,6 +1097,10 @@ impl ChainStore { } impl ChainStoreAccess for ChainStore { + fn chain_store(&self) -> &ChainStore { + &self + } + fn store(&self) -> &Store { &self.store } @@ -1411,7 +1398,7 @@ impl ChainStoreAccess for ChainStore { /// Cache update for ChainStore #[derive(Default)] -struct ChainStoreCacheUpdate { +pub(crate) struct ChainStoreCacheUpdate { blocks: HashMap, headers: HashMap, block_extras: HashMap>, @@ -1419,7 +1406,7 @@ struct ChainStoreCacheUpdate { chunks: HashMap>, partial_chunks: HashMap>, block_hash_per_height: HashMap>>, - height_to_hashes: HashMap>, + pub(crate) height_to_hashes: HashMap>, next_block_hashes: HashMap, epoch_light_client_blocks: HashMap>, outgoing_receipts: HashMap<(CryptoHash, ShardId), Arc>>, @@ -1442,7 +1429,7 @@ pub struct ChainStoreUpdate<'a> { chain_store: &'a mut ChainStore, store_updates: Vec, /// Blocks added during this update. Takes ownership (unclear how to not do it because of failure exists). - chain_store_cache_update: ChainStoreCacheUpdate, + pub(crate) chain_store_cache_update: ChainStoreCacheUpdate, head: Option, tail: Option, chunk_tail: Option, @@ -1491,6 +1478,10 @@ impl<'a> ChainStoreUpdate<'a> { } impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> { + fn chain_store(&self) -> &ChainStore { + &self.chain_store + } + fn store(&self) -> &Store { &self.chain_store.store } @@ -1807,14 +1798,6 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> { } impl<'a> ChainStoreUpdate<'a> { - pub fn get_state_changes_for_split_states( - &self, - block_hash: &CryptoHash, - shard_id: ShardId, - ) -> Result { - self.chain_store.get_state_changes_for_split_states(block_hash, shard_id) - } - /// Update both header and block body head. 
pub fn save_head(&mut self, t: &Tip) -> Result<(), Error> { self.save_body_head(t)?; @@ -2200,679 +2183,6 @@ impl<'a> ChainStoreUpdate<'a> { self.chunk_tail = Some(height); } - fn clear_header_data_for_heights( - &mut self, - start: BlockHeight, - end: BlockHeight, - ) -> Result<(), Error> { - for height in start..=end { - let header_hashes = self.chain_store.get_all_header_hashes_by_height(height)?; - for header_hash in header_hashes { - // Delete header_hash-indexed data: block header - let mut store_update = self.store().store_update(); - let key: &[u8] = header_hash.as_bytes(); - store_update.delete(DBCol::BlockHeader, key); - self.chain_store.headers.pop(key); - self.merge(store_update); - } - let key = index_to_bytes(height); - self.gc_col(DBCol::HeaderHashesByHeight, &key); - } - Ok(()) - } - - pub fn clear_chunk_data_and_headers( - &mut self, - min_chunk_height: BlockHeight, - ) -> Result<(), Error> { - let chunk_tail = self.chunk_tail()?; - for height in chunk_tail..min_chunk_height { - let chunk_hashes = self.chain_store.get_all_chunk_hashes_by_height(height)?; - for chunk_hash in chunk_hashes { - // 1. Delete chunk-related data - let chunk = self.get_chunk(&chunk_hash)?.clone(); - debug_assert_eq!(chunk.cloned_header().height_created(), height); - for transaction in chunk.transactions() { - self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes()); - } - for receipt in chunk.prev_outgoing_receipts() { - self.gc_col(DBCol::Receipts, receipt.get_hash().as_bytes()); - } - - // 2. Delete chunk_hash-indexed data - let chunk_hash = chunk_hash.as_bytes(); - self.gc_col(DBCol::Chunks, chunk_hash); - self.gc_col(DBCol::PartialChunks, chunk_hash); - self.gc_col(DBCol::InvalidChunks, chunk_hash); - } - - let header_hashes = self.chain_store.get_all_header_hashes_by_height(height)?; - for _header_hash in header_hashes { - // 3. Delete header_hash-indexed data - // TODO #3488: enable - //self.gc_col(DBCol::BlockHeader, header_hash.as_bytes()); - } - - // 4. Delete chunks_tail-related data - let key = index_to_bytes(height); - self.gc_col(DBCol::ChunkHashesByHeight, &key); - self.gc_col(DBCol::HeaderHashesByHeight, &key); - } - self.update_chunk_tail(min_chunk_height); - Ok(()) - } - - /// Clears chunk data which can be computed from other data in the storage. - /// - /// We are storing PartialEncodedChunk objects in the DBCol::PartialChunks in - /// the storage. However, those objects can be computed from data in - /// DBCol::Chunks and as such are redundant. For performance reasons we want to - /// keep that data when operating at head of the chain but the data can be - /// safely removed from archival storage. - /// - /// `gc_stop_height` indicates height starting from which no data should be - /// garbage collected. Roughly speaking this represents start of the ‘hot’ - /// data that we want to keep. - /// - /// `gt_height_limit` indicates limit of how many non-empty heights to - /// process. This limit means that the method may stop garbage collection - /// before reaching `gc_stop_height`. 
- pub fn clear_redundant_chunk_data( - &mut self, - gc_stop_height: BlockHeight, - gc_height_limit: BlockHeightDelta, - ) -> Result<(), Error> { - let mut height = self.chunk_tail()?; - let mut remaining = gc_height_limit; - while height < gc_stop_height && remaining > 0 { - let chunk_hashes = self.chain_store.get_all_chunk_hashes_by_height(height)?; - height += 1; - if !chunk_hashes.is_empty() { - remaining -= 1; - for chunk_hash in chunk_hashes { - let chunk_hash = chunk_hash.as_bytes(); - self.gc_col(DBCol::PartialChunks, chunk_hash); - // Data in DBCol::InvalidChunks isn’t technically redundant (it - // cannot be calculated from other data) but it is data we - // don’t need for anything so it can be deleted as well. - self.gc_col(DBCol::InvalidChunks, chunk_hash); - } - } - } - self.update_chunk_tail(height); - Ok(()) - } - - fn get_shard_uids_to_gc( - &mut self, - epoch_manager: &dyn EpochManagerAdapter, - block_hash: &CryptoHash, - ) -> Vec { - let block_header = self.get_block_header(block_hash).expect("block header must exist"); - let shard_layout = - epoch_manager.get_shard_layout(block_header.epoch_id()).expect("epoch info must exist"); - // gc shards in this epoch - let mut shard_uids_to_gc: Vec<_> = shard_layout.shard_uids().collect(); - // gc shards in the shard layout in the next epoch if shards will change in the next epoch - // Suppose shard changes at epoch T, we need to garbage collect the new shard layout - // from the last block in epoch T-2 to the last block in epoch T-1 - // Because we need to gc the last block in epoch T-2, we can't simply use - // block_header.epoch_id() as next_epoch_id - let next_epoch_id = block_header.next_epoch_id(); - let next_shard_layout = - epoch_manager.get_shard_layout(next_epoch_id).expect("epoch info must exist"); - if shard_layout != next_shard_layout { - shard_uids_to_gc.extend(next_shard_layout.shard_uids()); - } - shard_uids_to_gc - } - - /// GC trie state and flat state data after a resharding event - /// Most of the work happens on the last block of the epoch when resharding is COMPLETED - /// During GC, when we detect a change in shard layout, we can clear off all entries from - /// the parent shards - /// TODO(resharding): Need to clean remaining columns after resharding - pub fn clear_resharding_data( - &mut self, - runtime: &dyn RuntimeAdapter, - epoch_manager: &dyn EpochManagerAdapter, - block_hash: CryptoHash, - ) -> Result<(), Error> { - // Need to check if this is the last block of the epoch where resharding is completed - // which means shard layout changed in the previous epoch - if !epoch_manager.is_last_block_in_finished_epoch(&block_hash)? { - return Ok(()); - } - - // Since this code is related to GC, we need to be careful about accessing block_infos. Note - // that the BlockInfo exists for the current block_hash as it's not been GC'd yet. - // However, we need to use the block header to get the epoch_id and shard_layout for - // first_block_epoch_header and last_block_prev_epoch_hash as BlockInfo for these blocks is - // already GC'd while BlockHeader isn't GC'd. 
- let block_info = epoch_manager.get_block_info(&block_hash)?; - let first_block_epoch_hash = block_info.epoch_first_block(); - if first_block_epoch_hash == &CryptoHash::default() { - return Ok(()); - } - let first_block_epoch_header = self.get_block_header(first_block_epoch_hash)?; - let last_block_prev_epoch_header = - self.get_block_header(first_block_epoch_header.prev_hash())?; - - let epoch_id = first_block_epoch_header.epoch_id(); - let shard_layout = epoch_manager.get_shard_layout(epoch_id)?; - let prev_epoch_id = last_block_prev_epoch_header.epoch_id(); - let prev_shard_layout = epoch_manager.get_shard_layout(prev_epoch_id)?; - if shard_layout == prev_shard_layout { - return Ok(()); - } - - // Now we can proceed to removing the trie state and flat state - let mut store_update = self.store().store_update(); - for shard_uid in prev_shard_layout.shard_uids() { - tracing::info!(target: "garbage_collection", ?block_hash, ?shard_uid, "GC resharding"); - runtime.get_tries().delete_trie_for_shard(shard_uid, &mut store_update); - runtime - .get_flat_storage_manager() - .remove_flat_storage_for_shard(shard_uid, &mut store_update)?; - } - - self.merge(store_update); - Ok(()) - } - - // Clearing block data of `block_hash`, if on a fork. - // Clearing block data of `block_hash.prev`, if on the Canonical Chain. - pub fn clear_block_data( - &mut self, - epoch_manager: &dyn EpochManagerAdapter, - mut block_hash: CryptoHash, - gc_mode: GCMode, - ) -> Result<(), Error> { - let mut store_update = self.store().store_update(); - - tracing::info!(target: "garbage_collection", ?gc_mode, ?block_hash, "GC block_hash"); - - // 1. Apply revert insertions or deletions from DBCol::TrieChanges for Trie - { - let shard_uids_to_gc: Vec<_> = self.get_shard_uids_to_gc(epoch_manager, &block_hash); - match gc_mode.clone() { - GCMode::Fork(tries) => { - // If the block is on a fork, we delete the state that's the result of applying this block - for shard_uid in shard_uids_to_gc { - let trie_changes = self.store().get_ser( - DBCol::TrieChanges, - &get_block_shard_uid(&block_hash, &shard_uid), - )?; - if let Some(trie_changes) = trie_changes { - tries.revert_insertions(&trie_changes, shard_uid, &mut store_update); - self.gc_col( - DBCol::TrieChanges, - &get_block_shard_uid(&block_hash, &shard_uid), - ); - } - } - } - GCMode::Canonical(tries) => { - // If the block is on canonical chain, we delete the state that's before applying this block - for shard_uid in shard_uids_to_gc { - let trie_changes = self.store().get_ser( - DBCol::TrieChanges, - &get_block_shard_uid(&block_hash, &shard_uid), - )?; - if let Some(trie_changes) = trie_changes { - tries.apply_deletions(&trie_changes, shard_uid, &mut store_update); - self.gc_col( - DBCol::TrieChanges, - &get_block_shard_uid(&block_hash, &shard_uid), - ); - } - } - // Set `block_hash` on previous one - block_hash = *self.get_block_header(&block_hash)?.prev_hash(); - } - GCMode::StateSync { .. } => { - // Not apply the data from DBCol::TrieChanges - for shard_uid in shard_uids_to_gc { - self.gc_col( - DBCol::TrieChanges, - &get_block_shard_uid(&block_hash, &shard_uid), - ); - } - } - } - } - - let block = - self.get_block(&block_hash).expect("block data is not expected to be already cleaned"); - let height = block.header().height(); - - // 2. Delete shard_id-indexed data (Receipts, State Headers and Parts, etc.) 
- for shard_id in 0..block.header().chunk_mask().len() as ShardId { - let block_shard_id = get_block_shard_id(&block_hash, shard_id); - self.gc_outgoing_receipts(&block_hash, shard_id); - self.gc_col(DBCol::IncomingReceipts, &block_shard_id); - - // For incoming State Parts it's done in chain.clear_downloaded_parts() - // The following code is mostly for outgoing State Parts. - // However, if node crashes while State Syncing, it may never clear - // downloaded State parts in `clear_downloaded_parts`. - // We need to make sure all State Parts are removed. - if let Ok(shard_state_header) = self.chain_store.get_state_header(shard_id, block_hash) - { - let state_num_parts = shard_state_header.num_state_parts(); - self.gc_col_state_parts(block_hash, shard_id, state_num_parts)?; - let key = borsh::to_vec(&StateHeaderKey(shard_id, block_hash))?; - self.gc_col(DBCol::StateHeaders, &key); - } - } - // gc DBCol::ChunkExtra based on shard_uid since it's indexed by shard_uid in the storage - for shard_uid in self.get_shard_uids_to_gc(epoch_manager, &block_hash) { - let block_shard_uid = get_block_shard_uid(&block_hash, &shard_uid); - self.gc_col(DBCol::ChunkExtra, &block_shard_uid); - } - - // 3. Delete block_hash-indexed data - self.gc_col(DBCol::Block, block_hash.as_bytes()); - self.gc_col(DBCol::BlockExtra, block_hash.as_bytes()); - self.gc_col(DBCol::NextBlockHashes, block_hash.as_bytes()); - self.gc_col(DBCol::ChallengedBlocks, block_hash.as_bytes()); - self.gc_col(DBCol::BlocksToCatchup, block_hash.as_bytes()); - let storage_key = KeyForStateChanges::for_block(&block_hash); - let stored_state_changes: Vec> = self - .chain_store - .store() - .iter_prefix(DBCol::StateChanges, storage_key.as_ref()) - .map(|item| item.map(|(key, _)| key)) - .collect::>>()?; - for key in stored_state_changes { - self.gc_col(DBCol::StateChanges, &key); - } - self.gc_col(DBCol::BlockRefCount, block_hash.as_bytes()); - self.gc_outcomes(&block)?; - match gc_mode { - GCMode::StateSync { clear_block_info: false } => {} - _ => self.gc_col(DBCol::BlockInfo, block_hash.as_bytes()), - } - self.gc_col(DBCol::StateDlInfos, block_hash.as_bytes()); - - // 4. Update or delete block_hash_per_height - self.gc_col_block_per_height(&block_hash, height, block.header().epoch_id())?; - - match gc_mode { - GCMode::Fork(_) => { - // 5. Forks only clearing - self.dec_block_refcount(block.header().prev_hash())?; - } - GCMode::Canonical(_) => { - // 6. Canonical Chain only clearing - // Delete chunks, chunk-indexed data and block headers - let mut min_chunk_height = self.tail()?; - for chunk_header in block.chunks().iter() { - if min_chunk_height > chunk_header.height_created() { - min_chunk_height = chunk_header.height_created(); - } - } - self.clear_chunk_data_and_headers(min_chunk_height)?; - } - GCMode::StateSync { .. } => { - // 7. 
State Sync clearing - // Chunks deleted separately - } - }; - self.merge(store_update); - Ok(()) - } - - // Delete all data in rocksdb that are partially or wholly indexed and can be looked up by hash of the current head of the chain - // and that indicates a link between current head and its prev block - pub fn clear_head_block_data( - &mut self, - epoch_manager: &dyn EpochManagerAdapter, - ) -> Result<(), Error> { - let header_head = self.header_head().unwrap(); - let header_head_height = header_head.height; - let block_hash = self.head().unwrap().last_block_hash; - - let block = - self.get_block(&block_hash).expect("block data is not expected to be already cleaned"); - - let epoch_id = block.header().epoch_id(); - - let head_height = block.header().height(); - - // 1. Delete shard_id-indexed data (TrieChanges, Receipts, ChunkExtra, State Headers and Parts, FlatStorage data) - for shard_id in 0..block.header().chunk_mask().len() as ShardId { - let shard_uid = epoch_manager.shard_id_to_uid(shard_id, epoch_id).unwrap(); - let block_shard_id = get_block_shard_uid(&block_hash, &shard_uid); - - // delete TrieChanges - self.gc_col(DBCol::TrieChanges, &block_shard_id); - - // delete Receipts - self.gc_outgoing_receipts(&block_hash, shard_id); - self.gc_col(DBCol::IncomingReceipts, &block_shard_id); - - // delete DBCol::ChunkExtra based on shard_uid since it's indexed by shard_uid in the storage - self.gc_col(DBCol::ChunkExtra, &block_shard_id); - - // delete state parts and state headers - if let Ok(shard_state_header) = self.chain_store.get_state_header(shard_id, block_hash) - { - let state_num_parts = shard_state_header.num_state_parts(); - self.gc_col_state_parts(block_hash, shard_id, state_num_parts)?; - let state_header_key = borsh::to_vec(&StateHeaderKey(shard_id, block_hash))?; - self.gc_col(DBCol::StateHeaders, &state_header_key); - } - - // delete flat storage columns: FlatStateChanges and FlatStateDeltaMetadata - let mut store_update = self.store().store_update(); - store_helper::remove_delta(&mut store_update, shard_uid, block_hash); - self.merge(store_update); - } - - // 2. Delete block_hash-indexed data - self.gc_col(DBCol::Block, block_hash.as_bytes()); - self.gc_col(DBCol::BlockExtra, block_hash.as_bytes()); - self.gc_col(DBCol::NextBlockHashes, block_hash.as_bytes()); - self.gc_col(DBCol::ChallengedBlocks, block_hash.as_bytes()); - self.gc_col(DBCol::BlocksToCatchup, block_hash.as_bytes()); - let storage_key = KeyForStateChanges::for_block(&block_hash); - let stored_state_changes: Vec> = self - .chain_store - .store() - .iter_prefix(DBCol::StateChanges, storage_key.as_ref()) - .map(|item| item.map(|(key, _)| key)) - .collect::>>()?; - for key in stored_state_changes { - self.gc_col(DBCol::StateChanges, &key); - } - self.gc_col(DBCol::BlockRefCount, block_hash.as_bytes()); - self.gc_outcomes(&block)?; - self.gc_col(DBCol::BlockInfo, block_hash.as_bytes()); - self.gc_col(DBCol::StateDlInfos, block_hash.as_bytes()); - - // 3. update columns related to prev block (block refcount and NextBlockHashes) - self.dec_block_refcount(block.header().prev_hash())?; - self.gc_col(DBCol::NextBlockHashes, block.header().prev_hash().as_bytes()); - - // 4. 
Update or delete block_hash_per_height - self.gc_col_block_per_height(&block_hash, head_height, block.header().epoch_id())?; - - self.clear_chunk_data_at_height(head_height)?; - - self.clear_header_data_for_heights(head_height, header_head_height)?; - - Ok(()) - } - - fn clear_chunk_data_at_height(&mut self, height: BlockHeight) -> Result<(), Error> { - let chunk_hashes = self.chain_store.get_all_chunk_hashes_by_height(height)?; - for chunk_hash in chunk_hashes { - // 1. Delete chunk-related data - let chunk = self.get_chunk(&chunk_hash)?.clone(); - debug_assert_eq!(chunk.cloned_header().height_created(), height); - for transaction in chunk.transactions() { - self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes()); - } - for receipt in chunk.prev_outgoing_receipts() { - self.gc_col(DBCol::Receipts, receipt.get_hash().as_bytes()); - } - - // 2. Delete chunk_hash-indexed data - let chunk_hash = chunk_hash.as_bytes(); - self.gc_col(DBCol::Chunks, chunk_hash); - self.gc_col(DBCol::PartialChunks, chunk_hash); - self.gc_col(DBCol::InvalidChunks, chunk_hash); - } - - // 4. Delete chunk hashes per height - let key = index_to_bytes(height); - self.gc_col(DBCol::ChunkHashesByHeight, &key); - - Ok(()) - } - - pub fn gc_col_block_per_height( - &mut self, - block_hash: &CryptoHash, - height: BlockHeight, - epoch_id: &EpochId, - ) -> Result<(), Error> { - let mut store_update = self.store().store_update(); - let mut epoch_to_hashes = - HashMap::clone(self.chain_store.get_all_block_hashes_by_height(height)?.as_ref()); - let hashes = epoch_to_hashes.get_mut(epoch_id).ok_or_else(|| { - near_chain_primitives::Error::Other("current epoch id should exist".into()) - })?; - hashes.remove(block_hash); - if hashes.is_empty() { - epoch_to_hashes.remove(epoch_id); - } - let key = &index_to_bytes(height)[..]; - if epoch_to_hashes.is_empty() { - store_update.delete(DBCol::BlockPerHeight, key); - self.chain_store.block_hash_per_height.pop(key); - } else { - store_update.set_ser(DBCol::BlockPerHeight, key, &epoch_to_hashes)?; - self.chain_store.block_hash_per_height.put(key.to_vec(), Arc::new(epoch_to_hashes)); - } - if self.is_height_processed(height)? { - self.gc_col(DBCol::ProcessedBlockHeights, key); - } - self.merge(store_update); - Ok(()) - } - - pub fn gc_col_state_parts( - &mut self, - sync_hash: CryptoHash, - shard_id: ShardId, - num_parts: u64, - ) -> Result<(), Error> { - for part_id in 0..num_parts { - let key = borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id))?; - self.gc_col(DBCol::StateParts, &key); - } - Ok(()) - } - - pub fn gc_outgoing_receipts(&mut self, block_hash: &CryptoHash, shard_id: ShardId) { - let mut store_update = self.store().store_update(); - match self - .get_outgoing_receipts(block_hash, shard_id) - .map(|receipts| receipts.iter().map(|receipt| receipt.receipt_id).collect::>()) - { - Ok(receipt_ids) => { - for receipt_id in receipt_ids { - let key: Vec = receipt_id.into(); - store_update.decrement_refcount(DBCol::ReceiptIdToShardId, &key); - self.chain_store.receipt_id_to_shard_id.pop(&key); - } - } - Err(error) => { - match error { - Error::DBNotFoundErr(_) => { - // Sometimes we don't save outgoing receipts. See the usages of save_outgoing_receipt. - // The invariant is that DBCol::OutgoingReceipts has same receipts as DBCol::ReceiptIdToShardId. 
- } - _ => { - tracing::error!(target: "chain", "Error getting outgoing receipts for block {}, shard {}: {:?}", block_hash, shard_id, error); - } - } - } - } - - let key = get_block_shard_id(block_hash, shard_id); - store_update.delete(DBCol::OutgoingReceipts, &key); - self.chain_store.outgoing_receipts.pop(&key); - self.merge(store_update); - } - - pub fn gc_outcomes(&mut self, block: &Block) -> Result<(), Error> { - let block_hash = block.hash(); - let store_update = self.store().store_update(); - for chunk_header in - block.chunks().iter().filter(|h| h.height_included() == block.header().height()) - { - let shard_id = chunk_header.shard_id(); - let outcome_ids = - self.chain_store.get_outcomes_by_block_hash_and_shard_id(block_hash, shard_id)?; - for outcome_id in outcome_ids { - self.gc_col( - DBCol::TransactionResultForBlock, - &get_outcome_id_block_hash(&outcome_id, block_hash), - ); - } - self.gc_col(DBCol::OutcomeIds, &get_block_shard_id(block_hash, shard_id)); - } - self.merge(store_update); - Ok(()) - } - - fn gc_col(&mut self, col: DBCol, key: &[u8]) { - let mut store_update = self.store().store_update(); - match col { - DBCol::OutgoingReceipts => { - panic!("Must use gc_outgoing_receipts"); - } - DBCol::IncomingReceipts => { - store_update.delete(col, key); - self.chain_store.incoming_receipts.pop(key); - } - DBCol::StateHeaders => { - store_update.delete(col, key); - } - DBCol::BlockHeader => { - // TODO(#3488) At the moment header sync needs block headers. - // However, we want to eventually garbage collect headers. - // When that happens we should make sure that block headers is - // copied to the cold storage. - store_update.delete(col, key); - self.chain_store.headers.pop(key); - unreachable!(); - } - DBCol::Block => { - store_update.delete(col, key); - self.chain_store.blocks.pop(key); - } - DBCol::BlockExtra => { - store_update.delete(col, key); - self.chain_store.block_extras.pop(key); - } - DBCol::NextBlockHashes => { - store_update.delete(col, key); - self.chain_store.next_block_hashes.pop(key); - } - DBCol::ChallengedBlocks => { - store_update.delete(col, key); - } - DBCol::BlocksToCatchup => { - store_update.delete(col, key); - } - DBCol::StateChanges => { - store_update.delete(col, key); - } - DBCol::BlockRefCount => { - store_update.delete(col, key); - self.chain_store.block_refcounts.pop(key); - } - DBCol::ReceiptIdToShardId => { - panic!("Must use gc_outgoing_receipts"); - } - DBCol::Transactions => { - store_update.decrement_refcount(col, key); - self.chain_store.transactions.pop(key); - } - DBCol::Receipts => { - store_update.decrement_refcount(col, key); - self.chain_store.receipts.pop(key); - } - DBCol::Chunks => { - store_update.delete(col, key); - self.chain_store.chunks.pop(key); - } - DBCol::ChunkExtra => { - store_update.delete(col, key); - self.chain_store.chunk_extras.pop(key); - } - DBCol::PartialChunks => { - store_update.delete(col, key); - self.chain_store.partial_chunks.pop(key); - } - DBCol::InvalidChunks => { - store_update.delete(col, key); - self.chain_store.invalid_chunks.pop(key); - } - DBCol::ChunkHashesByHeight => { - store_update.delete(col, key); - } - DBCol::StateParts => { - store_update.delete(col, key); - } - DBCol::State => { - panic!("Actual gc happens elsewhere, call inc_gc_col_state to increase gc count"); - } - DBCol::TrieChanges => { - store_update.delete(col, key); - } - DBCol::BlockPerHeight => { - panic!("Must use gc_col_glock_per_height method to gc DBCol::BlockPerHeight"); - } - DBCol::TransactionResultForBlock => { - 
store_update.delete(col, key); - } - DBCol::OutcomeIds => { - store_update.delete(col, key); - } - DBCol::StateDlInfos => { - store_update.delete(col, key); - } - DBCol::BlockInfo => { - store_update.delete(col, key); - } - DBCol::ProcessedBlockHeights => { - store_update.delete(col, key); - self.chain_store.processed_block_heights.pop(key); - } - DBCol::HeaderHashesByHeight => { - store_update.delete(col, key); - } - DBCol::DbVersion - | DBCol::BlockMisc - | DBCol::_GCCount - | DBCol::BlockHeight // block sync needs it + genesis should be accessible - | DBCol::_Peers - | DBCol::RecentOutboundConnections - | DBCol::BlockMerkleTree - | DBCol::AccountAnnouncements - | DBCol::EpochLightClientBlocks - | DBCol::PeerComponent - | DBCol::LastComponentNonce - | DBCol::ComponentEdges - // https://github.com/nearprotocol/nearcore/pull/2952 - | DBCol::EpochInfo - | DBCol::EpochStart - | DBCol::EpochValidatorInfo - | DBCol::BlockOrdinal - | DBCol::_ChunkPerHeightShard - | DBCol::_NextBlockWithNewChunk - | DBCol::_LastBlockWithNewChunk - | DBCol::_TransactionRefCount - | DBCol::_TransactionResult - | DBCol::StateChangesForSplitStates - | DBCol::CachedContractCode - | DBCol::FlatState - | DBCol::FlatStateChanges - | DBCol::FlatStateDeltaMetadata - | DBCol::FlatStorageStatus - | DBCol::Misc - => unreachable!(), - #[cfg(feature = "new_epoch_sync")] - DBCol::EpochSyncInfo => unreachable!(), - } - self.merge(store_update); - } - /// Merge another StoreUpdate into this one pub fn merge(&mut self, store_update: StoreUpdate) { self.store_updates.push(store_update); @@ -3424,50 +2734,13 @@ impl<'a> ChainStoreUpdate<'a> { mod tests { use std::sync::Arc; - use near_chain_configs::{GCConfig, GenesisConfig}; - use near_epoch_manager::shard_tracker::ShardTracker; - use near_epoch_manager::EpochManagerAdapter; - use near_primitives::block::{Block, Tip}; - use near_primitives::epoch_manager::block_info::BlockInfo; + use crate::test_utils::get_chain; use near_primitives::errors::InvalidTxError; use near_primitives::hash::hash; use near_primitives::test_utils::create_test_signer; use near_primitives::test_utils::TestBlockBuilder; - use near_primitives::types::{BlockHeight, EpochId, NumBlocks}; + use near_primitives::types::EpochId; use near_primitives::utils::index_to_bytes; - use near_primitives::validator_signer::InMemoryValidatorSigner; - use near_store::test_utils::create_test_store; - use near_store::DBCol; - - use crate::store::{ChainStoreAccess, GCMode}; - use crate::store_validator::StoreValidator; - use crate::test_utils::{KeyValueRuntime, MockEpochManager, ValidatorSchedule}; - use crate::types::ChainConfig; - use crate::{Chain, ChainGenesis, DoomslugThresholdMode}; - - fn get_chain() -> Chain { - get_chain_with_epoch_length(10) - } - - fn get_chain_with_epoch_length(epoch_length: NumBlocks) -> Chain { - let store = create_test_store(); - let chain_genesis = ChainGenesis::test(); - let vs = ValidatorSchedule::new() - .block_producers_per_epoch(vec![vec!["test1".parse().unwrap()]]); - let epoch_manager = MockEpochManager::new_with_validators(store.clone(), vs, epoch_length); - let shard_tracker = ShardTracker::new_empty(epoch_manager.clone()); - let runtime = KeyValueRuntime::new(store, epoch_manager.as_ref()); - Chain::new( - epoch_manager, - shard_tracker, - runtime, - &chain_genesis, - DoomslugThresholdMode::NoApprovals, - ChainConfig::test(), - None, - ) - .unwrap() - } #[test] fn test_tx_validity_long_fork() { @@ -3657,300 +2930,4 @@ mod tests { assert_ne!(block_hash, block_hash1); 
assert_ne!(epoch_id_to_hash, epoch_id_to_hash1); } - - /// Test that garbage collection works properly. The blocks behind gc head should be garbage - /// collected while the blocks that are ahead of it should not. - #[test] - fn test_clear_old_data() { - let mut chain = get_chain_with_epoch_length(1); - let epoch_manager = chain.epoch_manager.clone(); - let genesis = chain.get_block_by_height(0).unwrap(); - let signer = Arc::new(create_test_signer("test1")); - let mut prev_block = genesis; - let mut blocks = vec![prev_block.clone()]; - for i in 1..15 { - add_block( - &mut chain, - epoch_manager.as_ref(), - &mut prev_block, - &mut blocks, - signer.clone(), - i, - ); - } - - let trie = chain.runtime_adapter.get_tries(); - chain.clear_data(trie, &GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }).unwrap(); - - // epoch didn't change so no data is garbage collected. - for i in 0..15 { - println!("height = {} hash = {}", i, blocks[i].hash()); - if i < 8 { - assert!(chain.get_block(blocks[i].hash()).is_err()); - assert!(chain - .mut_store() - .get_all_block_hashes_by_height(i as BlockHeight) - .unwrap() - .is_empty()); - } else { - assert!(chain.get_block(blocks[i].hash()).is_ok()); - assert!(!chain - .mut_store() - .get_all_block_hashes_by_height(i as BlockHeight) - .unwrap() - .is_empty()); - } - } - } - - // Adds block to the chain at given height after prev_block. - fn add_block( - chain: &mut Chain, - epoch_manager: &dyn EpochManagerAdapter, - prev_block: &mut Block, - blocks: &mut Vec, - signer: Arc, - height: u64, - ) { - let next_epoch_id = epoch_manager - .get_next_epoch_id_from_prev_block(prev_block.hash()) - .expect("block must exist"); - let mut store_update = chain.mut_store().store_update(); - - let block = if next_epoch_id == *prev_block.header().next_epoch_id() { - TestBlockBuilder::new(&prev_block, signer).height(height).build() - } else { - let prev_hash = prev_block.hash(); - let epoch_id = prev_block.header().next_epoch_id().clone(); - let next_bp_hash = Chain::compute_bp_hash( - epoch_manager, - next_epoch_id.clone(), - epoch_id.clone(), - &prev_hash, - ) - .unwrap(); - TestBlockBuilder::new(&prev_block, signer) - .height(height) - .epoch_id(epoch_id) - .next_epoch_id(next_epoch_id) - .next_bp_hash(next_bp_hash) - .build() - }; - blocks.push(block.clone()); - store_update.save_block(block.clone()); - store_update.inc_block_refcount(block.header().prev_hash()).unwrap(); - store_update.save_block_header(block.header().clone()).unwrap(); - store_update.save_head(&Tip::from_header(block.header())).unwrap(); - store_update - .chain_store_cache_update - .height_to_hashes - .insert(height, Some(*block.header().hash())); - store_update.save_next_block_hash(prev_block.hash(), *block.hash()); - store_update.commit().unwrap(); - *prev_block = block.clone(); - } - - #[test] - fn test_clear_old_data_fixed_height() { - let mut chain = get_chain(); - let epoch_manager = chain.epoch_manager.clone(); - let genesis = chain.get_block_by_height(0).unwrap(); - let signer = Arc::new(create_test_signer("test1")); - let mut prev_block = genesis; - let mut blocks = vec![prev_block.clone()]; - for i in 1..10 { - add_block( - &mut chain, - epoch_manager.as_ref(), - &mut prev_block, - &mut blocks, - signer.clone(), - i, - ); - } - - assert!(chain.get_block(blocks[4].hash()).is_ok()); - assert!(chain.get_block(blocks[5].hash()).is_ok()); - assert!(chain.get_block(blocks[6].hash()).is_ok()); - assert!(chain.get_block_header(blocks[5].hash()).is_ok()); - assert_eq!( - chain - .mut_store() - 
.get_all_block_hashes_by_height(5) - .unwrap() - .values() - .flatten() - .collect::>(), - vec![blocks[5].hash()] - ); - assert!(chain.mut_store().get_next_block_hash(blocks[5].hash()).is_ok()); - - let trie = chain.runtime_adapter.get_tries(); - let mut store_update = chain.mut_store().store_update(); - assert!(store_update - .clear_block_data(epoch_manager.as_ref(), *blocks[5].hash(), GCMode::Canonical(trie)) - .is_ok()); - store_update.commit().unwrap(); - - assert!(chain.get_block(blocks[4].hash()).is_err()); - assert!(chain.get_block(blocks[5].hash()).is_ok()); - assert!(chain.get_block(blocks[6].hash()).is_ok()); - // block header should be available - assert!(chain.get_block_header(blocks[4].hash()).is_ok()); - assert!(chain.get_block_header(blocks[5].hash()).is_ok()); - assert!(chain.get_block_header(blocks[6].hash()).is_ok()); - assert!(chain.mut_store().get_all_block_hashes_by_height(4).unwrap().is_empty()); - assert!(!chain.mut_store().get_all_block_hashes_by_height(5).unwrap().is_empty()); - assert!(!chain.mut_store().get_all_block_hashes_by_height(6).unwrap().is_empty()); - assert!(chain.mut_store().get_next_block_hash(blocks[4].hash()).is_err()); - assert!(chain.mut_store().get_next_block_hash(blocks[5].hash()).is_ok()); - assert!(chain.mut_store().get_next_block_hash(blocks[6].hash()).is_ok()); - } - - /// Test that `gc_blocks_limit` works properly - #[test] - #[cfg_attr(not(feature = "expensive_tests"), ignore)] - fn test_clear_old_data_too_many_heights() { - for i in 1..5 { - println!("gc_blocks_limit == {:?}", i); - test_clear_old_data_too_many_heights_common(i); - } - test_clear_old_data_too_many_heights_common(25); - test_clear_old_data_too_many_heights_common(50); - test_clear_old_data_too_many_heights_common(87); - } - - fn test_clear_old_data_too_many_heights_common(gc_blocks_limit: NumBlocks) { - let mut chain = get_chain_with_epoch_length(1); - let genesis = chain.get_block_by_height(0).unwrap(); - let signer = Arc::new(create_test_signer("test1")); - let mut prev_block = genesis; - let mut blocks = vec![prev_block.clone()]; - { - let mut store_update = chain.store().store().store_update(); - let block_info = BlockInfo::default(); - store_update - .insert_ser(DBCol::BlockInfo, prev_block.hash().as_ref(), &block_info) - .unwrap(); - store_update.commit().unwrap(); - } - for i in 1..1000 { - let block = TestBlockBuilder::new(&prev_block, signer.clone()).height(i).build(); - blocks.push(block.clone()); - - let mut store_update = chain.mut_store().store_update(); - store_update.save_block(block.clone()); - store_update.inc_block_refcount(block.header().prev_hash()).unwrap(); - store_update.save_block_header(block.header().clone()).unwrap(); - store_update.save_head(&Tip::from_header(&block.header())).unwrap(); - { - let mut store_update = store_update.store().store_update(); - let block_info = BlockInfo::default(); - store_update - .insert_ser(DBCol::BlockInfo, block.hash().as_ref(), &block_info) - .unwrap(); - store_update.commit().unwrap(); - } - store_update - .chain_store_cache_update - .height_to_hashes - .insert(i, Some(*block.header().hash())); - store_update.save_next_block_hash(&prev_block.hash(), *block.hash()); - store_update.commit().unwrap(); - - prev_block = block.clone(); - } - - let trie = chain.runtime_adapter.get_tries(); - - for iter in 0..10 { - println!("ITERATION #{:?}", iter); - assert!(chain - .clear_data(trie.clone(), &GCConfig { gc_blocks_limit, ..GCConfig::default() }) - .is_ok()); - - // epoch didn't change so no data is garbage collected. 
- for i in 0..1000 { - if i < (iter + 1) * gc_blocks_limit as usize { - assert!(chain.get_block(&blocks[i].hash()).is_err()); - assert!(chain - .mut_store() - .get_all_block_hashes_by_height(i as BlockHeight) - .unwrap() - .is_empty()); - } else { - assert!(chain.get_block(&blocks[i].hash()).is_ok()); - assert!(!chain - .mut_store() - .get_all_block_hashes_by_height(i as BlockHeight) - .unwrap() - .is_empty()); - } - } - let mut genesis = GenesisConfig::default(); - genesis.genesis_height = 0; - let mut store_validator = StoreValidator::new( - None, - genesis.clone(), - chain.epoch_manager.clone(), - chain.shard_tracker.clone(), - chain.runtime_adapter.clone(), - chain.store().store().clone(), - false, - ); - store_validator.validate(); - println!("errors = {:?}", store_validator.errors); - assert!(!store_validator.is_failed()); - } - } - #[test] - fn test_fork_chunk_tail_updates() { - let mut chain = get_chain(); - let epoch_manager = chain.epoch_manager.clone(); - let genesis = chain.get_block_by_height(0).unwrap(); - let signer = Arc::new(create_test_signer("test1")); - let mut prev_block = genesis; - let mut blocks = vec![prev_block.clone()]; - for i in 1..10 { - add_block( - &mut chain, - epoch_manager.as_ref(), - &mut prev_block, - &mut blocks, - signer.clone(), - i, - ); - } - assert_eq!(chain.tail().unwrap(), 0); - - { - let mut store_update = chain.mut_store().store_update(); - assert_eq!(store_update.tail().unwrap(), 0); - store_update.update_tail(1).unwrap(); - store_update.commit().unwrap(); - } - // Chunk tail should be auto updated to genesis (if not set) and fork_tail to the tail. - { - let store_update = chain.mut_store().store_update(); - assert_eq!(store_update.tail().unwrap(), 1); - assert_eq!(store_update.fork_tail().unwrap(), 1); - assert_eq!(store_update.chunk_tail().unwrap(), 0); - } - { - let mut store_update = chain.mut_store().store_update(); - store_update.update_fork_tail(3); - store_update.commit().unwrap(); - } - { - let mut store_update = chain.mut_store().store_update(); - store_update.update_tail(2).unwrap(); - store_update.commit().unwrap(); - } - { - let store_update = chain.mut_store().store_update(); - assert_eq!(store_update.tail().unwrap(), 2); - assert_eq!(store_update.fork_tail().unwrap(), 3); - assert_eq!(store_update.chunk_tail().unwrap(), 0); - } - } } diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs index eb632d50c93..7731ba984c1 100644 --- a/chain/chain/src/test_utils.rs +++ b/chain/chain/src/test_utils.rs @@ -38,6 +38,30 @@ pub use self::kv_runtime::MockEpochManager; pub use self::validator_schedule::ValidatorSchedule; +pub fn get_chain() -> Chain { + get_chain_with_epoch_length(10) +} + +pub fn get_chain_with_epoch_length(epoch_length: NumBlocks) -> Chain { + let store = create_test_store(); + let chain_genesis = ChainGenesis::test(); + let vs = + ValidatorSchedule::new().block_producers_per_epoch(vec![vec!["test1".parse().unwrap()]]); + let epoch_manager = MockEpochManager::new_with_validators(store.clone(), vs, epoch_length); + let shard_tracker = ShardTracker::new_empty(epoch_manager.clone()); + let runtime = KeyValueRuntime::new(store, epoch_manager.as_ref()); + Chain::new( + epoch_manager, + shard_tracker, + runtime, + &chain_genesis, + DoomslugThresholdMode::NoApprovals, + ChainConfig::test(), + None, + ) + .unwrap() +} + /// Wait for all blocks that started processing to be ready for postprocessing /// Returns true if there are new blocks that are ready pub fn wait_for_all_blocks_in_processing(chain: &Chain) 
-> bool { From 11f0574150e5f54016021c005dd38b2f49ba1dd3 Mon Sep 17 00:00:00 2001 From: Shreyan Gupta Date: Thu, 21 Dec 2023 23:30:20 +0530 Subject: [PATCH 2/2] move tests to gc.rs --- chain/chain/src/garbage_collection.rs | 319 +--------------- chain/chain/src/test_utils.rs | 60 +-- .../tests/{gc.rs => garbage_collection.rs} | 351 ++++++++++++++++-- chain/chain/src/tests/mod.rs | 2 +- 4 files changed, 346 insertions(+), 386 deletions(-) rename chain/chain/src/tests/{gc.rs => garbage_collection.rs} (67%) diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs index 554c5e23fef..33424e900f2 100644 --- a/chain/chain/src/garbage_collection.rs +++ b/chain/chain/src/garbage_collection.rs @@ -18,7 +18,7 @@ use crate::types::RuntimeAdapter; use crate::{metrics, Chain, ChainStoreAccess, ChainStoreUpdate}; #[derive(Clone)] -enum GCMode { +pub enum GCMode { Fork(ShardTries), Canonical(ShardTries), StateSync { clear_block_info: bool }, @@ -517,7 +517,7 @@ impl<'a> ChainStoreUpdate<'a> { // Clearing block data of `block_hash`, if on a fork. // Clearing block data of `block_hash.prev`, if on the Canonical Chain. - fn clear_block_data( + pub fn clear_block_data( &mut self, epoch_manager: &dyn EpochManagerAdapter, mut block_hash: CryptoHash, @@ -1008,318 +1008,3 @@ impl<'a> ChainStoreUpdate<'a> { self.merge(store_update); } } - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use near_chain_configs::{GCConfig, GenesisConfig}; - use near_epoch_manager::EpochManagerAdapter; - use near_primitives::block::{Block, Tip}; - use near_primitives::epoch_manager::block_info::BlockInfo; - use near_primitives::test_utils::{create_test_signer, TestBlockBuilder}; - use near_primitives::types::{BlockHeight, NumBlocks}; - use near_primitives::validator_signer::InMemoryValidatorSigner; - use near_store::DBCol; - - use crate::garbage_collection::GCMode; - use crate::test_utils::{get_chain, get_chain_with_epoch_length}; - use crate::{Chain, ChainStoreAccess, StoreValidator}; - - /// Test that garbage collection works properly. The blocks behind gc head should be garbage - /// collected while the blocks that are ahead of it should not. - #[test] - fn test_clear_old_data() { - let mut chain = get_chain_with_epoch_length(1); - let epoch_manager = chain.epoch_manager.clone(); - let genesis = chain.get_block_by_height(0).unwrap(); - let signer = Arc::new(create_test_signer("test1")); - let mut prev_block = genesis; - let mut blocks = vec![prev_block.clone()]; - for i in 1..15 { - add_block( - &mut chain, - epoch_manager.as_ref(), - &mut prev_block, - &mut blocks, - signer.clone(), - i, - ); - } - - let trie = chain.runtime_adapter.get_tries(); - chain.clear_data(trie, &GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }).unwrap(); - - // epoch didn't change so no data is garbage collected. - for i in 0..15 { - println!("height = {} hash = {}", i, blocks[i].hash()); - if i < 8 { - assert!(chain.get_block(blocks[i].hash()).is_err()); - assert!(chain - .mut_store() - .get_all_block_hashes_by_height(i as BlockHeight) - .unwrap() - .is_empty()); - } else { - assert!(chain.get_block(blocks[i].hash()).is_ok()); - assert!(!chain - .mut_store() - .get_all_block_hashes_by_height(i as BlockHeight) - .unwrap() - .is_empty()); - } - } - } - - // Adds block to the chain at given height after prev_block. 
- fn add_block( - chain: &mut Chain, - epoch_manager: &dyn EpochManagerAdapter, - prev_block: &mut Block, - blocks: &mut Vec<Block>, - signer: Arc<InMemoryValidatorSigner>, - height: u64, - ) { - let next_epoch_id = epoch_manager - .get_next_epoch_id_from_prev_block(prev_block.hash()) - .expect("block must exist"); - let mut store_update = chain.mut_store().store_update(); - - let block = if next_epoch_id == *prev_block.header().next_epoch_id() { - TestBlockBuilder::new(&prev_block, signer).height(height).build() - } else { - let prev_hash = prev_block.hash(); - let epoch_id = prev_block.header().next_epoch_id().clone(); - let next_bp_hash = Chain::compute_bp_hash( - epoch_manager, - next_epoch_id.clone(), - epoch_id.clone(), - &prev_hash, - ) - .unwrap(); - TestBlockBuilder::new(&prev_block, signer) - .height(height) - .epoch_id(epoch_id) - .next_epoch_id(next_epoch_id) - .next_bp_hash(next_bp_hash) - .build() - }; - blocks.push(block.clone()); - store_update.save_block(block.clone()); - store_update.inc_block_refcount(block.header().prev_hash()).unwrap(); - store_update.save_block_header(block.header().clone()).unwrap(); - store_update.save_head(&Tip::from_header(block.header())).unwrap(); - store_update - .chain_store_cache_update - .height_to_hashes - .insert(height, Some(*block.header().hash())); - store_update.save_next_block_hash(prev_block.hash(), *block.hash()); - store_update.commit().unwrap(); - *prev_block = block.clone(); - } - - #[test] - fn test_clear_old_data_fixed_height() { - let mut chain = get_chain(); - let epoch_manager = chain.epoch_manager.clone(); - let genesis = chain.get_block_by_height(0).unwrap(); - let signer = Arc::new(create_test_signer("test1")); - let mut prev_block = genesis; - let mut blocks = vec![prev_block.clone()]; - for i in 1..10 { - add_block( - &mut chain, - epoch_manager.as_ref(), - &mut prev_block, - &mut blocks, - signer.clone(), - i, - ); - } - - assert!(chain.get_block(blocks[4].hash()).is_ok()); - assert!(chain.get_block(blocks[5].hash()).is_ok()); - assert!(chain.get_block(blocks[6].hash()).is_ok()); - assert!(chain.get_block_header(blocks[5].hash()).is_ok()); - assert_eq!( - chain - .mut_store() - .get_all_block_hashes_by_height(5) - .unwrap() - .values() - .flatten() - .collect::<Vec<_>>(), - vec![blocks[5].hash()] - ); - assert!(chain.mut_store().get_next_block_hash(blocks[5].hash()).is_ok()); - - let trie = chain.runtime_adapter.get_tries(); - let mut store_update = chain.mut_store().store_update(); - assert!(store_update - .clear_block_data(epoch_manager.as_ref(), *blocks[5].hash(), GCMode::Canonical(trie)) - .is_ok()); - store_update.commit().unwrap(); - - assert!(chain.get_block(blocks[4].hash()).is_err()); - assert!(chain.get_block(blocks[5].hash()).is_ok()); - assert!(chain.get_block(blocks[6].hash()).is_ok()); - // block header should be available - assert!(chain.get_block_header(blocks[4].hash()).is_ok()); - assert!(chain.get_block_header(blocks[5].hash()).is_ok()); - assert!(chain.get_block_header(blocks[6].hash()).is_ok()); - assert!(chain.mut_store().get_all_block_hashes_by_height(4).unwrap().is_empty()); - assert!(!chain.mut_store().get_all_block_hashes_by_height(5).unwrap().is_empty()); - assert!(!chain.mut_store().get_all_block_hashes_by_height(6).unwrap().is_empty()); - assert!(chain.mut_store().get_next_block_hash(blocks[4].hash()).is_err()); - assert!(chain.mut_store().get_next_block_hash(blocks[5].hash()).is_ok()); - assert!(chain.mut_store().get_next_block_hash(blocks[6].hash()).is_ok()); - } - - /// Test that `gc_blocks_limit` works properly - #[test]
- #[cfg_attr(not(feature = "expensive_tests"), ignore)] - fn test_clear_old_data_too_many_heights() { - for i in 1..5 { - println!("gc_blocks_limit == {:?}", i); - test_clear_old_data_too_many_heights_common(i); - } - test_clear_old_data_too_many_heights_common(25); - test_clear_old_data_too_many_heights_common(50); - test_clear_old_data_too_many_heights_common(87); - } - - fn test_clear_old_data_too_many_heights_common(gc_blocks_limit: NumBlocks) { - let mut chain = get_chain_with_epoch_length(1); - let genesis = chain.get_block_by_height(0).unwrap(); - let signer = Arc::new(create_test_signer("test1")); - let mut prev_block = genesis; - let mut blocks = vec![prev_block.clone()]; - { - let mut store_update = chain.store().store().store_update(); - let block_info = BlockInfo::default(); - store_update - .insert_ser(DBCol::BlockInfo, prev_block.hash().as_ref(), &block_info) - .unwrap(); - store_update.commit().unwrap(); - } - for i in 1..1000 { - let block = TestBlockBuilder::new(&prev_block, signer.clone()).height(i).build(); - blocks.push(block.clone()); - - let mut store_update = chain.mut_store().store_update(); - store_update.save_block(block.clone()); - store_update.inc_block_refcount(block.header().prev_hash()).unwrap(); - store_update.save_block_header(block.header().clone()).unwrap(); - store_update.save_head(&Tip::from_header(&block.header())).unwrap(); - { - let mut store_update = store_update.store().store_update(); - let block_info = BlockInfo::default(); - store_update - .insert_ser(DBCol::BlockInfo, block.hash().as_ref(), &block_info) - .unwrap(); - store_update.commit().unwrap(); - } - store_update - .chain_store_cache_update - .height_to_hashes - .insert(i, Some(*block.header().hash())); - store_update.save_next_block_hash(&prev_block.hash(), *block.hash()); - store_update.commit().unwrap(); - - prev_block = block.clone(); - } - - let trie = chain.runtime_adapter.get_tries(); - - for iter in 0..10 { - println!("ITERATION #{:?}", iter); - assert!(chain - .clear_data(trie.clone(), &GCConfig { gc_blocks_limit, ..GCConfig::default() }) - .is_ok()); - - // epoch didn't change so no data is garbage collected.
- for i in 0..1000 { - if i < (iter + 1) * gc_blocks_limit as usize { - assert!(chain.get_block(&blocks[i].hash()).is_err()); - assert!(chain - .mut_store() - .get_all_block_hashes_by_height(i as BlockHeight) - .unwrap() - .is_empty()); - } else { - assert!(chain.get_block(&blocks[i].hash()).is_ok()); - assert!(!chain - .mut_store() - .get_all_block_hashes_by_height(i as BlockHeight) - .unwrap() - .is_empty()); - } - } - let mut genesis = GenesisConfig::default(); - genesis.genesis_height = 0; - let mut store_validator = StoreValidator::new( - None, - genesis.clone(), - chain.epoch_manager.clone(), - chain.shard_tracker.clone(), - chain.runtime_adapter.clone(), - chain.store().store().clone(), - false, - ); - store_validator.validate(); - println!("errors = {:?}", store_validator.errors); - assert!(!store_validator.is_failed()); - } - } - - #[test] - fn test_fork_chunk_tail_updates() { - let mut chain = get_chain(); - let epoch_manager = chain.epoch_manager.clone(); - let genesis = chain.get_block_by_height(0).unwrap(); - let signer = Arc::new(create_test_signer("test1")); - let mut prev_block = genesis; - let mut blocks = vec![prev_block.clone()]; - for i in 1..10 { - add_block( - &mut chain, - epoch_manager.as_ref(), - &mut prev_block, - &mut blocks, - signer.clone(), - i, - ); - } - assert_eq!(chain.tail().unwrap(), 0); - - { - let mut store_update = chain.mut_store().store_update(); - assert_eq!(store_update.tail().unwrap(), 0); - store_update.update_tail(1).unwrap(); - store_update.commit().unwrap(); - } - // Chunk tail should be auto updated to genesis (if not set) and fork_tail to the tail. - { - let store_update = chain.mut_store().store_update(); - assert_eq!(store_update.tail().unwrap(), 1); - assert_eq!(store_update.fork_tail().unwrap(), 1); - assert_eq!(store_update.chunk_tail().unwrap(), 0); - } - { - let mut store_update = chain.mut_store().store_update(); - store_update.update_fork_tail(3); - store_update.commit().unwrap(); - } - { - let mut store_update = chain.mut_store().store_update(); - store_update.update_tail(2).unwrap(); - store_update.commit().unwrap(); - } - { - let store_update = chain.mut_store().store_update(); - assert_eq!(store_update.tail().unwrap(), 2); - assert_eq!(store_update.fork_tail().unwrap(), 3); - assert_eq!(store_update.chunk_tail().unwrap(), 0); - } - } -} diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs index 7731ba984c1..723a9c43645 100644 --- a/chain/chain/src/test_utils.rs +++ b/chain/chain/src/test_utils.rs @@ -1,52 +1,56 @@ mod kv_runtime; mod validator_schedule; -use std::cmp::Ordering; -use std::sync::Arc; - use chrono::{DateTime, Utc}; -use near_epoch_manager::shard_tracker::ShardTracker; -use near_primitives::test_utils::create_test_signer; use num_rational::Ratio; -use tracing::debug; - -use near_chain_primitives::Error; - -use near_primitives::block::Block; - -use near_primitives::hash::CryptoHash; - -use near_primitives::types::{AccountId, NumBlocks}; -use near_primitives::validator_signer::InMemoryValidatorSigner; -use near_primitives::version::PROTOCOL_VERSION; - -use near_store::test_utils::create_test_store; -use near_store::DBCol; +use std::cmp::Ordering; +use std::sync::Arc; +pub use self::kv_runtime::account_id_to_shard_id; +pub use self::kv_runtime::KeyValueRuntime; +pub use self::kv_runtime::MockEpochManager; +pub use self::validator_schedule::ValidatorSchedule; use crate::block_processing_utils::BlockNotInPoolError; use crate::chain::Chain; use crate::store::ChainStoreAccess; use crate::types::{AcceptedBlock, ChainConfig, ChainGenesis};
use crate::DoomslugThresholdMode; use crate::{BlockProcessingArtifact, Provenance}; +use near_chain_primitives::Error; +use near_epoch_manager::shard_tracker::ShardTracker; +use near_primitives::block::Block; +use near_primitives::hash::CryptoHash; use near_primitives::static_clock::StaticClock; +use near_primitives::test_utils::create_test_signer; +use near_primitives::types::{AccountId, NumBlocks, NumShards}; use near_primitives::utils::MaybeValidated; - -pub use self::kv_runtime::account_id_to_shard_id; -pub use self::kv_runtime::KeyValueRuntime; -pub use self::kv_runtime::MockEpochManager; - -pub use self::validator_schedule::ValidatorSchedule; +use near_primitives::validator_signer::InMemoryValidatorSigner; +use near_primitives::version::PROTOCOL_VERSION; +use near_store::test_utils::create_test_store; +use near_store::DBCol; +use tracing::debug; pub fn get_chain() -> Chain { - get_chain_with_epoch_length(10) + get_chain_with_epoch_length_and_num_shards(10, 1) +} + +pub fn get_chain_with_num_shards(num_shards: NumShards) -> Chain { + get_chain_with_epoch_length_and_num_shards(10, num_shards) } pub fn get_chain_with_epoch_length(epoch_length: NumBlocks) -> Chain { + get_chain_with_epoch_length_and_num_shards(epoch_length, 1) +} + +pub fn get_chain_with_epoch_length_and_num_shards( + epoch_length: NumBlocks, + num_shards: NumShards, +) -> Chain { let store = create_test_store(); let chain_genesis = ChainGenesis::test(); - let vs = - ValidatorSchedule::new().block_producers_per_epoch(vec![vec!["test1".parse().unwrap()]]); + let vs = ValidatorSchedule::new() + .block_producers_per_epoch(vec![vec!["test1".parse().unwrap()]]) + .num_shards(num_shards); let epoch_manager = MockEpochManager::new_with_validators(store.clone(), vs, epoch_length); let shard_tracker = ShardTracker::new_empty(epoch_manager.clone()); let runtime = KeyValueRuntime::new(store, epoch_manager.as_ref()); diff --git a/chain/chain/src/tests/gc.rs b/chain/chain/src/tests/garbage_collection.rs similarity index 67% rename from chain/chain/src/tests/gc.rs rename to chain/chain/src/tests/garbage_collection.rs index 898894d8396..09247fcbc66 100644 --- a/chain/chain/src/tests/gc.rs +++ b/chain/chain/src/tests/garbage_collection.rs @@ -1,48 +1,26 @@ +use rand::Rng; use std::sync::Arc; use crate::chain::Chain; -use crate::test_utils::{KeyValueRuntime, MockEpochManager, ValidatorSchedule}; -use crate::types::{ChainConfig, ChainGenesis, Tip}; -use crate::DoomslugThresholdMode; - -use near_chain_configs::GCConfig; -use near_epoch_manager::shard_tracker::ShardTracker; +use crate::garbage_collection::GCMode; +use crate::test_utils::{ + get_chain, get_chain_with_epoch_length, get_chain_with_epoch_length_and_num_shards, + get_chain_with_num_shards, +}; +use crate::types::Tip; +use crate::{ChainStoreAccess, StoreValidator}; + +use near_chain_configs::{GCConfig, GenesisConfig}; +use near_epoch_manager::EpochManagerAdapter; use near_primitives::block::Block; +use near_primitives::epoch_manager::block_info::BlockInfo; use near_primitives::merkle::PartialMerkleTree; use near_primitives::shard_layout::ShardUId; use near_primitives::test_utils::{create_test_signer, TestBlockBuilder}; -use near_primitives::types::{NumBlocks, NumShards, StateRoot}; -use near_store::test_utils::{create_test_store, gen_changes}; -use near_store::{ShardTries, Trie, WrappedTrieChanges}; -use rand::Rng; - -fn get_chain(num_shards: NumShards) -> Chain { - get_chain_with_epoch_length_and_num_shards(10, num_shards)
-} - -fn get_chain_with_epoch_length_and_num_shards( - epoch_length: NumBlocks, - num_shards: NumShards, -) -> Chain { - let store = create_test_store(); - let chain_genesis = ChainGenesis::test(); - let vs = ValidatorSchedule::new() - .block_producers_per_epoch(vec![vec!["test1".parse().unwrap()]]) - .num_shards(num_shards); - let epoch_manager = MockEpochManager::new_with_validators(store.clone(), vs, epoch_length); - let shard_tracker = ShardTracker::new_empty(epoch_manager.clone()); - let runtime = KeyValueRuntime::new(store, epoch_manager.as_ref()); - Chain::new( - epoch_manager, - shard_tracker, - runtime, - &chain_genesis, - DoomslugThresholdMode::NoApprovals, - ChainConfig::test(), - None, - ) - .unwrap() -} +use near_primitives::types::{BlockHeight, NumBlocks, StateRoot}; +use near_primitives::validator_signer::InMemoryValidatorSigner; +use near_store::test_utils::gen_changes; +use near_store::{DBCol, ShardTries, Trie, WrappedTrieChanges}; // Build a chain of num_blocks on top of prev_block fn do_fork( @@ -169,7 +147,7 @@ fn gc_fork_common(simple_chains: Vec<SimpleChain>, max_changes: usize) { let num_shards = rand::thread_rng().gen_range(1..3); // Init Chain 1 - let mut chain1 = get_chain(num_shards); + let mut chain1 = get_chain_with_num_shards(num_shards); let tries1 = chain1.runtime_adapter.get_tries(); let mut rng = rand::thread_rng(); let shard_to_check_trie = rng.gen_range(0..num_shards); @@ -201,7 +179,7 @@ fn gc_fork_common(simple_chains: Vec<SimpleChain>, max_changes: usize) { .clear_data(tries1.clone(), &GCConfig { gc_blocks_limit: 1000, ..GCConfig::default() }) .unwrap(); - let tries2 = get_chain(num_shards).runtime_adapter.get_tries(); + let tries2 = get_chain_with_num_shards(num_shards).runtime_adapter.get_tries(); // Find gc_height let mut gc_height = simple_chains[0].length - 51; @@ -718,3 +696,296 @@ fn test_fork_far_away_from_epoch_end() { ); } } + +/// Test that garbage collection works properly. The blocks behind gc head should be garbage +/// collected while the blocks that are ahead of it should not. +#[test] +fn test_clear_old_data() { + let mut chain = get_chain_with_epoch_length(1); + let epoch_manager = chain.epoch_manager.clone(); + let genesis = chain.get_block_by_height(0).unwrap(); + let signer = Arc::new(create_test_signer("test1")); + let mut prev_block = genesis; + let mut blocks = vec![prev_block.clone()]; + for i in 1..15 { + add_block( + &mut chain, + epoch_manager.as_ref(), + &mut prev_block, + &mut blocks, + signer.clone(), + i, + ); + } + + let trie = chain.runtime_adapter.get_tries(); + chain.clear_data(trie, &GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }).unwrap(); + + // epoch didn't change so no data is garbage collected. + for i in 0..15 { + println!("height = {} hash = {}", i, blocks[i].hash()); + if i < 8 { + assert!(chain.get_block(blocks[i].hash()).is_err()); + assert!(chain + .mut_store() + .get_all_block_hashes_by_height(i as BlockHeight) + .unwrap() + .is_empty()); + } else { + assert!(chain.get_block(blocks[i].hash()).is_ok()); + assert!(!chain + .mut_store() + .get_all_block_hashes_by_height(i as BlockHeight) + .unwrap() + .is_empty()); + } + } +} + +// Adds block to the chain at given height after prev_block.
+fn add_block( + chain: &mut Chain, + epoch_manager: &dyn EpochManagerAdapter, + prev_block: &mut Block, + blocks: &mut Vec<Block>, + signer: Arc<InMemoryValidatorSigner>, + height: u64, +) { + let next_epoch_id = epoch_manager + .get_next_epoch_id_from_prev_block(prev_block.hash()) + .expect("block must exist"); + let mut store_update = chain.mut_store().store_update(); + + let block = if next_epoch_id == *prev_block.header().next_epoch_id() { + TestBlockBuilder::new(&prev_block, signer).height(height).build() + } else { + let prev_hash = prev_block.hash(); + let epoch_id = prev_block.header().next_epoch_id().clone(); + let next_bp_hash = Chain::compute_bp_hash( + epoch_manager, + next_epoch_id.clone(), + epoch_id.clone(), + &prev_hash, + ) + .unwrap(); + TestBlockBuilder::new(&prev_block, signer) + .height(height) + .epoch_id(epoch_id) + .next_epoch_id(next_epoch_id) + .next_bp_hash(next_bp_hash) + .build() + }; + blocks.push(block.clone()); + store_update.save_block(block.clone()); + store_update.inc_block_refcount(block.header().prev_hash()).unwrap(); + store_update.save_block_header(block.header().clone()).unwrap(); + store_update.save_head(&Tip::from_header(block.header())).unwrap(); + store_update + .chain_store_cache_update + .height_to_hashes + .insert(height, Some(*block.header().hash())); + store_update.save_next_block_hash(prev_block.hash(), *block.hash()); + store_update.commit().unwrap(); + *prev_block = block.clone(); +} + +#[test] +fn test_clear_old_data_fixed_height() { + let mut chain = get_chain(); + let epoch_manager = chain.epoch_manager.clone(); + let genesis = chain.get_block_by_height(0).unwrap(); + let signer = Arc::new(create_test_signer("test1")); + let mut prev_block = genesis; + let mut blocks = vec![prev_block.clone()]; + for i in 1..10 { + add_block( + &mut chain, + epoch_manager.as_ref(), + &mut prev_block, + &mut blocks, + signer.clone(), + i, + ); + } + + assert!(chain.get_block(blocks[4].hash()).is_ok()); + assert!(chain.get_block(blocks[5].hash()).is_ok()); + assert!(chain.get_block(blocks[6].hash()).is_ok()); + assert!(chain.get_block_header(blocks[5].hash()).is_ok()); + assert_eq!( + chain + .mut_store() + .get_all_block_hashes_by_height(5) + .unwrap() + .values() + .flatten() + .collect::<Vec<_>>(), + vec![blocks[5].hash()] + ); + assert!(chain.mut_store().get_next_block_hash(blocks[5].hash()).is_ok()); + + let trie = chain.runtime_adapter.get_tries(); + let mut store_update = chain.mut_store().store_update(); + assert!(store_update + .clear_block_data(epoch_manager.as_ref(), *blocks[5].hash(), GCMode::Canonical(trie)) + .is_ok()); + store_update.commit().unwrap(); + + assert!(chain.get_block(blocks[4].hash()).is_err()); + assert!(chain.get_block(blocks[5].hash()).is_ok()); + assert!(chain.get_block(blocks[6].hash()).is_ok()); + // block header should be available + assert!(chain.get_block_header(blocks[4].hash()).is_ok()); + assert!(chain.get_block_header(blocks[5].hash()).is_ok()); + assert!(chain.get_block_header(blocks[6].hash()).is_ok()); + assert!(chain.mut_store().get_all_block_hashes_by_height(4).unwrap().is_empty()); + assert!(!chain.mut_store().get_all_block_hashes_by_height(5).unwrap().is_empty()); + assert!(!chain.mut_store().get_all_block_hashes_by_height(6).unwrap().is_empty()); + assert!(chain.mut_store().get_next_block_hash(blocks[4].hash()).is_err()); + assert!(chain.mut_store().get_next_block_hash(blocks[5].hash()).is_ok()); + assert!(chain.mut_store().get_next_block_hash(blocks[6].hash()).is_ok()); +} + +/// Test that `gc_blocks_limit` works properly +#[test]
+#[cfg_attr(not(feature = "expensive_tests"), ignore)] +fn test_clear_old_data_too_many_heights() { + for i in 1..5 { + println!("gc_blocks_limit == {:?}", i); + test_clear_old_data_too_many_heights_common(i); + } + test_clear_old_data_too_many_heights_common(25); + test_clear_old_data_too_many_heights_common(50); + test_clear_old_data_too_many_heights_common(87); +} + +fn test_clear_old_data_too_many_heights_common(gc_blocks_limit: NumBlocks) { + let mut chain = get_chain_with_epoch_length(1); + let genesis = chain.get_block_by_height(0).unwrap(); + let signer = Arc::new(create_test_signer("test1")); + let mut prev_block = genesis; + let mut blocks = vec![prev_block.clone()]; + { + let mut store_update = chain.store().store().store_update(); + let block_info = BlockInfo::default(); + store_update.insert_ser(DBCol::BlockInfo, prev_block.hash().as_ref(), &block_info).unwrap(); + store_update.commit().unwrap(); + } + for i in 1..1000 { + let block = TestBlockBuilder::new(&prev_block, signer.clone()).height(i).build(); + blocks.push(block.clone()); + + let mut store_update = chain.mut_store().store_update(); + store_update.save_block(block.clone()); + store_update.inc_block_refcount(block.header().prev_hash()).unwrap(); + store_update.save_block_header(block.header().clone()).unwrap(); + store_update.save_head(&Tip::from_header(&block.header())).unwrap(); + { + let mut store_update = store_update.store().store_update(); + let block_info = BlockInfo::default(); + store_update.insert_ser(DBCol::BlockInfo, block.hash().as_ref(), &block_info).unwrap(); + store_update.commit().unwrap(); + } + store_update + .chain_store_cache_update + .height_to_hashes + .insert(i, Some(*block.header().hash())); + store_update.save_next_block_hash(&prev_block.hash(), *block.hash()); + store_update.commit().unwrap(); + + prev_block = block.clone(); + } + + let trie = chain.runtime_adapter.get_tries(); + + for iter in 0..10 { + println!("ITERATION #{:?}", iter); + assert!(chain + .clear_data(trie.clone(), &GCConfig { gc_blocks_limit, ..GCConfig::default() }) + .is_ok()); + + // epoch didn't change so no data is garbage collected.
+ for i in 0..1000 { + if i < (iter + 1) * gc_blocks_limit as usize { + assert!(chain.get_block(&blocks[i].hash()).is_err()); + assert!(chain + .mut_store() + .get_all_block_hashes_by_height(i as BlockHeight) + .unwrap() + .is_empty()); + } else { + assert!(chain.get_block(&blocks[i].hash()).is_ok()); + assert!(!chain + .mut_store() + .get_all_block_hashes_by_height(i as BlockHeight) + .unwrap() + .is_empty()); + } + } + let mut genesis = GenesisConfig::default(); + genesis.genesis_height = 0; + let mut store_validator = StoreValidator::new( + None, + genesis.clone(), + chain.epoch_manager.clone(), + chain.shard_tracker.clone(), + chain.runtime_adapter.clone(), + chain.store().store().clone(), + false, + ); + store_validator.validate(); + println!("errors = {:?}", store_validator.errors); + assert!(!store_validator.is_failed()); + } +} + +#[test] +fn test_fork_chunk_tail_updates() { + let mut chain = get_chain(); + let epoch_manager = chain.epoch_manager.clone(); + let genesis = chain.get_block_by_height(0).unwrap(); + let signer = Arc::new(create_test_signer("test1")); + let mut prev_block = genesis; + let mut blocks = vec![prev_block.clone()]; + for i in 1..10 { + add_block( + &mut chain, + epoch_manager.as_ref(), + &mut prev_block, + &mut blocks, + signer.clone(), + i, + ); + } + assert_eq!(chain.tail().unwrap(), 0); + + { + let mut store_update = chain.mut_store().store_update(); + assert_eq!(store_update.tail().unwrap(), 0); + store_update.update_tail(1).unwrap(); + store_update.commit().unwrap(); + } + // Chunk tail should be auto updated to genesis (if not set) and fork_tail to the tail. + { + let store_update = chain.mut_store().store_update(); + assert_eq!(store_update.tail().unwrap(), 1); + assert_eq!(store_update.fork_tail().unwrap(), 1); + assert_eq!(store_update.chunk_tail().unwrap(), 0); + } + { + let mut store_update = chain.mut_store().store_update(); + store_update.update_fork_tail(3); + store_update.commit().unwrap(); + } + { + let mut store_update = chain.mut_store().store_update(); + store_update.update_tail(2).unwrap(); + store_update.commit().unwrap(); + } + { + let store_update = chain.mut_store().store_update(); + assert_eq!(store_update.tail().unwrap(), 2); + assert_eq!(store_update.fork_tail().unwrap(), 3); + assert_eq!(store_update.chunk_tail().unwrap(), 0); + } +} diff --git a/chain/chain/src/tests/mod.rs b/chain/chain/src/tests/mod.rs index 7caf1595a55..f5eb59f392f 100644 --- a/chain/chain/src/tests/mod.rs +++ b/chain/chain/src/tests/mod.rs @@ -1,6 +1,6 @@ mod challenges; mod doomslug; -mod gc; +mod garbage_collection; mod simple_chain; mod sync_chain;
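For reference, a minimal sketch (not part of the patch itself) of how the relocated helpers fit together after this move. It only uses items that appear in the diff above (get_chain_with_epoch_length, add_block, clear_data, GCConfig::gc_blocks_limit), assumes the same imports as tests/garbage_collection.rs, and the test name is made up for illustration:

#[test]
fn gc_smoke_sketch() {
    // Build a short chain with the add_block helper that now lives in tests/garbage_collection.rs.
    let mut chain = get_chain_with_epoch_length(1);
    let epoch_manager = chain.epoch_manager.clone();
    let signer = Arc::new(create_test_signer("test1"));
    let mut prev_block = chain.get_block_by_height(0).unwrap();
    let mut blocks = vec![prev_block.clone()];
    for height in 1..15 {
        add_block(&mut chain, epoch_manager.as_ref(), &mut prev_block, &mut blocks, signer.clone(), height);
    }

    // Run GC; gc_blocks_limit bounds how many heights may be cleared per clear_data call.
    let tries = chain.runtime_adapter.get_tries();
    chain.clear_data(tries, &GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }).unwrap();

    // The most recent blocks stay readable after GC (mirrors test_clear_old_data above).
    assert!(chain.get_block(blocks.last().unwrap().hash()).is_ok());
}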