Skip to content

Commit

Permalink
[indexer][api] update the metrcis for api gateway consumption. (#10322)
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-aptos committed Oct 23, 2023
1 parent 32c0ede commit 85be081
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 19 deletions.
27 changes: 19 additions & 8 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ use once_cell::sync::Lazy;
/// Latest processed transaction version.
pub static LATEST_PROCESSED_VERSION: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"indexer_grpc_data_service_latest_processed_version",
"indexer_grpc_data_service_with_user_latest_processed_version",
"Latest processed transaction version",
&["request_token", "processor_name"],
&["request_token", "email"],
)
.unwrap()
});

/// Number of transactions that served by data service.
pub static PROCESSED_VERSIONS_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_processed_versions",
"indexer_grpc_data_service_with_user_processed_versions",
"Number of transactions that have been processed by data service",
&["request_token", "processor_name"],
&["request_token", "email"],
)
.unwrap()
});
Expand All @@ -40,9 +40,9 @@ pub static ERROR_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
/// Data latency for data service based on latest processed transaction based on selected processor.
pub static PROCESSED_LATENCY_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"indexer_grpc_data_service_latest_data_latency_in_secs",
"indexer_grpc_data_service_with_user_latest_data_latency_in_secs",
"Latency of data service based on latest processed transaction",
&["request_token", "processor_name"],
&["request_token", "email"],
)
.unwrap()
});
Expand All @@ -60,9 +60,9 @@ pub static PROCESSED_LATENCY_IN_SECS_ALL: Lazy<HistogramVec> = Lazy::new(|| {
/// Number of transactions in each batch that data service has processed.
pub static PROCESSED_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"indexer_grpc_data_service_processed_batch_size",
"indexer_grpc_data_service_with_user_processed_batch_size",
"Size of latest processed batch by data service",
&["request_token", "processor_name"],
&["request_token", "email"],
)
.unwrap()
});
Expand All @@ -84,3 +84,14 @@ pub static SHORT_CONNECTION_COUNT: Lazy<IntCounter> = Lazy::new(|| {
)
.unwrap()
});

/// Count of bytes transfered to the client. This only represents the bytes prepared and ready
/// to send to the client. It does not represent the bytes actually sent to the client.
pub static BYTES_READY_TO_TRANSFER_FROM_SERVER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_bytes_ready_to_transfer_from_server",
"Count of bytes ready to transfer to the client",
&["request_token", "email"],
)
.unwrap()
});
35 changes: 24 additions & 11 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::{
CONNECTION_COUNT, ERROR_COUNT, LATEST_PROCESSED_VERSION, PROCESSED_BATCH_SIZE,
PROCESSED_LATENCY_IN_SECS, PROCESSED_LATENCY_IN_SECS_ALL, PROCESSED_VERSIONS_COUNT,
SHORT_CONNECTION_COUNT,
BYTES_READY_TO_TRANSFER_FROM_SERVER, CONNECTION_COUNT, ERROR_COUNT, LATEST_PROCESSED_VERSION,
PROCESSED_BATCH_SIZE, PROCESSED_LATENCY_IN_SECS, PROCESSED_LATENCY_IN_SECS_ALL,
PROCESSED_VERSIONS_COUNT, SHORT_CONNECTION_COUNT,
};
use anyhow::Context;
use aptos_indexer_grpc_utils::{
Expand Down Expand Up @@ -261,6 +261,19 @@ impl RawData for RawDataServerWrapper {
transactions_count = Some(count - transaction_data.len() as u64);
}
};
// Note: this is not the actual bytes transferred to the client.
// This is the bytes consumed internally by the server
// and ready to be transferred to the client.
let bytes_ready_to_transfer = transaction_data
.iter()
.map(|(encoded, _)| encoded.len())
.sum::<usize>();
BYTES_READY_TO_TRANSFER_FROM_SERVER
.with_label_values(&[
request_metadata.request_token.as_str(),
request_metadata.request_email.as_str(),
])
.inc_by(bytes_ready_to_transfer as u64);
// 2. Push the data to the response channel, i.e. stream the data to the client.
let current_batch_size = transaction_data.as_slice().len();
let end_of_batch_version = transaction_data.as_slice().last().unwrap().1;
Expand All @@ -280,20 +293,20 @@ impl RawData for RawDataServerWrapper {
Ok(_) => {
PROCESSED_BATCH_SIZE
.with_label_values(&[
request_metadata.request_user_classification.as_str(),
request_metadata.request_name.as_str(),
request_metadata.request_token.as_str(),
request_metadata.request_email.as_str(),
])
.set(current_batch_size as i64);
LATEST_PROCESSED_VERSION
.with_label_values(&[
request_metadata.request_user_classification.as_str(),
request_metadata.request_name.as_str(),
request_metadata.request_token.as_str(),
request_metadata.request_email.as_str(),
])
.set(end_of_batch_version as i64);
PROCESSED_VERSIONS_COUNT
.with_label_values(&[
request_metadata.request_user_classification.as_str(),
request_metadata.request_name.as_str(),
request_metadata.request_token.as_str(),
request_metadata.request_email.as_str(),
])
.inc_by(current_batch_size as u64);
if let Some(data_latency_in_secs) = data_latency_in_secs {
Expand All @@ -302,8 +315,8 @@ impl RawData for RawDataServerWrapper {
if current_batch_size % BLOB_STORAGE_SIZE != 0 {
PROCESSED_LATENCY_IN_SECS
.with_label_values(&[
request_metadata.request_user_classification.as_str(),
request_metadata.request_name.as_str(),
request_metadata.request_token.as_str(),
request_metadata.request_email.as_str(),
])
.set(data_latency_in_secs);
PROCESSED_LATENCY_IN_SECS_ALL
Expand Down

0 comments on commit 85be081

Please sign in to comment.