Skip to content

Commit

Permalink
Add journald logger for container
Browse files Browse the repository at this point in the history
Signed-off-by: wasup-yash <[email protected]>
Signed-off-by: Peter Hunt <[email protected]>
Signed-off-by: Sascha Grunert <[email protected]>
  • Loading branch information
wasup-yash authored and saschagrunert committed Oct 14, 2024
1 parent 4120bb1 commit b3eaea6
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 222 deletions.
2 changes: 2 additions & 0 deletions conmon-rs/common/proto/conmon.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ interface Conmon {
containerRuntimeInterface @0;
# The JSON logger, requires `path` to be set.
json @1;
# The journald logger.
journald @2;
}
}

Expand Down
File renamed without changes.
95 changes: 95 additions & 0 deletions conmon-rs/server/src/container_log/journald.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use crate::{container_io::Pipe, journal::Journal};
use anyhow::{Context, Result};
use std::{io::Write, time::SystemTime};
use tokio::io::{AsyncBufRead, AsyncBufReadExt};
use tracing::debug;

#[derive(Debug)]
pub struct JournaldLogger;

impl JournaldLogger {
pub fn new(_: Option<usize>) -> Result<Self> {
Ok(Self {})
}

pub async fn init(&mut self) -> Result<()> {
debug!("Initializing journald logger");
Ok(())
}

pub async fn write<T>(&mut self, pipe: Pipe, mut bytes: T) -> Result<()>
where
T: AsyncBufRead + Unpin,
{
let mut line_buf = String::new();
while bytes.read_line(&mut line_buf).await? > 0 {
let log_entry = format!(
"{:?} [{}] {}",
SystemTime::now(),
match pipe {
Pipe::StdOut => "stdout",
Pipe::StdErr => "stderr",
},
line_buf.trim()
);

Journal
.write_all(log_entry.as_bytes())
.context("write to journal")?;

Journal.flush().context("flush journal")?;
line_buf.clear();
}

Ok(())
}

pub async fn reopen(&mut self) -> Result<()> {
debug!("Reopen journald log");
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;

#[tokio::test]
async fn test_journald_logger_new() {
JournaldLogger::new(Some(1000)).unwrap();
}

#[tokio::test]
async fn test_journald_logger_init() {
let mut logger = JournaldLogger::new(Some(1000)).unwrap();
assert!(logger.init().await.is_ok());
}

#[tokio::test]
async fn test_journald_logger_write() {
let mut logger = JournaldLogger::new(Some(1000)).unwrap();
logger.init().await.unwrap();

let cursor = Cursor::new(b"Test log message\n".to_vec());
assert!(logger.write(Pipe::StdOut, cursor).await.is_ok());

// Verifying the actual log message in Journald might require additional setup or permissions.
}

#[tokio::test]
async fn test_journald_logger_reopen() {
let mut logger = JournaldLogger::new(Some(1000)).unwrap();
logger.init().await.unwrap();

let cursor = Cursor::new(b"Test log message before reopen\n".to_vec());
assert!(logger.write(Pipe::StdOut, cursor).await.is_ok());

assert!(logger.reopen().await.is_ok());

let cursor = Cursor::new(b"Test log message after reopen\n".to_vec());
assert!(logger.write(Pipe::StdOut, cursor).await.is_ok());

// As with the write test, verifying the actual log messages in Journald might require additional setup or permissions.
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use crate::{container_io::Pipe, cri_logger::CriLogger, json_logger::JsonLogger};
mod cri;
mod journald;
mod json;

use crate::{
container_io::Pipe, container_log::cri::CriLogger, container_log::journald::JournaldLogger,
container_log::json::JsonLogger,
};
use anyhow::Result;
use capnp::struct_list::Reader;
use conmon_common::conmon_capnp::conmon::log_driver::{Owned, Type};
Expand All @@ -16,6 +23,7 @@ pub struct ContainerLog {
#[derive(Debug)]
enum LogDriver {
ContainerRuntimeInterface(CriLogger),
Journald(JournaldLogger),
Json(JsonLogger),
}

Expand All @@ -25,6 +33,7 @@ impl ContainerLog {
Arc::new(RwLock::new(Self::default()))
}

/// Create a new SharedContainerLog from an owned reader.
pub fn from(reader: Reader<Owned>) -> Result<SharedContainerLog> {
let drivers = reader
.iter()
Expand All @@ -48,6 +57,13 @@ impl ContainerLog {
None
},
)?)),
Type::Journald => Ok(LogDriver::Journald(JournaldLogger::new(
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)),
}
})
.collect::<Result<Vec<_>>>()?;
Expand All @@ -64,6 +80,7 @@ impl ContainerLog {
cri_logger.init().boxed()
}
LogDriver::Json(ref mut json_logger) => json_logger.init().boxed(),
LogDriver::Journald(ref mut journald_logger) => journald_logger.init().boxed(),
})
.collect::<Vec<_>>(),
)
Expand All @@ -83,6 +100,9 @@ impl ContainerLog {
cri_logger.reopen().boxed()
}
LogDriver::Json(ref mut json_logger) => json_logger.reopen().boxed(),
LogDriver::Journald(ref mut journald_logger) => {
journald_logger.reopen().boxed()
}
})
.collect::<Vec<_>>(),
)
Expand All @@ -108,9 +128,14 @@ impl ContainerLog {
) -> Result<()> {
match logger {
LogDriver::ContainerRuntimeInterface(cri_logger) => {
cri_logger.write(pipe, bytes).await
cri_logger.write(pipe, bytes.clone()).await
}
LogDriver::Journald(journald_logger) => {
journald_logger.write(pipe, bytes.clone()).await
}
LogDriver::Json(json_logger) => {
json_logger.write(pipe, bytes.clone()).await
}
LogDriver::Json(json_logger) => json_logger.write(pipe, bytes).await,
}
}

Expand Down
2 changes: 0 additions & 2 deletions conmon-rs/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ mod child_reaper;
mod config;
mod container_io;
mod container_log;
mod cri_logger;
mod fd_socket;
mod init;
mod journal;
mod json_logger;
mod listener;
mod oom_watcher;
mod pause;
Expand Down
Loading

0 comments on commit b3eaea6

Please sign in to comment.