From b5ee9851c42c3ea2f3a9962e4dc236642d9c6bd6 Mon Sep 17 00:00:00 2001 From: Benjamin Boudreau Date: Mon, 18 Nov 2024 22:50:41 -0500 Subject: [PATCH] WIP traces --- server/Cargo.lock | 236 +++++++++++++++++++++++++++-- server/Cargo.toml | 7 +- server/src/main.rs | 90 ++++++++--- server/src/routes.rs | 5 +- server/state_store/src/lib.rs | 13 +- server/state_store/src/requests.rs | 3 +- 6 files changed, 315 insertions(+), 39 deletions(-) diff --git a/server/Cargo.lock b/server/Cargo.lock index c81dd21e2..c43ea79b6 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -300,6 +300,24 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-tracing-opentelemetry" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74507fbac7dcb93ddd90fd3d438ebcb4c2bc5de10ff460eeae5b8de50bd2fe8f" +dependencies = [ + "axum", + "futures-core", + "futures-util", + "http", + "opentelemetry 0.27.0", + "pin-project-lite", + "tower 0.5.1", + "tracing", + "tracing-opentelemetry 0.28.0", + "tracing-opentelemetry-instrumentation-sdk", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -1085,7 +1103,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap", + "indexmap 2.5.0", "slab", "tokio", "tokio-util", @@ -1102,6 +1120,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.5" @@ -1218,11 +1242,24 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" -version = "0.1.7" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", @@ -1233,7 +1270,6 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower 0.4.13", "tower-service", "tracing", ] @@ -1415,6 +1451,7 @@ dependencies = [ "axum", "axum-otel-metrics", "axum-server", + "axum-tracing-opentelemetry", "blob_store", "bytes", "ciborium", @@ -1430,6 +1467,7 @@ dependencies = [ "nanoid", "object_store", "opentelemetry 0.24.0", + "opentelemetry-otlp", "opentelemetry-prometheus 0.17.0", "opentelemetry_sdk 0.24.1", "prometheus", @@ -1443,6 +1481,7 @@ dependencies = [ "tokio", "tower-http", "tracing", + "tracing-opentelemetry 0.25.0", "tracing-subscriber", "url", "utoipa", @@ -1470,6 +1509,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.5.0" @@ -1477,7 +1526,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.5", "serde", ] @@ -1930,6 +1979,38 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f3cebff57f7dbd1255b44d8bddc2cebeb0ea677dbaa2e25a3070a91b318f660" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b925a602ffb916fb7421276b86756027b37ee708f9dce2dbdcc51739f07e727" +dependencies = [ + "async-trait", + "futures-core", + "http", + "opentelemetry 0.24.0", + "opentelemetry-proto", + "opentelemetry_sdk 0.24.1", + "prost", + "thiserror", + "tokio", + "tonic", +] + [[package]] name = "opentelemetry-prometheus" version = "0.15.0" @@ -1956,6 +2037,18 @@ dependencies = [ "protobuf", ] +[[package]] +name = "opentelemetry-proto" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ee9f20bff9c984511a02f082dc8ede839e4a9bf15cc2487c8d6fea5ad850d9" +dependencies = [ + "opentelemetry 0.24.0", + "opentelemetry_sdk 0.24.1", + "prost", + "tonic", +] + [[package]] name = "opentelemetry-semantic-conventions" version = "0.14.0" @@ -2001,6 +2094,26 @@ dependencies = [ "rand", "serde_json", "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27b742c1cae4693792cc564e58d75a2a0ba29421a34a85b50da92efa89ecb2bc" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry 0.27.0", + "percent-encoding", + "rand", + "thiserror", ] [[package]] @@ -2166,6 +2279,29 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -2744,7 +2880,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap", + "indexmap 2.5.0", "itoa", "ryu", "serde", @@ -3218,13 +3354,43 @@ version = "0.22.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" dependencies = [ - "indexmap", + "indexmap 2.5.0", "serde", "serde_spanned", "toml_datetime", "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -3233,9 +3399,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3329,6 +3499,54 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9784ed4da7d921bc8df6963f8c80a0e4ce34ba6ba76668acadd3edbd985ff3b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry 0.24.0", + "opentelemetry_sdk 0.24.1", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry 0.27.0", + "opentelemetry_sdk 0.27.0", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-opentelemetry-instrumentation-sdk" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ece512a961221014e69a59c75561a2889835325c7520551ca721145809f9e40c" +dependencies = [ + "http", + "opentelemetry 0.27.0", + "tracing", + "tracing-opentelemetry 0.28.0", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -3467,7 +3685,7 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "514a48569e4e21c86d0b84b5612b5e73c0b2cf09db63260134ba426d4e8ea714" dependencies = [ - "indexmap", + "indexmap 2.5.0", "serde", "serde_json", "utoipa-gen", @@ -4028,7 +4246,7 @@ dependencies = [ "crossbeam-utils", "displaydoc", "flate2", - "indexmap", + "indexmap 2.5.0", "memchr", "thiserror", "zopfli", diff --git a/server/Cargo.toml b/server/Cargo.toml index 0695d26a1..02466c8b0 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -67,7 +67,7 @@ ciborium = "0.2.2" uuid = { version = "1.11.0", features = ["v4"] } url = "2.5.3" opentelemetry = { version="0.24.0", features = ["metrics"] } -opentelemetry_sdk = { version = "0.24.0", features = ["metrics"] } +opentelemetry_sdk = { version = "0.24.1", features = ["metrics", "rt-tokio"] } opentelemetry-prometheus = {version = "0.17.0"} prometheus = {version = "0.13.4"} axum-otel-metrics = { version = "0.8.1"} @@ -106,10 +106,13 @@ hyper = {workspace=true} url = {workspace=true} opentelemetry = {workspace=true} opentelemetry-prometheus = {workspace=true} -opentelemetry_sdk = { workspace=true } +opentelemetry_sdk.workspace = true prometheus = {workspace=true} axum-otel-metrics = { workspace=true } metrics = {workspace=true} +tracing-opentelemetry = "0.25.0" +opentelemetry-otlp = "0.17.0" +axum-tracing-opentelemetry = "0.24.0" [dev-dependencies] tempfile = { workspace = true } diff --git a/server/src/main.rs b/server/src/main.rs index 18a2fa4e8..f6bf5d0ed 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,8 +1,12 @@ use std::path::PathBuf; +use anyhow::Result; use clap::Parser; +use opentelemetry::trace::{Tracer, TracerProvider as _}; +use opentelemetry_sdk::trace::{IdGenerator, Sampler, TracerProvider}; use service::Service; -use tracing::error; +use tracing::{error, span}; +use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::{ fmt::{ self, @@ -10,6 +14,8 @@ use tracing_subscriber::{ }, layer::SubscriberExt, util::SubscriberInitExt, + Layer, + Registry, }; mod config; @@ -31,34 +37,69 @@ struct Cli { config: Option, } -fn setup_tracing(structured_logging: bool) { +fn get_env_filter() -> tracing_subscriber::EnvFilter { // RUST_LOG used to control logging level. - let env_filter_layer = - tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { - tracing_subscriber::EnvFilter::default() - .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) - }); + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { + tracing_subscriber::EnvFilter::default() + .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) + }) +} +fn get_log_layer(structured_logging: bool) -> Box + Send + Sync + 'static> +where + S: for<'a> tracing_subscriber::registry::LookupSpan<'a>, + S: tracing::Subscriber, +{ + // Create an OTLP pipeline exporter for a `trace_demo` service. if structured_logging { - let log_layer = fmt::layer() - .event_format( - Format::default() - .json() - .with_span_list(false) - .flatten_event(true), - ) - .fmt_fields(JsonFields::default()); - tracing_subscriber::registry() - .with(env_filter_layer) - .with(log_layer) - .init(); - return; + return Box::new( + fmt::layer() + .event_format( + Format::default() + .json() + .with_span_list(false) + .flatten_event(true), + ) + .fmt_fields(JsonFields::default()), + ); } - tracing_subscriber::registry() + Box::new(tracing_subscriber::fmt::layer().compact()) +} + +fn setup_tracing(structured_logging: bool, tracing_enabled: bool) { + let env_filter_layer = get_env_filter(); + let log_layer = get_log_layer(structured_logging); + let subscriber = tracing_subscriber::Registry::default() .with(env_filter_layer) - .with(tracing_subscriber::fmt::layer().compact()) - .init(); + .with(log_layer); + + if !tracing_enabled { + if let Err(e) = tracing::subscriber::set_global_default(subscriber) { + eprintln!("Logger/tracer was already initted, continuing: {}", e); + } + return; + } + + let tracer_provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_trace_config( + opentelemetry_sdk::trace::Config::default() + .with_sampler(Sampler::AlwaysOn) + .with_resource(opentelemetry_sdk::Resource::new(vec![ + opentelemetry::KeyValue::new("service.name", "indexify"), + ])), + ) + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .unwrap(); + + let tracer = tracer_provider.tracer("root"); + + // Create a layer with the configured tracer + let otel_layer = OpenTelemetryLayer::new(tracer); + + tracing::subscriber::set_global_default(subscriber.with(otel_layer)).unwrap(); } #[tokio::main] @@ -68,7 +109,8 @@ async fn main() { Some(path) => config::ServerConfig::from_path(path.to_str().unwrap()).unwrap(), None => config::ServerConfig::default(), }; - setup_tracing(!cli.dev); + + setup_tracing(!cli.dev, true); let service = Service::new(config).await; if let Err(err) = service { diff --git a/server/src/routes.rs b/server/src/routes.rs index d7e887a27..64e8e6988 100644 --- a/server/src/routes.rs +++ b/server/src/routes.rs @@ -21,6 +21,7 @@ use axum::{ Router, }; use axum_otel_metrics::HttpMetricsLayerBuilder; +use axum_tracing_opentelemetry; use blob_store::PutResult; use data_model::ExecutorId; use futures::StreamExt; @@ -46,7 +47,7 @@ use tower_http::{ cors::{Any, CorsLayer}, trace::TraceLayer, }; -use tracing::{error, info, info_span}; +use tracing::{error, info, info_span, Span}; use utoipa::{OpenApi, ToSchema}; use utoipa_swagger_ui::SwaggerUi; @@ -207,7 +208,7 @@ pub fn create_routes(route_state: RouteState) -> Router { "/internal/namespaces/:namespace/compute_graphs/:compute_graph/invocations/:invocation_id/ctx", get(get_ctx_state_key).with_state(route_state.clone()), ) - + .layer(axum_tracing_opentelemetry::middleware::OtelAxumLayer::default()) .layer( TraceLayer::new_for_http() .make_span_with(|req: &Request| { diff --git a/server/state_store/src/lib.rs b/server/state_store/src/lib.rs index 577f93a2b..f0e3955ad 100644 --- a/server/state_store/src/lib.rs +++ b/server/state_store/src/lib.rs @@ -168,9 +168,20 @@ impl IndexifyState { self.system_tasks_rx.clone() } + #[tracing::instrument( + skip(self, request), + fields( + request_type = request.payload.as_ref(), + state_change_len = request.state_changes_processed.len(), + otel.name = format!("state_machine.write {}", request.payload.as_ref()) + ) + )] pub async fn write(&self, request: StateMachineUpdateRequest) -> Result<()> { let timer_kv = &[KeyValue::new("request", request.payload.to_string())]; - tracing::info!("writing state machine update request: {}", request.payload,); + tracing::info!( + "writing state machine update request: {}", + request.payload.as_ref(), + ); let _timer = Timer::start_with_labels(&self.metrics.state_write, timer_kv); let mut allocated_tasks_by_executor = Vec::new(); let mut tasks_finalized: HashMap> = HashMap::new(); diff --git a/server/state_store/src/requests.rs b/server/state_store/src/requests.rs index 6933e8546..05dcaa07d 100644 --- a/server/state_store/src/requests.rs +++ b/server/state_store/src/requests.rs @@ -11,13 +11,14 @@ use data_model::{ TaskDiagnostics, TaskId, }; +use strum::AsRefStr; pub struct StateMachineUpdateRequest { pub payload: RequestPayload, pub state_changes_processed: Vec, } -#[derive(strum::Display)] +#[derive(AsRefStr, strum::Display)] pub enum RequestPayload { InvokeComputeGraph(InvokeComputeGraphRequest), ReplayComputeGraph(ReplayComputeGraphRequest),