Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] Move garbage collection related code into a separate file #10361

Merged
merged 2 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 2 additions & 304 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::missing_chunks::MissingChunksPool;
use crate::orphan::{Orphan, OrphanBlockPool};
use crate::state_request_tracker::StateRequestTracker;
use crate::state_snapshot_actor::SnapshotCallbacks;
use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode};
use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate};

use crate::types::{
AcceptedBlock, ApplySplitStateResultOrStateChanges, ApplyTransactionResult,
Expand Down Expand Up @@ -94,7 +94,7 @@ use near_primitives::views::{
use near_store::config::StateSnapshotType;
use near_store::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use near_store::get_genesis_state_roots;
use near_store::{DBCol, ShardTries};
use near_store::DBCol;
use once_cell::sync::OnceCell;
use rand::seq::SliceRandom;
use rand::SeedableRng;
Expand Down Expand Up @@ -639,242 +639,6 @@ impl Chain {
Ok(())
}

// GC CONTRACT
// ===
//
// Prerequisites, guaranteed by the System:
// 1. Genesis block is available and should not be removed by GC.
// 2. No block in storage except Genesis has height lower or equal to `genesis_height`.
// 3. There is known lowest block height (Tail) came from Genesis or State Sync.
// a. Tail is always on the Canonical Chain.
// b. Only one Tail exists.
// c. Tail's height is higher than or equal to `genesis_height`,
// 4. There is a known highest block height (Head).
// a. Head is always on the Canonical Chain.
// 5. All blocks in the storage have heights in range [Tail; Head].
// a. All forks end up on height of Head or lower.
// 6. If block A is ancestor of block B, height of A is strictly less then height of B.
// 7. (Property 1). A block with the lowest height among all the blocks at which the fork has started,
// i.e. all the blocks with the outgoing degree 2 or more,
// has the least height among all blocks on the fork.
// 8. (Property 2). The oldest block where the fork happened is never affected
// by Canonical Chain Switching and always stays on Canonical Chain.
//
// Overall:
// 1. GC procedure is handled by `clear_data()` function.
// 2. `clear_data()` runs GC process for all blocks from the Tail to GC Stop Height provided by Epoch Manager.
// 3. `clear_data()` executes separately:
// a. Forks Clearing runs for each height from Tail up to GC Stop Height.
// b. Canonical Chain Clearing from (Tail + 1) up to GC Stop Height.
// 4. Before actual clearing is started, Block Reference Map should be built.
// 5. `clear_data()` executes every time when block at new height is added.
// 6. In case of State Sync, State Sync Clearing happens.
//
// Forks Clearing:
// 1. Any fork which ends up on height `height` INCLUSIVELY and earlier will be completely deleted
// from the Store with all its ancestors up to the ancestor block where fork is happened
// EXCLUDING the ancestor block where fork is happened.
// 2. The oldest ancestor block always remains on the Canonical Chain by property 2.
// 3. All forks which end up on height `height + 1` and further are protected from deletion and
// no their ancestor will be deleted (even with lowest heights).
// 4. `clear_forks_data()` handles forks clearing for fixed height `height`.
//
// Canonical Chain Clearing:
// 1. Blocks on the Canonical Chain with the only descendant (if no forks started from them)
// are unlocked for Canonical Chain Clearing.
// 2. If Forks Clearing ended up on the Canonical Chain, the block may be unlocked
// for the Canonical Chain Clearing. There is no other reason to unlock the block exists.
// 3. All the unlocked blocks will be completely deleted
// from the Tail up to GC Stop Height EXCLUSIVELY.
// 4. (Property 3, GC invariant). Tail can be shifted safely to the height of the
// earliest existing block. There is always only one Tail (based on property 1)
// and it's always on the Canonical Chain (based on property 2).
//
// Example:
//
// height: 101 102 103 104
// --------[A]---[B]---[C]---[D]
// \ \
// \ \---[E]
// \
// \-[F]---[G]
//
// 1. Let's define clearing height = 102. It this case fork A-F-G is protected from deletion
// because of G which is on height 103. Nothing will be deleted.
// 2. Let's define clearing height = 103. It this case Fork Clearing will be executed for A
// to delete blocks G and F, then Fork Clearing will be executed for B to delete block E.
// Then Canonical Chain Clearing will delete blocks A and B as unlocked.
// Block C is the only block of height 103 remains on the Canonical Chain (invariant).
//
// State Sync Clearing:
// 1. Executing State Sync means that no data in the storage is useful for block processing
// and should be removed completely.
// 2. The Tail should be set to the block preceding Sync Block.
// 3. All the data preceding new Tail is deleted in State Sync Clearing
// and the Trie is updated with having only Genesis data.
// 4. State Sync Clearing happens in `reset_data_pre_state_sync()`.
//
pub fn clear_data(
&mut self,
tries: ShardTries,
gc_config: &near_chain_configs::GCConfig,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "garbage_collection", "clear_data").entered();

let head = self.store.head()?;
let tail = self.store.tail()?;
let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash);
if gc_stop_height > head.height {
return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into()));
}
let prev_epoch_id = self.get_block_header(&head.prev_block_hash)?.epoch_id().clone();
let epoch_change = prev_epoch_id != head.epoch_id;
let mut fork_tail = self.store.fork_tail()?;
metrics::TAIL_HEIGHT.set(tail as i64);
metrics::FORK_TAIL_HEIGHT.set(fork_tail as i64);
metrics::CHUNK_TAIL_HEIGHT.set(self.store.chunk_tail()? as i64);
metrics::GC_STOP_HEIGHT.set(gc_stop_height as i64);
if epoch_change && fork_tail < gc_stop_height {
// if head doesn't change on the epoch boundary, we may update fork tail several times
// but that is fine since it doesn't affect correctness and also we limit the number of
// heights that fork cleaning goes through so it doesn't slow down client either.
let mut chain_store_update = self.store.store_update();
chain_store_update.update_fork_tail(gc_stop_height);
chain_store_update.commit()?;
fork_tail = gc_stop_height;
}
let mut gc_blocks_remaining = gc_config.gc_blocks_limit;

