Skip to content

Commit

Permalink
[Data Service] Push metrics parameterized by identifier, not API key name + email (#12547)
Browse files Browse the repository at this point in the history
  • Loading branch information
banool authored and larry-aptos committed Apr 17, 2024
1 parent e7663c3 commit a9fd65c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 48 deletions.
19 changes: 11 additions & 8 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ use aptos_metrics_core::{
};
use once_cell::sync::Lazy;

// The `identifier` label at the time of writing (2024-04-08) is always the
// application ID, a globally unique ID.

/// Latest processed transaction version.
pub static LATEST_PROCESSED_VERSION: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"indexer_grpc_data_service_with_user_latest_processed_version",
"Latest processed transaction version",
&["request_token", "email", "processor"],
&["identifier", "processor"],
)
.unwrap()
});
Expand All @@ -22,7 +25,7 @@ pub static PROCESSED_VERSIONS_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_with_user_processed_versions",
"Number of transactions that have been processed by data service",
&["request_token", "email", "processor"],
&["identifier", "processor"],
)
.unwrap()
});
Expand All @@ -42,7 +45,7 @@ pub static PROCESSED_LATENCY_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"indexer_grpc_data_service_with_user_latest_data_latency_in_secs",
"Latency of data service based on latest processed transaction",
&["request_token", "email", "processor"],
&["identifier", "processor"],
)
.unwrap()
});
Expand All @@ -52,7 +55,7 @@ pub static PROCESSED_LATENCY_IN_SECS_ALL: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"indexer_grpc_data_service_latest_data_latency_in_secs_all",
"Latency of data service based on latest processed transaction",
&["request_token"]
&[]
)
.unwrap()
});
Expand All @@ -62,7 +65,7 @@ pub static PROCESSED_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"indexer_grpc_data_service_with_user_processed_batch_size",
"Size of latest processed batch by data service",
&["request_token", "email", "processor"],
&["identifier", "processor"],
)
.unwrap()
});
Expand All @@ -72,7 +75,7 @@ pub static CONNECTION_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_connection_count_v2",
"Count of connections that data service has established",
&["request_token", "email", "processor"],
&["identifier", "processor"],
)
.unwrap()
});
Expand All @@ -82,7 +85,7 @@ pub static SHORT_CONNECTION_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_short_connection_by_user_processor_count",
"Count of the short connections; i.e., < 10 seconds",
&["request_token", "email", "processor"],
&["identifier", "processor"],
)
.unwrap()
});
Expand All @@ -93,7 +96,7 @@ pub static BYTES_READY_TO_TRANSFER_FROM_SERVER: Lazy<IntCounterVec> = Lazy::new(
register_int_counter_vec!(
"indexer_grpc_data_service_bytes_ready_to_transfer_from_server",
"Count of bytes ready to transfer to the client",
&["request_token", "email", "processor"],
&["identifier", "processor"],
)
.unwrap()
});
47 changes: 14 additions & 33 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ const RESPONSE_CHANNEL_SEND_TIMEOUT: Duration = Duration::from_secs(120);

const SHORT_CONNECTION_DURATION_IN_SECS: u64 = 10;

const REQUEST_HEADER_APTOS_EMAIL_HEADER: &str = "x-aptos-email";
const REQUEST_HEADER_APTOS_USER_CLASSIFICATION_HEADER: &str = "x-aptos-user-classification";
const REQUEST_HEADER_APTOS_API_KEY_NAME: &str = "x-aptos-api-key-name";
/// This comes from API Gateway. The identifier uniquely identifies the requester, which
/// in the case of indexer-grpc is always an application.
const REQUEST_HEADER_APTOS_IDENTIFIER: &str = "x-aptos-identifier";
const RESPONSE_HEADER_APTOS_CONNECTION_ID_HEADER: &str = "x-aptos-connection-id";
const SERVICE_TYPE: &str = "data_service";

