From ed6e16b6144d3ba001258422728505bb1f48b851 Mon Sep 17 00:00:00 2001
From: Jens Reimann
Date: Thu, 11 Aug 2022 19:05:51 +0200
Subject: [PATCH] feat: get tracing and jaeger going

---
 Cargo.lock                         | 15 ++++
 Cargo.toml                         |  4 +-
 core/src/command/mod.rs            |  2 +
 core/src/command/mqtt.rs           |  6 +++
 core/src/injector/mqtt.rs          | 48 ++++++++++++-------
 core/src/kafka.rs                  | 23 ++++++++++
 core/src/lib.rs                    |  1 +
 core/src/machine/deno.rs           | 74 +++++++++++++++++++-----------
 core/src/machine/mod.rs            |  5 ++
 core/src/machine/recon.rs          |  8 ++++
 core/src/model/mod.rs              |  3 +-
 core/src/notifier/kafka.rs         |  2 +
 core/src/processor/mod.rs          |  4 ++
 core/src/processor/sink/kafka.rs   | 22 +++++++--
 core/src/processor/source/kafka.rs | 30 +++++++++---
 core/src/service/mod.rs            | 20 ++++++++
 core/src/storage/mod.rs            | 10 ++--
 core/src/storage/postgres/mod.rs   | 36 ++++++++++++---
 core/src/waker/postgres.rs         |  4 +-
 develop/compose-health.yaml        |  8 ++++
 examples/99_just_testing.adoc      |  4 +-
 justfile                           |  6 +--
 server/src/main.rs                 | 15 +++---
 23 files changed, 269 insertions(+), 81 deletions(-)
 create mode 100644 core/src/kafka.rs
 create mode 100644 develop/compose-health.yaml

diff --git a/Cargo.lock b/Cargo.lock
index fe1f26d..fae6dff 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -925,6 +925,7 @@ checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
 [[package]]
 name = "drogue-bazaar"
 version = "0.2.0"
+source = "git+https://github.com/drogue-iot/drogue-bazaar?rev=f77db10ce88b81c084d5109f48a17d3987b7b620#f77db10ce88b81c084d5109f48a17d3987b7b620"
 dependencies = [
  "actix-cors",
  "actix-http",
@@ -960,6 +961,7 @@
  "tokio",
  "tracing",
  "tracing-actix-web",
+ "tracing-log",
  "tracing-opentelemetry",
  "tracing-subscriber",
  "url",
@@ -3631,6 +3633,16 @@ dependencies = [
  "tracing-subscriber",
 ]
 
+[[package]]
+name = "tracing-serde"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
+dependencies = [
+ "serde",
+ "tracing-core",
+]
+
 [[package]]
 name = "tracing-subscriber"
 version = "0.3.15"
@@ -3641,12 +3653,15 @@ dependencies = [
  "matchers",
  "once_cell",
  "regex",
+ "serde",
+ "serde_json",
  "sharded-slab",
  "smallvec",
  "thread_local",
  "tracing",
  "tracing-core",
  "tracing-log",
+ "tracing-serde",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index f5983f8..c0e8b17 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,8 +11,8 @@ exclude = [
 ]
 
 [patch.crates-io]
-drogue-bazaar = { path = "../drogue-bazaar" }
-#drogue-bazaar = { git = "https://github.com/drogue-iot/drogue-bazaar", rev = "3a5e58d05b4be89342e1c7b4ca1409e1cc305310" }
+#drogue-bazaar = { path = "../drogue-bazaar" }
+drogue-bazaar = { git = "https://github.com/drogue-iot/drogue-bazaar", rev = "f77db10ce88b81c084d5109f48a17d3987b7b620" }
 #drogue-client = { path = "../drogue-client" }
 #drogue-client = { git = "https://github.com/drogue-iot/drogue-client", rev = "0cb6998da75905240f06f38a44aac31d7b3fdde5" }
 # FIXME: awaiting release 0.11.0
diff --git a/core/src/command/mod.rs b/core/src/command/mod.rs
index 86bf864..cae2939 100644
--- a/core/src/command/mod.rs
+++ b/core/src/command/mod.rs
@@ -4,6 +4,7 @@ use async_trait::async_trait;
 use drogue_bazaar::app::Startup;
 use serde::de::DeserializeOwned;
 use std::fmt::Debug;
+use tracing::instrument;
 
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct Command {
@@ -22,6 +23,7 @@ pub trait CommandSink: Sized + Send + Sync + 'static {
 
     async fn send_command(&self, command: Command) -> Result<(), Self::Error>;
 
+    #[instrument(skip_all, err)]
     async fn send_commands(&self, commands: Vec<Command>) -> Result<(), Self::Error> {
         for command in commands {
             self.send_command(command).await?;
diff --git a/core/src/command/mqtt.rs b/core/src/command/mqtt.rs
index 54974ce..0e8098b 100644
--- a/core/src/command/mqtt.rs
+++ b/core/src/command/mqtt.rs
@@ -2,6 +2,7 @@ use crate::{command::Command, mqtt::MqttClient};
 use async_trait::async_trait;
 use drogue_bazaar::app::{Startup, StartupExt};
 use rumqttc::{AsyncClient, ClientError, Event, EventLoop, Incoming, Outgoing, QoS};
+use tracing::instrument;
 
 #[derive(Clone, Debug, serde::Deserialize)]
 pub struct Config {
@@ -63,6 +64,11 @@ impl super::CommandSink for CommandSink {
         Ok(Self { client, mode })
     }
 
+    #[instrument(skip_all, fields(
+        application=command.application,
+        device=command.device,
+        channel=command.channel,
+    ), err)]
     async fn send_command(&self, command: Command) -> Result<(), Self::Error> {
         let topic = self
             .mode
diff --git a/core/src/injector/mqtt.rs b/core/src/injector/mqtt.rs
index c6dfe1a..54ffb5c 100644
--- a/core/src/injector/mqtt.rs
+++ b/core/src/injector/mqtt.rs
@@ -6,10 +6,12 @@ use crate::{
     mqtt::MqttClient,
     processor::{sink::Sink, Event},
 };
+use anyhow::bail;
 use chrono::Utc;
 use lazy_static::lazy_static;
 use prometheus::{register_histogram, register_int_counter_vec, Histogram, IntCounterVec};
-use rumqttc::{AsyncClient, EventLoop, Incoming, QoS, SubscribeReasonCode};
+use rumqttc::{AsyncClient, EventLoop, Incoming, Publish, QoS, SubscribeReasonCode};
+use tracing::instrument;
 
 lazy_static! {
     static ref EVENTS: IntCounterVec = register_int_counter_vec!(
@@ -120,22 +122,9 @@ impl Injector {
                     }
                 }
                 Ok(rumqttc::Event::Incoming(Incoming::Publish(publish))) => {
-                    match self.build_event(&publish.payload) {
-                        Ok(Some(event)) => {
-                            log::debug!("Injecting event: {event:?}");
-                            if let Err(err) = self.sink.publish(event).await {
-                                log::error!("Failed to inject event: {err}, Exiting loop");
-                            }
-                            EVENTS.with_label_values(&["ok"]).inc();
-                        }
-                        Ok(None) => {
-                            // got skipped
-                            EVENTS.with_label_values(&["skipped"]).inc();
-                        }
-                        Err(err) => {
-                            EVENTS.with_label_values(&["failed"]).inc();
-                            log::info!("Unable to parse event: {err}, skipping...")
-                        }
+                    if let Err(err) = self.handle_publish(&publish).await {
+                        log::warn!("Failed to schedule message: {err}");
+                        break;
                     }
                     if perform_ack {
                         if let Err(err) = self.client.try_ack(&publish) {
@@ -158,6 +147,7 @@ impl Injector {
         Ok(())
     }
 
+    #[instrument(skip_all, fields(payload_len=payload.len()), err)]
     fn build_event(&self, payload: &[u8]) -> anyhow::Result<Option<Event>> {
         let mut event: cloudevents::Event = serde_json::from_slice(payload)?;
 
@@ -192,4 +182,28 @@ impl Injector {
             message,
         }))
     }
+
+    #[instrument(skip_all, fields(topic=publish.topic, pkid=publish.pkid))]
+    async fn handle_publish(&self, publish: &Publish) -> anyhow::Result<()> {
+        match self.build_event(&publish.payload) {
+            Ok(Some(event)) => {
+                log::debug!("Injecting event: {event:?}");
+                if let Err(err) = self.sink.publish(event).await {
+                    log::error!("Failed to inject event: {err}, Exiting loop");
+                    bail!("Failed to inject event: {err}")
+                }
+                EVENTS.with_label_values(&["ok"]).inc();
+            }
+            Ok(None) => {
+                // got skipped
+                EVENTS.with_label_values(&["skipped"]).inc();
+            }
+            Err(err) => {
+                EVENTS.with_label_values(&["invalid"]).inc();
+                log::info!("Unable to parse event: {err}, skipping...");
+            }
+        }
+
+        Ok(())
+    }
 }
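
The annotations added so far all follow the same pattern: `#[instrument]` opens a span per call, `skip_all` keeps whole arguments out of the span, `fields(...)` records only selected values, and `err` attaches a returned error to the span. A minimal, standalone sketch of that pattern (not part of the patch; it assumes the `tracing`, `tracing-subscriber`, `tokio`, and `anyhow` crates and prints spans to stdout instead of exporting them to Jaeger):

use tracing::instrument;

// `skip_all` keeps the raw arguments out of the span; only the named fields are recorded.
// `err` records the error on the span when the function returns `Err`.
#[instrument(skip_all, fields(application = %application, device = %device), err)]
async fn send_one(application: &str, device: &str, payload: &[u8]) -> anyhow::Result<()> {
    // the span wraps the whole async fn, including its await points
    tracing::debug!(len = payload.len(), "sending command");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // plain stdout subscriber for the sketch; the services export to Jaeger instead
    tracing_subscriber::fmt::init();
    send_one("default", "device-1", b"{}").await
}

With the `fmt` subscriber swapped for an OpenTelemetry layer, the same spans end up in Jaeger.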
diff --git a/core/src/kafka.rs b/core/src/kafka.rs
new file mode 100644
index 0000000..e303afa
--- /dev/null
+++ b/core/src/kafka.rs
@@ -0,0 +1,23 @@
+use opentelemetry::propagation::Injector;
+use rdkafka::message::OwnedHeaders;
+
+pub struct KafkaHeaders(Option<OwnedHeaders>);
+
+impl Injector for KafkaHeaders {
+    fn set(&mut self, key: &str, value: String) {
+        let h = self.0.take().unwrap_or_default();
+        self.0 = Some(h.add(&format!("ce_{}", key), &value))
+    }
+}
+
+impl From<OwnedHeaders> for KafkaHeaders {
+    fn from(headers: OwnedHeaders) -> Self {
+        Self(Some(headers))
+    }
+}
+
+impl From<KafkaHeaders> for OwnedHeaders {
+    fn from(headers: KafkaHeaders) -> Self {
+        headers.0.unwrap_or_default()
+    }
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 70202c7..895d050 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -2,6 +2,7 @@ pub mod command;
 pub mod config;
 pub mod error;
 pub mod injector;
+pub mod kafka;
 pub mod listener;
 pub mod machine;
 pub mod model;
diff --git a/core/src/machine/deno.rs b/core/src/machine/deno.rs
index 83a4e16..b6cbb68 100644
--- a/core/src/machine/deno.rs
+++ b/core/src/machine/deno.rs
@@ -2,6 +2,7 @@ use deno_core::{include_js_files, serde_v8, v8, Extension, JsRuntime, RuntimeOpt
 use serde::{Deserialize, Serialize};
 use std::convert::Infallible;
 use tokio::{runtime::Handle, task::JoinHandle, time::Instant};
+use tracing::{instrument, Span};
 
 #[derive(Clone, Debug)]
 pub struct DenoOptions {
@@ -134,6 +135,7 @@ impl Execution {
         }
     }
 
+    #[instrument(skip_all)]
     fn create_runtime(&self) -> JsRuntime {
         // disable some operations
        let disable = Extension::builder()
@@ -148,15 +150,21 @@ impl Execution {
             .js(include_js_files!(prefix "drogue:extensions/api", "js/core.js",))
             .build();
 
+        tracing::info!("Built extensions");
+
         // FIXME: doesn't work as advertised, we keep it anyway
         let create_params = v8::Isolate::create_params().heap_limits(0, 3 * 1024 * 1024);
 
+        tracing::info!("Created parameters");
+
         let mut runtime = JsRuntime::new(RuntimeOptions {
             create_params: Some(create_params),
             extensions: vec![disable, api],
             ..Default::default()
         });
 
+        tracing::info!("Created runtime");
+
         let isolate = runtime.v8_isolate().thread_safe_handle();
         runtime.add_near_heap_limit_callback(move |_, current| {
             // FIXME: again, this currently doesn't work properly, we keep it anyway
@@ -167,7 +175,8 @@ impl Execution {
         runtime
     }
 
-    pub async fn run(self, input: I) -> anyhow::Result>
+    #[instrument(parent = parent, skip_all)]
+    fn run_inner(self, parent: Span, input: I) -> anyhow::Result>
     where
         I: Injectable + 'static,
         O: Extractable + 'static,
@@ -176,44 +185,57 @@ impl Execution {
         struct Deadline(JoinHandle<()>);
         impl Drop for Deadline {
             fn drop(&mut self) {
+                tracing::warn!("Aborting script");
                 self.0.abort();
             }
         }
 
-        Handle::current()
-            .spawn_blocking(move || {
-                let mut runtime = self.create_runtime();
+        let mut runtime = self.create_runtime();
 
-                let name = self.name;
-                let code = self.code;
+        let name = self.name;
+        let code = self.code;
 
-                let isolate = runtime.v8_isolate().thread_safe_handle();
-                let deadline = Deadline(Handle::current().spawn(async move {
-                    tokio::time::sleep_until(self.opts.deadline).await;
-                    isolate.terminate_execution();
-                }));
+        let isolate = runtime.v8_isolate().thread_safe_handle();
+        let deadline = Deadline(Handle::current().spawn(async move {
+            tokio::time::sleep_until(self.opts.deadline).await;
+            isolate.terminate_execution();
+        }));
 
-                // set_context::<_, O>(&mut runtime, input)?;
-                input.inject(&mut runtime, "context")?;
+        input.inject(&mut runtime, "context")?;
 
-                let global = runtime.execute_script(&name, &code)?;
+        tracing::info!("Execute script");
+        let global = runtime.execute_script(&name, &code)?;
 
-                let return_value = R::r#return(&mut runtime, global)?;
+        let return_value = R::r#return(&mut runtime, global)?;
 
-                Handle::current().block_on(async { runtime.run_event_loop(false).await })?;
-                // FIXME: eval late result
+        Handle::current().block_on(async { runtime.run_event_loop(false).await })?;
 
-                // stop the deadline watcher
-                drop(deadline);
+        tracing::info!("Awaited event loop");
 
-                //let output = extract_context(&mut runtime)?;
-                let output = O::extract(&mut runtime, "context")?;
+        // FIXME: eval late result
 
-                Ok::<_, anyhow::Error>(ExecutionResult {
-                    output,
-                    return_value,
-                })
-            })
+        // stop the deadline watcher
+        drop(deadline);
+
+        //let output = extract_context(&mut runtime)?;
+        let output = O::extract(&mut runtime, "context")?;
+
+        Ok::<_, anyhow::Error>(ExecutionResult {
+            output,
+            return_value,
+        })
+    }
+
+    #[instrument(skip_all, err)]
+    pub async fn run(self, input: I) -> anyhow::Result>
+    where
+        I: Injectable + 'static,
+        O: Extractable + 'static,
+        R: Returnable + 'static,
+    {
+        let span = Span::current();
+        Handle::current()
+            .spawn_blocking(move || self.run_inner(span, input))
             .await?
     }
 }
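
The `run`/`run_inner` split above exists because spans do not automatically follow work onto `spawn_blocking` threads: the async caller captures `Span::current()` and hands it to the blocking function, which re-attaches it via `#[instrument(parent = ...)]`. A reduced sketch of that hand-off (not from this repository; the function names are made up for illustration, and `tokio`, `tracing`, `tracing-subscriber`, and `anyhow` are assumed):

use tracing::{instrument, Instrument, Span};

// The captured span becomes the parent, so work done on the blocking thread
// shows up under the caller's trace.
#[instrument(parent = parent, skip_all)]
fn heavy_work(parent: Span, input: u64) -> u64 {
    tracing::info!("running on a blocking thread");
    input * 2
}

async fn run(input: u64) -> anyhow::Result<u64> {
    let span = Span::current();
    let result = tokio::task::spawn_blocking(move || heavy_work(span, input)).await?;
    Ok(result)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();
    let doubled = run(21).instrument(tracing::info_span!("request")).await?;
    println!("{doubled}");
    Ok(())
}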
diff --git a/core/src/machine/mod.rs b/core/src/machine/mod.rs
index 9514d4d..a3d51cd 100644
--- a/core/src/machine/mod.rs
+++ b/core/src/machine/mod.rs
@@ -16,6 +16,7 @@ use lazy_static::lazy_static;
 use prometheus::{register_histogram, Histogram};
 use serde_json::Value;
 use std::{convert::Infallible, fmt::Debug, future::Future, sync::Arc};
+use tracing::instrument;
 
 lazy_static! {
     static ref TIMER_DELAY: Histogram =
@@ -62,6 +63,7 @@ impl Machine {
     }
 
     /// Run actions for creating a new thing.
+    #[instrument(skip_all, err)]
     pub async fn create(new_thing: Thing) -> Result {
         // Creating means that we start with an empty thing, and then set the initial state.
         // This allows to run through the reconciliation initially.
@@ -77,6 +79,7 @@ impl Machine {
     }
 
     /// Run an update.
+    #[instrument(skip_all, err)]
     pub async fn update(self, f: F) -> Result
     where
         F: FnOnce(Thing) -> Fut,
@@ -149,6 +152,7 @@ impl Machine {
         })
     }
 
+    #[instrument(skip_all, err)]
     pub async fn delete(thing: Thing) -> Result {
         let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
 
@@ -210,6 +214,7 @@ impl Machine {
         })
     }
 
+    #[instrument(skip_all, err)]
     fn validate(new_thing: &Thing) -> Result<(), Error> {
         match &new_thing.schema {
             Some(Schema::Json(schema)) => match schema {
diff --git a/core/src/machine/recon.rs b/core/src/machine/recon.rs
index 6410f27..4ea5c24 100644
--- a/core/src/machine/recon.rs
+++ b/core/src/machine/recon.rs
@@ -15,6 +15,7 @@ use chrono::{DateTime, Duration, Utc};
 use indexmap::IndexMap;
 use serde_json::Value;
 use std::sync::Arc;
+use tracing::instrument;
 
 #[derive(Clone, Debug, Copy, PartialEq, Eq, serde::Serialize)]
 #[serde(rename_all = "camelCase")]
@@ -47,6 +48,7 @@ impl Reconciler {
         }
     }
 
+    #[instrument(skip_all, err)]
     pub async fn run(mut self) -> Result {
         // cleanup first
         self.cleanup();
@@ -108,6 +110,7 @@ impl Reconciler {
         }
     }
 
+    #[instrument(skip_all, err)]
     async fn generate_synthetics(&mut self) -> Result<(), Error> {
         let now = Utc::now();
 
@@ -253,6 +256,7 @@ impl Reconciler {
         Ok(())
     }
 
+    #[instrument(skip_all, err)]
     async fn reconcile_desired_state(&mut self) -> Result<(), Error> {
         // sync first
         self.sync_desired_state()?;
@@ -327,6 +331,7 @@ impl Reconciler {
         Ok(())
     }
 
+    #[instrument(skip_all, err)]
     async fn reconcile_changed(&mut self, changed: IndexMap) -> Result<(), Error> {
         for (name, mut changed) in changed {
             let ExecutionResult { logs } = self
@@ -344,6 +349,7 @@ impl Reconciler {
         Ok(())
     }
 
+    #[instrument(skip_all, err)]
     async fn reconcile_timers(&mut self, timers: IndexMap) -> Result<(), Error> {
         for (name, mut timer) in timers {
             let due = match timer.stopped {
@@ -421,6 +427,7 @@ impl Reconciler {
         Ok(())
     }
 
+    #[instrument(skip_all, fields(name, action), err)]
     async fn run_code(
         &mut self,
         name: String,
@@ -495,6 +502,7 @@ impl Reconciler {
         }
     }
 
+    #[instrument(skip_all, fields(name), err)]
     async fn run_synthetic(
         name: &str,
         r#type: &SyntheticType,
diff --git a/core/src/model/mod.rs b/core/src/model/mod.rs
index a19e254..fbdc91b 100644
--- a/core/src/model/mod.rs
+++ b/core/src/model/mod.rs
@@ -6,8 +6,7 @@ pub use desired::*;
 pub use recon::*;
 pub use waker::*;
 
-use crate::processor::Event;
-use crate::service::Id;
+use crate::{processor::Event, service::Id};
 use base64::STANDARD;
 use base64_serde::base64_serde_type;
 use chrono::{DateTime, Duration, Utc};
diff --git a/core/src/notifier/kafka.rs b/core/src/notifier/kafka.rs
index 80d22c3..a693257 100644
--- a/core/src/notifier/kafka.rs
+++ b/core/src/notifier/kafka.rs
@@ -10,6 +10,7 @@ use rdkafka::{
     util::Timeout,
 };
 use std::{collections::HashMap, time::Duration};
+use tracing::instrument;
 
 #[derive(Clone, Debug, serde::Deserialize)]
 pub struct Config {
@@ -69,6 +70,7 @@ impl super::Notifier for Notifier {
         })
     }
 
+    #[instrument(skip_all, err)]
     async fn notify(&self, thing: &Thing) -> Result<(), notifier::Error> {
         let Metadata {
             application, name, ..
diff --git a/core/src/processor/mod.rs b/core/src/processor/mod.rs
index 2b18e02..ef463fb 100644
--- a/core/src/processor/mod.rs
+++ b/core/src/processor/mod.rs
@@ -24,6 +24,7 @@ use prometheus::{
 };
 use serde_json::Value;
 use std::collections::BTreeMap;
+use tracing::instrument;
 use uuid::Uuid;
 
 lazy_static! {
@@ -235,6 +236,7 @@ where
     ///
     /// NOTE: This function respects a change in the `deletion_timestamp` and will trigger a
     /// deletion if the updater sets it.
+    #[instrument(skip_all, fields(id), err)]
     async fn run_cleanup(
         service: &Service,
         id: &Id,
@@ -296,6 +298,7 @@ where
     }
 
     /// Either update or insert a new thing
+    #[instrument(skip_all, fields(id), err)]
     async fn run_upsert(
         service: &Service,
         id: &Id,
@@ -353,6 +356,7 @@ where
         Ok(())
     }
 
+    #[instrument(skip_all, fields(id), err)]
     async fn run_update(
         service: &Service,
         id: &Id,
diff --git a/core/src/processor/sink/kafka.rs b/core/src/processor/sink/kafka.rs
index 76dd692..daf5ff3 100644
--- a/core/src/processor/sink/kafka.rs
+++ b/core/src/processor/sink/kafka.rs
@@ -1,12 +1,14 @@
 use crate::config::kafka::KafkaProperties;
+use crate::kafka::KafkaHeaders;
 use crate::processor::Event;
 use anyhow::anyhow;
 use async_trait::async_trait;
+use opentelemetry::global::get_text_map_propagator;
 use rdkafka::config::FromClientConfig;
 use rdkafka::message::OwnedHeaders;
 use rdkafka::producer::{FutureProducer, FutureRecord};
-use std::collections::HashMap;
-use std::time::Duration;
+use std::{collections::HashMap, time::Duration};
+use tracing::instrument;
 
 #[derive(Clone, Debug, serde::Deserialize)]
 pub struct Config {
@@ -54,6 +56,12 @@ impl super::Sink for Sink {
         })
     }
 
+    #[instrument(skip_all, fields(
+        id=event.id,
+        timestamp=%event.timestamp,
+        application=event.application,
+        thing=event.thing
+    ), err)]
     async fn publish(&self, event: Event) -> anyhow::Result<()> {
         let key = format!("{}/{}", event.application, event.thing);
 
@@ -66,8 +74,14 @@ impl super::Sink for Sink {
             .add("ce_type", "io.drogue.doppelgeanger.event.v1")
             .add("ce_timestamp", &event.timestamp.to_rfc3339())
             .add("content-type", "application/json")
-            .add("application", &event.application)
-            .add("thing", &event.thing);
+            .add("ce_application", &event.application)
+            .add("ce_thing", &event.thing);
+
+        let mut headers = KafkaHeaders::from(headers);
+        get_text_map_propagator(|prop| {
+            prop.inject(&mut headers);
+        });
+        let headers = headers.into();
 
         let record = FutureRecord::to(&self.topic)
             .key(&key)
diff --git a/core/src/processor/source/kafka.rs b/core/src/processor/source/kafka.rs
index e8bcbe9..adcaeb0 100644
--- a/core/src/processor/source/kafka.rs
+++ b/core/src/processor/source/kafka.rs
@@ -8,8 +8,8 @@ use rdkafka::{
     message::{BorrowedMessage, Headers},
     Message,
 };
-use std::collections::HashMap;
-use std::future::Future;
+use std::{collections::HashMap, future::Future};
+use tracing::instrument;
 
 #[derive(Clone, Debug, serde::Deserialize)]
 pub struct Config {
@@ -25,6 +25,21 @@ pub struct Source {
     consumer: StreamConsumer,
 }
 
+impl Source {
+    #[instrument(skip_all, fields(
+        id=event.id,
+        application=event.application,
+        thing=event.thing,
+    ))]
+    async fn process(f: &F, event: Event) -> anyhow::Result<()>
+    where
+        F: Fn(Event) -> Fut + Send + Sync,
+        Fut: Future> + Send,
+    {
+        f(event).await
+    }
+}
+
 #[async_trait]
 impl super::Source for Source {
     type Config = Config;
@@ -69,7 +84,8 @@ impl super::Source for Source {
                     match from_msg(&msg) {
                         Ok(event) => {
                             log::debug!("Processing event: {event:?}");
-                            if let Err(err) = f(event).await {
+
+                            if let Err(err) = Self::process(&f, event).await {
                                 log::error!("Handler failed: {err}");
                                 break;
                             }
@@ -119,10 +135,10 @@ fn extract_meta(msg: &BorrowedMessage) -> anyhow::Result<(String, String, String
             Some(("ce_timestamp", Ok(value))) => {
                 timestamp = Some(value);
             }
-            Some(("application", Ok(value))) => {
+            Some(("ce_application", Ok(value))) => {
                 application = Some(value);
             }
-            Some(("thing", Ok(value))) => {
+            Some(("ce_thing", Ok(value))) => {
                 thing = Some(value);
             }
             _ => {}
@@ -136,10 +152,10 @@ fn extract_meta(msg: &BorrowedMessage) -> anyhow::Result<(String, String, String
             .ok_or_else(|| anyhow!("Missing 'ce_timestamp' header"))?
             .to_string(),
         application
-            .ok_or_else(|| anyhow!("Missing 'application' header"))?
+            .ok_or_else(|| anyhow!("Missing 'ce_application' header"))?
            .to_string(),
         thing
-            .ok_or_else(|| anyhow!("Missing 'thing' header"))?
+            .ok_or_else(|| anyhow!("Missing 'ce_thing' header"))?
             .to_string(),
     ))
 }
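
The sink above injects the active trace context into the `ce_`-prefixed Kafka headers through the `KafkaHeaders` injector; this patch does not yet read that context back on the consumer side. A possible mirror image, sketched against the rdkafka header API already used in `extract_meta` and the OpenTelemetry propagation API used in the sink (the `HeaderExtractor` type and `link_to_producer` helper are hypothetical, and `tracing-opentelemetry`'s `OpenTelemetrySpanExt` is assumed to be available):

use opentelemetry::{global::get_text_map_propagator, propagation::Extractor};
use rdkafka::message::{BorrowedMessage, Headers, Message};
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

// Mirror of the `KafkaHeaders` injector: look up `ce_`-prefixed headers so the
// consumer span can join the producer's trace.
struct HeaderExtractor<'a>(&'a BorrowedMessage<'a>);

impl<'a> Extractor for HeaderExtractor<'a> {
    fn get(&self, key: &str) -> Option<&str> {
        let headers = self.0.headers()?;
        let wanted = format!("ce_{}", key);
        (0..headers.count())
            .filter_map(|i| headers.get_as::<str>(i))
            .find(|(name, _)| *name == wanted)
            .and_then(|(_, value)| value.ok())
    }

    fn keys(&self) -> Vec<&str> {
        // not needed for extracting the trace context
        Vec::new()
    }
}

fn link_to_producer(msg: &BorrowedMessage) {
    let context = get_text_map_propagator(|prop| prop.extract(&HeaderExtractor(msg)));
    Span::current().set_parent(context);
}

Calling `link_to_producer(&msg)` at the start of the per-message processing span would stitch producer and consumer spans into one trace.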
diff --git a/core/src/service/mod.rs b/core/src/service/mod.rs
index 3e1bcca..f8e3324 100644
--- a/core/src/service/mod.rs
+++ b/core/src/service/mod.rs
@@ -17,6 +17,7 @@ use chrono::{Duration, Utc};
 use drogue_bazaar::app::Startup;
 use lazy_static::lazy_static;
 use prometheus::{register_int_counter, IntCounter};
+use tracing::instrument;
 use uuid::Uuid;
 
 lazy_static! {
@@ -105,6 +106,7 @@ impl Service Result> {
         let Outcome {
             mut new_thing,
@@ -148,6 +150,7 @@ impl Service Result, Error> {
         self.storage
             .get(&id.application, &id.thing)
@@ -240,6 +243,7 @@ impl Service(
         &self,
         id: &Id,
@@ -259,6 +263,11 @@ impl Service Result> {
         let outbox = if let Some(outbox) = new_thing.internal.as_ref().map(|i| &i.outbox) {
             outbox
@@ -447,6 +461,11 @@ impl Service
diff --git a/core/src/storage/mod.rs b/core/src/storage/mod.rs
 where
-    E: Send + Sync,
+    E: Send + Sync + std::error::Error,
 {
     /// Returned when an option should modify a thing, but it could not be found.
     ///
@@ -34,8 +35,8 @@
 #[derive(thiserror::Error)]
 pub enum UpdateError
 where
-    SE: Send + Sync,
-    UE: Send + Sync,
+    SE: Send + Sync + std::error::Error,
+    UE: Send + Sync + std::error::Error,
 {
     #[error("Service error: {0}")]
     Service(#[from] Error),
@@ -55,6 +56,7 @@ pub trait Storage: Sized + Send + Sync + 'static {
     async fn create(&self, thing: Thing) -> Result>;
     async fn update(&self, thing: Thing) -> Result>;
 
+    #[instrument(skip(self, f), err, ret)]
     async fn patch(
         &self,
         application: &str,
@@ -64,7 +66,7 @@
    where
        F: FnOnce(Thing) -> Fut + Send + Sync,
        Fut: Future> + Send + Sync,
-       E: Send + Sync,
+       E: Send + Sync + std::error::Error,
    {
        log::debug!("Updating existing thing: {application} / {name}");
diff --git a/core/src/storage/postgres/mod.rs b/core/src/storage/postgres/mod.rs
index 034967c..57ed7ca 100644
--- a/core/src/storage/postgres/mod.rs
+++ b/core/src/storage/postgres/mod.rs
@@ -10,7 +10,7 @@ use crate::{
 };
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
-use deadpool_postgres::{PoolError, Runtime};
+use deadpool_postgres::{Object, PoolError, Runtime};
 use postgres_types::Type;
 use std::collections::BTreeMap;
 use tokio_postgres::{
@@ -18,6 +18,7 @@ use tokio_postgres::{
     types::{Json, ToSql},
     Row,
 };
+use tracing::instrument;
 use uuid::Uuid;
 
 #[derive(Clone, Debug, serde::Deserialize)]
@@ -133,6 +134,7 @@ impl super::Storage for Storage {
         Ok(Self { application, pool })
     }
 
+    #[instrument(skip(self), err)]
     async fn get(&self, application: &str, name: &str) -> Result> {
         if let Err(storage::Error::NotFound) =
             self.ensure_app(application, || storage::Error::NotFound)
         {
             return Ok(None);
         }
 
-        let con = self.pool.get().await.map_err(Error::Pool)?;
+        let con = self.connection().await?;
 
         let stmt = con
             .prepare_typed_cached(
@@ -170,6 +172,8 @@ WHERE
             .await
             .map_err(Error::Postgres)?;
 
+        tracing::info!("Prepared statement");
+
         match con
             .query_opt(&stmt, &[&name, &application])
             .await
@@ -202,10 +206,11 @@
         }
     }
 
+    #[instrument(skip_all, err)]
     async fn create(&self, mut thing: Thing) -> Result {
         self.ensure_app(&thing.metadata.application, || storage::Error::NotAllowed)?;
 
-        let con = self.pool.get().await.map_err(Error::Pool)?;
+        let con = self.connection().await?;
 
         // Init metadata. We need to set this on the thing too, as we return it.
         let uid = Uuid::new_v4();
@@ -270,6 +275,8 @@ INSERT INTO things (
             .await
             .map_err(Error::Postgres)?;
 
+        tracing::info!("Prepared statement");
+
         con.execute(
             &stmt,
             &[
@@ -294,10 +301,11 @@ INSERT INTO things (
         Ok(thing.clone())
     }
 
+    #[instrument(skip_all, err)]
     async fn update(&self, mut thing: Thing) -> Result {
         self.ensure_app(&thing.metadata.application, || storage::Error::NotFound)?;
 
-        let con = self.pool.get().await.map_err(Error::Pool)?;
+        let con = self.connection().await?;
 
         let name = &thing.metadata.name;
         let application = &thing.metadata.application;
@@ -370,9 +378,15 @@ RETURNING
             .await
             .map_err(Error::Postgres)?;
 
+        tracing::info!("Prepared statement");
+
         match con.query_opt(&stmt, &params).await {
-            Ok(None) => Err(storage::Error::PreconditionFailed),
+            Ok(None) => {
+                tracing::info!("Precondition failed");
+                Err(storage::Error::PreconditionFailed)
+            }
             Ok(Some(row)) => {
+                tracing::info!("Row updated");
                 // update metadata, with new values
                 thing.metadata.uid = row.try_get("UID").map_err(Error::Postgres)?;
@@ -390,6 +404,7 @@ RETURNING
         }
     }
 
+    #[instrument(skip(self), err, ret)]
     async fn delete_with(
         &self,
         application: &str,
@@ -402,7 +417,7 @@ RETURNING
             return Ok(false);
         }
 
-        let con = self.pool.get().await.map_err(Error::Pool)?;
+        let con = self.connection().await?;
 
         log::debug!("Deleting thing: {application} / {name}");
 
@@ -441,8 +456,12 @@ WHERE
             .await
             .map_err(Error::Postgres)?;
 
+        tracing::info!("Prepared statement");
+
         let rows = con.execute(&stmt, &params).await.map_err(Error::Postgres)?;
 
+        tracing::info!(rows, "Statement executed");
+
         Ok(rows > 0)
     }
 }
@@ -460,6 +479,11 @@ impl Storage {
         }
         Ok(())
     }
+
+    #[instrument(skip_all, err)]
+    async fn connection(&self) -> std::result::Result {
+        self.pool.get().await.map_err(Error::Pool)
+    }
 }
 
 fn waker_data(thing: &Thing) -> Option> {
diff --git a/core/src/waker/postgres.rs b/core/src/waker/postgres.rs
index 3b96926..ede1f74 100644
--- a/core/src/waker/postgres.rs
+++ b/core/src/waker/postgres.rs
@@ -10,6 +10,7 @@ use std::future::Future;
 use std::time::Duration;
 use tokio::time::MissedTickBehavior;
 use tokio_postgres::Statement;
+use tracing::instrument;
 use uuid::Uuid;
 
 #[derive(Clone, Debug, serde::Deserialize)]
@@ -150,18 +151,17 @@ where
         con: Client,
         stmt: &'r (String, Vec),
         application: &'r Option,
-        f: &'r F,
     ) -> Self {
         Self {
             con,
             stmt,
             application,
-            f,
         }
     }
 
+    #[instrument(skip_all, fields(application=self.application), err)]
     async fn run(mut self) -> anyhow::Result<()> {
         let stmt = self
             .con
diff --git a/develop/compose-health.yaml b/develop/compose-health.yaml
new file mode 100644
index 0000000..1fa38de
--- /dev/null
+++ b/develop/compose-health.yaml
@@ -0,0 +1,8 @@
+version: "3.9"
+services:
+  jaeger:
+    image: docker.io/jaegertracing/all-in-one:latest
+    ports:
+      - "16686:16686"
+      - "6831:6831/udp"
+      - "6832:6832/udp"
diff --git a/examples/99_just_testing.adoc b/examples/99_just_testing.adoc
index 1727d5c..b51391d 100644
--- a/examples/99_just_testing.adoc
+++ b/examples/99_just_testing.adoc
@@ -47,8 +47,8 @@ http $HTTP_OPTS POST localhost:8080/api/v1alpha1/things metadata:='{"application
"default", "name": "A4:C1:38:A2:6D:42/status" }' 'reconciliation[changed][hierarchy][javaScript]=@80_hierarchy/code.js' 'reconciliation[deleting][hierarchy][javaScript]=@80_hierarchy/code.js' http $HTTP_OPTS POST localhost:8080/api/v1alpha1/things metadata:='{"application": "default", "name": "holsetbakken-dungeon/foo" }' 'reconciliation[changed][hierarchy][javaScript]=@80_hierarchy/code.js' 'reconciliation[deleting][hierarchy][javaScript]=@80_hierarchy/code.js' -http $HTTP_OPTS PUT localhost:8080/api/v1alpha1/things/default/things/A4:C1:38:0A:88:A8/annotations 'io.drogue/group=de/by/munich' -http $HTTP_OPTS PUT localhost:8080/api/v1alpha1/things/default/things/A4:C1:38:A2:6D:42/annotations 'io.drogue/group=de/by/munich' +http $HTTP_OPTS PUT localhost:8080/api/v1alpha1/things/default/things/A4:C1:38:0A:88:A8/annotations 'io.drogue/group=de/by/munich/room2' +http $HTTP_OPTS PUT localhost:8080/api/v1alpha1/things/default/things/A4:C1:38:A2:6D:42/annotations 'io.drogue/group=de/by/munich/room1' http $HTTP_OPTS PUT localhost:8080/api/v1alpha1/things/default/things/holsetbakken-dungeon/annotations 'io.drogue/group=no/34/hamar' http $HTTP_OPTS GET localhost:8080/api/v1alpha1/things/default/things/A4:C1:38:0A:88:A8%2Fstatus diff --git a/justfile b/justfile index e9b1e04..bbbd696 100644 --- a/justfile +++ b/justfile @@ -1,10 +1,10 @@ run-deps: - podman-compose -f develop/compose.yaml up + podman-compose -f develop/compose.yaml -f develop/compose-health.yaml up start-deps: - podman-compose -f develop/compose.yaml up -d + podman-compose -f develop/compose.yaml -f develop/compose-health.yaml up -d stop-deps: - podman-compose -f develop/compose.yaml down + podman-compose -f develop/compose.yaml -f develop/compose-health.yaml down restart-deps: stop-deps start-deps diff --git a/server/src/main.rs b/server/src/main.rs index 8db3120..8f3aadb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -7,9 +7,9 @@ use crate::keycloak::SERVICE_CLIENT_SECRET; use drogue_bazaar::auth::openid::{AuthenticatorClientConfig, AuthenticatorGlobalConfig}; use drogue_bazaar::{ actix::http::{CorsBuilder, HttpBuilder, HttpConfig}, - app::{RuntimeConfig, Startup}, + app::Startup, auth::openid::AuthenticatorConfig, - core::{config::ConfigFromEnv, SpawnerExt}, + core::SpawnerExt, runtime, }; use drogue_doppelgaenger_core::{ @@ -226,10 +226,13 @@ async fn run(server: Server, startup: &mut dyn Startup) -> anyhow::Result<()> { // prepare the http server - let runtime = RuntimeConfig::from_env()?; - HttpBuilder::new(server.http.clone(), Some(&runtime), move |config| { - config.configure(|ctx| configurator(ctx)); - }) + HttpBuilder::new( + server.http.clone(), + Some(startup.runtime_config()), + move |config| { + config.configure(|ctx| configurator(ctx)); + }, + ) .cors(CorsBuilder::Permissive) .start(startup)?;