Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(state-sync): Download state header from external storage #10515

Merged
merged 9 commits into from
Jan 30, 2024
9 changes: 9 additions & 0 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,15 @@ impl ShardSyncDownload {
}
Self { downloads, status: ShardSyncStatus::StateDownloadParts }
}

/// Returns a mutable handle to the header download status.
///
/// Yields `None` unless the current sync status is
/// `ShardSyncStatus::StateDownloadHeader`. In that state the result is
/// whatever `downloads.get_mut(0)` produces, i.e. the entry at index 0.
pub fn get_header_download_mut(&mut self) -> Option<&mut DownloadStatus> {
    if self.status == ShardSyncStatus::StateDownloadHeader {
        self.downloads.get_mut(0)
    } else {
        None
    }
}
}

pub fn format_shard_sync_phase_per_shard(
Expand Down
8 changes: 4 additions & 4 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ pub(crate) static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<IntCounterVec> = Lazy::ne
try_create_int_counter_vec(
"near_state_sync_external_parts_done_total",
"Number of parts retrieved from external storage",
&["shard_id"],
&["shard_id", "type"],
VanBarbascu marked this conversation as resolved.
Show resolved Hide resolved
)
.unwrap()
});
Expand All @@ -492,7 +492,7 @@ pub(crate) static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<IntCounterVec> = Lazy::
try_create_int_counter_vec(
"near_state_sync_external_parts_failed_total",
"Failed retrieval attempts from external storage",
&["shard_id"],
&["shard_id", "type"],
)
.unwrap()
});
Expand All @@ -501,7 +501,7 @@ pub(crate) static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<HistogramVec> =
try_create_histogram_vec(
"near_state_sync_external_parts_request_delay_sec",
"Latency of state part requests to external storage",
&["shard_id"],
&["shard_id", "type"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
Expand All @@ -512,7 +512,7 @@ pub(crate) static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<IntCounterVec>
try_create_int_counter_vec(
"near_state_sync_external_parts_size_downloaded_bytes_total",
"Bytes downloaded from an external storage",
&["shard_id"],
&["shard_id", "type"],
)
.unwrap()
});
Expand Down
30 changes: 20 additions & 10 deletions chain/client/src/sync/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum StateFileType {
StatePart { part_id: u64, num_parts: u64 },
StateHeader,
Expand All @@ -16,13 +16,21 @@ pub enum StateFileType {
/// Textual form of a [`StateFileType`]: `"part"` for `StatePart` and
/// `"header"` for `StateHeader`.
///
/// Implemented through `Display` rather than `ToString` directly, so that
/// `.to_string()` keeps working (via the standard blanket impl) and the type
/// can also be used with `format!`/`write!`.
impl std::fmt::Display for StateFileType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Each variant appears exactly once; the strings come from the shared
        // helpers so the labels have a single definition.
        let label = match self {
            StateFileType::StatePart { .. } => StateFileType::part_str(),
            StateFileType::StateHeader => StateFileType::header_str(),
        };
        f.write_str(&label)
    }
}

impl StateFileType {
/// Returns the string `"part"`, the textual label of the `StatePart` variant.
pub fn part_str() -> String {
    "part".to_owned()
}

/// Returns the string `"header"`, the textual label of the `StateHeader` variant.
pub fn header_str() -> String {
    "header".to_owned()
}

pub fn filename(&self) -> String {
match self {
StateFileType::StatePart { part_id, num_parts } => {
Expand Down Expand Up @@ -56,13 +64,14 @@ const GCS_ENCODE_SET: &percent_encoding::AsciiSet =
&percent_encoding::NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_');

impl ExternalConnection {
pub async fn get_part(
pub async fn get_file(
&self,
shard_id: ShardId,
location: &str,
file_type: &StateFileType,
) -> Result<Vec<u8>, anyhow::Error> {
let _timer = metrics::STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY
.with_label_values(&[&shard_id.to_string()])
.with_label_values(&[&shard_id.to_string(), &file_type.to_string()])
.start_timer();
match self {
ExternalConnection::S3 { bucket } => {
Expand Down Expand Up @@ -414,7 +423,7 @@ mod test {
// Directory resembles a real use case.
let dir = "test_folder/chain_id=test/epoch_height=1/epoch_id=test/shard_id=0".to_string();
let full_filename = format!("{}/{}", dir, filename);
let file_type = StateFileType::StatePart { part_id: 1, num_parts: 1 };
let file_type = StateFileType::StatePart { part_id: 0, num_parts: 1 };

// Before uploading we shouldn't see filename in the list of files.
let files = rt.block_on(async { connection.list_objects(0, &dir).await.unwrap() });
Expand All @@ -423,7 +432,7 @@ mod test {

// Uploading the file.
rt.block_on(async {
connection.put_file(file_type, &data, 0, &full_filename).await.unwrap()
connection.put_file(file_type.clone(), &data, 0, &full_filename).await.unwrap()
});

// After uploading we should see filename in the list of files.
Expand All @@ -432,15 +441,16 @@ mod test {
assert_eq!(files.into_iter().filter(|x| *x == filename).collect::<Vec<String>>().len(), 1);

// And the downloaded data should match the generated data.
let download_data =
rt.block_on(async { connection.get_part(0, &full_filename).await.unwrap() });
let download_data = rt
.block_on(async { connection.get_file(0, &full_filename, &file_type).await.unwrap() });
assert_eq!(download_data, data);

// Also try to download some data at nonexistent location and expect to fail.
let filename = random_string(8);
let full_filename = format!("{}/{}", dir, filename);

let download_data = rt.block_on(async { connection.get_part(0, &full_filename).await });
let download_data =
rt.block_on(async { connection.get_file(0, &full_filename, &file_type).await });
assert!(download_data.is_err(), "{:?}", download_data);
}
}
Loading
Loading