From e1075b9c6434bc59fe9cbd290c8b37a0d61b6b63 Mon Sep 17 00:00:00 2001 From: smorihira Date: Wed, 11 Sep 2024 10:21:26 +0900 Subject: [PATCH] feat: add tracer --- rust/Cargo.lock | 33 +++++++++++--- rust/bin/meta/Cargo.toml | 3 ++ rust/bin/meta/src/handler.rs | 3 +- rust/bin/meta/src/handler/meta.rs | 72 +++++++++++++++++++++++++------ rust/bin/meta/src/main.rs | 43 ++++++++++++++++-- 5 files changed, 129 insertions(+), 25 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8be889e8c5..4b1b50f0e8 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -272,6 +272,12 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +[[package]] +name = "defer" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "930c7171c8df9fb1782bdf9b918ed9ed2d33d1d22300abb754f9085bc48bf8e8" + [[package]] name = "either" version = "1.13.0" @@ -771,7 +777,10 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" name = "meta" version = "0.1.0" dependencies = [ + "defer", "kv", + "observability", + "opentelemetry 0.25.0", "prost-types", "proto", "sled", @@ -829,7 +838,7 @@ name = "observability" version = "0.1.0" dependencies = [ "anyhow", - "opentelemetry", + "opentelemetry 0.23.0", "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", @@ -860,6 +869,20 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "803801d3d3b71cd026851a53f974ea03df3d179cb758b260136a6c9e22e196af" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + [[package]] name = "opentelemetry-http" version = "0.12.0" @@ -869,7 +892,7 @@ dependencies = [ "async-trait", "bytes", "http 0.2.12", - "opentelemetry", + "opentelemetry 0.23.0", "reqwest", ] @@ -882,7 +905,7 @@ dependencies = [ "async-trait", "futures-core", "http 0.2.12", - "opentelemetry", + "opentelemetry 0.23.0", "opentelemetry-http", "opentelemetry-proto", "opentelemetry_sdk", @@ -899,7 +922,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "984806e6cf27f2b49282e2a05e288f30594f3dbc74eb7a6e99422bc48ed78162" dependencies = [ - "opentelemetry", + "opentelemetry 0.23.0", "opentelemetry_sdk", "prost 0.12.6", "tonic 0.11.0", @@ -924,7 +947,7 @@ dependencies = [ "glob", "lazy_static", "once_cell", - "opentelemetry", + "opentelemetry 0.23.0", "ordered-float", "percent-encoding", "rand", diff --git a/rust/bin/meta/Cargo.toml b/rust/bin/meta/Cargo.toml index 99e3bb95a2..9dda1a9368 100644 --- a/rust/bin/meta/Cargo.toml +++ b/rust/bin/meta/Cargo.toml @@ -5,8 +5,11 @@ edition = "2021" [dependencies] kv = "0.24.0" +opentelemetry = "0.25.0" prost-types = "0.13.2" proto = { version = "0.1.0", path = "../../libs/proto" } sled = "0.34.7" tokio = { version = "1.40.0", features = ["full"] } tonic = "0.12.2" +observability = { path = "../../libs/observability" } +defer = "0.2.1" diff --git a/rust/bin/meta/src/handler.rs b/rust/bin/meta/src/handler.rs index 41e0853b06..217695a9af 100644 --- a/rust/bin/meta/src/handler.rs +++ b/rust/bin/meta/src/handler.rs @@ -24,8 +24,7 @@ pub struct Meta { } impl Meta { - pub fn new() -> Result { - let cfg_path = "/var/lib/meta/database"; // TODO: pathはこれでよい? + pub fn new(cfg_path: &str) -> Result { let cfg = Config::new(cfg_path); let store = Arc::new(Store::new(cfg)?); let bucket = store.bucket::(Some("meta_bucket"))?; diff --git a/rust/bin/meta/src/handler/meta.rs b/rust/bin/meta/src/handler/meta.rs index bf8c00aecd..cd6eda0a1e 100644 --- a/rust/bin/meta/src/handler/meta.rs +++ b/rust/bin/meta/src/handler/meta.rs @@ -14,8 +14,11 @@ // limitations under the License. // -use proto::{meta::v1::meta_server, payload::v1::{meta, Empty}}; use kv::*; +use defer::defer; +use opentelemetry::{trace::{Tracer, TraceContextExt}, KeyValue, Context}; +use observability::{ctx_span, tracer}; +use proto::{meta::v1::meta_server, payload::v1::{meta, Empty}}; #[tonic::async_trait] impl meta_server::Meta for super::Meta { @@ -23,64 +26,105 @@ impl meta_server::Meta for super::Meta { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let parent_cx = request.extensions().get::().cloned().unwrap_or_else(Context::new); + let ctx = ctx_span!(&parent_cx, "Meta::get"); + defer!(ctx.span().end()); + let key = request.into_inner().key; let raw_key = Raw::from(key.as_bytes()); match self.bucket.get(&raw_key) { Ok(Some(value_bytes)) => { + ctx.span().add_event("Key found", vec![KeyValue::new("key", key.clone())]); + let any_value = prost_types::Any { type_url: "type.googleapis.com/your.package.MessageType".to_string(), value: value_bytes.to_vec(), }; - let response = meta::Value { value: Some(any_value), }; - + Ok(tonic::Response::new(response)) }, - Ok(None) => Err(tonic::Status::not_found("Key not found")), - Err(e) => Err(tonic::Status::internal(format!("Database error: {}", e))), + Ok(None) => { + ctx.span().add_event("Key not found", vec![KeyValue::new("key", key)]); + Err(tonic::Status::not_found("Key not found")) + } + Err(e) => { + ctx.span().add_event("Database error", vec![KeyValue::new("error", e.to_string())]); + Err(tonic::Status::internal(format!("Database error: {}", e))) + } } } + async fn set( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let parent_cx = request.extensions().get::().cloned().unwrap_or_else(Context::new); + let ctx = ctx_span!(&parent_cx, "Meta::set"); + defer!(ctx.span().end()); + let key_value = request.into_inner(); let key = match key_value.key { Some(k) => k.key, - None => return Err(tonic::Status::invalid_argument("Key is missing")), + None => { + ctx.span().add_event("Invalid argument", vec![KeyValue::new("error", "Key is missing")]); + return Err(tonic::Status::invalid_argument("Key is missing")); + } }; let value = match key_value.value { Some(v) => match v.value { Some(any_value) => any_value.value, - None => return Err(tonic::Status::invalid_argument("Value is missing")), + None => { + ctx.span().add_event("Invalid argument", vec![KeyValue::new("error", "Value is missing")]); + return Err(tonic::Status::invalid_argument("Value is missing")); + } }, - None => return Err(tonic::Status::invalid_argument("Value is missing")), + None => { + ctx.span().add_event("Invalid argument", vec![KeyValue::new("error", "Value is missing")]); + return Err(tonic::Status::invalid_argument("Value is missing")); + } }; let raw_key = Raw::from(key.as_bytes()); let raw_value = sled::IVec::from(value); match self.bucket.set(&raw_key, &raw_value) { - Ok(_) => Ok(tonic::Response::new(Empty {})), - Err(e) => Err(tonic::Status::internal(format!("Failed to set value: {}", e))), + Ok(_) => { + ctx.span().add_event("Value set successfully", vec![KeyValue::new("key", key)]); + Ok(tonic::Response::new(Empty {})) + }, + Err(e) => { + ctx.span().add_event("Failed to set value", vec![KeyValue::new("error", e.to_string())]); + Err(tonic::Status::internal(format!("Failed to set value: {}", e))) + } } } - + async fn delete( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let parent_cx = request.extensions().get::().cloned().unwrap_or_else(Context::new); + let ctx = ctx_span!(&parent_cx, "Meta::delete"); + defer!(ctx.span().end()); + let key = request.into_inner().key; let raw_key = Raw::from(key.as_bytes()); match self.bucket.remove(&raw_key) { - Ok(_) => Ok(tonic::Response::new(Empty {})), - Err(e) => Err(tonic::Status::internal(format!("Failed to delete key: {}", e))), + Ok(_) => { + ctx.span().add_event("Key deleted successfully", vec![KeyValue::new("key", key)]); + Ok(tonic::Response::new(Empty {})) + }, + Err(e) => { + ctx.span().add_event("Failed to delete key", vec![KeyValue::new("error", e.to_string())]); + Err(tonic::Status::internal(format!("Failed to delete key: {}", e))) + } } } -} \ No newline at end of file +} diff --git a/rust/bin/meta/src/main.rs b/rust/bin/meta/src/main.rs index 27ebe77fb4..c960225281 100644 --- a/rust/bin/meta/src/main.rs +++ b/rust/bin/meta/src/main.rs @@ -16,15 +16,50 @@ mod handler; +use opentelemetry::global; +use opentelemetry::propagation::Extractor; +use tonic::transport::Server; +use tonic::Request; + +struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap); + +impl<'a> Extractor for MetadataMap<'a> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(|key| match key { + tonic::metadata::KeyRef::Ascii(v) => v.as_str(), + tonic::metadata::KeyRef::Binary(v) => v.as_str(), + }) + .collect::>() } +} + +fn intercept(mut req: Request<()>) -> Result, tonic::Status> { + let parent_cx = global::get_text_map_propagator(|prop| { + prop.extract(&MetadataMap(req.metadata())) + }); + req.extensions_mut().insert(parent_cx); + Ok(req) +} + #[tokio::main] async fn main() -> Result<(), Box> { + // TODO: initialize tracer + let addr = "[::1]:8081".parse()?; - let meta = handler::Meta::new().expect("Failed to initialize Meta service"); + let cfg_path = "/var/lib/meta/database"; // TODO: set the appropriate path + let meta = handler::Meta::new(cfg_path).expect("Failed to initialize Meta service"); - tonic::transport::Server::builder() - .add_service(proto::meta::v1::meta_server::MetaServer::new(meta)) + // the interceptor given here is implicitly executed for each request + Server::builder() + .add_service(proto::meta::v1::meta_server::MetaServer::with_interceptor(meta, intercept)) .serve(addr) .await?; + // TODO: shutdown tracer Ok(()) -} \ No newline at end of file +}