fix(state-sync): Make partial chunks message deterministic #9427

Merged (11 commits, Aug 17, 2023)
12 changes: 12 additions & 0 deletions chain/chunks/src/lib.rs
@@ -792,9 +792,21 @@ impl ShardsManager {
/// Finds the parts and receipt proofs asked for in the request, and returns a response
/// containing whatever was found. See comment for PartialEncodedChunkResponseSource for
/// an explanation of that part of the return value.
/// Ensures the receipts in the response are in a deterministic order.
fn prepare_partial_encoded_chunk_response(
&mut self,
request: PartialEncodedChunkRequestMsg,
) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) {
let (src, mut response_msg) = self.prepare_partial_encoded_chunk_response_unsorted(request);
// Note that the PartialChunks column is a write-once column, and needs
// the values to be deterministic.
response_msg.receipts.sort();
(src, response_msg)
}

fn prepare_partial_encoded_chunk_response_unsorted(
&mut self,
request: PartialEncodedChunkRequestMsg,
) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) {
let PartialEncodedChunkRequestMsg { chunk_hash, part_ords, mut tracking_shards } = request;
let mut response = PartialEncodedChunkResponseMsg {
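For readers skimming the diff, here is a minimal, self-contained sketch of the pattern the new wrapper follows: produce the response as before, then sort the receipts so that every node stores identical bytes in the write-once PartialChunks column. The types, field names, and values below are stand-ins for illustration only, not the real nearcore messages.

```rust
// Stand-in for PartialEncodedChunkResponseMsg; fields are simplified for illustration.
#[derive(Debug, PartialEq)]
struct ResponseMsg {
    parts: Vec<u64>,
    // Stand-in for receipt proofs, keyed by (from_shard_id, to_shard_id).
    receipts: Vec<(u64, u64)>,
}

// The "unsorted" producer may yield receipts in any order, e.g. whatever
// iteration order an in-memory cache happens to have.
fn prepare_response_unsorted() -> ResponseMsg {
    ResponseMsg { parts: vec![0, 1], receipts: vec![(2, 0), (0, 1), (1, 3)] }
}

// Deterministic wrapper, mirroring the shape of
// prepare_partial_encoded_chunk_response in the diff above: a write-once
// column requires every node to store the same bytes for the same key, so
// the receipt order must not depend on iteration order.
fn prepare_response() -> ResponseMsg {
    let mut msg = prepare_response_unsorted();
    msg.receipts.sort();
    msg
}

fn main() {
    assert_eq!(prepare_response().receipts, vec![(0, 1), (1, 3), (2, 0)]);
}
```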
4 changes: 3 additions & 1 deletion chain/chunks/src/logic.rs
@@ -118,13 +118,15 @@ pub fn make_partial_encoded_chunk_from_owned_parts_and_needed_receipts<'a>(
})
.cloned()
.collect();
let receipts = receipts
let mut receipts: Vec<_> = receipts
.filter(|receipt| {
cares_about_shard
|| need_receipt(prev_block_hash, receipt.1.to_shard_id, me, shard_tracker)
})
.cloned()
.collect();
// Make sure the receipts are in a deterministic order.
receipts.sort();
match header.clone() {
ShardChunkHeader::V1(header) => {
PartialEncodedChunk::V1(PartialEncodedChunkV1 { header, parts, receipts })
14 changes: 14 additions & 0 deletions core/primitives/src/sharding.rs
@@ -10,6 +10,7 @@ use borsh::{BorshDeserialize, BorshSerialize};
use near_crypto::Signature;
use reed_solomon_erasure::galois_8::{Field, ReedSolomon};
use reed_solomon_erasure::ReconstructShard;
use std::cmp::Ordering;
use std::sync::Arc;

#[derive(
@@ -601,6 +602,19 @@ pub struct ShardProof {
/// For each Merkle proof there is a subset of receipts which may be proven.
pub struct ReceiptProof(pub Vec<Receipt>, pub ShardProof);

impl PartialOrd<Self> for ReceiptProof {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ReceiptProof {
fn cmp(&self, other: &Self) -> Ordering {
(self.1.from_shard_id, self.1.to_shard_id)
.cmp(&(other.1.from_shard_id, other.1.to_shard_id))
Comment on lines +615 to +616

Contributor:

This doesn't seem sufficiently ordered: there can be multiple receipts with the same from and to shard ids and thus still be non-deterministic. How about you also compare the hashes?

Contributor Author:

The receipts are grouped by shard ids of outgoing and incoming:

pub struct ReceiptProof(pub Vec<Receipt>, pub ShardProof);

Contributor Author:

@wacban or is your suggestion to order the receipts inside ReceiptProof? 🤔

Contributor:

I see now what's going on. You're claiming that there is at most one ReceiptProof for every pair (from, to), and as such the comparison is actually strict. As long as the assumption is correct I'm fine with this implementation.

I was not suggesting sorting the receipts, but that doesn't sound too bad just to be extra cautious. I lack context to say whether it is actually needed, so I'll leave it up to your best judgment.

}
}

#[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Eq, PartialEq)]
pub struct PartialEncodedChunkPart {
pub part_ord: u64,
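The review thread above asks whether comparing only the (from, to) shard pair is a strict order. The PR keeps the shard-pair comparison and relies on the invariant that there is at most one ReceiptProof per pair; the sketch below shows the stricter variant the reviewer floated, with a content tiebreaker. It uses simplified stand-in types (ShardProofLike, ReceiptProofLike, receipt_hashes are illustrative names and assumptions, not the real nearcore types).

```rust
use std::cmp::Ordering;

#[derive(Debug, Eq, PartialEq)]
struct ShardProofLike {
    from_shard_id: u64,
    to_shard_id: u64,
}

#[derive(Debug, Eq, PartialEq)]
struct ReceiptProofLike {
    // Stand-in for the receipts carried by the proof.
    receipt_hashes: Vec<[u8; 32]>,
    proof: ShardProofLike,
}

impl Ord for ReceiptProofLike {
    fn cmp(&self, other: &Self) -> Ordering {
        // Primary key: the (from, to) shard pair, as in the PR. If the
        // invariant "at most one ReceiptProof per (from, to) pair" holds,
        // this alone already orders the proofs within one response.
        // Tiebreaker: the receipt contents, as the reviewer suggested, to
        // stay deterministic even if that invariant is ever violated.
        (self.proof.from_shard_id, self.proof.to_shard_id, &self.receipt_hashes)
            .cmp(&(other.proof.from_shard_id, other.proof.to_shard_id, &other.receipt_hashes))
    }
}

impl PartialOrd for ReceiptProofLike {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut proofs = vec![
        ReceiptProofLike {
            receipt_hashes: vec![[2u8; 32]],
            proof: ShardProofLike { from_shard_id: 1, to_shard_id: 0 },
        },
        ReceiptProofLike {
            receipt_hashes: vec![[1u8; 32]],
            proof: ShardProofLike { from_shard_id: 0, to_shard_id: 2 },
        },
    ];
    // Sorting with a total order is what makes the serialized message
    // byte-for-byte identical across nodes.
    proofs.sort();
    assert_eq!(proofs[0].proof.from_shard_id, 0);
}
```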
10 changes: 6 additions & 4 deletions docs/misc/state_sync_from_external_storage.md
@@ -49,9 +49,11 @@ your `config.json` file:
}
}
},
"state_sync_timeout": {
"secs": 30,
"nanos": 0
"consensus": {
"state_sync_timeout": {
"secs": 30,
"nanos": 0
}
}
```

@@ -70,7 +72,7 @@ shards that can be downloaded in parallel during state sync.
across all shards that can be downloaded in parallel during catchup. Generally,
this number should not be higher than `num_concurrent_requests`. Keep it
reasonably low to allow the node to process chunks of other shards.
* `state_sync_timeout` determines the max duration of an attempt to download a
* `consensus.state_sync_timeout` determines the max duration of an attempt to download a
state part. Setting it too low may cause too many unsuccessful attempts.

### Amazon S3
2 changes: 2 additions & 0 deletions nightly/pytest-sanity.txt
@@ -65,6 +65,8 @@ pytest --timeout=120 sanity/block_sync_flat_storage.py
pytest --timeout=120 sanity/block_sync_flat_storage.py --features nightly
pytest --timeout=240 sanity/state_sync_epoch_boundary.py
pytest --timeout=240 sanity/state_sync_epoch_boundary.py --features nightly
pytest --timeout=240 sanity/state_sync_then_catchup.py
pytest --timeout=240 sanity/state_sync_then_catchup.py --features nightly
pytest --timeout=240 sanity/validator_switch.py
pytest --timeout=240 sanity/validator_switch.py --features nightly
pytest --timeout=240 sanity/rpc_state_changes.py
2 changes: 1 addition & 1 deletion pytest/lib/cluster.py
@@ -815,14 +815,14 @@ def apply_config_changes(node_dir, client_config_change):
# when None.
allowed_missing_configs = (
'archive',
'consensus.state_sync_timeout',
'log_summary_period',
'max_gas_burnt_view',
'rosetta_rpc',
'save_trie_changes',
'split_storage',
'state_sync',
'state_sync_enabled',
'state_sync_timeout',
'store.state_snapshot_enabled',
'tracked_shard_schedule',
)
2 changes: 1 addition & 1 deletion pytest/tests/sanity/state_sync_epoch_boundary.py
@@ -66,7 +66,7 @@
}
},
'state_sync_enabled': True,
'state_sync_timeout': {
'consensus.state_sync_timeout': {
'secs': 0,
'nanos': 500000000
},
200 changes: 200 additions & 0 deletions pytest/tests/sanity/state_sync_then_catchup.py
@@ -0,0 +1,200 @@
#!/usr/bin/env python3
Contributor:

nit: It's good practice to wrap the code in a unittest and a `__main__` like so:
https://github.com/near/nearcore/tree/master/pytest#creating-new-tests

Contributor Author:

Added main(), thanks!
Nayduck doesn't expect these tests to be unittests.

Contributor:

I think as long as you run unittest.run() from within main it should work fine. Did you try and did it not work? Maybe you also need to put the test logic in a method with a name beginning with test_. Not a biggie though, don't worry about it too much.

# Spins up one validating node.
# Spins up a non-validating node that tracks some shards; the set of tracked shards changes regularly.
# The node gets stopped, and gets restarted close to an epoch boundary, but in a way that triggers epoch sync.
#
# After the state sync the node has to do a catchup.
#
# Note that the test must generate outgoing receipts for most shards almost
# every block in order to crash if creation of partial encoded chunks becomes
# non-deterministic.

import pathlib
import random
import sys
import tempfile

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from cluster import init_cluster, spin_up_node, load_config, apply_config_changes
import account
import transaction
import utils

from configured_logger import logger

EPOCH_LENGTH = 50

state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / 'state_parts')

config0 = {
Contributor:

nit: refactor config generation to a helper function (most of the two configs are identical)

Contributor Author (@nikurt, Aug 15, 2023):

More than half of the fields are different or have different values:

  • state_sync.dump vs state_sync.sync
  • store.state_snapshot_enabled
  • tracked_shards
  • tracked_shard_schedule
  • state_sync_enabled
  • consensus.state_sync_timeout

'enable_multiline_logging': False,
'gc_num_epochs_to_keep': 100,
'log_summary_period': {
'secs': 0,
'nanos': 100000000
},
'log_summary_style': 'plain',
'state_sync': {
'dump': {
'location': {
'Filesystem': {
'root_dir': state_parts_dir
}
},
'iteration_delay': {
'secs': 0,
'nanos': 100000000
},
}
},
'store.state_snapshot_enabled': True,
'tracked_shards': [0],
}
config1 = {
'enable_multiline_logging': False,
'gc_num_epochs_to_keep': 100,
'log_summary_period': {
'secs': 0,
'nanos': 100000000
},
'log_summary_style': 'plain',
'state_sync': {
'sync': {
'ExternalStorage': {
'location': {
'Filesystem': {
'root_dir': state_parts_dir
}
}
}
}
},
'state_sync_enabled': True,
'consensus.state_sync_timeout': {
'secs': 0,
'nanos': 500000000
},
'tracked_shard_schedule': [[0], [0], [1], [2], [3], [1], [2], [3],],
'tracked_shards': [],
}
logger.info(f'state_parts_dir: {state_parts_dir}')
logger.info(f'config0: {config0}')
logger.info(f'config1: {config1}')

config = load_config()
near_root, node_dirs = init_cluster(1, 1, 4, config,
[["epoch_length", EPOCH_LENGTH]], {
0: config0,
1: config1
})

boot_node = spin_up_node(config, near_root, node_dirs[0], 0)
logger.info('started boot_node')
node1 = spin_up_node(config, near_root, node_dirs[1], 1, boot_node=boot_node)
logger.info('started node1')

contract = utils.load_test_contract()

latest_block_hash = boot_node.get_latest_block().hash_bytes
deploy_contract_tx = transaction.sign_deploy_contract_tx(
boot_node.signer_key, contract, 10, latest_block_hash)
result = boot_node.send_tx_and_wait(deploy_contract_tx, 10)
assert 'result' in result and 'error' not in result, (
'Expected "result" and no "error" in response, got: {}'.format(result))

latest_block_hash = boot_node.get_latest_block().hash_bytes
deploy_contract_tx = transaction.sign_deploy_contract_tx(
node1.signer_key, contract, 10, latest_block_hash)
result = boot_node.send_tx_and_wait(deploy_contract_tx, 10)
assert 'result' in result and 'error' not in result, (
'Expected "result" and no "error" in response, got: {}'.format(result))


def epoch_height(block_height):
if block_height == 0:
return 0
if block_height <= EPOCH_LENGTH:
# According to the protocol specifications, there are two epochs with height 1.
Contributor:

TIL & WTF

return "1*"
return int((block_height - 1) / EPOCH_LENGTH)


# Generates traffic for all possible shards.
# Assumes that `test0`, `test1`, `near` all belong to different shards.
def random_workload_until(target, nonce, keys, target_node):
last_height = -1
while True:
nonce += 1

last_block = target_node.get_latest_block()
height = last_block.height
if height > target:
break
if height != last_height:
logger.info(f'@{height}, epoch_height: {epoch_height(height)}')
last_height = height

last_block_hash = boot_node.get_latest_block().hash_bytes
if (len(keys) > 100 and random.random() < 0.2) or len(keys) > 1000:
key = keys[random.randint(0, len(keys) - 1)]
call_function('read', key, nonce, boot_node.signer_key,
last_block_hash)
call_function('read', key, nonce, node1.signer_key, last_block_hash)
elif random.random() < 0.5:
if random.random() < 0.3:
key_from, account_to = boot_node.signer_key, node1.signer_key.account_id
elif random.random() < 0.3:
key_from, account_to = boot_node.signer_key, "near"
elif random.random() < 0.5:
key_from, account_to = node1.signer_key, boot_node.signer_key.account_id
else:
key_from, account_to = node1.signer_key, "near"
payment_tx = transaction.sign_payment_tx(key_from, account_to, 1,
nonce, last_block_hash)
boot_node.send_tx(payment_tx).get('result')
else:
key = random_u64()
keys.append(key)
call_function('write', key, nonce, boot_node.signer_key,
last_block_hash)
call_function('write', key, nonce, node1.signer_key,
last_block_hash)
Contributor:

Can you add a small comment on what exactly you are doing here?
Also nit: early returns may flatten this code a bit.

Contributor Author:

Restructured and simplified and commented.

return (nonce, keys)


def random_u64():
return bytes(random.randint(0, 255) for _ in range(8))


def call_function(op, key, nonce, signer_key, last_block_hash):
if op == 'read':
args = key
fn = 'read_value'
else:
args = key + random_u64()
fn = 'write_key_value'

tx = transaction.sign_function_call_tx(signer_key, signer_key.account_id,
fn, args, 300 * account.TGAS, 0,
nonce, last_block_hash)
return boot_node.send_tx(tx).get('result')


nonce, keys = random_workload_until(EPOCH_LENGTH - 5, 1, [], boot_node)

node1_height = node1.get_latest_block().height
logger.info(f'node1@{node1_height}')
node1.kill()
logger.info(f'killed node1')

# Run node0 more to trigger block sync in node1.
nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3), nonce, keys,
boot_node)

# Node1 is now behind and needs to do header sync and block sync.
node1.start(boot_node=boot_node)
node1_height = node1.get_latest_block().height
logger.info(f'started node1@{node1_height}')

nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3.7), nonce, keys, node1)
Contributor:

Are there any checks that you also need to do here or does the workload do it?

Contributor Author:

No checks needed. Without the fix the node simply crashes and doesn't reach the target height.
