Skip to content

Commit

Permalink
Updated JsonLogger and its Tests
Browse files Browse the repository at this point in the history
Signed-off-by: wasup-yash <[email protected]>
Signed-off-by: Sascha Grunert <[email protected]>
  • Loading branch information
wasup-yash authored and saschagrunert committed Dec 6, 2023
1 parent cec69fa commit f870ade
Show file tree
Hide file tree
Showing 10 changed files with 523 additions and 241 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
.build
latest-*.txt
/*.tar.gz
.vscode
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions conmon-rs/common/proto/conmon.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ interface Conmon {
enum Type {
# The CRI logger, requires `path` to be set.
containerRuntimeInterface @0;
# The JSON logger, requires `path` to be set.
json @1;
}
}

Expand Down
1 change: 1 addition & 0 deletions conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ prctl = "1.0.0"
regex = "1.10.2"
sendfd = { version = "0.4.3", features = ["tokio"] }
serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.108"
shadow-rs = "0.24.1"
signal-hook = "0.3.17"
strum = { version = "0.25.0", features = ["derive"] }
Expand Down
75 changes: 50 additions & 25 deletions conmon-rs/server/src/container_log.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{container_io::Pipe, cri_logger::CriLogger};
use crate::{container_io::Pipe, cri_logger::CriLogger, json_logger::JsonLogger};
use anyhow::Result;
use capnp::struct_list::Reader;
use conmon_common::conmon_capnp::conmon::log_driver::{Owned, Type};
use futures::future::join_all;
use futures::{future::join_all, FutureExt};
use std::sync::Arc;
use tokio::{io::AsyncBufRead, sync::RwLock};

Expand All @@ -12,10 +12,10 @@ pub type SharedContainerLog = Arc<RwLock<ContainerLog>>;
pub struct ContainerLog {
drivers: Vec<LogDriver>,
}

#[derive(Debug)]
enum LogDriver {
ContainerRuntimeInterface(CriLogger),
Json(JsonLogger),
}

impl ContainerLog {
Expand All @@ -24,25 +24,32 @@ impl ContainerLog {
Arc::new(RwLock::new(Self::default()))
}

/// Create a new SharedContainerLog from an capnp owned reader.
pub fn from(reader: Reader<Owned>) -> Result<SharedContainerLog> {
let drivers = reader
.iter()
.flat_map(|x| -> Result<_> {
Ok(match x.get_type()? {
.map(|x| -> Result<_> {
match x.get_type()? {
Type::ContainerRuntimeInterface => {
LogDriver::ContainerRuntimeInterface(CriLogger::new(
Ok(LogDriver::ContainerRuntimeInterface(CriLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)
)?))
}
})
Type::Json => Ok(LogDriver::Json(JsonLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)),
}
})
.collect();
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(RwLock::new(Self { drivers })))
}

Expand All @@ -52,7 +59,10 @@ impl ContainerLog {
self.drivers
.iter_mut()
.map(|x| match x {
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.init(),
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => {
cri_logger.init().boxed()
}
LogDriver::Json(ref mut json_logger) => json_logger.init().boxed(),
})
.collect::<Vec<_>>(),
)
Expand All @@ -68,7 +78,10 @@ impl ContainerLog {
self.drivers
.iter_mut()
.map(|x| match x {
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.reopen(),
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => {
cri_logger.reopen().boxed()
}
LogDriver::Json(ref mut json_logger) => json_logger.reopen().boxed(),
})
.collect::<Vec<_>>(),
)
Expand All @@ -81,21 +94,33 @@ impl ContainerLog {
/// Write the contents of the provided reader into all loggers.
pub async fn write<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
where
T: AsyncBufRead + Unpin + Copy,
T: AsyncBufRead + Unpin + Clone,
{
join_all(
self.drivers
.iter_mut()
.map(|x| match x {
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => {
cri_logger.write(pipe, bytes)
let futures = self
.drivers
.iter_mut()
.map(|x| {
async fn box_future<'a, T: AsyncBufRead + Unpin + Clone>(
logger: &mut LogDriver,
pipe: Pipe,
bytes: T,
) -> Result<()> {
match logger {
LogDriver::ContainerRuntimeInterface(cri_logger) => {
cri_logger.write(pipe, bytes).await
}
LogDriver::Json(json_logger) => json_logger.write(pipe, bytes).await,
}
})
.collect::<Vec<_>>(),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
}

box_future(x, pipe, bytes.clone())
})
.collect::<Vec<_>>();

join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(())
}
}
182 changes: 182 additions & 0 deletions conmon-rs/server/src/json_logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use crate::container_io::Pipe;
use anyhow::{Context, Result};
use getset::{CopyGetters, Getters, Setters};
use serde_json::json;
use std::{
marker::Unpin,
path::{Path, PathBuf},
};
use tokio::{
fs::{File, OpenOptions},
io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
};
use tracing::debug;

#[derive(Debug, CopyGetters, Getters, Setters)]
pub struct JsonLogger {
#[getset(get)]
path: PathBuf,

#[getset(set)]
file: Option<BufWriter<File>>,

#[getset(get_copy)]
max_log_size: Option<usize>,

#[getset(get_copy, set)]
bytes_written: usize,
}

impl JsonLogger {
const ERR_UNINITIALIZED: &'static str = "logger not initialized";

pub fn new<T: AsRef<Path>>(path: T, max_log_size: Option<usize>) -> Result<JsonLogger> {
Ok(Self {
path: path.as_ref().into(),
file: None,
max_log_size,
bytes_written: 0,
})
}

pub async fn init(&mut self) -> Result<()> {
debug!("Initializing JSON logger in path {}", self.path().display());
self.set_file(Self::open(self.path()).await?.into());
Ok(())
}

pub async fn write<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
where
T: AsyncBufRead + Unpin,
{
let mut reader = BufReader::new(bytes);
let mut line_buf = Vec::new();

while reader.read_until(b'\n', &mut line_buf).await? > 0 {
let log_entry = json!({
"timestamp": format!("{:?}", std::time::SystemTime::now()),
"pipe": match pipe {
Pipe::StdOut => "stdout",
Pipe::StdErr => "stderr",
},
"message": String::from_utf8_lossy(&line_buf).trim().to_string()
});

let log_str = log_entry.to_string();
let bytes = log_str.as_bytes();
self.bytes_written += bytes.len();

if let Some(max_size) = self.max_log_size {
if self.bytes_written > max_size {
self.reopen().await?;
self.bytes_written = 0;
}
}

let file = self.file.as_mut().context(Self::ERR_UNINITIALIZED)?;
file.write_all(bytes).await?;
file.write_all(b"\n").await?;
self.flush().await?;
line_buf.clear();
}

Ok(())
}

pub async fn reopen(&mut self) -> Result<()> {
debug!("Reopen JSON log {}", self.path().display());
self.file
.as_mut()
.context(Self::ERR_UNINITIALIZED)?
.get_ref()
.sync_all()
.await?;
self.init().await
}

pub async fn flush(&mut self) -> Result<()> {
self.file
.as_mut()
.context(Self::ERR_UNINITIALIZED)?
.flush()
.await
.context("flush file writer")
}

async fn open<T: AsRef<Path>>(path: T) -> Result<BufWriter<File>> {
Ok(BufWriter::new(
OpenOptions::new()
.create(true)
.read(true)
.truncate(true)
.write(true)
.open(&path)
.await
.context(format!("open log file path '{}'", path.as_ref().display()))?,
))
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use tokio::io::AsyncReadExt;

#[tokio::test]
async fn test_json_logger_new() {
let logger = JsonLogger::new("/tmp/test.log", Some(1000)).unwrap();
assert_eq!(logger.path().to_str().unwrap(), "/tmp/test.log");
assert_eq!(logger.max_log_size().unwrap(), 1000);
}

#[tokio::test]
async fn test_json_logger_init() {
let mut logger = JsonLogger::new("/tmp/test_init.log", Some(1000)).unwrap();
logger.init().await.unwrap();
assert!(logger.file.is_some());
}

#[tokio::test]
async fn test_json_logger_write() {
let mut logger = JsonLogger::new("/tmp/test_write.log", Some(1000)).unwrap();
logger.init().await.unwrap();

let cursor = Cursor::new(b"Test log message\n".to_vec());
logger.write(Pipe::StdOut, cursor).await.unwrap();

// Read back from the file
let mut file = File::open("/tmp/test_write.log").await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();

// Check if the file contains the logged message
assert!(contents.contains("Test log message"));
}

#[tokio::test]
async fn test_json_logger_reopen() {
let mut logger = JsonLogger::new("/tmp/test_reopen.log", Some(1000)).unwrap();
logger.init().await.unwrap();

// Write to the file
let cursor = Cursor::new(b"Test log message before reopen\n".to_vec());
logger.write(Pipe::StdOut, cursor).await.unwrap();

// Reopen the file
logger.reopen().await.unwrap();

// Write to the file again
let cursor = Cursor::new(b"Test log message after reopen\n".to_vec());
logger.write(Pipe::StdOut, cursor).await.unwrap();

// Read back from the file
let mut file = File::open("/tmp/test_reopen.log").await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();

// Check if the file contains the logged message
assert!(contents.contains("Test log message after reopen"));
assert!(!contents.contains("Test log message before reopen"));
}
}
1 change: 1 addition & 0 deletions conmon-rs/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod cri_logger;
mod fd_socket;
mod init;
mod journal;
mod json_logger;
mod listener;
mod oom_watcher;
mod pause;
Expand Down
Loading

0 comments on commit f870ade

Please sign in to comment.