(fix): Making stress.py with node_restart mode pass, and fixing #2916 (#3036)

After this change, stress.py in node_restart mode passes relatively consistently, and it is reintroduced to the nightly run.

Nearcore fixes:

- We had a bug in the syncing logic (with a low chance of being
triggered in the wild): if a block is produced and between 1/3 and 2/3
of the block producers receive it while the rest do not, the system
stalls: no 2/3 of the block producers share the same head, but nobody
is two blocks behind the highest peer either, so nobody starts syncing.
Fixed by forcing sync if a node has been one block behind for too long
(a simplified sketch of this rule follows this list). stress.py
reproduced this issue in every run.

- (#2916) We had an issue where a node that produced a chunk and then
crashed could not serve that chunk after recovery, because it did not
have all the parts and receipts stored in the storage from which the
shards manager cache entries are recovered. Fixed by always storing all
the parts and receipts (redundantly) for chunks in the shards we care
about.
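
A minimal, self-contained sketch of the forced-sync rule from the first fix. The names (`SyncState`, `should_force_sync`) are made up for illustration; the actual change lives in `chain/client/src/client_actor.rs` (see the diff below) and uses `SyncStatus::NoSyncSeveralBlocksBehind` together with chrono timestamps rather than `std::time`.

```rust
use std::time::{Duration, Instant};

/// Mirrors SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER from the diff below.
const SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER: u32 = 5;

/// Hypothetical stand-in for the relevant SyncStatus variants.
enum SyncState {
    /// Our head matches what peers advertise.
    InSync,
    /// Peers advertise a higher head; remember when we first noticed and at what height.
    BlocksBehind { since_when: Instant, our_height: u64 },
}

/// Force block sync if we have been stuck at the same height for longer than
/// several block-production intervals while peers keep advertising newer blocks,
/// even though we are less than the usual sync threshold behind.
fn should_force_sync(
    state: &SyncState,
    head_height: u64,
    max_block_production_delay: Duration,
) -> bool {
    match state {
        SyncState::InSync => false,
        SyncState::BlocksBehind { since_when, our_height } => {
            *our_height == head_height
                && since_when.elapsed()
                    >= max_block_production_delay * SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER
        }
    }
}

fn main() {
    // Just noticed we are behind: do not force sync yet, the block may still arrive.
    let state = SyncState::BlocksBehind { since_when: Instant::now(), our_height: 10 };
    assert!(!should_force_sync(&state, 10, Duration::from_secs(2)));
}
```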

Test fixes:

[v] Fixing a scenario in which a failure to send a transaction to all
validators resulted in recording an incorrect tx hash alongside the tx.
Later, when checking balances, looking up the incorrect hash returned an
incorrect success value, and thus incorrect corrections were applied to
the expected balances;

[v] Changing the order of magnitude of staking transactions, so that the
validator set actually changes.

Other issues discovered while fixing stress.py:
- #2906
SkidanovAlex authored and bowenwang1996 committed Aug 14, 2020
1 parent 2da194e commit 062dce7
Showing 10 changed files with 351 additions and 86 deletions.
16 changes: 14 additions & 2 deletions chain/chunks/src/lib.rs
@@ -1226,14 +1226,24 @@ impl ShardsManager {
         chunk_entry: &EncodedChunksCacheEntry,
         store_update: &mut ChainStoreUpdate<'_>,
     ) {
+        let cares_about_shard = self.cares_about_shard_this_or_next_epoch(
+            self.me.as_ref(),
+            &chunk_entry.header.inner.prev_block_hash,
+            chunk_entry.header.inner.shard_id,
+            true,
+        );
         let prev_block_hash = chunk_entry.header.inner.prev_block_hash;
         let partial_chunk = PartialEncodedChunk {
             header: chunk_entry.header.clone(),
             parts: chunk_entry
                 .parts
                 .iter()
                 .filter_map(|(part_ord, part_entry)| {
-                    if let Ok(need_part) = self.need_part(&prev_block_hash, *part_ord) {
+                    if cares_about_shard
+                        || self.need_part(&prev_block_hash, *part_ord).unwrap_or(false)
+                    {
+                        Some(part_entry.clone())
+                    } else if let Ok(need_part) = self.need_part(&prev_block_hash, *part_ord) {
                         if need_part {
                             Some(part_entry.clone())
                         } else {
@@ -1248,7 +1258,9 @@ impl ShardsManager {
                 .receipts
                 .iter()
                 .filter_map(|(shard_id, receipt)| {
-                    if self.need_receipt(&prev_block_hash, *shard_id) {
+                    if cares_about_shard || self.need_receipt(&prev_block_hash, *shard_id) {
+                        Some(receipt.clone())
+                    } else if self.need_receipt(&prev_block_hash, *shard_id) {
                         Some(receipt.clone())
                     } else {
                         None
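
To make the intent of the diff above easier to scan, here is a simplified sketch of the selection rule it implements, reduced to plain filters over illustrative types; `PartEntry`, `Receipt`, and the free functions below are not the actual `ShardsManager` API.

```rust
struct PartEntry { ord: u64 }
struct Receipt { to_shard: u64 }

/// Keep every part if we track the chunk's shard; otherwise only the parts we need.
fn parts_to_store(
    cares_about_shard: bool,
    parts: &[PartEntry],
    need_part: impl Fn(u64) -> bool,
) -> Vec<u64> {
    parts
        .iter()
        .filter(|p| cares_about_shard || need_part(p.ord))
        .map(|p| p.ord)
        .collect()
}

/// Same rule for receipts, keyed by destination shard.
fn receipts_to_store(
    cares_about_shard: bool,
    receipts: &[Receipt],
    need_receipt: impl Fn(u64) -> bool,
) -> Vec<u64> {
    receipts
        .iter()
        .filter(|r| cares_about_shard || need_receipt(r.to_shard))
        .map(|r| r.to_shard)
        .collect()
}

fn main() {
    let parts = vec![PartEntry { ord: 0 }, PartEntry { ord: 1 }];
    // A chunk producer for this shard stores everything, so it can still serve
    // PartialEncodedChunkRequest after a restart (the #2916 scenario).
    assert_eq!(parts_to_store(true, &parts, |_| false), vec![0u64, 1]);
    // A node that does not track the shard keeps only what it needs.
    assert_eq!(parts_to_store(false, &parts, |ord| ord == 1), vec![1u64]);

    let receipts = vec![Receipt { to_shard: 0 }, Receipt { to_shard: 1 }];
    assert_eq!(receipts_to_store(false, &receipts, |shard| shard == 0), vec![0u64]);
}
```

Storing redundantly costs some extra disk space, but it means a restarted chunk producer can always reconstruct and serve the chunks of the shards it tracks.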
43 changes: 41 additions & 2 deletions chain/client/src/client_actor.rs
@@ -55,6 +55,9 @@ use crate::StatusResponse;
 const STATUS_WAIT_TIME_MULTIPLIER: u64 = 10;
 /// Drop blocks whose height are beyond head + horizon.
 const BLOCK_HORIZON: u64 = 500;
+/// How many intervals of max_block_production_delay to wait while several blocks behind before
+/// kicking off syncing
+const SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER: u32 = 5;
 
 pub struct ClientActor {
     /// Adversarial controls
@@ -981,7 +984,7 @@ impl ClientActor {
         let mut is_syncing = self.client.sync_status.is_syncing();
 
         let full_peer_info = if let Some(full_peer_info) =
-            highest_height_peer(&self.network_info.highest_height_peers)
+            highest_height_peer(&self.network_info.highest_height_peers, head.height)
         {
             full_peer_info
         } else {
@@ -1011,6 +1014,21 @@ impl ClientActor {
                    full_peer_info.chain_info.height,
                );
                is_syncing = true;
+            } else {
+                if let SyncStatus::NoSyncSeveralBlocksBehind { since_when, our_height } =
+                    self.client.sync_status
+                {
+                    let now = Utc::now();
+                    if now > since_when
+                        && (now - since_when).to_std().unwrap()
+                            >= self.client.config.max_block_production_delay
+                                * SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER
+                        && our_height == head.height
+                    {
+                        info!(target: "sync", "Have been at the same height for too long, while peers have newer blocks. Forcing synchronization");
+                        is_syncing = true;
+                    }
+                }
            }
        }
        Ok((is_syncing, full_peer_info.chain_info.height))
@@ -1134,6 +1152,27 @@ impl ClientActor {
                 self.check_send_announce_account(head.prev_block_hash);
             }
             wait_period = self.client.config.sync_check_period;
+
+            // Handle the case in which we failed to receive a block, and our inactivity prevents
+            // the network from making progress
+            if let Ok(head) = self.client.chain.head() {
+                match self.client.sync_status {
+                    SyncStatus::NoSync => {
+                        if head.height < highest_height {
+                            self.client.sync_status = SyncStatus::NoSyncSeveralBlocksBehind {
+                                since_when: Utc::now(),
+                                our_height: head.height,
+                            }
+                        }
+                    }
+                    SyncStatus::NoSyncSeveralBlocksBehind { our_height, .. } => {
+                        if head.height > our_height {
+                            self.client.sync_status = SyncStatus::NoSync;
+                        }
+                    }
+                    _ => {}
+                }
+            }
         } else {
             // Run each step of syncing separately.
             unwrap_or_run_later!(self.client.header_sync.run(
@@ -1203,7 +1242,7 @@ impl ClientActor {
             self.client.sync_status = SyncStatus::StateSync(sync_hash, new_shard_sync);
             if fetch_block {
                 if let Some(peer_info) =
-                    highest_height_peer(&self.network_info.highest_height_peers)
+                    highest_height_peer(&self.network_info.highest_height_peers, 0)
                 {
                     if let Ok(header) = self.client.chain.get_block_header(&sync_hash) {
                         for hash in
4 changes: 3 additions & 1 deletion chain/client/src/info.rs
@@ -168,7 +168,9 @@ fn display_sync_status(
 ) -> String {
     match sync_status {
         SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height),
-        SyncStatus::NoSync => format!("#{:>8} {:>44}", head.height, head.last_block_hash),
+        SyncStatus::NoSync | SyncStatus::NoSyncSeveralBlocksBehind { .. } => {
+            format!("#{:>8} {:>44}", head.height, head.last_block_hash)
+        }
         SyncStatus::HeaderSync { current_height, highest_height } => {
             let percent = if *highest_height <= genesis_height {
                 0
30 changes: 22 additions & 8 deletions chain/client/src/sync.rs
@@ -9,7 +9,7 @@ use ansi_term::Color::{Purple, Yellow};
 use chrono::{DateTime, Duration, Utc};
 use futures::{future, FutureExt};
 use log::{debug, error, info, warn};
-use rand::seq::SliceRandom;
+use rand::seq::{IteratorRandom, SliceRandom};
 use rand::{thread_rng, Rng};
 
 use near_chain::types::BlockSyncResponse;
@@ -52,12 +52,22 @@ pub const MAX_PENDING_PART: u64 = MAX_STATE_PART_REQUEST * 10000;
 pub const NS_PER_SECOND: u128 = 1_000_000_000;
 
 /// Get random peer from the highest height peers.
-pub fn highest_height_peer(highest_height_peers: &Vec<FullPeerInfo>) -> Option<FullPeerInfo> {
+pub fn highest_height_peer(
+    highest_height_peers: &Vec<FullPeerInfo>,
+    min_height: BlockHeight,
+) -> Option<FullPeerInfo> {
     if highest_height_peers.len() == 0 {
         return None;
     }
-    let index = thread_rng().gen_range(0, highest_height_peers.len());
-    Some(highest_height_peers[index].clone())
+
+    match highest_height_peers
+        .iter()
+        .filter(|peer| peer.chain_info.height > min_height)
+        .choose(&mut thread_rng())
+    {
+        None => highest_height_peers.choose(&mut thread_rng()).cloned(),
+        Some(peer) => Some(peer.clone()),
+    }
 }
 
 /// Helper to keep track of sync headers.
@@ -112,7 +122,9 @@ impl HeaderSync {
             SyncStatus::HeaderSync { .. }
             | SyncStatus::BodySync { .. }
             | SyncStatus::StateSyncDone => true,
-            SyncStatus::NoSync | SyncStatus::AwaitingPeers => {
+            SyncStatus::NoSync
+            | SyncStatus::NoSyncSeveralBlocksBehind { .. }
+            | SyncStatus::AwaitingPeers => {
                 let sync_head = chain.sync_head()?;
                 debug!(target: "sync", "Sync: initial transition to Header sync. Sync head: {} at {}, resetting to {} at {}",
                     sync_head.last_block_hash, sync_head.height,
@@ -131,7 +143,7 @@ impl HeaderSync {
             SyncStatus::HeaderSync { current_height: header_head.height, highest_height };
         let header_head = chain.header_head()?;
         self.syncing_peer = None;
-        if let Some(peer) = highest_height_peer(&highest_height_peers) {
+        if let Some(peer) = highest_height_peer(&highest_height_peers, 0) {
             if peer.chain_info.height > header_head.height {
                 self.syncing_peer = self.request_headers(chain, peer);
             }
@@ -169,9 +181,11 @@ impl HeaderSync {
         // Did we receive as many headers as we expected from the peer? Request more or ban peer.
         let stalling = header_head.height <= old_expected_height && now > timeout;
 
-        // Always enable header sync on initial state transition from NoSync / AwaitingPeers.
+        // Always enable header sync on initial state transition from NoSync / NoSyncSeveralBlocksBehind / AwaitingPeers.
         let force_sync = match sync_status {
-            SyncStatus::NoSync | SyncStatus::AwaitingPeers => true,
+            SyncStatus::NoSync
+            | SyncStatus::NoSyncSeveralBlocksBehind { .. }
+            | SyncStatus::AwaitingPeers => true,
             _ => false,
         };
 
4 changes: 3 additions & 1 deletion chain/client/src/types.rs
@@ -121,6 +121,8 @@ pub enum SyncStatus {
     AwaitingPeers,
     /// Not syncing / Done syncing.
     NoSync,
+    /// Not syncing, but have a peer that is one block ahead.
+    NoSyncSeveralBlocksBehind { since_when: DateTime<Utc>, our_height: BlockHeight },
     /// Downloading block headers for fast sync.
     HeaderSync { current_height: BlockHeight, highest_height: BlockHeight },
     /// State sync, with different states of state sync for different shards.
@@ -140,7 +142,7 @@ impl SyncStatus {
     /// True if currently engaged in syncing the chain.
     pub fn is_syncing(&self) -> bool {
         match self {
-            SyncStatus::NoSync => false,
+            SyncStatus::NoSync | SyncStatus::NoSyncSeveralBlocksBehind { .. } => false,
             _ => true,
         }
     }
5 changes: 4 additions & 1 deletion nightly/nightly.txt
@@ -37,6 +37,9 @@ pytest --timeout=300 sanity/gc_after_sync1.py
 pytest --timeout=300 sanity/gc_sync_after_sync.py
 pytest --timeout=300 sanity/gc_sync_after_sync.py swap_nodes
 pytest --timeout=300 sanity/large_messages.py
+pytest sanity/controlled_edge_nonce.py
+pytest sanity/repro_2916.py
+pytest --timeout=240 sanity/switch_node_key.py
 # TODO: re-enable after #2949 is fixed
 # pytest --timeout=240 sanity/validator_switch_key.py
 pytest sanity/proxy_simple.py
@@ -52,7 +55,7 @@ pytest --timeout=240 contracts/gibberish.py
 
 # python stress tests
 # pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions local_network
-# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart
+pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart
 # pytest --timeout=2000 stress/stress.py 3 2 4 0 staking transactions node_set
 
 # pytest stress/network_stress.py
12 changes: 11 additions & 1 deletion pytest/lib/cluster.py
@@ -15,6 +15,7 @@
 import retrying
 import rc
 from rc import gcloud
+import traceback
 import uuid
 import network
 from proxy import NodesProxy
@@ -37,6 +38,7 @@ def atexit_cleanup(node):
         node.cleanup()
     except:
         print("Cleaning failed!")
+        traceback.print_exc()
         pass
 
 
@@ -276,6 +278,7 @@ def __init__(self,
         config_json['network']['addr'] = '0.0.0.0:%s' % port
         config_json['network']['blacklist'] = blacklist
         config_json['rpc']['addr'] = '0.0.0.0:%s' % rpc_port
+        config_json['rpc']['metrics_addr'] = '0.0.0.0:%s' % (rpc_port + 1000)
         config_json['consensus']['min_num_peers'] = 1
         with open(os.path.join(node_dir, "config.json"), 'w') as f:
             f.write(json.dumps(config_json, indent=2))
@@ -349,7 +352,14 @@ def reset_validator_key(self, new_key):
     def cleanup(self):
         if self.cleaned:
             return
-        self.kill()
+
+        try:
+            self.kill()
+        except:
+            print("Kill failed on cleanup!")
+            traceback.print_exc()
+            print("\n\n")
+
         # move the node dir to avoid weird interactions with multiple serial test invocations
         target_path = self.node_dir + '_finished'
         if os.path.exists(target_path) and os.path.isdir(target_path):
2 changes: 2 additions & 0 deletions pytest/lib/serializer.py
@@ -113,6 +113,8 @@ def serialize_struct(self, obj):
                     self.serialize_num(idx, 1)
                     self.serialize_field(getattr(obj, fieldName), fieldType)
                     break
+            else:
+                assert False, name
         else:
             assert False, structSchema
 
108 changes: 108 additions & 0 deletions pytest/tests/sanity/repro_2916.py
@@ -0,0 +1,108 @@
# Spins up two nodes with two shards, waits for a couple of blocks, snapshots the
# latest chunks, and requests both chunks from the first node, asking for
# receipts for both shards in both requests. We expect the first node to
# respond to exactly one of the requests, for the shard it tracks (for the
# shard it doesn't track it will only have the receipts to the shard it does
# track).
#
# We then kill both nodes, and restart the first node, and do the same
# requests. We expect it to respond the same way. Before #2916 is fixed, it
# fails to respond to the request it was previously responding to due to
# incorrect reconstruction of the receipts.

import asyncio, sys, time
import socket, base58
import nacl.signing, hashlib

sys.path.append('lib')

from cluster import start_cluster
from peer import *
from utils import obj_to_string

from messages.tx import *
from messages.block import *
from messages.crypto import *
from messages.network import *


async def main():
    # start a cluster with two shards
    nodes = start_cluster(2, 0, 2, None, [], {})

    started = time.time()

    while True:
        if time.time() - started > 10:
            assert False, "Giving up waiting for two blocks"

        status = nodes[0].get_status()
        hash_ = status['sync_info']['latest_block_hash']
        height = status['sync_info']['latest_block_height']

        if height > 2:
            block = nodes[0].get_block(hash_)
            chunk_hashes = [base58.b58decode(x['chunk_hash']) for x in block['result']['chunks']]

            assert len(chunk_hashes) == 2
            assert all([len(x) == 32 for x in chunk_hashes])

            break

    my_key_pair_nacl = nacl.signing.SigningKey.generate()
    received_responses = [None, None]

    # step = 0: before the node is killed
    # step = 1: after the node is killed
    for step in range(2):

        conn0 = await connect(nodes[0].addr())
        await run_handshake(conn0, nodes[0].node_key.pk, my_key_pair_nacl)
        for shard_ord, chunk_hash in enumerate(chunk_hashes):

            request = PartialEncodedChunkRequestMsg()
            request.chunk_hash = chunk_hash
            request.part_ords = []
            request.tracking_shards = [0, 1]

            routed_msg_body = RoutedMessageBody()
            routed_msg_body.enum = 'PartialEncodedChunkRequest'
            routed_msg_body.PartialEncodedChunkRequest = request

            peer_message = create_and_sign_routed_peer_message(routed_msg_body, nodes[0], my_key_pair_nacl)

            await conn0.send(peer_message)

            received_response = False

            def predicate(response):
                return response.enum == 'Routed' and response.Routed.body.enum == 'PartialEncodedChunkResponse'

            try:
                response = await asyncio.wait_for(conn0.recv(predicate), 5)
            except concurrent.futures._base.TimeoutError:
                response = None

            if response is not None:
                print("Received response for shard %s" % shard_ord)
                received_response = True
            else:
                print("Didn't receive response for shard %s" % shard_ord)

            if step == 0:
                received_responses[shard_ord] = received_response
            else:
                assert received_responses[shard_ord] == received_response, "The response doesn't match for the chunk in shard %s. Received response before node killed: %s, after: %s" % (shard_ord, received_responses[shard_ord], received_response)

        # we expect first node to only respond to one of the chunk requests, for the shard assigned to it
        assert received_responses[0] != received_responses[1], received_responses

        if step == 0:
            print("Killing and restarting nodes")
            nodes[1].kill()
            nodes[0].kill()
            nodes[0].start(None, None)
            time.sleep(1)


asyncio.run(main())
