From 0c1707522541ebb6df550112d62ca7a53fe64684 Mon Sep 17 00:00:00 2001 From: Nikolay Kurtov Date: Mon, 14 Aug 2023 14:04:50 +0200 Subject: [PATCH 1/7] fix(state-sync): Make partial chunks message deterministic --- chain/chunks/src/lib.rs | 16 ++++++++++++++++ docs/misc/state_sync_from_external_storage.md | 10 ++++++---- pytest/tests/sanity/state_sync_epoch_boundary.py | 2 +- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/chain/chunks/src/lib.rs b/chain/chunks/src/lib.rs index 710d5c13b52..f103efbd881 100644 --- a/chain/chunks/src/lib.rs +++ b/chain/chunks/src/lib.rs @@ -792,9 +792,25 @@ impl ShardsManager { /// Finds the parts and receipt proofs asked for in the request, and returns a response /// containing whatever was found. See comment for PartialEncodedChunkResponseSource for /// an explanation of that part of the return value. + /// Ensures the receipts in the response are in a deterministic order. fn prepare_partial_encoded_chunk_response( &mut self, request: PartialEncodedChunkRequestMsg, + ) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) { + let (src, mut response_msg) = self.prepare_partial_encoded_chunk_response_unsorted(request); + // Note that the PartialChunks column is a write-once column, and needs + // the values to be deterministic. + response_msg.receipts.sort_by_key( + |ReceiptProof(_receipt, ShardProof { from_shard_id, to_shard_id, proof: _proof })| { + (*from_shard_id, *to_shard_id) + }, + ); + (src, response_msg) + } + + fn prepare_partial_encoded_chunk_response_unsorted( + &mut self, + request: PartialEncodedChunkRequestMsg, ) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) { let PartialEncodedChunkRequestMsg { chunk_hash, part_ords, mut tracking_shards } = request; let mut response = PartialEncodedChunkResponseMsg { diff --git a/docs/misc/state_sync_from_external_storage.md b/docs/misc/state_sync_from_external_storage.md index 9ed16b747d1..aafd6eaa8f1 100644 --- a/docs/misc/state_sync_from_external_storage.md +++ b/docs/misc/state_sync_from_external_storage.md @@ -49,9 +49,11 @@ your `config.json` file: } } }, -"state_sync_timeout": { - "secs": 30, - "nanos": 0 +"consensus": { + "state_sync_timeout": { + "secs": 30, + "nanos": 0 + } } ``` @@ -70,7 +72,7 @@ shards that can be downloaded in parallel during state sync. across all shards that can be downloaded in parallel during catchup. Generally, this number should not be higher than `num_concurrent_requests`. Keep it reasonably low to allow the node to process chunks of other shards. -* `state_sync_timeout` determines the max duration of an attempt to download a +* `consensus.state_sync_timeout` determines the max duration of an attempt to download a state part. Setting it too low may cause too many unsuccessful attempts. 
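The comment added to `prepare_partial_encoded_chunk_response` in the hunk above notes that the `PartialChunks` column is write-once, which is why the receipts in the response have to be produced in a deterministic order. The sketch below illustrates that invariant with a toy `WriteOnceColumn` type; the type and its `put` method are hypothetical stand-ins, not nearcore's store API, and only show why two responses for the same chunk hash must serialize to identical bytes.

```rust
use std::collections::HashMap;

/// Minimal sketch of a write-once column: a key may be written many times,
/// but only with a byte-for-byte identical value.
struct WriteOnceColumn {
    data: HashMap<Vec<u8>, Vec<u8>>,
}

impl WriteOnceColumn {
    fn new() -> Self {
        Self { data: HashMap::new() }
    }

    /// Accepts a write if the key is new or the value matches the stored one;
    /// rejects a conflicting rewrite of an existing key.
    fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<(), String> {
        match self.data.get(&key) {
            None => {
                self.data.insert(key, value);
                Ok(())
            }
            Some(existing) if *existing == value => Ok(()),
            Some(_) => Err("write-once column: conflicting value for existing key".to_string()),
        }
    }
}

fn main() {
    let mut column = WriteOnceColumn::new();
    // Two responses for the same chunk hash must be identical;
    // a differently ordered (hence differently serialized) value is a conflict.
    column
        .put(b"chunk_hash".to_vec(), b"receipts sorted by (from, to)".to_vec())
        .unwrap();
    assert!(column
        .put(b"chunk_hash".to_vec(), b"receipts in another order".to_vec())
        .is_err());
}
```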
### Amazon S3 diff --git a/pytest/tests/sanity/state_sync_epoch_boundary.py b/pytest/tests/sanity/state_sync_epoch_boundary.py index 82eb3440b1c..af60624d826 100755 --- a/pytest/tests/sanity/state_sync_epoch_boundary.py +++ b/pytest/tests/sanity/state_sync_epoch_boundary.py @@ -66,7 +66,7 @@ } }, 'state_sync_enabled': True, - 'state_sync_timeout': { + 'consensus.state_sync_timeout': { 'secs': 0, 'nanos': 500000000 }, From 5435ce80278ef4c865a55c0a0ab0d9469f7dd7d3 Mon Sep 17 00:00:00 2001 From: Nikolay Kurtov Date: Mon, 14 Aug 2023 14:06:21 +0200 Subject: [PATCH 2/7] fix(state-sync): Make partial chunks message deterministic --- pytest/lib/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py index f62e83cd501..9be0fa9178a 100644 --- a/pytest/lib/cluster.py +++ b/pytest/lib/cluster.py @@ -815,6 +815,7 @@ def apply_config_changes(node_dir, client_config_change): # when None. allowed_missing_configs = ( 'archive', + 'consensus.state_sync_timeout', 'log_summary_period', 'max_gas_burnt_view', 'rosetta_rpc', @@ -822,7 +823,6 @@ def apply_config_changes(node_dir, client_config_change): 'split_storage', 'state_sync', 'state_sync_enabled', - 'state_sync_timeout', 'store.state_snapshot_enabled', 'tracked_shard_schedule', ) From f2352393916220c394624c9d4612b4bca47761cc Mon Sep 17 00:00:00 2001 From: Nikolay Kurtov Date: Mon, 14 Aug 2023 14:46:11 +0200 Subject: [PATCH 3/7] Test --- chain/chunks/src/lib.rs | 6 +- chain/chunks/src/logic.rs | 4 +- core/primitives/src/sharding.rs | 14 ++ nightly/pytest-sanity.txt | 2 + .../tests/sanity/state_sync_then_catchup.py | 200 ++++++++++++++++++ 5 files changed, 220 insertions(+), 6 deletions(-) create mode 100644 pytest/tests/sanity/state_sync_then_catchup.py diff --git a/chain/chunks/src/lib.rs b/chain/chunks/src/lib.rs index f103efbd881..b734563e322 100644 --- a/chain/chunks/src/lib.rs +++ b/chain/chunks/src/lib.rs @@ -800,11 +800,7 @@ impl ShardsManager { let (src, mut response_msg) = self.prepare_partial_encoded_chunk_response_unsorted(request); // Note that the PartialChunks column is a write-once column, and needs // the values to be deterministic. - response_msg.receipts.sort_by_key( - |ReceiptProof(_receipt, ShardProof { from_shard_id, to_shard_id, proof: _proof })| { - (*from_shard_id, *to_shard_id) - }, - ); + response_msg.receipts.sort(); (src, response_msg) } diff --git a/chain/chunks/src/logic.rs b/chain/chunks/src/logic.rs index ee42c120e2e..8586d3cc810 100644 --- a/chain/chunks/src/logic.rs +++ b/chain/chunks/src/logic.rs @@ -118,13 +118,15 @@ pub fn make_partial_encoded_chunk_from_owned_parts_and_needed_receipts<'a>( }) .cloned() .collect(); - let receipts = receipts + let mut receipts: Vec<_> = receipts .filter(|receipt| { cares_about_shard || need_receipt(prev_block_hash, receipt.1.to_shard_id, me, shard_tracker) }) .cloned() .collect(); + // Make sure the receipts are in a deterministic order. 
+ receipts.sort(); match header.clone() { ShardChunkHeader::V1(header) => { PartialEncodedChunk::V1(PartialEncodedChunkV1 { header, parts, receipts }) diff --git a/core/primitives/src/sharding.rs b/core/primitives/src/sharding.rs index 12bc7026273..13030c78567 100644 --- a/core/primitives/src/sharding.rs +++ b/core/primitives/src/sharding.rs @@ -10,6 +10,7 @@ use borsh::{BorshDeserialize, BorshSerialize}; use near_crypto::Signature; use reed_solomon_erasure::galois_8::{Field, ReedSolomon}; use reed_solomon_erasure::ReconstructShard; +use std::cmp::Ordering; use std::sync::Arc; #[derive( @@ -601,6 +602,19 @@ pub struct ShardProof { /// For each Merkle proof there is a subset of receipts which may be proven. pub struct ReceiptProof(pub Vec, pub ShardProof); +impl PartialOrd for ReceiptProof { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ReceiptProof { + fn cmp(&self, other: &Self) -> Ordering { + (self.1.from_shard_id, self.1.to_shard_id) + .cmp(&(other.1.from_shard_id, other.1.to_shard_id)) + } +} + #[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Eq, PartialEq)] pub struct PartialEncodedChunkPart { pub part_ord: u64, diff --git a/nightly/pytest-sanity.txt b/nightly/pytest-sanity.txt index 02bbac22ad5..e93ffddde45 100644 --- a/nightly/pytest-sanity.txt +++ b/nightly/pytest-sanity.txt @@ -65,6 +65,8 @@ pytest --timeout=120 sanity/block_sync_flat_storage.py pytest --timeout=120 sanity/block_sync_flat_storage.py --features nightly pytest --timeout=240 sanity/state_sync_epoch_boundary.py pytest --timeout=240 sanity/state_sync_epoch_boundary.py --features nightly +pytest --timeout=240 sanity/state_sync_then_catchup.py +pytest --timeout=240 sanity/state_sync_then_catchup.py --features nightly pytest --timeout=240 sanity/validator_switch.py pytest --timeout=240 sanity/validator_switch.py --features nightly pytest --timeout=240 sanity/rpc_state_changes.py diff --git a/pytest/tests/sanity/state_sync_then_catchup.py b/pytest/tests/sanity/state_sync_then_catchup.py new file mode 100644 index 00000000000..4ce7cd255db --- /dev/null +++ b/pytest/tests/sanity/state_sync_then_catchup.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +# Spins up one validating node. +# Spins a non-validating node that tracks some shards and the set of tracked shards changes regularly. +# The node gets stopped, and gets restarted close to an epoch boundary but in a way to trigger epoch sync. +# +# After the state sync the node has to do a catchup. +# +# Note that the test must generate outgoing receipts for most shards almost +# every block in order to crash if creation of partial encoded chunks becomes +# non-deterministic. 
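Patch 3 above replaces the `sort_by_key` call with a plain `receipts.sort()`, backed by the `Ord` implementation for `ReceiptProof` that compares only the `(from_shard_id, to_shard_id)` pair. The standalone sketch below uses simplified stand-in types (the real `ReceiptProof` also carries the receipts and a Merkle proof) to show that any permutation of the same proofs sorts to the same order under that key.

```rust
use std::cmp::Ordering;

/// Simplified stand-ins for nearcore's ShardProof / ReceiptProof.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ShardProof {
    from_shard_id: u64,
    to_shard_id: u64,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct ReceiptProof(Vec<u8>, ShardProof);

impl PartialOrd for ReceiptProof {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ReceiptProof {
    // Order only by the (from, to) shard pair, mirroring the patch;
    // the receipt payload is not part of the sort key.
    fn cmp(&self, other: &Self) -> Ordering {
        (self.1.from_shard_id, self.1.to_shard_id)
            .cmp(&(other.1.from_shard_id, other.1.to_shard_id))
    }
}

fn main() {
    let a = ReceiptProof(vec![1], ShardProof { from_shard_id: 0, to_shard_id: 2 });
    let b = ReceiptProof(vec![2], ShardProof { from_shard_id: 0, to_shard_id: 1 });
    let c = ReceiptProof(vec![3], ShardProof { from_shard_id: 1, to_shard_id: 0 });

    // Two permutations of the same proofs sort to the same order,
    // so the resulting message is deterministic.
    let mut first = vec![a.clone(), b.clone(), c.clone()];
    let mut second = vec![c, a, b];
    first.sort();
    second.sort();
    assert_eq!(first, second);
}
```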
+ +import pathlib +import random +import sys +import tempfile + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from cluster import init_cluster, spin_up_node, load_config, apply_config_changes +import account +import transaction +import utils + +from configured_logger import logger + +EPOCH_LENGTH = 50 + +state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / 'state_parts') + +config0 = { + 'enable_multiline_logging': False, + 'gc_num_epochs_to_keep': 100, + 'log_summary_period': { + 'secs': 0, + 'nanos': 100000000 + }, + 'log_summary_style': 'plain', + 'state_sync': { + 'dump': { + 'location': { + 'Filesystem': { + 'root_dir': state_parts_dir + } + }, + 'iteration_delay': { + 'secs': 0, + 'nanos': 100000000 + }, + } + }, + 'store.state_snapshot_enabled': True, + 'tracked_shards': [0], +} +config1 = { + 'enable_multiline_logging': False, + 'gc_num_epochs_to_keep': 100, + 'log_summary_period': { + 'secs': 0, + 'nanos': 100000000 + }, + 'log_summary_style': 'plain', + 'state_sync': { + 'sync': { + 'ExternalStorage': { + 'location': { + 'Filesystem': { + 'root_dir': state_parts_dir + } + } + } + } + }, + 'state_sync_enabled': True, + 'consensus.state_sync_timeout': { + 'secs': 0, + 'nanos': 500000000 + }, + 'tracked_shard_schedule': [[0], [0], [1], [2], [3], [1], [2], [3],], + 'tracked_shards': [], +} +logger.info(f'state_parts_dir: {state_parts_dir}') +logger.info(f'config0: {config0}') +logger.info(f'config1: {config1}') + +config = load_config() +near_root, node_dirs = init_cluster(1, 1, 4, config, + [["epoch_length", EPOCH_LENGTH]], { + 0: config0, + 1: config1 + }) + +boot_node = spin_up_node(config, near_root, node_dirs[0], 0) +logger.info('started boot_node') +node1 = spin_up_node(config, near_root, node_dirs[1], 1, boot_node=boot_node) +logger.info('started node1') + +contract = utils.load_test_contract() + +latest_block_hash = boot_node.get_latest_block().hash_bytes +deploy_contract_tx = transaction.sign_deploy_contract_tx( + boot_node.signer_key, contract, 10, latest_block_hash) +result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) +assert 'result' in result and 'error' not in result, ( + 'Expected "result" and no "error" in response, got: {}'.format(result)) + +latest_block_hash = boot_node.get_latest_block().hash_bytes +deploy_contract_tx = transaction.sign_deploy_contract_tx( + node1.signer_key, contract, 10, latest_block_hash) +result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) +assert 'result' in result and 'error' not in result, ( + 'Expected "result" and no "error" in response, got: {}'.format(result)) + + +def epoch_height(block_height): + if block_height == 0: + return 0 + if block_height <= EPOCH_LENGTH: + # According to the protocol specifications, there are two epochs with height 1. + return "1*" + return int((block_height - 1) / EPOCH_LENGTH) + + +# Generates traffic for all possible shards. +# Assumes that `test0`, `test1`, `near` all belong to different shards. 
+def random_workload_until(target, nonce, keys, target_node): + last_height = -1 + while True: + nonce += 1 + + last_block = target_node.get_latest_block() + height = last_block.height + if height > target: + break + if height != last_height: + logger.info(f'@{height}, epoch_height: {epoch_height(height)}') + last_height = height + + last_block_hash = boot_node.get_latest_block().hash_bytes + if (len(keys) > 100 and random.random() < 0.2) or len(keys) > 1000: + key = keys[random.randint(0, len(keys) - 1)] + call_function('read', key, nonce, boot_node.signer_key, + last_block_hash) + call_function('read', key, nonce, node1.signer_key, last_block_hash) + elif random.random() < 0.5: + if random.random() < 0.3: + key_from, account_to = boot_node.signer_key, node1.signer_key.account_id + elif random.random() < 0.3: + key_from, account_to = boot_node.signer_key, "near" + elif random.random() < 0.5: + key_from, account_to = node1.signer_key, boot_node.signer_key.account_id + else: + key_from, account_to = node1.signer_key, "near" + payment_tx = transaction.sign_payment_tx(key_from, account_to, 1, + nonce, last_block_hash) + boot_node.send_tx(payment_tx).get('result') + else: + key = random_u64() + keys.append(key) + call_function('write', key, nonce, boot_node.signer_key, + last_block_hash) + call_function('write', key, nonce, node1.signer_key, + last_block_hash) + return (nonce, keys) + + +def random_u64(): + return bytes(random.randint(0, 255) for _ in range(8)) + + +def call_function(op, key, nonce, signer_key, last_block_hash): + if op == 'read': + args = key + fn = 'read_value' + else: + args = key + random_u64() + fn = 'write_key_value' + + tx = transaction.sign_function_call_tx(signer_key, signer_key.account_id, + fn, args, 300 * account.TGAS, 0, + nonce, last_block_hash) + return boot_node.send_tx(tx).get('result') + + +nonce, keys = random_workload_until(EPOCH_LENGTH - 5, 1, [], boot_node) + +node1_height = node1.get_latest_block().height +logger.info(f'node1@{node1_height}') +node1.kill() +logger.info(f'killed node1') + +# Run node0 more to trigger block sync in node1. +nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3), nonce, keys, + boot_node) + +# Node1 is now behind and needs to do header sync and block sync. +node1.start(boot_node=boot_node) +node1_height = node1.get_latest_block().height +logger.info(f'started node1@{node1_height}') + +nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3.7), nonce, keys, node1) From 007a527612160c23b005154b5233559d92b4a781 Mon Sep 17 00:00:00 2001 From: Nikolay Kurtov Date: Mon, 14 Aug 2023 17:12:51 +0200 Subject: [PATCH 4/7] Comments --- core/primitives/src/sharding.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/primitives/src/sharding.rs b/core/primitives/src/sharding.rs index 13030c78567..16c1dd6446c 100644 --- a/core/primitives/src/sharding.rs +++ b/core/primitives/src/sharding.rs @@ -602,6 +602,8 @@ pub struct ShardProof { /// For each Merkle proof there is a subset of receipts which may be proven. pub struct ReceiptProof(pub Vec, pub ShardProof); +// Implement ordering to ensure `ReceiptProofs` are ordered consistently, +// because we expect messages with ReceiptProofs to be deterministic. 
impl PartialOrd for ReceiptProof { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) From 3386fb1bad1ec25f077e2b1d5edb72f7f4f3603e Mon Sep 17 00:00:00 2001 From: Shreyan Gupta Date: Mon, 14 Aug 2023 14:35:19 -0700 Subject: [PATCH 5/7] [resharding] Include state_changes in ApplySplitStateResult for split_state (#9419) For more context on the change, please look at https://github.com/near/nearcore/issues/9420 and https://github.com/near/nearcore/issues/9418 --- chain/chain/src/resharding.rs | 13 +++++---- core/primitives/src/types.rs | 2 +- core/store/src/trie/shard_tries.rs | 17 ++++++------ core/store/src/trie/split_state.rs | 24 ++++++++-------- nearcore/src/runtime/mod.rs | 44 +++++++++++++++++++----------- 5 files changed, 57 insertions(+), 43 deletions(-) diff --git a/chain/chain/src/resharding.rs b/chain/chain/src/resharding.rs index 2d1cafb83b4..0810c49a03a 100644 --- a/chain/chain/src/resharding.rs +++ b/chain/chain/src/resharding.rs @@ -362,7 +362,7 @@ mod tests { state_root = new_state_root; // update split states - let trie_changes = tries + let mut trie_updates = tries .apply_state_changes_to_split_states( &split_state_roots, StateChangesForSplitStates::from_raw_state_changes( @@ -372,13 +372,14 @@ mod tests { account_id_to_shard_id, ) .unwrap(); - split_state_roots = trie_changes - .iter() - .map(|(shard_uid, trie_changes)| { + split_state_roots = trie_updates + .drain() + .map(|(shard_uid, trie_update)| { let mut state_update = tries.store_update(); - let state_root = tries.apply_all(trie_changes, *shard_uid, &mut state_update); + let (_, trie_changes, _) = trie_update.finalize().unwrap(); + let state_root = tries.apply_all(&trie_changes, shard_uid, &mut state_update); state_update.commit().unwrap(); - (*shard_uid, state_root) + (shard_uid, state_root) }) .collect(); diff --git a/core/primitives/src/types.rs b/core/primitives/src/types.rs index b1b4d0f345d..1343c5aa51e 100644 --- a/core/primitives/src/types.rs +++ b/core/primitives/src/types.rs @@ -154,7 +154,7 @@ impl StateChangesKinds { } /// A structure used to index state changes due to transaction/receipt processing and other things. -#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)] +#[derive(Debug, Clone, BorshSerialize, BorshDeserialize, PartialEq)] pub enum StateChangeCause { /// A type of update that does not get finalized. Used for verification and execution of /// immutable smart contract methods. Attempt fo finalize a `TrieUpdate` containing such diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index da1bafc09c7..f1264a8002a 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -768,7 +768,7 @@ impl WrappedTrieChanges { /// /// NOTE: the changes are drained from `self`. pub fn state_changes_into(&mut self, store_update: &mut StoreUpdate) { - for change_with_trie_key in self.state_changes.drain(..) { + for mut change_with_trie_key in self.state_changes.drain(..) { assert!( !change_with_trie_key.changes.iter().any(|RawStateChange { cause, .. }| matches!( cause, @@ -777,13 +777,14 @@ impl WrappedTrieChanges { "NotWritableToDisk changes must never be finalized." ); - assert!( - !change_with_trie_key.changes.iter().any(|RawStateChange { cause, .. }| matches!( - cause, - StateChangeCause::Resharding - )), - "Resharding changes must never be finalized." 
- ); + // Resharding changes must not be finalized, however they may be introduced here when we are + // evaluating changes for split state in process_split_state function + change_with_trie_key + .changes + .retain(|change| change.cause != StateChangeCause::Resharding); + if change_with_trie_key.changes.is_empty() { + continue; + } let storage_key = if cfg!(feature = "serialize_all_state_changes") { // Serialize all kinds of state changes without any filtering. diff --git a/core/store/src/trie/split_state.rs b/core/store/src/trie/split_state.rs index 11d836231bd..d9470caee71 100644 --- a/core/store/src/trie/split_state.rs +++ b/core/store/src/trie/split_state.rs @@ -1,8 +1,6 @@ use crate::flat::FlatStateChanges; use crate::trie::iterator::TrieItem; -use crate::{ - get, get_delayed_receipt_indices, set, ShardTries, StoreUpdate, Trie, TrieChanges, TrieUpdate, -}; +use crate::{get, get_delayed_receipt_indices, set, ShardTries, StoreUpdate, Trie, TrieUpdate}; use borsh::BorshDeserialize; use bytesize::ByteSize; use near_primitives::account::id::AccountId; @@ -34,7 +32,7 @@ impl Trie { impl ShardTries { /// applies `changes` to split states - /// and returns the generated TrieChanges for all split states + /// and returns the generated TrieUpdate for all split states /// Note that this function is different from the function `add_values_to_split_states` /// This function is used for applying updates to split states when processing blocks /// `add_values_to_split_states` are used to generate the initial states for shards split @@ -44,7 +42,7 @@ impl ShardTries { state_roots: &HashMap, changes: StateChangesForSplitStates, account_id_to_shard_id: &dyn Fn(&AccountId) -> ShardUId, - ) -> Result, StorageError> { + ) -> Result, StorageError> { let mut trie_updates: HashMap<_, _> = self.get_trie_updates(state_roots); let mut insert_receipts = Vec::new(); for ConsolidatedStateChange { trie_key, value } in changes.changes { @@ -72,7 +70,8 @@ impl ShardTries { | TrieKey::PostponedReceipt { receiver_id: account_id, .. } | TrieKey::ContractData { account_id, .. } => { let new_shard_uid = account_id_to_shard_id(account_id); - // we can safely unwrap here because the caller of this function guarantees trie_updates contains all shard_uids for the new shards + // we can safely unwrap here because the caller of this function guarantees trie_updates + // contains all shard_uids for the new shards let trie_update = trie_updates.get_mut(&new_shard_uid).unwrap(); match value { Some(value) => trie_update.set(trie_key, value), @@ -82,6 +81,9 @@ impl ShardTries { } } for (_, update) in trie_updates.iter_mut() { + // StateChangeCause should always be Resharding for processing split state. + // We do not want to commit the state_changes from resharding as they are already handled while + // processing parent shard update.commit(StateChangeCause::Resharding); } @@ -97,12 +99,7 @@ impl ShardTries { account_id_to_shard_id, )?; - let mut trie_changes_map = HashMap::new(); - for (shard_uid, update) in trie_updates { - let (_, trie_changes, _) = update.finalize()?; - trie_changes_map.insert(shard_uid, trie_changes); - } - Ok(trie_changes_map) + Ok(trie_updates) } /// add `values` (key-value pairs of items stored in states) to build states for new shards @@ -275,6 +272,9 @@ fn apply_delayed_receipts_to_split_states_impl( TrieKey::DelayedReceiptIndices, delayed_receipts_indices_by_shard.get(shard_uid).unwrap(), ); + // StateChangeCause should always be Resharding for processing split state. 
+ // We do not want to commit the state_changes from resharding as they are already handled while + // processing parent shard trie_update.commit(StateChangeCause::Resharding); } Ok(()) diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index a0316fabe86..0728ead7de5 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -1076,28 +1076,40 @@ impl RuntimeAdapter for NightshadeRuntime { block_hash: &CryptoHash, state_roots: HashMap, next_epoch_shard_layout: &ShardLayout, - state_changes: StateChangesForSplitStates, + state_changes_for_split_states: StateChangesForSplitStates, ) -> Result, Error> { - let trie_changes = self.tries.apply_state_changes_to_split_states( + let trie_updates = self.tries.apply_state_changes_to_split_states( &state_roots, - state_changes, + state_changes_for_split_states, &|account_id| account_id_to_shard_uid(account_id, next_epoch_shard_layout), )?; - Ok(trie_changes - .into_iter() - .map(|(shard_uid, trie_changes)| ApplySplitStateResult { + let mut applied_split_state_results: Vec<_> = vec![]; + for (shard_uid, trie_update) in trie_updates { + let (_, trie_changes, state_changes) = trie_update.finalize()?; + // All state changes that are related to split state should have StateChangeCause as Resharding + // We do not want to commit the state_changes from resharding as they are already handled while + // processing parent shard + debug_assert!(state_changes.iter().all(|raw_state_changes| raw_state_changes + .changes + .iter() + .all(|state_change| state_change.cause == StateChangeCause::Resharding))); + let new_root = trie_changes.new_root; + let wrapped_trie_changes = WrappedTrieChanges::new( + self.get_tries(), + shard_uid, + trie_changes, + state_changes, + *block_hash, + ); + applied_split_state_results.push(ApplySplitStateResult { shard_uid, - new_root: trie_changes.new_root, - trie_changes: WrappedTrieChanges::new( - self.get_tries(), - shard_uid, - trie_changes, - vec![], - *block_hash, - ), - }) - .collect()) + new_root, + trie_changes: wrapped_trie_changes, + }); + } + + Ok(applied_split_state_results) } fn apply_state_part( From 792a331cda100c1f8af389c5191e9ea6ca590057 Mon Sep 17 00:00:00 2001 From: Nikolay Kurtov Date: Tue, 15 Aug 2023 12:20:31 +0200 Subject: [PATCH 6/7] python readability --- .../tests/sanity/state_sync_epoch_boundary.py | 17 +- .../tests/sanity/state_sync_then_catchup.py | 268 ++++++++++-------- 2 files changed, 150 insertions(+), 135 deletions(-) diff --git a/pytest/tests/sanity/state_sync_epoch_boundary.py b/pytest/tests/sanity/state_sync_epoch_boundary.py index af60624d826..5655f72a269 100755 --- a/pytest/tests/sanity/state_sync_epoch_boundary.py +++ b/pytest/tests/sanity/state_sync_epoch_boundary.py @@ -1,7 +1,9 @@ #!/usr/bin/env python3 # Spins up one validating node. -# Spins a non-validating node that tracks some shards and the set of tracked shards changes regularly. -# The node gets stopped, and gets restarted close to an epoch boundary but in a way to trigger epoch sync. +# Spins a non-validating node that tracks some shards and the set of tracked +# shards changes regularly. +# The node gets stopped, and gets restarted close to an epoch boundary but in a +# way to trigger state sync. # # This test is a regression test to ensure that the node doesn't panic during # function execution during block sync after a state sync. 
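Patch 5 above relaxes `state_changes_into`: instead of asserting that `Resharding`-caused changes never reach it, it drops them with `retain` and skips entries that become empty, while the runtime debug-asserts that everything produced by split-state processing carries that cause. The sketch below mirrors that retain-and-skip pattern with simplified stand-in types; the struct fields and the helper name are illustrative, not the actual `WrappedTrieChanges` API.

```rust
/// Simplified stand-in for nearcore's StateChangeCause.
#[derive(Debug, Clone, PartialEq)]
enum StateChangeCause {
    TransactionProcessing,
    Resharding,
}

#[derive(Debug, Clone)]
struct RawStateChange {
    cause: StateChangeCause,
    data: Vec<u8>,
}

#[derive(Debug, Clone)]
struct RawStateChangesWithTrieKey {
    trie_key: Vec<u8>,
    changes: Vec<RawStateChange>,
}

/// Mirrors the retain-and-skip pattern from `state_changes_into`:
/// Resharding-caused changes were already handled while processing the
/// parent shard, so they are dropped here instead of tripping an assertion,
/// and keys left with no changes are skipped entirely.
fn filter_resharding_changes(
    state_changes: Vec<RawStateChangesWithTrieKey>,
) -> Vec<RawStateChangesWithTrieKey> {
    let mut kept = Vec::new();
    for mut change_with_trie_key in state_changes {
        change_with_trie_key
            .changes
            .retain(|change| change.cause != StateChangeCause::Resharding);
        if change_with_trie_key.changes.is_empty() {
            continue;
        }
        kept.push(change_with_trie_key);
    }
    kept
}

fn main() {
    let changes = vec![
        RawStateChangesWithTrieKey {
            trie_key: b"account/alice".to_vec(),
            changes: vec![RawStateChange { cause: StateChangeCause::Resharding, data: vec![1] }],
        },
        RawStateChangesWithTrieKey {
            trie_key: b"account/bob".to_vec(),
            changes: vec![
                RawStateChange { cause: StateChangeCause::TransactionProcessing, data: vec![2] },
                RawStateChange { cause: StateChangeCause::Resharding, data: vec![3] },
            ],
        },
    ];
    let kept = filter_resharding_changes(changes);
    // Only bob's non-resharding change survives.
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0].trie_key, b"account/bob".to_vec());
    assert_eq!(kept[0].changes[0].data, vec![2]);
}
```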
@@ -25,10 +27,10 @@ state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / 'state_parts') config0 = { - 'enable_multiline_logging': False, + 'gc_num_epochs_to_keep': 100, 'log_summary_period': { 'secs': 0, - 'nanos': 100000000 + 'nanos': 500000000 }, 'log_summary_style': 'plain', 'state_sync': { @@ -48,10 +50,10 @@ 'tracked_shards': [0], } config1 = { - 'enable_multiline_logging': False, + 'gc_num_epochs_to_keep': 100, 'log_summary_period': { 'secs': 0, - 'nanos': 100000000 + 'nanos': 500000000 }, 'log_summary_style': 'plain', 'state_sync': { @@ -74,9 +76,6 @@ [0, 1]], 'tracked_shards': [], } -logger.info(f'state_parts_dir: {state_parts_dir}') -logger.info(f'config0: {config0}') -logger.info(f'config1: {config1}') config = load_config() near_root, node_dirs = init_cluster(1, 1, 4, config, diff --git a/pytest/tests/sanity/state_sync_then_catchup.py b/pytest/tests/sanity/state_sync_then_catchup.py index 4ce7cd255db..94f33ba0a39 100644 --- a/pytest/tests/sanity/state_sync_then_catchup.py +++ b/pytest/tests/sanity/state_sync_then_catchup.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # Spins up one validating node. # Spins a non-validating node that tracks some shards and the set of tracked shards changes regularly. -# The node gets stopped, and gets restarted close to an epoch boundary but in a way to trigger epoch sync. +# The node gets stopped, and gets restarted close to an epoch boundary but in a way to trigger state sync. # # After the state sync the node has to do a catchup. # @@ -25,91 +25,6 @@ EPOCH_LENGTH = 50 -state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / 'state_parts') - -config0 = { - 'enable_multiline_logging': False, - 'gc_num_epochs_to_keep': 100, - 'log_summary_period': { - 'secs': 0, - 'nanos': 100000000 - }, - 'log_summary_style': 'plain', - 'state_sync': { - 'dump': { - 'location': { - 'Filesystem': { - 'root_dir': state_parts_dir - } - }, - 'iteration_delay': { - 'secs': 0, - 'nanos': 100000000 - }, - } - }, - 'store.state_snapshot_enabled': True, - 'tracked_shards': [0], -} -config1 = { - 'enable_multiline_logging': False, - 'gc_num_epochs_to_keep': 100, - 'log_summary_period': { - 'secs': 0, - 'nanos': 100000000 - }, - 'log_summary_style': 'plain', - 'state_sync': { - 'sync': { - 'ExternalStorage': { - 'location': { - 'Filesystem': { - 'root_dir': state_parts_dir - } - } - } - } - }, - 'state_sync_enabled': True, - 'consensus.state_sync_timeout': { - 'secs': 0, - 'nanos': 500000000 - }, - 'tracked_shard_schedule': [[0], [0], [1], [2], [3], [1], [2], [3],], - 'tracked_shards': [], -} -logger.info(f'state_parts_dir: {state_parts_dir}') -logger.info(f'config0: {config0}') -logger.info(f'config1: {config1}') - -config = load_config() -near_root, node_dirs = init_cluster(1, 1, 4, config, - [["epoch_length", EPOCH_LENGTH]], { - 0: config0, - 1: config1 - }) - -boot_node = spin_up_node(config, near_root, node_dirs[0], 0) -logger.info('started boot_node') -node1 = spin_up_node(config, near_root, node_dirs[1], 1, boot_node=boot_node) -logger.info('started node1') - -contract = utils.load_test_contract() - -latest_block_hash = boot_node.get_latest_block().hash_bytes -deploy_contract_tx = transaction.sign_deploy_contract_tx( - boot_node.signer_key, contract, 10, latest_block_hash) -result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) -assert 'result' in result and 'error' not in result, ( - 'Expected "result" and no "error" in response, got: {}'.format(result)) - -latest_block_hash = boot_node.get_latest_block().hash_bytes -deploy_contract_tx = 
transaction.sign_deploy_contract_tx( - node1.signer_key, contract, 10, latest_block_hash) -result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) -assert 'result' in result and 'error' not in result, ( - 'Expected "result" and no "error" in response, got: {}'.format(result)) - def epoch_height(block_height): if block_height == 0: @@ -122,7 +37,7 @@ def epoch_height(block_height): # Generates traffic for all possible shards. # Assumes that `test0`, `test1`, `near` all belong to different shards. -def random_workload_until(target, nonce, keys, target_node): +def random_workload_until(target, nonce, keys, node0, node1, target_node): last_height = -1 while True: nonce += 1 @@ -135,39 +50,40 @@ def random_workload_until(target, nonce, keys, target_node): logger.info(f'@{height}, epoch_height: {epoch_height(height)}') last_height = height - last_block_hash = boot_node.get_latest_block().hash_bytes - if (len(keys) > 100 and random.random() < 0.2) or len(keys) > 1000: - key = keys[random.randint(0, len(keys) - 1)] - call_function('read', key, nonce, boot_node.signer_key, - last_block_hash) - call_function('read', key, nonce, node1.signer_key, last_block_hash) - elif random.random() < 0.5: - if random.random() < 0.3: - key_from, account_to = boot_node.signer_key, node1.signer_key.account_id - elif random.random() < 0.3: - key_from, account_to = boot_node.signer_key, "near" - elif random.random() < 0.5: - key_from, account_to = node1.signer_key, boot_node.signer_key.account_id - else: - key_from, account_to = node1.signer_key, "near" + last_block_hash = node0.get_latest_block().hash_bytes + if random.random() < 0.5: + # Make a transfer between shards. + # The goal is to generate cross-shard receipts. + key_from = random.choice([node0, node1]).signer_key + account_to = random.choice([ + node0.signer_key.account_id, node1.signer_key.account_id, "near" + ]) payment_tx = transaction.sign_payment_tx(key_from, account_to, 1, nonce, last_block_hash) - boot_node.send_tx(payment_tx).get('result') + node0.send_tx(payment_tx).get('result') + elif (len(keys) > 100 and random.random() < 0.5) or len(keys) > 1000: + # Do some flat storage reads, but only if we have enough keys populated. 
+ key = keys[random.randint(0, len(keys) - 1)] + for node in [node0, node1]: + call_function('read', key, nonce, node.signer_key, + last_block_hash, node0) + call_function('read', key, nonce, node.signer_key, + last_block_hash, node0) else: + # Generate some data for flat storage reads key = random_u64() keys.append(key) - call_function('write', key, nonce, boot_node.signer_key, - last_block_hash) - call_function('write', key, nonce, node1.signer_key, - last_block_hash) - return (nonce, keys) + for node in [node0, node1]: + call_function('write', key, nonce, node.signer_key, + last_block_hash, node0) + return nonce, keys def random_u64(): return bytes(random.randint(0, 255) for _ in range(8)) -def call_function(op, key, nonce, signer_key, last_block_hash): +def call_function(op, key, nonce, signer_key, last_block_hash, node): if op == 'read': args = key fn = 'read_value' @@ -178,23 +94,123 @@ def call_function(op, key, nonce, signer_key, last_block_hash): tx = transaction.sign_function_call_tx(signer_key, signer_key.account_id, fn, args, 300 * account.TGAS, 0, nonce, last_block_hash) - return boot_node.send_tx(tx).get('result') - + return node.send_tx(tx).get('result') -nonce, keys = random_workload_until(EPOCH_LENGTH - 5, 1, [], boot_node) -node1_height = node1.get_latest_block().height -logger.info(f'node1@{node1_height}') -node1.kill() -logger.info(f'killed node1') +def main(): + state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / 'state_parts') -# Run node0 more to trigger block sync in node1. -nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3), nonce, keys, - boot_node) - -# Node1 is now behind and needs to do header sync and block sync. -node1.start(boot_node=boot_node) -node1_height = node1.get_latest_block().height -logger.info(f'started node1@{node1_height}') - -nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3.7), nonce, keys, node1) + config0 = { + 'gc_num_epochs_to_keep': 100, + 'log_summary_period': { + 'secs': 0, + 'nanos': 500000000 + }, + 'log_summary_style': 'plain', + 'state_sync': { + 'dump': { + 'location': { + 'Filesystem': { + 'root_dir': state_parts_dir + } + }, + 'iteration_delay': { + 'secs': 0, + 'nanos': 100000000 + }, + } + }, + 'store.state_snapshot_enabled': True, + 'tracked_shards': [0], + } + config1 = { + 'gc_num_epochs_to_keep': 100, + 'log_summary_period': { + 'secs': 0, + 'nanos': 500000000 + }, + 'log_summary_style': 'plain', + 'state_sync': { + 'sync': { + 'ExternalStorage': { + 'location': { + 'Filesystem': { + 'root_dir': state_parts_dir + } + } + } + } + }, + 'state_sync_enabled': True, + 'consensus.state_sync_timeout': { + 'secs': 0, + 'nanos': 500000000 + }, + 'tracked_shard_schedule': [ + [0], + [0], + [1], + [2], + [3], + [1], + [2], + [3], + ], + 'tracked_shards': [], + } + + config = load_config() + near_root, node_dirs = init_cluster(1, 1, 4, config, + [["epoch_length", EPOCH_LENGTH]], { + 0: config0, + 1: config1 + }) + + boot_node = spin_up_node(config, near_root, node_dirs[0], 0) + logger.info('started boot_node') + node1 = spin_up_node(config, + near_root, + node_dirs[1], + 1, + boot_node=boot_node) + logger.info('started node1') + + contract = utils.load_test_contract() + + latest_block_hash = boot_node.get_latest_block().hash_bytes + deploy_contract_tx = transaction.sign_deploy_contract_tx( + boot_node.signer_key, contract, 10, latest_block_hash) + result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) + assert 'result' in result and 'error' not in result, ( + 'Expected "result" and no "error" in response, 
got: {}'.format(result)) + + latest_block_hash = boot_node.get_latest_block().hash_bytes + deploy_contract_tx = transaction.sign_deploy_contract_tx( + node1.signer_key, contract, 10, latest_block_hash) + result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) + assert 'result' in result and 'error' not in result, ( + 'Expected "result" and no "error" in response, got: {}'.format(result)) + + nonce, keys = random_workload_until(EPOCH_LENGTH - 5, 1, [], boot_node, + node1, boot_node) + + node1_height = node1.get_latest_block().height + logger.info(f'node1@{node1_height}') + node1.kill() + logger.info(f'killed node1') + + # Run node0 more to trigger block sync in node1. + nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3), nonce, keys, + boot_node, node1, boot_node) + + # Node1 is now behind and needs to do header sync and block sync. + node1.start(boot_node=boot_node) + node1_height = node1.get_latest_block().height + logger.info(f'started node1@{node1_height}') + + nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3.7), nonce, keys, + boot_node, node1, node1) + + +if __name__ == "__main__": + main() From 6b71549062df4f1d96015b86e65814c4815ceac9 Mon Sep 17 00:00:00 2001 From: Nikolay Kurtov Date: Tue, 15 Aug 2023 13:34:50 +0200 Subject: [PATCH 7/7] python readability --- pytest/tests/sanity/state_sync_then_catchup.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pytest/tests/sanity/state_sync_then_catchup.py b/pytest/tests/sanity/state_sync_then_catchup.py index 94f33ba0a39..341abeae3ec 100644 --- a/pytest/tests/sanity/state_sync_then_catchup.py +++ b/pytest/tests/sanity/state_sync_then_catchup.py @@ -35,6 +35,10 @@ def epoch_height(block_height): return int((block_height - 1) / EPOCH_LENGTH) +def random_u64(): + return bytes(random.randint(0, 255) for _ in range(8)) + + # Generates traffic for all possible shards. # Assumes that `test0`, `test1`, `near` all belong to different shards. def random_workload_until(target, nonce, keys, node0, node1, target_node): @@ -79,10 +83,6 @@ def random_workload_until(target, nonce, keys, node0, node1, target_node): return nonce, keys -def random_u64(): - return bytes(random.randint(0, 255) for _ in range(8)) - - def call_function(op, key, nonce, signer_key, last_block_hash, node): if op == 'read': args = key