Skip to content

Commit

Permalink
feat: add tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
smorihira committed Sep 11, 2024
1 parent 9d3c0a8 commit e1075b9
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 25 deletions.
33 changes: 28 additions & 5 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions rust/bin/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ edition = "2021"

[dependencies]
kv = "0.24.0"
opentelemetry = "0.25.0"
prost-types = "0.13.2"
proto = { version = "0.1.0", path = "../../libs/proto" }
sled = "0.34.7"
tokio = { version = "1.40.0", features = ["full"] }
tonic = "0.12.2"
observability = { path = "../../libs/observability" }
defer = "0.2.1"
3 changes: 1 addition & 2 deletions rust/bin/meta/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ pub struct Meta {
}

impl Meta {
pub fn new() -> Result<Self, kv::Error> {
let cfg_path = "/var/lib/meta/database"; // TODO: pathはこれでよい?
pub fn new(cfg_path: &str) -> Result<Self, kv::Error> {
let cfg = Config::new(cfg_path);
let store = Arc::new(Store::new(cfg)?);
let bucket = store.bucket::<Raw, Raw>(Some("meta_bucket"))?;
Expand Down
72 changes: 58 additions & 14 deletions rust/bin/meta/src/handler/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,73 +14,117 @@
// limitations under the License.
//

use proto::{meta::v1::meta_server, payload::v1::{meta, Empty}};
use kv::*;
use defer::defer;
use opentelemetry::{trace::{Tracer, TraceContextExt}, KeyValue, Context};
use observability::{ctx_span, tracer};
use proto::{meta::v1::meta_server, payload::v1::{meta, Empty}};

#[tonic::async_trait]
impl meta_server::Meta for super::Meta {
async fn get(
&self,
request: tonic::Request<meta::Key>,
) -> std::result::Result<tonic::Response<meta::Value>, tonic::Status> {
let parent_cx = request.extensions().get::<Context>().cloned().unwrap_or_else(Context::new);
let ctx = ctx_span!(&parent_cx, "Meta::get");
defer!(ctx.span().end());

let key = request.into_inner().key;
let raw_key = Raw::from(key.as_bytes());

match self.bucket.get(&raw_key) {
Ok(Some(value_bytes)) => {
ctx.span().add_event("Key found", vec![KeyValue::new("key", key.clone())]);

let any_value = prost_types::Any {
type_url: "type.googleapis.com/your.package.MessageType".to_string(),
value: value_bytes.to_vec(),
};

let response = meta::Value {
value: Some(any_value),
};

Ok(tonic::Response::new(response))
},
Ok(None) => Err(tonic::Status::not_found("Key not found")),
Err(e) => Err(tonic::Status::internal(format!("Database error: {}", e))),
Ok(None) => {
ctx.span().add_event("Key not found", vec![KeyValue::new("key", key)]);
Err(tonic::Status::not_found("Key not found"))
}
Err(e) => {
ctx.span().add_event("Database error", vec![KeyValue::new("error", e.to_string())]);
Err(tonic::Status::internal(format!("Database error: {}", e)))
}
}
}

async fn set(
&self,
request: tonic::Request<meta::KeyValue>,
) -> std::result::Result<tonic::Response<Empty>, tonic::Status> {
let parent_cx = request.extensions().get::<Context>().cloned().unwrap_or_else(Context::new);
let ctx = ctx_span!(&parent_cx, "Meta::set");
defer!(ctx.span().end());

let key_value = request.into_inner();

let key = match key_value.key {
Some(k) => k.key,
None => return Err(tonic::Status::invalid_argument("Key is missing")),
None => {
ctx.span().add_event("Invalid argument", vec![KeyValue::new("error", "Key is missing")]);
return Err(tonic::Status::invalid_argument("Key is missing"));
}
};

let value = match key_value.value {
Some(v) => match v.value {
Some(any_value) => any_value.value,
None => return Err(tonic::Status::invalid_argument("Value is missing")),
None => {
ctx.span().add_event("Invalid argument", vec![KeyValue::new("error", "Value is missing")]);
return Err(tonic::Status::invalid_argument("Value is missing"));
}
},
None => return Err(tonic::Status::invalid_argument("Value is missing")),
None => {
ctx.span().add_event("Invalid argument", vec![KeyValue::new("error", "Value is missing")]);
return Err(tonic::Status::invalid_argument("Value is missing"));
}
};

let raw_key = Raw::from(key.as_bytes());
let raw_value = sled::IVec::from(value);

match self.bucket.set(&raw_key, &raw_value) {
Ok(_) => Ok(tonic::Response::new(Empty {})),
Err(e) => Err(tonic::Status::internal(format!("Failed to set value: {}", e))),
Ok(_) => {
ctx.span().add_event("Value set successfully", vec![KeyValue::new("key", key)]);
Ok(tonic::Response::new(Empty {}))
},
Err(e) => {
ctx.span().add_event("Failed to set value", vec![KeyValue::new("error", e.to_string())]);
Err(tonic::Status::internal(format!("Failed to set value: {}", e)))
}
}
}

async fn delete(
&self,
request: tonic::Request<meta::Key>,
) -> std::result::Result<tonic::Response<Empty>, tonic::Status> {
let parent_cx = request.extensions().get::<Context>().cloned().unwrap_or_else(Context::new);
let ctx = ctx_span!(&parent_cx, "Meta::delete");
defer!(ctx.span().end());

let key = request.into_inner().key;
let raw_key = Raw::from(key.as_bytes());

match self.bucket.remove(&raw_key) {
Ok(_) => Ok(tonic::Response::new(Empty {})),
Err(e) => Err(tonic::Status::internal(format!("Failed to delete key: {}", e))),
Ok(_) => {
ctx.span().add_event("Key deleted successfully", vec![KeyValue::new("key", key)]);
Ok(tonic::Response::new(Empty {}))
},
Err(e) => {
ctx.span().add_event("Failed to delete key", vec![KeyValue::new("error", e.to_string())]);
Err(tonic::Status::internal(format!("Failed to delete key: {}", e)))
}
}
}
}
}
43 changes: 39 additions & 4 deletions rust/bin/meta/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,50 @@

mod handler;

use opentelemetry::global;
use opentelemetry::propagation::Extractor;
use tonic::transport::Server;
use tonic::Request;

struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);

impl<'a> Extractor for MetadataMap<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|metadata| metadata.to_str().ok())
}

fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(|key| match key {
tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
tonic::metadata::KeyRef::Binary(v) => v.as_str(),
})
.collect::<Vec<_>>() }
}

fn intercept(mut req: Request<()>) -> Result<Request<()>, tonic::Status> {
let parent_cx = global::get_text_map_propagator(|prop| {
prop.extract(&MetadataMap(req.metadata()))
});
req.extensions_mut().insert(parent_cx);
Ok(req)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: initialize tracer

let addr = "[::1]:8081".parse()?;
let meta = handler::Meta::new().expect("Failed to initialize Meta service");
let cfg_path = "/var/lib/meta/database"; // TODO: set the appropriate path
let meta = handler::Meta::new(cfg_path).expect("Failed to initialize Meta service");

tonic::transport::Server::builder()
.add_service(proto::meta::v1::meta_server::MetaServer::new(meta))
// the interceptor given here is implicitly executed for each request
Server::builder()
.add_service(proto::meta::v1::meta_server::MetaServer::with_interceptor(meta, intercept))
.serve(addr)
.await?;

// TODO: shutdown tracer
Ok(())
}
}

0 comments on commit e1075b9

Please sign in to comment.