Expand Down Expand Up @@ -141,8 +141,7 @@ impl RawData for RawDataServerWrapper {
};
CONNECTION_COUNT
.with_label_values(&[
&request_metadata.request_api_key_name,
&request_metadata.request_email,
&request_metadata.request_identifier,
&request_metadata.processor_name,
])
.inc();
Expand Down Expand Up @@ -349,12 +348,9 @@ async fn get_data_in_task(
Ok(TransactionsDataStatus::AheadOfCache) => {
info!(
start_version = start_version,
request_name = request_metadata.processor_name.as_str(),
request_email = request_metadata.request_email.as_str(),
request_api_key_name = request_metadata.request_api_key_name.as_str(),
request_identifier = request_metadata.request_identifier.as_str(),
processor_name = request_metadata.processor_name.as_str(),
connection_id = request_metadata.request_connection_id.as_str(),
request_user_classification = request_metadata.request_user_classification.as_str(),
duration_in_secs = current_batch_start_time.elapsed().as_secs_f64(),
service_type = SERVICE_TYPE,
"[Data Service] Requested data is ahead of cache. Sleeping for {} ms.",
Expand Down Expand Up @@ -513,8 +509,7 @@ async fn data_fetcher_task(
.sum::<usize>();
BYTES_READY_TO_TRANSFER_FROM_SERVER
.with_label_values(&[
&request_metadata.request_api_key_name,
&request_metadata.request_email,
&request_metadata.request_identifier,
&request_metadata.processor_name,
])
.inc_by(bytes_ready_to_transfer as u64);
Expand Down Expand Up @@ -542,36 +537,32 @@ async fn data_fetcher_task(
Ok(_) => {
PROCESSED_BATCH_SIZE
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.request_identifier.as_str(),
request_metadata.processor_name.as_str(),
])
.set(current_batch_size as i64);
// TODO: Reassess whether this metric is useful
LATEST_PROCESSED_VERSION_OLD
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.request_identifier.as_str(),
request_metadata.processor_name.as_str(),
])
.set(end_of_batch_version as i64);
PROCESSED_VERSIONS_COUNT
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.request_identifier.as_str(),
request_metadata.processor_name.as_str(),
])
.inc_by(current_batch_size as u64);
if let Some(data_latency_in_secs) = data_latency_in_secs {
PROCESSED_LATENCY_IN_SECS
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.request_identifier.as_str(),
request_metadata.processor_name.as_str(),
])
.set(data_latency_in_secs);
PROCESSED_LATENCY_IN_SECS_ALL
.with_label_values(&[request_metadata.request_user_classification.as_str()])
.with_label_values(&[])
.observe(data_latency_in_secs);
}
},
Expand All @@ -589,22 +580,17 @@ async fn data_fetcher_task(
current_version = end_of_batch_version + 1;
}
info!(
request_name = request_metadata.processor_name.as_str(),
request_email = request_metadata.request_email.as_str(),
request_api_key_name = request_metadata.request_api_key_name.as_str(),
request_identifier = request_metadata.request_identifier.as_str(),
processor_name = request_metadata.processor_name.as_str(),
connection_id = request_metadata.request_connection_id.as_str(),
request_user_classification = request_metadata.request_user_classification.as_str(),
request_user_classification = request_metadata.request_user_classification.as_str(),
service_type = SERVICE_TYPE,
"[Data Service] Client disconnected."
);
if let Some(start_time) = connection_start_time {
if start_time.elapsed().as_secs() < SHORT_CONNECTION_DURATION_IN_SECS {
SHORT_CONNECTION_COUNT
.with_label_values(&[
request_metadata.request_api_key_name.as_str(),
request_metadata.request_email.as_str(),
request_metadata.request_identifier.as_str(),
request_metadata.processor_name.as_str(),
])
.inc();
Expand Down Expand Up @@ -869,12 +855,7 @@ fn get_request_metadata(
req: &Request<GetTransactionsRequest>,
) -> tonic::Result<IndexerGrpcRequestMetadata> {
let request_metadata_pairs = vec![
("request_api_key_name", REQUEST_HEADER_APTOS_API_KEY_NAME),
("request_email", REQUEST_HEADER_APTOS_EMAIL_HEADER),
(
"request_user_classification",
REQUEST_HEADER_APTOS_USER_CLASSIFICATION_HEADER,
),
("request_identifier", REQUEST_HEADER_APTOS_IDENTIFIER),
("request_token", GRPC_AUTH_TOKEN_HEADER),
("processor_name", GRPC_REQUEST_NAME_HEADER),
];
Expand Down
5 changes: 2 additions & 3 deletions ecosystem/indexer-grpc/indexer-grpc-utils/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ pub const MESSAGE_SIZE_LIMIT: usize = 1024 * 1024 * 15;
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct IndexerGrpcRequestMetadata {
pub processor_name: String,
pub request_email: String,
pub request_user_classification: String,
pub request_api_key_name: String,
/// See `REQUEST_HEADER_APTOS_IDENTIFIER` for more information.
pub request_identifier: String,
pub request_connection_id: String,
// Token is no longer needed behind API Gateway.
#[deprecated]
Expand Down
5 changes: 1 addition & 4 deletions ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,9 @@ pub fn log_grpc_step(
duration_in_secs,
size_in_bytes,
// Request metadata variables
request_name = &request_metadata.processor_name,
request_email = &request_metadata.request_email,
request_api_key_name = &request_metadata.request_api_key_name,
request_identifier = &request_metadata.request_identifier,
processor_name = &request_metadata.processor_name,
connection_id = &request_metadata.request_connection_id,
request_user_classification = &request_metadata.request_user_classification,
service_type,
step = step.get_step(),
"{}",
Expand Down

0 comments on commit a9fd65c

Please sign in to comment.