From c56dcac96e5fc526a149c2c560e5b545c2d17814 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Thu, 16 Jan 2025 18:13:31 -0500 Subject: [PATCH] dekaf: Refactor log forwarding a to make it testable, and write tests for it --- Cargo.lock | 1 + crates/dekaf/Cargo.toml | 6 +- crates/dekaf/src/log_journal.rs | 502 +++++++++++++----- crates/dekaf/src/main.rs | 2 +- ...kaf__log_journal__tests__nested_spans.snap | 145 +++++ ...__tests__session_logger_and_task_name.snap | 57 ++ 6 files changed, 583 insertions(+), 130 deletions(-) create mode 100644 crates/dekaf/src/snapshots/dekaf__log_journal__tests__nested_spans.snap create mode 100644 crates/dekaf/src/snapshots/dekaf__log_journal__tests__session_logger_and_task_name.snap diff --git a/Cargo.lock b/Cargo.lock index df2d1a4dbf..3d93122ec1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1970,6 +1970,7 @@ dependencies = [ "anyhow", "apache-avro", "async-process", + "async-trait", "avro", "axum", "axum-extra", diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index 91a88c7094..f88b35207a 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -28,6 +28,7 @@ unseal = { path = "../unseal" } aes-siv = { workspace = true } anyhow = { workspace = true } +async-trait = { workspace = true } axum = { workspace = true } axum-extra = { workspace = true } axum-server = { workspace = true } @@ -74,11 +75,12 @@ url = { workspace = true } webpki = { workspace = true } [dev-dependencies] -apache-avro = { workspace = true } async-process = { path = "../async-process" } flowctl = { path = "../flowctl" } -insta = { workspace = true } locate-bin = { path = "../locate-bin" } + +apache-avro = { workspace = true } +insta = { workspace = true } rdkafka = { workspace = true } reqwest = { workspace = true } schema_registry_converter = { workspace = true } diff --git a/crates/dekaf/src/log_journal.rs b/crates/dekaf/src/log_journal.rs index 2987d96fff..cdf5482ecd 100644 --- a/crates/dekaf/src/log_journal.rs +++ b/crates/dekaf/src/log_journal.rs @@ -1,4 +1,5 @@ use crate::{dekaf_shard_template_id, topology::fetch_dekaf_task_auth, App}; +use async_trait::async_trait; use bytes::Bytes; use flow_client::fetch_task_authorization; use futures::TryStreamExt; @@ -8,24 +9,24 @@ use gazette::{ }; use proto_gazette::message_flags; use serde_json::json; -use std::sync::Arc; +use std::{collections::VecDeque, sync::Arc}; use tokio::task::JoinHandle; use tracing::{span, Event, Id}; use tracing_subscriber::layer::{Context, Layer}; -// When a span is created with this as its name, a `LogForwarder` is attached -// to that span that will begin capturing all log messages emitted inside it -// and all of its children. Log messages are buffered until a child span -// with field SESSION_TASK_NAME_FIELD_MARKER is created, at which point all log -// messages, both buffered and ongoing, will get written to the logs journal -// associated with that task. +/// When a span is created with this as its name, a `LogForwarder` is attached +/// to that span that will begin capturing all log messages emitted inside it +/// and all of its children. Log messages are buffered until a child span +/// with field SESSION_TASK_NAME_FIELD_MARKER is created, at which point all log +/// messages, both buffered and ongoing, will get written to the logs journal +/// associated with that task. pub const SESSION_SPAN_NAME_MARKER: &str = "dekaf_session"; -// When a span is created with this field name, and is a descendent of a span -// named with the value of `SESSION_SPAN_NAME_MARKER`, it causes events (logs) -// anywhere in that hierarchy to get written to the corresponding logs journal. +/// When a span is created with this field name, and is a descendent of a span +/// named with the value of `SESSION_SPAN_NAME_MARKER`, it causes events (logs) +/// anywhere in that hierarchy to get written to the corresponding logs journal. pub const SESSION_TASK_NAME_FIELD_MARKER: &str = "session_task_name"; -// This marker indicates that its Session is not and will never be associated with a -// task, and we should stop buffering logs as we'll never have anywhere to write them. +/// This marker indicates that its Session is not and will never be associated with a +/// task, and we should stop buffering logs as we'll never have anywhere to write them. pub const SESSION_TASKLESS_FIELD_MARKER: &str = "session_is_taskless"; #[derive(Debug)] @@ -35,78 +36,50 @@ enum LoggingMessage { Shutdown, } -struct LogForwarder { - app: Arc, - producer: gazette::uuid::Producer, -} - -impl LogForwarder { - fn new(app: Arc, producer: Producer) -> Self { - Self { app, producer } - } - - async fn forward_logs( - self, - mut logs_rx: tokio::sync::mpsc::Receiver, - ) -> anyhow::Result<()> { - let mut log_data = Vec::new(); +#[async_trait] +pub trait LogAppender: Send + Sync { + async fn append_log_data(&self, log_data: Bytes) -> anyhow::Result<()>; - let (ops_logs_journal_client, ops_logs_journal) = loop { - match logs_rx.recv().await { - Some(LoggingMessage::SetTaskName(name)) => { - let (client, ops_logs) = self.get_journal_client(name).await?; - break (client, ops_logs); - } - Some(LoggingMessage::Log(log)) => { - log_data.append(&mut self.serialize_log(log)); - } - Some(LoggingMessage::Shutdown) | None => return Ok(()), - } - }; - - self.append_log_data( - log_data.into(), - ops_logs_journal.as_str(), - &ops_logs_journal_client, - ) - .await?; + async fn set_task_name(&mut self, name: String) -> anyhow::Result<()>; +} - while let Some(msg) = logs_rx.recv().await { - match msg { - LoggingMessage::SetTaskName(_) => {} - LoggingMessage::Log(log) => { - self.append_log_data( - self.serialize_log(log).into(), - ops_logs_journal.as_str(), - &ops_logs_journal_client, - ) - .await?; - } - LoggingMessage::Shutdown => break, - } - } +#[derive(Clone)] +pub struct GazetteLogAppender { + app: Arc, + client: Option, + journal_name: Option, +} +#[async_trait] +impl LogAppender for GazetteLogAppender { + async fn set_task_name(&mut self, task_name: String) -> anyhow::Result<()> { + let (client, journal) = self.get_journal_client(task_name).await?; + self.client.replace(client); + self.journal_name.replace(journal); Ok(()) } - async fn append_log_data( - &self, - log_data: Bytes, - journal: &str, - client: &journal::Client, - ) -> anyhow::Result<()> { - let resp = client.append( - gazette::broker::AppendRequest { - journal: journal.to_owned(), - ..Default::default() - }, - || { - futures::stream::once({ - let value = log_data.clone(); - async move { Ok(value) } - }) - }, - ); + async fn append_log_data(&self, log_data: Bytes) -> anyhow::Result<()> { + let resp = self + .client + .as_ref() + .ok_or(anyhow::anyhow!("missing journal client"))? + .append( + gazette::broker::AppendRequest { + journal: self + .journal_name + .as_ref() + .ok_or(anyhow::anyhow!("missing journal name"))? + .to_owned(), + ..Default::default() + }, + || { + futures::stream::once({ + let value = log_data.clone(); + async move { Ok(value) } + }) + }, + ); tokio::pin!(resp); @@ -131,28 +104,15 @@ impl LogForwarder { } } } +} - fn serialize_log(&self, log: ops::Log) -> Vec { - let uuid = gazette::uuid::build( - self.producer, - gazette::uuid::Clock::from_time(std::time::SystemTime::now()), - uuid::Flags( - (message_flags::MASK | message_flags::OUTSIDE_TXN) - .try_into() - .unwrap(), - ), - ); - - let mut val = serde_json::to_value(log).expect("Log always serializes"); - - if let Some(obj) = val.as_object_mut() { - obj.insert("_meta".to_string(), json!({ "uuid": uuid })); +impl GazetteLogAppender { + pub fn new(app: Arc) -> Self { + Self { + app: app, + client: None, + journal_name: None, } - - let mut buf = serde_json::to_vec(&val).expect("Value always serializes"); - buf.push(b'\n'); - - buf } async fn get_journal_client( @@ -184,17 +144,121 @@ impl LogForwarder { } } +#[derive(Default, Clone)] +pub struct MockLogAppender { + pub logs: Arc>>, +} + +#[async_trait::async_trait] +impl LogAppender for MockLogAppender { + async fn set_task_name(&mut self, _: String) -> anyhow::Result<()> { + Ok(()) + } + + async fn append_log_data(&self, log_data: Bytes) -> anyhow::Result<()> { + self.logs.lock().await.push_back(log_data); + Ok(()) + } +} + +struct LogForwarder { + producer: gazette::uuid::Producer, + appender: A, +} + +impl LogForwarder { + fn new(producer: Producer, appender: A) -> Self { + Self { producer, appender } + } + + async fn forward_logs( + mut self, + mut logs_rx: tokio::sync::mpsc::Receiver, + ) -> anyhow::Result<()> { + let mut log_data = VecDeque::new(); + + loop { + match logs_rx.recv().await { + Some(LoggingMessage::SetTaskName(name)) => { + self.appender.set_task_name(name).await?; + break; + } + Some(LoggingMessage::Log(log)) => { + log_data.push_front(self.serialize_log(log)); + // Keep at most the latest 100 log messages when in this pending state + log_data.truncate(100); + } + Some(LoggingMessage::Shutdown) | None => return Ok(()), + } + } + + self.appender + .append_log_data( + log_data + .into_iter() + // VecDeque::truncate keeps the first N items, so we use `push_front` + `truncate` to + // store the most recent items in the front of the queue. We need to reverse + // that when sending, as logs should be sent in oldest-first order. + .rev() + .flatten() + .collect::>() + .into(), + ) + .await?; + + while let Some(msg) = logs_rx.recv().await { + match msg { + LoggingMessage::SetTaskName(_) => {} + LoggingMessage::Log(log) => { + self.appender + .append_log_data(self.serialize_log(log).into()) + .await?; + } + LoggingMessage::Shutdown => break, + } + } + + Ok(()) + } + + fn serialize_log(&self, log: ops::Log) -> Vec { + let uuid = gazette::uuid::build( + self.producer, + gazette::uuid::Clock::from_time(std::time::SystemTime::now()), + uuid::Flags( + (message_flags::MASK | message_flags::OUTSIDE_TXN) + .try_into() + .unwrap(), + ), + ); + + let mut val = serde_json::to_value(log).expect("Log always serializes"); + + if let Some(obj) = val.as_object_mut() { + obj.insert("_meta".to_string(), json!({ "uuid": uuid })); + } + + let mut buf = serde_json::to_vec(&val).expect("Value always serializes"); + buf.push(b'\n'); + + buf + } +} + #[derive(Clone)] -struct SessionLogger { +struct SessionLogger { tx: tokio::sync::mpsc::Sender, _handle: Arc>, + _appender: std::marker::PhantomData, } -impl SessionLogger { - fn new(app: Arc, producer: Producer) -> Self { - let (log_tx, log_rx) = tokio::sync::mpsc::channel(1000); +impl SessionLogger { + fn new(producer: Producer, appender: A) -> Self { + // This should always be read promptly by the logic in `LogForwarder::forward_logs`, + // so a larger buffer here would just obscure other problems. + let (log_tx, log_rx) = tokio::sync::mpsc::channel::(50); - let forwarder = LogForwarder::new(app, producer); + let forwarder = LogForwarder::new(producer, appender); let handle = tokio::spawn(async move { if let Err(e) = forwarder.forward_logs(log_rx).await { tracing::error!(error = ?e, "Log forwarding errored"); @@ -204,6 +268,7 @@ impl SessionLogger { Self { tx: log_tx, _handle: Arc::new(handle), + _appender: std::marker::PhantomData, } } @@ -226,14 +291,14 @@ impl SessionLogger { } } -pub struct SessionSubscriberLayer { - app: Arc, +pub struct SessionSubscriberLayer { producer: uuid::Producer, + appender: A, } -impl SessionSubscriberLayer { - pub fn new(app: Arc, producer: uuid::Producer) -> Self { - Self { app, producer } +impl SessionSubscriberLayer { + pub fn new(producer: uuid::Producer, appender: A) -> Self { + Self { producer, appender } } pub fn log_from_metadata(&self, metadata: &tracing::Metadata) -> ops::Log { @@ -254,29 +319,25 @@ impl SessionSubscriberLayer { } } -impl Layer for SessionSubscriberLayer +impl Layer for SessionSubscriberLayer where S: tracing::Subscriber, S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>, + A: LogAppender + Send + Sync + Clone + 'static, { fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { - // First identify if this message is inside a Session span or sub-span let session_logger = if let Some(scope) = ctx.event_scope(event) { scope .from_root() - .find_map(|span_ref| span_ref.extensions().get::().cloned()) + .find_map(|span_ref| span_ref.extensions().get::>().cloned()) } else { None }; if let Some(logger) = session_logger { - // We're inside a Session span and we have the logger, - // so let's build the Log and send the message. let mut log = self.log_from_metadata(event.metadata()); event.record(&mut ops::tracing::FieldVisitor(&mut log)); - // Collect additional metadata in the form of `ops::Log`s attached - // to other spans in this hierarchy. These get injected by [`ops::tracing::Layer`] if let Some(scope) = ctx.event_scope(event) { for span in scope.from_root() { let extensions = span.extensions(); @@ -289,13 +350,9 @@ where } fn on_record(&self, span_id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) { - // When we get some new fields for a span, walk through its hierarchy starting from the root - // and if we find a SessionLogger, Visit it with the new values. This is ultimately how we handle - // SESSION_TASK_NAME_FIELD_MARKER and SESSION_TASKLESS_FIELD_MARKER so that even deep within a - // sub-span, they'll still propagate to the outer span for the whole session. if let Some(scope) = ctx.span_scope(span_id) { for span in scope.from_root() { - if let Some(visitor) = span.extensions_mut().get_mut::() { + if let Some(visitor) = span.extensions_mut().get_mut::>() { values.record(visitor); return; } @@ -304,20 +361,18 @@ where } fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) { - // Check if any parent spans already have a SessionLogger if let Some(scope) = ctx.span_scope(id) { for span in scope.from_root() { - if let Some(visitor) = span.extensions_mut().get_mut::() { + if let Some(visitor) = span.extensions_mut().get_mut::>() { attrs.record(visitor); return; } } } - // No existing spans had a Session, let's check and see if this one should let span = ctx.span(id).unwrap(); if span.name() == SESSION_SPAN_NAME_MARKER { - let mut visitor = SessionLogger::new(self.app.clone(), self.producer); + let mut visitor = SessionLogger::new(self.producer, self.appender.clone()); attrs.record(&mut visitor); @@ -326,7 +381,7 @@ where } } -impl tracing::field::Visit for SessionLogger { +impl tracing::field::Visit for SessionLogger { fn record_str(&mut self, field: &tracing::field::Field, value: &str) { if field.name() == SESSION_TASK_NAME_FIELD_MARKER && value.len() > 0 { self.set_task_name(value.to_string()) @@ -339,7 +394,200 @@ impl tracing::field::Visit for SessionLogger { } } - fn record_debug(&mut self, _: &tracing::field::Field, _: &dyn std::fmt::Debug) { - // Do nothing + fn record_debug(&mut self, _: &tracing::field::Field, _: &dyn std::fmt::Debug) {} +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::Future; + use insta::assert_json_snapshot; + use itertools::Itertools; + use rand::Rng; + use std::time::Duration; + use tracing::instrument::WithSubscriber; + use tracing::{info, info_span}; + + use tracing_subscriber::prelude::*; + + fn gen_producer() -> Producer { + // There's probably a neat bit-banging way to do this with i64 and masks, but I'm just not that fancy. + let mut producer_id = rand::thread_rng().gen::<[u8; 6]>(); + producer_id[0] |= 0x01; + gazette::uuid::Producer::from_bytes(producer_id) + } + + async fn setup(f: F) + where + F: FnOnce(Arc>>) -> Fut, + Fut: Future, + { + let mock_appender = MockLogAppender::default(); + let logs = mock_appender.logs.clone(); + + let producer = gen_producer(); + let layer = SessionSubscriberLayer::new(producer, mock_appender); + + let subscriber = tracing_subscriber::registry() + .with(ops::tracing::Layer::new(|_| {}, std::time::SystemTime::now)) + .with(layer) + .with(tracing_subscriber::fmt::Layer::default()); + + f(logs).with_subscriber(subscriber).await; + } + + async fn assert_output(name: &str, logs: Arc>>) { + let captured_log_bytes = logs + .lock() + .await + .clone() + .into_iter() + .map(|b| Vec::from(b)) + .flatten() + .collect::>(); + + let full_str = String::from_utf8(captured_log_bytes.into()).unwrap(); + + let captured_logs = full_str + .split("\n") + .filter(|l| l.len() > 0) + .map(|line| serde_json::from_str::(line).unwrap()) + .collect_vec(); + + assert_json_snapshot!(name, captured_logs, { + ".*._meta.uuid" => "[uuid]", + ".*.spans.*.ts" => "[ts]", + ".*.ts" => "[ts]" + }); + } + + #[tokio::test] + async fn test_session_subscriber_layer_with_no_session_logger() { + setup(|logs| async move { + { + info!("Test log data, you shouldn't be able to see me"); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + + let captured_logs = logs.lock().await; + assert!(captured_logs.is_empty()); + }) + .await; + } + + #[tokio::test] + async fn test_session_subscriber_layer_with_session_logger_but_no_task_name() { + setup(|logs| async move { + { + info!("Test log data, you shouldn't be able to see me"); + + let session_span = info_span!(SESSION_SPAN_NAME_MARKER); + let _guard = session_span.enter(); + + info!("Test log data but with a SessionLogger"); + }; + + tokio::time::sleep(Duration::from_millis(100)).await; + + let captured_logs = logs.lock().await; + assert!(captured_logs.is_empty()); + }) + .await; + } + + #[tokio::test] + async fn test_session_subscriber_layer_with_session_logger_and_task_name() { + setup(|logs| async move { + { + info!( + "Test log data, not associated with any session, you should not be able to see me" + ); + + let session_span = info_span!(SESSION_SPAN_NAME_MARKER); + let _guard = session_span.enter(); + + info!("Test log data but with a SessionLogger, still should see me"); + + let session_span = info_span!("", { SESSION_TASK_NAME_FIELD_MARKER } = "my_task",); + let _guard = session_span.enter(); + + info!("Test log data with a task name!"); + }; + + tokio::time::sleep(Duration::from_millis(100)).await; + + assert_output("session_logger_and_task_name", logs).await; + }) + .await; + } + + #[tokio::test] + async fn test_session_subscriber_layer_taskless() { + setup(|logs| async move { + { + info!("Before session span"); + + let session_span = info_span!(SESSION_SPAN_NAME_MARKER); + let _guard = session_span.enter(); + + info!("Before taskless marker"); + + let taskless_span = + info_span!("taskless", { SESSION_TASKLESS_FIELD_MARKER } = true); + let _taskless_guard = taskless_span.enter(); + + info!("After taskless marker"); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + + let captured_logs = logs.lock().await; + assert!( + captured_logs.is_empty(), + "Expected no logs for taskless session" + ); + }) + .await; + } + + #[tokio::test] + async fn test_session_subscriber_layer_nested_spans() { + setup(|logs| async move { + { + info!("From before session, should not be visible"); + + let session_span = info_span!(SESSION_SPAN_NAME_MARKER); + let _guard = session_span.enter(); + + info!("From inside session but before task_name, should be visible"); + + let nested_span = info_span!("nested"); + let nested_guard = nested_span.enter(); + + info!("From inside nested span but before task_name, should be visible"); + + let task_span = + info_span!("task", { SESSION_TASK_NAME_FIELD_MARKER } = "test_task"); + let task_guard = task_span.enter(); + + info!("Log from nested span after task name marker"); + + drop(task_guard); + drop(nested_guard); + + info!("Back in session span after task name"); + + let new_span = info_span!("new_nested"); + let _new_guard = new_span.enter(); + + info!("In child of session span after task name"); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + + assert_output("nested_spans", logs).await; + }) + .await; } } diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 34b879ca09..49661aa89b 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -179,8 +179,8 @@ async fn main() -> anyhow::Result<()> { .with(ops::tracing::Layer::new(|_| {}, std::time::SystemTime::now)) .with( dekaf::SessionSubscriberLayer::new( - app.clone(), gazette::uuid::Producer::from_bytes(producer_id), + dekaf::log_journal::GazetteLogAppender::new(app.clone()), ) .with_filter(env_filter_builder()), ) diff --git a/crates/dekaf/src/snapshots/dekaf__log_journal__tests__nested_spans.snap b/crates/dekaf/src/snapshots/dekaf__log_journal__tests__nested_spans.snap new file mode 100644 index 0000000000..aff3eb2476 --- /dev/null +++ b/crates/dekaf/src/snapshots/dekaf__log_journal__tests__nested_spans.snap @@ -0,0 +1,145 @@ +--- +source: crates/dekaf/src/log_journal.rs +assertion_line: 457 +expression: captured_logs +--- +[ + { + "_meta": { + "uuid": "[uuid]" + }, + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "From inside session but before task_name, should be visible", + "spans": [ + { + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "dekaf_session", + "ts": "[ts]" + } + ], + "ts": "[ts]" + }, + { + "_meta": { + "uuid": "[uuid]" + }, + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "From inside nested span but before task_name, should be visible", + "spans": [ + { + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "dekaf_session", + "ts": "[ts]" + }, + { + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "nested", + "ts": "[ts]" + } + ], + "ts": "[ts]" + }, + { + "_meta": { + "uuid": "[uuid]" + }, + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "Log from nested span after task name marker", + "spans": [ + { + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "dekaf_session", + "ts": "[ts]" + }, + { + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "nested", + "ts": "[ts]" + }, + { + "fields": { + "module": "dekaf::log_journal::tests", + "session_task_name": "test_task" + }, + "level": "info", + "message": "task", + "ts": "[ts]" + } + ], + "ts": "[ts]" + }, + { + "_meta": { + "uuid": "[uuid]" + }, + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "Back in session span after task name", + "spans": [ + { + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "dekaf_session", + "ts": "[ts]" + } + ], + "ts": "[ts]" + }, + { + "_meta": { + "uuid": "[uuid]" + }, + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "In child of session span after task name", + "spans": [ + { + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "dekaf_session", + "ts": "[ts]" + }, + { + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "new_nested", + "ts": "[ts]" + } + ], + "ts": "[ts]" + } +] diff --git a/crates/dekaf/src/snapshots/dekaf__log_journal__tests__session_logger_and_task_name.snap b/crates/dekaf/src/snapshots/dekaf__log_journal__tests__session_logger_and_task_name.snap new file mode 100644 index 0000000000..e534d8d742 --- /dev/null +++ b/crates/dekaf/src/snapshots/dekaf__log_journal__tests__session_logger_and_task_name.snap @@ -0,0 +1,57 @@ +--- +source: crates/dekaf/src/log_journal.rs +assertion_line: 457 +expression: captured_logs +--- +[ + { + "_meta": { + "uuid": "[uuid]" + }, + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "Test log data but with a SessionLogger, still should see me", + "spans": [ + { + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "dekaf_session", + "ts": "[ts]" + } + ], + "ts": "[ts]" + }, + { + "_meta": { + "uuid": "[uuid]" + }, + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "Test log data with a task name!", + "spans": [ + { + "fields": { + "module": "dekaf::log_journal::tests" + }, + "level": "info", + "message": "dekaf_session", + "ts": "[ts]" + }, + { + "fields": { + "module": "dekaf::log_journal::tests", + "session_task_name": "my_task" + }, + "level": "info", + "ts": "[ts]" + } + ], + "ts": "[ts]" + } +]