Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): setup separate SyncArbiter for ViewClientActor with 4 threads #2970

Merged
merged 8 commits into from
Jul 11, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,5 @@ expensive_tests = ["neard/expensive_tests"]
regression_tests = []
old_tests = []
adversarial = ["neard/adversarial", "near-jsonrpc/adversarial", "near-store/adversarial"]
no_cache = ["node-runtime/no_cache", "near-store/no_cache"]
no_cache = ["neard/no_cache"]
metric_recorder = ["neard/metric_recorder"]
1 change: 1 addition & 0 deletions chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ near-logger-utils = {path = "../../test-utils/logger"}
byzantine_asserts = []
expensive_tests = []
adversarial = []
no_cache = ["near-store/no_cache"]
28 changes: 18 additions & 10 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use near_primitives::views::{
};
use near_store::{ColState, ColStateHeaders, ColStateParts, ShardTries, StoreUpdate};

use crate::error::{Error, ErrorKind};
use crate::error::{Error, ErrorKind, LogTransientStorageError};
use crate::lightclient::get_epoch_block_producers_view;
use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode};
use crate::types::{
Expand Down Expand Up @@ -1345,8 +1345,9 @@ impl Chain {
// Let's call it `current`.
// 2a. `prev_` means we're working with height before current.
// 3. In inner loops we use all prefixes with no relation to the context described above.
let sync_block =
self.get_block(&sync_hash).expect("block has already been checked for existence");
let sync_block = self
.get_block(&sync_hash)
.log_storage_error("block has already been checked for existence")?;
let sync_block_header = sync_block.header().clone();
let sync_block_epoch_id = sync_block.header().epoch_id().clone();
if shard_id as usize >= sync_block.chunks().len() {
Expand Down Expand Up @@ -1464,8 +1465,9 @@ impl Chain {
root_proofs.push(root_proofs_cur);
}

let state_root_node =
self.runtime_adapter.get_state_root_node(shard_id, &chunk_header.inner.prev_state_root);
let state_root_node = self
.runtime_adapter
.get_state_root_node(shard_id, &chunk_header.inner.prev_state_root)?;

let shard_state_header = ShardStateSyncResponseHeader {
chunk,
Expand Down Expand Up @@ -1497,8 +1499,9 @@ impl Chain {
return Ok(state_part);
}

let sync_block =
self.get_block(&sync_hash).expect("block has already been checked for existence");
let sync_block = self
.get_block(&sync_hash)
.log_storage_error("block has already been checked for existence")?;
let sync_block_header = sync_block.header().clone();
let sync_block_epoch_id = sync_block.header().epoch_id().clone();
if shard_id as usize >= sync_block.chunks().len() {
Expand All @@ -1515,14 +1518,19 @@ impl Chain {
return Err(ErrorKind::InvalidStateRequest("shard_id out of bounds".into()).into());
}
let state_root = sync_prev_block.chunks()[shard_id as usize].inner.prev_state_root.clone();
let state_root_node = self.runtime_adapter.get_state_root_node(shard_id, &state_root);
let state_root_node = self
.runtime_adapter
.get_state_root_node(shard_id, &state_root)
.log_storage_error("get_state_root_node fail")?;
let num_parts = get_num_state_parts(state_root_node.memory_usage);

if part_id >= num_parts {
return Err(ErrorKind::InvalidStateRequest("part_id out of bound".to_string()).into());
}
let state_part =
self.runtime_adapter.obtain_state_part(shard_id, &state_root, part_id, num_parts);
let state_part = self
.runtime_adapter
.obtain_state_part(shard_id, &state_root, part_id, num_parts)
.log_storage_error("obtain_state_part fail")?;

// Before saving State Part data, we need to make sure we can calculate and save State Header
self.get_state_response_header(shard_id, sync_hash)?;
Expand Down
17 changes: 17 additions & 0 deletions chain/chain/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::io;

use chrono::{DateTime, Utc};
use failure::{Backtrace, Context, Fail};
use log::error;

use near_primitives::challenge::{ChunkProofs, ChunkState};
use near_primitives::errors::{EpochError, StorageError};
Expand Down Expand Up @@ -173,6 +174,22 @@ pub enum ErrorKind {
Other(String),
}

/// For now StorageError can happen at any time from ViewClient because of
/// the used isolation level + running ViewClient in a separate thread.
///
/// Extension trait that lets a call site log such a failure while still
/// handing the value back, so it can be propagated to the caller (e.g. with `?`).
pub trait LogTransientStorageError {
    /// Logs `message` as context when `self` represents a failure and returns
    /// `self` unchanged.
    fn log_storage_error(self, message: &str) -> Self;
}

impl<T> LogTransientStorageError for Result<T, Error> {
    /// Writes an error-level record to the `client` log target, but only when
    /// the result is `Err`; successful results pass through silently.
    ///
    /// The record contains both the caller-supplied `message` and the
    /// underlying error, so the log line is actionable on its own.
    fn log_storage_error(self, message: &str) -> Self {
        // Only failures are worth reporting: logging on `Ok` would emit a
        // spurious "Transient storage error" line for every successful call.
        if let Err(err) = &self {
            error!(target: "client",
                "Transient storage error: {}, {}",
                message,
                err
            );
        }
        self
    }
}

impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let cause = match self.cause() {
Expand Down
7 changes: 7 additions & 0 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,16 @@ use crate::error::{Error, ErrorKind};
use crate::types::{Block, BlockHeader, LatestKnown};

/// LRU cache size used when the `no_cache` feature is disabled.
#[cfg(not(feature = "no_cache"))]
const CACHE_SIZE: usize = 100;
/// Chunk cache size used when the `no_cache` feature is disabled.
#[cfg(not(feature = "no_cache"))]
const CHUNK_CACHE_SIZE: usize = 1024;

/// LRU cache size when the `no_cache` feature is enabled: reduced to a single entry.
#[cfg(feature = "no_cache")]
const CACHE_SIZE: usize = 1;
/// Chunk cache size when the `no_cache` feature is enabled: reduced to a single entry.
#[cfg(feature = "no_cache")]
const CHUNK_CACHE_SIZE: usize = 1;

#[derive(Clone)]
pub enum GCMode {
Fork(ShardTries),
Expand Down
14 changes: 9 additions & 5 deletions chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ impl RuntimeAdapter for KeyValueRuntime {
state_root: &StateRoot,
part_id: u64,
num_parts: u64,
) -> Vec<u8> {
) -> Result<Vec<u8>, Error> {
assert!(part_id < num_parts);
let state = self.state.read().unwrap().get(&state_root).unwrap().clone();
let data = state.try_to_vec().expect("should never fall");
Expand All @@ -771,7 +771,7 @@ impl RuntimeAdapter for KeyValueRuntime {
if part_id + 1 == num_parts {
end = state_size;
}
data[begin as usize..end as usize].to_vec()
Ok(data[begin as usize..end as usize].to_vec())
}

fn validate_state_part(
Expand Down Expand Up @@ -805,8 +805,12 @@ impl RuntimeAdapter for KeyValueRuntime {
Ok(())
}

fn get_state_root_node(&self, _shard_id: ShardId, state_root: &StateRoot) -> StateRootNode {
StateRootNode {
fn get_state_root_node(
&self,
_shard_id: ShardId,
state_root: &StateRoot,
) -> Result<StateRootNode, Error> {
Ok(StateRootNode {
data: self
.state
.read()
Expand All @@ -817,7 +821,7 @@ impl RuntimeAdapter for KeyValueRuntime {
.try_to_vec()
.expect("should never fall"),
memory_usage: self.state_size.read().unwrap().get(&state_root).unwrap().clone(),
}
})
}

fn validate_state_root_node(
Expand Down
8 changes: 6 additions & 2 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ pub trait RuntimeAdapter: Send + Sync {
state_root: &StateRoot,
part_id: u64,
num_parts: u64,
) -> Vec<u8>;
) -> Result<Vec<u8>, Error>;

/// Validate state part that expected to be given state root with provided data.
/// Returns false if the resulting part doesn't match the expected one.
Expand All @@ -442,7 +442,11 @@ pub trait RuntimeAdapter: Send + Sync {
/// Returns StateRootNode of a state.
/// Returns an error if the node cannot be read from storage;
/// implementations may still panic if the requested hash is unknown.
fn get_state_root_node(&self, shard_id: ShardId, state_root: &StateRoot) -> StateRootNode;
fn get_state_root_node(
&self,
shard_id: ShardId,
state_root: &StateRoot,
) -> Result<StateRootNode, Error>;

/// Validate StateRootNode of a state.
fn validate_state_root_node(
Expand Down
29 changes: 28 additions & 1 deletion chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};

use actix::{Actor, Addr, AsyncContext, Context, Handler};
use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler};
use chrono::{DateTime, Utc};
use log::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -1237,3 +1237,30 @@ impl ClientActor {
});
}
}

/// Spawns a `ClientActor` on an arbiter and returns the actor's address
/// together with that arbiter.
///
/// The arbiter used is the one returned by `Arbiter::current()`, i.e. the
/// arbiter of the calling thread.
/// NOTE(review): earlier wording described this as "a separate Arbiter
/// (thread)"; confirm whether `Arbiter::current()` is the intended choice.
///
/// # Panics
///
/// Panics inside the actor factory if `ClientActor::new` returns an error.
pub fn start_client(
    client_config: ClientConfig,
    chain_genesis: ChainGenesis,
    runtime_adapter: Arc<dyn RuntimeAdapter>,
    node_id: PeerId,
    network_adapter: Arc<dyn NetworkAdapter>,
    validator_signer: Option<Arc<dyn ValidatorSigner>>,
    telemetry_actor: Addr<TelemetryActor>,
) -> (Addr<ClientActor>, Arbiter) {
    let arbiter = Arbiter::current();
    let addr = ClientActor::start_in_arbiter(&arbiter, move |_ctx| {
        // The actor is constructed lazily, on the arbiter that will run it.
        let actor = ClientActor::new(
            client_config,
            chain_genesis,
            runtime_adapter,
            node_id,
            network_adapter,
            validator_signer,
            telemetry_actor,
            // presumably the `enable_doomslug` flag — TODO confirm against `ClientActor::new`
            true,
        );
        actor.unwrap()
    });
    (addr, arbiter)
}
4 changes: 2 additions & 2 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
extern crate lazy_static;

pub use crate::client::Client;
pub use crate::client_actor::ClientActor;
pub use crate::client_actor::{start_client, ClientActor};
pub use crate::types::{
Error, GetBlock, GetBlockProof, GetBlockProofResponse, GetBlockWithMerkleTree, GetChunk,
GetExecutionOutcome, GetExecutionOutcomeResponse, GetGasPrice, GetNetworkInfo,
GetNextLightClientBlock, GetStateChanges, GetStateChangesInBlock, GetValidatorInfo, Query,
Status, StatusResponse, SyncStatus, TxStatus, TxStatusError,
};
pub use crate::view_client::ViewClientActor;
pub use crate::view_client::{start_view_client, ViewClientActor};

mod client;
mod client_actor;
Expand Down
20 changes: 9 additions & 11 deletions chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use near_store::test_utils::create_test_store;
use near_store::Store;
use near_telemetry::TelemetryActor;

use crate::{Client, ClientActor, SyncStatus, ViewClientActor};
use crate::{start_view_client, Client, ClientActor, SyncStatus, ViewClientActor};
use near_network::test_utils::MockNetworkAdapter;
use num_rational::Rational;

Expand All @@ -58,7 +58,7 @@ pub fn setup(
network_adapter: Arc<dyn NetworkAdapter>,
transaction_validity_period: NumBlocks,
genesis_time: DateTime<Utc>,
) -> (Block, ClientActor, ViewClientActor) {
) -> (Block, ClientActor, Addr<ViewClientActor>) {
let store = create_test_store();
let num_validator_seats = validators.iter().map(|x| x.len()).sum::<usize>() as NumSeats;
let runtime = Arc::new(KeyValueRuntime::new_with_validators(
Expand Down Expand Up @@ -99,14 +99,13 @@ pub fn setup(
num_validator_seats,
archive,
);
let view_client = ViewClientActor::new(
let view_client_addr = start_view_client(
Some(signer.validator_id().clone()),
&chain_genesis,
chain_genesis.clone(),
runtime.clone(),
network_adapter.clone(),
config.clone(),
)
.unwrap();
);

let client = ClientActor::new(
config,
Expand All @@ -119,7 +118,7 @@ pub fn setup(
enable_doomslug,
)
.unwrap();
(genesis_block, client, view_client)
(genesis_block, client, view_client_addr)
}

/// Sets up ClientActor and ViewClientActor with mock PeerManager.
Expand Down Expand Up @@ -161,7 +160,7 @@ pub fn setup_mock_with_validity_period(
transaction_validity_period: NumBlocks,
) -> (Addr<ClientActor>, Addr<ViewClientActor>) {
let network_adapter = Arc::new(NetworkRecipient::new());
let (_, client, view_client) = setup(
let (_, client, view_client_addr) = setup(
vec![validators],
1,
1,
Expand All @@ -177,7 +176,6 @@ pub fn setup_mock_with_validity_period(
Utc::now(),
);
let client_addr = client.start();
let view_client_addr = view_client.start();
let client_addr1 = client_addr.clone();

let network_actor = NetworkMock::mock(Box::new(move |msg, ctx| {
Expand Down Expand Up @@ -663,7 +661,7 @@ pub fn setup_mock_all_validators(
.start();
let network_adapter = NetworkRecipient::new();
network_adapter.set_recipient(pm.recipient());
let (block, client, view_client) = setup(
let (block, client, view_client_addr) = setup(
validators_clone1.clone(),
validator_groups,
num_shards,
Expand All @@ -678,7 +676,7 @@ pub fn setup_mock_all_validators(
10000,
genesis_time,
);
*view_client_addr1.write().unwrap() = Some(view_client.start());
*view_client_addr1.write().unwrap() = Some(view_client_addr);
*genesis_block1.write().unwrap() = Some(block);
client
});
Expand Down
Loading