Skip to content

Commit

Permalink
[State Sync] Add metrics for data stream chunk sizes.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Sep 20, 2023
1 parent 0abe6e0 commit f3eaf0c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ members = [
"secure/storage",
"secure/storage/vault",
"state-sync/aptos-data-client",
"state-sync/data-streaming-service",
"state-sync/inter-component/consensus-notifications",
"state-sync/inter-component/event-notifications",
"state-sync/inter-component/mempool-notifications",
"state-sync/inter-component/storage-service-notifications",
"state-sync/data-streaming-service",
"state-sync/state-sync-driver",
"state-sync/storage-service/client",
"state-sync/storage-service/server",
Expand Down
30 changes: 30 additions & 0 deletions state-sync/aptos-data-client/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ pub enum ResponsePayload {
}

impl ResponsePayload {
/// Returns a label for the response payload. This is useful
/// for logging and metrics.
pub fn get_label(&self) -> &'static str {
match self {
Self::EpochEndingLedgerInfos(_) => "epoch_ending_ledger_infos",
Expand All @@ -272,6 +274,34 @@ impl ResponsePayload {
Self::TransactionsWithProof(_) => "transactions_with_proof",
}
}

/// Returns the chunk size of the response payload (i.e., the
/// number of data items held in the response).
pub fn get_data_chunk_size(&self) -> usize {
match self {
Self::EpochEndingLedgerInfos(epoch_ending_ledger_infos) => {
epoch_ending_ledger_infos.len()
},
Self::NewTransactionOutputsWithProof((outputs_with_proof, _)) => {
outputs_with_proof.transactions_and_outputs.len()
},
Self::NewTransactionsWithProof((transactions_with_proof, _)) => {
transactions_with_proof.transactions.len()
},
Self::NumberOfStates(_) => {
1 // The number of states is a single u64
},
Self::StateValuesWithProof(state_values_with_proof) => {
state_values_with_proof.raw_values.len()
},
Self::TransactionOutputsWithProof(outputs_with_proof) => {
outputs_with_proof.transactions_and_outputs.len()
},
Self::TransactionsWithProof(transactions_with_proof) => {
transactions_with_proof.transactions.len()
},
}
}
}

impl From<StateValueChunkWithProof> for ResponsePayload {
Expand Down
29 changes: 29 additions & 0 deletions state-sync/data-streaming-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ use once_cell::sync::Lazy;
// Subscription stream termination labels
pub const MAX_CONSECUTIVE_REQUESTS_LABEL: &str = "max_consecutive_requests";

// Histogram buckets for tracking chunk sizes of data responses
const DATA_RESPONSE_CHUNK_SIZE_BUCKETS: &[f64] = &[
1.0, 2.0, 4.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0,
7500.0, 10_000.0, 12_500.0, 15_000.0, 17_500.0, 20_000.0, 25_000.0, 30_000.0, 35_000.0,
40_000.0, 45_000.0, 50_000.0, 75_000.0, 100_000.0,
];

// Latency buckets for network latencies (i.e., the defaults only go up
// to 10 seconds, but we usually require more).
const NETWORK_LATENCY_BUCKETS: [f64; 14] = [
Expand Down Expand Up @@ -144,6 +151,16 @@ pub static RECEIVED_DATA_RESPONSE: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Counter for tracking the sizes of received data chunks
pub static RECEIVED_DATA_RESPONSE_CHUNK_SIZE: Lazy<HistogramVec> = Lazy::new(|| {
let histogram_opts = histogram_opts!(
"aptos_data_streaming_service_received_data_chunk_sizes",
"Counter for tracking sizes of data chunks received by the data stream",
DATA_RESPONSE_CHUNK_SIZE_BUCKETS.to_vec()
);
register_histogram_vec!(histogram_opts, &["request_type", "response_type"]).unwrap()
});

/// Counter for tracking received data responses
pub static RECEIVED_RESPONSE_ERROR: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand Down Expand Up @@ -180,6 +197,18 @@ pub fn increment_counter_multiple_labels(
.inc();
}

/// Adds a new observation for the given histogram, labels and value
pub fn observe_value(
histogram: &Lazy<HistogramVec>,
first_label: &str,
second_label: &str,
value: u64,
) {
histogram
.with_label_values(&[first_label, second_label])
.observe(value as f64);
}

/// Sets the number of active data streams
pub fn set_active_data_streams(value: usize) {
ACTIVE_DATA_STREAMS.set(value as i64);
Expand Down
27 changes: 27 additions & 0 deletions state-sync/data-streaming-service/src/stream_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ impl DataStreamEngine for StateStreamEngine {
client_response_payload: ResponsePayload,
notification_id_generator: Arc<U64IdGenerator>,
) -> Result<Option<DataNotification>, Error> {
// Update the metrics for the number of received items
update_response_chunk_size_metrics(client_request, &client_response_payload);

// Handle and transform the response
match client_request {
StateValuesWithProof(request) => {
// Verify the client request indices
Expand Down Expand Up @@ -1250,6 +1254,9 @@ impl DataStreamEngine for ContinuousTransactionStreamEngine {
self.optimistic_fetch_requested = false;
}

// Update the metrics for the number of received items
update_response_chunk_size_metrics(client_request, &client_response_payload);

// Handle and transform the response
match client_request {
EpochEndingLedgerInfos(_) => {
Expand Down Expand Up @@ -1481,6 +1488,10 @@ impl DataStreamEngine for EpochEndingStreamEngine {
client_response_payload: ResponsePayload,
notification_id_generator: Arc<U64IdGenerator>,
) -> Result<Option<DataNotification>, Error> {
// Update the metrics for the number of received items
update_response_chunk_size_metrics(client_request, &client_response_payload);

// Handle and transform the response
match client_request {
EpochEndingLedgerInfos(request) => {
// Verify the client request indices
Expand Down Expand Up @@ -1755,6 +1766,9 @@ impl DataStreamEngine for TransactionStreamEngine {
client_response_payload: ResponsePayload,
notification_id_generator: Arc<U64IdGenerator>,
) -> Result<Option<DataNotification>, Error> {
// Update the metrics for the number of received items
update_response_chunk_size_metrics(client_request, &client_response_payload);

// Identify the version information of the stream and client requests
let (stream_end_version, request_start_version, request_end_version) = match &self.request {
StreamRequest::GetAllTransactions(stream_request) => match client_request {
Expand Down Expand Up @@ -2165,6 +2179,19 @@ fn extract_new_versions_and_target(
Ok((num_versions, target_ledger_info))
}

/// Updates the response chunk size metrics for the given request and response
fn update_response_chunk_size_metrics(
client_request: &DataClientRequest,
client_response_payload: &ResponsePayload,
) {
metrics::observe_value(
&metrics::RECEIVED_DATA_RESPONSE_CHUNK_SIZE,
client_request.get_label(),
client_response_payload.get_label(),
client_response_payload.get_data_chunk_size() as u64,
);
}

/// Updates the metrics with a terminated subscription event and reason
fn update_terminated_subscription_metrics(termination_reason: &str) {
metrics::increment_counter(&metrics::TERMINATE_SUBSCRIPTION_STREAM, termination_reason);
Expand Down
5 changes: 3 additions & 2 deletions state-sync/state-sync-driver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ impl StorageSynchronizerOperations {

/// Histogram buckets for tracking chunk sizes
const CHUNK_SIZE_BUCKETS: &[f64] = &[
1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0, 2048.0, 4096.0, 8192.0,
16384.0,
1.0, 2.0, 4.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0,
7500.0, 10_000.0, 12_500.0, 15_000.0, 17_500.0, 20_000.0, 25_000.0, 30_000.0, 35_000.0,
40_000.0, 45_000.0, 50_000.0, 75_000.0, 100_000.0,
];

/// Counter for state sync bootstrapper errors
Expand Down

0 comments on commit f3eaf0c

Please sign in to comment.