fix(state-sync): Make partial chunks message deterministic (near#9427)
The issue only happens if a node tracks a subset of shards.
The order of shards is arbitrary because:

* Shard ids are kept in a HashSet, which has no defined iteration order.
* In one case the node first adds the shards that are cached, and only afterwards the shards that are available only on disk (see the sketch below).
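For illustration, a minimal Rust sketch (not nearcore code; `ReceiptEntry` is a hypothetical, simplified stand-in for the real receipt-proof type) of how building the receipt list from a HashSet produces an arbitrary order, and how sorting restores determinism:

```rust
use std::collections::HashSet;

// Hypothetical, simplified stand-in keyed by (from_shard_id, to_shard_id),
// mirroring the ordering used by the fix.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct ReceiptEntry {
    from_shard_id: u64,
    to_shard_id: u64,
}

fn collect_receipts(tracking_shards: &HashSet<u64>) -> Vec<ReceiptEntry> {
    // A HashSet has no defined iteration order, so two nodes (or two runs of
    // the same node) can produce the same receipts in different orders.
    let mut receipts: Vec<ReceiptEntry> = tracking_shards
        .iter()
        .map(|&shard_id| ReceiptEntry { from_shard_id: 0, to_shard_id: shard_id })
        .collect();
    // Sorting by (from_shard_id, to_shard_id) makes the result deterministic,
    // which matters because the PartialChunks column is write-once.
    receipts.sort();
    receipts
}

fn main() {
    let tracking_shards: HashSet<u64> = [3, 0, 2, 1].into_iter().collect();
    println!("{:?}", collect_receipts(&tracking_shards));
}
```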
nikurt committed Aug 24, 2023
1 parent 9a26688 commit cfa12b5
Showing 8 changed files with 265 additions and 16 deletions.
12 changes: 12 additions & 0 deletions chain/chunks/src/lib.rs
@@ -792,9 +792,21 @@ impl ShardsManager {
    /// Finds the parts and receipt proofs asked for in the request, and returns a response
    /// containing whatever was found. See comment for PartialEncodedChunkResponseSource for
    /// an explanation of that part of the return value.
    /// Ensures the receipts in the response are in a deterministic order.
    fn prepare_partial_encoded_chunk_response(
        &mut self,
        request: PartialEncodedChunkRequestMsg,
    ) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) {
        let (src, mut response_msg) = self.prepare_partial_encoded_chunk_response_unsorted(request);
        // Note that the PartialChunks column is a write-once column, and needs
        // the values to be deterministic.
        response_msg.receipts.sort();
        (src, response_msg)
    }

    fn prepare_partial_encoded_chunk_response_unsorted(
        &mut self,
        request: PartialEncodedChunkRequestMsg,
    ) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) {
        let PartialEncodedChunkRequestMsg { chunk_hash, part_ords, mut tracking_shards } = request;
        let mut response = PartialEncodedChunkResponseMsg {
4 changes: 3 additions & 1 deletion chain/chunks/src/logic.rs
@@ -118,13 +118,15 @@ pub fn make_partial_encoded_chunk_from_owned_parts_and_needed_receipts<'a>(
        })
        .cloned()
        .collect();
    let receipts = receipts
    let mut receipts: Vec<_> = receipts
        .filter(|receipt| {
            cares_about_shard
                || need_receipt(prev_block_hash, receipt.1.to_shard_id, me, shard_tracker)
        })
        .cloned()
        .collect();
    // Make sure the receipts are in a deterministic order.
    receipts.sort();
    match header.clone() {
        ShardChunkHeader::V1(header) => {
            PartialEncodedChunk::V1(PartialEncodedChunkV1 { header, parts, receipts })
16 changes: 16 additions & 0 deletions core/primitives/src/sharding.rs
@@ -10,6 +10,7 @@ use borsh::{BorshDeserialize, BorshSerialize};
use near_crypto::Signature;
use reed_solomon_erasure::galois_8::{Field, ReedSolomon};
use reed_solomon_erasure::ReconstructShard;
use std::cmp::Ordering;
use std::sync::Arc;

#[derive(
@@ -601,6 +602,21 @@ pub struct ShardProof {
/// For each Merkle proof there is a subset of receipts which may be proven.
pub struct ReceiptProof(pub Vec<Receipt>, pub ShardProof);

// Implement ordering to ensure `ReceiptProofs` are ordered consistently,
// because we expect messages with ReceiptProofs to be deterministic.
impl PartialOrd<Self> for ReceiptProof {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ReceiptProof {
    fn cmp(&self, other: &Self) -> Ordering {
        (self.1.from_shard_id, self.1.to_shard_id)
            .cmp(&(other.1.from_shard_id, other.1.to_shard_id))
    }
}

#[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Eq, PartialEq)]
pub struct PartialEncodedChunkPart {
pub part_ord: u64,
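The ordering above compares the (from_shard_id, to_shard_id) pair lexicographically, so a list of receipt proofs sorts the same way no matter how it was assembled. A minimal sketch of the same pattern, using a hypothetical stand-in type (the real ReceiptProof also carries the receipts and a Merkle proof):

```rust
use std::cmp::Ordering;

// Hypothetical stand-in carrying only the two fields used for ordering.
#[derive(Debug, PartialEq, Eq)]
struct ProofKey {
    from_shard_id: u64,
    to_shard_id: u64,
}

impl PartialOrd for ProofKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ProofKey {
    fn cmp(&self, other: &Self) -> Ordering {
        // Tuples compare lexicographically: first from_shard_id, then to_shard_id.
        (self.from_shard_id, self.to_shard_id)
            .cmp(&(other.from_shard_id, other.to_shard_id))
    }
}

fn main() {
    let mut proofs = vec![
        ProofKey { from_shard_id: 2, to_shard_id: 0 },
        ProofKey { from_shard_id: 0, to_shard_id: 3 },
        ProofKey { from_shard_id: 0, to_shard_id: 1 },
    ];
    proofs.sort();
    // Always prints in the order (0, 1), (0, 3), (2, 0).
    println!("{:?}", proofs);
}
```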
10 changes: 6 additions & 4 deletions docs/misc/state_sync_from_external_storage.md
@@ -49,9 +49,11 @@ your `config.json` file:
}
}
},
"state_sync_timeout": {
"secs": 30,
"nanos": 0
"consensus": {
"state_sync_timeout": {
"secs": 30,
"nanos": 0
}
}
```

@@ -70,7 +72,7 @@ shards that can be downloaded in parallel during state sync.
across all shards that can be downloaded in parallel during catchup. Generally,
this number should not be higher than `num_concurrent_requests`. Keep it
reasonably low to allow the node to process chunks of other shards.
* `state_sync_timeout` determines the max duration of an attempt to download a
* `consensus.state_sync_timeout` determines the max duration of an attempt to download a
state part. Setting it too low may cause too many unsuccessful attempts.

### Amazon S3
2 changes: 2 additions & 0 deletions nightly/pytest-sanity.txt
@@ -65,6 +65,8 @@ pytest --timeout=120 sanity/block_sync_flat_storage.py
pytest --timeout=120 sanity/block_sync_flat_storage.py --features nightly
pytest --timeout=240 sanity/state_sync_epoch_boundary.py
pytest --timeout=240 sanity/state_sync_epoch_boundary.py --features nightly
pytest --timeout=240 sanity/state_sync_then_catchup.py
pytest --timeout=240 sanity/state_sync_then_catchup.py --features nightly
pytest --timeout=240 sanity/validator_switch.py
pytest --timeout=240 sanity/validator_switch.py --features nightly
pytest --timeout=240 sanity/rpc_state_changes.py
2 changes: 1 addition & 1 deletion pytest/lib/cluster.py
@@ -815,14 +815,14 @@ def apply_config_changes(node_dir, client_config_change):
# when None.
allowed_missing_configs = (
'archive',
'consensus.state_sync_timeout',
'log_summary_period',
'max_gas_burnt_view',
'rosetta_rpc',
'save_trie_changes',
'split_storage',
'state_sync',
'state_sync_enabled',
'state_sync_timeout',
'store.state_snapshot_enabled',
'tracked_shard_schedule',
)
19 changes: 9 additions & 10 deletions pytest/tests/sanity/state_sync_epoch_boundary.py
@@ -1,7 +1,9 @@
#!/usr/bin/env python3
# Spins up one validating node.
# Spins a non-validating node that tracks some shards and the set of tracked shards changes regularly.
# The node gets stopped, and gets restarted close to an epoch boundary but in a way to trigger epoch sync.
# Spins a non-validating node that tracks some shards and the set of tracked
# shards changes regularly.
# The node gets stopped, and gets restarted close to an epoch boundary but in a
# way to trigger state sync.
#
# This test is a regression test to ensure that the node doesn't panic during
# function execution during block sync after a state sync.
@@ -25,10 +27,10 @@
state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / 'state_parts')

config0 = {
'enable_multiline_logging': False,
'gc_num_epochs_to_keep': 100,
'log_summary_period': {
'secs': 0,
'nanos': 100000000
'nanos': 500000000
},
'log_summary_style': 'plain',
'state_sync': {
@@ -48,10 +50,10 @@
'tracked_shards': [0],
}
config1 = {
'enable_multiline_logging': False,
'gc_num_epochs_to_keep': 100,
'log_summary_period': {
'secs': 0,
'nanos': 100000000
'nanos': 500000000
},
'log_summary_style': 'plain',
'state_sync': {
@@ -66,17 +68,14 @@
}
},
'state_sync_enabled': True,
'state_sync_timeout': {
'consensus.state_sync_timeout': {
'secs': 0,
'nanos': 500000000
},
'tracked_shard_schedule': [[0, 2, 3], [0, 2, 3], [0, 1], [0, 1], [0, 1],
[0, 1]],
'tracked_shards': [],
}
logger.info(f'state_parts_dir: {state_parts_dir}')
logger.info(f'config0: {config0}')
logger.info(f'config1: {config1}')

config = load_config()
near_root, node_dirs = init_cluster(1, 1, 4, config,
216 changes: 216 additions & 0 deletions pytest/tests/sanity/state_sync_then_catchup.py
@@ -0,0 +1,216 @@
#!/usr/bin/env python3
# Spins up one validating node.
# Spins a non-validating node that tracks some shards and the set of tracked shards changes regularly.
# The node gets stopped, and gets restarted close to an epoch boundary but in a way to trigger state sync.
#
# After the state sync the node has to do a catchup.
#
# Note that the test must generate outgoing receipts for most shards almost
# every block in order to crash if creation of partial encoded chunks becomes
# non-deterministic.

import pathlib
import random
import sys
import tempfile

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from cluster import init_cluster, spin_up_node, load_config, apply_config_changes
import account
import transaction
import utils

from configured_logger import logger

EPOCH_LENGTH = 50


def epoch_height(block_height):
if block_height == 0:
return 0
if block_height <= EPOCH_LENGTH:
# According to the protocol specifications, there are two epochs with height 1.
return "1*"
return int((block_height - 1) / EPOCH_LENGTH)


def random_u64():
return bytes(random.randint(0, 255) for _ in range(8))


# Generates traffic for all possible shards.
# Assumes that `test0`, `test1`, `near` all belong to different shards.
def random_workload_until(target, nonce, keys, node0, node1, target_node):
last_height = -1
while True:
nonce += 1

last_block = target_node.get_latest_block()
height = last_block.height
if height > target:
break
if height != last_height:
logger.info(f'@{height}, epoch_height: {epoch_height(height)}')
last_height = height

last_block_hash = node0.get_latest_block().hash_bytes
if random.random() < 0.5:
# Make a transfer between shards.
# The goal is to generate cross-shard receipts.
key_from = random.choice([node0, node1]).signer_key
account_to = random.choice([
node0.signer_key.account_id, node1.signer_key.account_id, "near"
])
payment_tx = transaction.sign_payment_tx(key_from, account_to, 1,
nonce, last_block_hash)
node0.send_tx(payment_tx).get('result')
elif (len(keys) > 100 and random.random() < 0.5) or len(keys) > 1000:
# Do some flat storage reads, but only if we have enough keys populated.
key = keys[random.randint(0, len(keys) - 1)]
for node in [node0, node1]:
call_function('read', key, nonce, node.signer_key,
last_block_hash, node0)
call_function('read', key, nonce, node.signer_key,
last_block_hash, node0)
else:
# Generate some data for flat storage reads
key = random_u64()
keys.append(key)
for node in [node0, node1]:
call_function('write', key, nonce, node.signer_key,
last_block_hash, node0)
return nonce, keys


def call_function(op, key, nonce, signer_key, last_block_hash, node):
if op == 'read':
args = key
fn = 'read_value'
else:
args = key + random_u64()
fn = 'write_key_value'

tx = transaction.sign_function_call_tx(signer_key, signer_key.account_id,
fn, args, 300 * account.TGAS, 0,
nonce, last_block_hash)
return node.send_tx(tx).get('result')


def main():
state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / 'state_parts')

config0 = {
'gc_num_epochs_to_keep': 100,
'log_summary_period': {
'secs': 0,
'nanos': 500000000
},
'log_summary_style': 'plain',
'state_sync': {
'dump': {
'location': {
'Filesystem': {
'root_dir': state_parts_dir
}
},
'iteration_delay': {
'secs': 0,
'nanos': 100000000
},
}
},
'store.state_snapshot_enabled': True,
'tracked_shards': [0],
}
config1 = {
'gc_num_epochs_to_keep': 100,
'log_summary_period': {
'secs': 0,
'nanos': 500000000
},
'log_summary_style': 'plain',
'state_sync': {
'sync': {
'ExternalStorage': {
'location': {
'Filesystem': {
'root_dir': state_parts_dir
}
}
}
}
},
'state_sync_enabled': True,
'consensus.state_sync_timeout': {
'secs': 0,
'nanos': 500000000
},
'tracked_shard_schedule': [
[0],
[0],
[1],
[2],
[3],
[1],
[2],
[3],
],
'tracked_shards': [],
}

config = load_config()
near_root, node_dirs = init_cluster(1, 1, 4, config,
[["epoch_length", EPOCH_LENGTH]], {
0: config0,
1: config1
})

boot_node = spin_up_node(config, near_root, node_dirs[0], 0)
logger.info('started boot_node')
node1 = spin_up_node(config,
near_root,
node_dirs[1],
1,
boot_node=boot_node)
logger.info('started node1')

contract = utils.load_test_contract()

latest_block_hash = boot_node.get_latest_block().hash_bytes
deploy_contract_tx = transaction.sign_deploy_contract_tx(
boot_node.signer_key, contract, 10, latest_block_hash)
result = boot_node.send_tx_and_wait(deploy_contract_tx, 10)
assert 'result' in result and 'error' not in result, (
'Expected "result" and no "error" in response, got: {}'.format(result))

latest_block_hash = boot_node.get_latest_block().hash_bytes
deploy_contract_tx = transaction.sign_deploy_contract_tx(
node1.signer_key, contract, 10, latest_block_hash)
result = boot_node.send_tx_and_wait(deploy_contract_tx, 10)
assert 'result' in result and 'error' not in result, (
'Expected "result" and no "error" in response, got: {}'.format(result))

nonce, keys = random_workload_until(EPOCH_LENGTH - 5, 1, [], boot_node,
node1, boot_node)

node1_height = node1.get_latest_block().height
logger.info(f'node1@{node1_height}')
node1.kill()
logger.info(f'killed node1')

# Run node0 more to trigger block sync in node1.
nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3), nonce, keys,
boot_node, node1, boot_node)

# Node1 is now behind and needs to do header sync and block sync.
node1.start(boot_node=boot_node)
node1_height = node1.get_latest_block().height
logger.info(f'started node1@{node1_height}')

nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3.7), nonce, keys,
boot_node, node1, node1)


if __name__ == "__main__":
main()
