[refactor] Move garbage collection related code into a separate file (#10361)

Trying to push all things GC into a separate file.

This is part of the efforts highlighted in issue
#10275
Shreyan Gupta authored Dec 21, 2023
1 parent a7c3e2b commit e703deb
Showing 7 changed files with 1,414 additions and 1,429 deletions.
306 changes: 2 additions & 304 deletions chain/chain/src/chain.rs
@@ -10,7 +10,7 @@ use crate::missing_chunks::MissingChunksPool;
use crate::orphan::{Orphan, OrphanBlockPool};
use crate::state_request_tracker::StateRequestTracker;
use crate::state_snapshot_actor::SnapshotCallbacks;
use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode};
use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate};

use crate::types::{
AcceptedBlock, ApplySplitStateResultOrStateChanges, ApplyTransactionResult,
@@ -94,7 +94,7 @@ use near_primitives::views::{
use near_store::config::StateSnapshotType;
use near_store::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use near_store::get_genesis_state_roots;
use near_store::{DBCol, ShardTries};
use near_store::DBCol;
use once_cell::sync::OnceCell;
use rand::seq::SliceRandom;
use rand::SeedableRng;
@@ -639,242 +639,6 @@ impl Chain {
Ok(())
}

// GC CONTRACT
// ===
//
// Prerequisites, guaranteed by the System:
// 1. Genesis block is available and should not be removed by GC.
// 2. No block in storage except Genesis has a height lower than or equal to `genesis_height`.
// 3. There is a known lowest block height (Tail), which comes from Genesis or State Sync.
// a. Tail is always on the Canonical Chain.
// b. Only one Tail exists.
// c. Tail's height is higher than or equal to `genesis_height`.
// 4. There is a known highest block height (Head).
// a. Head is always on the Canonical Chain.
// 5. All blocks in the storage have heights in range [Tail; Head].
// a. All forks end up on height of Head or lower.
// 6. If block A is an ancestor of block B, the height of A is strictly less than the height of B.
// 7. (Property 1). The block with the lowest height among all the blocks at which the fork has started
// (i.e. all the blocks with outgoing degree 2 or more)
// has the least height among all blocks on the fork.
// 8. (Property 2). The oldest block where a fork happened is never affected
// by Canonical Chain Switching and always stays on the Canonical Chain.
//
// Overall:
// 1. The GC procedure is handled by the `clear_data()` function.
// 2. `clear_data()` runs the GC process for all blocks from the Tail to the GC Stop Height provided by the Epoch Manager.
// 3. `clear_data()` executes separately:
// a. Forks Clearing runs for each height from Tail up to GC Stop Height.
// b. Canonical Chain Clearing from (Tail + 1) up to GC Stop Height.
// 4. Before the actual clearing is started, the Block Reference Map should be built.
// 5. `clear_data()` executes every time a block at a new height is added.
// 6. In case of State Sync, State Sync Clearing happens.
//
// Forks Clearing:
// 1. Any fork that ends at height `height` (INCLUSIVE) or earlier will be completely deleted
// from the Store together with all its ancestors, up to the ancestor block where the fork happened,
// EXCLUDING that ancestor block.
// 2. The oldest ancestor block always remains on the Canonical Chain by property 2.
// 3. All forks that end at height `height + 1` or higher are protected from deletion, and
// none of their ancestors will be deleted (even those with the lowest heights).
// 4. `clear_forks_data()` handles forks clearing for fixed height `height`.
//
// Canonical Chain Clearing:
// 1. Blocks on the Canonical Chain with a single descendant (i.e. no forks started from them)
// are unlocked for Canonical Chain Clearing.
// 2. If Forks Clearing ended up on the Canonical Chain, the block may be unlocked
// for Canonical Chain Clearing. There is no other reason for a block to be unlocked.
// 3. All the unlocked blocks will be completely deleted
// from the Tail up to the GC Stop Height (EXCLUSIVE).
// 4. (Property 3, GC invariant). Tail can be shifted safely to the height of the
// earliest existing block. There is always only one Tail (based on property 1)
// and it's always on the Canonical Chain (based on property 2).
//
// Example:
//
//  height: 101   102   103   104
// --------[A]---[B]---[C]---[D]
//          \     \
//           \     \---[E]
//            \
//             \-[F]---[G]
//
// 1. Let's define clearing height = 102. In this case fork A-F-G is protected from deletion
// because of G, which is at height 103. Nothing will be deleted.
// 2. Let's define clearing height = 103. In this case Fork Clearing will be executed for A
// to delete blocks G and F, then Fork Clearing will be executed for B to delete block E.
// Then Canonical Chain Clearing will delete blocks A and B as unlocked.
// Block C, the only block at height 103, remains on the Canonical Chain (invariant).
//
// State Sync Clearing:
// 1. Executing State Sync means that none of the data in the storage is useful for block processing,
// so it should be removed completely.
// 2. The Tail should be set to the block preceding the Sync Block.
// 3. All the data preceding the new Tail is deleted in State Sync Clearing,
// and the Trie is updated to hold only Genesis data.
// 4. State Sync Clearing happens in `reset_data_pre_state_sync()`.
//
pub fn clear_data(
&mut self,
tries: ShardTries,
gc_config: &near_chain_configs::GCConfig,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "garbage_collection", "clear_data").entered();

let head = self.store.head()?;
let tail = self.store.tail()?;
let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash);
if gc_stop_height > head.height {
return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into()));
}
let prev_epoch_id = self.get_block_header(&head.prev_block_hash)?.epoch_id().clone();
let epoch_change = prev_epoch_id != head.epoch_id;
let mut fork_tail = self.store.fork_tail()?;
metrics::TAIL_HEIGHT.set(tail as i64);
metrics::FORK_TAIL_HEIGHT.set(fork_tail as i64);
metrics::CHUNK_TAIL_HEIGHT.set(self.store.chunk_tail()? as i64);
metrics::GC_STOP_HEIGHT.set(gc_stop_height as i64);
if epoch_change && fork_tail < gc_stop_height {
// If the head doesn't change on the epoch boundary, we may update the fork tail several times,
// but that is fine since it doesn't affect correctness; we also limit the number of
// heights that fork cleaning goes through, so it doesn't slow down the client either.
let mut chain_store_update = self.store.store_update();
chain_store_update.update_fork_tail(gc_stop_height);
chain_store_update.commit()?;
fork_tail = gc_stop_height;
}
let mut gc_blocks_remaining = gc_config.gc_blocks_limit;

// Forks Cleaning
let gc_fork_clean_step = gc_config.gc_fork_clean_step;
let stop_height = tail.max(fork_tail.saturating_sub(gc_fork_clean_step));
for height in (stop_height..fork_tail).rev() {
self.clear_forks_data(tries.clone(), height, &mut gc_blocks_remaining)?;
if gc_blocks_remaining == 0 {
return Ok(());
}
let mut chain_store_update = self.store.store_update();
chain_store_update.update_fork_tail(height);
chain_store_update.commit()?;
}

// Canonical Chain Clearing
for height in tail + 1..gc_stop_height {
if gc_blocks_remaining == 0 {
return Ok(());
}
let blocks_current_height = self
.store
.get_all_block_hashes_by_height(height)?
.values()
.flatten()
.cloned()
.collect::<Vec<_>>();
let mut chain_store_update = self.store.store_update();
if let Some(block_hash) = blocks_current_height.first() {
let prev_hash = *chain_store_update.get_block_header(block_hash)?.prev_hash();
let prev_block_refcount = chain_store_update.get_block_refcount(&prev_hash)?;
if prev_block_refcount > 1 {
// Block of `prev_hash` starts a Fork, stopping
break;
} else if prev_block_refcount == 1 {
debug_assert_eq!(blocks_current_height.len(), 1);
chain_store_update.clear_block_data(
self.epoch_manager.as_ref(),
*block_hash,
GCMode::Canonical(tries.clone()),
)?;
chain_store_update.clear_resharding_data(
self.runtime_adapter.as_ref(),
self.epoch_manager.as_ref(),
*block_hash,
)?;
gc_blocks_remaining -= 1;
} else {
return Err(Error::GCError(
"block on canonical chain shouldn't have refcount 0".into(),
));
}
}
chain_store_update.update_tail(height)?;
chain_store_update.commit()?;
}
Ok(())
}
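
The following is an illustrative aside, not part of this commit: a minimal, self-contained sketch of the "unlock" decision made inside the Canonical Chain Clearing loop above. A canonical block is cleared only while its parent has exactly one stored child; a parent with more stored children still anchors a live fork, so the loop stops there. `GcStep` and `canonical_gc_step` are hypothetical names; the real code performs the same three-way check on `prev_block_refcount` directly.

#[derive(Debug, PartialEq)]
enum GcStep {
    Clear,      // prev refcount == 1: the block is unlocked, clear it and keep going
    StopAtFork, // prev refcount > 1: a fork still starts at prev, stop the loop
}

fn canonical_gc_step(prev_block_refcount: u64) -> Result<GcStep, String> {
    match prev_block_refcount {
        // A block on the canonical chain always has at least its canonical child stored,
        // so a parent refcount of 0 indicates corrupted reference counts.
        0 => Err("block on canonical chain shouldn't have refcount 0".to_string()),
        1 => Ok(GcStep::Clear),
        _ => Ok(GcStep::StopAtFork),
    }
}

fn main() {
    assert_eq!(canonical_gc_step(1), Ok(GcStep::Clear));
    assert_eq!(canonical_gc_step(2), Ok(GcStep::StopAtFork));
    assert!(canonical_gc_step(0).is_err());
}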

/// Garbage collect data which an archival node doesn’t need to keep.
///
/// Normally, archival nodes keep all the data from the genesis block and
/// don’t run garbage collection. On the other hand, for better performance
/// the storage contains some data duplication, i.e. values in some of the
/// columns can be recomputed from data in different columns. To save on
/// storage, archival nodes do garbage collect that data.
///
/// `gc_height_limit` limits how many heights the function will process.
pub fn clear_archive_data(&mut self, gc_height_limit: BlockHeightDelta) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "chain", "clear_archive_data").entered();

let head = self.store.head()?;
let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash);
if gc_stop_height > head.height {
return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into()));
}

let mut chain_store_update = self.store.store_update();
chain_store_update.clear_redundant_chunk_data(gc_stop_height, gc_height_limit)?;
metrics::CHUNK_TAIL_HEIGHT.set(chain_store_update.chunk_tail()? as i64);
metrics::GC_STOP_HEIGHT.set(gc_stop_height as i64);
chain_store_update.commit()
}

pub fn clear_forks_data(
&mut self,
tries: ShardTries,
height: BlockHeight,
gc_blocks_remaining: &mut NumBlocks,
) -> Result<(), Error> {
let blocks_current_height = self
.store
.get_all_block_hashes_by_height(height)?
.values()
.flatten()
.cloned()
.collect::<Vec<_>>();
for block_hash in blocks_current_height.iter() {
let mut current_hash = *block_hash;
loop {
if *gc_blocks_remaining == 0 {
return Ok(());
}
// Block `block_hash` is not on the Canonical Chain,
// because a shorter chain cannot be the Canonical one,
// so it may be safely deleted together with all its ancestors,
// as long as no other sibling blocks rely on them.
let mut chain_store_update = self.store.store_update();
if chain_store_update.get_block_refcount(&current_hash)? == 0 {
let prev_hash =
*chain_store_update.get_block_header(&current_hash)?.prev_hash();

// It's safe to call `clear_block_data` for prev data because it clears fork only here
chain_store_update.clear_block_data(
self.epoch_manager.as_ref(),
current_hash,
GCMode::Fork(tries.clone()),
)?;
chain_store_update.commit()?;
*gc_blocks_remaining -= 1;

current_hash = prev_hash;
} else {
// Block of `current_hash` is an ancestor for some other blocks, stopping
break;
}
}
}

Ok(())
}
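
Illustrative aside, not part of this commit: a self-contained toy model of the fork-clearing walk above, replayed on the example from the GC CONTRACT comment (blocks A through G at heights 101-104). `Block`, `Dag` and the named blocks are hypothetical; the real code works on `ChainStoreUpdate`, block hashes and stored refcounts instead.

use std::collections::HashMap;

struct Block {
    height: u64,
    prev: Option<&'static str>,
}

struct Dag {
    blocks: HashMap<&'static str, Block>,
}

impl Dag {
    // Number of stored children of `name` (the "block refcount" in the real store).
    fn refcount(&self, name: &str) -> usize {
        self.blocks.values().filter(|b| b.prev.map_or(false, |p| p == name)).count()
    }

    // Forks Clearing for a fixed `height`: delete every unreferenced block at that
    // height, then keep deleting its ancestors while they too become unreferenced.
    fn clear_forks_data(&mut self, height: u64) {
        let leaves: Vec<&'static str> = self
            .blocks
            .iter()
            .filter(|(_, b)| b.height == height)
            .map(|(name, _)| *name)
            .filter(|name| self.refcount(name) == 0)
            .collect();
        for leaf in leaves {
            let mut current = leaf;
            while self.refcount(current) == 0 {
                let Some(block) = self.blocks.remove(current) else { break };
                println!("deleted {current}");
                match block.prev {
                    Some(prev) => current = prev,
                    None => break, // reached the start of the toy DAG
                }
            }
        }
    }
}

fn main() {
    // The DAG from the GC CONTRACT example: canonical chain A-B-C-D plus forks B-E and A-F-G.
    let blocks: HashMap<&'static str, Block> = [
        ("A", Block { height: 101, prev: None }),
        ("B", Block { height: 102, prev: Some("A") }),
        ("C", Block { height: 103, prev: Some("B") }),
        ("D", Block { height: 104, prev: Some("C") }),
        ("E", Block { height: 103, prev: Some("B") }),
        ("F", Block { height: 102, prev: Some("A") }),
        ("G", Block { height: 103, prev: Some("F") }),
    ]
    .into_iter()
    .collect();
    let mut dag = Dag { blocks };

    // Clearing height 102: block G (height 103) protects fork A-F-G, so nothing is deleted.
    dag.clear_forks_data(102);
    assert!(dag.blocks.contains_key("F") && dag.blocks.contains_key("G"));

    // Clearing height 103: fork leaves G and E are deleted, and so is F once G is gone.
    // A, B, C and D stay; A and B would later be removed by Canonical Chain Clearing.
    dag.clear_forks_data(103);
    for kept in ["A", "B", "C", "D"] {
        assert!(dag.blocks.contains_key(kept));
    }
    for gone in ["E", "F", "G"] {
        assert!(!dag.blocks.contains_key(gone));
    }
}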

fn maybe_mark_block_invalid(&mut self, block_hash: CryptoHash, error: &Error) {
metrics::NUM_INVALID_BLOCKS.with_label_values(&[error.prometheus_label_value()]).inc();
// We only mark the block as invalid if the block has bad data (not for other errors that would
@@ -1754,72 +1518,6 @@ impl Chain {
}
}

pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "sync", "reset_data_pre_state_sync").entered();
let head = self.head()?;
// Get header we were syncing into.
let header = self.get_block_header(&sync_hash)?;
let prev_hash = *header.prev_hash();
let sync_height = header.height();
let gc_height = std::cmp::min(head.height + 1, sync_height);

// GC all the data from the current tail up to `gc_height`. In case the tail points to a height where
// there is no block, we need to make sure that the last block before the tail is cleaned.
let tail = self.store.tail()?;
let mut tail_prev_block_cleaned = false;
for height in tail..gc_height {
let blocks_current_height = self
.store
.get_all_block_hashes_by_height(height)?
.values()
.flatten()
.cloned()
.collect::<Vec<_>>();
for block_hash in blocks_current_height {
let epoch_manager = self.epoch_manager.clone();
let mut chain_store_update = self.mut_store().store_update();
if !tail_prev_block_cleaned {
let prev_block_hash =
*chain_store_update.get_block_header(&block_hash)?.prev_hash();
if chain_store_update.get_block(&prev_block_hash).is_ok() {
chain_store_update.clear_block_data(
epoch_manager.as_ref(),
prev_block_hash,
GCMode::StateSync { clear_block_info: true },
)?;
}
tail_prev_block_cleaned = true;
}
chain_store_update.clear_block_data(
epoch_manager.as_ref(),
block_hash,
GCMode::StateSync { clear_block_info: block_hash != prev_hash },
)?;
chain_store_update.commit()?;
}
}

// Clear Chunks data
let mut chain_store_update = self.mut_store().store_update();
// The largest height of chunk we have in storage is head.height + 1
let chunk_height = std::cmp::min(head.height + 2, sync_height);
chain_store_update.clear_chunk_data_and_headers(chunk_height)?;
chain_store_update.commit()?;

// clear all trie data

let tries = self.runtime_adapter.get_tries();
let mut chain_store_update = self.mut_store().store_update();
let mut store_update = tries.store_update();
store_update.delete_all(DBCol::State);
chain_store_update.merge(store_update);

// The tail is reset here so that the Tail cannot become greater than the Head
chain_store_update.reset_tail();
chain_store_update.commit()?;
Ok(())
}
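
Illustrative aside, not part of this commit: a minimal sketch of the height bookkeeping used above when resetting storage before state sync. `pre_state_sync_heights` is a hypothetical helper; it only reproduces the two `min` computations and why chunk clearing may go one height further than block clearing.

// Returns (gc_height, chunk_height) for a node whose head is at `head_height` and
// which is syncing to the block at `sync_height`.
fn pre_state_sync_heights(head_height: u64, sync_height: u64) -> (u64, u64) {
    // Block data is cleared for heights strictly below `gc_height`, capped one above the head.
    let gc_height = std::cmp::min(head_height + 1, sync_height);
    // The largest height of a chunk in storage is head + 1, so chunk clearing is capped
    // one height further than block clearing.
    let chunk_height = std::cmp::min(head_height + 2, sync_height);
    (gc_height, chunk_height)
}

fn main() {
    // A node far behind the chain it is syncing to: the head bounds both heights.
    assert_eq!(pre_state_sync_heights(100, 10_000), (101, 102));
    // A node nearly caught up: the sync height is the binding constraint.
    assert_eq!(pre_state_sync_heights(100, 101), (101, 101));
}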

/// Set the new head after state sync was completed if it is indeed newer.
/// Check for potentially unlocked orphans after this update.
pub fn reset_heads_post_state_sync(
