Skip to content

Commit

Permalink
fix: always have partial encoded chunk when chunk exists (#3344)
Browse files Browse the repository at this point in the history
Since #3036, nodes will persist all the partial encoded chunk parts for the shard they track. However, on mainnet there are quite a few blocks produced before the PR landed and therefore we need to do a migration for archival nodes that contain all the historical data.

Test plan
----------
Tested the migration on a mainnet archival node.
  • Loading branch information
bowenwang1996 authored Sep 21, 2020
1 parent 8f8dfcc commit b4e58dd
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 21 deletions.
6 changes: 3 additions & 3 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::lightclient::get_epoch_block_producers_view;
use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode};
use crate::types::{
AcceptedBlock, ApplyTransactionResult, Block, BlockEconomicsConfig, BlockHeader,
BlockHeaderInfo, BlockStatus, ChainGenesis, Provenance, ReceiptList, RuntimeAdapter,
BlockHeaderInfo, BlockStatus, ChainGenesis, Provenance, RuntimeAdapter,
};
use crate::validate::{
validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra,
Expand All @@ -34,8 +34,8 @@ use near_primitives::merkle::{
};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{
ChunkHash, ChunkHashHeight, ReceiptProof, ShardChunk, ShardChunkHeader, ShardInfo, ShardProof,
StateSyncInfo,
ChunkHash, ChunkHashHeight, ReceiptList, ReceiptProof, ShardChunk, ShardChunkHeader, ShardInfo,
ShardProof, StateSyncInfo,
};
use near_primitives::syncing::{
get_num_state_parts, ReceiptProofResponse, ReceiptResponse, RootProof,
Expand Down
2 changes: 1 addition & 1 deletion chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1130,11 +1130,11 @@ impl ChainGenesis {
#[cfg(test)]
mod test {
use super::KeyValueRuntime;
use crate::types::ReceiptList;
use crate::RuntimeAdapter;
use borsh::BorshSerialize;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::ReceiptList;
use near_primitives::types::NumShards;
use near_store::test_utils::create_test_store;
use rand::Rng;
Expand Down
5 changes: 1 addition & 4 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use near_primitives::errors::InvalidTxError;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::{merklize, MerklePath};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::sharding::{ReceiptList, ShardChunkHeader};
use near_primitives::transaction::{ExecutionOutcomeWithId, SignedTransaction};
use near_primitives::types::{
AccountId, ApprovalStake, Balance, BlockHeight, BlockHeightDelta, EpochId, Gas, MerkleHash,
Expand Down Expand Up @@ -586,9 +586,6 @@ pub trait RuntimeAdapter: Send + Sync {
}
}

#[derive(BorshSerialize, Serialize, Debug, Clone)]
pub struct ReceiptList<'a>(pub ShardId, pub &'a Vec<Receipt>);

/// The last known / checked height and time when we have processed it.
/// Required to keep track of skipped blocks and not fallback to produce blocks at lower height.
#[derive(BorshSerialize, BorshDeserialize, Serialize, Debug, Clone, Default)]
Expand Down
5 changes: 2 additions & 3 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::{merklize, verify_path, MerklePath};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{
ChunkHash, EncodedShardChunk, PartialEncodedChunk, PartialEncodedChunkPart, ReceiptProof,
ReedSolomonWrapper, ShardChunkHeader, ShardChunkHeaderInner, ShardProof,
ChunkHash, EncodedShardChunk, PartialEncodedChunk, PartialEncodedChunkPart, ReceiptList,
ReceiptProof, ReedSolomonWrapper, ShardChunkHeader, ShardChunkHeaderInner, ShardProof,
};
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{
Expand All @@ -36,7 +36,6 @@ use near_primitives::validator_signer::ValidatorSigner;

use crate::chunk_cache::{EncodedChunksCache, EncodedChunksCacheEntry};
pub use crate::types::Error;
use near_chain::types::ReceiptList;

mod chunk_cache;
pub mod test_utils;
Expand Down
3 changes: 3 additions & 0 deletions core/primitives/src/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ impl EncodedShardChunkBody {
}
}

#[derive(BorshSerialize, Serialize, Debug, Clone)]
pub struct ReceiptList<'a>(pub ShardId, pub &'a Vec<Receipt>);

#[derive(BorshSerialize, BorshDeserialize, Serialize)]
struct TransactionReceipt(Vec<SignedTransaction>, Vec<Receipt>);

Expand Down
2 changes: 1 addition & 1 deletion core/primitives/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct Version {
pub type DbVersion = u32;

/// Current version of the database.
pub const DB_VERSION: DbVersion = 9;
pub const DB_VERSION: DbVersion = 10;

/// Protocol version type.
pub type ProtocolVersion = u32;
Expand Down
87 changes: 83 additions & 4 deletions core/store/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use borsh::BorshDeserialize;
use borsh::{BorshDeserialize, BorshSerialize};

use near_primitives::hash::CryptoHash;
use near_primitives::sharding::ShardChunk;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::sharding::{
EncodedShardChunk, PartialEncodedChunk, ReceiptList, ReceiptProof, ReedSolomonWrapper,
ShardChunk, ShardProof,
};
use near_primitives::transaction::ExecutionOutcomeWithIdAndProof;
use near_primitives::version::DbVersion;

use crate::db::DBCol::ColStateParts;
use crate::db::DBCol::{ColChunks, ColPartialChunks, ColStateParts};
use crate::db::{DBCol, RocksDB, VERSION_KEY};
use crate::migrations::v6_to_v7::{
col_state_refcount_8byte, migrate_col_transaction_refcount, migrate_receipts_refcount,
};
use crate::migrations::v8_to_v9::{repair_col_receipt_id_to_shard_id, repair_col_transactions};
use crate::{create_store, Store, StoreUpdate};
use near_crypto::KeyType;
use near_primitives::merkle::merklize;
use near_primitives::validator_signer::InMemoryValidatorSigner;

pub mod v6_to_v7;
pub mod v8_to_v9;
Expand Down Expand Up @@ -125,3 +131,76 @@ pub fn migrate_8_to_9(path: &String) {
repair_col_receipt_id_to_shard_id(&store);
set_store_version(&store, 9);
}

pub fn migrate_9_to_10(path: &String, is_archival: bool) {
let store = create_store(path);
if is_archival {
// Hard code the number of parts there. These numbers are only used for this migration.
let num_total_parts = 100;
let num_data_parts = (num_total_parts - 1) / 3;
let num_parity_parts = num_total_parts - num_data_parts;
let mut rs = ReedSolomonWrapper::new(num_data_parts, num_parity_parts);
let signer = InMemoryValidatorSigner::from_seed("test", KeyType::ED25519, "test");
let mut store_update = store.store_update();
let batch_size_limit = 10_000_000;
let mut batch_size = 0;
for (key, value) in store.iter_without_rc_logic(ColChunks) {
if let Ok(Some(partial_chunk)) =
store.get_ser::<PartialEncodedChunk>(ColPartialChunks, &key)
{
if partial_chunk.parts.len() == num_total_parts {
continue;
}
}
batch_size += key.len() + value.len() + 8;
let chunk: ShardChunk = BorshDeserialize::try_from_slice(&value)
.expect("Borsh deserialization should not fail");
let ShardChunk { chunk_hash, header, transactions, receipts } = chunk;
let (mut encoded_chunk, merkle_paths) = EncodedShardChunk::new(
header.inner.prev_block_hash,
header.inner.prev_state_root,
header.inner.outcome_root,
header.inner.height_created,
header.inner.shard_id,
&mut rs,
header.inner.gas_used,
header.inner.gas_limit,
header.inner.balance_burnt,
header.inner.tx_root,
header.inner.validator_proposals.clone(),
transactions,
&receipts,
header.inner.outgoing_receipts_root,
&signer,
)
.expect("create encoded chunk should not fail");
encoded_chunk.header = header;
let outgoing_receipt_hashes =
vec![hash(&ReceiptList(0, &receipts).try_to_vec().unwrap())];
let (_, outgoing_receipt_proof) = merklize(&outgoing_receipt_hashes);

let partial_encoded_chunk = encoded_chunk.create_partial_encoded_chunk(
(0..num_total_parts as u64).collect(),
vec![ReceiptProof(
receipts,
ShardProof {
from_shard_id: 0,
to_shard_id: 0,
proof: outgoing_receipt_proof[0].clone(),
},
)],
&merkle_paths,
);
store_update
.set_ser(ColPartialChunks, chunk_hash.as_ref(), &partial_encoded_chunk)
.expect("storage update should not fail");
if batch_size > batch_size_limit {
store_update.commit().expect("storage update should not fail");
store_update = store.store_update();
batch_size = 0;
}
}
store_update.commit().expect("storage update should not fail");
}
set_store_version(&store, 10);
}
16 changes: 11 additions & 5 deletions neard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use near_network::{NetworkRecipient, PeerManagerActor};
use near_rosetta_rpc::start_rosetta_rpc;
use near_store::migrations::{
fill_col_outcomes_by_hash, fill_col_transaction_refcount, get_store_version, migrate_6_to_7,
migrate_7_to_8, migrate_8_to_9, set_store_version,
migrate_7_to_8, migrate_8_to_9, migrate_9_to_10, set_store_version,
};
use near_store::{create_store, Store};
use near_telemetry::TelemetryActor;
Expand Down Expand Up @@ -60,7 +60,7 @@ pub fn get_default_home() -> String {
}

/// Function checks current version of the database and applies migrations to the database.
pub fn apply_store_migrations(path: &String) {
pub fn apply_store_migrations(path: &String, is_archival: bool) {
let db_version = get_store_version(path);
if db_version > near_primitives::version::DB_VERSION {
error!(target: "near", "DB version {} is created by a newer version of neard, please update neard or delete data", db_version);
Expand Down Expand Up @@ -129,16 +129,22 @@ pub fn apply_store_migrations(path: &String) {
// Repair `ColTransactions`, `ColReceiptIdToShardId`
migrate_8_to_9(path);
}
if db_version <= 9 {
info!(target: "near", "Migrate DB from version 9 to 10");
// version 9 => 10;
// populate partial encoded chunks for chunks that exist in storage
migrate_9_to_10(path, is_archival);
}

let db_version = get_store_version(path);
debug_assert_eq!(db_version, near_primitives::version::DB_VERSION);
}

pub fn init_and_migrate_store(home_dir: &Path) -> Arc<Store> {
pub fn init_and_migrate_store(home_dir: &Path, is_archival: bool) -> Arc<Store> {
let path = get_store_path(home_dir);
let store_exists = store_path_exists(&path);
if store_exists {
apply_store_migrations(&path);
apply_store_migrations(&path, is_archival);
}
let store = create_store(&path);
if !store_exists {
Expand All @@ -151,7 +157,7 @@ pub fn start_with_config(
home_dir: &Path,
config: NearConfig,
) -> (Addr<ClientActor>, Addr<ViewClientActor>, Vec<Arbiter>) {
let store = init_and_migrate_store(home_dir);
let store = init_and_migrate_store(home_dir, config.client_config.archive);
near_actix_utils::init_stop_on_panic();

let runtime = Arc::new(NightshadeRuntime::new(
Expand Down

0 comments on commit b4e58dd

Please sign in to comment.