diff --git a/Cargo.lock b/Cargo.lock index e2c18b2a..8191f980 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,7 +160,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -171,7 +171,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -434,7 +434,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -498,6 +498,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -553,7 +562,7 @@ dependencies = [ "globwalk", "humantime", "inventory", - "itertools", + "itertools 0.12.1", "lazy-regex", "linked-hash-map", "once_cell", @@ -573,11 +582,11 @@ checksum = "01091e28d1f566c8b31b67948399d2efd6c0a8f6228a9785519ed7b73f7f0aef" dependencies = [ "cucumber-expressions", "inflections", - "itertools", + "itertools 0.12.1", "proc-macro2", "quote", "regex", - "syn", + "syn 2.0.89", "synthez", ] @@ -595,6 +604,19 @@ dependencies = [ "regex-syntax 0.7.5", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -619,7 +641,10 @@ dependencies = [ "kuksa", "kuksa-common", "lazy_static", - "prost", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "prost 0.12.6", "prost-types", "regex", "sd-notify", @@ -629,10 +654,11 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", "tonic-mock", "tonic-reflection", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "uuid", "vergen", @@ -650,22 +676,22 @@ dependencies = [ "kuksa-common", "kuksa-sdv", "linefeed", - "prost", + "prost 0.12.6", "prost-types", "regex", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", ] [[package]] name = "databroker-proto" version = "0.6.0-dev.0" dependencies = [ - "prost", + "prost 0.12.6", "prost-types", "protobuf-src", - "tonic", + "tonic 0.11.0", "tonic-build", ] @@ -686,7 +712,7 @@ checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -738,7 +764,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -892,7 +918,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -970,7 +996,7 @@ dependencies = [ "quote", "serde", "serde_json", - "syn", + "syn 2.0.89", "textwrap", "thiserror 1.0.69", "typed-builder", @@ -1254,7 +1280,7 @@ checksum = "999ce923619f88194171a67fb3e6d613653b8d4d6078b529b15a765da0edcc17" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -1801,7 +1827,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -1879,6 +1905,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.1" @@ -1947,7 +1982,7 @@ dependencies = [ "kuksa-common", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", ] [[package]] @@ -1958,7 +1993,7 @@ dependencies = [ "http", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", ] [[package]] @@ -1970,7 +2005,7 @@ dependencies = [ "kuksa-common", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", ] [[package]] @@ -1993,7 +2028,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn", + "syn 2.0.89", ] [[package]] @@ -2249,6 +2284,94 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8af72d59a4484654ea8eb183fea5ae4eb6a41d7ac3e3bae5f4d2a282a3a7d3ca" +dependencies = [ + "async-trait", + "futures", + "futures-util", + "http", + "opentelemetry", + "opentelemetry-proto", + "prost 0.11.9", + "thiserror 1.0.69", + "tokio", + "tonic 0.8.3", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "045f8eea8c0fa19f7d48e7bc3128a39c2e5c533d5c61298c548dfefc1064474c" +dependencies = [ + "futures", + "futures-util", + "opentelemetry", + "prost 0.11.9", + "tonic 0.8.3", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24e33428e6bf08c6f7fcea4ddb8e358fab0fe48ab877a87c70c6ebe20f673ce5" +dependencies = [ + "opentelemetry", +] + +[[package]] +name = "opentelemetry_api" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2" +dependencies = [ + "fnv", + "futures-channel", + "futures-util", + "indexmap 1.9.3", + "once_cell", + "pin-project-lite", + "thiserror 1.0.69", + "urlencoding", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1" +dependencies = [ + "async-trait", + "crossbeam-channel", + "dashmap", + "fnv", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand", + "thiserror 1.0.69", + "tokio", + "tokio-stream", +] + [[package]] name = "overload" version = "0.1.1" @@ -2386,7 +2509,7 @@ checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -2423,7 +2546,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.89", ] [[package]] @@ -2441,6 +2564,16 @@ version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "744a264d26b88a6a7e37cbad97953fa233b94d585236310bcbc88474b4092d79" +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive 0.11.9", +] + [[package]] name = "prost" version = "0.12.6" @@ -2448,7 +2581,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.12.6", ] [[package]] @@ -2459,19 +2592,32 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", "heck 0.5.0", - "itertools", + "itertools 0.12.1", "log", "multimap", "once_cell", "petgraph", "prettyplease", - "prost", + "prost 0.12.6", "prost-types", "regex", - "syn", + "syn 2.0.89", "tempfile", ] +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "prost-derive" version = "0.12.6" @@ -2479,10 +2625,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools", + "itertools 0.12.1", "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -2491,7 +2637,7 @@ version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" dependencies = [ - "prost", + "prost 0.12.6", ] [[package]] @@ -2757,7 +2903,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -2786,7 +2932,7 @@ checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -2924,7 +3070,7 @@ checksum = "0eb01866308440fc64d6c44d9e86c5cc17adfe33c4d6eed55da9145044d0ffc1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -2976,6 +3122,17 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.89" @@ -3001,7 +3158,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -3010,7 +3167,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3d2c2202510a1e186e63e596d9318c91a8cbe85cd1a56a7be0c333e5f59ec8d" dependencies = [ - "syn", + "syn 2.0.89", "synthez-codegen", "synthez-core", ] @@ -3021,7 +3178,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f724aa6d44b7162f3158a57bccd871a77b39a4aef737e01bcdff41f4772c7746" dependencies = [ - "syn", + "syn 2.0.89", "synthez-core", ] @@ -3034,7 +3191,7 @@ dependencies = [ "proc-macro2", "quote", "sealed", - "syn", + "syn 2.0.89", ] [[package]] @@ -3110,7 +3267,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -3121,7 +3278,7 @@ checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -3227,7 +3384,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -3278,6 +3435,38 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.13.1", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost 0.11.9", + "prost-derive 0.11.9", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + [[package]] name = "tonic" version = "0.11.0" @@ -3296,7 +3485,7 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", + "prost 0.12.6", "rustls-pemfile", "rustls-pki-types", "tokio", @@ -3318,7 +3507,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -3331,8 +3520,8 @@ dependencies = [ "futures", "http", "http-body", - "prost", - "tonic", + "prost 0.12.6", + "tonic 0.11.0", ] [[package]] @@ -3341,11 +3530,11 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "548c227bd5c0fae5925812c4ec6c66ffcfced23ea370cb823f4d18f0fc1cb6a7" dependencies = [ - "prost", + "prost 0.12.6", "prost-types", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", ] [[package]] @@ -3400,7 +3589,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -3410,6 +3599,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]] +name = "tracing-log" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00a39dcf9bfc1742fa4d6215253b33a6e474be78275884c216fc2a06267b3600" +dependencies = [ + "once_cell", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", ] [[package]] @@ -3470,7 +3695,7 @@ checksum = "29a3151c41d0b13e3d011f98adc24434560ef06673a155a6c7f66b9879eecce2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -3529,6 +3754,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf-8" version = "0.7.6" @@ -3562,6 +3793,12 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vergen" version = "8.3.2" @@ -3636,7 +3873,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.89", "wasm-bindgen-shared", ] @@ -3658,7 +3895,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3832,7 +4069,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", "synstructure", ] @@ -3854,7 +4091,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -3874,7 +4111,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", "synstructure", ] @@ -3903,5 +4140,5 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] diff --git a/databroker/Cargo.toml b/databroker/Cargo.toml index 2cf7f8b6..81f28182 100644 --- a/databroker/Cargo.toml +++ b/databroker/Cargo.toml @@ -68,6 +68,12 @@ axum = { version = "0.6.20", optional = true, features = ["ws"] } chrono = { version = "0.4.31", optional = true, features = ["std"] } uuid = { version = "1.4.1", optional = true, features = ["v4"] } +# OTEL +opentelemetry = { version = "0.19.0", optional = true, features = ["rt-tokio", "trace"] } +opentelemetry-otlp = { version="0.12.0", optional = true, features = ["tonic", "metrics"] } +opentelemetry-semantic-conventions = { version="0.11.0", optional = true } +tracing-opentelemetry = { version="0.19.0", optional = true } + # systemd related dependency, only relevant on linux systems [target.'cfg(target_os = "linux")'.dependencies] sd-notify = "0.4.1" @@ -78,6 +84,7 @@ tls = ["tonic/tls"] jemalloc = ["dep:jemallocator"] viss = ["dep:axum", "dep:chrono", "dep:uuid"] libtest = [] +otel = ["dep:chrono", "dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry-semantic-conventions", "dep:tracing-opentelemetry"] [build-dependencies] anyhow = "1.0" diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 538c7b5a..35fa3c21 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -234,6 +234,7 @@ pub struct EntryUpdate { } impl Entry { + #[cfg_attr(feature="otel",tracing::instrument(name="broker_diff", skip(self, update), fields(timestamp=chrono::Utc::now().to_string())))] pub fn diff(&self, mut update: EntryUpdate) -> EntryUpdate { if let Some(datapoint) = &update.datapoint { if self.metadata.change_type != ChangeType::Continuous { @@ -259,6 +260,7 @@ impl Entry { Ok(()) } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_validate", skip(self, update), fields(timestamp=chrono::Utc::now().to_string())))] pub fn validate(&self, update: &EntryUpdate) -> Result<(), UpdateError> { if let Some(datapoint) = &update.datapoint { self.validate_value(&datapoint.value)?; @@ -280,6 +282,7 @@ impl Entry { * DataType is VSS type, where we have also smaller type based on 8/16 bits * That we do not have for DataValue */ + #[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_allowed_type", skip(self, allowed), fields(timestamp=chrono::Utc::now().to_string())))] pub fn validate_allowed_type(&self, allowed: &Option) -> Result<(), UpdateError> { if let Some(allowed_values) = allowed { match (allowed_values, &self.metadata.data_type) { @@ -319,6 +322,7 @@ impl Entry { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_allowed", skip(self, value), fields(timestamp=chrono::Utc::now().to_string())))] fn validate_allowed(&self, value: &DataValue) -> Result<(), UpdateError> { // check if allowed value if let Some(allowed_values) = &self.metadata.allowed { @@ -471,6 +475,7 @@ impl Entry { Ok(()) } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_value", skip(self, value), fields(timestamp=chrono::Utc::now().to_string())))] fn validate_value(&self, value: &DataValue) -> Result<(), UpdateError> { // Not available is always valid if value == &DataValue::NotAvailable { @@ -706,6 +711,7 @@ impl Entry { } } + #[cfg_attr(feature="otel", tracing::instrument(name="apply_lag_after_execute", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] pub fn apply_lag_after_execute(&mut self) { self.lag_datapoint = self.datapoint.clone(); } @@ -749,10 +755,12 @@ impl Subscriptions { self.query_subscriptions.push(subscription) } + #[cfg_attr(feature="otel", tracing::instrument(name = "broker_add_change_subscription",skip(self, subscription), fields(timestamp=chrono::Utc::now().to_string())))] pub fn add_change_subscription(&mut self, subscription: ChangeSubscription) { self.change_subscriptions.push(subscription) } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_Subscriptions_notify", skip(self, changed, db)))] pub async fn notify( &self, changed: Option<&HashMap>>, @@ -799,6 +807,7 @@ impl Subscriptions { self.change_subscriptions.clear(); } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_cleanup", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] pub fn cleanup(&mut self) { self.query_subscriptions.retain(|sub| { if sub.sender.is_closed() { @@ -835,6 +844,7 @@ impl Subscriptions { } impl ChangeSubscription { + #[cfg_attr(feature="otel", tracing::instrument(name="broker_ChangeSubscription_notify", skip(self, changed, db)))] async fn notify( &self, changed: Option<&HashMap>>, @@ -959,6 +969,7 @@ impl ChangeSubscription { } impl QuerySubscription { + #[cfg_attr(feature="otel", tracing::instrument(name="broker_find_in_db_and_add", skip(self, name, db, input), fields(timestamp=chrono::Utc::now().to_string())))] fn find_in_db_and_add( &self, name: &String, @@ -987,6 +998,7 @@ impl QuerySubscription { } } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_check_if_changes_match", skip(query, changed_origin, db), fields(timestamp=chrono::Utc::now().to_string())))] fn check_if_changes_match( query: &CompiledQuery, changed_origin: Option<&HashMap>>, @@ -1022,6 +1034,7 @@ impl QuerySubscription { } false } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_generate_input_list", skip(self, query, db, input), fields(timestamp=chrono::Utc::now().to_string())))] fn generate_input_list( &self, query: &CompiledQuery, @@ -1037,6 +1050,7 @@ impl QuerySubscription { } } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_generate_input", skip(self, changed, db), fields(timestamp=chrono::Utc::now().to_string())))] fn generate_input( &self, changed: Option<&HashMap>>, @@ -1053,6 +1067,7 @@ impl QuerySubscription { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_query_subscription_notify", skip(self, changed, db), fields(timestamp=chrono::Utc::now().to_string())))] async fn notify( &self, changed: Option<&HashMap>>, @@ -1112,6 +1127,7 @@ pub enum EntryReadAccess<'a> { } impl EntryReadAccess<'_> { + #[cfg_attr(feature="otel", tracing::instrument(name="broker_datapoint", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] pub fn datapoint(&self) -> Result<&Datapoint, ReadError> { match self { Self::Entry(entry) => Ok(&entry.datapoint), @@ -1126,6 +1142,7 @@ impl EntryReadAccess<'_> { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_metadata", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] pub fn metadata(&self) -> &Metadata { match self { Self::Entry(entry) => &entry.metadata, @@ -1168,6 +1185,7 @@ impl<'a> Iterator for EntryReadIterator<'a, '_> { } impl DatabaseReadAccess<'_, '_> { + #[cfg_attr(feature="otel", tracing::instrument(name="get_entry_by_id", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))] pub fn get_entry_by_id(&self, id: i32) -> Result<&Entry, ReadError> { match self.db.entries.get(&id) { Some(entry) => match self.permissions.can_read(&entry.metadata.path) { @@ -1186,15 +1204,18 @@ impl DatabaseReadAccess<'_, '_> { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_get_metadata_by_id", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))] pub fn get_metadata_by_id(&self, id: i32) -> Option<&Metadata> { self.db.entries.get(&id).map(|entry| &entry.metadata) } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_get_metadata_by_path", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))] pub fn get_metadata_by_path(&self, path: &str) -> Option<&Metadata> { let id = self.db.path_to_id.get(path)?; self.get_metadata_by_id(*id) } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_iter_entries", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] pub fn iter_entries(&self) -> EntryReadIterator { EntryReadIterator { inner: self.db.entries.values(), @@ -1215,6 +1236,7 @@ impl DatabaseWriteAccess<'_, '_> { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_update_entry_lag_to_be_equal", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))] pub fn update_entry_lag_to_be_equal(&mut self, path: &str) -> Result<(), UpdateError> { match self.db.path_to_id.get(path) { Some(id) => match self.db.entries.get_mut(id) { @@ -1228,6 +1250,7 @@ impl DatabaseWriteAccess<'_, '_> { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_update", skip(self, id, update), fields(timestamp=chrono::Utc::now().to_string())))] pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result, UpdateError> { match self.db.entries.get_mut(&id) { Some(entry) => { @@ -1376,6 +1399,7 @@ impl Database { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_authorized_read_access", skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))] pub fn authorized_read_access<'a, 'b>( &'a self, permissions: &'b Permissions, @@ -1386,6 +1410,7 @@ impl Database { } } + #[cfg_attr(feature="otel", tracing::instrument(name="broker_authorized_write_access", skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))] pub fn authorized_write_access<'a, 'b>( &'a mut self, permissions: &'b Permissions, @@ -1453,6 +1478,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .authorized_read_access(self.permissions)) } + #[cfg_attr(feature="otel", tracing::instrument(name = "broker_get_id_by_path", skip(self, name) fields(timestamp=chrono::Utc::now().to_string())))] pub async fn get_id_by_path(&self, name: &str) -> Option { self.broker .database @@ -1483,6 +1509,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .map(|entry| entry.datapoint.clone()) } + #[cfg_attr(feature="otel", tracing::instrument(name="get_metadata", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))] pub async fn get_metadata(&self, id: i32) -> Option { self.broker .database @@ -1522,7 +1549,8 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .get_entry_by_id(id) .cloned() } - + + #[cfg_attr(feature="otel", tracing::instrument(name="broker_for_each_entry", skip(self, f), fields(timestamp=chrono::Utc::now().to_string())))] pub async fn for_each_entry(&self, f: impl FnMut(EntryReadAccess)) { self.broker .database @@ -1558,6 +1586,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .collect() } + #[cfg_attr(feature="otel", tracing::instrument(name = "broker_update_entries",skip(self, updates), fields(timestamp=chrono::Utc::now().to_string())))] pub async fn update_entries( &self, updates: impl IntoIterator, @@ -1629,6 +1658,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { } } + #[cfg_attr(feature="otel", tracing::instrument(name = "broker_subscribe", skip(self, valid_entries), fields(timestamp=chrono::Utc::now().to_string())))] pub async fn subscribe( &self, valid_entries: HashMap>, @@ -2023,6 +2053,7 @@ impl DataBroker { } } + #[cfg_attr(feature="otel", tracing::instrument(name = "broker_authorized_access",skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))] pub fn authorized_access<'a, 'b>( &'a self, permissions: &'b Permissions, diff --git a/databroker/src/glob.rs b/databroker/src/glob.rs index bc3f4327..ca071364 100644 --- a/databroker/src/glob.rs +++ b/databroker/src/glob.rs @@ -74,6 +74,7 @@ impl Matcher { } } +#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex_string", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))] pub fn to_regex_string(glob: &str) -> String { // Construct regular expression @@ -121,6 +122,7 @@ pub fn to_regex_string(glob: &str) -> String { re } +#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))] pub fn to_regex(glob: &str) -> Result { let re = to_regex_string(glob); Regex::new(&re).map_err(|_err| Error::RegexError) @@ -160,6 +162,7 @@ lazy_static! { .expect("regex compilation (of static pattern) should always succeed"); } +#[cfg_attr(feature="otel", tracing::instrument(name="glob_is_valid_pattern", skip(input), fields(timestamp=chrono::Utc::now().to_string())))] pub fn is_valid_pattern(input: &str) -> bool { REGEX_VALID_PATTERN.is_match(input) } diff --git a/databroker/src/grpc/kuksa_val_v1/conversions.rs b/databroker/src/grpc/kuksa_val_v1/conversions.rs index d9b972d1..59d6472f 100644 --- a/databroker/src/grpc/kuksa_val_v1/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v1/conversions.rs @@ -236,6 +236,7 @@ impl From for Option { } impl From> for broker::DataValue { + #[cfg_attr(feature="otel", tracing::instrument(name="conversion_From>", skip(from), fields(timestamp=chrono::Utc::now().to_string())))] fn from(from: Option) -> Self { match from { Some(value) => match value { @@ -316,6 +317,7 @@ impl From for broker::Datapoint { } impl From for proto::DataEntry { + #[cfg_attr(feature="otel", tracing::instrument(name="conversion_From", skip(from), fields(timestamp=chrono::Utc::now().to_string())))] fn from(from: broker::EntryUpdate) -> Self { Self { path: from.path.unwrap_or_default(), diff --git a/databroker/src/grpc/kuksa_val_v1/val.rs b/databroker/src/grpc/kuksa_val_v1/val.rs index 237a0d4a..69c91a69 100644 --- a/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/databroker/src/grpc/kuksa_val_v1/val.rs @@ -256,11 +256,20 @@ impl proto::val_server::Val for broker::DataBroker { } } + #[cfg_attr(feature="otel",tracing::instrument(name="val_set",skip(self, request), fields(trace_id, timestamp= chrono::Utc::now().to_string())))] async fn set( &self, request: tonic::Request, ) -> Result, tonic::Status> { debug!(?request); + + #[cfg(feature="otel")] + let request = (||{ + let (trace_id, request) = read_incoming_trace_id(request); + tracing::Span::current().record("trace_id", &trace_id); + request + })(); + let permissions = match request.extensions().get::() { Some(permissions) => { debug!(?permissions); @@ -472,6 +481,7 @@ impl proto::val_server::Val for broker::DataBroker { >, >; + #[cfg_attr(feature="otel", tracing::instrument(name="subscribe", skip(self, request), fields(trace_id, timestamp=chrono::Utc::now().to_string())))] async fn subscribe( &self, request: tonic::Request, @@ -666,6 +676,7 @@ async fn validate_entry_update( Ok((id, update)) } +#[cfg_attr(feature="otel", tracing::instrument(name="val_convert_to_data_entry_error", skip(path, error), fields(timestamp=chrono::Utc::now().to_string())))] fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> DataEntryError { match error { broker::UpdateError::NotFound => DataEntryError { @@ -735,6 +746,7 @@ fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> Da } } +#[cfg_attr(feature="otel", tracing::instrument(name = "val_convert_to_proto_stream", skip(input), fields(timestamp=chrono::Utc::now().to_string())))] fn convert_to_proto_stream( input: impl Stream, ) -> impl Stream> { @@ -1051,7 +1063,30 @@ fn combine_view_and_fields( combined } +#[cfg(feature="otel")] +#[cfg_attr(feature="otel", tracing::instrument(name="val_read_incoming_trace_id", skip(request), fields(timestamp=chrono::Utc::now().to_string())))] +fn read_incoming_trace_id(request: tonic::Request) -> (String, tonic::Request){ + let mut trace_id: String = String::from(""); + let request_copy = tonic::Request::new(request.get_ref().clone()); + for request in request_copy.into_inner().updates { + match &request.entry { + Some(entry) => match &entry.metadata { + Some(metadata) => match &metadata.description{ + Some(description)=> { + trace_id = String::from(description); + } + None => trace_id = String::from("") + } + None => trace_id = String::from("") + } + None => trace_id = String::from("") + } + } + return(trace_id, request); +} + impl broker::EntryUpdate { + #[cfg_attr(feature="otel", tracing::instrument(name = "val_from_proto_entry_and_fields",skip(entry,fields), fields(timestamp=chrono::Utc::now().to_string())))] fn from_proto_entry_and_fields( entry: &proto::DataEntry, fields: HashSet, diff --git a/databroker/src/lib.rs b/databroker/src/lib.rs index 49672dba..a9221a8b 100644 --- a/databroker/src/lib.rs +++ b/databroker/src/lib.rs @@ -19,6 +19,7 @@ pub mod permissions; pub mod query; pub mod types; pub mod vss; +pub mod open_telemetry; #[cfg(feature = "viss")] pub mod viss; @@ -28,6 +29,15 @@ use std::fmt::Write; use tracing::info; use tracing_subscriber::filter::EnvFilter; +#[cfg(feature="otel")] +use { +tracing_subscriber::layer::SubscriberExt, +open_telemetry::init_trace, +opentelemetry::global, +opentelemetry::sdk::propagation::TraceContextPropagator, +}; + +#[cfg(not(feature="otel"))] pub fn init_logging() { let mut output = String::from("Init logging from RUST_LOG"); let filter = EnvFilter::try_from_default_env().unwrap_or_else(|err| { @@ -42,3 +52,28 @@ pub fn init_logging() { info!("{}", output); } + +#[cfg(feature="otel")] +pub fn init_logging() { + let output = String::from("Init logging from RUST_LOG"); + + // Set OpenTelemetry trace propagator + global::set_text_map_propagator(TraceContextPropagator::new()); + + // Initialize OpenTelemetry tracer + let tracer = init_trace().expect("Failed to initialize tracer"); + + // telemetry layer + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + let subscriber = tracing_subscriber::fmt::Subscriber::builder() + .with_max_level(tracing::Level::INFO) // adjust this log level as needed + .finish() + .with(telemetry); // Add telemetry layer + + // Set the subscriber as the global default for tracing + tracing::subscriber::set_global_default(subscriber) + .expect("Unable to install global logging subscriber"); + + info!("{}", output); +} \ No newline at end of file diff --git a/databroker/src/open_telemetry.rs b/databroker/src/open_telemetry.rs new file mode 100644 index 00000000..b2e618ec --- /dev/null +++ b/databroker/src/open_telemetry.rs @@ -0,0 +1,26 @@ +#[cfg(feature="otel")] +use { + opentelemetry::{KeyValue, runtime}, + opentelemetry::sdk::{Resource, trace}, + opentelemetry::trace::TraceError, + opentelemetry_otlp::WithExportConfig, + std::env +}; + +#[cfg(feature="otel")] +pub fn init_trace() -> Result { + opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(env::var("OTEL_ENDPOINT").unwrap_or_else(|_| "http://localhost:4317".to_string())), + ).with_batch_config(trace::BatchConfig::default()) // to change default of max_queue_size use .with_max_queue_size(8192) or set env OTEL_BSP_MAX_QUEUE_SIZE, by default it is set to 2_048 + .with_trace_config( + trace::config().with_resource(Resource::new(vec![KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_NAME, + "kuksa-rust-app", + )])), + ) + .install_batch(runtime::Tokio) +} diff --git a/databroker/src/permissions.rs b/databroker/src/permissions.rs index 8157b811..f9b0f5d3 100644 --- a/databroker/src/permissions.rs +++ b/databroker/src/permissions.rs @@ -188,6 +188,7 @@ impl Permissions { Err(PermissionError::Denied) } + #[cfg_attr(feature="otel", tracing::instrument(name="permissions_can_write_actuator_target", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))] pub fn can_write_actuator_target(&self, path: &str) -> Result<(), PermissionError> { if self.is_expired() { return Err(PermissionError::Expired); @@ -199,6 +200,7 @@ impl Permissions { Err(PermissionError::Denied) } + #[cfg_attr(feature="otel", tracing::instrument(name="permissions_can_write_datapoint", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))] pub fn can_write_datapoint(&self, path: &str) -> Result<(), PermissionError> { if self.is_expired() { return Err(PermissionError::Expired); @@ -221,6 +223,7 @@ impl Permissions { Err(PermissionError::Denied) } + #[cfg_attr(feature="otel", tracing::instrument(name="permissions_expired", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] #[inline] pub fn is_expired(&self) -> bool { if let Some(expires_at) = self.expires_at { @@ -233,6 +236,7 @@ impl Permissions { } impl PathMatcher { + #[cfg_attr(feature="otel", tracing::instrument(name="permissions_is_match", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))] pub fn is_match(&self, path: &str) -> bool { match self { PathMatcher::Nothing => false, diff --git a/databroker/src/query/executor.rs b/databroker/src/query/executor.rs index afbd9932..c6935547 100644 --- a/databroker/src/query/executor.rs +++ b/databroker/src/query/executor.rs @@ -37,6 +37,7 @@ pub trait ExecutionInput { } impl CompiledQuery { + #[cfg_attr(feature="otel", tracing::instrument(name="executor_execute_internal", skip(query, input), fields(timestamp=chrono::Utc::now().to_string())))] fn execute_internal( query: &CompiledQuery, input: &impl ExecutionInput, @@ -157,6 +158,8 @@ impl CompiledQuery { Ok(None) } } + + #[cfg_attr(feature="otel", tracing::instrument(name="executor_execute", skip(self, input), fields(timestamp=chrono::Utc::now().to_string())))] pub fn execute( &self, input: &impl ExecutionInput, @@ -166,6 +169,7 @@ impl CompiledQuery { } impl Expr { + #[cfg_attr(feature="otel", tracing::instrument(name="execute", skip(self, input), fields(timestamp=chrono::Utc::now().to_string())))] pub fn execute(&self, input: &impl ExecutionInput) -> Result { match &self { Expr::Datapoint { @@ -396,6 +400,7 @@ impl ExecutionInput for ExecutionInputImpl { } } + #[cfg_attr(feature="otel", tracing::instrument(name="executor_get_fields", skip(self), fields(timestamp=chrono::Utc::now().to_string())))] fn get_fields(&self) -> &HashMap { &self.fields }