From f5e9cea4698cd4cf74f791b74168cda161f70552 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 20 Jan 2022 13:55:24 -0800 Subject: [PATCH] trace: Apache Common Log Format access logging (#1319) This branch builds on @tustvold's work in #601. The original PR description from that branch: > Access logging is very important functionality for my team as we wish > to maintain feature-parity with our existing AWS ALB-based approach. > This functionality was requested > [here](https://github.com/linkerd/linkerd2/issues/1913) and was marked > as help wanted, so I thought I'd take a stab at implementing it. > > Creating as a draft as it still needs more testing and benchmarking, and > I'm new to tower and so might have made some rookie errors. However, I > wanted to create a draft as an opportunity to get some early feedback. > > The basic design consists of an AccessLogLayer that instruments both > requests and responses that flow through it, in a similar manner to > how handle_time is already computed. I'm not a massive fan of this, > but it was the only way I could easily see to get accurate processing > time metrics. I've tried to avoid any memory allocation on the hot > path, although there are possibly more atomic increments than I would > like. The performance impact with the feature disabled, i.e. with > LINKERD2_PROXY_ACCESS_LOG_FILE not set, should be minimal. > > The results of this instrumentation are then sent over an mpsc channel > to an AccessLogCollector that writes them in a space-delimited format > to a file specified by an environment variable. It buffers in memory > and flushes on termination and on write if more than > FLUSH_TIMEOUT_SECS have elapsed since the last flush. This makes the access logging > best-effort, much like AWS ALBs. > > An example deployment scenario using this functionality might deploy a > fluent-bit sidecar to ship the logs, or write to /dev/stderr and use a > log shipper deployed as a DaemonSet. 
The additional changes in this branch are: - Updated against the latest state of the `main` branch. - Changed the `tracing` configuration to use per-layer filtering, so that the access log layer _only_ sees access log spans, while the stdout logging layer does not see the access log spans (although it _could_ if we wanted it to). - Changed the format for outputting the access log to the Apache Common Log Format. Note that this format does *not* include all the data that the access log spans currently collect; I excluded that data so that the output is compatible with tools that ingest the Apache log format. In a follow-up PR, we can add the ability to control what format the access log is written in, and add an alternative format that includes all the access log data that Linkerd's spans can collect (I suggest newline-delimited JSON for this). Note that the quoted description differs from the implementation in this patch: here the access log is enabled by setting `LINKERD2_PROXY_ACCESS_LOG`, and a `tracing` layer writes each entry to stderr rather than to a file named by `LINKERD2_PROXY_ACCESS_LOG_FILE`. Of course, a huge thank you to @tustvold for all their work on this; I only updated the branch with the latest changes and made some minor improvements. 
:) Co-authored-by: Raphael Taylor-Davies --- Cargo.lock | 23 +++ Cargo.toml | 1 + linkerd/app/inbound/Cargo.toml | 1 + linkerd/app/inbound/src/http/server.rs | 7 +- linkerd/http-access-log/Cargo.toml | 19 +++ linkerd/http-access-log/src/lib.rs | 207 +++++++++++++++++++++++++ linkerd/http-retry/src/lib.rs | 2 +- linkerd/tls/src/server.rs | 11 ++ linkerd/tracing/Cargo.toml | 3 +- linkerd/tracing/src/access_log.rs | 156 +++++++++++++++++++ linkerd/tracing/src/lib.rs | 38 ++++- linkerd/tracing/src/test.rs | 11 +- 12 files changed, 466 insertions(+), 13 deletions(-) create mode 100644 linkerd/http-access-log/Cargo.toml create mode 100644 linkerd/http-access-log/src/lib.rs create mode 100644 linkerd/tracing/src/access_log.rs diff --git a/Cargo.lock b/Cargo.lock index 59fb534370..c742cda184 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -537,6 +537,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.16" @@ -867,6 +873,7 @@ dependencies = [ "libfuzzer-sys", "linkerd-app-core", "linkerd-app-test", + "linkerd-http-access-log", "linkerd-io", "linkerd-meshtls", "linkerd-meshtls-rustls", @@ -1061,6 +1068,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "linkerd-http-access-log" +version = "0.1.0" +dependencies = [ + "futures-core", + "http", + "humantime", + "linkerd-identity", + "linkerd-proxy-transport", + "linkerd-stack", + "linkerd-tls", + "linkerd-tracing", + "pin-project", + "tracing", +] + [[package]] name = "linkerd-http-box" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d695ef366e..6aca68e28f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "linkerd/errno", 
"linkerd/error-respond", "linkerd/exp-backoff", + "linkerd/http-access-log", "linkerd/http-box", "linkerd/http-classify", "linkerd/http-metrics", diff --git a/linkerd/app/inbound/Cargo.toml b/linkerd/app/inbound/Cargo.toml index 6141338bd8..8432814ff0 100644 --- a/linkerd/app/inbound/Cargo.toml +++ b/linkerd/app/inbound/Cargo.toml @@ -14,6 +14,7 @@ bytes = "1" http = "0.2" futures = { version = "0.3", default-features = false } linkerd-app-core = { path = "../core" } +linkerd-http-access-log = { path = "../../http-access-log" } linkerd-server-policy = { path = "../../server-policy" } linkerd-tonic-watch = { path = "../../tonic-watch" } linkerd2-proxy-api = { version = "0.3", features = ["client", "inbound"] } diff --git a/linkerd/app/inbound/src/http/server.rs b/linkerd/app/inbound/src/http/server.rs index 8f20d518a7..f976513736 100644 --- a/linkerd/app/inbound/src/http/server.rs +++ b/linkerd/app/inbound/src/http/server.rs @@ -11,9 +11,10 @@ use linkerd_app_core::{ proxy::http, svc::{self, ExtractParam, Param}, tls, - transport::OrigDstAddr, + transport::{ClientAddr, OrigDstAddr, Remote}, Error, Result, }; +use linkerd_http_access_log::NewAccessLog; use tracing::debug_span; #[derive(Copy, Clone, Debug)] @@ -26,7 +27,8 @@ impl Inbound { + Param + Param + Param - + Param, + + Param + + Param>, T: Clone + Send + Unpin + 'static, I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static, H: svc::NewService + Clone + Send + Sync + Unpin + 'static, @@ -79,6 +81,7 @@ impl Inbound { .push(http::BoxResponse::layer()), ) .check_new_service::>() + .push(NewAccessLog::layer()) .instrument(|t: &T| debug_span!("http", v = %Param::::param(t))) .push(http::NewServeHttp::layer(h2_settings, rt.drain.clone())) .push_on_service(svc::BoxService::layer()) diff --git a/linkerd/http-access-log/Cargo.toml b/linkerd/http-access-log/Cargo.toml new file mode 100644 index 0000000000..a94691880d --- /dev/null +++ b/linkerd/http-access-log/Cargo.toml @@ -0,0 +1,19 @@ +[package] 
+name = "linkerd-http-access-log" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2018" +publish = false + +[dependencies] +futures-core = "0.3" +http = "0.2" +humantime = "2" +pin-project = "1" +linkerd-stack = { path = "../stack" } +linkerd-identity = { path = "../identity" } +linkerd-tls = { path = "../tls" } +linkerd-proxy-transport = { path = "../proxy/transport" } +linkerd-tracing = { path = "../tracing" } +tracing = "0.1.19" diff --git a/linkerd/http-access-log/src/lib.rs b/linkerd/http-access-log/src/lib.rs new file mode 100644 index 0000000000..a01a5677bb --- /dev/null +++ b/linkerd/http-access-log/src/lib.rs @@ -0,0 +1,207 @@ +#![deny(warnings, rust_2018_idioms)] +#![forbid(unsafe_code)] + +use futures_core::TryFuture; +use linkerd_identity as identity; +use linkerd_proxy_transport::{ClientAddr, Remote}; +use linkerd_stack as svc; +use linkerd_tls as tls; +use linkerd_tracing::access_log::TRACE_TARGET; +use pin_project::pin_project; +use std::{ + future::Future, + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant, SystemTime}, +}; +use svc::{NewService, Param}; +use tracing::{field, span, Level, Span}; + +#[derive(Clone, Debug)] +pub struct NewAccessLog { + inner: N, +} + +#[derive(Clone, Debug)] +pub struct AccessLogContext { + inner: S, + client_addr: SocketAddr, + client_id: Option, +} + +struct ResponseFutureInner { + span: Span, + start: Instant, + processing: Duration, +} + +#[pin_project] +pub struct AccessLogFuture { + data: Option, + + #[pin] + inner: F, +} + +impl NewAccessLog { + /// Returns a new `NewAccessLog` layer that wraps an inner service with + /// access logging middleware. + /// + /// The access log is recorded by adding a `tracing` span to the service's + /// future. If access logging is not enabled by the `tracing` subscriber, + /// this span will never be enabled, and it can be skipped cheaply. 
When + /// access logging *is* enabled, additional data will be recorded when the + /// response future completes. + /// + /// Recording the access log will introduce additional overhead in the + /// request path, but this is largely avoided when access logging is not + /// enabled. + #[inline] + pub fn layer() -> impl svc::layer::Layer { + svc::layer::mk(|inner| NewAccessLog { inner }) + } +} + +impl NewService for NewAccessLog +where + T: Param + Param>, + N: NewService, +{ + type Service = AccessLogContext; + + fn new_service(&self, target: T) -> Self::Service { + let Remote(ClientAddr(client_addr)) = target.param(); + let tls: tls::ConditionalServerTls = target.param(); + let client_id = tls + .value() + .and_then(|tls| tls.client_id().map(|tls::ClientId(name)| name.clone())); + let inner = self.inner.new_service(target); + AccessLogContext { + inner, + client_addr, + client_id, + } + } +} + +impl svc::Service> for AccessLogContext +where + S: svc::Service, Response = http::Response>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = AccessLogFuture; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: http::Request) -> Self::Future { + let get_header = |name: http::header::HeaderName| { + request + .headers() + .get(name) + .and_then(|x| x.to_str().ok()) + .unwrap_or_default() + }; + + let trace_id = || { + let headers = request.headers(); + headers + .get("x-b3-traceid") + .or_else(|| headers.get("x-request-id")) + .or_else(|| headers.get("x-amzn-trace-id")) + .and_then(|x| x.to_str().ok()) + .unwrap_or_default() + }; + + let span = span!(target: TRACE_TARGET, Level::INFO, "http", + client.addr = %self.client_addr, + client.id = self.client_id.as_ref().map(|n| n.as_str()).unwrap_or("-"), + timestamp = %now(), + method = request.method().as_str(), + uri = %request.uri(), + version = ?request.version(), + trace_id = trace_id(), + request_bytes = 
get_header(http::header::CONTENT_LENGTH), + status = field::Empty, + response_bytes = field::Empty, + total_ns = field::Empty, + processing_ns = field::Empty, + user_agent = get_header(http::header::USER_AGENT), + host = get_header(http::header::HOST), + ); + + // The access log span is only enabled by the `tracing` subscriber if + // access logs are being recorded. If it's disabled, we can skip + // recording additional data in the response future. + if span.is_disabled() { + return AccessLogFuture { + data: None, + inner: self.inner.call(request), + }; + } + + AccessLogFuture { + data: Some(ResponseFutureInner { + span, + start: Instant::now(), + processing: Duration::from_secs(0), + }), + inner: self.inner.call(request), + } + } +} + +impl Future for AccessLogFuture +where + F: TryFuture>, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + let data: &mut ResponseFutureInner = match &mut this.data { + Some(data) => data, + None => return this.inner.try_poll(cx), + }; + + let _enter = data.span.enter(); + let poll_start = Instant::now(); + + let response: http::Response = match this.inner.try_poll(cx) { + Poll::Pending => { + data.processing += Instant::now().duration_since(poll_start); + return Poll::Pending; + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(response)) => response, + }; + + let now = Instant::now(); + let total_ns = now.duration_since(data.start).as_nanos(); + let processing_ns = (now.duration_since(poll_start) + data.processing).as_nanos(); + + let span = &data.span; + + response + .headers() + .get(http::header::CONTENT_LENGTH) + .and_then(|x| x.to_str().ok()) + .map(|x| span.record("response_bytes", &x)); + + span.record("status", &response.status().as_u16()); + span.record("total_ns", &field::display(total_ns)); + span.record("processing_ns", &field::display(processing_ns)); + + Poll::Ready(Ok(response)) + } +} + +#[inline] +fn now() -> 
humantime::Rfc3339Timestamp { + humantime::format_rfc3339(SystemTime::now()) +} diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index 893101b57e..8018e69603 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -938,7 +938,7 @@ mod tests { tx: Tx(tx), initial, replay, - _trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"), + _trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug").0, } } } diff --git a/linkerd/tls/src/server.rs b/linkerd/tls/src/server.rs index 1740ced355..ff8702d9f9 100644 --- a/linkerd/tls/src/server.rs +++ b/linkerd/tls/src/server.rs @@ -294,6 +294,17 @@ impl fmt::Display for NoServerTls { } } +// === impl ServerTls === + +impl ServerTls { + pub fn client_id(&self) -> Option<&ClientId> { + match self { + ServerTls::Established { ref client_id, .. } => client_id.as_ref(), + _ => None, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/linkerd/tracing/Cargo.toml b/linkerd/tracing/Cargo.toml index 37ed839b92..268656a593 100644 --- a/linkerd/tracing/Cargo.toml +++ b/linkerd/tracing/Cargo.toml @@ -18,5 +18,6 @@ tracing-log = "0.1.2" [dependencies.tracing-subscriber] version = "0.3" -features = ["env-filter","smallvec", "tracing-log", "json", "parking_lot"] +default-features = false +features = ["env-filter", "fmt", "smallvec", "tracing-log", "json", "parking_lot", "registry"] diff --git a/linkerd/tracing/src/access_log.rs b/linkerd/tracing/src/access_log.rs new file mode 100644 index 0000000000..d79ee63771 --- /dev/null +++ b/linkerd/tracing/src/access_log.rs @@ -0,0 +1,156 @@ +use std::fmt; +use tracing::{field, span, Id, Level, Metadata, Subscriber}; +use tracing_subscriber::{ + field::RecordFields, + filter::{Directive, FilterFn, Filtered}, + fmt::{format, FormatFields, FormattedFields}, + layer::{Context, Layer}, + registry::LookupSpan, +}; + +pub const TRACE_TARGET: &str = "_access_log"; + +pub(super) type AccessLogLayer = 
Filtered; + +pub(super) struct Writer { + formatter: F, +} + +#[derive(Default)] +pub(super) struct ApacheCommon { + _p: (), +} + +struct ApacheCommonVisitor<'writer> { + res: fmt::Result, + writer: format::Writer<'writer>, +} + +pub(super) fn build() -> (AccessLogLayer, Directive) +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + let writer = Writer::new().with_filter( + FilterFn::new( + (|meta| meta.level() == &Level::INFO && meta.target().starts_with(TRACE_TARGET)) + as fn(&Metadata<'_>) -> bool, + ) + .with_max_level_hint(Level::INFO), + ); + + // Also, ensure that the `tracing` filter configuration will + // always enable the access log spans. + let directive = format!("{}=info", TRACE_TARGET) + .parse() + .expect("access logging filter directive must parse"); + + (writer, directive) +} + +// === impl Writer === + +impl Writer { + pub fn new() -> Self { + Self { + formatter: Default::default(), + } + } +} + +impl Layer for Writer +where + S: Subscriber + for<'span> LookupSpan<'span>, + F: for<'writer> FormatFields<'writer> + 'static, +{ + fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span not found, this is a bug"); + let mut extensions = span.extensions_mut(); + + if extensions.get_mut::>().is_none() { + let mut fields = FormattedFields::::new(String::new()); + if self + .formatter + .format_fields(fields.as_writer(), attrs) + .is_ok() + { + extensions.insert(fields); + } + } + } + + fn on_record(&self, id: &Id, values: &span::Record<'_>, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span not found, this is a bug"); + let mut extensions = span.extensions_mut(); + if let Some(fields) = extensions.get_mut::>() { + let _ = self.formatter.add_fields(fields, values); + return; + } + + let mut fields = FormattedFields::::new(String::new()); + if self + .formatter + .format_fields(fields.as_writer(), values) + .is_ok() + { + extensions.insert(fields); + } + } + + fn 
on_close(&self, id: Id, ctx: Context<'_, S>) { + if let Some(span) = ctx.span(&id) { + if let Some(fields) = span.extensions().get::>() { + eprintln!("{}", fields.fields); + } + } + } +} + +impl ApacheCommon { + const SKIPPED_FIELDS: &'static [&'static str] = &[ + "trace_id", + "request_bytes", + "total_ns", + "processing_ns", + "response_bytes", + "user_agent", + "host", + ]; +} + +impl<'writer> FormatFields<'writer> for ApacheCommon { + fn format_fields(&self, writer: format::Writer<'_>, fields: R) -> fmt::Result { + let mut visitor = ApacheCommonVisitor { + writer, + res: Ok(()), + }; + fields.record(&mut visitor); + visitor.res + } + + #[inline] + fn add_fields( + &self, + current: &mut FormattedFields, + fields: &span::Record<'_>, + ) -> fmt::Result { + self.format_fields(current.as_writer(), fields) + } +} + +impl field::Visit for ApacheCommonVisitor<'_> { + fn record_str(&mut self, field: &field::Field, val: &str) { + self.record_debug(field, &format_args!("{}", val)) + } + + fn record_debug(&mut self, field: &field::Field, val: &dyn fmt::Debug) { + self.res = match field.name() { + n if ApacheCommon::SKIPPED_FIELDS.contains(&n) => return, + "timestamp" => write!(&mut self.writer, " [{:?}]", val), + "client.addr" => write!(&mut self.writer, "{:?}", val), + "client.id" => write!(&mut self.writer, " {:?} -", val), + "method" => write!(&mut self.writer, " \"{:?}", val), + "version" => write!(&mut self.writer, " {:?}\"", val), + _ => write!(&mut self.writer, " {:?}", val), + } + } +} diff --git a/linkerd/tracing/src/lib.rs b/linkerd/tracing/src/lib.rs index 61c4751e2a..99da69f464 100644 --- a/linkerd/tracing/src/lib.rs +++ b/linkerd/tracing/src/lib.rs @@ -1,18 +1,26 @@ #![deny(warnings, rust_2018_idioms, clippy::disallowed_method)] #![forbid(unsafe_code)] +pub mod access_log; pub mod level; pub mod test; mod uptime; use self::uptime::Uptime; use linkerd_error::Error; -use std::{env, str}; +use std::str; use tracing::{Dispatch, Subscriber}; -use 
tracing_subscriber::{fmt::format, prelude::*, registry::LookupSpan, reload, EnvFilter, Layer}; +use tracing_subscriber::{ + filter::{EnvFilter, FilterFn}, + fmt::format, + prelude::*, + registry::LookupSpan, + reload, Layer, +}; const ENV_LOG_LEVEL: &str = "LINKERD2_PROXY_LOG"; const ENV_LOG_FORMAT: &str = "LINKERD2_PROXY_LOG_FORMAT"; +const ENV_ACCESS_LOG: &str = "LINKERD2_PROXY_ACCESS_LOG"; const DEFAULT_LOG_LEVEL: &str = "warn,linkerd=info"; const DEFAULT_LOG_FORMAT: &str = "PLAIN"; @@ -21,6 +29,7 @@ const DEFAULT_LOG_FORMAT: &str = "PLAIN"; pub struct Settings { filter: Option, format: Option, + access_log: bool, is_test: bool, } @@ -72,6 +81,7 @@ impl Settings { Some(Self { filter, format: std::env::var(ENV_LOG_FORMAT).ok(), + access_log: std::env::var(ENV_ACCESS_LOG).is_ok(), is_test: false, }) } @@ -80,6 +90,7 @@ impl Settings { Self { filter: Some(filter), format: Some(format), + access_log: false, is_test: true, } } @@ -139,23 +150,40 @@ impl Settings { pub fn build(self) -> (Dispatch, Handle) { let log_level = self.filter.as_deref().unwrap_or(DEFAULT_LOG_LEVEL); - let (filter, level) = reload::Layer::new(EnvFilter::new(log_level)); + + let mut filter = EnvFilter::new(log_level); + + // If access logging is enabled, build the access log layer. 
+ let access_log = if self.access_log { + let (access_log, directive) = access_log::build(); + filter = filter.add_directive(directive); + Some(access_log) + } else { + None + }; + + let (filter, level) = reload::Layer::new(filter); let level = level::Handle::new(level); let logger = match self.format().as_ref() { "JSON" => self.mk_json(), _ => self.mk_plain(), }; + let logger = logger.with_filter(FilterFn::new(|meta| { + !meta.target().starts_with(access_log::TRACE_TARGET) + })); + + let handle = Handle(Some(level)); let dispatch = tracing_subscriber::registry() .with(filter) + .with(access_log) .with(logger) .into(); - (dispatch, Handle(Some(level))) + (dispatch, handle) } } - // === impl Handle === impl Handle { diff --git a/linkerd/tracing/src/test.rs b/linkerd/tracing/src/test.rs index 998de99aca..f18b2dae3e 100644 --- a/linkerd/tracing/src/test.rs +++ b/linkerd/tracing/src/test.rs @@ -1,4 +1,5 @@ use super::*; +use std::env; /// By default, disable logging in modules that are expected to error in tests. pub const DEFAULT_LOG: &str = "warn,\ @@ -17,11 +18,13 @@ pub fn trace_subscriber(default: impl ToString) -> (Dispatch, Handle) { Settings::for_test(log_level, log_format).build() } -pub fn with_default_filter(default: impl ToString) -> tracing::dispatcher::DefaultGuard { - let (d, _) = trace_subscriber(default); - tracing::dispatcher::set_default(&d) +pub fn with_default_filter( + default: impl ToString, +) -> (tracing::dispatcher::DefaultGuard, crate::Handle) { + let (d, handle) = trace_subscriber(default); + (tracing::dispatcher::set_default(&d), handle) } -pub fn trace_init() -> tracing::dispatcher::DefaultGuard { +pub fn trace_init() -> (tracing::dispatcher::DefaultGuard, crate::Handle) { with_default_filter(DEFAULT_LOG) }