Skip to content

Commit

Permalink
feat: Support sending logs via OTLP (#2556)
Browse files Browse the repository at this point in the history
## What ❔

Makes it possible to export logs via OTLP.
This doesn't disable exporting logs to stdout, which is extra valuable
when working with localhost observability stack.
Also, it improves handling of `ServiceDescriptor` so that it has some
somewhat sane values by default (setting data locally wasn't convenient
so far).

## Why ❔

Being able to collect logs to any OTLP collector.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
popzxc authored Aug 1, 2024
1 parent c4d8c52 commit 1d206c0
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 72 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ opentelemetry = "0.24.0"
opentelemetry_sdk = "0.24.0"
opentelemetry-otlp = "0.17.0"
opentelemetry-semantic-conventions = "0.16.0"
opentelemetry-appender-tracing = "0.5"
pin-project-lite = "0.2.13"
pretty_assertions = "1"
prost = "0.12.1"
Expand Down
8 changes: 7 additions & 1 deletion core/lib/config/src/configs/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ pub struct ObservabilityConfig {
pub struct OpentelemetryConfig {
/// Enables export of span data of specified level (and above) using opentelemetry exporters.
pub level: String,
/// Opentelemetry HTTP collector endpoint.
/// Opentelemetry HTTP traces collector endpoint.
pub endpoint: String,
/// Opentelemetry HTTP logs collector endpoing.
/// This is optional, since right now the primary way to collect logs is via stdout.
///
/// Important: sending logs via OTLP has only been tested locally, and the performance may be
/// suboptimal in production environments.
pub logs_endpoint: Option<String>,
}
8 changes: 7 additions & 1 deletion core/lib/config/src/observability_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ impl TryFrom<ObservabilityConfig> for Option<zksync_vlog::OpenTelemetry> {
fn try_from(config: ObservabilityConfig) -> Result<Self, Self::Error> {
Ok(config
.opentelemetry
.map(|config| zksync_vlog::OpenTelemetry::new(&config.level, config.endpoint))
.map(|config| {
zksync_vlog::OpenTelemetry::new(
&config.level,
Some(config.endpoint),
config.logs_endpoint,
)
})
.transpose()?)
}
}
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ impl Distribution<configs::OpentelemetryConfig> for EncodeDist {
configs::OpentelemetryConfig {
level: self.sample(rng),
endpoint: self.sample(rng),
logs_endpoint: self.sample(rng),
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion core/lib/env_config/src/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ impl FromEnv for ObservabilityConfig {
};
let opentelemetry_level = std::env::var("OPENTELEMETRY_LEVEL").ok();
let otlp_endpoint = std::env::var("OTLP_ENDPOINT").ok();
let logs_endpoint = std::env::var("OTLP_LOGS_ENDPOINT").ok(); // OK to be absent.
let opentelemetry = match (opentelemetry_level, otlp_endpoint) {
(Some(level), Some(endpoint)) => Some(OpentelemetryConfig { level, endpoint }),
(Some(level), Some(endpoint)) => Some(OpentelemetryConfig {
level,
endpoint,
logs_endpoint,
}),
_ => None,
};

Expand Down
2 changes: 2 additions & 0 deletions core/lib/protobuf_config/src/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ impl ProtoRepr for proto::Opentelemetry {
Ok(Self::Type {
level: required(&self.level).context("level")?.clone(),
endpoint: required(&self.endpoint).context("endpoint")?.clone(),
logs_endpoint: self.logs_endpoint.clone(),
})
}

fn build(this: &Self::Type) -> Self {
Self {
level: Some(this.level.clone()),
endpoint: Some(this.endpoint.clone()),
logs_endpoint: this.logs_endpoint.clone(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ message Observability {
message Opentelemetry {
optional string level = 1; // required
optional string endpoint = 2; // required
optional string logs_endpoint = 3; // optional
}
1 change: 1 addition & 0 deletions core/lib/vlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ opentelemetry-otlp = { workspace = true, features = [
"reqwest-client",
] }
opentelemetry-semantic-conventions.workspace = true
opentelemetry-appender-tracing.workspace = true
vise.workspace = true
vise-exporter.workspace = true
url.workspace = true
39 changes: 31 additions & 8 deletions core/lib/vlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ pub struct ObservabilityBuilder {
/// Guard for the observability subsystem.
/// Releases configured integrations upon being dropped.
pub struct ObservabilityGuard {
/// Opentelemetry provider. Can be used to force flush spans.
otlp_provider: Option<opentelemetry_sdk::trace::TracerProvider>,
/// Opentelemetry traces provider
otlp_tracing_provider: Option<opentelemetry_sdk::trace::TracerProvider>,
/// Opentelemetry logs provider
otlp_logging_provider: Option<opentelemetry_sdk::logs::LoggerProvider>,
/// Sentry client guard
sentry_guard: Option<ClientInitGuard>,
}

Expand All @@ -41,7 +44,15 @@ impl ObservabilityGuard {
sentry_guard.flush(Some(FLUSH_TIMEOUT));
}

if let Some(provider) = &self.otlp_provider {
if let Some(provider) = &self.otlp_tracing_provider {
for result in provider.force_flush() {
if let Err(err) = result {
tracing::warn!("Flushing the spans failed: {err:?}");
}
}
}

if let Some(provider) = &self.otlp_logging_provider {
for result in provider.force_flush() {
if let Err(err) = result {
tracing::warn!("Flushing the spans failed: {err:?}");
Expand All @@ -59,7 +70,12 @@ impl ObservabilityGuard {
if let Some(sentry_guard) = &self.sentry_guard {
sentry_guard.close(Some(SHUTDOWN_TIMEOUT));
}
if let Some(provider) = &self.otlp_provider {
if let Some(provider) = &self.otlp_tracing_provider {
if let Err(err) = provider.shutdown() {
tracing::warn!("Shutting down the provider failed: {err:?}");
}
}
if let Some(provider) = &self.otlp_logging_provider {
if let Err(err) = provider.shutdown() {
tracing::warn!("Shutting down the provider failed: {err:?}");
}
Expand Down Expand Up @@ -111,21 +127,28 @@ impl ObservabilityBuilder {
let global_filter = logs.build_filter();

let logs_layer = logs.into_layer();
let (otlp_provider, otlp_layer) = self
let (otlp_tracing_provider, otlp_tracing_layer) = self
.opentelemetry_layer
.as_ref()
.and_then(|layer| layer.tracing_layer())
.unzip();
let (otlp_logging_provider, otlp_logging_layer) = self
.opentelemetry_layer
.map(|layer| layer.into_layer())
.and_then(|layer| layer.logs_layer())
.unzip();

tracing_subscriber::registry()
.with(global_filter)
.with(logs_layer)
.with(otlp_layer)
.with(otlp_tracing_layer)
.with(otlp_logging_layer)
.init();

let sentry_guard = self.sentry.map(|sentry| sentry.install());

ObservabilityGuard {
otlp_provider,
otlp_tracing_provider,
otlp_logging_provider,
sentry_guard,
}
}
Expand Down
Loading

0 comments on commit 1d206c0

Please sign in to comment.