diff --git a/Cargo.lock b/Cargo.lock index aa29ded2275b..742a97688942 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4126,6 +4126,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84de945cb3a6f1e0d6317cbd998bbd0519ab00f4b790db67e0ff4fdcf7cedb6" +dependencies = [ + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "opentelemetry-http" version = "0.13.0" @@ -9678,6 +9690,7 @@ dependencies = [ "anyhow", "chrono", "opentelemetry", + "opentelemetry-appender-tracing", "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", diff --git a/Cargo.toml b/Cargo.toml index 8c94e29c4630..1d7252fc7532 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,6 +143,7 @@ opentelemetry = "0.24.0" opentelemetry_sdk = "0.24.0" opentelemetry-otlp = "0.17.0" opentelemetry-semantic-conventions = "0.16.0" +opentelemetry-appender-tracing = "0.5" pin-project-lite = "0.2.13" pretty_assertions = "1" prost = "0.12.1" diff --git a/core/lib/config/src/configs/observability.rs b/core/lib/config/src/configs/observability.rs index 0bc61df31197..42363cbcb4ff 100644 --- a/core/lib/config/src/configs/observability.rs +++ b/core/lib/config/src/configs/observability.rs @@ -19,6 +19,12 @@ pub struct ObservabilityConfig { pub struct OpentelemetryConfig { /// Enables export of span data of specified level (and above) using opentelemetry exporters. pub level: String, - /// Opentelemetry HTTP collector endpoint. + /// Opentelemetry HTTP traces collector endpoint. pub endpoint: String, + /// Opentelemetry HTTP logs collector endpoing. + /// This is optional, since right now the primary way to collect logs is via stdout. + /// + /// Important: sending logs via OTLP has only been tested locally, and the performance may be + /// suboptimal in production environments. + pub logs_endpoint: Option, } diff --git a/core/lib/config/src/observability_ext.rs b/core/lib/config/src/observability_ext.rs index 5f8a8927efd5..641b095eb3b9 100644 --- a/core/lib/config/src/observability_ext.rs +++ b/core/lib/config/src/observability_ext.rs @@ -46,7 +46,13 @@ impl TryFrom for Option { fn try_from(config: ObservabilityConfig) -> Result { Ok(config .opentelemetry - .map(|config| zksync_vlog::OpenTelemetry::new(&config.level, config.endpoint)) + .map(|config| { + zksync_vlog::OpenTelemetry::new( + &config.level, + Some(config.endpoint), + config.logs_endpoint, + ) + }) .transpose()?) } } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 2c2934859fe5..f77321caadb5 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -681,6 +681,7 @@ impl Distribution for EncodeDist { configs::OpentelemetryConfig { level: self.sample(rng), endpoint: self.sample(rng), + logs_endpoint: self.sample(rng), } } } diff --git a/core/lib/env_config/src/observability.rs b/core/lib/env_config/src/observability.rs index 3463ee189957..3376189fa61d 100644 --- a/core/lib/env_config/src/observability.rs +++ b/core/lib/env_config/src/observability.rs @@ -35,8 +35,13 @@ impl FromEnv for ObservabilityConfig { }; let opentelemetry_level = std::env::var("OPENTELEMETRY_LEVEL").ok(); let otlp_endpoint = std::env::var("OTLP_ENDPOINT").ok(); + let logs_endpoint = std::env::var("OTLP_LOGS_ENDPOINT").ok(); // OK to be absent. let opentelemetry = match (opentelemetry_level, otlp_endpoint) { - (Some(level), Some(endpoint)) => Some(OpentelemetryConfig { level, endpoint }), + (Some(level), Some(endpoint)) => Some(OpentelemetryConfig { + level, + endpoint, + logs_endpoint, + }), _ => None, }; diff --git a/core/lib/protobuf_config/src/observability.rs b/core/lib/protobuf_config/src/observability.rs index e86894f29583..dcf87771b587 100644 --- a/core/lib/protobuf_config/src/observability.rs +++ b/core/lib/protobuf_config/src/observability.rs @@ -66,6 +66,7 @@ impl ProtoRepr for proto::Opentelemetry { Ok(Self::Type { level: required(&self.level).context("level")?.clone(), endpoint: required(&self.endpoint).context("endpoint")?.clone(), + logs_endpoint: self.logs_endpoint.clone(), }) } @@ -73,6 +74,7 @@ impl ProtoRepr for proto::Opentelemetry { Self { level: Some(this.level.clone()), endpoint: Some(this.endpoint.clone()), + logs_endpoint: this.logs_endpoint.clone(), } } } diff --git a/core/lib/protobuf_config/src/proto/config/observability.proto b/core/lib/protobuf_config/src/proto/config/observability.proto index 773ea51a2f34..aa2fbdd4521f 100644 --- a/core/lib/protobuf_config/src/proto/config/observability.proto +++ b/core/lib/protobuf_config/src/proto/config/observability.proto @@ -22,4 +22,5 @@ message Observability { message Opentelemetry { optional string level = 1; // required optional string endpoint = 2; // required + optional string logs_endpoint = 3; // optional } diff --git a/core/lib/vlog/Cargo.toml b/core/lib/vlog/Cargo.toml index 3f9ce247442a..c656e52a05a8 100644 --- a/core/lib/vlog/Cargo.toml +++ b/core/lib/vlog/Cargo.toml @@ -34,6 +34,7 @@ opentelemetry-otlp = { workspace = true, features = [ "reqwest-client", ] } opentelemetry-semantic-conventions.workspace = true +opentelemetry-appender-tracing.workspace = true vise.workspace = true vise-exporter.workspace = true url.workspace = true diff --git a/core/lib/vlog/src/lib.rs b/core/lib/vlog/src/lib.rs index aebd413b749d..48b02c6958a6 100644 --- a/core/lib/vlog/src/lib.rs +++ b/core/lib/vlog/src/lib.rs @@ -25,8 +25,11 @@ pub struct ObservabilityBuilder { /// Guard for the observability subsystem. /// Releases configured integrations upon being dropped. pub struct ObservabilityGuard { - /// Opentelemetry provider. Can be used to force flush spans. - otlp_provider: Option, + /// Opentelemetry traces provider + otlp_tracing_provider: Option, + /// Opentelemetry logs provider + otlp_logging_provider: Option, + /// Sentry client guard sentry_guard: Option, } @@ -41,7 +44,15 @@ impl ObservabilityGuard { sentry_guard.flush(Some(FLUSH_TIMEOUT)); } - if let Some(provider) = &self.otlp_provider { + if let Some(provider) = &self.otlp_tracing_provider { + for result in provider.force_flush() { + if let Err(err) = result { + tracing::warn!("Flushing the spans failed: {err:?}"); + } + } + } + + if let Some(provider) = &self.otlp_logging_provider { for result in provider.force_flush() { if let Err(err) = result { tracing::warn!("Flushing the spans failed: {err:?}"); @@ -59,7 +70,12 @@ impl ObservabilityGuard { if let Some(sentry_guard) = &self.sentry_guard { sentry_guard.close(Some(SHUTDOWN_TIMEOUT)); } - if let Some(provider) = &self.otlp_provider { + if let Some(provider) = &self.otlp_tracing_provider { + if let Err(err) = provider.shutdown() { + tracing::warn!("Shutting down the provider failed: {err:?}"); + } + } + if let Some(provider) = &self.otlp_logging_provider { if let Err(err) = provider.shutdown() { tracing::warn!("Shutting down the provider failed: {err:?}"); } @@ -111,21 +127,28 @@ impl ObservabilityBuilder { let global_filter = logs.build_filter(); let logs_layer = logs.into_layer(); - let (otlp_provider, otlp_layer) = self + let (otlp_tracing_provider, otlp_tracing_layer) = self + .opentelemetry_layer + .as_ref() + .and_then(|layer| layer.tracing_layer()) + .unzip(); + let (otlp_logging_provider, otlp_logging_layer) = self .opentelemetry_layer - .map(|layer| layer.into_layer()) + .and_then(|layer| layer.logs_layer()) .unzip(); tracing_subscriber::registry() .with(global_filter) .with(logs_layer) - .with(otlp_layer) + .with(otlp_tracing_layer) + .with(otlp_logging_layer) .init(); let sentry_guard = self.sentry.map(|sentry| sentry.install()); ObservabilityGuard { - otlp_provider, + otlp_tracing_provider, + otlp_logging_provider, sentry_guard, } } diff --git a/core/lib/vlog/src/opentelemetry/mod.rs b/core/lib/vlog/src/opentelemetry/mod.rs index 1085f6c6db06..a5ff6477f738 100644 --- a/core/lib/vlog/src/opentelemetry/mod.rs +++ b/core/lib/vlog/src/opentelemetry/mod.rs @@ -14,67 +14,88 @@ use tracing_subscriber::{registry::LookupSpan, EnvFilter, Layer}; use url::Url; /// Information about the service. -#[derive(Debug, Default)] +/// +/// This information is initially filled as follows: +/// - Fields will be attempted to fetch from environment variables. See [`ServiceDescriptor::fill_from_env`]. +/// - If not found, some default values will be chosen. +#[derive(Debug, Clone)] #[non_exhaustive] pub struct ServiceDescriptor { /// Name of the k8s pod. - pub k8s_pod_name: Option, + /// If not provided directly or though env variable, the default value would be `zksync-0`. + pub k8s_pod_name: String, /// Name of the k8s namespace. - pub k8s_namespace_name: Option, + /// If not provided directly or through env variable, the default value would be `local`. + pub k8s_namespace_name: String, /// Name of the service. - pub service_name: Option, + /// If not provided directly or through env variable, the default value would be `zksync`. + pub service_name: String, +} + +impl Default for ServiceDescriptor { + fn default() -> Self { + Self::new() + } } impl ServiceDescriptor { + /// Environment variable to fetch the k8s pod name. + pub const K8S_POD_NAME_ENV_VAR: &'static str = "POD_NAME"; + /// Environment variable to fetch the k8s namespace name. + pub const K8S_NAMESPACE_NAME_ENV_VAR: &'static str = "POD_NAMESPACE"; + /// Environment variable to fetch the service name. + pub const SERVICE_NAME_ENV_VAR: &'static str = "SERVICE_NAME"; + /// Default value for the k8s pod name. + pub const DEFAULT_K8S_POD_NAME: &'static str = "zksync-0"; + /// Default value for the k8s namespace name. + pub const DEFAULT_K8S_NAMESPACE_NAME: &'static str = "local"; + /// Default value for the service name. + pub const DEFAULT_SERVICE_NAME: &'static str = "zksync"; + + /// Creates a filled `ServiceDescriptor` object. + /// Fetched fields can be overridden. pub fn new() -> Self { - Self::default() + // Attempt fetching data from environment variables, and use defaults if not provided. + fn env_or(env_var: &str, default: &str) -> String { + std::env::var(env_var).unwrap_or_else(|_| default.to_string()) + } + Self { + k8s_pod_name: env_or(Self::K8S_POD_NAME_ENV_VAR, Self::DEFAULT_K8S_POD_NAME), + k8s_namespace_name: env_or( + Self::K8S_NAMESPACE_NAME_ENV_VAR, + Self::DEFAULT_K8S_NAMESPACE_NAME, + ), + service_name: env_or(Self::SERVICE_NAME_ENV_VAR, Self::DEFAULT_SERVICE_NAME), + } } pub fn with_k8s_pod_name(mut self, k8s_pod_name: Option) -> Self { - self.k8s_pod_name = k8s_pod_name; + if let Some(k8s_pod_name) = k8s_pod_name { + self.k8s_pod_name = k8s_pod_name; + } self } pub fn with_k8s_namespace_name(mut self, k8s_namespace_name: Option) -> Self { - self.k8s_namespace_name = k8s_namespace_name; + if let Some(k8s_namespace_name) = k8s_namespace_name { + self.k8s_namespace_name = k8s_namespace_name; + } self } pub fn with_service_name(mut self, service_name: Option) -> Self { - self.service_name = service_name; - self - } - - /// Tries to fill empty fields from environment variables. - /// - /// The following environment variables are used: - /// - `POD_NAME` - /// - `POD_NAMESPACE` - /// - `SERVICE_NAME` - pub fn fill_from_env(mut self) -> Self { - if self.k8s_pod_name.is_none() { - self.k8s_pod_name = std::env::var("POD_NAME").ok(); - } - if self.k8s_namespace_name.is_none() { - self.k8s_namespace_name = std::env::var("POD_NAMESPACE").ok(); - } - if self.service_name.is_none() { - self.service_name = std::env::var("SERVICE_NAME").ok(); + if let Some(service_name) = service_name { + self.service_name = service_name; } self } fn into_otlp_resource(self) -> Resource { - let mut attributes = vec![]; - if let Some(pod_name) = self.k8s_pod_name { - attributes.push(KeyValue::new(K8S_POD_NAME, pod_name)); - } - if let Some(pod_namespace) = self.k8s_namespace_name { - attributes.push(KeyValue::new(K8S_NAMESPACE_NAME, pod_namespace)); - } - if let Some(service_name) = self.service_name { - attributes.push(KeyValue::new(SERVICE_NAME, service_name)); - } + let attributes = vec![ + KeyValue::new(K8S_POD_NAME, self.k8s_pod_name), + KeyValue::new(K8S_NAMESPACE_NAME, self.k8s_namespace_name), + KeyValue::new(SERVICE_NAME, self.service_name), + ]; Resource::new(attributes) } } @@ -83,57 +104,109 @@ impl ServiceDescriptor { pub struct OpenTelemetry { /// Enables export of span data of specified level (and above) using opentelemetry exporters. pub opentelemetry_level: OpenTelemetryLevel, - /// Opentelemetry HTTP collector endpoint. - pub otlp_endpoint: Url, + /// Opentelemetry HTTP collector endpoint for traces. + pub tracing_endpoint: Option, + /// Opentelemetry HTTP collector endpoint for logs. + pub logging_endpoint: Option, /// Information about service - pub service: Option, + pub service: ServiceDescriptor, } impl OpenTelemetry { pub fn new( opentelemetry_level: &str, - otlp_endpoint: String, + tracing_endpoint: Option, + logging_endpoint: Option, ) -> Result { + fn parse_url(url: Option) -> Result, OpenTelemetryLayerError> { + url.map(|v| { + v.parse() + .map_err(|e| OpenTelemetryLayerError::InvalidUrl(v, e)) + }) + .transpose() + } + Ok(Self { opentelemetry_level: opentelemetry_level.parse()?, - otlp_endpoint: otlp_endpoint - .parse() - .map_err(|e| OpenTelemetryLayerError::InvalidUrl(otlp_endpoint, e))?, - service: None, + tracing_endpoint: parse_url(tracing_endpoint)?, + logging_endpoint: parse_url(logging_endpoint)?, + service: ServiceDescriptor::new(), }) } + /// Can be used to override the service descriptor used by the layer. pub fn with_service_descriptor(mut self, service: ServiceDescriptor) -> Self { - self.service = Some(service); + self.service = service; self } - pub(super) fn into_layer(self) -> (opentelemetry_sdk::trace::TracerProvider, impl Layer) + /// Prepares an exporter for OTLP logs and layer for the `tracing` library. + /// Will return `None` if no logging URL was provided. + /// + /// *Important*: we use `tracing` library to generate logs, and convert the logs + /// to OTLP format when exporting. However, `tracing` doesn't provide information + /// about timestamp of the log. While this value is optional in OTLP, some + /// collectors/processors may ignore logs without timestamp. Thus, you may need to + /// have a proxy collector, like `opentelemetry-collector-contrib` or `vector`, and + /// use the functionality there to set the timestamp. Here's example configuration + /// for `opentelemetry-collector-contrib`: + /// + /// ```text + /// processors: + /// transform/set_time_unix_nano: + /// log_statements: + /// - context: log + /// statements: + /// - set(time_unix_nano, observed_time_unix_nano) + /// ``` + pub(super) fn logs_layer( + &self, + ) -> Option<(opentelemetry_sdk::logs::LoggerProvider, impl Layer)> where S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync, { - let filter = match self.opentelemetry_level { - OpenTelemetryLevel::OFF => EnvFilter::new("off"), - OpenTelemetryLevel::INFO => EnvFilter::new("info"), - OpenTelemetryLevel::DEBUG => EnvFilter::new("debug"), - OpenTelemetryLevel::TRACE => EnvFilter::new("trace"), - }; + let logging_endpoint = self.logging_endpoint.clone()?; + let resource = self.service.clone().into_otlp_resource(); + + let exporter = opentelemetry_otlp::new_exporter() + .http() + .with_endpoint(logging_endpoint) + .build_log_exporter() + .expect("Failed to create OTLP exporter"); // URL is validated. + + let provider = opentelemetry_sdk::logs::LoggerProvider::builder() + .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio) + .with_resource(resource) + .build(); + + let layer = + opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(&provider); + + Some((provider, layer)) + } + + /// Prepares an exporter for OTLP traces and layer for `tracing` library. + /// Will return `None` if no tracing URL was provided. + pub(super) fn tracing_layer( + &self, + ) -> Option<(opentelemetry_sdk::trace::TracerProvider, impl Layer)> + where + S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync, + { + let tracing_endpoint = self.tracing_endpoint.clone()?; // `otel::tracing` should be a level info to emit opentelemetry trace & span // `otel` set to debug to log detected resources, configuration read and inferred - let filter = filter + let filter = self + .filter() .add_directive("otel::tracing=trace".parse().unwrap()) .add_directive("otel=debug".parse().unwrap()); - let service = self.service.unwrap_or_default().fill_from_env(); - let service_name = service - .service_name - .clone() - .unwrap_or_else(|| "zksync_vlog".to_string()); - let resource = service.into_otlp_resource(); + let service_name = self.service.service_name.clone(); + let resource = self.service.clone().into_otlp_resource(); let exporter = opentelemetry_otlp::new_exporter() .http() - .with_endpoint(self.otlp_endpoint) + .with_endpoint(tracing_endpoint) .build_span_exporter() .expect("Failed to create OTLP exporter"); // URL is validated. @@ -155,7 +228,19 @@ impl OpenTelemetry { .with_tracer(tracer) .with_filter(filter); - (provider, layer) + Some((provider, layer)) + } + + /// Returns a filter for opentelemetry layer. + /// It's applied to the layer only, but note that there might be a global filter applied to the + /// whole subscriber. + fn filter(&self) -> EnvFilter { + match self.opentelemetry_level { + OpenTelemetryLevel::OFF => EnvFilter::new("off"), + OpenTelemetryLevel::INFO => EnvFilter::new("info"), + OpenTelemetryLevel::DEBUG => EnvFilter::new("debug"), + OpenTelemetryLevel::TRACE => EnvFilter::new("trace"), + } } } diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 6f039520de08..c46bf4372326 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -3912,6 +3912,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84de945cb3a6f1e0d6317cbd998bbd0519ab00f4b790db67e0ff4fdcf7cedb6" +dependencies = [ + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "opentelemetry-http" version = "0.13.0" @@ -8415,6 +8427,7 @@ dependencies = [ "anyhow", "chrono", "opentelemetry", + "opentelemetry-appender-tracing", "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", diff --git a/zk_toolbox/Cargo.lock b/zk_toolbox/Cargo.lock index 4793e9e0a4e3..35e3067482e1 100644 --- a/zk_toolbox/Cargo.lock +++ b/zk_toolbox/Cargo.lock @@ -3042,6 +3042,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84de945cb3a6f1e0d6317cbd998bbd0519ab00f4b790db67e0ff4fdcf7cedb6" +dependencies = [ + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "opentelemetry-http" version = "0.13.0" @@ -6535,6 +6547,7 @@ dependencies = [ "anyhow", "chrono", "opentelemetry", + "opentelemetry-appender-tracing", "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk",