Skip to content

Commit

Permalink
feat(Store Validator): move check_refcount_map into Store Validator
Browse files Browse the repository at this point in the history
  • Loading branch information
Kouprin committed Jul 4, 2020
1 parent 0b65f75 commit 21053ce
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 323 deletions.
74 changes: 0 additions & 74 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3554,77 +3554,3 @@ pub fn collect_receipts_from_response(
receipt_proof_response.iter().flat_map(|ReceiptProofResponse(_, proofs)| proofs),
)
}

// Used in testing only
pub fn check_refcount_map(chain: &mut Chain) -> Result<(), Error> {
let head = chain.head()?;
let mut block_refcounts = HashMap::new();
// TODO #2352: make sure there is no block with height > head.height and set highest_height to `head.height`
let highest_height = head.height + 100;
for height in chain.store().get_genesis_height() + 1..=highest_height {
let blocks_current_height = match chain.mut_store().get_all_block_hashes_by_height(height) {
Ok(blocks_current_height) => {
blocks_current_height.values().flatten().cloned().collect()
}
_ => vec![],
};
for block_hash in blocks_current_height.iter() {
if let Ok(prev_hash) =
chain.get_block(&block_hash).map(|block| *block.header().prev_hash())
{
*block_refcounts.entry(prev_hash).or_insert(0) += 1;
}
// This is temporary workaround to ignore all blocks with height >= highest_height
// TODO #2352: remove `if` and keep only `block_refcounts.entry(*block_hash).or_insert(0)`
if height < highest_height {
block_refcounts.entry(*block_hash).or_insert(0);
}
}
}
let mut chain_store_update = ChainStoreUpdate::new(chain.mut_store());
let mut tail_blocks = 0;
for (block_hash, refcount) in block_refcounts {
let block_refcount = chain_store_update.get_block_refcount(&block_hash)?.clone();
match chain_store_update.get_block(&block_hash) {
Ok(_) => {
if block_refcount != refcount {
return Err(ErrorKind::GCError(format!(
"invalid number of references in Block {:?}, expected {:?}, found {:?}",
block_hash, refcount, block_refcount
))
.into());
}
}
Err(e) => {
if block_refcount != 0 {
// May be the tail block
if block_refcount != refcount {
return Err(ErrorKind::GCError(format!(
"invalid number of references in deleted Block {:?}, expected {:?}, found {:?}; get_block failed: {:?}",
block_hash, refcount, block_refcount, e
))
.into());
}
}
if refcount >= 2 {
return Err(ErrorKind::GCError(format!(
"Block {:?} expected to be deleted, found {:?} references instead; get_block failed: {:?}",
block_hash, refcount, e
))
.into());
} else if refcount == 1 {
// If Block `block_hash` is successfully GCed, the only its descendant is alive.
tail_blocks += 1;
}
}
}
}
if tail_blocks >= 2 {
return Err(ErrorKind::GCError(format!(
"There are {:?} tail blocks found, expected no more than 1",
tail_blocks,
))
.into());
}
Ok(())
}
2 changes: 1 addition & 1 deletion chain/chain/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[macro_use]
extern crate lazy_static;