// Forks Cleaning
let gc_fork_clean_step = gc_config.gc_fork_clean_step;
let stop_height = tail.max(fork_tail.saturating_sub(gc_fork_clean_step));
for height in (stop_height..fork_tail).rev() {
self.clear_forks_data(tries.clone(), height, &mut gc_blocks_remaining)?;
if gc_blocks_remaining == 0 {
return Ok(());
}
let mut chain_store_update = self.store.store_update();
chain_store_update.update_fork_tail(height);
chain_store_update.commit()?;
}

// Canonical Chain Clearing
for height in tail + 1..gc_stop_height {
if gc_blocks_remaining == 0 {
return Ok(());
}
let blocks_current_height = self
.store
.get_all_block_hashes_by_height(height)?
.values()
.flatten()
.cloned()
.collect::<Vec<_>>();
let mut chain_store_update = self.store.store_update();
if let Some(block_hash) = blocks_current_height.first() {
let prev_hash = *chain_store_update.get_block_header(block_hash)?.prev_hash();
let prev_block_refcount = chain_store_update.get_block_refcount(&prev_hash)?;
if prev_block_refcount > 1 {
// Block of `prev_hash` starts a Fork, stopping
break;
} else if prev_block_refcount == 1 {
debug_assert_eq!(blocks_current_height.len(), 1);
chain_store_update.clear_block_data(
self.epoch_manager.as_ref(),
*block_hash,
GCMode::Canonical(tries.clone()),
)?;
chain_store_update.clear_resharding_data(
self.runtime_adapter.as_ref(),
self.epoch_manager.as_ref(),
*block_hash,
)?;
gc_blocks_remaining -= 1;
} else {
return Err(Error::GCError(
"block on canonical chain shouldn't have refcount 0".into(),
));
}
}
chain_store_update.update_tail(height)?;
chain_store_update.commit()?;
}
Ok(())
}

/// Garbage collect data which archival node doesn’t need to keep.
///
/// Normally, archival nodes keep all the data from the genesis block and
/// don’t run garbage collection. On the other hand, for better performance
/// the storage contains some data duplication, i.e. values in some of the
/// columns can be recomputed from data in different columns. To save on
/// storage, archival nodes do garbage collect that data.
///
/// `gc_height_limit` limits how many heights will the function process.
pub fn clear_archive_data(&mut self, gc_height_limit: BlockHeightDelta) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "chain", "clear_archive_data").entered();

let head = self.store.head()?;
let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash);
if gc_stop_height > head.height {
return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into()));
}

