Custom metric buckets #844

Merged: 7 commits, merged Nov 21, 2024

Changes from all commits
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -9,9 +9,10 @@ license-file = "LICENSE.txt"
[workspace.dependencies]
derive_builder = "0.20"
derive_more = { version = "1.0", features = ["constructor", "display", "from", "into", "debug"] }
thiserror = "2"
tonic = "0.12"
tonic-build = "0.12"
opentelemetry = { version = "0.24", features = ["metrics"] }
opentelemetry = { version = "0.26", features = ["metrics"] }
prost = "0.13"
prost-types = "0.13"

2 changes: 1 addition & 1 deletion client/Cargo.toml
@@ -30,7 +30,7 @@ opentelemetry = { workspace = true, features = ["metrics"], optional = true }
parking_lot = "0.12"
prost-types = { workspace = true }
slotmap = "1.0"
thiserror = "1.0"
thiserror = { workspace = true }
tokio = "1.1"
tonic = { workspace = true, features = ["tls", "tls-roots"] }
tower = { version = "0.5", features = ["util"] }
4 changes: 2 additions & 2 deletions client/src/lib.rs
@@ -18,6 +18,7 @@ pub use crate::{
proxy::HttpConnectProxyOptions,
retry::{CallType, RetryClient, RETRYABLE_ERROR_CODES},
};
pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME};
pub use raw::{CloudService, HealthService, OperatorService, TestService, WorkflowService};
pub use temporal_sdk_core_protos::temporal::api::{
enums::v1::ArchivalState,
@@ -42,13 +43,12 @@ use crate::{
use backoff::{exponential, ExponentialBackoff, SystemClock};
use http::{uri::InvalidUri, Uri};
use parking_lot::RwLock;
use std::sync::OnceLock;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
ops::{Deref, DerefMut},
str::FromStr,
sync::Arc,
sync::{Arc, OnceLock},
time::{Duration, Instant},
};
use temporal_sdk_core_api::telemetry::metrics::TemporalMeter;
9 changes: 7 additions & 2 deletions client/src/metrics.rs
@@ -12,6 +12,11 @@ use temporal_sdk_core_api::telemetry::metrics::{
use tonic::{body::BoxBody, transport::Channel, Code};
use tower::Service;

/// The string name (which may be prefixed) for this metric
pub static REQUEST_LATENCY_HISTOGRAM_NAME: &str = "request_latency";
/// The string name (which may be prefixed) for this metric
pub static LONG_REQUEST_LATENCY_HISTOGRAM_NAME: &str = "long_request_latency";

/// Used to track context associated with metrics, and record/update them
// Possible improvement: make generic over some type tag so that methods are only exposed if the
// appropriate k/vs have already been set.
@@ -58,12 +63,12 @@ impl MetricsContext {
unit: "".into(),
}),
svc_request_latency: meter.histogram_duration(MetricParameters {
name: "request_latency".into(),
name: REQUEST_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of client request latencies".into(),
}),
long_svc_request_latency: meter.histogram_duration(MetricParameters {
name: "long_request_latency".into(),
name: LONG_REQUEST_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of client long-poll request latencies".into(),
}),
2 changes: 1 addition & 1 deletion core-api/Cargo.toml
@@ -23,7 +23,7 @@ opentelemetry = { workspace = true, optional = true }
prost = { workspace = true }
prost-types = { workspace = true }
serde_json = "1.0"
thiserror = "1.0"
thiserror = { workspace = true }
tonic = { workspace = true }
tracing-core = "0.1"
url = "2.3"
28 changes: 25 additions & 3 deletions core-api/src/telemetry.rs
@@ -68,6 +68,9 @@ pub struct OtelCollectorOptions {
/// If set to true, use f64 seconds for durations instead of u64 milliseconds
#[builder(default)]
pub use_seconds_for_durations: bool,
/// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
#[builder(default)]
pub histogram_bucket_overrides: HistogramBucketOverrides,
}

/// Options for exporting metrics to Prometheus
@@ -78,15 +81,33 @@ pub struct PrometheusExporterOptions {
#[builder(default)]
pub global_tags: HashMap<String, String>,
/// If set true, all counters will include a "_total" suffix
#[builder(default = "false")]
#[builder(default)]
pub counters_total_suffix: bool,
/// If set true, all histograms will include the unit in their name as a suffix.
/// Ex: "_milliseconds".
#[builder(default = "false")]
#[builder(default)]
pub unit_suffix: bool,
/// If set to true, use f64 seconds for durations instead of u64 milliseconds
#[builder(default)]
pub use_seconds_for_durations: bool,
/// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
#[builder(default)]
pub histogram_bucket_overrides: HistogramBucketOverrides,
}

/// Allows overriding the buckets used by histogram metrics
#[derive(Debug, Clone, Default)]
pub struct HistogramBucketOverrides {
/// Overrides where the key is the metric name and the value is the list of bucket boundaries.
/// The metric name will apply regardless of name prefixing, if any. IE: the name acts like
/// `*metric_name`.
[Review thread on lines +101 to +103]
Member: Does this wildcard thing only apply to OTel or does it apply to Prometheus too?
Member (author): Both

///
/// The string names of core's built-in histogram metrics are publicly available on the
/// `core::telemetry` module and the `client` crate.
///
/// See [here](https://docs.rs/opentelemetry_sdk/latest/opentelemetry_sdk/metrics/enum.Aggregation.html#variant.ExplicitBucketHistogram.field.boundaries)
/// for the exact meaning of boundaries.
pub overrides: HashMap<String, Vec<f64>>,
}

/// Control where logs go
@@ -102,7 +123,8 @@ pub enum Logger {
/// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
filter: String,
},
// Push logs to Lang. Can used with temporal_sdk_core::telemetry::CoreLogBufferedConsumer to buffer.
/// Push logs to Lang. Can be used with
/// temporal_sdk_core::telemetry::log_export::CoreLogBufferedConsumer to buffer.
Push {
/// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
filter: String,
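
A minimal usage sketch for the new histogram_bucket_overrides field, assuming the derive_builder-generated PrometheusExporterOptionsBuilder and its socket_addr field (neither is shown in this hunk, so both names are assumptions). Keys are the bare metric names, and per the review thread above the prefix-agnostic matching applies to both the OTel and Prometheus exporters:

use std::collections::HashMap;
use std::net::SocketAddr;
use temporal_sdk_core_api::telemetry::{HistogramBucketOverrides, PrometheusExporterOptionsBuilder};

fn prometheus_options_with_custom_buckets() -> Result<(), Box<dyn std::error::Error>> {
    let overrides = HistogramBucketOverrides {
        // "request_latency" also matches a prefixed export name such as
        // "temporal_request_latency", since keys behave like `*metric_name`.
        overrides: HashMap::from([(
            "request_latency".to_string(),
            vec![10.0, 50.0, 100.0, 500.0, 1_000.0, 5_000.0],
        )]),
    };
    let _options = PrometheusExporterOptionsBuilder::default()
        .socket_addr("0.0.0.0:9090".parse::<SocketAddr>()?) // assumed bind-address field
        .histogram_bucket_overrides(overrides)
        .build()?;
    Ok(())
}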
26 changes: 26 additions & 0 deletions core-api/src/telemetry/metrics.rs
@@ -377,6 +377,32 @@ mod otel_impls {
}
}

impl Gauge for metrics::Gauge<u64> {
fn record(&self, value: u64, attributes: &MetricAttributes) {
if let MetricAttributes::OTel { kvs } = attributes {
self.record(value, kvs);
} else {
debug_assert!(
false,
"Must use OTel attributes with an OTel metric implementation"
);
}
}
}

impl GaugeF64 for metrics::Gauge<f64> {
fn record(&self, value: f64, attributes: &MetricAttributes) {
if let MetricAttributes::OTel { kvs } = attributes {
self.record(value, kvs);
} else {
debug_assert!(
false,
"Must use OTel attributes with an OTel metric implementation"
);
}
}
}

impl Histogram for metrics::Histogram<u64> {
fn record(&self, value: u64, attributes: &MetricAttributes) {
if let MetricAttributes::OTel { kvs } = attributes {
8 changes: 4 additions & 4 deletions core/Cargo.toml
@@ -43,9 +43,9 @@ itertools = "0.13"
lru = "0.12"
mockall = "0.13"
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true }
opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true }
opentelemetry-prometheus = { version = "0.17", optional = true }
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true }
opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls"], optional = true }
opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true }
parking_lot = { version = "0.12", features = ["send_guard"] }
pid = "4.0"
pin-project = "1.0"
@@ -61,7 +61,7 @@ siphasher = "1.0"
slotmap = "1.0"
sysinfo = { version = "0.32", default-features = false, features = ["system"] }
tar = { version = "0.4", optional = true }
thiserror = "1.0"
thiserror = { workspace = true }
tokio = { version = "1.37", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs", "process"] }
tokio-util = { version = "0.7", features = ["io", "io-util"] }
tokio-stream = "0.1"
41 changes: 25 additions & 16 deletions core/src/telemetry/metrics.rs
@@ -292,7 +292,7 @@ impl Instruments {
unit: "".into(),
}),
wf_e2e_latency: meter.histogram_duration(MetricParameters {
name: WF_E2E_LATENCY_NAME.into(),
name: WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of total workflow execution latencies".into(),
}),
@@ -312,17 +312,17 @@
unit: "".into(),
}),
wf_task_sched_to_start_latency: meter.histogram_duration(MetricParameters {
name: WF_TASK_SCHED_TO_START_LATENCY_NAME.into(),
name: WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of workflow task schedule-to-start latencies".into(),
}),
wf_task_replay_latency: meter.histogram_duration(MetricParameters {
name: WF_TASK_REPLAY_LATENCY_NAME.into(),
name: WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of workflow task replay latencies".into(),
}),
wf_task_execution_latency: meter.histogram_duration(MetricParameters {
name: WF_TASK_EXECUTION_LATENCY_NAME.into(),
name: WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of workflow task execution (not replay) latencies".into(),
}),
@@ -342,12 +342,12 @@
unit: "".into(),
}),
act_sched_to_start_latency: meter.histogram_duration(MetricParameters {
name: ACT_SCHED_TO_START_LATENCY_NAME.into(),
name: ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of activity schedule-to-start latencies".into(),
}),
act_exec_latency: meter.histogram_duration(MetricParameters {
name: ACT_EXEC_LATENCY_NAME.into(),
name: ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of activity execution latencies".into(),
}),
@@ -496,13 +496,20 @@ pub(crate) fn failure_reason(reason: FailureReason) -> MetricKeyValue {
MetricKeyValue::new(KEY_TASK_FAILURE_TYPE, reason.to_string())
}

pub(super) const WF_E2E_LATENCY_NAME: &str = "workflow_endtoend_latency";
pub(super) const WF_TASK_SCHED_TO_START_LATENCY_NAME: &str =
/// The string name (which may be prefixed) for this metric
pub const WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME: &str = "workflow_endtoend_latency";
/// The string name (which may be prefixed) for this metric
pub const WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
"workflow_task_schedule_to_start_latency";
pub(super) const WF_TASK_REPLAY_LATENCY_NAME: &str = "workflow_task_replay_latency";
pub(super) const WF_TASK_EXECUTION_LATENCY_NAME: &str = "workflow_task_execution_latency";
pub(super) const ACT_SCHED_TO_START_LATENCY_NAME: &str = "activity_schedule_to_start_latency";
pub(super) const ACT_EXEC_LATENCY_NAME: &str = "activity_execution_latency";
/// The string name (which may be prefixed) for this metric
pub const WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_replay_latency";
/// The string name (which may be prefixed) for this metric
pub const WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_execution_latency";
/// The string name (which may be prefixed) for this metric
pub const ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
"activity_schedule_to_start_latency";
/// The string name (which may be prefixed) for this metric
pub const ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME: &str = "activity_execution_latency";
pub(super) const NUM_POLLERS_NAME: &str = "num_pollers";
pub(super) const TASK_SLOTS_AVAILABLE_NAME: &str = "worker_task_slots_available";
pub(super) const TASK_SLOTS_USED_NAME: &str = "worker_task_slots_used";
@@ -533,7 +540,7 @@ macro_rules! define_latency_buckets {

define_latency_buckets!(
(
WF_E2E_LATENCY_NAME,
WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME,
WF_LATENCY_MS_BUCKETS,
WF_LATENCY_S_BUCKETS,
[
@@ -556,19 +563,21 @@
]
),
(
WF_TASK_EXECUTION_LATENCY_NAME | WF_TASK_REPLAY_LATENCY_NAME,
WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME
| WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME,
WF_TASK_MS_BUCKETS,
WF_TASK_S_BUCKETS,
[1., 10., 20., 50., 100., 200., 500., 1000.]
),
(
ACT_EXEC_LATENCY_NAME,
ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME,
ACT_EXE_MS_BUCKETS,
ACT_EXE_S_BUCKETS,
[50., 100., 500., 1000., 5000., 10_000., 60_000.]
),
(
WF_TASK_SCHED_TO_START_LATENCY_NAME | ACT_SCHED_TO_START_LATENCY_NAME,
WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME
| ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME,
TASK_SCHED_TO_START_MS_BUCKETS,
TASK_SCHED_TO_START_S_BUCKETS,
[100., 500., 1000., 5000., 10_000., 100_000., 1_000_000.]
7 changes: 6 additions & 1 deletion core/src/telemetry/mod.rs
@@ -9,7 +9,12 @@ mod otel;
mod prometheus_server;

#[cfg(feature = "otel")]
pub use metrics::{default_buckets_for, MetricsCallBuffer};
pub use metrics::{
default_buckets_for, MetricsCallBuffer, ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME,
ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME,
WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME,
WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME,
};
#[cfg(feature = "otel")]
pub use otel::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
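
With these names public, buckets for core's built-in histograms can be overridden the same way. A short sketch, assuming the otel feature is enabled and the crates are imported as temporal_sdk_core and temporal_sdk_core_api; the returned struct would be set as histogram_bucket_overrides on either exporter's options:

use std::collections::HashMap;
use temporal_sdk_core::telemetry::{
    ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME,
};
use temporal_sdk_core_api::telemetry::HistogramBucketOverrides;

fn core_histogram_overrides() -> HistogramBucketOverrides {
    HistogramBucketOverrides {
        overrides: HashMap::from([
            // Coarser end-to-end buckets (ms) for long-running workflows.
            (
                WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME.to_string(),
                vec![1_000.0, 10_000.0, 60_000.0, 600_000.0, 3_600_000.0],
            ),
            // Finer activity-execution buckets (ms).
            (
                ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME.to_string(),
                vec![10.0, 50.0, 100.0, 500.0, 1_000.0],
            ),
        ]),
    }
}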
