From aaa8e740d824977d1d626b961ff8b0cab3e15078 Mon Sep 17 00:00:00 2001 From: Illia Polosukhin Date: Sat, 30 May 2020 13:48:55 -0700 Subject: [PATCH 1/6] fix(client): setup separate SyncArbiter for ViewClientActor with 4 threads Change from 2752 + allow storage failures in code reachable from view client. Test plan --------- Run existing tests --- Cargo.toml | 2 +- chain/chain/Cargo.toml | 1 + chain/chain/src/chain.rs | 28 +++-- chain/chain/src/error.rs | 17 +++ chain/chain/src/store.rs | 7 ++ chain/chain/src/test_utils.rs | 14 ++- chain/chain/src/types.rs | 8 +- chain/client/src/client_actor.rs | 29 ++++- chain/client/src/lib.rs | 4 +- chain/client/src/test_utils.rs | 20 ++-- chain/client/src/view_client.rs | 148 +++++++++++++++++------- chain/network/tests/runner/mod.rs | 10 +- core/chain-configs/src/client_config.rs | 3 + neard/Cargo.toml | 2 + neard/src/config.rs | 8 ++ neard/src/lib.rs | 23 ++-- neard/src/main.rs | 3 +- neard/src/runtime.rs | 23 ++-- neard/tests/rpc_nodes.rs | 1 + neard/tests/stake_nodes.rs | 7 +- neard/tests/sync_nodes.rs | 25 ++-- neard/tests/sync_state_nodes.rs | 42 +++++-- rustfmt.toml | 1 + test-utils/testlib/src/actix_utils.rs | 4 + test-utils/testlib/src/lib.rs | 8 +- tests/test_tps_regression.rs | 7 +- 26 files changed, 314 insertions(+), 131 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index edd44c23f00..b6718ff24f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,5 +96,5 @@ expensive_tests = ["neard/expensive_tests"] regression_tests = [] old_tests = [] adversarial = ["neard/adversarial", "near-jsonrpc/adversarial", "near-store/adversarial"] -no_cache = ["node-runtime/no_cache", "near-store/no_cache"] +no_cache = ["neard/no_cache"] metric_recorder = ["neard/metric_recorder"] diff --git a/chain/chain/Cargo.toml b/chain/chain/Cargo.toml index 93906dba07b..23250ef5878 100644 --- a/chain/chain/Cargo.toml +++ b/chain/chain/Cargo.toml @@ -37,3 +37,4 @@ near-logger-utils = {path = "../../test-utils/logger"} byzantine_asserts = [] expensive_tests = [] adversarial = [] +no_cache = ["near-store/no_cache"] diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 52f11a8f455..c2e82eb38b8 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -43,7 +43,7 @@ use near_primitives::views::{ }; use near_store::{ColState, ColStateHeaders, ColStateParts, ShardTries, StoreUpdate}; -use crate::error::{Error, ErrorKind}; +use crate::error::{Error, ErrorKind, LogTransientStorageError}; use crate::lightclient::get_epoch_block_producers_view; use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode}; use crate::types::{ @@ -1345,8 +1345,9 @@ impl Chain { // Let's call it `current`. // 2a. `prev_` means we're working with height before current. // 3. In inner loops we use all prefixes with no relation to the context described above. 
- let sync_block = - self.get_block(&sync_hash).expect("block has already been checked for existence"); + let sync_block = self + .get_block(&sync_hash) + .log_storage_error("block has already been checked for existence")?; let sync_block_header = sync_block.header().clone(); let sync_block_epoch_id = sync_block.header().epoch_id().clone(); if shard_id as usize >= sync_block.chunks().len() { @@ -1464,8 +1465,9 @@ impl Chain { root_proofs.push(root_proofs_cur); } - let state_root_node = - self.runtime_adapter.get_state_root_node(shard_id, &chunk_header.inner.prev_state_root); + let state_root_node = self + .runtime_adapter + .get_state_root_node(shard_id, &chunk_header.inner.prev_state_root)?; let shard_state_header = ShardStateSyncResponseHeader { chunk, @@ -1497,8 +1499,9 @@ impl Chain { return Ok(state_part); } - let sync_block = - self.get_block(&sync_hash).expect("block has already been checked for existence"); + let sync_block = self + .get_block(&sync_hash) + .log_storage_error("block has already been checked for existence")?; let sync_block_header = sync_block.header().clone(); let sync_block_epoch_id = sync_block.header().epoch_id().clone(); if shard_id as usize >= sync_block.chunks().len() { @@ -1515,14 +1518,19 @@ impl Chain { return Err(ErrorKind::InvalidStateRequest("shard_id out of bounds".into()).into()); } let state_root = sync_prev_block.chunks()[shard_id as usize].inner.prev_state_root.clone(); - let state_root_node = self.runtime_adapter.get_state_root_node(shard_id, &state_root); + let state_root_node = self + .runtime_adapter + .get_state_root_node(shard_id, &state_root) + .log_storage_error("get_state_root_node fail")?; let num_parts = get_num_state_parts(state_root_node.memory_usage); if part_id >= num_parts { return Err(ErrorKind::InvalidStateRequest("part_id out of bound".to_string()).into()); } - let state_part = - self.runtime_adapter.obtain_state_part(shard_id, &state_root, part_id, num_parts); + let state_part = self + .runtime_adapter + .obtain_state_part(shard_id, &state_root, part_id, num_parts) + .log_storage_error("obtain_state_part fail")?; // Before saving State Part data, we need to make sure we can calculate and save State Header self.get_state_response_header(shard_id, sync_hash)?; diff --git a/chain/chain/src/error.rs b/chain/chain/src/error.rs index b94ddbaf5fd..3f9e8eb4e64 100644 --- a/chain/chain/src/error.rs +++ b/chain/chain/src/error.rs @@ -3,6 +3,7 @@ use std::io; use chrono::{DateTime, Utc}; use failure::{Backtrace, Context, Fail}; +use log::error; use near_primitives::challenge::{ChunkProofs, ChunkState}; use near_primitives::errors::{EpochError, StorageError}; @@ -173,6 +174,22 @@ pub enum ErrorKind { Other(String), } +/// For now StorageError can happen at any time from ViewClient because of +/// the used isolation level + running ViewClient in a separate thread. 
+pub trait LogTransientStorageError {
+    fn log_storage_error(self, message: &str) -> Self;
+}
+
+impl<T> LogTransientStorageError for Result<T, Error> {
+    fn log_storage_error(self, message: &str) -> Self {
+        error!(target: "client",
+               "Transient storage error: {}",
+               message
+        );
+        self
+    }
+}
+
 impl Display for Error {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let cause = match self.cause() {
diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs
index d6e1e0a6d5f..785eadc5de4 100644
--- a/chain/chain/src/store.rs
+++ b/chain/chain/src/store.rs
@@ -50,9 +50,16 @@ use crate::error::{Error, ErrorKind};
 use crate::types::{Block, BlockHeader, LatestKnown};
 
 /// lru cache size
+#[cfg(not(feature = "no_cache"))]
 const CACHE_SIZE: usize = 100;
+#[cfg(not(feature = "no_cache"))]
 const CHUNK_CACHE_SIZE: usize = 1024;
 
+#[cfg(feature = "no_cache")]
+const CACHE_SIZE: usize = 1;
+#[cfg(feature = "no_cache")]
+const CHUNK_CACHE_SIZE: usize = 1;
+
 #[derive(Clone)]
 pub enum GCMode {
     Fork(ShardTries),
diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs
index 747c99a5931..49be09ddb2c 100644
--- a/chain/chain/src/test_utils.rs
+++ b/chain/chain/src/test_utils.rs
@@ -761,7 +761,7 @@ impl RuntimeAdapter for KeyValueRuntime {
         state_root: &StateRoot,
         part_id: u64,
         num_parts: u64,
-    ) -> Vec<u8> {
+    ) -> Result<Vec<u8>, Error> {
         assert!(part_id < num_parts);
         let state = self.state.read().unwrap().get(&state_root).unwrap().clone();
         let data = state.try_to_vec().expect("should never fall");
@@ -771,7 +771,7 @@ impl RuntimeAdapter for KeyValueRuntime {
         if part_id + 1 == num_parts {
             end = state_size;
         }
-        data[begin as usize..end as usize].to_vec()
+        Ok(data[begin as usize..end as usize].to_vec())
     }
 
     fn validate_state_part(
@@ -805,8 +805,12 @@ impl RuntimeAdapter for KeyValueRuntime {
         Ok(())
     }
 
-    fn get_state_root_node(&self, _shard_id: ShardId, state_root: &StateRoot) -> StateRootNode {
-        StateRootNode {
+    fn get_state_root_node(
+        &self,
+        _shard_id: ShardId,
+        state_root: &StateRoot,
+    ) -> Result<StateRootNode, Error> {
+        Ok(StateRootNode {
             data: self
                 .state
                 .read()
                 .unwrap()
                 .get(&state_root)
                 .unwrap()
                 .clone()
                 .try_to_vec()
                 .expect("should never fall"),
             memory_usage: self.state_size.read().unwrap().get(&state_root).unwrap().clone(),
-        }
+        })
     }
 
     fn validate_state_root_node(
diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs
index 9027c37f0fc..a49c28706eb 100644
--- a/chain/chain/src/types.rs
+++ b/chain/chain/src/types.rs
@@ -419,7 +419,7 @@ pub trait RuntimeAdapter: Send + Sync {
         state_root: &StateRoot,
         part_id: u64,
         num_parts: u64,
-    ) -> Vec<u8>;
+    ) -> Result<Vec<u8>, Error>;
 
     /// Validate state part that expected to be given state root with provided data.
     /// Returns false if the resulting part doesn't match the expected one.
@@ -442,7 +442,11 @@ pub trait RuntimeAdapter: Send + Sync {
     /// Returns StateRootNode of a state.
     /// Panics if requested hash is not in storage.
     /// Never returns Error
-    fn get_state_root_node(&self, shard_id: ShardId, state_root: &StateRoot) -> StateRootNode;
+    fn get_state_root_node(
+        &self,
+        shard_id: ShardId,
+        state_root: &StateRoot,
+    ) -> Result<StateRootNode, Error>;
 
     /// Validate StateRootNode of a state.
     fn validate_state_root_node(
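For illustration only, not part of the patch: with `obtain_state_part` and `get_state_root_node` now fallible, code reachable from the view client is expected to tag storage failures via `LogTransientStorageError` and propagate them with `?` instead of panicking. A minimal sketch under those assumptions; `state_root_memory_usage` is a hypothetical helper, and note that the trait impl above logs the message unconditionally:

    use near_primitives::types::{ShardId, StateRoot};

    use crate::error::{Error, LogTransientStorageError};
    use crate::types::RuntimeAdapter;

    /// Hypothetical helper: fetches the state root node, tagging the call
    /// with a log line and bubbling the error up to the caller rather than
    /// crashing a view client worker thread.
    fn state_root_memory_usage(
        runtime_adapter: &dyn RuntimeAdapter,
        shard_id: ShardId,
        state_root: &StateRoot,
    ) -> Result<u64, Error> {
        let node = runtime_adapter
            .get_state_root_node(shard_id, state_root)
            .log_storage_error("get_state_root_node failed")?;
        Ok(node.memory_usage)
    }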
diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs
index 927e28b5fa7..c4ba23aabc0 100644
--- a/chain/client/src/client_actor.rs
+++ b/chain/client/src/client_actor.rs
@@ -5,7 +5,7 @@ use std::sync::{Arc, RwLock};
 use std::thread;
 use std::time::{Duration, Instant};
 
-use actix::{Actor, Addr, AsyncContext, Context, Handler};
+use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler};
 use chrono::{DateTime, Utc};
 use log::{debug, error, info, trace, warn};
@@ -1237,3 +1237,30 @@ impl ClientActor {
         });
     }
 }
+
+/// Starts client in a separate Arbiter (thread).
+pub fn start_client(
+    client_config: ClientConfig,
+    chain_genesis: ChainGenesis,
+    runtime_adapter: Arc<dyn RuntimeAdapter>,
+    node_id: PeerId,
+    network_adapter: Arc<dyn NetworkAdapter>,
+    validator_signer: Option<Arc<dyn ValidatorSigner>>,
+    telemetry_actor: Addr<TelemetryActor>,
+) -> (Addr<ClientActor>, Arbiter) {
+    let client_arbiter = Arbiter::current();
+    let client_addr = ClientActor::start_in_arbiter(&client_arbiter, move |_ctx| {
+        ClientActor::new(
+            client_config,
+            chain_genesis,
+            runtime_adapter,
+            node_id,
+            network_adapter,
+            validator_signer,
+            telemetry_actor,
+            true,
+        )
+        .unwrap()
+    });
+    (client_addr, client_arbiter)
+}
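For illustration only, not part of the patch: `start_client` hands back both the address and the `Arbiter`, so an embedder can join the client thread once the actix `System` stops. A sketch assuming the bindings built in `neard::start_with_config` (shown further down):

    // `runtime`, `node_id`, `network_adapter`, `telemetry` and `config` are
    // assumed to exist exactly as in neard::start_with_config below.
    let (client_addr, client_arbiter) = start_client(
        config.client_config.clone(),
        chain_genesis.clone(),
        runtime.clone(),
        node_id,
        network_adapter.clone(),
        config.validator_signer.clone(),
        telemetry,
    );
    // After System::run() returns, join the thread; see the
    // `arbiters.into_iter().map(|mut a| a.join())` pattern in neard/src/main.rs.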
diff --git a/chain/client/src/lib.rs b/chain/client/src/lib.rs
index b85b8a74be9..08189cc19b0 100644
--- a/chain/client/src/lib.rs
+++ b/chain/client/src/lib.rs
@@ -2,14 +2,14 @@ extern crate lazy_static;
 
 pub use crate::client::Client;
-pub use crate::client_actor::ClientActor;
+pub use crate::client_actor::{start_client, ClientActor};
 pub use crate::types::{
     Error, GetBlock, GetBlockProof, GetBlockProofResponse, GetBlockWithMerkleTree, GetChunk,
     GetExecutionOutcome, GetExecutionOutcomeResponse, GetGasPrice, GetNetworkInfo,
     GetNextLightClientBlock, GetStateChanges, GetStateChangesInBlock, GetValidatorInfo, Query,
     Status, StatusResponse, SyncStatus, TxStatus, TxStatusError,
 };
-pub use crate::view_client::ViewClientActor;
+pub use crate::view_client::{start_view_client, ViewClientActor};
 
 mod client;
 mod client_actor;
diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs
index 378b4ead1dc..fe10a366255 100644
--- a/chain/client/src/test_utils.rs
+++ b/chain/client/src/test_utils.rs
@@ -37,7 +37,7 @@ use near_store::test_utils::create_test_store;
 use near_store::Store;
 use near_telemetry::TelemetryActor;
 
-use crate::{Client, ClientActor, SyncStatus, ViewClientActor};
+use crate::{start_view_client, Client, ClientActor, SyncStatus, ViewClientActor};
 use near_network::test_utils::MockNetworkAdapter;
 use num_rational::Rational;
 
@@ -58,7 +58,7 @@ pub fn setup(
     network_adapter: Arc<dyn NetworkAdapter>,
     transaction_validity_period: NumBlocks,
     genesis_time: DateTime<Utc>,
-) -> (Block, ClientActor, ViewClientActor) {
+) -> (Block, ClientActor, Addr<ViewClientActor>) {
     let store = create_test_store();
     let num_validator_seats = validators.iter().map(|x| x.len()).sum::<usize>() as NumSeats;
     let runtime = Arc::new(KeyValueRuntime::new_with_validators(
@@ -99,14 +99,13 @@ pub fn setup(
         num_validator_seats,
         archive,
     );
-    let view_client = ViewClientActor::new(
+    let view_client_addr = start_view_client(
         Some(signer.validator_id().clone()),
-        &chain_genesis,
+        chain_genesis.clone(),
         runtime.clone(),
         network_adapter.clone(),
         config.clone(),
-    )
-    .unwrap();
+    );
 
     let client = ClientActor::new(
         config,
@@ -119,7 +118,7 @@ pub fn setup(
         enable_doomslug,
     )
     .unwrap();
-    (genesis_block, client, view_client)
+    (genesis_block, client, view_client_addr)
 }
 
 /// Sets up ClientActor and ViewClientActor with mock PeerManager.
@@ -161,7 +160,7 @@ pub fn setup_mock_with_validity_period(
     transaction_validity_period: NumBlocks,
 ) -> (Addr<ClientActor>, Addr<ViewClientActor>) {
     let network_adapter = Arc::new(NetworkRecipient::new());
-    let (_, client, view_client) = setup(
+    let (_, client, view_client_addr) = setup(
         vec![validators],
         1,
         1,
@@ -177,7 +176,6 @@
         Utc::now(),
     );
     let client_addr = client.start();
-    let view_client_addr = view_client.start();
 
     let client_addr1 = client_addr.clone();
     let network_actor = NetworkMock::mock(Box::new(move |msg, ctx| {
@@ -663,7 +661,7 @@ pub fn setup_mock_all_validators(
                 .start();
                 let network_adapter = NetworkRecipient::new();
                 network_adapter.set_recipient(pm.recipient());
-                let (block, client, view_client) = setup(
+                let (block, client, view_client_addr) = setup(
                     validators_clone1.clone(),
                     validator_groups,
                     num_shards,
@@ -678,7 +676,7 @@
                     10000,
                     genesis_time,
                 );
-                *view_client_addr1.write().unwrap() = Some(view_client.start());
+                *view_client_addr1.write().unwrap() = Some(view_client_addr);
                 *genesis_block1.write().unwrap() = Some(block);
                 client
             });
diff --git a/chain/client/src/view_client.rs b/chain/client/src/view_client.rs
index f5c88709b51..338acff8d52 100644
--- a/chain/client/src/view_client.rs
+++ b/chain/client/src/view_client.rs
@@ -3,10 +3,10 @@
 use std::cmp::Ordering;
 use std::hash::Hash;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant};
 
-use actix::{Actor, Context, Handler};
+use actix::{Actor, Addr, Handler, SyncArbiter, SyncContext};
 use cached::{Cached, SizedCache};
 use log::{debug, error, info, trace, warn};
 
@@ -49,6 +49,22 @@ const QUERY_REQUEST_LIMIT: usize = 500;
 /// Waiting time between requests, in ms
 const REQUEST_WAIT_TIME: u64 = 1000;
 
+const POISONED_LOCK_ERR: &str = "The lock was poisoned.";
+
+/// Request and response manager across all instances of ViewClientActor.
+pub struct ViewClientRequestManager {
+    /// Transaction query that needs to be forwarded to other shards
+    pub tx_status_requests: SizedCache<CryptoHash, Instant>,
+    /// Transaction status response
+    pub tx_status_response: SizedCache<CryptoHash, FinalExecutionOutcomeView>,
+    /// Query requests that need to be forwarded to other shards
+    pub query_requests: SizedCache<String, Instant>,
+    /// Query responses from other nodes (can be errors)
+    pub query_responses: SizedCache<String, Result<QueryResponse, String>>,
+    /// Receipt outcome requests
+    pub receipt_outcome_requests: SizedCache<CryptoHash, Instant>,
+}
+
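Context, for illustration only (not part of the patch): `SyncArbiter` runs one `ViewClientActor` per thread, so caches owned by a single actor would miss responses handled by a sibling worker; hence the caches move behind one shared `Arc<RwLock<...>>`. A minimal sketch of the sharing pattern, using only items defined in this file:

    use std::sync::{Arc, RwLock};
    use std::time::Instant;

    use cached::Cached;
    use near_primitives::hash::CryptoHash;

    fn sharing_sketch() {
        // One manager for the whole pool; each worker clones the Arc.
        let shared = Arc::new(RwLock::new(ViewClientRequestManager::new()));
        let for_worker = shared.clone();

        // A request recorded by one worker...
        let tx_hash = CryptoHash::default();
        for_worker
            .write()
            .expect(POISONED_LOCK_ERR)
            .tx_status_requests
            .cache_set(tx_hash, Instant::now());

        // ...is observed by whichever worker handles the response.
        assert!(shared
            .write()
            .expect(POISONED_LOCK_ERR)
            .tx_status_requests
            .cache_remove(&tx_hash)
            .is_some());
    }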
 /// View client provides currently committed (to the storage) view of the current chain and state.
 pub struct ViewClientActor {
     #[cfg(feature = "adversarial")]
@@ -64,16 +80,19 @@ pub struct ViewClientActor {
     runtime_adapter: Arc<dyn RuntimeAdapter>,
     network_adapter: Arc<dyn NetworkAdapter>,
     pub config: ClientConfig,
-    /// Transaction query that needs to be forwarded to other shards
-    pub tx_status_requests: SizedCache<CryptoHash, Instant>,
-    /// Transaction status response
-    pub tx_status_response: SizedCache<CryptoHash, FinalExecutionOutcomeView>,
-    /// Query requests that need to be forwarded to other shards
-    pub query_requests: SizedCache<String, Instant>,
-    /// Query responses from other nodes (can be errors)
-    pub query_responses: SizedCache<String, Result<QueryResponse, String>>,
-    /// Receipt outcome requests
-    pub receipt_outcome_requests: SizedCache<CryptoHash, Instant>,
+    request_manager: Arc<RwLock<ViewClientRequestManager>>,
+}
+
+impl ViewClientRequestManager {
+    pub fn new() -> Self {
+        Self {
+            tx_status_requests: SizedCache::with_size(QUERY_REQUEST_LIMIT),
+            tx_status_response: SizedCache::with_size(QUERY_REQUEST_LIMIT),
+            query_requests: SizedCache::with_size(QUERY_REQUEST_LIMIT),
+            query_responses: SizedCache::with_size(QUERY_REQUEST_LIMIT),
+            receipt_outcome_requests: SizedCache::with_size(QUERY_REQUEST_LIMIT),
+        }
+    }
 }
 
 impl ViewClientActor {
@@ -83,6 +102,7 @@ impl ViewClientActor {
         runtime_adapter: Arc<dyn RuntimeAdapter>,
         network_adapter: Arc<dyn NetworkAdapter>,
         config: ClientConfig,
+        request_manager: Arc<RwLock<ViewClientRequestManager>>,
     ) -> Result<Self, Error> {
         // TODO: should we create shared ChainStore that is passed to both Client and ViewClient?
         let chain =
@@ -99,11 +119,7 @@
             runtime_adapter,
             network_adapter,
             config,
-            tx_status_requests: SizedCache::with_size(QUERY_REQUEST_LIMIT),
-            tx_status_response: SizedCache::with_size(QUERY_REQUEST_LIMIT),
-            query_requests: SizedCache::with_size(QUERY_REQUEST_LIMIT),
-            query_responses: SizedCache::with_size(QUERY_REQUEST_LIMIT),
-            receipt_outcome_requests: SizedCache::with_size(QUERY_REQUEST_LIMIT),
+            request_manager,
         })
     }
 
@@ -140,9 +156,12 @@
     }
 
     fn handle_query(&mut self, msg: Query) -> Result<Option<QueryResponse>, String> {
-        if let Some(response) = self.query_responses.cache_remove(&msg.query_id) {
-            self.query_requests.cache_remove(&msg.query_id);
-            return response.map(Some);
+        {
+            let mut request_manager = self.request_manager.write().expect(POISONED_LOCK_ERR);
+            if let Some(response) = request_manager.query_responses.cache_remove(&msg.query_id) {
+                request_manager.query_requests.cache_remove(&msg.query_id);
+                return response.map(Some);
+            }
         }
 
         let header = match msg.block_id_or_finality {
@@ -195,7 +214,8 @@
             }
         }
         // route request
-        if Self::need_request(msg.query_id.clone(), &mut self.query_requests) {
+        let mut request_manager = self.request_manager.write().expect(POISONED_LOCK_ERR);
+        if Self::need_request(msg.query_id.clone(), &mut request_manager.query_requests) {
             let validator = self
                 .chain
                 .find_validator_for_forwarding(shard_id)
@@ -220,7 +240,8 @@
     ) -> Result<(), TxStatusError> {
         if let Ok(&dst_shard_id) = self.chain.get_shard_id_for_receipt_id(&receipt_id) {
             if self.chain.get_chunk_extra(last_block_hash, dst_shard_id).is_err() {
-                if Self::need_request(receipt_id, &mut self.receipt_outcome_requests) {
+                let mut request_manager = self.request_manager.write().expect(POISONED_LOCK_ERR);
+                if Self::need_request(receipt_id, &mut request_manager.receipt_outcome_requests) {
                     let validator = self
                         .chain
                         .find_validator_for_forwarding(dst_shard_id)
@@ -239,10 +260,14 @@
         tx_hash: CryptoHash,
         signer_account_id: AccountId,
     ) -> Result<Option<FinalExecutionOutcomeView>, TxStatusError> {
-        if let Some(res) = self.tx_status_response.cache_remove(&tx_hash) {
-            self.tx_status_requests.cache_remove(&tx_hash);
-            return Ok(Some(res));
+        {
+            let mut request_manager = self.request_manager.write().expect(POISONED_LOCK_ERR);
+            if let Some(res) = request_manager.tx_status_response.cache_remove(&tx_hash) {
+                request_manager.tx_status_requests.cache_remove(&tx_hash);
+                return Ok(Some(res));
+            }
         }
+
         let head = self.chain.head().map_err(|e| TxStatusError::ChainError(e))?;
         let target_shard_id = self.runtime_adapter.account_id_to_shard_id(&signer_account_id);
         // Check if we are tracking this shard.
@@ -288,7 +313,8 @@
                 },
             }
         } else {
-            if Self::need_request(tx_hash, &mut self.tx_status_requests) {
+            let mut request_manager = self.request_manager.write().expect(POISONED_LOCK_ERR);
+            if Self::need_request(tx_hash, &mut request_manager.tx_status_requests) {
                 let target_shard_id =
                     self.runtime_adapter.account_id_to_shard_id(&signer_account_id);
                 let validator = self
@@ -360,14 +386,14 @@
 }
 
 impl Actor for ViewClientActor {
-    type Context = Context<Self>;
+    type Context = SyncContext<Self>;
 }
 
 /// Handles runtime query.
 impl Handler<Query> for ViewClientActor {
     type Result = Result<Option<QueryResponse>, String>;
 
-    fn handle(&mut self, msg: Query, _: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, msg: Query, _: &mut Self::Context) -> Self::Result {
         self.handle_query(msg)
     }
 }
 
@@ -376,7 +402,7 @@
 impl Handler<GetBlock> for ViewClientActor {
     type Result = Result<BlockView, String>;
 
-    fn handle(&mut self, msg: GetBlock, _: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, msg: GetBlock, _: &mut Self::Context) -> Self::Result {
         match msg.0 {
             BlockIdOrFinality::Finality(finality) => {
                 let block_hash =
@@ -402,7 +428,7 @@
 impl Handler<GetBlockWithMerkleTree> for ViewClientActor {
     type Result = Result<(BlockView, PartialMerkleTree), String>;
 
-    fn handle(&mut self, msg: GetBlockWithMerkleTree, ctx: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, msg: GetBlockWithMerkleTree, ctx: &mut Self::Context) -> Self::Result {
         let block_view = self.handle(GetBlock(msg.0), ctx)?;
         self.chain
             .mut_store()
@@ -464,7 +490,7 @@
 impl Handler<TxStatus> for ViewClientActor {
     type Result = Result<Option<FinalExecutionOutcomeView>, TxStatusError>;
 
-    fn handle(&mut self, msg: TxStatus, _: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, msg: TxStatus, _: &mut Self::Context) -> Self::Result {
         self.get_tx_status(msg.tx_hash, msg.signer_account_id)
     }
 }
 
 impl Handler<GetValidatorInfo> for ViewClientActor {
     type Result = Result<EpochValidatorInfo, String>;
 
-    fn handle(&mut self, msg: GetValidatorInfo, _: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, msg: GetValidatorInfo, _: &mut Self::Context) -> Self::Result {
         self.maybe_block_id_to_block_hash(msg.block_id)
             .and_then(|block_hash| self.runtime_adapter.get_validator_info(&block_hash))
             .map_err(|err| err.to_string())
     }
 }
 
@@ -483,7 +509,7 @@
 impl Handler<GetStateChangesInBlock> for ViewClientActor {
     type Result = Result<StateChangesKindsView, String>;
 
-    fn handle(&mut self, msg: GetStateChangesInBlock, _: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, msg: GetStateChangesInBlock, _: &mut Self::Context) -> Self::Result {
         self.chain
             .store()
             .get_state_changes_in_block(&msg.block_hash)
@@ -496,7 +522,7 @@
 impl Handler<GetStateChanges> for ViewClientActor {
     type Result = Result<StateChangesView, String>;
 
-    fn handle(&mut self, msg: GetStateChanges, _: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, msg: GetStateChanges, _: &mut Self::Context) -> Self::Result {
         self.chain
             .store()
             .get_state_changes(&msg.block_hash, &msg.state_changes_request.into())
@@ -516,7 +542,7 @@
 impl Handler<GetNextLightClientBlock> for ViewClientActor {
     type Result = Result<Option<LightClientBlockView>, String>;
 
-    fn handle(&mut self, request: GetNextLightClientBlock, _: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, request: GetNextLightClientBlock, _: &mut Self::Context) -> Self::Result {
         let last_block_header =
             self.chain.get_block_header(&request.last_block_hash).map_err(|err| err.to_string())?;
         let last_epoch_id = last_block_header.epoch_id().clone();
@@ -559,7 +585,7 @@
 impl Handler<GetExecutionOutcome> for ViewClientActor {
     type Result = Result<GetExecutionOutcomeResponse, String>;
 
-    fn handle(&mut self, msg: GetExecutionOutcome, _: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, msg: GetExecutionOutcome, _: &mut Self::Context) -> Self::Result {
         let (id, target_shard_id) = match msg.id {
             TransactionOrReceiptId::Transaction { transaction_hash, sender_id } => {
                 (transaction_hash, self.runtime_adapter.account_id_to_shard_id(&sender_id))
@@ -625,7 +651,7 @@
 impl Handler<GetBlockProof> for ViewClientActor {
     type Result = Result<GetBlockProofResponse, String>;
 
-    fn handle(&mut self, msg: GetBlockProof, _: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, msg: GetBlockProof, _: &mut Self::Context) -> Self::Result {
         self.chain.check_block_final_and_canonical(&msg.block_hash).map_err(|e| e.to_string())?;
         self.chain
             .check_block_final_and_canonical(&msg.head_block_hash)
@@ -643,7 +669,7 @@
 impl Handler<NetworkViewClientMessages> for ViewClientActor {
     type Result = NetworkViewClientResponses;
 
-    fn handle(&mut self, msg: NetworkViewClientMessages, _ctx: &mut Context<Self>) -> Self::Result {
+    fn handle(&mut self, msg: NetworkViewClientMessages, _ctx: &mut Self::Context) -> Self::Result {
         match msg {
             #[cfg(feature = "adversarial")]
             NetworkViewClientMessages::Adversarial(adversarial_msg) => {
@@ -686,8 +712,9 @@
             }
             NetworkViewClientMessages::TxStatusResponse(tx_result) => {
                 let tx_hash = tx_result.transaction_outcome.id;
-                if self.tx_status_requests.cache_remove(&tx_hash).is_some() {
-                    self.tx_status_response.cache_set(tx_hash, *tx_result);
+                let mut request_manager = self.request_manager.write().expect(POISONED_LOCK_ERR);
+                if request_manager.tx_status_requests.cache_remove(&tx_hash).is_some() {
+                    request_manager.tx_status_response.cache_set(tx_hash, *tx_result);
                 }
                 NetworkViewClientResponses::NoResponse
             }
@@ -704,8 +731,9 @@
                 }
             }
             NetworkViewClientMessages::QueryResponse { query_id, response } => {
-                if self.query_requests.cache_get(&query_id).is_some() {
-                    self.query_responses.cache_set(query_id, response);
+                let mut request_manager = self.request_manager.write().expect(POISONED_LOCK_ERR);
+                if request_manager.query_requests.cache_get(&query_id).is_some() {
+                    request_manager.query_responses.cache_set(query_id, response);
                 }
                 NetworkViewClientResponses::NoResponse
             }
@@ -719,7 +747,13 @@
                 }
             }
             NetworkViewClientMessages::ReceiptOutcomeResponse(response) => {
-                if self.receipt_outcome_requests.cache_remove(response.id()).is_some() {
+                let have_request = {
+                    let mut request_manager =
+                        self.request_manager.write().expect(POISONED_LOCK_ERR);
+                    request_manager.receipt_outcome_requests.cache_remove(response.id()).is_some()
+                };
+
+                if have_request {
                     if let Ok(&shard_id) = self.chain.get_shard_id_for_receipt_id(response.id()) {
                         let block_hash = response.block_hash;
                         if let Ok(Some(&next_block_hash)) =
@@ -911,3 +945,31 @@ impl Handler<GetGasPrice> for ViewClientActor {
         header.map(|b| GasPriceView { gas_price: b.gas_price() }).map_err(|e| e.to_string())
     }
 }
+
+/// Starts the View Client in a new arbiter (thread).
+pub fn start_view_client(
+    validator_account_id: Option<AccountId>,
+    chain_genesis: ChainGenesis,
+    runtime_adapter: Arc<dyn RuntimeAdapter>,
+    network_adapter: Arc<dyn NetworkAdapter>,
+    config: ClientConfig,
+) -> Addr<ViewClientActor> {
+    let request_manager = Arc::new(RwLock::new(ViewClientRequestManager::new()));
+    SyncArbiter::start(config.view_client_threads, move || {
+        // ViewClientActor::start_in_arbiter(&Arbiter::current(), move |_ctx| {
+        let validator_account_id1 = validator_account_id.clone();
+        let runtime_adapter1 = runtime_adapter.clone();
+        let network_adapter1 = network_adapter.clone();
+        let config1 = config.clone();
+        let request_manager1 = request_manager.clone();
+        ViewClientActor::new(
+            validator_account_id1,
+            &chain_genesis,
+            runtime_adapter1,
+            network_adapter1,
+            config1,
+            request_manager1,
+        )
+        .unwrap()
+    })
+}
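For illustration only, not part of the patch: `SyncArbiter::start(n, factory)` in actix 0.9 spawns `n` dedicated OS threads, constructs one actor per thread with the factory closure, and distributes incoming messages across them, so the single returned `Addr<ViewClientActor>` hides a pool of workers, each owning its own `Chain` handle over the same store. A usage sketch, assuming a running actix `System` and arguments built as in `neard::start_with_config`:

    use futures::FutureExt;
    use near_client::{start_view_client, GetBlock};

    let view_client = start_view_client(
        None, // not running as a validator
        chain_genesis.clone(),
        runtime.clone(),
        network_adapter.clone(),
        client_config.clone(),
    );
    // This send() may be served by any of the `view_client_threads` workers.
    actix::spawn(view_client.send(GetBlock::latest()).then(|res| {
        println!("latest block: {:?}", res);
        futures::future::ready(())
    }));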
diff --git a/chain/network/tests/runner/mod.rs b/chain/network/tests/runner/mod.rs
index be4255434e5..28f1e30bd32 100644
--- a/chain/network/tests/runner/mod.rs
+++ b/chain/network/tests/runner/mod.rs
@@ -11,7 +11,7 @@ use futures::{future, FutureExt, TryFutureExt};
 use near_chain::test_utils::KeyValueRuntime;
 use near_chain::ChainGenesis;
 use near_chain_configs::ClientConfig;
-use near_client::{ClientActor, ViewClientActor};
+use near_client::{start_view_client, ClientActor};
 use near_crypto::KeyType;
 use near_logger_utils::init_test_logger;
 use near_network::test_utils::{
@@ -91,15 +91,13 @@ pub fn setup_network_node(
     )
     .unwrap()
     .start();
-    let view_client_actor = ViewClientActor::new(
+    let view_client_actor = start_view_client(
         config.account_id.clone(),
-        &chain_genesis,
+        chain_genesis.clone(),
         runtime.clone(),
         network_adapter.clone(),
         client_config,
-    )
-    .unwrap()
-    .start();
+    );
 
     PeerManagerActor::new(
         store.clone(),
diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs
index 70047a7bca4..3ad39f26612 100644
--- a/core/chain-configs/src/client_config.rs
+++ b/core/chain-configs/src/client_config.rs
@@ -75,6 +75,8 @@ pub struct ClientConfig {
     pub tracked_shards: Vec<ShardId>,
     /// Not clear old data, set `true` for archive nodes.
     pub archive: bool,
+    /// Number of threads for ViewClientActor pool.
+    pub view_client_threads: usize,
 }
 
 impl ClientConfig {
@@ -125,6 +127,7 @@
             tracked_accounts: vec![],
             tracked_shards: vec![],
             archive,
+            view_client_threads: 1,
         }
     }
 }
diff --git a/neard/Cargo.toml b/neard/Cargo.toml
index 32d8d6a6e09..9dfc9dc2369 100644
--- a/neard/Cargo.toml
+++ b/neard/Cargo.toml
@@ -3,6 +3,7 @@ name = "neard"
 version = "1.2.0"
 authors = ["Near Inc <hello@nearprotocol.com>"]
 edition = "2018"
+default-run = "neard"
 
 [dependencies]
 actix = "0.9"
@@ -50,6 +51,7 @@ testlib = { path = "../test-utils/testlib" }
 adversarial = ["near-client/adversarial", "near-network/adversarial", "near-store/adversarial"]
 expensive_tests = ["near-client/expensive_tests", "near-epoch-manager/expensive_tests", "near-chain/expensive_tests"]
 metric_recorder = ["near-network/metric_recorder", "near-client/metric_recorder"]
+no_cache = ["node-runtime/no_cache", "near-store/no_cache", "near-chain/no_cache"]
 
 [[bin]]
 path = "src/main.rs"
diff --git a/neard/src/config.rs b/neard/src/config.rs
index 9991c553e1d..367f01986fc 100644
--- a/neard/src/config.rs
+++ b/neard/src/config.rs
@@ -289,6 +289,10 @@ fn default_gc_blocks_limit() -> NumBlocks {
     2
 }
 
+fn default_view_client_threads() -> usize {
+    4
+}
+
 #[derive(Serialize, Deserialize, Clone, Debug)]
 pub struct Consensus {
     /// Minimum number of peers to start syncing.
@@ -378,6 +382,8 @@ pub struct Config {
     pub archive: bool,
     #[serde(default = "default_gc_blocks_limit")]
     pub gc_blocks_limit: NumBlocks,
+    #[serde(default = "default_view_client_threads")]
+    pub view_client_threads: usize,
 }
 
 impl Default for Config {
@@ -395,6 +401,7 @@
             tracked_shards: vec![],
             archive: false,
             gc_blocks_limit: default_gc_blocks_limit(),
+            view_client_threads: 4,
         }
     }
 }
@@ -559,6 +566,7 @@ impl NearConfig {
                 tracked_shards: config.tracked_shards,
                 archive: config.archive,
                 gc_blocks_limit: config.gc_blocks_limit,
+                view_client_threads: config.view_client_threads,
             },
             network_config: NetworkConfig {
                 public_key: network_key_pair.public_key,
diff --git a/neard/src/lib.rs b/neard/src/lib.rs
index a71c687159c..e6abc2b0322 100644
--- a/neard/src/lib.rs
+++ b/neard/src/lib.rs
@@ -7,7 +7,7 @@ use log::{error, info};
 use tracing::trace;
 
 use near_chain::ChainGenesis;
-use near_client::{ClientActor, ViewClientActor};
+use near_client::{start_client, start_view_client, ClientActor, ViewClientActor};
 use near_jsonrpc::start_http;
 use near_network::{NetworkRecipient, PeerManagerActor};
 use near_store::migrations::{
@@ -110,9 +110,10 @@ pub fn init_and_migrate_store(home_dir: &Path) -> Arc<Store> {
 pub fn start_with_config(
     home_dir: &Path,
     config: NearConfig,
-) -> (Addr<ClientActor>, Addr<ViewClientActor>) {
+) -> (Addr<ClientActor>, Addr<ViewClientActor>, Vec<Arbiter>) {
     let store = init_and_migrate_store(home_dir);
     near_actix_utils::init_stop_on_panic();
+
     let runtime = Arc::new(NightshadeRuntime::new(
         home_dir,
         Arc::clone(&store),
@@ -126,17 +127,14 @@
     let node_id = config.network_config.public_key.clone().into();
     let network_adapter = Arc::new(NetworkRecipient::new());
 
-    let view_client = ViewClientActor::new(
+    let view_client = start_view_client(
         config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()),
-        &chain_genesis,
+        chain_genesis.clone(),
         runtime.clone(),
         network_adapter.clone(),
         config.client_config.clone(),
-    )
-    .unwrap()
-    .start();
-
-    let client_actor = ClientActor::new(
+    );
+    let (client_actor, client_arbiter) = start_client(
         config.client_config,
         chain_genesis,
         runtime,
@@ -144,10 +142,7 @@
         network_adapter.clone(),
         config.validator_signer,
         telemetry,
-        true,
-    )
-    .unwrap()
-    .start();
+    );
 
     start_http(
         config.rpc_config,
         Arc::clone(&config.genesis),
@@ -171,5 +166,5 @@
 
     trace!(target: "diagnostic", key="log", "Starting NEAR node with diagnostic activated");
 
-    (client_actor, view_client)
+    (client_actor, view_client, vec![client_arbiter, arbiter])
 }
diff --git a/neard/src/main.rs b/neard/src/main.rs
index 2d74fbb9977..eda2d9643ba 100644
--- a/neard/src/main.rs
+++ b/neard/src/main.rs
@@ -222,8 +222,9 @@ fn main() {
             }
 
             let system = System::new("NEAR");
-            start_with_config(home_dir, near_config);
+            let (_, _, arbiters) = start_with_config(home_dir, near_config);
             system.run().unwrap();
+            arbiters.into_iter().map(|mut a| a.join()).for_each(drop);
         }
         ("unsafe_reset_data", Some(_args)) => {
             let store_path = get_store_path(home_dir);
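For illustration only, not part of the patch: `view_client_threads` is deserialized from `config.json`, and thanks to `#[serde(default = "default_view_client_threads")]` an existing config file without the field keeps working and gets 4 threads. A hypothetical excerpt (other fields elided):

    {
        "archive": false,
        "gc_blocks_limit": 2,
        "view_client_threads": 4
    }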
diff --git a/neard/src/runtime.rs b/neard/src/runtime.rs
index 33dacc1e067..be2b774f2df 100644
--- a/neard/src/runtime.rs
+++ b/neard/src/runtime.rs
@@ -1135,27 +1135,30 @@ impl RuntimeAdapter for NightshadeRuntime {
         epoch_manager.get_validator_info(block_hash).map_err(|e| e.into())
     }
 
+    /// Returns StorageError when storage is inconsistent.
+    /// This is possible with the used isolation level + running ViewClient in a separate thread
     fn obtain_state_part(
         &self,
         shard_id: ShardId,
         state_root: &StateRoot,
         part_id: u64,
         num_parts: u64,
-    ) -> Vec<u8> {
+    ) -> Result<Vec<u8>, Error> {
         assert!(part_id < num_parts);
         let trie = self.get_trie_for_shard(shard_id);
-        match trie.get_trie_nodes_for_part(part_id, num_parts, state_root) {
+        let result = match trie.get_trie_nodes_for_part(part_id, num_parts, state_root) {
             Ok(partial_state) => partial_state,
             Err(e) => {
                 error!(target: "runtime",
                        "Can't get_trie_nodes_for_part for {:?}, part_id {:?}, num_parts {:?}, {:?}",
                        state_root, part_id, num_parts, e
                 );
-                panic!("RuntimeError::StorageInconsistentState, {:?}", e)
+                return Err(e.to_string().into());
             }
         }
         .try_to_vec()
-        .expect("serializer should not fail")
+        .expect("serializer should not fail");
+        Ok(result)
     }
 
     fn validate_state_part(
@@ -1203,10 +1206,14 @@
         Ok(store_update.commit()?)
     }
 
-    fn get_state_root_node(&self, shard_id: ShardId, state_root: &StateRoot) -> StateRootNode {
+    fn get_state_root_node(
+        &self,
+        shard_id: ShardId,
+        state_root: &StateRoot,
+    ) -> Result<StateRootNode, Error> {
         self.get_trie_for_shard(shard_id)
             .retrieve_root_node(state_root)
-            .expect("Failed to get root node")
+            .map_err(|e| e.to_string().into())
     }
 
     fn validate_state_root_node(
@@ -1934,8 +1941,8 @@ mod test {
         let staking_transaction = stake(1, &signer, &block_producers[0], TESTING_INIT_STAKE + 1);
         env.step_default(vec![staking_transaction]);
         env.step_default(vec![]);
-        let state_part = env.runtime.obtain_state_part(0, &env.state_roots[0], 0, 1);
-        let root_node = env.runtime.get_state_root_node(0, &env.state_roots[0]);
+        let state_part = env.runtime.obtain_state_part(0, &env.state_roots[0], 0, 1).unwrap();
+        let root_node = env.runtime.get_state_root_node(0, &env.state_roots[0]).unwrap();
         let mut new_env =
             TestEnv::new("test_state_sync", vec![validators.clone()], 2, vec![], vec![], true);
         for i in 1..=2 {
diff --git a/neard/tests/rpc_nodes.rs b/neard/tests/rpc_nodes.rs
index 522a757c7e4..24e9e5e40a7 100644
--- a/neard/tests/rpc_nodes.rs
+++ b/neard/tests/rpc_nodes.rs
@@ -594,6 +594,7 @@ fn test_get_execution_outcome(is_tx_successful: bool) {
         .start();
 
         system.run().unwrap();
+        clients.into_iter().flat_map(|c| c.2.into_iter().map(|mut a| a.join())).for_each(drop);
     });
 }
 
diff --git a/neard/tests/stake_nodes.rs b/neard/tests/stake_nodes.rs
index 7cddac72c33..b5bc576006a 100644
--- a/neard/tests/stake_nodes.rs
+++ b/neard/tests/stake_nodes.rs
@@ -2,7 +2,7 @@ use std::path::Path;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 
-use actix::{Actor, Addr, System};
+use actix::{Actor, Addr, Arbiter, System};
 use futures::{future, FutureExt};
 use num_rational::Rational;
 use rand::Rng;
@@ -29,6 +29,7 @@ struct TestNode {
     client: Addr<ClientActor>,
     view_client: Addr<ViewClientActor>,
     genesis_hash: CryptoHash,
+    arbiters: Vec<Arbiter>,
 }
 
 fn init_test_staking(
@@ -72,11 +73,11 @@
         .enumerate()
         .map(|(i, config)| {
             let genesis_hash = genesis_hash(Arc::clone(&config.genesis));
-            let (client, view_client) = start_with_config(paths[i], config.clone());
+            let (client, view_client, arbiters) = start_with_config(paths[i], config.clone());
             let account_id = format!("near.{}", i);
             let signer =
                 Arc::new(InMemorySigner::from_seed(&account_id, KeyType::ED25519, &account_id));
-            TestNode { account_id, signer, config, client, view_client, genesis_hash }
+            TestNode { account_id, signer, config, client, view_client, genesis_hash, arbiters }
         })
         .collect()
 }
diff --git
a/neard/tests/sync_nodes.rs b/neard/tests/sync_nodes.rs index 1e5a01f120a..a39aace6c7f 100644 --- a/neard/tests/sync_nodes.rs +++ b/neard/tests/sync_nodes.rs @@ -1,5 +1,5 @@ use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; use actix::{Actor, Addr, System}; @@ -111,13 +111,13 @@ fn sync_nodes() { let system = System::new("NEAR"); let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap(); - let (client1, _) = start_with_config(dir1.path(), near1); + let (client1, _, arbiters) = start_with_config(dir1.path(), near1); let signer = InMemoryValidatorSigner::from_seed("other", KeyType::ED25519, "other"); let _ = add_blocks(vec![genesis_block], client1, 13, genesis.config.epoch_length, &signer); let dir2 = tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap(); - let (_, view_client2) = start_with_config(dir2.path(), near2); + let (_, view_client2, arbiters2) = start_with_config(dir2.path(), near2); WaitOrTimeout::new( Box::new(move |_ctx| { @@ -136,6 +136,8 @@ fn sync_nodes() { .start(); system.run().unwrap(); + arbiters.into_iter().map(|mut a| a.join()).for_each(drop); + arbiters2.into_iter().map(|mut a| a.join()).for_each(drop); }); } @@ -161,10 +163,10 @@ fn sync_after_sync_nodes() { let system = System::new("NEAR"); let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap(); - let (client1, _) = start_with_config(dir1.path(), near1); + let (client1, _, arbiters) = start_with_config(dir1.path(), near1); let dir2 = tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap(); - let (_, view_client2) = start_with_config(dir2.path(), near2); + let (_, view_client2, arbiters2) = start_with_config(dir2.path(), near2); let signer = InMemoryValidatorSigner::from_seed("other", KeyType::ED25519, "other"); let blocks = add_blocks( @@ -204,6 +206,8 @@ fn sync_after_sync_nodes() { .start(); system.run().unwrap(); + arbiters.into_iter().map(|mut a| a.join()).for_each(drop); + arbiters2.into_iter().map(|mut a| a.join()).for_each(drop); }); } @@ -234,7 +238,7 @@ fn sync_state_stake_change() { let dir1 = tempfile::Builder::new().prefix("sync_state_stake_change_1").tempdir().unwrap(); let dir2 = tempfile::Builder::new().prefix("sync_state_stake_change_2").tempdir().unwrap(); - let (client1, view_client1) = start_with_config(dir1.path(), near1.clone()); + let (client1, view_client1, arbiters) = start_with_config(dir1.path(), near1.clone()); let genesis_hash = *genesis_block(genesis).hash(); let signer = Arc::new(InMemorySigner::from_seed("test1", KeyType::ED25519, "test1")); @@ -258,16 +262,21 @@ fn sync_state_stake_change() { let started = Arc::new(AtomicBool::new(false)); let dir2_path = dir2.path().to_path_buf(); + let arbiters_holder = Arc::new(RwLock::new(vec![])); + let arbiters_holder2 = arbiters_holder.clone(); WaitOrTimeout::new( Box::new(move |_ctx| { let started_copy = started.clone(); let near2_copy = near2.clone(); let dir2_path_copy = dir2_path.clone(); + let arbiters_holder2 = arbiters_holder2.clone(); actix::spawn(view_client1.send(GetBlock::latest()).then(move |res| { let latest_height = res.unwrap().unwrap().header.height; if !started_copy.load(Ordering::SeqCst) && latest_height > 10 { started_copy.store(true, Ordering::SeqCst); - let (_, view_client2) = start_with_config(&dir2_path_copy, near2_copy); + let (_, view_client2, arbiters) = + start_with_config(&dir2_path_copy, near2_copy); + *arbiters_holder2.write().unwrap() = arbiters; WaitOrTimeout::new( 
Box::new(move |_ctx| { @@ -296,5 +305,7 @@ fn sync_state_stake_change() { .start(); system.run().unwrap(); + arbiters.into_iter().map(|mut a| a.join()).for_each(drop); + arbiters_holder.write().unwrap().iter_mut().map(|a| a.join()).for_each(drop); }); } diff --git a/neard/tests/sync_state_nodes.rs b/neard/tests/sync_state_nodes.rs index 8472b859cdd..cf704d764a1 100644 --- a/neard/tests/sync_state_nodes.rs +++ b/neard/tests/sync_state_nodes.rs @@ -26,14 +26,17 @@ fn sync_state_nodes() { let system = System::new("NEAR"); let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap(); - let (_, view_client1) = start_with_config(dir1.path(), near1); + let (_, view_client1, arbiters) = start_with_config(dir1.path(), near1); let view_client2_holder = Arc::new(RwLock::new(None)); + let arbiters_holder = Arc::new(RwLock::new(vec![])); + let arbiters_holder2 = arbiters_holder.clone(); WaitOrTimeout::new( Box::new(move |_ctx| { if view_client2_holder.read().unwrap().is_none() { let view_client2_holder2 = view_client2_holder.clone(); + let arbiters_holder2 = arbiters_holder2.clone(); let genesis2 = Arc::clone(&genesis); actix::spawn(view_client1.send(GetBlock::latest()).then(move |res| { @@ -41,6 +44,7 @@ fn sync_state_nodes() { Ok(Ok(b)) if b.header.height >= 101 => { let mut view_client2_holder2 = view_client2_holder2.write().unwrap(); + let mut arbiters_holder2 = arbiters_holder2.write().unwrap(); if view_client2_holder2.is_none() { let mut near2 = @@ -54,8 +58,10 @@ fn sync_state_nodes() { .prefix("sync_nodes_2") .tempdir() .unwrap(); - let (_, view_client2) = start_with_config(dir2.path(), near2); + let (_, view_client2, arbiters) = + start_with_config(dir2.path(), near2); *view_client2_holder2 = Some(view_client2); + *arbiters_holder2 = arbiters; } } Ok(Ok(b)) if b.header.height < 101 => { @@ -89,6 +95,8 @@ fn sync_state_nodes() { .start(); system.run().unwrap(); + arbiters.into_iter().map(|mut a| a.join()).for_each(drop); + arbiters_holder.write().unwrap().iter_mut().map(|a| a.join()).for_each(drop); }); } @@ -136,20 +144,23 @@ fn sync_state_nodes_multishard() { near1.client_config.max_block_production_delay; let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap(); - let (_, view_client1) = start_with_config(dir1.path(), near1); + let (_, view_client1, arbiters1) = start_with_config(dir1.path(), near1); let dir3 = tempfile::Builder::new().prefix("sync_nodes_3").tempdir().unwrap(); - let (_, _) = start_with_config(dir3.path(), near3); + let (_, _, arbiters2) = start_with_config(dir3.path(), near3); let dir4 = tempfile::Builder::new().prefix("sync_nodes_4").tempdir().unwrap(); - let (_, _) = start_with_config(dir4.path(), near4); + let (_, _, arbiters3) = start_with_config(dir4.path(), near4); let view_client2_holder = Arc::new(RwLock::new(None)); + let arbiter_holder = Arc::new(RwLock::new(vec![])); + let arbiter_holder2 = arbiter_holder.clone(); WaitOrTimeout::new( Box::new(move |_ctx| { if view_client2_holder.read().unwrap().is_none() { let view_client2_holder2 = view_client2_holder.clone(); + let arbiter_holder2 = arbiter_holder2.clone(); let genesis2 = Arc::clone(&genesis); actix::spawn(view_client1.send(GetBlock::latest()).then(move |res| { @@ -157,6 +168,7 @@ fn sync_state_nodes_multishard() { Ok(Ok(b)) if b.header.height >= 101 => { let mut view_client2_holder2 = view_client2_holder2.write().unwrap(); + let mut arbiter_holder2 = arbiter_holder2.write().unwrap(); if view_client2_holder2.is_none() { let mut near2 = @@ -177,8 +189,10 @@ fn 
sync_state_nodes_multishard() { .prefix("sync_nodes_2") .tempdir() .unwrap(); - let (_, view_client2) = start_with_config(dir2.path(), near2); + let (_, view_client2, arbiters) = + start_with_config(dir2.path(), near2); *view_client2_holder2 = Some(view_client2); + *arbiter_holder2 = arbiters; } } Ok(Ok(b)) if b.header.height < 101 => { @@ -220,6 +234,10 @@ fn sync_state_nodes_multishard() { .start(); system.run().unwrap(); + arbiters1.into_iter().map(|mut a| a.join()).for_each(drop); + arbiters2.into_iter().map(|mut a| a.join()).for_each(drop); + arbiters3.into_iter().map(|mut a| a.join()).for_each(drop); + arbiter_holder.write().unwrap().iter_mut().map(|a| a.join()).for_each(drop); }); } @@ -247,15 +265,18 @@ fn sync_empty_state() { near1.client_config.max_block_production_delay = Duration::from_millis(400); let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap(); - let (_, view_client1) = start_with_config(dir1.path(), near1); + let (_, view_client1, arbiters) = start_with_config(dir1.path(), near1); let dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap()); let view_client2_holder = Arc::new(RwLock::new(None)); + let arbiters_holder = Arc::new(RwLock::new(vec![])); + let arbiters_holder2 = arbiters_holder.clone(); WaitOrTimeout::new( Box::new(move |_ctx| { if view_client2_holder.read().unwrap().is_none() { let view_client2_holder2 = view_client2_holder.clone(); + let arbiters_holder2 = arbiters_holder2.clone(); let genesis2 = Arc::clone(&genesis); let dir2 = dir2.clone(); @@ -264,6 +285,7 @@ fn sync_empty_state() { Ok(Ok(b)) if b.header.height >= state_sync_horizon + 1 => { let mut view_client2_holder2 = view_client2_holder2.write().unwrap(); + let mut arbiters_holder2 = arbiters_holder2.write().unwrap(); if view_client2_holder2.is_none() { let mut near2 = @@ -281,8 +303,10 @@ fn sync_empty_state() { near2.client_config.block_fetch_horizon = block_fetch_horizon; near2.client_config.tracked_shards = vec![0, 1, 2, 3]; - let (_, view_client2) = start_with_config(dir2.path(), near2); + let (_, view_client2, arbiters) = + start_with_config(dir2.path(), near2); *view_client2_holder2 = Some(view_client2); + *arbiters_holder2 = arbiters; } } Ok(Ok(b)) if b.header.height <= state_sync_horizon => { @@ -324,5 +348,7 @@ fn sync_empty_state() { .start(); system.run().unwrap(); + arbiters.into_iter().map(|mut a| a.join()).for_each(drop); + arbiters_holder.write().unwrap().iter_mut().map(|a| a.join()).for_each(drop); }); } diff --git a/rustfmt.toml b/rustfmt.toml index ee85e33011a..784c074834a 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,4 +1,5 @@ use_small_heuristics = "Max" reorder_imports = true +edition = "2018" # fn_args_density = "Compressed" # overflow_delimited_expr = "true" diff --git a/test-utils/testlib/src/actix_utils.rs b/test-utils/testlib/src/actix_utils.rs index 4f1e7a7ba64..e7e8e5bd661 100644 --- a/test-utils/testlib/src/actix_utils.rs +++ b/test-utils/testlib/src/actix_utils.rs @@ -1,4 +1,6 @@ use std::sync::mpsc; +use std::thread; +use std::time::Duration; use actix::System; @@ -32,6 +34,8 @@ impl ShutdownableThread { impl Drop for ShutdownableThread { fn drop(&mut self) { self.shutdown(); + // Leaving some time for all threads to stop after system is stopped. 
+        thread::sleep(Duration::from_millis(100));
         self.join.take().unwrap().join().unwrap();
     }
 }
diff --git a/test-utils/testlib/src/lib.rs b/test-utils/testlib/src/lib.rs
index 8d706706c3c..a9c658eecd8 100644
--- a/test-utils/testlib/src/lib.rs
+++ b/test-utils/testlib/src/lib.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use actix::Addr;
+use actix::{Addr, Arbiter};
 use tempfile::{tempdir, TempDir};
 
 use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
@@ -54,7 +54,7 @@ pub fn start_nodes(
     num_lightclient: usize,
     epoch_length: BlockHeightDelta,
     genesis_height: BlockHeight,
-) -> (Arc<Genesis>, Vec<String>, Vec<(Addr<ClientActor>, Addr<ViewClientActor>)>) {
+) -> (Arc<Genesis>, Vec<String>, Vec<(Addr<ClientActor>, Addr<ViewClientActor>, Vec<Arbiter>)>) {
     init_integration_logger();
 
     let num_nodes = dirs.len();
@@ -100,8 +100,8 @@
     let mut res = vec![];
     for (i, near_config) in near_configs.into_iter().enumerate() {
-        let (client, view_client) = start_with_config(dirs[i].path(), near_config);
-        res.push((client, view_client))
+        let (client, view_client, arbiters) = start_with_config(dirs[i].path(), near_config);
+        res.push((client, view_client, arbiters))
     }
     (genesis, rpc_addrs, res)
 }
diff --git a/tests/test_tps_regression.rs b/tests/test_tps_regression.rs
index 8ca9c7a5f25..098c3822f29 100644
--- a/tests/test_tps_regression.rs
+++ b/tests/test_tps_regression.rs
@@ -140,17 +140,14 @@ mod test {
                     .map(|idx| node.read().unwrap().user().get_block(idx).unwrap())
                     .collect::<Vec<_>>();
                 for b in &blocks {
-                    let gas_used = b.chunks.iter().fold(0, |acc, chunk| {
+                    let _gas_used = b.chunks.iter().fold(0, |acc, chunk| {
                         if chunk.height_included == b.header.height {
                             acc + chunk.gas_used
                         } else {
                             acc
                         }
                     });
-                    observed_transactions
-                        .write()
-                        .unwrap()
-                        .push((gas_used, Instant::now()));
+                    observed_transactions.write().unwrap().push((1, Instant::now()));
                 }
                 prev_ind = new_ind;
             }

From 2c2ad3f5bf55608a64b0ebb23d9d2be3c4510616 Mon Sep 17 00:00:00 2001
From: mikhailOK
Date: Fri, 10 Jul 2020 00:17:10 -0700
Subject: [PATCH 2/6] Update neard/src/main.rs

Co-authored-by: Vlad Frolov
---
 neard/src/main.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/neard/src/main.rs b/neard/src/main.rs
index eda2d9643ba..70a1363d7e4 100644
--- a/neard/src/main.rs
+++ b/neard/src/main.rs
@@ -224,7 +224,7 @@ fn main() {
             let system = System::new("NEAR");
             let (_, _, arbiters) = start_with_config(home_dir, near_config);
             system.run().unwrap();
-            arbiters.into_iter().map(|mut a| a.join()).for_each(drop);
+            arbiters.into_iter().for_each(|mut a| a.join());
         }
         ("unsafe_reset_data", Some(_args)) => {
             let store_path = get_store_path(home_dir);

From c9f454892db1702a083a8532f8a117ad410a8833 Mon Sep 17 00:00:00 2001
From: Mikhail
Date: Fri, 10 Jul 2020 00:19:06 -0700
Subject: [PATCH 3/6] revert test_tps_regression

---
 tests/test_tps_regression.rs | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/tests/test_tps_regression.rs b/tests/test_tps_regression.rs
index 098c3822f29..8ca9c7a5f25 100644
--- a/tests/test_tps_regression.rs
+++ b/tests/test_tps_regression.rs
@@ -140,14 +140,17 @@ mod test {
                     .map(|idx| node.read().unwrap().user().get_block(idx).unwrap())
                     .collect::<Vec<_>>();
                 for b in &blocks {
-                    let _gas_used = b.chunks.iter().fold(0, |acc, chunk| {
+                    let gas_used = b.chunks.iter().fold(0, |acc, chunk| {
                         if chunk.height_included == b.header.height {
                             acc + chunk.gas_used
                         } else {
                             acc
                         }
                     });
-                    observed_transactions.write().unwrap().push((1, Instant::now()));
+                    observed_transactions
+                        .write()
+                        .unwrap()
+                        .push((gas_used, Instant::now()));
                 }
                 prev_ind = new_ind;
             }

From
d8c06377ea6dbd85b98ed4915cc4d5bebf1ee4b3 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Fri, 10 Jul 2020 00:29:10 -0700 Subject: [PATCH 4/6] Revert "Update neard/src/main.rs" This reverts commit 2c2ad3f5bf55608a64b0ebb23d9d2be3c4510616. --- neard/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neard/src/main.rs b/neard/src/main.rs index 70a1363d7e4..eda2d9643ba 100644 --- a/neard/src/main.rs +++ b/neard/src/main.rs @@ -224,7 +224,7 @@ fn main() { let system = System::new("NEAR"); let (_, _, arbiters) = start_with_config(home_dir, near_config); system.run().unwrap(); - arbiters.into_iter().for_each(|mut a| a.join()); + arbiters.into_iter().map(|mut a| a.join()).for_each(drop); } ("unsafe_reset_data", Some(_args)) => { let store_path = get_store_path(home_dir); From bc3e001f8ca7a62ed7b6a4602b142fe3f4482753 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Fri, 10 Jul 2020 01:06:47 -0700 Subject: [PATCH 5/6] testing --- neard/src/main.rs | 2 +- neard/tests/rpc_nodes.rs | 2 +- neard/tests/sync_nodes.rs | 12 ++++++------ neard/tests/sync_state_nodes.rs | 16 ++++++++-------- test-utils/testlib/src/actix_utils.rs | 4 ---- 5 files changed, 16 insertions(+), 20 deletions(-) diff --git a/neard/src/main.rs b/neard/src/main.rs index eda2d9643ba..1c22e297f28 100644 --- a/neard/src/main.rs +++ b/neard/src/main.rs @@ -224,7 +224,7 @@ fn main() { let system = System::new("NEAR"); let (_, _, arbiters) = start_with_config(home_dir, near_config); system.run().unwrap(); - arbiters.into_iter().map(|mut a| a.join()).for_each(drop); + arbiters.into_iter().for_each(|mut a| a.join().unwrap()); } ("unsafe_reset_data", Some(_args)) => { let store_path = get_store_path(home_dir); diff --git a/neard/tests/rpc_nodes.rs b/neard/tests/rpc_nodes.rs index 24e9e5e40a7..7dd63b39f7f 100644 --- a/neard/tests/rpc_nodes.rs +++ b/neard/tests/rpc_nodes.rs @@ -594,7 +594,7 @@ fn test_get_execution_outcome(is_tx_successful: bool) { .start(); system.run().unwrap(); - clients.into_iter().flat_map(|c| c.2.into_iter().map(|mut a| a.join())).for_each(drop); + clients.into_iter().for_each(|c| c.2.into_iter().for_each(|mut a| a.join().unwrap())); }); } diff --git a/neard/tests/sync_nodes.rs b/neard/tests/sync_nodes.rs index a39aace6c7f..1e0ba1f71d8 100644 --- a/neard/tests/sync_nodes.rs +++ b/neard/tests/sync_nodes.rs @@ -136,8 +136,8 @@ fn sync_nodes() { .start(); system.run().unwrap(); - arbiters.into_iter().map(|mut a| a.join()).for_each(drop); - arbiters2.into_iter().map(|mut a| a.join()).for_each(drop); + arbiters.into_iter().for_each(|mut a| a.join().unwrap()); + arbiters2.into_iter().for_each(|mut a| a.join().unwrap()); }); } @@ -206,8 +206,8 @@ fn sync_after_sync_nodes() { .start(); system.run().unwrap(); - arbiters.into_iter().map(|mut a| a.join()).for_each(drop); - arbiters2.into_iter().map(|mut a| a.join()).for_each(drop); + arbiters.into_iter().for_each(|mut a| a.join().unwrap()); + arbiters2.into_iter().for_each(|mut a| a.join().unwrap()); }); } @@ -305,7 +305,7 @@ fn sync_state_stake_change() { .start(); system.run().unwrap(); - arbiters.into_iter().map(|mut a| a.join()).for_each(drop); - arbiters_holder.write().unwrap().iter_mut().map(|a| a.join()).for_each(drop); + arbiters.into_iter().for_each(|mut a| a.join().unwrap()); + arbiters_holder.write().unwrap().iter_mut().for_each(|a| a.join().unwrap()); }); } diff --git a/neard/tests/sync_state_nodes.rs b/neard/tests/sync_state_nodes.rs index cf704d764a1..3c0253c9c0e 100644 --- a/neard/tests/sync_state_nodes.rs +++ b/neard/tests/sync_state_nodes.rs @@ 
-95,8 +95,8 @@ fn sync_state_nodes() { .start(); system.run().unwrap(); - arbiters.into_iter().map(|mut a| a.join()).for_each(drop); - arbiters_holder.write().unwrap().iter_mut().map(|a| a.join()).for_each(drop); + arbiters.into_iter().for_each(|mut a| a.join().unwrap()); + arbiters_holder.write().unwrap().iter_mut().for_each(|a| a.join().unwrap()); }); } @@ -234,10 +234,10 @@ fn sync_state_nodes_multishard() { .start(); system.run().unwrap(); - arbiters1.into_iter().map(|mut a| a.join()).for_each(drop); - arbiters2.into_iter().map(|mut a| a.join()).for_each(drop); - arbiters3.into_iter().map(|mut a| a.join()).for_each(drop); - arbiter_holder.write().unwrap().iter_mut().map(|a| a.join()).for_each(drop); + arbiters1.into_iter().for_each(|mut a| a.join().unwrap()); + arbiters2.into_iter().for_each(|mut a| a.join().unwrap()); + arbiters3.into_iter().for_each(|mut a| a.join().unwrap()); + arbiter_holder.write().unwrap().iter_mut().for_each(|a| a.join().unwrap()); }); } @@ -348,7 +348,7 @@ fn sync_empty_state() { .start(); system.run().unwrap(); - arbiters.into_iter().map(|mut a| a.join()).for_each(drop); - arbiters_holder.write().unwrap().iter_mut().map(|a| a.join()).for_each(drop); + arbiters.into_iter().for_each(|mut a| a.join().unwrap()); + arbiters_holder.write().unwrap().iter_mut().for_each(|a| a.join().unwrap()); }); } diff --git a/test-utils/testlib/src/actix_utils.rs b/test-utils/testlib/src/actix_utils.rs index e7e8e5bd661..4f1e7a7ba64 100644 --- a/test-utils/testlib/src/actix_utils.rs +++ b/test-utils/testlib/src/actix_utils.rs @@ -1,6 +1,4 @@ use std::sync::mpsc; -use std::thread; -use std::time::Duration; use actix::System; @@ -34,8 +32,6 @@ impl ShutdownableThread { impl Drop for ShutdownableThread { fn drop(&mut self) { self.shutdown(); - // Leaving some time for all threads to stop after system is stopped. 
-        thread::sleep(Duration::from_millis(100));
         self.join.take().unwrap().join().unwrap();
     }
 }

From 09126cb569bf6acb49c66909015f73aa677912fc Mon Sep 17 00:00:00 2001
From: Mikhail
Date: Fri, 10 Jul 2020 15:01:06 -0700
Subject: [PATCH 6/6] fix view client Chain::new

---
 chain/chain/src/chain.rs        | 39 +++++++++++++++++++++++++++++++++
 chain/client/src/view_client.rs |  7 ++++--
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index c2e82eb38b8..1799f8ad9ae 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -260,6 +260,45 @@ pub struct Chain {
 }
 
 impl Chain {
+    pub fn new_for_view_client(
+        runtime_adapter: Arc<dyn RuntimeAdapter>,
+        chain_genesis: &ChainGenesis,
+        doomslug_threshold_mode: DoomslugThresholdMode,
+    ) -> Result<Chain, Error> {
+        let (store, _state_store_update, state_roots) = runtime_adapter.genesis_state();
+        let store = ChainStore::new(store, chain_genesis.height);
+        let genesis_chunks = genesis_chunks(
+            state_roots.clone(),
+            runtime_adapter.num_shards(),
+            chain_genesis.gas_limit,
+            chain_genesis.height,
+        );
+        let genesis = Block::genesis(
+            chain_genesis.protocol_version,
+            genesis_chunks.iter().map(|chunk| chunk.header.clone()).collect(),
+            chain_genesis.time,
+            chain_genesis.height,
+            chain_genesis.min_gas_price,
+            chain_genesis.total_supply,
+            Chain::compute_bp_hash(&*runtime_adapter, EpochId::default(), &CryptoHash::default())?,
+        );
+        Ok(Chain {
+            store,
+            runtime_adapter,
+            orphans: OrphanBlockPool::new(),
+            blocks_with_missing_chunks: OrphanBlockPool::new(),
+            genesis: genesis.header().clone(),
+            transaction_validity_period: chain_genesis.transaction_validity_period,
+            epoch_length: chain_genesis.epoch_length,
+            block_economics_config: BlockEconomicsConfig {
+                gas_price_adjustment_rate: chain_genesis.gas_price_adjustment_rate,
+                min_gas_price: chain_genesis.min_gas_price,
+                max_gas_price: chain_genesis.max_gas_price,
+            },
+            doomslug_threshold_mode,
+        })
+    }
+
     pub fn new(
         runtime_adapter: Arc<dyn RuntimeAdapter>,
         chain_genesis: &ChainGenesis,
diff --git a/chain/client/src/view_client.rs b/chain/client/src/view_client.rs
index 338acff8d52..aa2db41a619 100644
--- a/chain/client/src/view_client.rs
+++ b/chain/client/src/view_client.rs
@@ -105,8 +105,11 @@ impl ViewClientActor {
         request_manager: Arc<RwLock<ViewClientRequestManager>>,
     ) -> Result<Self, Error> {
         // TODO: should we create shared ChainStore that is passed to both Client and ViewClient?
-        let chain =
-            Chain::new(runtime_adapter.clone(), chain_genesis, DoomslugThresholdMode::TwoThirds)?;
+        let chain = Chain::new_for_view_client(
+            runtime_adapter.clone(),
+            chain_genesis,
+            DoomslugThresholdMode::TwoThirds,
+        )?;
         Ok(ViewClientActor {
             #[cfg(feature = "adversarial")]
             adv_disable_header_sync: false,
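A closing note on PATCH 6/6, for illustration only (not part of the patch): `Chain::new` appears to perform one-time genesis initialization against storage, which is the wrong thing to repeat in every SyncArbiter worker, while `new_for_view_client` only assembles an in-memory handle over already-initialized storage and commits nothing. A sketch of the two paths, assuming the same arguments as in the constructor above:

    // Full client path: may initialize and commit genesis state on first run.
    let chain = Chain::new(
        runtime_adapter.clone(),
        &chain_genesis,
        DoomslugThresholdMode::TwoThirds,
    )?;

    // View client path: read-only construction, safe to repeat once per
    // SyncArbiter worker because it performs no store updates.
    let view_chain = Chain::new_for_view_client(
        runtime_adapter.clone(),
        &chain_genesis,
        DoomslugThresholdMode::TwoThirds,
    )?;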