Skip to content

Commit

Permalink
Telemetry for PP health
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Jun 5, 2024
1 parent 8525b9f commit 7c3a8c2
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 1 deletion.
4 changes: 4 additions & 0 deletions crates/types/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ impl Lsn {
pub const fn new(lsn: u64) -> Self {
Lsn(lsn)
}

pub fn as_u64(self) -> u64 {
self.0
}
}

impl SequenceNumber for Lsn {
Expand Down
6 changes: 6 additions & 0 deletions crates/types/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub struct PartitionProcessorStatus {
}

impl PartitionProcessorStatus {
pub fn is_effective_leader(&self) -> bool {
self.effective_mode
.map(|m| m == RunMode::Leader)
.unwrap_or(false)
}

pub fn new(planned_mode: RunMode) -> Self {
Self {
updated_at: MillisSinceEpoch::now(),
Expand Down
5 changes: 5 additions & 0 deletions crates/types/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ impl MillisSinceEpoch {
pub fn as_u64(&self) -> u64 {
self.0
}

pub fn elapsed(&self) -> Duration {
let now = Self::now();
Duration::from_millis(now.0 - self.0)
}
}

impl From<u64> for MillisSinceEpoch {
Expand Down
53 changes: 52 additions & 1 deletion crates/worker/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

/// Optional to have but adds description/help message to the metrics emitted to
/// the metrics' sink.
use metrics::{describe_counter, describe_histogram, Unit};
use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};

pub const PARTITION_APPLY_COMMAND: &str = "restate.partition.apply_command.seconds";
pub const PARTITION_ACTUATOR_HANDLED: &str = "restate.partition.actuator_handled.total";
Expand All @@ -19,6 +19,15 @@ pub const PARTITION_STORAGE_TX_CREATED: &str = "restate.partition.storage_tx_cre
pub const PARTITION_STORAGE_TX_COMMITTED: &str = "restate.partition.storage_tx_committed.total";
pub const PARTITION_HANDLE_LEADER_ACTIONS: &str = "restate.partition.handle_leader_action.total";

pub const NUM_ACTIVE_PARTITIONS: &str = "restate.num_active_partitions";
pub const PARTITION_TIME_SINCE_LAST_STATUS_UPDATE: &str =
"restate.partition.time_since_last_status_update";
pub const PARTITION_TIME_SINCE_LAST_RECORD: &str = "restate.partition.time_since_last_record";
pub const PARTITION_LAST_APPLIED_LOG_LSN: &str = "restate.partition.last_applied_lsn";
pub const PARTITION_LAST_PERSISTED_LOG_LSN: &str = "restate.partition.last_persisted_lsn";
pub const PARTITION_IS_EFFECTIVE_LEADER: &str = "restate.partition.is_effective_leader";
pub const PARTITION_IS_ACTIVE: &str = "restate.partition.is_active";

pub const PP_APPLY_RECORD_DURATION: &str = "restate.partition.apply_record_duration.seconds";
pub const PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION: &str =
"restate.partition.handle_action_batch_duration.seconds";
Expand Down Expand Up @@ -73,4 +82,46 @@ pub(crate) fn describe_metrics() {
Unit::Seconds,
"Time spent handling an invoker effect command"
);

describe_gauge!(
NUM_ACTIVE_PARTITIONS,
Unit::Count,
"Number of partitions started by partition processor manager on this node"
);

describe_gauge!(
PARTITION_IS_EFFECTIVE_LEADER,
Unit::Count,
"Set to 1 if the partition is an effective leader"
);

describe_gauge!(
PARTITION_IS_ACTIVE,
Unit::Count,
"Set to 1 if the partition is an active replay (not catching up or starting)"
);

describe_gauge!(
PARTITION_TIME_SINCE_LAST_STATUS_UPDATE,
Unit::Seconds,
"Number of seconds since the last partition status update"
);

describe_gauge!(
PARTITION_LAST_APPLIED_LOG_LSN,
Unit::Count,
"Raw value of the last applied log LSN"
);

describe_gauge!(
PARTITION_LAST_PERSISTED_LOG_LSN,
Unit::Count,
"Raw value of the LSN that can be trimmed"
);

describe_gauge!(
PARTITION_TIME_SINCE_LAST_RECORD,
Unit::Seconds,
"Number of seconds since the last record was applied"
);
}
50 changes: 50 additions & 0 deletions crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ use anyhow::Context;
use futures::future::OptionFuture;
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use metrics::gauge;
use restate_core::network::NetworkSender;
use restate_core::TaskCenter;
use restate_network::rpc_router::{RpcError, RpcRouter};
use restate_node_protocol::partition_processor_manager::GetProcessorsState;
use restate_node_protocol::partition_processor_manager::ProcessorsStateResponse;
use restate_node_protocol::RpcMessage;
use restate_types::processors::ReplayStatus;
use restate_types::processors::{PartitionProcessorStatus, RunMode};
use tokio::sync::{mpsc, watch};
use tokio::time;
Expand Down Expand Up @@ -53,6 +55,14 @@ use restate_types::GenerationalNodeId;
use restate_wal_protocol::control::AnnounceLeader;
use restate_wal_protocol::{Command as WalCommand, Destination, Envelope, Header, Source};

use crate::metric_definitions::NUM_PARTITIONS;

Check failure on line 58 in crates/worker/src/partition_processor_manager.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

unresolved import `crate::metric_definitions::NUM_PARTITIONS`
use crate::metric_definitions::PARTITION_IS_ACTIVE;
use crate::metric_definitions::PARTITION_IS_EFFECTIVE_LEADER;
use crate::metric_definitions::PARTITION_LABEL;
use crate::metric_definitions::PARTITION_LAST_APPLIED_LOG_LSN;
use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN;
use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD;
use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE;
use crate::partition::storage::invoker::InvokerStorageReader;
use crate::partition::storage::PartitionStorage;
use crate::partition::PartitionProcessorControlCommand;
Expand Down Expand Up @@ -234,6 +244,44 @@ impl PartitionProcessorManager {
.iter()
.map(|(partition_id, state)| {
let mut status = state.watch_rx.borrow().clone();
gauge!(PARTITION_TIME_SINCE_LAST_STATUS_UPDATE,
PARTITION_LABEL => partition_id.to_string())
.set(status.updated_at.elapsed());

gauge!(PARTITION_IS_EFFECTIVE_LEADER,
PARTITION_LABEL => partition_id.to_string())
.set(if status.is_effective_leader() {
1.0
} else {
0.0
});

gauge!(PARTITION_IS_ACTIVE,
PARTITION_LABEL => partition_id.to_string())
.set(if status.replay_status == ReplayStatus::Active {
1.0
} else {
0.0
});

if let Some(last_applied_log_lsn) = status.last_applied_log_lsn {
gauge!(PARTITION_LAST_APPLIED_LOG_LSN,
PARTITION_LABEL => partition_id.to_string())
.set(last_applied_log_lsn.as_u64() as f64);
}

if let Some(last_persisted_log_lsn) = status.last_persisted_log_lsn {
gauge!(PARTITION_LAST_PERSISTED_LOG_LSN,
PARTITION_LABEL => partition_id.to_string())
.set(last_persisted_log_lsn.as_u64() as f64);
}

if let Some(last_record_applied_at) = status.last_record_applied_at {
gauge!(PARTITION_TIME_SINCE_LAST_RECORD,
PARTITION_LABEL => partition_id.to_string())
.set(last_record_applied_at.elapsed());
}

status.last_persisted_log_lsn = persisted_lsns
.as_ref()
.and_then(|lsns| lsns.get(partition_id).cloned());
Expand Down Expand Up @@ -307,6 +355,8 @@ impl PartitionProcessorManager {
}
}
}

gauge!(NUM_PARTITIONS).set(self.running_partition_processors.len() as f64);
Ok(())
}

Expand Down

0 comments on commit 7c3a8c2

Please sign in to comment.