From 8f0f75e5dd52bb848e930e83385d3fc815999ea9 Mon Sep 17 00:00:00 2001 From: nikurt <86772482+nikurt@users.noreply.github.com> Date: Mon, 31 Jul 2023 09:04:58 +0200 Subject: [PATCH] fix(flat-storage): Initiailze Flat Storage respecting the shards that the node will care about (#9368) Added a test reproducing this issue. Steps to reproduce: * Download an S3 snapshot * Change config.json to track some shard in the next epoch but not in the current epoch. * * Use `tracked_shard_schedule` or be a chunk-only producer. Observe that the node tries to use flat storage to process chunks from shards that it will care about, but then it prints "FlatStorage not ready" and uses the usual Trie storage, resulting in `IncorrectStateRoot`. --- chain/chain/src/flat_storage_creator.rs | 10 +- core/chain-configs/src/client_config.rs | 5 +- nightly/pytest-sanity.txt | 4 +- pytest/lib/cluster.py | 3 +- .../tests/sanity/block_sync_flat_storage.py | 101 ++++++++++++++++++ 5 files changed, 118 insertions(+), 5 deletions(-) create mode 100755 pytest/tests/sanity/block_sync_flat_storage.py diff --git a/chain/chain/src/flat_storage_creator.rs b/chain/chain/src/flat_storage_creator.rs index 7c0c15e27fd..9e968f3c605 100644 --- a/chain/chain/src/flat_storage_creator.rs +++ b/chain/chain/src/flat_storage_creator.rs @@ -446,7 +446,15 @@ impl FlatStorageCreator { return Ok(None); }; for shard_id in 0..num_shards { - if shard_tracker.care_about_shard(me, &chain_head.prev_block_hash, shard_id, true) { + // The node applies transactions from the shards it cares about this and the next epoch. + if shard_tracker.care_about_shard(me, &chain_head.prev_block_hash, shard_id, true) + || shard_tracker.will_care_about_shard( + me, + &chain_head.prev_block_hash, + shard_id, + true, + ) + { let shard_uid = epoch_manager.shard_id_to_uid(shard_id, &chain_head.epoch_id)?; let status = flat_storage_manager.get_flat_storage_status(shard_uid); diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index 5b9f3b6b4f2..948dc35c50c 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -224,12 +224,13 @@ pub struct ClientConfig { pub block_header_fetch_horizon: BlockHeightDelta, /// Garbage collection configuration. pub gc: GCConfig, - /// Accounts that this client tracks + /// Accounts that this client tracks. pub tracked_accounts: Vec, - /// Shards that this client tracks + /// Shards that this client tracks. pub tracked_shards: Vec, /// Rotate between these sets of tracked shards. /// Used to simulate the behavior of chunk only producers without staking tokens. + /// This field is only used if `tracked_shards` is empty. pub tracked_shard_schedule: Vec>, /// Not clear old data, set `true` for archive nodes. pub archive: bool, diff --git a/nightly/pytest-sanity.txt b/nightly/pytest-sanity.txt index c8f521b6fb0..3f7de18d15a 100644 --- a/nightly/pytest-sanity.txt +++ b/nightly/pytest-sanity.txt @@ -61,6 +61,8 @@ pytest --timeout=240 sanity/block_sync.py pytest --timeout=240 sanity/block_sync.py --features nightly pytest --timeout=10m sanity/block_sync_archival.py pytest --timeout=10m sanity/block_sync_archival.py --features nightly +pytest --timeout=120 sanity/block_sync_flat_storage.py +pytest --timeout=120 sanity/block_sync_flat_storage.py --features nightly pytest --timeout=240 sanity/validator_switch.py pytest --timeout=240 sanity/validator_switch.py --features nightly pytest --timeout=240 sanity/rpc_state_changes.py @@ -144,4 +146,4 @@ pytest sanity/meta_tx.py --features nightly # Tests for split storage and split storage migration pytest --timeout=600 sanity/split_storage.py -pytest --timeout=600 sanity/split_storage.py --features nightly \ No newline at end of file +pytest --timeout=600 sanity/split_storage.py --features nightly diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py index 05c30ead51e..d517bbf38ae 100644 --- a/pytest/lib/cluster.py +++ b/pytest/lib/cluster.py @@ -818,6 +818,7 @@ def apply_config_changes(node_dir, client_config_change): 'split_storage', 'state_sync_enabled', 'store.state_snapshot_enabled', + 'tracked_shard_schedule', ) for k, v in client_config_change.items(): @@ -965,4 +966,4 @@ def get_binary_protocol_version(config) -> typing.Optional[int]: for i in range(n): if tokens[i] == "protocol" and i + 1 < n: return int(tokens[i + 1]) - return None \ No newline at end of file + return None diff --git a/pytest/tests/sanity/block_sync_flat_storage.py b/pytest/tests/sanity/block_sync_flat_storage.py new file mode 100755 index 00000000000..9c23cd86f7c --- /dev/null +++ b/pytest/tests/sanity/block_sync_flat_storage.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +# Spins up one validating node. +# Spins a non-validating node that tracks all shards. +# In the middle of an epoch, the node gets stopped, and the set of tracked shards gets reduced. +# Test that the node correctly handles chunks for the shards that it will care about in the next epoch. +# Spam transactions that require the node to use flat storage to process them correctly. + +import pathlib +import random +import sys + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from cluster import init_cluster, spin_up_node, load_config, apply_config_changes +import account +import transaction +import utils + +EPOCH_LENGTH = 30 + +config0 = { + 'tracked_shards': [0], +} +config1 = { + 'tracked_shards': [0], +} + +config = load_config() +near_root, node_dirs = init_cluster(1, 1, 4, config, + [["epoch_length", EPOCH_LENGTH]], { + 0: config0, + 1: config1 + }) + +boot_node = spin_up_node(config, near_root, node_dirs[0], 0) +node1 = spin_up_node(config, near_root, node_dirs[1], 1, boot_node=boot_node) + +contract_key = boot_node.signer_key +contract = utils.load_test_contract() +latest_block_hash = boot_node.get_latest_block().hash_bytes +deploy_contract_tx = transaction.sign_deploy_contract_tx( + contract_key, contract, 10, latest_block_hash) +result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) +assert 'result' in result and 'error' not in result, ( + 'Expected "result" and no "error" in response, got: {}'.format(result)) + + +def random_workload_until(target, nonce, keys): + while True: + nonce += 1 + height = boot_node.get_latest_block().height + if height > target: + break + if (len(keys) > 100 and random.random() < 0.2) or len(keys) > 1000: + key = keys[random.randint(0, len(keys) - 1)] + call_function(boot_node, 'read', key, nonce) + else: + key = random_u64() + keys.append(key) + call_function(boot_node, 'write', key, nonce) + return (nonce, keys) + + +def random_u64(): + return bytes(random.randint(0, 255) for _ in range(8)) + + +def call_function(node, op, key, nonce): + last_block_hash = node.get_latest_block().hash_bytes + if op == 'read': + args = key + fn = 'read_value' + else: + args = key + random_u64() + fn = 'write_key_value' + + tx = transaction.sign_function_call_tx(node.signer_key, + node.signer_key.account_id, fn, args, + 300 * account.TGAS, 0, nonce, + last_block_hash) + return node.send_tx(tx).get('result') + + +nonce, keys = random_workload_until(EPOCH_LENGTH + 5, 1, []) + +node1.kill() +# Reduce the set of tracked shards and make it variable in time. +# The node is stopped in epoch_height = 1. +# Change the config of tracked shards such that after restart the node cares +# only about shard 0, and in the next epoch it will care about shards [1, 2, 3]. +apply_config_changes(node_dirs[1], { + "tracked_shards": [], + "tracked_shard_schedule": [[0], [0], [1, 2, 3]] +}) + +# Run node0 more to trigger block sync in node1. +nonce, keys = random_workload_until(EPOCH_LENGTH * 2 + 1, nonce, keys) + +# Node1 is now behind and needs to do header sync and block sync. +node1.start(boot_node=boot_node) +utils.wait_for_blocks(node1, target=EPOCH_LENGTH * 2 + 10)