Skip to content

Commit

Permalink
Custom metric buckets (#844)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Nov 21, 2024
1 parent dd37e29 commit d31c105
Show file tree
Hide file tree
Showing 17 changed files with 234 additions and 188 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ license-file = "LICENSE.txt"
[workspace.dependencies]
derive_builder = "0.20"
derive_more = { version = "1.0", features = ["constructor", "display", "from", "into", "debug"] }
thiserror = "2"
tonic = "0.12"
tonic-build = "0.12"
opentelemetry = { version = "0.24", features = ["metrics"] }
opentelemetry = { version = "0.26", features = ["metrics"] }
prost = "0.13"
prost-types = "0.13"

Expand Down
2 changes: 1 addition & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ opentelemetry = { workspace = true, features = ["metrics"], optional = true }
parking_lot = "0.12"
prost-types = { workspace = true }
slotmap = "1.0"
thiserror = "1.0"
thiserror = { workspace = true }
tokio = "1.1"
tonic = { workspace = true, features = ["tls", "tls-roots"] }
tower = { version = "0.5", features = ["util"] }
Expand Down
4 changes: 2 additions & 2 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use crate::{
proxy::HttpConnectProxyOptions,
retry::{CallType, RetryClient, RETRYABLE_ERROR_CODES},
};
pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME};
pub use raw::{CloudService, HealthService, OperatorService, TestService, WorkflowService};
pub use temporal_sdk_core_protos::temporal::api::{
enums::v1::ArchivalState,
Expand All @@ -42,13 +43,12 @@ use crate::{
use backoff::{exponential, ExponentialBackoff, SystemClock};
use http::{uri::InvalidUri, Uri};
use parking_lot::RwLock;
use std::sync::OnceLock;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
ops::{Deref, DerefMut},
str::FromStr,
sync::Arc,
sync::{Arc, OnceLock},
time::{Duration, Instant},
};
use temporal_sdk_core_api::telemetry::metrics::TemporalMeter;
Expand Down
9 changes: 7 additions & 2 deletions client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ use temporal_sdk_core_api::telemetry::metrics::{
use tonic::{body::BoxBody, transport::Channel, Code};
use tower::Service;

/// The string name (which may be prefixed) for this metric
pub static REQUEST_LATENCY_HISTOGRAM_NAME: &str = "request_latency";
/// The string name (which may be prefixed) for this metric
pub static LONG_REQUEST_LATENCY_HISTOGRAM_NAME: &str = "long_request_latency";

/// Used to track context associated with metrics, and record/update them
// Possible improvement: make generic over some type tag so that methods are only exposed if the
// appropriate k/vs have already been set.
Expand Down Expand Up @@ -58,12 +63,12 @@ impl MetricsContext {
unit: "".into(),
}),
svc_request_latency: meter.histogram_duration(MetricParameters {
name: "request_latency".into(),
name: REQUEST_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of client request latencies".into(),
}),
long_svc_request_latency: meter.histogram_duration(MetricParameters {
name: "long_request_latency".into(),
name: LONG_REQUEST_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of client long-poll request latencies".into(),
}),
Expand Down
2 changes: 1 addition & 1 deletion core-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ opentelemetry = { workspace = true, optional = true }
prost = { workspace = true }
prost-types = { workspace = true }
serde_json = "1.0"
thiserror = "1.0"
thiserror = { workspace = true }
tonic = { workspace = true }
tracing-core = "0.1"
url = "2.3"
Expand Down
28 changes: 25 additions & 3 deletions core-api/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ pub struct OtelCollectorOptions {
/// If set to true, use f64 seconds for durations instead of u64 milliseconds
#[builder(default)]
pub use_seconds_for_durations: bool,
/// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
#[builder(default)]
pub histogram_bucket_overrides: HistogramBucketOverrides,
}

/// Options for exporting metrics to Prometheus
Expand All @@ -78,15 +81,33 @@ pub struct PrometheusExporterOptions {
#[builder(default)]
pub global_tags: HashMap<String, String>,
/// If set true, all counters will include a "_total" suffix
#[builder(default = "false")]
#[builder(default)]
pub counters_total_suffix: bool,
/// If set true, all histograms will include the unit in their name as a suffix.
/// Ex: "_milliseconds".
#[builder(default = "false")]
#[builder(default)]
pub unit_suffix: bool,
/// If set to true, use f64 seconds for durations instead of u64 milliseconds
#[builder(default)]
pub use_seconds_for_durations: bool,
/// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
#[builder(default)]
pub histogram_bucket_overrides: HistogramBucketOverrides,
}

/// Allows overriding the buckets used by histogram metrics
#[derive(Debug, Clone, Default)]
pub struct HistogramBucketOverrides {
/// Overrides where the key is the metric name and the value is the list of bucket boundaries.
/// The metric name will apply regardless of name prefixing, if any. IE: the name acts like
/// `*metric_name`.
///
/// The string names of core's built-in histogram metrics are publicly available on the
/// `core::telemetry` module and the `client` crate.
///
/// See [here](https://docs.rs/opentelemetry_sdk/latest/opentelemetry_sdk/metrics/enum.Aggregation.html#variant.ExplicitBucketHistogram.field.boundaries)
/// for the exact meaning of boundaries.
pub overrides: HashMap<String, Vec<f64>>,
}

/// Control where logs go
Expand All @@ -102,7 +123,8 @@ pub enum Logger {
/// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
filter: String,
},
// Push logs to Lang. Can used with temporal_sdk_core::telemetry::CoreLogBufferedConsumer to buffer.
/// Push logs to Lang. Can be used with
/// temporal_sdk_core::telemetry::log_export::CoreLogBufferedConsumer to buffer.
Push {
/// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
filter: String,
Expand Down
26 changes: 26 additions & 0 deletions core-api/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,32 @@ mod otel_impls {
}
}

impl Gauge for metrics::Gauge<u64> {
fn record(&self, value: u64, attributes: &MetricAttributes) {
if let MetricAttributes::OTel { kvs } = attributes {
self.record(value, kvs);
} else {
debug_assert!(
false,
"Must use OTel attributes with an OTel metric implementation"
);
}
}
}

impl GaugeF64 for metrics::Gauge<f64> {
fn record(&self, value: f64, attributes: &MetricAttributes) {
if let MetricAttributes::OTel { kvs } = attributes {
self.record(value, kvs);
} else {
debug_assert!(
false,
"Must use OTel attributes with an OTel metric implementation"
);
}
}
}

impl Histogram for metrics::Histogram<u64> {
fn record(&self, value: u64, attributes: &MetricAttributes) {
if let MetricAttributes::OTel { kvs } = attributes {
Expand Down
8 changes: 4 additions & 4 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ itertools = "0.13"
lru = "0.12"
mockall = "0.13"
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true }
opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true }
opentelemetry-prometheus = { version = "0.17", optional = true }
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true }
opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls"], optional = true }
opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true }
parking_lot = { version = "0.12", features = ["send_guard"] }
pid = "4.0"
pin-project = "1.0"
Expand All @@ -61,7 +61,7 @@ siphasher = "1.0"
slotmap = "1.0"
sysinfo = { version = "0.32", default-features = false, features = ["system"] }
tar = { version = "0.4", optional = true }
thiserror = "1.0"
thiserror = { workspace = true }
tokio = { version = "1.37", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs", "process"] }
tokio-util = { version = "0.7", features = ["io", "io-util"] }
tokio-stream = "0.1"
Expand Down
41 changes: 25 additions & 16 deletions core/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Instruments {
unit: "".into(),
}),
wf_e2e_latency: meter.histogram_duration(MetricParameters {
name: WF_E2E_LATENCY_NAME.into(),
name: WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of total workflow execution latencies".into(),
}),
Expand All @@ -312,17 +312,17 @@ impl Instruments {
unit: "".into(),
}),
wf_task_sched_to_start_latency: meter.histogram_duration(MetricParameters {
name: WF_TASK_SCHED_TO_START_LATENCY_NAME.into(),
name: WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of workflow task schedule-to-start latencies".into(),
}),
wf_task_replay_latency: meter.histogram_duration(MetricParameters {
name: WF_TASK_REPLAY_LATENCY_NAME.into(),
name: WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of workflow task replay latencies".into(),
}),
wf_task_execution_latency: meter.histogram_duration(MetricParameters {
name: WF_TASK_EXECUTION_LATENCY_NAME.into(),
name: WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of workflow task execution (not replay) latencies".into(),
}),
Expand All @@ -342,12 +342,12 @@ impl Instruments {
unit: "".into(),
}),
act_sched_to_start_latency: meter.histogram_duration(MetricParameters {
name: ACT_SCHED_TO_START_LATENCY_NAME.into(),
name: ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of activity schedule-to-start latencies".into(),
}),
act_exec_latency: meter.histogram_duration(MetricParameters {
name: ACT_EXEC_LATENCY_NAME.into(),
name: ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of activity execution latencies".into(),
}),
Expand Down Expand Up @@ -496,13 +496,20 @@ pub(crate) fn failure_reason(reason: FailureReason) -> MetricKeyValue {
MetricKeyValue::new(KEY_TASK_FAILURE_TYPE, reason.to_string())
}

