Skip to content

Commit

Permalink
fix(client): setup separate SyncArbiter for ViewClientActor with 4 th…
Browse files Browse the repository at this point in the history
…reads

Change from 2752 + allow storage failures in code reachable from view
client.

Test plan
---------
Run existing tests
  • Loading branch information
ilblackdragon authored and mikhailOK committed Jul 10, 2020
1 parent 06b21f7 commit c527f90
Show file tree
Hide file tree
Showing 26 changed files with 311 additions and 131 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,5 @@ expensive_tests = ["neard/expensive_tests"]
regression_tests = []
old_tests = []
adversarial = ["neard/adversarial", "near-jsonrpc/adversarial", "near-store/adversarial"]
no_cache = ["node-runtime/no_cache", "near-store/no_cache"]
no_cache = ["neard/no_cache"]
metric_recorder = ["neard/metric_recorder"]
1 change: 1 addition & 0 deletions chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ near-logger-utils = {path = "../../test-utils/logger"}
byzantine_asserts = []
expensive_tests = []
adversarial = []
no_cache = ["near-store/no_cache"]
28 changes: 18 additions & 10 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use near_primitives::views::{
};
use near_store::{ColState, ColStateHeaders, ColStateParts, ShardTries, StoreUpdate};

use crate::error::{Error, ErrorKind};
use crate::error::{Error, ErrorKind, LogTransientStorageError};
use crate::lightclient::get_epoch_block_producers_view;
use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode};
use crate::types::{
Expand Down Expand Up @@ -1345,8 +1345,9 @@ impl Chain {
// Let's call it `current`.
// 2a. `prev_` means we're working with height before current.
// 3. In inner loops we use all prefixes with no relation to the context described above.
let sync_block =
self.get_block(&sync_hash).expect("block has already been checked for existence");
let sync_block = self
.get_block(&sync_hash)
.log_storage_error("block has already been checked for existence")?;
let sync_block_header = sync_block.header().clone();
let sync_block_epoch_id = sync_block.header().epoch_id().clone();
if shard_id as usize >= sync_block.chunks().len() {
Expand Down Expand Up @@ -1464,8 +1465,9 @@ impl Chain {
root_proofs.push(root_proofs_cur);
}

let state_root_node =
self.runtime_adapter.get_state_root_node(shard_id, &chunk_header.inner.prev_state_root);
let state_root_node = self
.runtime_adapter
.get_state_root_node(shard_id, &chunk_header.inner.prev_state_root)?;

let shard_state_header = ShardStateSyncResponseHeader {
chunk,
Expand Down Expand Up @@ -1497,8 +1499,9 @@ impl Chain {
return Ok(state_part);
}

let sync_block =
self.get_block(&sync_hash).expect("block has already been checked for existence");
let sync_block = self
.get_block(&sync_hash)
.log_storage_error("block has already been checked for existence")?;
let sync_block_header = sync_block.header().clone();
let sync_block_epoch_id = sync_block.header().epoch_id().clone();
if shard_id as usize >= sync_block.chunks().len() {
Expand All @@ -1515,14 +1518,19 @@ impl Chain {
return Err(ErrorKind::InvalidStateRequest("shard_id out of bounds".into()).into());
}
let state_root = sync_prev_block.chunks()[shard_id as usize].inner.prev_state_root.clone();
let state_root_node = self.runtime_adapter.get_state_root_node(shard_id, &state_root);
let state_root_node = self
.runtime_adapter
.get_state_root_node(shard_id, &state_root)
.log_storage_error("get_state_root_node fail")?;
let num_parts = get_num_state_parts(state_root_node.memory_usage);

if part_id >= num_parts {
return Err(ErrorKind::InvalidStateRequest("part_id out of bound".to_string()).into());
}
let state_part =
self.runtime_adapter.obtain_state_part(shard_id, &state_root, part_id, num_parts);
let state_part = self
.runtime_adapter
.obtain_state_part(shard_id, &state_root, part_id, num_parts)
.log_storage_error("obtain_state_part fail")?;

// Before saving State Part data, we need to make sure we can calculate and save State Header
self.get_state_response_header(shard_id, sync_hash)?;
Expand Down
17 changes: 17 additions & 0 deletions chain/chain/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::io;

use chrono::{DateTime, Utc};
use failure::{Backtrace, Context, Fail};
use log::error;

use near_primitives::challenge::{ChunkProofs, ChunkState};
use near_primitives::errors::{EpochError, StorageError};
Expand Down Expand Up @@ -173,6 +174,22 @@ pub enum ErrorKind {
Other(String),
}

/// For now StorageError can happen at any time from ViewClient because of
/// the used isolation level + running ViewClient in a separate thread.
pub trait LogTransientStorageError {
fn log_storage_error(self, message: &str) -> Self;
}

impl<T> LogTransientStorageError for Result<T, Error> {
fn log_storage_error(self, message: &str) -> Self {
error!(target: "client",
"Transient storage error: {}",
message
);
self
}
}

impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let cause = match self.cause() {
Expand Down
7 changes: 7 additions & 0 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,16 @@ use crate::error::{Error, ErrorKind};
use crate::types::{Block, BlockHeader, LatestKnown};

/// lru cache size
#[cfg(not(feature = "no_cache"))]
const CACHE_SIZE: usize = 100;
#[cfg(not(feature = "no_cache"))]
const CHUNK_CACHE_SIZE: usize = 1024;

#[cfg(feature = "no_cache")]
const CACHE_SIZE: usize = 1;
#[cfg(feature = "no_cache")]
const CHUNK_CACHE_SIZE: usize = 1;

#[derive(Clone)]
pub enum GCMode {
Fork(ShardTries),
Expand Down
14 changes: 9 additions & 5 deletions chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ impl RuntimeAdapter for KeyValueRuntime {
state_root: &StateRoot,
part_id: u64,
num_parts: u64,
) -> Vec<u8> {
) -> Result<Vec<u8>, Error> {
assert!(part_id < num_parts);
let state = self.state.read().unwrap().get(&state_root).unwrap().clone();
let data = state.try_to_vec().expect("should never fall");
Expand All @@ -771,7 +771,7 @@ impl RuntimeAdapter for KeyValueRuntime {
if part_id + 1 == num_parts {
end = state_size;
}
data[begin as usize..end as usize].to_vec()
Ok(data[begin as usize..end as usize].to_vec())
}

fn validate_state_part(
Expand Down Expand Up @@ -805,8 +805,12 @@ impl RuntimeAdapter for KeyValueRuntime {
Ok(())
}

fn get_state_root_node(&self, _shard_id: ShardId, state_root: &StateRoot) -> StateRootNode {
StateRootNode {
fn get_state_root_node(
&self,
_shard_id: ShardId,
state_root: &StateRoot,
) -> Result<StateRootNode, Error> {
Ok(StateRootNode {
data: self
.state
.read()
Expand All @@ -817,7 +821,7 @@ impl RuntimeAdapter for KeyValueRuntime {
.try_to_vec()
.expect("should never fall"),
memory_usage: self.state_size.read().unwrap().get(&state_root).unwrap().clone(),
}
})
}

fn validate_state_root_node(
Expand Down
8 changes: 6 additions & 2 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ pub trait RuntimeAdapter: Send + Sync {
state_root: &StateRoot,
part_id: u64,
num_parts: u64,
) -> Vec<u8>;
) -> Result<Vec<u8>, Error>;

/// Validate state part that expected to be given state root with provided data.
/// Returns false if the resulting part doesn't match the expected one.
Expand All @@ -442,7 +442,11 @@ pub trait RuntimeAdapter: Send + Sync {
/// Returns StateRootNode of a state.
/// Panics if requested hash is not in storage.
/// Never returns Error
fn get_state_root_node(&self, shard_id: ShardId, state_root: &StateRoot) -> StateRootNode;
fn get_state_root_node(
&self,
shard_id: ShardId,
state_root: &StateRoot,
) -> Result<StateRootNode, Error>;

/// Validate StateRootNode of a state.
fn validate_state_root_node(
Expand Down
29 changes: 28 additions & 1 deletion chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};

use actix::{Actor, Addr, AsyncContext, Context, Handler};
use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler};
use chrono::{DateTime, Utc};
use log::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -1237,3 +1237,30 @@ impl ClientActor {
});
}
}

/// Starts client in a separate Arbiter (thread).
pub fn start_client(
client_config: ClientConfig,
chain_genesis: ChainGenesis,
runtime_adapter: Arc<dyn RuntimeAdapter>,
node_id: PeerId,
network_adapter: Arc<dyn NetworkAdapter>,
validator_signer: Option<Arc<dyn ValidatorSigner>>,
telemetry_actor: Addr<TelemetryActor>,
) -> (Addr<ClientActor>, Arbiter) {
let client_arbiter = Arbiter::current();
let client_addr = ClientActor::start_in_arbiter(&client_arbiter, move |_ctx| {
ClientActor::new(
client_config,
chain_genesis,
runtime_adapter,
node_id,
network_adapter,
validator_signer,
telemetry_actor,
true,
)
.unwrap()
});
(client_addr, client_arbiter)
}
4 changes: 2 additions & 2 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
extern crate lazy_static;

pub use crate::client::Client;
pub use crate::client_actor::ClientActor;
pub use crate::client_actor::{start_client, ClientActor};
pub use crate::types::{
Error, GetBlock, GetBlockProof, GetBlockProofResponse, GetBlockWithMerkleTree, GetChunk,
GetExecutionOutcome, GetExecutionOutcomeResponse, GetGasPrice, GetNetworkInfo,
GetNextLightClientBlock, GetStateChanges, GetStateChangesInBlock, GetValidatorInfo, Query,
Status, StatusResponse, SyncStatus, TxStatus, TxStatusError,
};
pub use crate::view_client::ViewClientActor;
pub use crate::view_client::{start_view_client, ViewClientActor};

mod client;
mod client_actor;
Expand Down
20 changes: 9 additions & 11 deletions chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use near_store::test_utils::create_test_store;
use near_store::Store;
use near_telemetry::TelemetryActor;

use crate::{Client, ClientActor, SyncStatus, ViewClientActor};
use crate::{start_view_client, Client, ClientActor, SyncStatus, ViewClientActor};
use near_network::test_utils::MockNetworkAdapter;
use num_rational::Rational;

Expand All @@ -58,7 +58,7 @@ pub fn setup(
network_adapter: Arc<dyn NetworkAdapter>,
transaction_validity_period: NumBlocks,
genesis_time: DateTime<Utc>,
) -> (Block, ClientActor, ViewClientActor) {
) -> (Block, ClientActor, Addr<ViewClientActor>) {
let store = create_test_store();
let num_validator_seats = validators.iter().map(|x| x.len()).sum::<usize>() as NumSeats;
let runtime = Arc::new(KeyValueRuntime::new_with_validators(
Expand Down Expand Up @@ -99,14 +99,13 @@ pub fn setup(
num_validator_seats,
archive,
);
let view_client = ViewClientActor::new(
let view_client_addr = start_view_client(
Some(signer.validator_id().clone()),
&chain_genesis,
chain_genesis.clone(),
runtime.clone(),
network_adapter.clone(),
config.clone(),
)
.unwrap();
);

let client = ClientActor::new(
config,
Expand All @@ -119,7 +118,7 @@ pub fn setup(
enable_doomslug,
)
.unwrap();
(genesis_block, client, view_client)
(genesis_block, client, view_client_addr)
}

/// Sets up ClientActor and ViewClientActor with mock PeerManager.
Expand Down Expand Up @@ -161,7 +160,7 @@ pub fn setup_mock_with_validity_period(
transaction_validity_period: NumBlocks,
) -> (Addr<ClientActor>, Addr<ViewClientActor>) {
let network_adapter = Arc::new(NetworkRecipient::new());
let (_, client, view_client) = setup(
let (_, client, view_client_addr) = setup(
vec![validators],
1,
1,
Expand All @@ -177,7 +176,6 @@ pub fn setup_mock_with_validity_period(
Utc::now(),
);
let client_addr = client.start();
let view_client_addr = view_client.start();
let client_addr1 = client_addr.clone();

let network_actor = NetworkMock::mock(Box::new(move |msg, ctx| {
Expand Down Expand Up @@ -663,7 +661,7 @@ pub fn setup_mock_all_validators(
.start();
let network_adapter = NetworkRecipient::new();
network_adapter.set_recipient(pm.recipient());
let (block, client, view_client) = setup(
let (block, client, view_client_addr) = setup(
validators_clone1.clone(),
validator_groups,
num_shards,
Expand All @@ -678,7 +676,7 @@ pub fn setup_mock_all_validators(
10000,
genesis_time,
);
*view_client_addr1.write().unwrap() = Some(view_client.start());
*view_client_addr1.write().unwrap() = Some(view_client_addr);
*genesis_block1.write().unwrap() = Some(block);
client
});
Expand Down
Loading

0 comments on commit c527f90

Please sign in to comment.