Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: State sync from local filesystem #8913

Merged
merged 24 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@

### Non-protocol Changes

* Node can sync State from S3. [#8789](https://github.com/near/nearcore/pull/8789)
* The contract runtime switched to using our fork of wasmer, with various improvements.
* undo-block tool to reset the chain head from current head to its prev block. Use the tool by running: `./target/release/neard --home {path_to_config_directory} undo-block`. [#8681](https://github.com/near/nearcore/pull/8681)
* Node can sync State from S3. [#8789](https://github.com/near/nearcore/pull/8789)
* Node can sync State from local filesystem. [#8789](https://github.com/near/nearcore/pull/8789)
* Add per shard granularity for chunks in validator info metric. [#8934](https://github.com/near/nearcore/pull/8934)
* Add prometheus metrics for expected number of blocks/chunks at the end of the epoch. [#8759](https://github.com/near/nearcore/pull/8759)

Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4521,7 +4521,10 @@ impl Chain {
{
let prev_hash = *sync_block.header().prev_hash();
// If sync_hash is not on the Epoch boundary, it's malicious behavior
Ok(self.runtime_adapter.is_next_block_epoch_start(&prev_hash)?)
let is_next_block_epoch_start =
self.runtime_adapter.is_next_block_epoch_start(&prev_hash)?;

Ok(is_next_block_epoch_start)
} else {
Ok(false) // invalid Epoch of sync_hash, possible malicious behavior
}
Expand Down
4 changes: 2 additions & 2 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ pub enum AccountOrPeerIdOrHash {
AccountId(AccountId),
PeerId(PeerId),
Hash(CryptoHash),
ExternalStorage,
}

#[derive(Debug, serde::Serialize)]
Expand All @@ -63,7 +62,8 @@ pub struct DownloadStatus {
pub state_requests_count: u64,
pub last_target: Option<AccountOrPeerIdOrHash>,
#[serde(skip_serializing, skip_deserializing)]
pub response: Arc<Mutex<Option<Result<(u16, Vec<u8>), String>>>>,
// Use type `String` as an error to avoid a dependency on the `rust-s3` or `anyhow` crates.
pub response: Arc<Mutex<Option<Result<Vec<u8>, String>>>>,
ppca marked this conversation as resolved.
Show resolved Hide resolved
}

impl DownloadStatus {
Expand Down
22 changes: 12 additions & 10 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition.workspace = true
actix-rt.workspace = true
actix.workspace = true
ansi_term.workspace = true
anyhow.workspace = true
async-trait.workspace = true
borsh.workspace = true
chrono.workspace = true
Expand All @@ -20,6 +21,7 @@ num-rational.workspace = true
once_cell.workspace = true
rand.workspace = true
reed-solomon-erasure.workspace = true
regex.workspace = true
rust-s3.workspace = true
serde_json.workspace = true
strum.workspace = true
Expand All @@ -28,24 +30,24 @@ thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

delay-detector.workspace = true
near-async.workspace = true
near-chain-primitives.workspace = true
near-crypto.workspace = true
near-primitives.workspace = true
near-store.workspace = true
near-chain-configs.workspace = true
near-chain-primitives.workspace = true
near-chain.workspace = true
near-chunks.workspace = true
near-client-primitives.workspace = true
near-crypto.workspace = true
near-dyn-configs.workspace = true
near-epoch-manager.workspace = true
near-network.workspace = true
near-pool.workspace = true
near-chunks.workspace = true
near-telemetry.workspace = true
near-o11y.workspace = true
near-performance-metrics.workspace = true
near-performance-metrics-macros.workspace = true
near-epoch-manager.workspace = true
delay-detector.workspace = true
near-performance-metrics.workspace = true
near-pool.workspace = true
near-primitives.workspace = true
near-store.workspace = true
near-telemetry.workspace = true

[dev-dependencies]
assert_matches.workspace = true
Expand Down
10 changes: 2 additions & 8 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,7 @@ impl Client {
network_adapter.clone(),
config.state_sync_timeout,
&config.chain_id,
config.state_sync_from_s3_enabled,
&config.state_sync_s3_bucket,
&config.state_sync_s3_region,
config.state_sync_num_concurrent_s3_requests,
&config.state_sync.sync,
);
let num_block_producer_seats = config.num_block_producer_seats as usize;
let data_parts = runtime_adapter.num_data_parts();
Expand Down Expand Up @@ -2131,10 +2128,7 @@ impl Client {
network_adapter1,
state_sync_timeout,
&self.config.chain_id,
self.config.state_sync_from_s3_enabled,
&self.config.state_sync_s3_bucket,
&self.config.state_sync_s3_region,
self.config.state_sync_num_concurrent_s3_requests,
&self.config.state_sync.sync,
),
new_shard_sync,
BlocksCatchUpState::new(sync_hash, epoch_id),
Expand Down
6 changes: 5 additions & 1 deletion chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,11 @@ impl Handler<WithSpanContext<Status>> for ClientActor {
sync_status: format!(
"{} ({})",
self.client.sync_status.as_variant_name().to_string(),
display_sync_status(&self.client.sync_status, &self.client.chain.head()?,),
display_sync_status(
&self.client.sync_status,
&self.client.chain.head()?,
&self.client.config.state_sync.sync,
),
),
catchup_status: self.client.get_catchup_status()?,
current_head_status: head.clone().into(),
Expand Down
32 changes: 19 additions & 13 deletions chain/client/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config_updater::ConfigUpdater;
use crate::{metrics, SyncStatus};
use actix::Addr;
use itertools::Itertools;
use near_chain_configs::{ClientConfig, LogSummaryStyle};
use near_chain_configs::{ClientConfig, LogSummaryStyle, SyncConfig};
use near_network::types::NetworkInfo;
use near_primitives::block::Tip;
use near_primitives::network::PeerId;
Expand Down Expand Up @@ -343,10 +343,9 @@ impl InfoHelper {

let s = |num| if num == 1 { "" } else { "s" };

let sync_status_log = Some(display_sync_status(sync_status, head));

let sync_status_log =
Some(display_sync_status(sync_status, head, &client_config.state_sync.sync));
let catchup_status_log = display_catchup_status(catchup_status);

let validator_info_log = validator_info.as_ref().map(|info| {
format!(
" {}{} validator{}",
Expand Down Expand Up @@ -564,7 +563,11 @@ pub fn display_catchup_status(catchup_status: Vec<CatchupStatusView>) -> String
.join("\n")
}

pub fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String {
pub fn display_sync_status(
sync_status: &SyncStatus,
head: &Tip,
state_sync_config: &SyncConfig,
) -> String {
metrics::SYNC_STATUS.set(sync_status.repr() as i64);
match sync_status {
SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height),
Expand Down Expand Up @@ -609,14 +612,17 @@ pub fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String {
for (shard_id, shard_status) in shard_statuses {
write!(res, "[{}: {}]", shard_id, shard_status.status.to_string(),).unwrap();
}
// TODO #8719
tracing::warn!(target: "stats",
"The node is syncing its State. The current implementation of this mechanism is known to be unreliable. It may never complete, or fail randomly and corrupt the DB.\n\
Suggestions:\n\
* Download a recent data snapshot and restart the node.\n\
* Disable state sync in the config. Add `\"state_sync_enabled\": false` to `config.json`.\n\
\n\
A better implementation of State Sync is work in progress.");
if matches!(state_sync_config, SyncConfig::Peers) {
// TODO #8719
tracing::warn!(
target: "stats",
"The node is syncing its State. The current implementation of this mechanism is known to be unreliable. It may never complete, or fail randomly and corrupt the DB.\n\
Suggestions:\n\
* Download a recent data snapshot and restart the node.\n\
* Disable state sync in the config. Add `\"state_sync_enabled\": false` to `config.json`.\n\
\n\
A better implementation of State Sync is work in progress.");
}
res
}
SyncStatus::StateSyncDone => "State sync done".to_string(),
Expand Down
115 changes: 55 additions & 60 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ pub(crate) static NODE_PROTOCOL_UPGRADE_VOTING_START: Lazy<IntGauge> = Lazy::new
.unwrap()
});

pub static PRODUCE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
pub(crate) static PRODUCE_CHUNK_TIME: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_produce_chunk_time",
"Time taken to produce a chunk",
Expand All @@ -351,7 +351,7 @@ pub static PRODUCE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::ne
.unwrap()
});

pub static VIEW_CLIENT_MESSAGE_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
pub(crate) static VIEW_CLIENT_MESSAGE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_view_client_messages_processing_time",
"Time that view client takes to handle different messages",
Expand All @@ -361,16 +361,15 @@ pub static VIEW_CLIENT_MESSAGE_TIME: Lazy<near_o11y::metrics::HistogramVec> = La
.unwrap()
});

pub static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_produce_and_distribute_chunk_time",
"Time to produce a chunk and distribute it to peers",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 16).unwrap()),
)
.unwrap()
});
pub(crate) static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_produce_and_distribute_chunk_time",
"Time to produce a chunk and distribute it to peers",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 16).unwrap()),
)
.unwrap()
});
/// Exports neard, protocol and database versions via Prometheus metrics.
///
/// Sets metrics which export node’s max supported protocol version, used
Expand All @@ -391,7 +390,7 @@ pub(crate) fn export_version(neard_version: &near_primitives::version::Version)
.inc();
}