pub(super) const WF_E2E_LATENCY_NAME: &str = "workflow_endtoend_latency";
pub(super) const WF_TASK_SCHED_TO_START_LATENCY_NAME: &str =
/// The string name (which may be prefixed) for this metric
pub const WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME: &str = "workflow_endtoend_latency";
/// The string name (which may be prefixed) for this metric
pub const WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
"workflow_task_schedule_to_start_latency";
pub(super) const WF_TASK_REPLAY_LATENCY_NAME: &str = "workflow_task_replay_latency";
pub(super) const WF_TASK_EXECUTION_LATENCY_NAME: &str = "workflow_task_execution_latency";
pub(super) const ACT_SCHED_TO_START_LATENCY_NAME: &str = "activity_schedule_to_start_latency";
pub(super) const ACT_EXEC_LATENCY_NAME: &str = "activity_execution_latency";
/// The string name (which may be prefixed) for this metric
pub const WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_replay_latency";
/// The string name (which may be prefixed) for this metric
pub const WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_execution_latency";
/// The string name (which may be prefixed) for this metric
pub const ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
"activity_schedule_to_start_latency";
/// The string name (which may be prefixed) for this metric
pub const ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME: &str = "activity_execution_latency";
pub(super) const NUM_POLLERS_NAME: &str = "num_pollers";
pub(super) const TASK_SLOTS_AVAILABLE_NAME: &str = "worker_task_slots_available";
pub(super) const TASK_SLOTS_USED_NAME: &str = "worker_task_slots_used";
Expand Down Expand Up @@ -533,7 +540,7 @@ macro_rules! define_latency_buckets {

define_latency_buckets!(
(
WF_E2E_LATENCY_NAME,
WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME,
WF_LATENCY_MS_BUCKETS,
WF_LATENCY_S_BUCKETS,
[
Expand All @@ -556,19 +563,21 @@ define_latency_buckets!(
]
),
(
WF_TASK_EXECUTION_LATENCY_NAME | WF_TASK_REPLAY_LATENCY_NAME,
WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME
| WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME,
WF_TASK_MS_BUCKETS,
WF_TASK_S_BUCKETS,
[1., 10., 20., 50., 100., 200., 500., 1000.]
),
(
ACT_EXEC_LATENCY_NAME,
ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME,
ACT_EXE_MS_BUCKETS,
ACT_EXE_S_BUCKETS,
[50., 100., 500., 1000., 5000., 10_000., 60_000.]
),
(
WF_TASK_SCHED_TO_START_LATENCY_NAME | ACT_SCHED_TO_START_LATENCY_NAME,
WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME
| ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME,
TASK_SCHED_TO_START_MS_BUCKETS,
TASK_SCHED_TO_START_S_BUCKETS,
[100., 500., 1000., 5000., 10_000., 100_000., 1_000_000.]
Expand Down
7 changes: 6 additions & 1 deletion core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ mod otel;
mod prometheus_server;

#[cfg(feature = "otel")]
pub use metrics::{default_buckets_for, MetricsCallBuffer};
pub use metrics::{
default_buckets_for, MetricsCallBuffer, ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME,
ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME,
WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME,
WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME,
};
#[cfg(feature = "otel")]
pub use otel::{build_otlp_metric_exporter, start_prometheus_metric_exporter};

Expand Down
Loading

0 comments on commit d31c105

Please sign in to comment.