Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support sending logs via OTLP #2556

Merged
merged 2 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ opentelemetry = "0.24.0"
opentelemetry_sdk = "0.24.0"
opentelemetry-otlp = "0.17.0"
opentelemetry-semantic-conventions = "0.16.0"
opentelemetry-appender-tracing = "0.5"
pin-project-lite = "0.2.13"
pretty_assertions = "1"
prost = "0.12.1"
Expand Down
8 changes: 7 additions & 1 deletion core/lib/config/src/configs/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ pub struct ObservabilityConfig {
pub struct OpentelemetryConfig {
/// Enables export of span data of specified level (and above) using opentelemetry exporters.
pub level: String,
/// Opentelemetry HTTP collector endpoint.
/// Opentelemetry HTTP traces collector endpoint.
pub endpoint: String,
/// Opentelemetry HTTP logs collector endpoing.
/// This is optional, since right now the primary way to collect logs is via stdout.
///
/// Important: sending logs via OTLP has only been tested locally, and the performance may be
/// suboptimal in production environments.
pub logs_endpoint: Option<String>,
}
8 changes: 7 additions & 1 deletion core/lib/config/src/observability_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ impl TryFrom<ObservabilityConfig> for Option<zksync_vlog::OpenTelemetry> {
fn try_from(config: ObservabilityConfig) -> Result<Self, Self::Error> {
Ok(config
.opentelemetry
.map(|config| zksync_vlog::OpenTelemetry::new(&config.level, config.endpoint))
.map(|config| {
zksync_vlog::OpenTelemetry::new(
&config.level,
Some(config.endpoint),
config.logs_endpoint,
)
})
.transpose()?)
}
}
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ impl Distribution<configs::OpentelemetryConfig> for EncodeDist {
configs::OpentelemetryConfig {
level: self.sample(rng),
endpoint: self.sample(rng),
logs_endpoint: self.sample(rng),
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion core/lib/env_config/src/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ impl FromEnv for ObservabilityConfig {
};
let opentelemetry_level = std::env::var("OPENTELEMETRY_LEVEL").ok();
let otlp_endpoint = std::env::var("OTLP_ENDPOINT").ok();
let logs_endpoint = std::env::var("OTLP_LOGS_ENDPOINT").ok(); // OK to be absent.
let opentelemetry = match (opentelemetry_level, otlp_endpoint) {
(Some(level), Some(endpoint)) => Some(OpentelemetryConfig { level, endpoint }),
(Some(level), Some(endpoint)) => Some(OpentelemetryConfig {
level,
endpoint,
logs_endpoint,
}),
_ => None,
};

Expand Down
2 changes: 2 additions & 0 deletions core/lib/protobuf_config/src/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ impl ProtoRepr for proto::Opentelemetry {
Ok(Self::Type {
level: required(&self.level).context("level")?.clone(),
endpoint: required(&self.endpoint).context("endpoint")?.clone(),
logs_endpoint: self.logs_endpoint.clone(),
})
}

fn build(this: &Self::Type) -> Self {
Self {
level: Some(this.level.clone()),
endpoint: Some(this.endpoint.clone()),
logs_endpoint: this.logs_endpoint.clone(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ message Observability {
message Opentelemetry {
optional string level = 1; // required
optional string endpoint = 2; // required
optional string logs_endpoint = 3; // optional
}
1 change: 1 addition & 0 deletions core/lib/vlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ opentelemetry-otlp = { workspace = true, features = [
"reqwest-client",
] }
opentelemetry-semantic-conventions.workspace = true
opentelemetry-appender-tracing.workspace = true
vise.workspace = true
vise-exporter.workspace = true
url.workspace = true
39 changes: 31 additions & 8 deletions core/lib/vlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ pub struct ObservabilityBuilder {
/// Guard for the observability subsystem.
/// Releases configured integrations upon being dropped.
pub struct ObservabilityGuard {
/// Opentelemetry provider. Can be used to force flush spans.
otlp_provider: Option<opentelemetry_sdk::trace::TracerProvider>,
/// Opentelemetry traces provider
otlp_tracing_provider: Option<opentelemetry_sdk::trace::TracerProvider>,
/// Opentelemetry logs provider
otlp_logging_provider: Option<opentelemetry_sdk::logs::LoggerProvider>,
/// Sentry client guard
sentry_guard: Option<ClientInitGuard>,
}

Expand All @@ -41,7 +44,15 @@ impl ObservabilityGuard {
sentry_guard.flush(Some(FLUSH_TIMEOUT));
}

if let Some(provider) = &self.otlp_provider {
if let Some(provider) = &self.otlp_tracing_provider {
for result in provider.force_flush() {
if let Err(err) = result {
tracing::warn!("Flushing the spans failed: {err:?}");
}
}
}

if let Some(provider) = &self.otlp_logging_provider {
for result in provider.force_flush() {
if let Err(err) = result {
tracing::warn!("Flushing the spans failed: {err:?}");
popzxc marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -59,7 +70,12 @@ impl ObservabilityGuard {
if let Some(sentry_guard) = &self.sentry_guard {
sentry_guard.close(Some(SHUTDOWN_TIMEOUT));
}
if let Some(provider) = &self.otlp_provider {
if let Some(provider) = &self.otlp_tracing_provider {
if let Err(err) = provider.shutdown() {
tracing::warn!("Shutting down the provider failed: {err:?}");
popzxc marked this conversation as resolved.
Show resolved Hide resolved
}
}
if let Some(provider) = &self.otlp_logging_provider {
if let Err(err) = provider.shutdown() {
tracing::warn!("Shutting down the provider failed: {err:?}");
}
Expand Down Expand Up @@ -111,21 +127,28 @@ impl ObservabilityBuilder {
let global_filter = logs.build_filter();

let logs_layer = logs.into_layer();
let (otlp_provider, otlp_layer) = self
let (otlp_tracing_provider, otlp_tracing_layer) = self
.opentelemetry_layer
.as_ref()
.and_then(|layer| layer.tracing_layer())
.unzip();
let (otlp_logging_provider, otlp_logging_layer) = self
.opentelemetry_layer
.map(|layer| layer.into_layer())
.and_then(|layer| layer.logs_layer())
.unzip();

tracing_subscriber::registry()
.with(global_filter)
.with(logs_layer)
.with(otlp_layer)
.with(otlp_tracing_layer)
.with(otlp_logging_layer)
.init();

let sentry_guard = self.sentry.map(|sentry| sentry.install());

ObservabilityGuard {
otlp_provider,
otlp_tracing_provider,
otlp_logging_provider,
sentry_guard,
}
}
Expand Down
Loading
Loading