pub static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_STAGE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_stage",
"Stage of state sync per shard",
Expand All @@ -400,7 +399,7 @@ pub static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|
.unwrap()
});

pub static STATE_SYNC_RETRY_PART: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_RETRY_PART: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_retry_part_total",
"Number of part requests retried",
Expand All @@ -409,7 +408,7 @@ pub static STATE_SYNC_RETRY_PART: Lazy<near_o11y::metrics::IntCounterVec> = Lazy
.unwrap()
});

pub static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_PARTS_DONE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_done",
"Number of parts downloaded",
Expand All @@ -418,16 +417,16 @@ pub static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::
.unwrap()
});

pub static STATE_SYNC_PARTS_TOTAL: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_PARTS_TOTAL: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_per_shard",
"Number of parts that need to be downloaded for the shard",
"Number of parts in the shard",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_DISCARD_PARTS: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_DISCARD_PARTS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_discard_parts_total",
"Number of times all downloaded parts were discarded to try again",
Expand All @@ -436,54 +435,50 @@ pub static STATE_SYNC_DISCARD_PARTS: Lazy<near_o11y::metrics::IntCounterVec> = L
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_done_total",
"Number of parts successfully retrieved from an external storage",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_failed_total",
"Number of parts failed attempts to retrieve parts from an external storage",
&["shard_id"],
)
.unwrap()
});
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_done_total",
"Number of parts retrieved from external storage",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_scheduling_delay_sec",
"Delay for a request for parts from an external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_failed_total",
"Failed retrieval attempts from external storage",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_request_delay_sec",
"Latency of state part requests to external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_request_delay_sec",
"Latency of state part requests to external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<near_o11y::metrics::IntCounterVec> =
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_size_downloaded_bytes_total",
"Amount of bytes downloaded from an external storage when requesting state parts for a shard",
"Bytes downloaded from an external storage",
&["shard_id"],
)
.unwrap()
.unwrap()
});

pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_dump_put_object_elapsed_sec",
"Latency of writes to external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
});
Loading