diff --git a/server/Cargo.lock b/server/Cargo.lock index c81dd21e2..64d5e1945 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -264,18 +264,20 @@ dependencies = [ [[package]] name = "axum-otel-metrics" -version = "0.8.1" +version = "0.9.0-alpha.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b5bd67776dca9326650fc2e2ddd15ddaca16a3c8e80a9a874ba111afab82bd" +checksum = "f60a607562cbef52413f4c2d66b6ac786b161920b183007dabbe3ba5c0b416c8" dependencies = [ "axum", "futures-util", "http", "http-body", - "opentelemetry 0.22.0", - "opentelemetry-prometheus 0.15.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-otlp", + "opentelemetry-prometheus", "opentelemetry-semantic-conventions", - "opentelemetry_sdk 0.22.1", + "opentelemetry_sdk", "pin-project-lite", "prometheus", "tower 0.4.13", @@ -300,6 +302,24 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-tracing-opentelemetry" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "561a0967337dfeaf3e28700d23e791712cd7d5e97ab335e0f4a0c3ac62e6ece0" +dependencies = [ + "axum", + "futures-core", + "futures-util", + "http", + "opentelemetry", + "pin-project-lite", + "tower 0.5.1", + "tracing", + "tracing-opentelemetry", + "tracing-opentelemetry-instrumentation-sdk", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -364,7 +384,7 @@ dependencies = [ "futures", "metrics", "object_store", - "opentelemetry 0.24.0", + "opentelemetry", "reqwest", "serde", "sha2", @@ -1085,7 +1105,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap", + "indexmap 2.5.0", "slab", "tokio", "tokio-util", @@ -1102,6 +1122,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.5" @@ -1218,11 +1244,24 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" -version = "0.1.7" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", @@ -1233,7 +1272,6 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower 0.4.13", "tower-service", "tracing", ] @@ -1415,6 +1453,7 @@ dependencies = [ "axum", "axum-otel-metrics", "axum-server", + "axum-tracing-opentelemetry", "blob_store", "bytes", "ciborium", @@ -1429,9 +1468,10 @@ dependencies = [ "metrics", "nanoid", "object_store", - "opentelemetry 0.24.0", - "opentelemetry-prometheus 0.17.0", - "opentelemetry_sdk 0.24.1", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-prometheus", + "opentelemetry_sdk", "prometheus", "rand", "serde", @@ -1443,6 +1483,7 @@ dependencies = [ "tokio", "tower-http", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "url", "utoipa", @@ -1470,6 +1511,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.5.0" @@ -1477,7 +1528,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.5", "serde", ] @@ -1677,9 +1728,9 @@ dependencies = [ "axum-otel-metrics", "data_model", "once_cell", - "opentelemetry 0.24.0", - "opentelemetry-prometheus 0.17.0", - "opentelemetry_sdk 0.24.1", + "opentelemetry", + "opentelemetry-prometheus", + "opentelemetry_sdk", "pin-project-lite", "prometheus", "serde", @@ -1903,9 +1954,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "opentelemetry" -version = "0.22.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" +checksum = "4c365a63eec4f55b7efeceb724f1336f26a9cf3427b70e59e2cd2a5b947fba96" dependencies = [ "futures-core", "futures-sink", @@ -1913,34 +1964,39 @@ dependencies = [ "once_cell", "pin-project-lite", "thiserror", - "urlencoding", ] [[package]] -name = "opentelemetry" -version = "0.24.0" +name = "opentelemetry-http" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c365a63eec4f55b7efeceb724f1336f26a9cf3427b70e59e2cd2a5b947fba96" +checksum = "ad31e9de44ee3538fb9d64fe3376c1362f406162434609e79aea2a41a0af78ab" dependencies = [ - "futures-core", - "futures-sink", - "js-sys", - "once_cell", - "pin-project-lite", - "thiserror", + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", ] [[package]] -name = "opentelemetry-prometheus" -version = "0.15.0" +name = "opentelemetry-otlp" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bbcf6341cab7e2193e5843f0ac36c446a5b3fccb28747afaeda17996dcd02e" +checksum = "6b925a602ffb916fb7421276b86756027b37ee708f9dce2dbdcc51739f07e727" dependencies = [ - "once_cell", - "opentelemetry 0.22.0", - "opentelemetry_sdk 0.22.1", - "prometheus", - "protobuf", + "async-trait", + "futures-core", + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror", + "tokio", + "tonic", ] [[package]] @@ -1950,39 +2006,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc4191ce34aa274621861a7a9d68dbcf618d5b6c66b10081631b61fd81fbc015" dependencies = [ "once_cell", - "opentelemetry 0.24.0", - "opentelemetry_sdk 0.24.1", + "opentelemetry", + "opentelemetry_sdk", "prometheus", "protobuf", ] [[package]] -name = "opentelemetry-semantic-conventions" -version = "0.14.0" +name = "opentelemetry-proto" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" +checksum = "30ee9f20bff9c984511a02f082dc8ede839e4a9bf15cc2487c8d6fea5ad850d9" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] [[package]] -name = "opentelemetry_sdk" -version = "0.22.1" +name = "opentelemetry-semantic-conventions" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" -dependencies = [ - "async-trait", - "crossbeam-channel", - "futures-channel", - "futures-executor", - "futures-util", - "glob", - "once_cell", - "opentelemetry 0.22.0", - "ordered-float", - "percent-encoding", - "rand", - "thiserror", - "tokio", - "tokio-stream", -] +checksum = "1cefe0543875379e47eb5f1e68ff83f45cc41366a92dfd0d073d513bf68e9a05" [[package]] name = "opentelemetry_sdk" @@ -1996,20 +2042,13 @@ dependencies = [ "futures-util", "glob", "once_cell", - "opentelemetry 0.24.0", + "opentelemetry", "percent-encoding", "rand", "serde_json", "thiserror", -] - -[[package]] -name = "ordered-float" -version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83e7ccb95e240b7c9506a3d544f10d935e142cc90b0a1d56954fb44d89ad6b97" -dependencies = [ - "num-traits", + "tokio", + "tokio-stream", ] [[package]] @@ -2166,6 +2205,29 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -2393,6 +2455,7 @@ checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" dependencies = [ "base64", "bytes", + "futures-channel", "futures-core", "futures-util", "h2", @@ -2744,7 +2807,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap", + "indexmap 2.5.0", "itoa", "ryu", "serde", @@ -2909,7 +2972,7 @@ dependencies = [ "indexify_utils", "metrics", "object_store", - "opentelemetry 0.24.0", + "opentelemetry", "rocksdb", "serde", "serde_json", @@ -3218,13 +3281,43 @@ version = "0.22.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" dependencies = [ - "indexmap", + "indexmap 2.5.0", "serde", "serde_spanned", "toml_datetime", "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -3233,9 +3326,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3329,6 +3426,36 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9784ed4da7d921bc8df6963f8c80a0e4ce34ba6ba76668acadd3edbd985ff3b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-opentelemetry-instrumentation-sdk" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8159fbb3bd93e20342e7e6ef45b96c5d122cd88043f37ad0e4b5bb052f0f4483" +dependencies = [ + "http", + "opentelemetry", + "tracing", + "tracing-opentelemetry", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -3437,12 +3564,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "urlencoding" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" - [[package]] name = "utf16_iter" version = "1.0.5" @@ -3467,7 +3588,7 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "514a48569e4e21c86d0b84b5612b5e73c0b2cf09db63260134ba426d4e8ea714" dependencies = [ - "indexmap", + "indexmap 2.5.0", "serde", "serde_json", "utoipa-gen", @@ -4028,7 +4149,7 @@ dependencies = [ "crossbeam-utils", "displaydoc", "flate2", - "indexmap", + "indexmap 2.5.0", "memchr", "thiserror", "zopfli", diff --git a/server/Cargo.toml b/server/Cargo.toml index 0695d26a1..b52e4073b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -66,11 +66,13 @@ pin-project = "1.1.7" ciborium = "0.2.2" uuid = { version = "1.11.0", features = ["v4"] } url = "2.5.3" -opentelemetry = { version="0.24.0", features = ["metrics"] } -opentelemetry_sdk = { version = "0.24.0", features = ["metrics"] } -opentelemetry-prometheus = {version = "0.17.0"} +opentelemetry = { version="0.24", features = ["metrics", "trace"] } +opentelemetry_sdk = { version = "0.24.1", features = ["rt-tokio", "metrics", "trace"] } +opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "trace"] } +opentelemetry-prometheus = { version = "0.17" } prometheus = {version = "0.13.4"} -axum-otel-metrics = { version = "0.8.1"} +axum-otel-metrics = { version = "0.9.0-alpha.2" } +tracing-opentelemetry = "0.25" [dependencies] async-stream = {workspace = true} @@ -106,10 +108,13 @@ hyper = {workspace=true} url = {workspace=true} opentelemetry = {workspace=true} opentelemetry-prometheus = {workspace=true} -opentelemetry_sdk = { workspace=true } +opentelemetry_sdk.workspace = true prometheus = {workspace=true} axum-otel-metrics = { workspace=true } metrics = {workspace=true} +tracing-opentelemetry = {workspace=true} +opentelemetry-otlp = {workspace=true} +axum-tracing-opentelemetry = "0.19.0" [dev-dependencies] tempfile = { workspace = true } diff --git a/server/src/main.rs b/server/src/main.rs index 18a2fa4e8..4faced6cb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,8 @@ -use std::path::PathBuf; +use std::{env, path::PathBuf}; use clap::Parser; +use opentelemetry::{global, trace::TracerProvider} +use opentelemetry_sdk::trace::Sampler; use service::Service; use tracing::error; use tracing_subscriber::{ @@ -9,7 +11,7 @@ use tracing_subscriber::{ format::{Format, JsonFields}, }, layer::SubscriberExt, - util::SubscriberInitExt, + Layer, }; mod config; @@ -31,34 +33,76 @@ struct Cli { config: Option, } -fn setup_tracing(structured_logging: bool) { +fn get_env_filter() -> tracing_subscriber::EnvFilter { // RUST_LOG used to control logging level. - let env_filter_layer = - tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { - tracing_subscriber::EnvFilter::default() - .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) - }); + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { + tracing_subscriber::EnvFilter::default() + .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) + }) +} +fn get_log_layer(structured_logging: bool) -> Box + Send + Sync + 'static> +where + S: for<'a> tracing_subscriber::registry::LookupSpan<'a>, + S: tracing::Subscriber, +{ + // Create an OTLP pipeline exporter for a `trace_demo` service. if structured_logging { - let log_layer = fmt::layer() - .event_format( - Format::default() - .json() - .with_span_list(false) - .flatten_event(true), - ) - .fmt_fields(JsonFields::default()); - tracing_subscriber::registry() - .with(env_filter_layer) - .with(log_layer) - .init(); - return; + return Box::new( + fmt::layer() + .event_format( + Format::default() + .json() + .with_span_list(false) + .flatten_event(true), + ) + .fmt_fields(JsonFields::default()), + ); } - tracing_subscriber::registry() + Box::new(tracing_subscriber::fmt::layer().compact()) +} + +fn setup_tracing(structured_logging: bool, tracing_enabled: bool) { + let env_filter_layer = get_env_filter(); + let log_layer = get_log_layer(structured_logging); + let subscriber = tracing_subscriber::Registry::default() .with(env_filter_layer) - .with(tracing_subscriber::fmt::layer().compact()) - .init(); + .with(log_layer); + + if !tracing_enabled { + if let Err(e) = tracing::subscriber::set_global_default(subscriber) { + eprintln!("Logger was already iniated, continuing: {}", e); + } + return; + } + + let tracer_provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_trace_config( + opentelemetry_sdk::trace::Config::default() + .with_sampler(Sampler::AlwaysOn) + .with_resource(opentelemetry_sdk::Resource::new(vec![ + opentelemetry::KeyValue::new("service.name", "indexify-server"), + opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), + ])), + ) + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .unwrap(); + + global::set_tracer_provider(tracer_provider.clone()); + let tracer = tracer_provider.tracer("tracing-otel-subscriber"); + + // Create a layer with the configured tracer + let otel_layer = tracing_opentelemetry::layer() + .with_error_records_to_exceptions(true) + .with_tracer(tracer); + global::set_tracer_provider(tracer_provider.clone()); + + tracing::subscriber::set_global_default(subscriber.with(otel_layer)).unwrap(); + + // TODO: opentelemetry::global::shutdown_tracer_provider(); } #[tokio::main] @@ -68,7 +112,8 @@ async fn main() { Some(path) => config::ServerConfig::from_path(path.to_str().unwrap()).unwrap(), None => config::ServerConfig::default(), }; - setup_tracing(!cli.dev); + + setup_tracing(!cli.dev, true); let service = Service::new(config).await; if let Err(err) = service { diff --git a/server/src/routes.rs b/server/src/routes.rs index d7e887a27..64e8e6988 100644 --- a/server/src/routes.rs +++ b/server/src/routes.rs @@ -21,6 +21,7 @@ use axum::{ Router, }; use axum_otel_metrics::HttpMetricsLayerBuilder; +use axum_tracing_opentelemetry; use blob_store::PutResult; use data_model::ExecutorId; use futures::StreamExt; @@ -46,7 +47,7 @@ use tower_http::{ cors::{Any, CorsLayer}, trace::TraceLayer, }; -use tracing::{error, info, info_span}; +use tracing::{error, info, info_span, Span}; use utoipa::{OpenApi, ToSchema}; use utoipa_swagger_ui::SwaggerUi; @@ -207,7 +208,7 @@ pub fn create_routes(route_state: RouteState) -> Router { "/internal/namespaces/:namespace/compute_graphs/:compute_graph/invocations/:invocation_id/ctx", get(get_ctx_state_key).with_state(route_state.clone()), ) - + .layer(axum_tracing_opentelemetry::middleware::OtelAxumLayer::default()) .layer( TraceLayer::new_for_http() .make_span_with(|req: &Request| { diff --git a/server/state_store/src/lib.rs b/server/state_store/src/lib.rs index 577f93a2b..f0e3955ad 100644 --- a/server/state_store/src/lib.rs +++ b/server/state_store/src/lib.rs @@ -168,9 +168,20 @@ impl IndexifyState { self.system_tasks_rx.clone() } + #[tracing::instrument( + skip(self, request), + fields( + request_type = request.payload.as_ref(), + state_change_len = request.state_changes_processed.len(), + otel.name = format!("state_machine.write {}", request.payload.as_ref()) + ) + )] pub async fn write(&self, request: StateMachineUpdateRequest) -> Result<()> { let timer_kv = &[KeyValue::new("request", request.payload.to_string())]; - tracing::info!("writing state machine update request: {}", request.payload,); + tracing::info!( + "writing state machine update request: {}", + request.payload.as_ref(), + ); let _timer = Timer::start_with_labels(&self.metrics.state_write, timer_kv); let mut allocated_tasks_by_executor = Vec::new(); let mut tasks_finalized: HashMap> = HashMap::new(); diff --git a/server/state_store/src/requests.rs b/server/state_store/src/requests.rs index 6933e8546..05dcaa07d 100644 --- a/server/state_store/src/requests.rs +++ b/server/state_store/src/requests.rs @@ -11,13 +11,14 @@ use data_model::{ TaskDiagnostics, TaskId, }; +use strum::AsRefStr; pub struct StateMachineUpdateRequest { pub payload: RequestPayload, pub state_changes_processed: Vec, } -#[derive(strum::Display)] +#[derive(AsRefStr, strum::Display)] pub enum RequestPayload { InvokeComputeGraph(InvokeComputeGraphRequest), ReplayComputeGraph(ReplayComputeGraphRequest),