From b4e58dd0edf0694ecac7ded8e6acee36512f0487 Mon Sep 17 00:00:00 2001 From: Bowen Wang Date: Sun, 20 Sep 2020 17:27:31 -0700 Subject: [PATCH] fix: always have partial encoded chunk when chunk exists (#3344) Since #3036, nodes will persist all the partial encoded chunk parts for the shard they track. However, on mainnet there are quite a few blocks produced before the PR landed and therefore we need to do a migration for archival nodes that contain all the historical data. Test plan ---------- Tested the migration on a mainnet archival node. --- chain/chain/src/chain.rs | 6 +-- chain/chain/src/test_utils.rs | 2 +- chain/chain/src/types.rs | 5 +- chain/chunks/src/lib.rs | 5 +- core/primitives/src/sharding.rs | 3 ++ core/primitives/src/version.rs | 2 +- core/store/src/migrations.rs | 87 +++++++++++++++++++++++++++++++-- neard/src/lib.rs | 16 ++++-- 8 files changed, 105 insertions(+), 21 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 2cedb27727a..8f7809e3a00 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -15,7 +15,7 @@ use crate::lightclient::get_epoch_block_producers_view; use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode}; use crate::types::{ AcceptedBlock, ApplyTransactionResult, Block, BlockEconomicsConfig, BlockHeader, - BlockHeaderInfo, BlockStatus, ChainGenesis, Provenance, ReceiptList, RuntimeAdapter, + BlockHeaderInfo, BlockStatus, ChainGenesis, Provenance, RuntimeAdapter, }; use crate::validate::{ validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra, @@ -34,8 +34,8 @@ use near_primitives::merkle::{ }; use near_primitives::receipt::Receipt; use near_primitives::sharding::{ - ChunkHash, ChunkHashHeight, ReceiptProof, ShardChunk, ShardChunkHeader, ShardInfo, ShardProof, - StateSyncInfo, + ChunkHash, ChunkHashHeight, ReceiptList, ReceiptProof, ShardChunk, ShardChunkHeader, ShardInfo, + ShardProof, StateSyncInfo, }; use near_primitives::syncing::{ 
get_num_state_parts, ReceiptProofResponse, ReceiptResponse, RootProof, diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs index 379a38902d8..909e715dbad 100644 --- a/chain/chain/src/test_utils.rs +++ b/chain/chain/src/test_utils.rs @@ -1130,11 +1130,11 @@ impl ChainGenesis { #[cfg(test)] mod test { use super::KeyValueRuntime; - use crate::types::ReceiptList; use crate::RuntimeAdapter; use borsh::BorshSerialize; use near_primitives::hash::{hash, CryptoHash}; use near_primitives::receipt::Receipt; + use near_primitives::sharding::ReceiptList; use near_primitives::types::NumShards; use near_store::test_utils::create_test_store; use rand::Rng; diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index a6a245e0d07..b3955dcad29 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -13,7 +13,7 @@ use near_primitives::errors::InvalidTxError; use near_primitives::hash::{hash, CryptoHash}; use near_primitives::merkle::{merklize, MerklePath}; use near_primitives::receipt::Receipt; -use near_primitives::sharding::ShardChunkHeader; +use near_primitives::sharding::{ReceiptList, ShardChunkHeader}; use near_primitives::transaction::{ExecutionOutcomeWithId, SignedTransaction}; use near_primitives::types::{ AccountId, ApprovalStake, Balance, BlockHeight, BlockHeightDelta, EpochId, Gas, MerkleHash, @@ -586,9 +586,6 @@ pub trait RuntimeAdapter: Send + Sync { } } -#[derive(BorshSerialize, Serialize, Debug, Clone)] -pub struct ReceiptList<'a>(pub ShardId, pub &'a Vec<Receipt>); - /// The last known / checked height and time when we have processed it. /// Required to keep track of skipped blocks and not fallback to produce blocks at lower height. 
#[derive(BorshSerialize, BorshDeserialize, Serialize, Debug, Clone, Default)] diff --git a/chain/chunks/src/lib.rs b/chain/chunks/src/lib.rs index ef07b3559e4..55b21da5060 100644 --- a/chain/chunks/src/lib.rs +++ b/chain/chunks/src/lib.rs @@ -23,8 +23,8 @@ use near_primitives::hash::{hash, CryptoHash}; use near_primitives::merkle::{merklize, verify_path, MerklePath}; use near_primitives::receipt::Receipt; use near_primitives::sharding::{ - ChunkHash, EncodedShardChunk, PartialEncodedChunk, PartialEncodedChunkPart, ReceiptProof, - ReedSolomonWrapper, ShardChunkHeader, ShardChunkHeaderInner, ShardProof, + ChunkHash, EncodedShardChunk, PartialEncodedChunk, PartialEncodedChunkPart, ReceiptList, + ReceiptProof, ReedSolomonWrapper, ShardChunkHeader, ShardChunkHeaderInner, ShardProof, }; use near_primitives::transaction::SignedTransaction; use near_primitives::types::{ @@ -36,7 +36,6 @@ use near_primitives::validator_signer::ValidatorSigner; use crate::chunk_cache::{EncodedChunksCache, EncodedChunksCacheEntry}; pub use crate::types::Error; -use near_chain::types::ReceiptList; mod chunk_cache; pub mod test_utils; diff --git a/core/primitives/src/sharding.rs b/core/primitives/src/sharding.rs index 618d5d0741f..e7c938392f0 100644 --- a/core/primitives/src/sharding.rs +++ b/core/primitives/src/sharding.rs @@ -229,6 +229,9 @@ impl EncodedShardChunkBody { } } +#[derive(BorshSerialize, Serialize, Debug, Clone)] +pub struct ReceiptList<'a>(pub ShardId, pub &'a Vec<Receipt>); + #[derive(BorshSerialize, BorshDeserialize, Serialize)] struct TransactionReceipt(Vec, Vec); diff --git a/core/primitives/src/version.rs b/core/primitives/src/version.rs index bdcd77a3a84..1e31f7a7d57 100644 --- a/core/primitives/src/version.rs +++ b/core/primitives/src/version.rs @@ -12,7 +12,7 @@ pub struct Version { pub type DbVersion = u32; /// Current version of the database. -pub const DB_VERSION: DbVersion = 9; +pub const DB_VERSION: DbVersion = 10; /// Protocol version type. 
pub type ProtocolVersion = u32; diff --git a/core/store/src/migrations.rs b/core/store/src/migrations.rs index 23c41a124e0..73c2abb5c5d 100644 --- a/core/store/src/migrations.rs +++ b/core/store/src/migrations.rs @@ -2,20 +2,26 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use borsh::BorshDeserialize; +use borsh::{BorshDeserialize, BorshSerialize}; -use near_primitives::hash::CryptoHash; -use near_primitives::sharding::ShardChunk; +use near_primitives::hash::{hash, CryptoHash}; +use near_primitives::sharding::{ + EncodedShardChunk, PartialEncodedChunk, ReceiptList, ReceiptProof, ReedSolomonWrapper, + ShardChunk, ShardProof, +}; use near_primitives::transaction::ExecutionOutcomeWithIdAndProof; use near_primitives::version::DbVersion; -use crate::db::DBCol::ColStateParts; +use crate::db::DBCol::{ColChunks, ColPartialChunks, ColStateParts}; use crate::db::{DBCol, RocksDB, VERSION_KEY}; use crate::migrations::v6_to_v7::{ col_state_refcount_8byte, migrate_col_transaction_refcount, migrate_receipts_refcount, }; use crate::migrations::v8_to_v9::{repair_col_receipt_id_to_shard_id, repair_col_transactions}; use crate::{create_store, Store, StoreUpdate}; +use near_crypto::KeyType; +use near_primitives::merkle::merklize; +use near_primitives::validator_signer::InMemoryValidatorSigner; pub mod v6_to_v7; pub mod v8_to_v9; @@ -125,3 +131,76 @@ pub fn migrate_8_to_9(path: &String) { repair_col_receipt_id_to_shard_id(&store); set_store_version(&store, 9); } + +pub fn migrate_9_to_10(path: &String, is_archival: bool) { + let store = create_store(path); + if is_archival { + // Hard code the number of parts there. These numbers are only used for this migration. 
+ let num_total_parts = 100; + let num_data_parts = (num_total_parts - 1) / 3; + let num_parity_parts = num_total_parts - num_data_parts; + let mut rs = ReedSolomonWrapper::new(num_data_parts, num_parity_parts); + let signer = InMemoryValidatorSigner::from_seed("test", KeyType::ED25519, "test"); + let mut store_update = store.store_update(); + let batch_size_limit = 10_000_000; + let mut batch_size = 0; + for (key, value) in store.iter_without_rc_logic(ColChunks) { + if let Ok(Some(partial_chunk)) = + store.get_ser::<PartialEncodedChunk>(ColPartialChunks, &key) + { + if partial_chunk.parts.len() == num_total_parts { + continue; + } + } + batch_size += key.len() + value.len() + 8; + let chunk: ShardChunk = BorshDeserialize::try_from_slice(&value) + .expect("Borsh deserialization should not fail"); + let ShardChunk { chunk_hash, header, transactions, receipts } = chunk; + let (mut encoded_chunk, merkle_paths) = EncodedShardChunk::new( + header.inner.prev_block_hash, + header.inner.prev_state_root, + header.inner.outcome_root, + header.inner.height_created, + header.inner.shard_id, + &mut rs, + header.inner.gas_used, + header.inner.gas_limit, + header.inner.balance_burnt, + header.inner.tx_root, + header.inner.validator_proposals.clone(), + transactions, + &receipts, + header.inner.outgoing_receipts_root, + &signer, + ) + .expect("create encoded chunk should not fail"); + encoded_chunk.header = header; + let outgoing_receipt_hashes = + vec![hash(&ReceiptList(0, &receipts).try_to_vec().unwrap())]; + let (_, outgoing_receipt_proof) = merklize(&outgoing_receipt_hashes); + + let partial_encoded_chunk = encoded_chunk.create_partial_encoded_chunk( + (0..num_total_parts as u64).collect(), + vec![ReceiptProof( + receipts, + ShardProof { + from_shard_id: 0, + to_shard_id: 0, + proof: outgoing_receipt_proof[0].clone(), + }, + )], + &merkle_paths, + ); + store_update + .set_ser(ColPartialChunks, chunk_hash.as_ref(), &partial_encoded_chunk) + .expect("storage update should not fail"); + if batch_size 
> batch_size_limit { + store_update.commit().expect("storage update should not fail"); + store_update = store.store_update(); + batch_size = 0; + } + } + store_update.commit().expect("storage update should not fail"); + } + set_store_version(&store, 10); +} diff --git a/neard/src/lib.rs b/neard/src/lib.rs index 130d5bb6412..2ecdde98a98 100644 --- a/neard/src/lib.rs +++ b/neard/src/lib.rs @@ -16,7 +16,7 @@ use near_network::{NetworkRecipient, PeerManagerActor}; use near_rosetta_rpc::start_rosetta_rpc; use near_store::migrations::{ fill_col_outcomes_by_hash, fill_col_transaction_refcount, get_store_version, migrate_6_to_7, - migrate_7_to_8, migrate_8_to_9, set_store_version, + migrate_7_to_8, migrate_8_to_9, migrate_9_to_10, set_store_version, }; use near_store::{create_store, Store}; use near_telemetry::TelemetryActor; @@ -60,7 +60,7 @@ pub fn get_default_home() -> String { } /// Function checks current version of the database and applies migrations to the database. -pub fn apply_store_migrations(path: &String) { +pub fn apply_store_migrations(path: &String, is_archival: bool) { let db_version = get_store_version(path); if db_version > near_primitives::version::DB_VERSION { error!(target: "near", "DB version {} is created by a newer version of neard, please update neard or delete data", db_version); @@ -129,16 +129,22 @@ pub fn apply_store_migrations(path: &String) { // Repair `ColTransactions`, `ColReceiptIdToShardId` migrate_8_to_9(path); } + if db_version <= 9 { + info!(target: "near", "Migrate DB from version 9 to 10"); + // version 9 => 10; + // populate partial encoded chunks for chunks that exist in storage + migrate_9_to_10(path, is_archival); + } let db_version = get_store_version(path); debug_assert_eq!(db_version, near_primitives::version::DB_VERSION); } -pub fn init_and_migrate_store(home_dir: &Path) -> Arc { +pub fn init_and_migrate_store(home_dir: &Path, is_archival: bool) -> Arc { let path = get_store_path(home_dir); let store_exists = 
store_path_exists(&path); if store_exists { - apply_store_migrations(&path); + apply_store_migrations(&path, is_archival); } let store = create_store(&path); if !store_exists { @@ -151,7 +157,7 @@ pub fn start_with_config( home_dir: &Path, config: NearConfig, ) -> (Addr, Addr, Vec) { - let store = init_and_migrate_store(home_dir); + let store = init_and_migrate_store(home_dir, config.client_config.archive); near_actix_utils::init_stop_on_panic(); let runtime = Arc::new(NightshadeRuntime::new(