Skip to content

Commit

Permalink
updated journald
Browse files Browse the repository at this point in the history
Signed-off-by: wasup-yash <[email protected]>
  • Loading branch information
wasup-yash committed Jan 6, 2024
1 parent 378747f commit c1768fc
Show file tree
Hide file tree
Showing 18 changed files with 488 additions and 258 deletions.
6 changes: 6 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"rust-analyzer.linkedProjects": [
"./conmon-rs/server/Cargo.toml",
"./conmon-rs/server/Cargo.toml"
]
}
76 changes: 76 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions conmon-rs/common/proto/conmon.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ interface Conmon {
containerRuntimeInterface @0;
# The JSON logger, requires `path` to be set.
json @1;
journald @2;
}
}

Expand Down
3 changes: 2 additions & 1 deletion conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ tracing-opentelemetry = "0.22.0"
tracing-subscriber = "0.3.18"
tz-rs = "0.6.14"
uuid = { version = "1.6.1", features = ["v4", "fast-rng", "macro-diagnostics"] }

systemd = "0.9"
[build-dependencies]
shadow-rs = "0.25.0"
dashmap = "5.5.3"
systemd = "0.9"

[dev-dependencies]
mockall = "0.11.4"
Expand Down
2 changes: 1 addition & 1 deletion conmon-rs/server/src/container_io.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
attach::SharedContainerAttach, container_log::SharedContainerLog, streams::Streams,
attach::SharedContainerAttach, container_log::container_log::SharedContainerLog, streams::Streams,
terminal::Terminal,
};
use anyhow::{bail, Context, Result};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{container_io::Pipe, cri_logger::CriLogger, json_logger::JsonLogger};
use crate::{container_io::Pipe, container_log::cri_logger::CriLogger, container_log::journald::JournaldLogger, container_log::json_logger::JsonLogger};
use anyhow::Result;
use capnp::struct_list::Reader;
use conmon_common::conmon_capnp::conmon::log_driver::{Owned, Type};
Expand All @@ -12,48 +12,63 @@ pub type SharedContainerLog = Arc<RwLock<ContainerLog>>;
pub struct ContainerLog {
drivers: Vec<LogDriver>,
}

#[derive(Debug)]
enum LogDriver {
ContainerRuntimeInterface(CriLogger),
Journald(JournaldLogger),
Json(JsonLogger),
}

impl ContainerLog {
/// Create a new default SharedContainerLog.
///Create a new default SharedContainerLog
pub fn new() -> SharedContainerLog {
Arc::new(RwLock::new(Self::default()))
}

/// Create a new SharedContainerLog from an capnp owned reader.
pub fn from(reader: Reader<Owned>) -> Result<SharedContainerLog> {
let drivers = reader
.iter()
.map(|x| -> Result<_> {
match x.get_type()? {
Type::ContainerRuntimeInterface => {
Ok(LogDriver::ContainerRuntimeInterface(CriLogger::new(
Ok(Some(LogDriver::ContainerRuntimeInterface(CriLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?))
)?)))
}
Type::Json => {
Ok(Some(LogDriver::Json(JsonLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)))
}
Type::Journald => {
Ok(Some(LogDriver::Journald(JournaldLogger::new(

if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)))
}
Type::Json => Ok(LogDriver::Json(JsonLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)),
}
})
.collect::<Result<Vec<_>>>()?;
.filter_map(Result::transpose)
.collect::<Result<Vec<_>, _>>()?;
Ok(Arc::new(RwLock::new(Self { drivers })))
}




/// Asynchronously initialize all loggers.
pub async fn init(&mut self) -> Result<()> {
join_all(
Expand All @@ -64,15 +79,15 @@ impl ContainerLog {
cri_logger.init().boxed()
}
LogDriver::Json(ref mut json_logger) => json_logger.init().boxed(),
LogDriver::Journald(ref mut journald_logger) => journald_logger.init().boxed(),
})
.collect::<Vec<_>>(),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(())
}

}
/// Reopen the container logs.
pub async fn reopen(&mut self) -> Result<()> {
join_all(
Expand All @@ -83,6 +98,7 @@ impl ContainerLog {
cri_logger.reopen().boxed()
}
LogDriver::Json(ref mut json_logger) => json_logger.reopen().boxed(),
LogDriver::Journald(ref mut journald_logger) => journald_logger.reopen().boxed(),
})
.collect::<Vec<_>>(),
)
Expand All @@ -91,11 +107,10 @@ impl ContainerLog {
.collect::<Result<Vec<_>>>()?;
Ok(())
}

/// Write the contents of the provided reader into all loggers.
pub async fn write<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
where
T: AsyncBufRead + Unpin + Clone,
T: AsyncBufRead + Unpin + Clone, // Using Clone to satisfy both Clone and Copy requirements
{
let futures = self
.drivers
Expand All @@ -108,20 +123,23 @@ impl ContainerLog {
) -> Result<()> {
match logger {
LogDriver::ContainerRuntimeInterface(cri_logger) => {
cri_logger.write(pipe, bytes).await
cri_logger.write(pipe, bytes.clone()).await
}
LogDriver::Journald(journald_logger) => {
journald_logger.write(pipe, bytes.clone()).await
}
LogDriver::Json(json_logger) => {
json_logger.write(pipe, bytes.clone()).await
}
LogDriver::Json(json_logger) => json_logger.write(pipe, bytes).await,
}
}

box_future(x, pipe, bytes.clone())
})
.collect::<Vec<_>>();

join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(())
}
}
}
}
File renamed without changes.
Loading

0 comments on commit c1768fc

Please sign in to comment.