From 062dce76ae9c0ba3fa05974b2a183a0fa7ac41a9 Mon Sep 17 00:00:00 2001 From: Alexander Skidanov Date: Thu, 30 Jul 2020 21:50:39 -0700 Subject: [PATCH] (fix): Making stress.py with node_restart mode pass, and fixing #2916 (#3036) After this change stress.py node_restart passes relatively consistently, and is reintroduced to nightly. Nearcore fixes: - We had a bug in the syncing logic (with a low chance of being triggered in the wild): if a block is produced, and between 1/3 and 2/3 of block producers received it, and the rest have not, the system stalls, because no 2/3 of block producers have the same head, but also nobody is two blocks behind the highest peer to start syncing. Fixing it by forcing sync if we've been 1 block behind for too long. stress.py was reproducing this issue in every run - (#2916) we had an issue that if a node produced a chunk, and then crashed, on recovery it was not able to serve it because it didn't have all the parts and receipts stored in the storage from which we recover cache entries in the shards manager. Fixing it by always storing all the parts and receipts (redundantly) for chunks in the shards we care about. Test fixes [v] Fixing a scenario in which a failure to send a transaction to all validators resulted in recording an incorrect tx hash alongside the tx. Later when checking balances using the incorrect hash resulted in getting incorrect success value, and thus applying incorrect corrections to the expected balances; [v] Changing the order of magnitude of staking transactions, so that the validator set actually changes. Other issues discovered while fixing stress.py: - https://github.com/nearprotocol/nearcore/issues/2906 --- chain/chunks/src/lib.rs | 16 ++- chain/client/src/client_actor.rs | 43 +++++- chain/client/src/info.rs | 4 +- chain/client/src/sync.rs | 30 +++-- chain/client/src/types.rs | 4 +- nightly/nightly.txt | 5 +- pytest/lib/cluster.py | 12 +- pytest/lib/serializer.py | 2 + pytest/tests/sanity/repro_2916.py | 108 +++++++++++++++ pytest/tests/stress/stress.py | 213 ++++++++++++++++++++---------- 10 files changed, 351 insertions(+), 86 deletions(-) create mode 100644 pytest/tests/sanity/repro_2916.py diff --git a/chain/chunks/src/lib.rs b/chain/chunks/src/lib.rs index 314d068917d..32fe98d4fca 100644 --- a/chain/chunks/src/lib.rs +++ b/chain/chunks/src/lib.rs @@ -1226,6 +1226,12 @@ impl ShardsManager { chunk_entry: &EncodedChunksCacheEntry, store_update: &mut ChainStoreUpdate<'_>, ) { + let cares_about_shard = self.cares_about_shard_this_or_next_epoch( + self.me.as_ref(), + &chunk_entry.header.inner.prev_block_hash, + chunk_entry.header.inner.shard_id, + true, + ); let prev_block_hash = chunk_entry.header.inner.prev_block_hash; let partial_chunk = PartialEncodedChunk { header: chunk_entry.header.clone(), @@ -1233,7 +1239,11 @@ impl ShardsManager { .parts .iter() .filter_map(|(part_ord, part_entry)| { - if let Ok(need_part) = self.need_part(&prev_block_hash, *part_ord) { + if cares_about_shard + || self.need_part(&prev_block_hash, *part_ord).unwrap_or(false) + { + Some(part_entry.clone()) + } else if let Ok(need_part) = self.need_part(&prev_block_hash, *part_ord) { if need_part { Some(part_entry.clone()) } else { @@ -1248,7 +1258,9 @@ impl ShardsManager { .receipts .iter() .filter_map(|(shard_id, receipt)| { - if self.need_receipt(&prev_block_hash, *shard_id) { + if cares_about_shard || self.need_receipt(&prev_block_hash, *shard_id) { + Some(receipt.clone()) + } else if self.need_receipt(&prev_block_hash, *shard_id) { Some(receipt.clone()) } else { None diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index 94ff9a38898..0427b98168a 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -55,6 +55,9 @@ use crate::StatusResponse; const STATUS_WAIT_TIME_MULTIPLIER: u64 = 10; /// Drop blocks whose height are beyond head + horizon. const BLOCK_HORIZON: u64 = 500; +/// How many intervals of max_block_production_delay to wait being several blocks behind before +/// kicking off syncing +const SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER: u32 = 5; pub struct ClientActor { /// Adversarial controls @@ -981,7 +984,7 @@ impl ClientActor { let mut is_syncing = self.client.sync_status.is_syncing(); let full_peer_info = if let Some(full_peer_info) = - highest_height_peer(&self.network_info.highest_height_peers) + highest_height_peer(&self.network_info.highest_height_peers, head.height) { full_peer_info } else { @@ -1011,6 +1014,21 @@ impl ClientActor { full_peer_info.chain_info.height, ); is_syncing = true; + } else { + if let SyncStatus::NoSyncSeveralBlocksBehind { since_when, our_height } = + self.client.sync_status + { + let now = Utc::now(); + if now > since_when + && (now - since_when).to_std().unwrap() + >= self.client.config.max_block_production_delay + * SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER + && our_height == head.height + { + info!(target: "sync", "Have been at the same height for too long, while peers have newer bocks. Forcing synchronization"); + is_syncing = true; + } + } } } Ok((is_syncing, full_peer_info.chain_info.height)) @@ -1134,6 +1152,27 @@ impl ClientActor { self.check_send_announce_account(head.prev_block_hash); } wait_period = self.client.config.sync_check_period; + + // Handle the case in which we failed to receive a block, and our inactivity prevents + // the network from making progress + if let Ok(head) = self.client.chain.head() { + match self.client.sync_status { + SyncStatus::NoSync => { + if head.height < highest_height { + self.client.sync_status = SyncStatus::NoSyncSeveralBlocksBehind { + since_when: Utc::now(), + our_height: head.height, + } + } + } + SyncStatus::NoSyncSeveralBlocksBehind { our_height, .. } => { + if head.height > our_height { + self.client.sync_status = SyncStatus::NoSync; + } + } + _ => {} + } + } } else { // Run each step of syncing separately. unwrap_or_run_later!(self.client.header_sync.run( @@ -1203,7 +1242,7 @@ impl ClientActor { self.client.sync_status = SyncStatus::StateSync(sync_hash, new_shard_sync); if fetch_block { if let Some(peer_info) = - highest_height_peer(&self.network_info.highest_height_peers) + highest_height_peer(&self.network_info.highest_height_peers, 0) { if let Ok(header) = self.client.chain.get_block_header(&sync_hash) { for hash in diff --git a/chain/client/src/info.rs b/chain/client/src/info.rs index 3ceee03b356..489cd8be2e4 100644 --- a/chain/client/src/info.rs +++ b/chain/client/src/info.rs @@ -168,7 +168,9 @@ fn display_sync_status( ) -> String { match sync_status { SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height), - SyncStatus::NoSync => format!("#{:>8} {:>44}", head.height, head.last_block_hash), + SyncStatus::NoSync | SyncStatus::NoSyncSeveralBlocksBehind { .. } => { + format!("#{:>8} {:>44}", head.height, head.last_block_hash) + } SyncStatus::HeaderSync { current_height, highest_height } => { let percent = if *highest_height <= genesis_height { 0 diff --git a/chain/client/src/sync.rs b/chain/client/src/sync.rs index f47c19e1de2..49d58979bd5 100644 --- a/chain/client/src/sync.rs +++ b/chain/client/src/sync.rs @@ -9,7 +9,7 @@ use ansi_term::Color::{Purple, Yellow}; use chrono::{DateTime, Duration, Utc}; use futures::{future, FutureExt}; use log::{debug, error, info, warn}; -use rand::seq::SliceRandom; +use rand::seq::{IteratorRandom, SliceRandom}; use rand::{thread_rng, Rng}; use near_chain::types::BlockSyncResponse; @@ -52,12 +52,22 @@ pub const MAX_PENDING_PART: u64 = MAX_STATE_PART_REQUEST * 10000; pub const NS_PER_SECOND: u128 = 1_000_000_000; /// Get random peer from the hightest height peers. -pub fn highest_height_peer(highest_height_peers: &Vec) -> Option { +pub fn highest_height_peer( + highest_height_peers: &Vec, + min_height: BlockHeight, +) -> Option { if highest_height_peers.len() == 0 { return None; } - let index = thread_rng().gen_range(0, highest_height_peers.len()); - Some(highest_height_peers[index].clone()) + + match highest_height_peers + .iter() + .filter(|peer| peer.chain_info.height > min_height) + .choose(&mut thread_rng()) + { + None => highest_height_peers.choose(&mut thread_rng()).cloned(), + Some(peer) => Some(peer.clone()), + } } /// Helper to keep track of sync headers. @@ -112,7 +122,9 @@ impl HeaderSync { SyncStatus::HeaderSync { .. } | SyncStatus::BodySync { .. } | SyncStatus::StateSyncDone => true, - SyncStatus::NoSync | SyncStatus::AwaitingPeers => { + SyncStatus::NoSync + | SyncStatus::NoSyncSeveralBlocksBehind { .. } + | SyncStatus::AwaitingPeers => { let sync_head = chain.sync_head()?; debug!(target: "sync", "Sync: initial transition to Header sync. Sync head: {} at {}, resetting to {} at {}", sync_head.last_block_hash, sync_head.height, @@ -131,7 +143,7 @@ impl HeaderSync { SyncStatus::HeaderSync { current_height: header_head.height, highest_height }; let header_head = chain.header_head()?; self.syncing_peer = None; - if let Some(peer) = highest_height_peer(&highest_height_peers) { + if let Some(peer) = highest_height_peer(&highest_height_peers, 0) { if peer.chain_info.height > header_head.height { self.syncing_peer = self.request_headers(chain, peer); } @@ -169,9 +181,11 @@ impl HeaderSync { // Did we receive as many headers as we expected from the peer? Request more or ban peer. let stalling = header_head.height <= old_expected_height && now > timeout; - // Always enable header sync on initial state transition from NoSync / AwaitingPeers. + // Always enable header sync on initial state transition from NoSync / NoSyncFewBlocksBehind / AwaitingPeers. let force_sync = match sync_status { - SyncStatus::NoSync | SyncStatus::AwaitingPeers => true, + SyncStatus::NoSync + | SyncStatus::NoSyncSeveralBlocksBehind { .. } + | SyncStatus::AwaitingPeers => true, _ => false, }; diff --git a/chain/client/src/types.rs b/chain/client/src/types.rs index 0e631963b6c..315e91b806e 100644 --- a/chain/client/src/types.rs +++ b/chain/client/src/types.rs @@ -121,6 +121,8 @@ pub enum SyncStatus { AwaitingPeers, /// Not syncing / Done syncing. NoSync, + /// Not syncing, but have a peer that is one block ahead. + NoSyncSeveralBlocksBehind { since_when: DateTime, our_height: BlockHeight }, /// Downloading block headers for fast sync. HeaderSync { current_height: BlockHeight, highest_height: BlockHeight }, /// State sync, with different states of state sync for different shards. @@ -140,7 +142,7 @@ impl SyncStatus { /// True if currently engaged in syncing the chain. pub fn is_syncing(&self) -> bool { match self { - SyncStatus::NoSync => false, + SyncStatus::NoSync | SyncStatus::NoSyncSeveralBlocksBehind { .. } => false, _ => true, } } diff --git a/nightly/nightly.txt b/nightly/nightly.txt index 2c7a8e6b752..99464fa151d 100644 --- a/nightly/nightly.txt +++ b/nightly/nightly.txt @@ -37,6 +37,9 @@ pytest --timeout=300 sanity/gc_after_sync1.py pytest --timeout=300 sanity/gc_sync_after_sync.py pytest --timeout=300 sanity/gc_sync_after_sync.py swap_nodes pytest --timeout=300 sanity/large_messages.py +pytest sanity/controlled_edge_nonce.py +pytest sanity/repro_2916.py +pytest --timeout=240 sanity/switch_node_key.py # TODO: re-enable after #2949 is fixed # pytest --timeout=240 sanity/validator_switch_key.py pytest sanity/proxy_simple.py @@ -52,7 +55,7 @@ pytest --timeout=240 contracts/gibberish.py # python stress tests # pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions local_network -# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart +pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart # pytest --timeout=2000 stress/stress.py 3 2 4 0 staking transactions node_set # pytest stress/network_stress.py diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py index fc6db3b808e..7a8e025e5cf 100644 --- a/pytest/lib/cluster.py +++ b/pytest/lib/cluster.py @@ -15,6 +15,7 @@ import retrying import rc from rc import gcloud +import traceback import uuid import network from proxy import NodesProxy @@ -37,6 +38,7 @@ def atexit_cleanup(node): node.cleanup() except: print("Cleaning failed!") + traceback.print_exc() pass @@ -276,6 +278,7 @@ def __init__(self, config_json['network']['addr'] = '0.0.0.0:%s' % port config_json['network']['blacklist'] = blacklist config_json['rpc']['addr'] = '0.0.0.0:%s' % rpc_port + config_json['rpc']['metrics_addr'] = '0.0.0.0:%s' % (rpc_port + 1000) config_json['consensus']['min_num_peers'] = 1 with open(os.path.join(node_dir, "config.json"), 'w') as f: f.write(json.dumps(config_json, indent=2)) @@ -349,7 +352,14 @@ def reset_validator_key(self, new_key): def cleanup(self): if self.cleaned: return - self.kill() + + try: + self.kill() + except: + print("Kill failed on cleanup!") + traceback.print_exc() + print("\n\n") + # move the node dir to avoid weird interactions with multiple serial test invocations target_path = self.node_dir + '_finished' if os.path.exists(target_path) and os.path.isdir(target_path): diff --git a/pytest/lib/serializer.py b/pytest/lib/serializer.py index a7a1b804a3f..95ecdc56f88 100644 --- a/pytest/lib/serializer.py +++ b/pytest/lib/serializer.py @@ -113,6 +113,8 @@ def serialize_struct(self, obj): self.serialize_num(idx, 1) self.serialize_field(getattr(obj, fieldName), fieldType) break + else: + assert False, name else: assert False, structSchema diff --git a/pytest/tests/sanity/repro_2916.py b/pytest/tests/sanity/repro_2916.py new file mode 100644 index 00000000000..cb1254cedf7 --- /dev/null +++ b/pytest/tests/sanity/repro_2916.py @@ -0,0 +1,108 @@ +# Spins up two nodes with two shards, waits for couple blocks, snapshots the +# latest chunks, and requests both chunks from the first node, asking for +# receipts for both shards in both requests. We expect the first node to +# respond to exactly one of the requests, for the shard it tracks (for the +# shard it doesn't track it will only have the receipts to the shard it does +# track). +# +# We then kill both nodes, and restart the first node, and do the same +# requests. We expect it to resond the same way. Before 2916 is fixed, it +# fails to respond to the request it was previously responding to due to +# incorrect reconstruction of the receipts. + +import asyncio, sys, time +import socket, base58 +import nacl.signing, hashlib + +sys.path.append('lib') + +from cluster import start_cluster +from peer import * +from utils import obj_to_string + +from messages.tx import * +from messages.block import * +from messages.crypto import * +from messages.network import * + +async def main(): + # start a cluster with two shards + nodes = start_cluster(2, 0, 2, None, [], {}) + + started = time.time() + + while True: + if time.time() - started > 10: + assert False, "Giving up waiting for two blocks" + + status = nodes[0].get_status() + hash_ = status['sync_info']['latest_block_hash'] + height = status['sync_info']['latest_block_height'] + + if height > 2: + block = nodes[0].get_block(hash_) + chunk_hashes = [base58.b58decode(x['chunk_hash']) for x in block['result']['chunks']] + + assert len(chunk_hashes) == 2 + assert all([len(x) == 32 for x in chunk_hashes]) + + break + + my_key_pair_nacl = nacl.signing.SigningKey.generate() + received_responses = [None, None] + +# step = 0: before the node is killed +# step = 1: after the node is killed + for step in range(2): + + conn0 = await connect(nodes[0].addr()) + await run_handshake(conn0, nodes[0].node_key.pk, my_key_pair_nacl) + for shard_ord, chunk_hash in enumerate(chunk_hashes): + + request = PartialEncodedChunkRequestMsg() + request.chunk_hash = chunk_hash + request.part_ords = [] + request.tracking_shards = [0, 1] + + routed_msg_body = RoutedMessageBody() + routed_msg_body.enum = 'PartialEncodedChunkRequest' + routed_msg_body.PartialEncodedChunkRequest = request + + peer_message = create_and_sign_routed_peer_message(routed_msg_body, nodes[0], my_key_pair_nacl) + + await conn0.send(peer_message) + + received_response = False + + def predicate(response): + return response.enum == 'Routed' and response.Routed.body.enum == 'PartialEncodedChunkResponse' + + try: + response = await asyncio.wait_for(conn0.recv(predicate), 5) + except concurrent.futures._base.TimeoutError: + response = None + + if response is not None: + print("Received response for shard %s" % shard_ord) + received_response = True + else: + print("Didn't receive response for shard %s" % shard_ord) + + if step == 0: + received_responses[shard_ord] = received_response + else: + assert received_responses[shard_ord] == received_response, "The response doesn't match for the chunk in shard %s. Received response before node killed: %s, after: %s" % (shard_ord, received_responses[shard_ord], received_response) + + # we expect first node to only respond to one of the chunk requests, for the shard assigned to it + assert received_responses[0] != received_responses[1], received_responses + + if step == 0: + print("Killing and restarting nodes") + nodes[1].kill() + nodes[0].kill() + nodes[0].start(None, None) + time.sleep(1) + + +asyncio.run(main()) + diff --git a/pytest/tests/stress/stress.py b/pytest/tests/stress/stress.py index 8af37b24d49..7108c427c2a 100644 --- a/pytest/tests/stress/stress.py +++ b/pytest/tests/stress/stress.py @@ -23,7 +23,7 @@ # `staking2.py` tests some basic stake invariants # This test also completely disables rewards, which simplifies ensuring total supply invariance and balance invariances -import sys, time, base58, random, inspect, traceback, requests +import sys, time, base58, random, inspect, traceback, requests, logging from multiprocessing import Process, Value, Lock sys.path.append('lib') @@ -34,12 +34,17 @@ from network import init_network_pillager, stop_network, resume_network sys.stdout = Unbuffered(sys.stdout) +logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO) TIMEOUT = 1500 # after how much time to shut down the test -TIMEOUT_SHUTDOWN = 120 # time to wait after the shutdown was initiated before -MAX_STAKE = int(1e26) +TIMEOUT_SHUTDOWN = 120 # time to wait after the shutdown was initiated before failing the test due to process stalling +MAX_STAKE = int(1e32) EPOCH_LENGTH = 20 +# How many times to try to send transactions to each validator. +# Is only applicable in the scenarios where we expect failures in tx sends. +SEND_TX_ATTEMPTS = 5 + block_timeout = 20 # if two blocks are not produced within that many seconds, the test will fail. The timeout is increased if nodes are restarted or network is being messed up with balances_timeout = 15 # how long to tolerate for balances to update after txs are sent tx_tolerance = 0.1 @@ -68,13 +73,20 @@ def wrapper(stopped, error, *args): return wrapper -def get_recent_hash(node): +def get_recent_hash(node, sync_timeout): # return the parent of the last block known to the observer # don't return the last block itself, since some validators might have not seen it yet # also returns the height of the actual last block (so the height doesn't match the hash!) status = node.get_status() hash_ = status['sync_info']['latest_block_hash'] info = node.json_rpc('block', [hash_]) + + for attempt in range(sync_timeout): + if 'error' in info and info['error']['data'] == 'IsSyncing': + time.sleep(1) + info = node.json_rpc('block', [hash_]) + + assert 'result' in info, info hash_ = info['result']['header']['hash'] return hash_, status['sync_info']['latest_block_height'] @@ -104,7 +116,7 @@ def get_future_time(): if time.time() < change_status_at[i]: continue if nodes_stopped[i]: - print("Node set: starting node %s" % i) + logging.info("Node set: starting node %s" % i) # figuring out a live node with `node_restart` monkey is not trivial # for simplicity just boot from the observer node # `node_restart` doesn't boot from the observer, increasing coverage @@ -115,8 +127,8 @@ def get_future_time(): wipe = False if random.choice([True, False]): wipe = True - #node.reset_data() - print("Node set: stopping%s node %s" % + #node.reset_data() # TODO + logging.info("Node set: stopping%s node %s" % (" and wiping" if wipe else "", i)) nodes_stopped[i] = not nodes_stopped[i] change_status_at[i] = get_future_time() @@ -135,19 +147,23 @@ def monkey_node_restart(stopped, error, nodes, nonces): node = nodes[node_idx] # don't kill the same node too frequently, give it time to reboot and produce something while True: - _, h = get_recent_hash(node) + _, h = get_recent_hash(node, 30) assert h >= heights_after_restart[node_idx], "%s > %s" % ( h, heights_after_restart[node_idx]) if h > heights_after_restart[node_idx]: break time.sleep(1) - print("NUKING NODE %s" % node_idx) + reset_data = random.choice([True, False, False]) + logging.info("NUKING NODE %s%s" % (node_idx, " AND WIPING ITS STORAGE" if reset_data else "")) node.kill() + if reset_data: + #node.reset_data() # TODO + pass node.start(boot_node.node_key.pk, boot_node.addr()) - print("NODE %s IS BACK UP" % node_idx) + logging.info("NODE %s IS BACK UP" % node_idx) - _, heights_after_restart[node_idx] = get_recent_hash(node) + _, heights_after_restart[node_idx] = get_recent_hash(node, 30) time.sleep(5) @@ -184,7 +200,7 @@ def get_balances(): expected_balances = get_balances() min_balances = [x - MAX_STAKE for x in expected_balances] total_supply = (sum(expected_balances)) - print("TOTAL SUPPLY: %s" % total_supply) + logging.info("TOTAL SUPPLY: %s" % total_supply) last_iter_switch = time.time() mode = 0 # 0 = send more tx, 1 = wait for balances @@ -198,13 +214,22 @@ def get_balances(): validator_ids = get_validator_ids(nodes) if time.time() - last_iter_switch > balances_timeout: if mode == 0: - print("%s TRANSACTIONS SENT. WAITING FOR BALANCES" % tx_count) + logging.info("%s TRANSACTIONS SENT. WAITING FOR BALANCES" % tx_count) mode = 1 else: - print( + logging.info( "BALANCES NEVER CAUGHT UP, CHECKING UNFINISHED TRANSACTIONS" ) - snapshot_expected_balances = [x for x in expected_balances] + + def trace_reverted_txs(last_tx_set, tx_ords): + logging.info("\n\nREVERTING THE FOLLOWING TXS WOULD BE ENOUGH:\n") + for tx_ord in tx_ords: + tx = last_tx_set[tx_ord] + logging.info("\nTRANSACTION %s" % tx_ord) + logging.info("TX tuple: %s" % (tx[1:],)) + response = nodes[-1].json_rpc( + 'tx', [tx[3], "test%s" % tx[1]], timeout=1) + logging.info("Status: %s", response) def revert_txs(): nonlocal expected_balances @@ -212,26 +237,20 @@ def revert_txs(): bad = 0 for tx in last_tx_set: tx_happened = True - try: - response = nodes[-1].json_rpc( - 'tx', [tx[3], "test%s" % tx[1]], timeout=1) - - # due to #2195 if the tx was dropped, the query today times out. - if 'error' in response and 'data' in response[ - 'error'] and response['error'][ - 'data'] == 'Timeout': - tx_happened = False - elif 'result' in response and 'receipts_outcome' in response[ - 'result']: - tx_happened = len( - response['result']['receipts_outcome']) > 0 - else: - assert False, response - # This exception handler is also due to #2195 - except requests.exceptions.ReadTimeout: + + response = nodes[-1].json_rpc( + 'tx', [tx[3], "test%s" % tx[1]], timeout=1) + + if 'error' in response and 'data' in response[ + 'error'] and "doesn't exist" in response['error'][ + 'data']: tx_happened = False - except: - raise + elif 'result' in response and 'receipts_outcome' in response[ + 'result']: + tx_happened = len( + response['result']['receipts_outcome']) > 0 + else: + assert False, response if not tx_happened: bad += 1 @@ -244,7 +263,7 @@ def revert_txs(): good, bad = revert_txs() if expected_balances == get_balances(): # reverting helped - print("REVERTING HELPED, TX EXECUTED: %s, TX LOST: %s" % + logging.info("REVERTING HELPED, TX EXECUTED: %s, TX LOST: %s" % (good, bad)) bad_ratio = bad / (good + bad) if bad_ratio > rolling_tolerance: @@ -260,17 +279,46 @@ def revert_txs(): last_tx_set = [] else: # still no match, fail - print( + logging.info( "REVERTING DIDN'T HELP, TX EXECUTED: %s, TX LOST: %s" % (good, bad)) - for step in range( - 10 - ): # trace balances for 20 seconds to see if they are catching up - print(get_balances()) - time.sleep(2) - expected_balances = snapshot_expected_balances - good, bad = revert_txs() - print( + + for i in range(0, len(last_tx_set)): + tx = last_tx_set[i] + expected_balances[tx[1]] += tx[4] + expected_balances[tx[2]] -= tx[4] + + if get_balances() == expected_balances: + trace_reverted_txs(last_tx_set, [i]) + + for j in range(i + 1, len(last_tx_set)): + tx = last_tx_set[j] + expected_balances[tx[1]] += tx[4] + expected_balances[tx[2]] -= tx[4] + + if get_balances() == expected_balances: + trace_reverted_txs(last_tx_set, [i, j]) + + for k in range(j + 1, len(last_tx_set)): + tx = last_tx_set[k] + expected_balances[tx[1]] += tx[4] + expected_balances[tx[2]] -= tx[4] + + if get_balances() == expected_balances: + trace_reverted_txs(last_tx_set, [i, j, k]) + + expected_balances[tx[1]] -= tx[4] + expected_balances[tx[2]] += tx[4] + + tx = last_tx_set[j] + expected_balances[tx[1]] -= tx[4] + expected_balances[tx[2]] += tx[4] + + tx = last_tx_set[i] + expected_balances[tx[1]] -= tx[4] + expected_balances[tx[2]] += tx[4] + + logging.info( "The latest and greatest stats on successful/failed: %s/%s" % (good, bad)) assert False, "Balances didn't update in time. Expected: %s, received: %s" % ( @@ -289,20 +337,44 @@ def revert_txs(): amt = random.randint(0, min_balances[from_]) nonce_val, nonce_lock = nonces[from_] - hash_, _ = get_recent_hash(nodes[-1]) + hash_, _ = get_recent_hash(nodes[-1], 5) with nonce_lock: tx = sign_payment_tx(nodes[from_].signer_key, 'test%s' % to, amt, nonce_val.value, base58.b58decode(hash_.encode('utf8'))) - for validator_id in validator_ids: - try: - tx_hash = nodes[validator_id].send_tx(tx)['result'] - except (requests.exceptions.ReadTimeout, - requests.exceptions.ConnectionError): - if not network_issues_expected and not nodes[ - validator_id].mess_with: - raise + + # Loop trying to send the tx to all the validators, until at least one receives it + tx_hash = None + for send_attempt in range(SEND_TX_ATTEMPTS): + shuffled_validator_ids = [x for x in validator_ids] + random.shuffle(shuffled_validator_ids) + for validator_id in shuffled_validator_ids: + try: + info = nodes[validator_id].send_tx(tx) + if 'error' in info and info['error']['data'] == 'IsSyncing': + pass + + elif 'result' in info: + tx_hash = info['result'] + break + + else: + assert False, info + + except (requests.exceptions.ReadTimeout, + requests.exceptions.ConnectionError): + if not network_issues_expected and not nodes[ + validator_id].mess_with: + raise + + if tx_hash is not None: + break + + time.sleep(1) + + else: + assert False, "Failed to send the transation after %s attempts" % SEND_TX_ATTEMPTS last_tx_set.append((tx, from_, to, tx_hash, amt)) nonce_val.value = nonce_val.value + 1 @@ -315,7 +387,7 @@ def revert_txs(): else: if get_balances() == expected_balances: - print("BALANCES CAUGHT UP, BACK TO SPAMMING TXS") + logging.info("BALANCES CAUGHT UP, BACK TO SPAMMING TXS") min_balances = [x - MAX_STAKE for x in expected_balances] tx_count = 0 mode = 0 @@ -329,7 +401,7 @@ def revert_txs(): def get_the_guy_to_mess_up_with(nodes): - _, height = get_recent_hash(nodes[-1]) + _, height = get_recent_hash(nodes[-1], 5) return (height // EPOCH_LENGTH) % (len(nodes) - 1) @@ -340,7 +412,7 @@ def monkey_staking(stopped, error, nodes, nonces): whom = random.randint(0, len(nonces) - 2) status = nodes[-1].get_status() - hash_, _ = get_recent_hash(nodes[-1]) + hash_, _ = get_recent_hash(nodes[-1], 5) who_can_unstake = get_the_guy_to_mess_up_with(nodes) @@ -389,7 +461,7 @@ def blocks_tracker(stopped, error, nodes, nonces): status = nodes[val_id].get_status() if status['validators'] != last_validators and val_id == -1: last_validators = status['validators'] - print( + logging.info( "VALIDATORS TRACKER: validators set changed, new set: %s" % [x['account_id'] for x in last_validators]) hash_ = status['sync_info']['latest_block_hash'] @@ -399,11 +471,11 @@ def blocks_tracker(stopped, error, nodes, nonces): if stopped.value != 0: done = True if not every_ten or largest_height % 10 == 0: - print("BLOCK TRACKER: new height %s" % largest_height) + logging.info("BLOCK TRACKER: new height %s" % largest_height) if largest_height >= 20: if not every_ten: every_ten = True - print( + logging.info( "BLOCK TRACKER: switching to tracing every ten blocks to reduce spam" ) largest_height = height @@ -456,10 +528,10 @@ def blocks_tracker(stopped, error, nodes, nonces): if divergence > largest_divergence: largest_divergence = divergence - print("=== BLOCK TRACKER SUMMARY ===") - print("Largest height: %s" % largest_height) - print("Largest divergence: %s" % largest_divergence) - print("Per node: %s" % largest_per_node) + logging.info("=== BLOCK TRACKER SUMMARY ===") + logging.info("Largest height: %s" % largest_height) + logging.info("Largest divergence: %s" % largest_divergence) + logging.info("Per node: %s" % largest_per_node) if not network_issues_expected: assert largest_divergence < len(nodes) @@ -483,7 +555,8 @@ def doit(s, n, N, k, monkeys, timeout): N, k + 1, s, config, [["min_gas_price", 0], ["max_inflation_rate", [0, 1]], ["epoch_length", EPOCH_LENGTH], - ["block_producer_kickout_threshold", 70]], local_config_changes) + ["block_producer_kickout_threshold", 10], + ["chunk_producer_kickout_threshold", 10]], local_config_changes) started = time.time() @@ -502,9 +575,9 @@ def doit(s, n, N, k, monkeys, timeout): node.mess_with = False monkey_names = [x.__name__ for x in monkeys] - print(monkey_names) + logging.info(monkey_names) if 'monkey_local_network' in monkey_names or 'monkey_global_network' in monkey_names: - print( + logging.info( "There are monkeys messing up with network, initializing the infra") if config['local']: init_network_pillager() @@ -547,10 +620,10 @@ def check_errors(): check_errors() time.sleep(1) - print("") - print("==========================================") - print("# TIMEOUT IS HIT, SHUTTING DOWN THE TEST #") - print("==========================================") + logging.info("") + logging.info("==========================================") + logging.info("# TIMEOUT IS HIT, SHUTTING DOWN THE TEST #") + logging.info("==========================================") stopped.value = 1 started_shutdown = time.time() while True: