Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Critical dependency logging #4988

Merged
merged 36 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0087a0e
add metrics layer
divagant-martian Dec 4, 2023
39b2533
add metrics
divagant-martian Dec 4, 2023
57baf2d
simplify getting the target
divagant-martian Dec 4, 2023
71ad6a0
make clippy happy
divagant-martian Dec 5, 2023
dc46e92
fix typos
divagant-martian Dec 5, 2023
b3e5ea0
unify deps under workspace
divagant-martian Dec 5, 2023
94573c7
make import statement shorter, fix typos
divagant-martian Dec 5, 2023
a18c189
enable warn by default, mark flag as deprecated
divagant-martian Dec 6, 2023
908ad21
do not exit on error when initializing logging fails
divagant-martian Dec 6, 2023
9dd307d
revert exit on error
divagant-martian Dec 6, 2023
9a6d862
adjust bootnode logging
divagant-martian Dec 6, 2023
5b2395b
add logging layer
eserilev Dec 6, 2023
de2840e
non blocking file writer
eserilev Dec 6, 2023
d6ff17e
non blocking file writer
eserilev Dec 7, 2023
a158865
add tracing visitor
eserilev Dec 7, 2023
1ffac2c
Merge branch 'unstable' of https://github.com/sigp/lighthouse into cr…
eserilev Dec 7, 2023
85deec7
use target as is by default
divagant-martian Dec 10, 2023
46e0ec9
make libp2p events register correctly
divagant-martian Dec 11, 2023
c2b7423
Merge branch 'unstable' into deps-logging-metrics
divagant-martian Dec 11, 2023
f6c4d27
adjust replicated cli help
divagant-martian Dec 11, 2023
bedf55f
refactor tracing layer
eserilev Dec 11, 2023
7e227a0
linting
eserilev Dec 11, 2023
5bbf27f
filesize
eserilev Dec 11, 2023
6ce1a4e
log gossipsub, don't filter by log level
eserilev Dec 12, 2023
c222b36
turn on debug logs by default, remove deprecation warning
divagant-martian Dec 13, 2023
e256953
Merge branch 'unstable' into deps-logging-metrics
divagant-martian Dec 13, 2023
72ee781
resolve merge conflicts
eserilev Dec 13, 2023
509a421
truncate file, add timestamp, add unit test
eserilev Dec 14, 2023
0d73f76
suppress output (#5)
divagant-martian Dec 18, 2023
f25dcb6
use tracing appender
eserilev Dec 30, 2023
5abb2c5
merge changes from 4979
eserilev Dec 30, 2023
958bb7b
cleanup
eserilev Dec 30, 2023
ebf4b29
Merge latest unstable
AgeManning Jan 16, 2024
f7dea99
Add a task to remove old log files and upgrade to warn level
AgeManning Jan 16, 2024
32bbbfd
Add the time feature for tokio
AgeManning Jan 16, 2024
b2f9d96
Udeps and fmt
AgeManning Jan 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ tempfile = "3"
tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = { version = "0.6", features = ["codec", "compat", "time"] }
tracing-appender = "0.2"
tracing-core = "0.1"
tracing-subscriber = "0.3"
tree_hash = "0.5"
tree_hash_derive = "0.5"
url = "2"
Expand Down
5 changes: 1 addition & 4 deletions boot_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ pub fn run(
log::Level::Error => drain.filter_level(Level::Error),
};

let logger = Logger::root(drain.fuse(), o!());
let _scope_guard = slog_scope::set_global_logger(logger);
slog_stdlog::init_with_level(debug_level).unwrap();
let log = Logger::root(drain.fuse(), o!());

let log = slog_scope::logger();
// Run the main function emitting any errors
if let Err(e) = match eth_spec_id {
EthSpecId::Minimal => {
Expand Down
1 change: 1 addition & 0 deletions common/directory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub const DEFAULT_NETWORK_DIR: &str = "network";
pub const DEFAULT_VALIDATOR_DIR: &str = "validators";
pub const DEFAULT_SECRET_DIR: &str = "secrets";
pub const DEFAULT_WALLET_DIR: &str = "wallets";
pub const DEFAULT_TRACING_DIR: &str = "tracing";

/// Base directory name for unnamed testnets passed through the --testnet-dir flag
pub const CUSTOM_TESTNET_DIR: &str = "custom";
Expand Down
3 changes: 3 additions & 0 deletions common/logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ parking_lot = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-core = { workspace = true }
60 changes: 60 additions & 0 deletions common/logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,24 @@ use lighthouse_metrics::{
};
use slog::Logger;
use slog_term::Decorator;
use std::collections::HashMap;
use std::io::{Result, Write};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tracing_logging_layer::TRACING_LOGGING_DEPENDENCIES;

pub const MAX_MESSAGE_WIDTH: usize = 40;

pub mod async_record;
mod sse_logging_components;
mod tracing_logging_layer;
mod tracing_metrics_layer;

pub use sse_logging_components::SSELoggingComponents;
pub use tracing_logging_layer::LoggingLayer;
pub use tracing_logging_layer::NonBlockingFileWriter;
pub use tracing_metrics_layer::MetricsLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

/// The minimum interval between log messages indicating that a queue is full.
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -214,6 +224,56 @@ impl TimeLatch {
}
}

pub fn create_tracing_layer(logfile_max_size: u64, base_tracing_log_path: PathBuf) {
let filter_layer = match tracing_subscriber::EnvFilter::try_from_default_env()
.or_else(|_| tracing_subscriber::EnvFilter::try_new("warn"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we use debug by default?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one's for me, will change to debug

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in c222b36

{
Ok(filter) => filter,
Err(e) => {
eprintln!("Failed to initialize dependency tracing {e}");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at this point we don't have a useable slogger, so we are forced into prtinln! right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep that's the reason

return;
}
};

let mut file_writer_streams: HashMap<String, NonBlockingFileWriter> = HashMap::new();

for dependency in TRACING_LOGGING_DEPENDENCIES.iter() {
init_file_writer_stream(
&mut file_writer_streams,
base_tracing_log_path.clone(),
dependency,
logfile_max_size * 1_024 * 1_024,
);
}

if let Err(e) = tracing_subscriber::fmt()
.with_env_filter(filter_layer)
.finish()
.with(MetricsLayer)
.with(LoggingLayer {
file_writer_streams,
})
.try_init()
{
eprintln!("Failed to initialize dependency tracing {e}");
}
}

/// Create a `NonBlockingFileWriter` for one dependency and register it in
/// `file_writer_streams` under the dependency's crate name.
///
/// The destination file is `<base_path>/<file_name>.log`. On failure the
/// error is reported on stderr and the dependency simply gets no file stream.
fn init_file_writer_stream(
    file_writer_streams: &mut HashMap<String, NonBlockingFileWriter>,
    base_path: PathBuf,
    file_name: &str,
    max_file_size: u64,
) {
    let file_path = base_path.join(file_name).with_extension("log");

    match NonBlockingFileWriter::new(file_path.as_path(), max_file_size) {
        Ok(file_writer) => {
            file_writer_streams.insert(file_name.to_string(), file_writer);
        }
        Err(_) => {
            eprintln!("Failed to create tracing file stream for {file_name}");
        }
    }
}

/// Return a logger suitable for test usage.
///
/// By default no logs will be printed, but they can be enabled via
Expand Down
126 changes: 126 additions & 0 deletions common/logging/src/tracing_logging_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::collections::HashMap;
use std::fs::{self, create_dir_all, File, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::mpsc::{self, Sender};
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};

// Dependencies whose `tracing` events are captured and written to per-crate
// log files. Each entry is matched against the crate-name portion of an
// event's target by `LoggingLayer`, and names the `<dep>.log` file created
// for it.
lazy_static! {
    pub static ref TRACING_LOGGING_DEPENDENCIES: Vec<String> =
        vec!["libp2p_gossipsub".to_string(), "discv5".to_string()];
}

/// Layer that handles `INFO`, `WARN` and `ERROR` logs emitted per dependency and
/// writes them to a file. Dependencies are enabled via the `RUST_LOG` env flag.
pub struct LoggingLayer {
    /// One non-blocking writer per tracked dependency, keyed by crate name
    /// (keys are populated from `TRACING_LOGGING_DEPENDENCIES`).
    pub file_writer_streams: HashMap<String, NonBlockingFileWriter>,
}

impl<S: tracing_core::Subscriber> tracing_subscriber::layer::Layer<S> for LoggingLayer {
    /// Route an event to the file stream of the crate that emitted it.
    fn on_event(
        &self,
        event: &tracing_core::Event<'_>,
        _ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        let meta = event.metadata();

        // Key streams by the crate-name portion of the target (the part
        // before the first `::`). Events emitted at a crate's top level have
        // a target with no `::` at all (e.g. just `discv5`); fall back to the
        // full target instead of collapsing those into "unknown", which would
        // silently drop them.
        let target = match meta.target().split_once("::") {
            Some((crate_name, _)) => crate_name,
            None => meta.target(),
        };

        // Only dependencies with a registered file stream are written out.
        let Some(file_writer) = self.file_writer_streams.get(target) else {
            return;
        };

        // Fold all of the event's fields into a single message string.
        let mut visitor = LogMessageExtractor {
            message: String::default(),
        };

        event.record(&mut visitor);

        // The writer only queues the message; failures surface in its thread.
        let _ = file_writer.write(visitor.message);
    }
}

/// Handle to a dedicated writer thread that appends log messages to a file.
/// `write` never touches the filesystem; it only sends the message over a
/// channel, so callers (the tracing layer) are never blocked on file I/O.
pub struct NonBlockingFileWriter {
    sender: Sender<String>,
}

impl NonBlockingFileWriter {
    /// Spawn the background writer thread for `path` and return a handle.
    ///
    /// The thread creates any missing parent directories, opens (or creates)
    /// `path` in append mode, then drains the channel until the sender side
    /// (this struct) is dropped. Whenever the file exceeds `max_file_size`
    /// bytes it is truncated before the next message is written.
    pub fn new(path: &std::path::Path, max_file_size: u64) -> Result<Self, std::io::Error> {
        let (sender, receiver) = mpsc::channel();
        let path = path.to_path_buf();

        thread::spawn(move || {
            // Create the necessary directories for the correct service and network.
            if let Some(dir) = path.parent() {
                if !dir.exists() {
                    if let Err(e) = create_dir_all(dir) {
                        eprintln!("Failed to create dir: {:?}", e);
                        return;
                    }
                }
            }

            let mut file = match OpenOptions::new().create(true).append(true).open(&path) {
                Ok(file) => file,
                Err(e) => {
                    eprintln!("Failed to open file: {:?}", e);
                    return;
                }
            };

            // Blocks until a message arrives; the loop ends once every sender
            // has been dropped (i.e. the owning `NonBlockingFileWriter` dies).
            for message in receiver {
                // Size check errors are treated as "not oversized" so a
                // transient metadata failure never wipes the log.
                let should_clear_file = NonBlockingFileWriter::get_file_size(&path)
                    .map(|size| size > max_file_size)
                    .unwrap_or(false);

                if should_clear_file {
                    let _ = NonBlockingFileWriter::clear_file(&path);
                }

                // Prefix a UNIX-epoch timestamp (seconds) so entries can be
                // cross-referenced with metrics and the main Lighthouse logs.
                let timestamp = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .map(|d| d.as_secs())
                    .unwrap_or(0);

                if let Err(e) = writeln!(file, "{} {}", timestamp, message) {
                    eprintln!("Failed to write to file: {:?}", e);
                }
            }
        });

        Ok(NonBlockingFileWriter { sender })
    }

    /// Queue `message` for the writer thread; never blocks on file I/O.
    /// Errs only if the writer thread has already exited.
    pub fn write(&self, message: String) -> Result<(), std::io::Error> {
        self.sender
            .send(message)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
    }

    /// Current size of the log file in bytes.
    fn get_file_size(path: &PathBuf) -> std::io::Result<u64> {
        let metadata = fs::metadata(path)?;
        Ok(metadata.len())
    }

    /// Truncate the log file to zero length.
    // NOTE(review): a reviewer suggested keeping the most recent half of the
    // file instead of truncating everything, to preserve history when a bug
    // strikes just after rotation — worth considering as a follow-up.
    fn clear_file(path: &PathBuf) -> std::io::Result<()> {
        File::create(path)?;
        Ok(())
    }
}

/// `tracing` field visitor that concatenates all of an event's fields
/// (space-separated, `Debug`-formatted) into a single message string.
struct LogMessageExtractor {
    message: String,
}

impl tracing_core::field::Visit for LogMessageExtractor {
    fn record_debug(&mut self, _: &tracing_core::Field, value: &dyn std::fmt::Debug) {
        use std::fmt::Write;
        // Append in place rather than `format!("{} {:?}", self.message, value)`,
        // which re-allocated and copied the whole accumulated message for every
        // field (O(n^2)) and left a spurious leading space on the first field.
        if !self.message.is_empty() {
            self.message.push(' ');
        }
        // Writing to a `String` cannot fail; ignore the fmt::Result.
        let _ = write!(self.message, "{:?}", value);
    }
}
56 changes: 56 additions & 0 deletions common/logging/src/tracing_metrics_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//! Exposes [`MetricsLayer`]: A tracing layer that registers metrics of logging events.

use lighthouse_metrics as metrics;

// Per-dependency log counters. The metric names below are load-bearing
// (scraped by dashboards) — do not rename without updating consumers.
// NOTE(review): presumably `metrics::inc_counter_vec` is a no-op when the
// stored `Result` is `Err` (registration failed) — confirm in lighthouse_metrics.
lazy_static! {
    /// Count of `INFO` logs registered per enabled dependency.
    pub static ref DEP_INFOS_TOTAL: metrics::Result<metrics::IntCounterVec> =
        metrics::try_create_int_counter_vec(
            "dep_info_total",
            "Count of infos logged per enabled dependency",
            &["target"]
        );
    /// Count of `WARN` logs registered per enabled dependency.
    pub static ref DEP_WARNS_TOTAL: metrics::Result<metrics::IntCounterVec> =
        metrics::try_create_int_counter_vec(
            "dep_warn_total",
            "Count of warns logged per enabled dependency",
            &["target"]
        );
    /// Count of `ERROR` logs registered per enabled dependency.
    pub static ref DEP_ERRORS_TOTAL: metrics::Result<metrics::IntCounterVec> =
        metrics::try_create_int_counter_vec(
            "dep_error_total",
            "Count of errors logged per enabled dependency",
            &["target"]
        );
}

/// Layer that registers Prometheus metrics for `INFO`, `WARN` and `ERROR` logs emitted per dependency.
/// Dependencies are enabled via the `RUST_LOG` env flag.
pub struct MetricsLayer;

impl<S: tracing_core::Subscriber> tracing_subscriber::layer::Layer<S> for MetricsLayer {
    fn on_event(
        &self,
        event: &tracing_core::Event<'_>,
        _ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        let meta = event.metadata();
        if !meta.is_event() {
            // ignore tracing span events
            return;
        }
        // Label by crate name (the part of the target before the first `::`).
        // Events emitted at a crate's top level have a target with no `::`
        // (e.g. just `discv5`); keep their full target instead of mislabelling
        // them all as "unknown".
        let target = match meta.target().split_once("::") {
            Some((crate_name, _)) => crate_name,
            None => meta.target(),
        };
        let target = &[target];
        match *meta.level() {
            tracing_core::Level::INFO => metrics::inc_counter_vec(&DEP_INFOS_TOTAL, target),
            tracing_core::Level::WARN => metrics::inc_counter_vec(&DEP_WARNS_TOTAL, target),
            tracing_core::Level::ERROR => metrics::inc_counter_vec(&DEP_ERRORS_TOTAL, target),
            // DEBUG/TRACE are intentionally not counted.
            _ => {}
        }
    }
}
3 changes: 2 additions & 1 deletion lighthouse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ types = { workspace = true }
bls = { workspace = true }
ethereum_hashing = { workspace = true }
clap = { workspace = true }
env_logger = { workspace = true }
environment = { workspace = true }
boot_node = { path = "../boot_node" }
futures = { workspace = true }
Expand All @@ -57,6 +56,8 @@ unused_port = { workspace = true }
database_manager = { path = "../database_manager" }
slasher = { workspace = true }
validator_manager = { path = "../validator_manager" }
tracing-subscriber = { workspace = true }
logging = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
Loading
Loading