let mut chain_store_update = self.store.store_update();
chain_store_update.clear_redundant_chunk_data(gc_stop_height, gc_height_limit)?;
metrics::CHUNK_TAIL_HEIGHT.set(chain_store_update.chunk_tail()? as i64);
metrics::GC_STOP_HEIGHT.set(gc_stop_height as i64);
chain_store_update.commit()
}

pub fn clear_forks_data(
&mut self,
tries: ShardTries,
height: BlockHeight,
gc_blocks_remaining: &mut NumBlocks,
) -> Result<(), Error> {
let blocks_current_height = self
.store
.get_all_block_hashes_by_height(height)?
.values()
.flatten()
.cloned()
.collect::<Vec<_>>();
for block_hash in blocks_current_height.iter() {
let mut current_hash = *block_hash;
loop {
if *gc_blocks_remaining == 0 {
return Ok(());
}
// Block `block_hash` is not on the Canonical Chain
// because shorter chain cannot be Canonical one
// and it may be safely deleted
// and all its ancestors while there are no other sibling blocks rely on it.
let mut chain_store_update = self.store.store_update();
if chain_store_update.get_block_refcount(&current_hash)? == 0 {
let prev_hash =
*chain_store_update.get_block_header(&current_hash)?.prev_hash();

// It's safe to call `clear_block_data` for prev data because it clears fork only here
chain_store_update.clear_block_data(
self.epoch_manager.as_ref(),
current_hash,
GCMode::Fork(tries.clone()),
)?;
chain_store_update.commit()?;
*gc_blocks_remaining -= 1;

current_hash = prev_hash;
} else {
// Block of `current_hash` is an ancestor for some other blocks, stopping
break;
}
}
}

Ok(())
}

fn maybe_mark_block_invalid(&mut self, block_hash: CryptoHash, error: &Error) {
metrics::NUM_INVALID_BLOCKS.with_label_values(&[error.prometheus_label_value()]).inc();
// We only mark the block as invalid if the block has bad data (not for other errors that would
Expand Down Expand Up @@ -1754,72 +1518,6 @@ impl Chain {
}
}

pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "sync", "reset_data_pre_state_sync").entered();
let head = self.head()?;
// Get header we were syncing into.
let header = self.get_block_header(&sync_hash)?;
let prev_hash = *header.prev_hash();
let sync_height = header.height();
let gc_height = std::cmp::min(head.height + 1, sync_height);

// GC all the data from current tail up to `gc_height`. In case tail points to a height where
// there is no block, we need to make sure that the last block before tail is cleaned.
let tail = self.store.tail()?;
let mut tail_prev_block_cleaned = false;
for height in tail..gc_height {
let blocks_current_height = self
.store
.get_all_block_hashes_by_height(height)?
.values()
.flatten()
.cloned()
.collect::<Vec<_>>();
for block_hash in blocks_current_height {
let epoch_manager = self.epoch_manager.clone();
let mut chain_store_update = self.mut_store().store_update();
if !tail_prev_block_cleaned {
let prev_block_hash =
*chain_store_update.get_block_header(&block_hash)?.prev_hash();
if chain_store_update.get_block(&prev_block_hash).is_ok() {
chain_store_update.clear_block_data(
epoch_manager.as_ref(),
prev_block_hash,
GCMode::StateSync { clear_block_info: true },
)?;
}
tail_prev_block_cleaned = true;
}
chain_store_update.clear_block_data(
epoch_manager.as_ref(),
block_hash,
GCMode::StateSync { clear_block_info: block_hash != prev_hash },
)?;
chain_store_update.commit()?;
}
}

// Clear Chunks data
let mut chain_store_update = self.mut_store().store_update();
// The largest height of chunk we have in storage is head.height + 1
let chunk_height = std::cmp::min(head.height + 2, sync_height);
chain_store_update.clear_chunk_data_and_headers(chunk_height)?;
chain_store_update.commit()?;

// clear all trie data

let tries = self.runtime_adapter.get_tries();
let mut chain_store_update = self.mut_store().store_update();
let mut store_update = tries.store_update();
store_update.delete_all(DBCol::State);
chain_store_update.merge(store_update);

// The reason to reset tail here is not to allow Tail be greater than Head
chain_store_update.reset_tail();
chain_store_update.commit()?;
Ok(())
}

/// Set the new head after state sync was completed if it is indeed newer.
/// Check for potentially unlocked orphans after this update.
pub fn reset_heads_post_state_sync(
Expand Down
Loading
Loading