pub use chain::{check_refcount_map, collect_receipts, Chain, ChainGenesis, MAX_ORPHAN_SIZE};
pub use chain::{collect_receipts, Chain, ChainGenesis, MAX_ORPHAN_SIZE};
pub use doomslug::{Doomslug, DoomslugBlockProductionReadiness, DoomslugThresholdMode};
pub use error::{Error, ErrorKind};
pub use lightclient::create_light_client_block_view;
Expand Down
95 changes: 45 additions & 50 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ pub struct ChainStore {
last_block_with_new_chunk: SizedCache<Vec<u8>, CryptoHash>,
/// Transactions
transactions: SizedCache<Vec<u8>, SignedTransaction>,
/// Cache with height to hash on any chain.
/// Cache with Block Refcounts
block_refcounts: SizedCache<Vec<u8>, u64>,
/// Cache of block hash -> block merkle tree at the current block
block_merkle_tree: SizedCache<Vec<u8>, PartialMerkleTree>,
Expand Down Expand Up @@ -1092,6 +1092,7 @@ struct ChainStoreCacheUpdate {
next_block_with_new_chunk: HashMap<(CryptoHash, ShardId), CryptoHash>,
last_block_with_new_chunk: HashMap<ShardId, CryptoHash>,
transactions: HashSet<SignedTransaction>,
tx_refcounts: HashMap<CryptoHash, u64>,
block_refcounts: HashMap<CryptoHash, u64>,
block_merkle_tree: HashMap<CryptoHash, PartialMerkleTree>,
block_ordinal_to_hash: HashMap<NumBlocks, CryptoHash>,
Expand Down Expand Up @@ -1328,7 +1329,11 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
}

fn get_tx_refcount(&mut self, tx_hash: &CryptoHash) -> Result<u64, Error> {
self.chain_store.get_tx_refcount(tx_hash)
if let Some(refcount) = self.chain_store_cache_update.tx_refcounts.get(tx_hash) {
Ok(*refcount)
} else {
self.chain_store.get_tx_refcount(tx_hash)
}
}

fn get_block_refcount(&mut self, block_hash: &CryptoHash) -> Result<&u64, Error> {
Expand Down Expand Up @@ -2154,17 +2159,12 @@ impl<'a> ChainStoreUpdate<'a> {
}

pub fn gc_col_transaction(&mut self, tx_hash: CryptoHash) -> Result<(), Error> {
let mut refs = self.get_tx_refcount(&tx_hash)?;
debug_assert!(refs > 0);
refs = refs.saturating_sub(1);
if refs == 0 {
let mut refcount = self.get_tx_refcount(&tx_hash)?;
refcount -= 1;
self.chain_store_cache_update.tx_refcounts.insert(tx_hash, refcount);
if refcount == 0 {
self.gc_col(ColTransactionRefCount, &tx_hash.into());
self.gc_col(ColTransactions, &tx_hash.into());
} else {
let mut store_update = self.store().store_update();
store_update.set_ser(ColTransactionRefCount, &tx_hash.as_ref(), &refs)?;
self.inc_gc(ColTransactionRefCount);
self.merge(store_update);
}
Ok(())
}
Expand Down Expand Up @@ -2340,43 +2340,44 @@ impl<'a> ChainStoreUpdate<'a> {
map.entry(block.header().epoch_id().clone())
.or_insert_with(|| HashSet::new())
.insert(*hash);
store_update
.set_ser(ColBlockPerHeight, &index_to_bytes(block.header().height()), &map)
.map_err::<Error, _>(|e| e.into())?;
store_update.set_ser(
ColBlockPerHeight,
&index_to_bytes(block.header().height()),
&map,
)?;
self.chain_store_cache_update
.block_hash_per_height
.insert(block.header().height(), map);
store_update
.set_ser(ColBlock, hash.as_ref(), block)
.map_err::<Error, _>(|e| e.into())?;
store_update.set_ser(ColBlock, hash.as_ref(), block)?;
}
for (hash, header) in self.chain_store_cache_update.headers.iter() {
store_update
.set_ser(ColBlockHeader, hash.as_ref(), header)
.map_err::<Error, _>(|e| e.into())?;
store_update.set_ser(ColBlockHeader, hash.as_ref(), header)?;
}
for ((block_hash, shard_id), chunk_extra) in
self.chain_store_cache_update.chunk_extras.iter()
{
store_update
.set_ser(ColChunkExtra, &get_block_shard_id(block_hash, *shard_id), chunk_extra)
.map_err::<Error, _>(|e| e.into())?;
store_update.set_ser(
ColChunkExtra,
&get_block_shard_id(block_hash, *shard_id),
chunk_extra,
)?;
}
for (block_hash, block_extra) in self.chain_store_cache_update.block_extras.iter() {
store_update
.set_ser(ColBlockExtra, block_hash.as_ref(), block_extra)
.map_err::<Error, _>(|e| e.into())?;
store_update.set_ser(ColBlockExtra, block_hash.as_ref(), block_extra)?;
}
for ((height, shard_id), chunk_hash) in
self.chain_store_cache_update.chunk_hash_per_height_shard.iter()
{
let key = get_height_shard_id(*height, *shard_id);
store_update
.set_ser(ColChunkPerHeightShard, &key, chunk_hash)
.map_err::<Error, _>(|e| e.into())?;
store_update.set_ser(ColChunkPerHeightShard, &key, chunk_hash)?;
}
let mut chunk_hashes_by_height: HashMap<BlockHeight, HashSet<ChunkHash>> = HashMap::new();
for (chunk_hash, chunk) in self.chain_store_cache_update.chunks.iter() {
if self.chain_store.get_chunk(chunk_hash).is_ok() {
// No need to add same Chunk once again
continue;
}

match chunk_hashes_by_height.entry(chunk.header.inner.height_created) {
Entry::Occupied(mut entry) => {
entry.get_mut().insert(chunk_hash.clone());
Expand All @@ -2393,32 +2394,25 @@ impl<'a> ChainStoreUpdate<'a> {
entry.insert(hash_set);
}
};

// Increase transaction refcounts for all included txs
for tx in chunk.transactions.iter() {
let mut refs = self.chain_store.get_tx_refcount(&tx.get_hash())?;
refs += 1;
store_update.set_ser(ColTransactionRefCount, tx.get_hash().as_ref(), &refs)?;
let mut refcount = self.chain_store.get_tx_refcount(&tx.get_hash())?;
refcount += 1;
store_update.set_ser(ColTransactionRefCount, tx.get_hash().as_ref(), &refcount)?;
}

store_update
.set_ser(ColChunks, chunk_hash.as_ref(), chunk)
.map_err::<Error, _>(|e| e.into())?;
store_update.set_ser(ColChunks, chunk_hash.as_ref(), chunk)?;
}
for (height, hash_set) in chunk_hashes_by_height {
store_update
.set_ser(ColChunkHashesByHeight, &index_to_bytes(height), &hash_set)
.map_err::<Error, _>(|e| e.into())?;
store_update.set_ser(ColChunkHashesByHeight, &index_to_bytes(height), &hash_set)?;
}
for (chunk_hash, partial_chunk) in self.chain_store_cache_update.partial_chunks.iter() {
store_update
.set_ser(ColPartialChunks, chunk_hash.as_ref(), partial_chunk)
.map_err::<Error, _>(|e| e.into())?;
store_update.set_ser(ColPartialChunks, chunk_hash.as_ref(), partial_chunk)?;
}
for (height, hash) in self.chain_store_cache_update.height_to_hashes.iter() {
if let Some(hash) = hash {
store_update
.set_ser(ColBlockHeight, &index_to_bytes(*height), hash)
.map_err::<Error, _>(|e| e.into())?;
store_update.set_ser(ColBlockHeight, &index_to_bytes(*height), hash)?;
} else {
store_update.delete(ColBlockHeight, &index_to_bytes(*height));
}
Expand Down Expand Up @@ -2496,6 +2490,13 @@ impl<'a> ChainStoreUpdate<'a> {
for transaction in self.chain_store_cache_update.transactions.iter() {
store_update.set_ser(ColTransactions, transaction.get_hash().as_ref(), transaction)?;
}
for (tx_hash, refcount) in self.chain_store_cache_update.tx_refcounts.drain() {
// tx_refcounts cache is used in GC only.
// While increasing, we write to the storage directly because we add no transaction twice.
if refcount > 0 {
store_update.set_ser(ColTransactionRefCount, &tx_hash.as_ref(), &refcount)?;
}
}
for (block_hash, refcount) in self.chain_store_cache_update.block_refcounts.iter() {
store_update.set_ser(ColBlockRefCount, block_hash.as_ref(), refcount)?;
}
Expand Down Expand Up @@ -2750,7 +2751,6 @@ mod tests {
use near_primitives::validator_signer::InMemoryValidatorSigner;
use near_store::test_utils::create_test_store;

use crate::chain::check_refcount_map;
use crate::store::{ChainStoreAccess, GCMode};
use crate::test_utils::KeyValueRuntime;
use crate::{Chain, ChainGenesis, DoomslugThresholdMode};
Expand Down Expand Up @@ -3003,7 +3003,6 @@ mod tests {
prev_block = block.clone();
}

assert!(check_refcount_map(&mut chain).is_ok());
chain.epoch_length = 1;
let trie = chain.runtime_adapter.get_tries();
assert!(chain.clear_data(trie, 100).is_ok());
Expand All @@ -3024,7 +3023,6 @@ mod tests {
assert!(chain.mut_store().get_all_block_hashes_by_height(i as BlockHeight).is_ok());
}
}
assert!(check_refcount_map(&mut chain).is_ok());

let gced_cols = [
DBCol::ColBlock,
Expand Down Expand Up @@ -3098,7 +3096,6 @@ mod tests {

prev_block = block.clone();
}
assert!(check_refcount_map(&mut chain).is_ok());

assert!(chain.get_block(&blocks[4].hash()).is_ok());
assert!(chain.get_block(&blocks[5].hash()).is_ok());
Expand Down Expand Up @@ -3192,7 +3189,6 @@ mod tests {
prev_block = block.clone();
}

assert!(check_refcount_map(&mut chain).is_ok());
let trie = chain.runtime_adapter.get_tries();

for iter in 0..10 {
Expand All @@ -3217,7 +3213,6 @@ mod tests {
.is_ok());
}
}
assert!(check_refcount_map(&mut chain).is_ok());
let mut genesis = GenesisConfig::default();
genesis.genesis_height = 0;
let mut store_validator = StoreValidator::new(
Expand Down
Loading

0 comments on commit 21053ce

Please sign in to comment.