From cfa12b59c0be355e18f940b66f9671aca3a3e02e Mon Sep 17 00:00:00 2001
From: nikurt <86772482+nikurt@users.noreply.github.com>
Date: Thu, 17 Aug 2023 13:20:33 +0200
Subject: [PATCH] fix(state-sync): Make partial chunks message deterministic (#9427)

The issue only happens if a node tracks a subset of shards.
The order of shards is arbitrary because:
* Shard ids are in a HashSet
* In one case the node first adds the shards that are cached, and later the
  shards that are only available on disk.
---
 chain/chunks/src/lib.rs                       |  12 +
 chain/chunks/src/logic.rs                     |   4 +-
 core/primitives/src/sharding.rs               |  16 ++
 docs/misc/state_sync_from_external_storage.md |  10 +-
 nightly/pytest-sanity.txt                     |   2 +
 pytest/lib/cluster.py                         |   2 +-
 .../tests/sanity/state_sync_epoch_boundary.py |  19 +-
 .../tests/sanity/state_sync_then_catchup.py   | 216 ++++++++++++++++++
 8 files changed, 265 insertions(+), 16 deletions(-)
 create mode 100644 pytest/tests/sanity/state_sync_then_catchup.py

diff --git a/chain/chunks/src/lib.rs b/chain/chunks/src/lib.rs
index 710d5c13b52..b734563e322 100644
--- a/chain/chunks/src/lib.rs
+++ b/chain/chunks/src/lib.rs
@@ -792,9 +792,21 @@ impl ShardsManager {
     /// Finds the parts and receipt proofs asked for in the request, and returns a response
     /// containing whatever was found. See comment for PartialEncodedChunkResponseSource for
     /// an explanation of that part of the return value.
+    /// Ensures the receipts in the response are in a deterministic order.
     fn prepare_partial_encoded_chunk_response(
         &mut self,
         request: PartialEncodedChunkRequestMsg,
+    ) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) {
+        let (src, mut response_msg) = self.prepare_partial_encoded_chunk_response_unsorted(request);
+        // Note that the PartialChunks column is a write-once column, and needs
+        // the values to be deterministic.
+        response_msg.receipts.sort();
+        (src, response_msg)
+    }
+
+    fn prepare_partial_encoded_chunk_response_unsorted(
+        &mut self,
+        request: PartialEncodedChunkRequestMsg,
     ) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) {
         let PartialEncodedChunkRequestMsg { chunk_hash, part_ords, mut tracking_shards } = request;
         let mut response = PartialEncodedChunkResponseMsg {
diff --git a/chain/chunks/src/logic.rs b/chain/chunks/src/logic.rs
index ee42c120e2e..8586d3cc810 100644
--- a/chain/chunks/src/logic.rs
+++ b/chain/chunks/src/logic.rs
@@ -118,13 +118,15 @@ pub fn make_partial_encoded_chunk_from_owned_parts_and_needed_receipts<'a>(
         })
         .cloned()
         .collect();
-    let receipts = receipts
+    let mut receipts: Vec<_> = receipts
         .filter(|receipt| {
             cares_about_shard
                 || need_receipt(prev_block_hash, receipt.1.to_shard_id, me, shard_tracker)
         })
         .cloned()
         .collect();
+    // Make sure the receipts are in a deterministic order.
+    receipts.sort();
     match header.clone() {
         ShardChunkHeader::V1(header) => {
             PartialEncodedChunk::V1(PartialEncodedChunkV1 { header, parts, receipts })
diff --git a/core/primitives/src/sharding.rs b/core/primitives/src/sharding.rs
index 12bc7026273..16c1dd6446c 100644
--- a/core/primitives/src/sharding.rs
+++ b/core/primitives/src/sharding.rs
@@ -10,6 +10,7 @@ use borsh::{BorshDeserialize, BorshSerialize};
 use near_crypto::Signature;
 use reed_solomon_erasure::galois_8::{Field, ReedSolomon};
 use reed_solomon_erasure::ReconstructShard;
+use std::cmp::Ordering;
 use std::sync::Arc;
 
 #[derive(
@@ -601,6 +602,21 @@ pub struct ShardProof {
 
 /// For each Merkle proof there is a subset of receipts which may be proven.
 pub struct ReceiptProof(pub Vec<Receipt>, pub ShardProof);
 
+// Implement ordering to ensure `ReceiptProofs` are ordered consistently,
+// because we expect messages with ReceiptProofs to be deterministic.
+impl PartialOrd for ReceiptProof {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for ReceiptProof {
+    fn cmp(&self, other: &Self) -> Ordering {
+        (self.1.from_shard_id, self.1.to_shard_id)
+            .cmp(&(other.1.from_shard_id, other.1.to_shard_id))
+    }
+}
+
 #[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Eq, PartialEq)]
 pub struct PartialEncodedChunkPart {
     pub part_ord: u64,
diff --git a/docs/misc/state_sync_from_external_storage.md b/docs/misc/state_sync_from_external_storage.md
index 9ed16b747d1..aafd6eaa8f1 100644
--- a/docs/misc/state_sync_from_external_storage.md
+++ b/docs/misc/state_sync_from_external_storage.md
@@ -49,9 +49,11 @@ your `config.json` file:
       }
     }
   },
-"state_sync_timeout": {
-  "secs": 30,
-  "nanos": 0
+"consensus": {
+  "state_sync_timeout": {
+    "secs": 30,
+    "nanos": 0
+  }
 }
 ```
 
@@ -70,7 +72,7 @@ shards that can be downloaded in parallel during state sync.
 across all shards that can be downloaded in parallel during catchup. Generally,
 this number should not be higher than `num_concurrent_requests`. Keep it
 reasonably low to allow the node to process chunks of other shards.
-* `state_sync_timeout` determines the max duration of an attempt to download a
+* `consensus.state_sync_timeout` determines the max duration of an attempt to download a
 state part. Setting it too low may cause too many unsuccessful attempts.
 
 ### Amazon S3
diff --git a/nightly/pytest-sanity.txt b/nightly/pytest-sanity.txt
index 02bbac22ad5..e93ffddde45 100644
--- a/nightly/pytest-sanity.txt
+++ b/nightly/pytest-sanity.txt
@@ -65,6 +65,8 @@ pytest --timeout=120 sanity/block_sync_flat_storage.py
 pytest --timeout=120 sanity/block_sync_flat_storage.py --features nightly
 pytest --timeout=240 sanity/state_sync_epoch_boundary.py
 pytest --timeout=240 sanity/state_sync_epoch_boundary.py --features nightly
+pytest --timeout=240 sanity/state_sync_then_catchup.py
+pytest --timeout=240 sanity/state_sync_then_catchup.py --features nightly
 pytest --timeout=240 sanity/validator_switch.py
 pytest --timeout=240 sanity/validator_switch.py --features nightly
 pytest --timeout=240 sanity/rpc_state_changes.py
diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py
index f62e83cd501..9be0fa9178a 100644
--- a/pytest/lib/cluster.py
+++ b/pytest/lib/cluster.py
@@ -815,6 +815,7 @@ def apply_config_changes(node_dir, client_config_change):
     # when None.
     allowed_missing_configs = (
         'archive',
+        'consensus.state_sync_timeout',
         'log_summary_period',
         'max_gas_burnt_view',
         'rosetta_rpc',
@@ -822,7 +823,6 @@
         'split_storage',
         'state_sync',
         'state_sync_enabled',
-        'state_sync_timeout',
         'store.state_snapshot_enabled',
         'tracked_shard_schedule',
     )
diff --git a/pytest/tests/sanity/state_sync_epoch_boundary.py b/pytest/tests/sanity/state_sync_epoch_boundary.py
index 82eb3440b1c..5655f72a269 100755
--- a/pytest/tests/sanity/state_sync_epoch_boundary.py
+++ b/pytest/tests/sanity/state_sync_epoch_boundary.py
@@ -1,7 +1,9 @@
 #!/usr/bin/env python3
 # Spins up one validating node.
-# Spins a non-validating node that tracks some shards and the set of tracked shards changes regularly.
-# The node gets stopped, and gets restarted close to an epoch boundary but in a way to trigger epoch sync.
+# Spins a non-validating node that tracks some shards and the set of tracked +# shards changes regularly. +# The node gets stopped, and gets restarted close to an epoch boundary but in a +# way to trigger state sync. # # This test is a regression test to ensure that the node doesn't panic during # function execution during block sync after a state sync. @@ -25,10 +27,10 @@ state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / 'state_parts') config0 = { - 'enable_multiline_logging': False, + 'gc_num_epochs_to_keep': 100, 'log_summary_period': { 'secs': 0, - 'nanos': 100000000 + 'nanos': 500000000 }, 'log_summary_style': 'plain', 'state_sync': { @@ -48,10 +50,10 @@ 'tracked_shards': [0], } config1 = { - 'enable_multiline_logging': False, + 'gc_num_epochs_to_keep': 100, 'log_summary_period': { 'secs': 0, - 'nanos': 100000000 + 'nanos': 500000000 }, 'log_summary_style': 'plain', 'state_sync': { @@ -66,7 +68,7 @@ } }, 'state_sync_enabled': True, - 'state_sync_timeout': { + 'consensus.state_sync_timeout': { 'secs': 0, 'nanos': 500000000 }, @@ -74,9 +76,6 @@ [0, 1]], 'tracked_shards': [], } -logger.info(f'state_parts_dir: {state_parts_dir}') -logger.info(f'config0: {config0}') -logger.info(f'config1: {config1}') config = load_config() near_root, node_dirs = init_cluster(1, 1, 4, config, diff --git a/pytest/tests/sanity/state_sync_then_catchup.py b/pytest/tests/sanity/state_sync_then_catchup.py new file mode 100644 index 00000000000..341abeae3ec --- /dev/null +++ b/pytest/tests/sanity/state_sync_then_catchup.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +# Spins up one validating node. +# Spins a non-validating node that tracks some shards and the set of tracked shards changes regularly. +# The node gets stopped, and gets restarted close to an epoch boundary but in a way to trigger state sync. +# +# After the state sync the node has to do a catchup. +# +# Note that the test must generate outgoing receipts for most shards almost +# every block in order to crash if creation of partial encoded chunks becomes +# non-deterministic. + +import pathlib +import random +import sys +import tempfile + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from cluster import init_cluster, spin_up_node, load_config, apply_config_changes +import account +import transaction +import utils + +from configured_logger import logger + +EPOCH_LENGTH = 50 + + +def epoch_height(block_height): + if block_height == 0: + return 0 + if block_height <= EPOCH_LENGTH: + # According to the protocol specifications, there are two epochs with height 1. + return "1*" + return int((block_height - 1) / EPOCH_LENGTH) + + +def random_u64(): + return bytes(random.randint(0, 255) for _ in range(8)) + + +# Generates traffic for all possible shards. +# Assumes that `test0`, `test1`, `near` all belong to different shards. +def random_workload_until(target, nonce, keys, node0, node1, target_node): + last_height = -1 + while True: + nonce += 1 + + last_block = target_node.get_latest_block() + height = last_block.height + if height > target: + break + if height != last_height: + logger.info(f'@{height}, epoch_height: {epoch_height(height)}') + last_height = height + + last_block_hash = node0.get_latest_block().hash_bytes + if random.random() < 0.5: + # Make a transfer between shards. + # The goal is to generate cross-shard receipts. 
+ key_from = random.choice([node0, node1]).signer_key + account_to = random.choice([ + node0.signer_key.account_id, node1.signer_key.account_id, "near" + ]) + payment_tx = transaction.sign_payment_tx(key_from, account_to, 1, + nonce, last_block_hash) + node0.send_tx(payment_tx).get('result') + elif (len(keys) > 100 and random.random() < 0.5) or len(keys) > 1000: + # Do some flat storage reads, but only if we have enough keys populated. + key = keys[random.randint(0, len(keys) - 1)] + for node in [node0, node1]: + call_function('read', key, nonce, node.signer_key, + last_block_hash, node0) + call_function('read', key, nonce, node.signer_key, + last_block_hash, node0) + else: + # Generate some data for flat storage reads + key = random_u64() + keys.append(key) + for node in [node0, node1]: + call_function('write', key, nonce, node.signer_key, + last_block_hash, node0) + return nonce, keys + + +def call_function(op, key, nonce, signer_key, last_block_hash, node): + if op == 'read': + args = key + fn = 'read_value' + else: + args = key + random_u64() + fn = 'write_key_value' + + tx = transaction.sign_function_call_tx(signer_key, signer_key.account_id, + fn, args, 300 * account.TGAS, 0, + nonce, last_block_hash) + return node.send_tx(tx).get('result') + + +def main(): + state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / 'state_parts') + + config0 = { + 'gc_num_epochs_to_keep': 100, + 'log_summary_period': { + 'secs': 0, + 'nanos': 500000000 + }, + 'log_summary_style': 'plain', + 'state_sync': { + 'dump': { + 'location': { + 'Filesystem': { + 'root_dir': state_parts_dir + } + }, + 'iteration_delay': { + 'secs': 0, + 'nanos': 100000000 + }, + } + }, + 'store.state_snapshot_enabled': True, + 'tracked_shards': [0], + } + config1 = { + 'gc_num_epochs_to_keep': 100, + 'log_summary_period': { + 'secs': 0, + 'nanos': 500000000 + }, + 'log_summary_style': 'plain', + 'state_sync': { + 'sync': { + 'ExternalStorage': { + 'location': { + 'Filesystem': { + 'root_dir': state_parts_dir + } + } + } + } + }, + 'state_sync_enabled': True, + 'consensus.state_sync_timeout': { + 'secs': 0, + 'nanos': 500000000 + }, + 'tracked_shard_schedule': [ + [0], + [0], + [1], + [2], + [3], + [1], + [2], + [3], + ], + 'tracked_shards': [], + } + + config = load_config() + near_root, node_dirs = init_cluster(1, 1, 4, config, + [["epoch_length", EPOCH_LENGTH]], { + 0: config0, + 1: config1 + }) + + boot_node = spin_up_node(config, near_root, node_dirs[0], 0) + logger.info('started boot_node') + node1 = spin_up_node(config, + near_root, + node_dirs[1], + 1, + boot_node=boot_node) + logger.info('started node1') + + contract = utils.load_test_contract() + + latest_block_hash = boot_node.get_latest_block().hash_bytes + deploy_contract_tx = transaction.sign_deploy_contract_tx( + boot_node.signer_key, contract, 10, latest_block_hash) + result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) + assert 'result' in result and 'error' not in result, ( + 'Expected "result" and no "error" in response, got: {}'.format(result)) + + latest_block_hash = boot_node.get_latest_block().hash_bytes + deploy_contract_tx = transaction.sign_deploy_contract_tx( + node1.signer_key, contract, 10, latest_block_hash) + result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) + assert 'result' in result and 'error' not in result, ( + 'Expected "result" and no "error" in response, got: {}'.format(result)) + + nonce, keys = random_workload_until(EPOCH_LENGTH - 5, 1, [], boot_node, + node1, boot_node) + + node1_height = 
node1.get_latest_block().height + logger.info(f'node1@{node1_height}') + node1.kill() + logger.info(f'killed node1') + + # Run node0 more to trigger block sync in node1. + nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3), nonce, keys, + boot_node, node1, boot_node) + + # Node1 is now behind and needs to do header sync and block sync. + node1.start(boot_node=boot_node) + node1_height = node1.get_latest_block().height + logger.info(f'started node1@{node1_height}') + + nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3.7), nonce, keys, + boot_node, node1, node1) + + +if __name__ == "__main__": + main()
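
To see the determinism argument in isolation, here is a minimal, self-contained sketch. The field names mirror `ShardProof`/`ReceiptProof`, but the payload type and surrounding code are simplified stand-ins rather than nearcore's actual definitions. It shows that once `ReceiptProof` has a total order keyed on `(from_shard_id, to_shard_id)`, sorting makes the message independent of the arbitrary order in which receipts were collected (HashSet iteration, cached shards before on-disk shards, and so on):

```rust
use std::cmp::Ordering;

// Simplified stand-ins for nearcore's types; the receipt payload is reduced
// to raw bytes for illustration only.
#[derive(Debug, Eq, PartialEq)]
struct ShardProof {
    from_shard_id: u64,
    to_shard_id: u64,
}

#[derive(Debug, Eq, PartialEq)]
struct ReceiptProof(Vec<u8>, ShardProof);

impl PartialOrd for ReceiptProof {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ReceiptProof {
    // Order by (from_shard_id, to_shard_id), as in the patch. Any total
    // order works; what matters is that every node applies the same one
    // before the message is stored or sent.
    fn cmp(&self, other: &Self) -> Ordering {
        (self.1.from_shard_id, self.1.to_shard_id)
            .cmp(&(other.1.from_shard_id, other.1.to_shard_id))
    }
}

fn main() {
    // Receipts collected in an arbitrary order (e.g. cached shards first,
    // then shards loaded from disk, or HashSet iteration order).
    let mut receipts = vec![
        ReceiptProof(vec![2], ShardProof { from_shard_id: 2, to_shard_id: 0 }),
        ReceiptProof(vec![3], ShardProof { from_shard_id: 0, to_shard_id: 3 }),
        ReceiptProof(vec![1], ShardProof { from_shard_id: 0, to_shard_id: 1 }),
    ];
    // Sorting makes the result deterministic, which a write-once column
    // like PartialChunks requires.
    receipts.sort();
    let order: Vec<(u64, u64)> =
        receipts.iter().map(|r| (r.1.from_shard_id, r.1.to_shard_id)).collect();
    let expected: Vec<(u64, u64)> = vec![(0, 1), (0, 3), (2, 0)];
    assert_eq!(order, expected);
    println!("{:?}", receipts);
}
```

The key is only the shard-id pair because, in practice, a given chunk carries at most one `ReceiptProof` per destination shard, so `(from_shard_id, to_shard_id)` already gives an unambiguous order.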