Commit 22e6bd1

Merge branch 'main' into metrics_count_warn_info

brianolson authored Jun 18, 2024
2 parents a3a79d3 + 972fa9e

Showing 15 changed files with 265 additions and 56 deletions.
18 changes: 13 additions & 5 deletions storage/aptosdb/src/backup/backup_handler.rs

@@ -119,18 +119,26 @@ impl BackupHandler {
         Ok((accumulator_proof, ledger_info))
     }
 
-    /// Gets an iterator which can yield all accounts in the state tree.
-    pub fn get_account_iter(
+    /// Gets the number of items in a state snapshot.
+    pub fn get_state_item_count(&self, version: Version) -> Result<usize> {
+        self.state_store.get_value_count(version)
+    }
+
+    /// Iterate through items in a state snapshot
+    pub fn get_state_item_iter(
         &self,
         version: Version,
-    ) -> Result<Box<dyn Iterator<Item = Result<(StateKey, StateValue)>> + Send + Sync>> {
+        start_idx: usize,
+        limit: usize,
+    ) -> Result<impl Iterator<Item = Result<(StateKey, StateValue)>> + Send> {
         let iterator = self
             .state_store
-            .get_state_key_and_value_iter(version, HashValue::zero())?
+            .get_state_key_and_value_iter(version, start_idx)?
+            .take(limit)
             .enumerate()
             .map(move |(idx, res)| {
                 BACKUP_STATE_SNAPSHOT_VERSION.set(version as i64);
-                BACKUP_STATE_SNAPSHOT_LEAF_IDX.set(idx as i64);
+                BACKUP_STATE_SNAPSHOT_LEAF_IDX.set((start_idx + idx) as i64);
                 res
             });
         Ok(Box::new(iterator))
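The old `get_account_iter` walked the whole state tree in one pass starting from `HashValue::zero()`; splitting it into `get_state_item_count` plus an index-based, bounded iterator is what lets the backup controller (below) issue several disjoint range requests concurrently. A minimal sketch of paging through a snapshot with the new API — `handler`, `version`, and the page size are assumed bindings for illustration, not part of this commit:

```rust
// Sketch only: sequential paging over a state snapshot.
// `handler: &BackupHandler` and `version: Version` are assumed in scope.
const PAGE: usize = 100_000; // illustrative page size

let total = handler.get_state_item_count(version)?;
let mut start_idx = 0;
while start_idx < total {
    // Each call seeks to leaf `start_idx` and yields at most PAGE items.
    for item in handler.get_state_item_iter(version, start_idx, PAGE)? {
        let (key, value) = item?; // each item is a Result<(StateKey, StateValue)>
        // ... write (key, value) to the backup ...
    }
    start_idx += PAGE;
}
```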
10 changes: 10 additions & 0 deletions storage/aptosdb/src/metrics.rs

@@ -229,3 +229,13 @@ pub(crate) static BACKUP_STATE_SNAPSHOT_LEAF_IDX: Lazy<IntGauge> = Lazy::new(||
     )
     .unwrap()
 });
+
+pub static BACKUP_TIMER: Lazy<HistogramVec> = Lazy::new(|| {
+    register_histogram_vec!(
+        "aptos_backup_handler_timers_seconds",
+        "Various timers for performance analysis.",
+        &["name"],
+        exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 32).unwrap(),
+    )
+    .unwrap()
+});
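A quick sanity check on the bucket choice: 32 exponential buckets starting at 1µs with factor 2 top out at 2^31 µs ≈ 36 minutes, so both per-record and whole-snapshot timings land inside the histogram rather than in the overflow bucket. The timer is used elsewhere in this diff through `timer_with`, roughly like this (a scoped-timer sketch; the label is one that appears later in this commit):

```rust
// Scoped timer: starts here and observes the elapsed seconds into the
// "state_snapshot_next_full_chunk" label when `_timer` is dropped.
let _timer = BACKUP_TIMER.timer_with(&["state_snapshot_next_full_chunk"]);
```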
6 changes: 3 additions & 3 deletions storage/aptosdb/src/state_store/mod.rs

@@ -973,13 +973,13 @@ impl StateStore {
     pub fn get_state_key_and_value_iter(
         self: &Arc<Self>,
         version: Version,
-        start_hashed_key: HashValue,
+        start_idx: usize,
     ) -> Result<impl Iterator<Item = Result<(StateKey, StateValue)>> + Send + Sync> {
         let store = Arc::clone(self);
-        Ok(JellyfishMerkleIterator::new(
+        Ok(JellyfishMerkleIterator::new_by_index(
             Arc::clone(&self.state_merkle_db),
             version,
-            start_idx,
+            start_idx,
         )?
         .map(|it| it.map_err(Into::into))
         .map(move |res| match res {
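The switch from `JellyfishMerkleIterator::new` (seek by hashed key) to `new_by_index` (seek by leaf position) is what makes the `start_idx`/`limit` pagination above well-defined: knowing only the total item count, the caller can carve the snapshot into equal-sized, disjoint leaf ranges, which a hashed-key starting point cannot offer without knowing the key distribution.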
4 changes: 2 additions & 2 deletions storage/aptosdb/src/state_store/state_store_test.rs

@@ -354,7 +354,7 @@ proptest! {
         for i in 0..kvs.len() {
             let actual_values = db
                 .get_backup_handler()
-                .get_account_iter(i as Version)
+                .get_state_item_iter(i as Version, 0, usize::MAX)
                 .unwrap()
                 .collect::<Result<Vec<_>>>()
                 .unwrap();
@@ -531,7 +531,7 @@ proptest! {
             let last_version = next_version - 1;
             let snapshot = db
                 .get_backup_handler()
-                .get_account_iter(last_version)
+                .get_state_item_iter(last_version, 0, usize::MAX)
                 .unwrap()
                 .collect::<Result<Vec<_>>>()
                 .unwrap();
(file name not shown)

@@ -68,6 +68,7 @@ fn end_to_end() {
         },
         GlobalBackupOpt {
             max_chunk_size: 1024,
+            concurrent_data_requests: 2,
         },
         client,
         Arc::clone(&store),
@@ -198,6 +199,7 @@ async fn test_trusted_waypoints_impl(
         },
         GlobalBackupOpt {
             max_chunk_size: 1024,
+            concurrent_data_requests: 2,
         },
         client.clone(),
         Arc::clone(&store),
131 changes: 112 additions & 19 deletions storage/backup/backup-cli/src/backup_types/state_snapshot/backup.rs

@@ -24,10 +24,11 @@ use aptos_types::{
 };
 use bytes::{BufMut, Bytes, BytesMut};
 use clap::Parser;
-use futures::TryStreamExt;
+use futures::{StreamExt, TryStream, TryStreamExt};
 use once_cell::sync::Lazy;
 use std::{convert::TryInto, str::FromStr, sync::Arc, time::Instant};
-use tokio::io::{AsyncRead, AsyncWriteExt};
+use tokio::{io::AsyncWriteExt, sync::mpsc::Sender};
+use tokio_stream::wrappers::ReceiverStream;
 
 #[derive(Parser)]
 pub struct StateSnapshotBackupOpt {
@@ -46,8 +47,8 @@ struct Chunk {
     last_idx: usize,
 }
 
-struct ChunkerState<R> {
-    state_snapshot_file: Option<R>,
+struct ChunkerState<RecordStream> {
+    record_stream: Option<RecordStream>,
     buf: BytesMut,
     chunk_first_key: HashValue,
     prev_record_len: usize,
@@ -56,10 +57,13 @@
     max_chunk_size: usize,
 }
 
-impl<R: AsyncRead + Send + Unpin> ChunkerState<R> {
-    async fn new(mut state_snapshot_file: R, max_chunk_size: usize) -> Result<Self> {
-        let first_record = state_snapshot_file
-            .read_record_bytes()
+impl<RecordStream> ChunkerState<RecordStream>
+where
+    RecordStream: TryStream<Ok = Bytes, Error = anyhow::Error> + Unpin,
+{
+    async fn new(mut record_stream: RecordStream, max_chunk_size: usize) -> Result<Self> {
+        let first_record = record_stream
+            .try_next()
             .await?
             .ok_or_else(|| anyhow!("State is empty."))?;
 
@@ -71,7 +75,7 @@ impl<R: AsyncRead + Send + Unpin> ChunkerState<R> {
         buf.extend(first_record);
 
         Ok(Self {
-            state_snapshot_file: Some(state_snapshot_file),
+            record_stream: Some(record_stream),
             buf,
             chunk_first_key,
             prev_record_len,
@@ -85,11 +89,11 @@ impl<R: AsyncRead + Send + Unpin> ChunkerState<R> {
         let _timer = BACKUP_TIMER.timer_with(&["state_snapshot_next_full_chunk"]);
 
         let input = self
-            .state_snapshot_file
+            .record_stream
             .as_mut()
             .expect("get_next_full_chunk after EOF.");
 
-        while let Some(record_bytes) = input.read_record_bytes().await? {
+        while let Some(record_bytes) = input.try_next().await? {
             let _timer = BACKUP_TIMER.timer_with(&["state_snapshot_process_records"]);
 
             // If buf + current_record exceeds max_chunk_size, dump current buf to a new chunk
@@ -122,20 +126,19 @@
             // Return the full chunk if found
             if let Some(chunk) = chunk_cut_opt {
-                // FIXME(aldenhu): add logging, maybe not here
                 return Ok(Some(chunk));
             }
         }
 
         // Input file ended, full chunk not found.
         // The call site will call get_last_chunk which consume ChunkerState
-        let _ = self.state_snapshot_file.take();
+        let _ = self.record_stream.take();
         Ok(None)
     }
 
     async fn last_chunk(self) -> Result<Chunk> {
         let Self {
-            state_snapshot_file,
+            record_stream: state_snapshot_file,
             buf,
             chunk_first_key,
             prev_record_len,
@@ -171,10 +174,13 @@ struct Chunker<R> {
     state: Option<ChunkerState<R>>,
 }
 
-impl<R: AsyncRead + Send + Unpin> Chunker<R> {
-    async fn new(state_snapshot_file: R, max_chunk_size: usize) -> Result<Self> {
+impl<RecordStream> Chunker<RecordStream>
+where
+    RecordStream: TryStream<Ok = Bytes, Error = anyhow::Error> + Unpin,
+{
+    async fn new(record_stream: RecordStream, max_chunk_size: usize) -> Result<Self> {
         Ok(Self {
-            state: Some(ChunkerState::new(state_snapshot_file, max_chunk_size).await?),
+            state: Some(ChunkerState::new(record_stream, max_chunk_size).await?),
        })
    }
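With this refactor the chunker no longer reads length-prefixed records off a single `AsyncRead` HTTP body; it consumes any fallible stream of `Bytes` records, decoupling chunk assembly from how records are fetched. A sketch of the new contract under that bound — any `TryStream<Ok = Bytes, Error = anyhow::Error> + Unpin` will do, here an in-memory stand-in:

```rust
use anyhow::Result;
use bytes::Bytes;
use futures::stream;

fn main() {
    let records: Vec<Result<Bytes>> = vec![
        Ok(Bytes::from_static(b"record-1")),
        Ok(Bytes::from_static(b"record-2")),
    ];
    // `stream::iter` over Result items implements
    // TryStream<Ok = Bytes, Error = anyhow::Error> and is Unpin, so it
    // satisfies the same bound as the pipelined network stream built in
    // record_stream() below.
    let _record_stream = stream::iter(records);
}
```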

@@ -197,6 +203,7 @@ pub struct StateSnapshotBackupController {
     max_chunk_size: usize,
     client: Arc<BackupServiceClient>,
     storage: Arc<dyn BackupStorage>,
+    concurrent_data_requests: usize,
 }
 
 impl StateSnapshotBackupController {
@@ -212,6 +219,7 @@ impl StateSnapshotBackupController {
             max_chunk_size: global_opt.max_chunk_size,
             client,
             storage,
+            concurrent_data_requests: global_opt.concurrent_data_requests,
         }
     }
 
@@ -232,8 +240,8 @@ impl StateSnapshotBackupController {
             .create_backup_with_random_suffix(&self.backup_name())
             .await?;
 
-        let state_snapshot_file = self.client.get_state_snapshot(self.version()).await?;
-        let chunker = Chunker::new(state_snapshot_file, self.max_chunk_size).await?;
+        let record_stream = Box::pin(self.record_stream(self.concurrent_data_requests).await?);
+        let chunker = Chunker::new(record_stream, self.max_chunk_size).await?;
 
         let start = Instant::now();
         let chunk_stream = futures::stream::try_unfold(chunker, |mut chunker| async {
@@ -260,6 +268,91 @@
         self.write_manifest(&backup_handle, chunks).await
     }
+
+    async fn record_stream(
+        &self,
+        concurrency: usize,
+    ) -> Result<impl TryStream<Ok = Bytes, Error = anyhow::Error, Item = Result<Bytes>>> {
+        const CHUNK_SIZE: usize = if cfg!(test) { 2 } else { 100_000 };
+
+        let count = self.client.get_state_item_count(self.version()).await?;
+        let version = self.version();
+        let client = self.client.clone();
+
+        let chunks_stream = futures::stream::unfold(0, move |start_idx| async move {
+            if start_idx >= count {
+                return None;
+            }
+
+            let next_start_idx = start_idx + CHUNK_SIZE;
+            let chunk_size = CHUNK_SIZE.min(count - start_idx);
+
+            Some(((start_idx, chunk_size), next_start_idx))
+        })
+        .map(Result::<_>::Ok);
+
+        let record_stream_stream = chunks_stream.map_ok(move |(start_idx, chunk_size)| {
+            let client = client.clone();
+            async move {
+                let (tx, rx) = tokio::sync::mpsc::channel(chunk_size);
+                // spawn and forget, propagate error through channel
+                let _join_handle = tokio::spawn(send_records(
+                    client.clone(),
+                    version,
+                    start_idx,
+                    chunk_size,
+                    tx,
+                ));
+
+                Ok(ReceiverStream::new(rx))
+            }
+        });
+
+        Ok(record_stream_stream
+            .try_buffered_x(concurrency * 2, concurrency)
+            .try_flatten())
+    }
 }
 
+async fn send_records(
+    client: Arc<BackupServiceClient>,
+    version: Version,
+    start_idx: usize,
+    chunk_size: usize,
+    sender: Sender<Result<Bytes>>,
+) {
+    if let Err(err) = send_records_inner(client, version, start_idx, chunk_size, &sender).await {
+        let _ = sender.send(Err(err)).await;
+    }
+}
+
+async fn send_records_inner(
+    client: Arc<BackupServiceClient>,
+    version: Version,
+    start_idx: usize,
+    chunk_size: usize,
+    sender: &Sender<Result<Bytes>>,
+) -> Result<()> {
+    let _timer = BACKUP_TIMER.timer_with(&["state_snapshot_record_stream_all"]);
+    let mut input = client
+        .get_state_snapshot_chunk(version, start_idx, chunk_size)
+        .await?;
+    let mut count = 0;
+    while let Some(record_bytes) = {
+        let _timer = BACKUP_TIMER.timer_with(&["state_snapshot_read_record_bytes"]);
+        input.read_record_bytes().await?
+    } {
+        let _timer = BACKUP_TIMER.timer_with(&["state_snapshot_record_stream_send_bytes"]);
+        count += 1;
+        sender.send(Ok(record_bytes)).await?;
+    }
+    ensure!(
+        count == chunk_size,
+        "expecting {} records, got {}",
+        chunk_size,
+        count
+    );
+    Ok(())
+}
 
 impl StateSnapshotBackupController {
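Two details in `record_stream` are worth spelling out. The `unfold` produces `(start_idx, chunk_size)` windows that tile exactly `count` items; `try_buffered_x` (an ordered buffered-stream combinator from this codebase's stream utilities) then keeps up to `concurrency` chunk fetches in flight while, like `futures`' `try_buffered`, yielding results in submission order, and `try_flatten` splices the per-chunk record streams back into one ordered record stream, so the chunker downstream never observes reordering. A standalone check of the windowing arithmetic, using only `tokio` and `futures` (the constants are illustrative, not the production values):

```rust
use futures::StreamExt;

#[tokio::main]
async fn main() {
    const CHUNK_SIZE: usize = 2; // illustrative only
    let count = 5usize; // pretend the snapshot holds 5 state items

    // Same shape as record_stream()'s chunks_stream.
    let windows = futures::stream::unfold(0usize, move |start_idx| async move {
        if start_idx >= count {
            return None;
        }
        let chunk_size = CHUNK_SIZE.min(count - start_idx);
        Some(((start_idx, chunk_size), start_idx + CHUNK_SIZE))
    })
    .collect::<Vec<_>>()
    .await;

    // The windows tile the item range with no gaps and no overlap.
    assert_eq!(windows, vec![(0, 2), (2, 2), (4, 1)]);
}
```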
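As the in-code comment notes, the spawned `send_records` task has no caller to return a `Result` to, so failures travel down the same channel the records use and the chunker surfaces them via `try_next()`; the final `ensure!` guards against a backend silently truncating a chunk. A minimal reproduction of that propagation pattern, using only `tokio`, `tokio-stream`, and `futures` (names are illustrative):

```rust
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::TryStreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() -> Result<()> {
    let (tx, rx) = mpsc::channel::<Result<Bytes>>(4);

    // Stand-in for the spawned send_records task: ship a record, then fail.
    tokio::spawn(async move {
        let _ = tx.send(Ok(Bytes::from_static(b"record"))).await;
        let _ = tx.send(Err(anyhow!("backend went away"))).await;
    });

    let mut records = ReceiverStream::new(rx);
    assert_eq!(records.try_next().await?, Some(Bytes::from_static(b"record")));
    assert!(records.try_next().await.is_err()); // the worker's error surfaces here
    Ok(())
}
```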
(file name not shown)

@@ -64,6 +64,7 @@ fn end_to_end() {
         StateSnapshotBackupOpt { epoch },
         GlobalBackupOpt {
             max_chunk_size: 500,
+            concurrent_data_requests: 2,
         },
         client,
         Arc::clone(&store),
1 change: 1 addition & 0 deletions storage/backup/backup-cli/src/backup_types/tests.rs

@@ -88,6 +88,7 @@ fn test_end_to_end_impl(d: TestData) {
     // Backup
     let global_backup_opt = GlobalBackupOpt {
         max_chunk_size: 2048,
+        concurrent_data_requests: 2,
     };
     let state_snapshot_manifest = d.state_snapshot_epoch.map(|epoch| {
         rt.block_on(
10 changes: 8 additions & 2 deletions storage/backup/backup-cli/src/backup_types/transaction/tests.rs

@@ -66,7 +66,10 @@ fn end_to_end() {
             start_version: 0,
             num_transactions: first_ver_to_backup as usize,
         },
-        GlobalBackupOpt { max_chunk_size },
+        GlobalBackupOpt {
+            max_chunk_size,
+            concurrent_data_requests: 2,
+        },
         client.clone(),
         Arc::clone(&store),
     )
@@ -83,7 +86,10 @@ fn end_to_end() {
             start_version: first_ver_to_backup,
             num_transactions: num_txns_to_backup,
         },
-        GlobalBackupOpt { max_chunk_size },
+        GlobalBackupOpt {
+            max_chunk_size,
+            concurrent_data_requests: 2,
+        },
         client,
         Arc::clone(&store),
     )
10 changes: 10 additions & 0 deletions storage/backup/backup-cli/src/metrics/backup.rs

@@ -2,6 +2,7 @@
 // Parts of the project are originally copyright © Meta Platforms, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
+use aptos_metrics_core::{register_int_counter_vec, IntCounterVec};
 use aptos_push_metrics::{
     exponential_buckets, register_histogram_vec, register_int_gauge, HistogramVec, IntGauge,
 };
@@ -56,3 +57,12 @@ pub static BACKUP_TIMER: Lazy<HistogramVec> = Lazy::new(|| {
     )
     .unwrap()
 });
+
+pub static THROUGHPUT_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        "aptos_db_backup_received_bytes",
+        "Backup controller throughput in bytes.",
+        &["endpoint"]
+    )
+    .unwrap()
+});
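An `IntCounterVec` like this is typically bumped with the byte count of each read and converted to throughput at query time, e.g. `rate(aptos_db_backup_received_bytes[1m])` in PromQL. A sketch of a plausible call site — the label value and the `record_bytes` binding are assumptions for illustration, not shown in this diff:

```rust
// Hypothetical call site: count bytes received from one backup endpoint.
// `record_bytes` is assumed to be a bytes::Bytes just read off the wire.
THROUGHPUT_COUNTER
    .with_label_values(&["state_snapshot_chunk"])
    .inc_by(record_bytes.len() as u64);
```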
(5 of the 15 changed files are not shown above.)