diff --git a/Cargo.lock b/Cargo.lock
index 8329f5289e..e8434b61c7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -716,6 +716,22 @@ version = "0.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
 
+[[package]]
+name = "linkerd2-access-log"
+version = "0.1.0"
+dependencies = [
+ "base64 0.10.1",
+ "bytes 0.5.4",
+ "futures 0.3.5",
+ "hex",
+ "http 0.2.1",
+ "linkerd2-error",
+ "pin-project",
+ "tokio",
+ "tower",
+ "tracing",
+]
+
 [[package]]
 name = "linkerd2-addr"
 version = "0.1.0"
@@ -739,6 +755,7 @@ name = "linkerd2-app"
 version = "0.1.0"
 dependencies = [
  "bytes 0.5.4",
+ "chrono",
  "futures 0.3.5",
  "h2 0.2.5",
  "http 0.2.1",
@@ -755,6 +772,7 @@ dependencies = [
  "linkerd2-opencensus",
  "linkerd2-proxy-api",
  "net2",
+ "pin-project",
  "quickcheck",
  "regex 1.0.0",
  "ring",
@@ -783,6 +801,7 @@ dependencies = [
  "hyper",
  "indexmap",
  "libc",
+ "linkerd2-access-log",
  "linkerd2-addr",
  "linkerd2-admit",
  "linkerd2-buffer",
diff --git a/Cargo.toml b/Cargo.toml
index 8b9cd329de..02181a0a0a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,7 @@
 [workspace]
 members = [
     "hyper-balance",
+    "linkerd/access-log",
     "linkerd/addr",
     "linkerd/admit",
     "linkerd/app/core",
diff --git a/linkerd/access-log/Cargo.toml b/linkerd/access-log/Cargo.toml
new file mode 100644
index 0000000000..685a6fc1d2
--- /dev/null
+++ b/linkerd/access-log/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "linkerd2-access-log"
+version = "0.1.0"
+authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
+edition = "2018"
+publish = false
+
+[dependencies]
+base64 = "0.10.1"
+bytes = "0.5"
+futures = "0.3"
+hex = "0.3.2"
+http = "0.2"
+linkerd2-error = { path = "../error" }
+tower = { version = "0.3", default-features = false }
+tracing = "0.1.2"
+tokio = {version = "0.2", features = ["sync"]}
+pin-project = "0.4"
\ No newline at end of file
diff --git a/linkerd/access-log/src/layer.rs b/linkerd/access-log/src/layer.rs
new file mode 100644
index 0000000000..47f41fe9cc
--- /dev/null
+++ b/linkerd/access-log/src/layer.rs
@@ -0,0 +1,99 @@
+use crate::tracker::{ResponseTracker, TrackerState};
+use crate::AccessLog;
+use futures::{ready, TryFuture};
+use pin_project::pin_project;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio::sync::mpsc;
+
+/// A layer that adds access logging
+#[derive(Clone)]
+pub struct AccessLogLayer {
+    shared: Option<Arc<TrackerState>>,
+}
+
+#[derive(Clone)]
+pub struct AccessLogContext<Svc> {
+    inner: Svc,
+    shared: Option<Arc<TrackerState>>,
+}
+
+#[pin_project]
+pub struct ResponseFuture<F> {
+    tracker: Option<ResponseTracker>,
+
+    #[pin]
+    inner: F,
+}
+
+impl<Svc> tower::layer::Layer<Svc> for AccessLogLayer {
+    type Service = AccessLogContext<Svc>;
+
+    fn layer(&self, inner: Svc) -> Self::Service {
+        Self::Service {
+            inner,
+            shared: self.shared.clone(),
+        }
+    }
+}
+
+impl<Svc, B1, B2> tower::Service<http::Request<B1>> for AccessLogContext<Svc>
+where
+    Svc: tower::Service<http::Request<B1>, Response = http::Response<B2>>,
+{
+    type Response = Svc::Response;
+    type Error = Svc::Error;
+    type Future = ResponseFuture<Svc::Future>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.inner.poll_ready(cx)
+    }
+
+    fn call(&mut self, mut request: http::Request<B1>) -> Self::Future {
+        let shared = match &self.shared {
+            Some(shared) => shared,
+            None => {
+                return ResponseFuture {
+                    tracker: None,
+                    inner: self.inner.call(request),
+                }
+            }
+        };
+
+        let (request_tracker, response_tracker) = shared.tracker();
+        request_tracker.register(&mut request);
+
+        ResponseFuture {
+            inner: self.inner.call(request),
+            tracker: Some(response_tracker),
+        }
+    }
+}
+
+impl AccessLogLayer {
+    pub fn new(sink: Option<mpsc::Sender<AccessLog>>) -> Self {
+        Self {
+            shared: sink.map(|s| Arc::new(TrackerState::new(s))),
+        }
+    }
+}
+
+impl<F, B2> Future for ResponseFuture<F>
+where
+    F: TryFuture<Ok = http::Response<B2>>,
+{
+    type Output = Result<F::Ok, F::Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.project();
+        let mut inner: http::Response<B2> = ready!(this.inner.try_poll(cx))?;
+
+        if let Some(tracker) = this.tracker.take() {
+            tracker.register(&mut inner);
+        }
+
+        Poll::Ready(Ok(inner))
+    }
+}
diff --git a/linkerd/access-log/src/lib.rs b/linkerd/access-log/src/lib.rs
new file mode 100644
index 0000000000..9fe8188ca8
--- /dev/null
+++ b/linkerd/access-log/src/lib.rs
@@ -0,0 +1,49 @@
+#![deny(warnings, rust_2018_idioms)]
+
+use linkerd2_error::Error;
+use tokio::sync::mpsc;
+
+pub mod layer;
+mod tracker;
+
+use http::{HeaderValue, Method, StatusCode, Uri};
+pub use layer::{AccessLogContext, AccessLogLayer};
+
+#[derive(Debug)]
+pub struct AccessLog {
+    pub uri: Uri,
+    pub method: Method,
+    pub status: StatusCode,
+    pub host: Option<HeaderValue>,
+    pub user_agent: Option<HeaderValue>,
+    pub trace_id: Option<HeaderValue>,
+    pub request_processing_time_us: u64,
+    pub upstream_processing_time_us: u64,
+    pub response_processing_time_us: u64,
+}
+
+impl Default for AccessLog {
+    fn default() -> Self {
+        AccessLog {
+            uri: Default::default(),
+            method: Default::default(),
+            status: Default::default(),
+            host: None,
+            user_agent: None,
+            trace_id: None,
+            request_processing_time_us: 0,
+            upstream_processing_time_us: 0,
+            response_processing_time_us: 0,
+        }
+    }
+}
+
+pub trait AccessLogSink {
+    fn try_send(&mut self, log: AccessLog) -> Result<(), Error>;
+}
+
+impl AccessLogSink for mpsc::Sender<AccessLog> {
+    fn try_send(&mut self, span: AccessLog) -> Result<(), Error> {
+        self.try_send(span).map_err(Into::into)
+    }
+}
diff --git a/linkerd/access-log/src/tracker.rs b/linkerd/access-log/src/tracker.rs
new file mode 100644
index 0000000000..9087c9eb23
--- /dev/null
+++ b/linkerd/access-log/src/tracker.rs
@@ -0,0 +1,247 @@
+use crate::AccessLog;
+use std::mem;
+use std::ops::DerefMut;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{atomic, Arc, Mutex, RwLock};
+use std::time::Instant;
+use tokio::sync::mpsc;
+use tracing::warn;
+
+struct Entry {
+    request_rc: AtomicUsize,
+    response_rc: AtomicUsize,
+    next_idle: AtomicUsize,
+    log: Mutex<AccessLog>,
+}
+
+pub struct TrackerState {
+    sink: mpsc::Sender<AccessLog>,
+    entries: RwLock<Vec<Entry>>,
+    idle_head: AtomicUsize,
+}
+
+pub struct RequestTracker {
+    start_time: Instant,
+    idx: usize,
+    shared: Arc<TrackerState>,
+}
+
+pub struct ResponseTracker {
+    start_time: Instant,
+    idx: usize,
+    shared: Arc<TrackerState>,
+}
+
+impl TrackerState {
+    const INITIAL_COUNT: usize = 32;
+
+    pub fn new(sink: mpsc::Sender<AccessLog>) -> TrackerState {
+        let mut entries = Vec::with_capacity(Self::INITIAL_COUNT);
+        Self::add_entries(&mut entries, Self::INITIAL_COUNT);
+
+        TrackerState {
+            sink,
+            entries: RwLock::new(entries),
+            idle_head: AtomicUsize::new(0),
+        }
+    }
+
+    fn add_entries(entries: &mut Vec<Entry>, count: usize) {
+        let len = entries.len();
+        let new_len = len + count;
+        for i in len..new_len {
+            let next_idle = AtomicUsize::new(i + 1);
+            entries.push(Entry {
+                request_rc: AtomicUsize::new(0),
+                response_rc: AtomicUsize::new(0),
+                log: Mutex::new(Default::default()),
+                next_idle,
+            })
+        }
+    }
+
+    #[cold]
+    fn grow(&self) {
+        let mut entries = self.entries.write().unwrap();
+        let amount = entries.len() * 2;
+        entries.reserve(amount);
+        Self::add_entries(&mut entries, amount);
+    }
+
+    pub fn tracker(self: &Arc<Self>) -> (RequestTracker, ResponseTracker) {
+        let t0 = Instant::now();
+
+        loop {
+            let idx = self.idle_head.load(Ordering::Relaxed);
+
+            let acquired = {
+                let entries = self
+                    .entries
+                    .read()
+                    .ok()
+                    .filter(|entries| idx < entries.len())
+                    .unwrap_or_else(|| {
+                        // If there are no idle entries left, grow the table
+                        self.grow();
+                        self.entries.read().unwrap()
+                    });
+
+                let next = entries[idx].next_idle.load(Ordering::Acquire);
+
+                // Claim the entry
+                entries[idx]
+                    .request_rc
+                    .compare_and_swap(0, 1, Ordering::AcqRel)
+                    == 0
+                    && entries[idx]
+                        .response_rc
+                        .compare_and_swap(0, 1, Ordering::AcqRel)
+                        == 0
+                    && self.idle_head.compare_and_swap(idx, next, Ordering::AcqRel) == idx
+            };
+
+            if acquired {
+                return (
+                    RequestTracker {
+                        start_time: t0,
+                        idx,
+                        shared: self.clone(),
+                    },
+                    ResponseTracker {
+                        start_time: t0,
+                        idx,
+                        shared: self.clone(),
+                    },
+                );
+            }
+
+            // Failed to acquire the entry; try again
+            atomic::spin_loop_hint()
+        }
+    }
+
+    fn drop_request(&self, tracker: &RequestTracker) {
+        // Avoid double panicking in drop.
+        let panicking = std::thread::panicking();
+
+        let entries = match self.entries.read() {
+            Ok(lock) => lock,
+            Err(_) if panicking => return,
+            Err(e) => panic!("lock poisoned: {:?}", e),
+        };
+
+        let entry: &Entry = match entries.get(tracker.idx) {
+            Some(entry) => entry,
+            None if panicking => return,
+            None => panic!("counts[{:?}] did not exist", tracker.idx),
+        };
+
+        if entry.request_rc.fetch_sub(1, Ordering::Release) == 1 {
+            let drop_t = tracker.start_time.elapsed();
+
+            let mut log = match entry.log.lock() {
+                Ok(lock) => lock,
+                Err(_) if panicking => return,
+                Err(e) => panic!("lock poisoned: {:?}", e),
+            };
+            log.request_processing_time_us = drop_t.as_micros() as u64;
+        }
+    }
+
+    fn drop_response(&self, tracker: &ResponseTracker) {
+        // Avoid double panicking in drop.
+        let panicking = std::thread::panicking();
+
+        let entries = match self.entries.read() {
+            Ok(lock) => lock,
+            Err(_) if panicking => return,
+            Err(e) => panic!("lock poisoned: {:?}", e),
+        };
+
+        let entry: &Entry = match entries.get(tracker.idx) {
+            Some(entry) => entry,
+            None if panicking => return,
+            None => panic!("counts[{:?}] did not exist", tracker.idx),
+        };
+
+        if entry.response_rc.fetch_sub(1, Ordering::Release) == 1 {
+            let mut temporary = AccessLog::default();
+            {
+                let mut log = match entry.log.lock() {
+                    Ok(lock) => lock,
+                    Err(_) if panicking => return,
+                    Err(e) => panic!("lock poisoned: {:?}", e),
+                };
+
+                mem::swap(&mut temporary, log.deref_mut());
+            }
+
+            // If no upstream processing time was recorded, the downstream
+            // closed the connection before the upstream responded.
+            if temporary.upstream_processing_time_us != 0 {
+                temporary.response_processing_time_us = tracker.start_time.elapsed().as_micros()
+                    as u64
+                    - temporary.request_processing_time_us
+                    - temporary.upstream_processing_time_us;
+            }
+
+            if let Err(error) = self.sink.clone().try_send(temporary) {
+                warn!(message = "access log dropped", %error);
+            }
+
+            // Return the entry to the free list
+            let next_idx = self.idle_head.swap(tracker.idx, Ordering::AcqRel);
+            entry.next_idle.store(next_idx, Ordering::Release);
+        }
+    }
+}
+
+impl RequestTracker {
+    pub fn register<B>(self, request: &mut http::Request<B>) {
+        {
+            let entries = self.shared.entries.read().unwrap();
+            let mut log = entries[self.idx].log.lock().unwrap();
+
+            log.host = request.headers().get("Host").map(|x| x.clone());
+
+            log.trace_id = request
+                .headers()
+                .get("X-Amzn-Trace-Id")
+                .or_else(|| request.headers().get("X-Request-ID"))
+                .map(|x| x.clone());
+
+            log.user_agent = request.headers().get("User-Agent").map(|x| x.clone());
+
+            log.uri = request.uri().clone();
+            log.method = request.method().clone();
+        }
+
+        request.extensions_mut().insert(self);
+    }
+}
+
+impl Drop for RequestTracker {
+    fn drop(&mut self) {
+        self.shared.drop_request(&*self)
+    }
+}
+
+impl ResponseTracker {
+    pub fn register<B>(self, response: &mut http::Response<B>) {
+        {
+            let entries = self.shared.entries.read().unwrap();
+            let mut log = entries[self.idx].log.lock().unwrap();
+
+            log.upstream_processing_time_us =
+                self.start_time.elapsed().as_micros() as u64 - log.request_processing_time_us;
+            log.status = response.status().clone();
+        }
+
+        response.extensions_mut().insert(self);
+    }
+}
+
+impl Drop for ResponseTracker {
+    fn drop(&mut self) {
+        self.shared.drop_response(&*self)
+    }
+}
diff --git a/linkerd/app/Cargo.toml b/linkerd/app/Cargo.toml
index a447e37fd4..0693865044 100644
--- a/linkerd/app/Cargo.toml
+++ b/linkerd/app/Cargo.toml
@@ -14,6 +14,7 @@ This is used by tests and the executable.
 mock-orig-dst = ["linkerd2-app-core/mock-orig-dst"]
 
 [dependencies]
+chrono = "0.4"
 futures = { version = "0.3" }
 http-body = "0.3"
 indexmap = "1.0"
@@ -24,8 +25,9 @@ linkerd2-app-inbound = { path = "./inbound" }
 linkerd2-app-outbound = { path = "./outbound" }
 linkerd2-opencensus = { path = "../opencensus" }
 linkerd2-error = { path = "../error" }
+pin-project = "0.4"
 regex = "1.0.0"
-tokio = { version = "0.2", features = ["rt-util"] }
+tokio = { version = "0.2", features = ["rt-util", "fs"] }
 tonic = { version = "0.2", default-features = false, features = ["prost"] }
 tower = "0.3"
 tracing = "0.1.9"
diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml
index c03ce2a0cc..b161964fd2 100644
--- a/linkerd/app/core/Cargo.toml
+++ b/linkerd/app/core/Cargo.toml
@@ -22,6 +22,7 @@ http-body = "0.3"
 hyper = "0.13"
 futures = "0.3"
 indexmap = "1.0"
+linkerd2-access-log = { path = "../../access-log" }
 linkerd2-addr = { path = "../../addr" }
 linkerd2-admit = { path = "../../admit" }
 linkerd2-cache = { path = "../../cache" }
diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs
index 222d45efc2..3e7c0c8c46 100644
--- a/linkerd/app/core/src/lib.rs
+++ b/linkerd/app/core/src/lib.rs
@@ -9,6 +9,7 @@
 #![type_length_limit = "1586225"]
 #![deny(warnings, rust_2018_idioms)]
 
+pub use linkerd2_access_log::{AccessLog, AccessLogLayer};
 pub use linkerd2_addr::{self as addr, Addr, NameAddr};
 pub use linkerd2_admit as admit;
 pub use linkerd2_cache as cache;
diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs
index 41b850f612..666545301e 100644
--- a/linkerd/app/inbound/src/lib.rs
+++ b/linkerd/app/inbound/src/lib.rs
@@ -27,8 +27,8 @@ use linkerd2_app_core::{
     spans::SpanConverter,
     svc::{self, NewService},
     transport::{self, io::BoxedIo, tls},
-    Error, ProxyMetrics, TraceContextLayer, DST_OVERRIDE_HEADER, L5D_CLIENT_ID, L5D_REMOTE_IP,
-    L5D_SERVER_ID,
+    AccessLog, AccessLogLayer, Error, ProxyMetrics, TraceContextLayer, DST_OVERRIDE_HEADER,
+    L5D_CLIENT_ID, L5D_REMOTE_IP, L5D_SERVER_ID,
 };
 use std::collections::HashMap;
 use tokio::sync::mpsc;
@@ -61,6 +61,7 @@ impl Config {
         tap_layer: tap::Layer,
         metrics: ProxyMetrics,
         span_sink: Option<mpsc::Sender<oc::Span>>,
+        access_log_sink: Option<mpsc::Sender<AccessLog>>,
         drain: drain::Watch,
     ) -> Result<(), Error>
     where
@@ -98,6 +99,7 @@ impl Config {
             local_identity,
             metrics,
             span_sink,
+            access_log_sink,
             drain,
         )
         .await
@@ -304,6 +306,7 @@ impl Config {
         local_identity: tls::Conditional<identity::Local>,
         metrics: ProxyMetrics,
         span_sink: Option<mpsc::Sender<oc::Span>>,
+        access_log_sink: Option<mpsc::Sender<AccessLog>>,
         drain: drain::Watch,
     ) -> Result<(), Error>
     where
@@ -363,6 +366,7 @@ impl Config {
             .push(errors::layer());
 
         let http_server_observability = svc::layers()
+            .push(AccessLogLayer::new(access_log_sink))
            .push(TraceContextLayer::new(span_sink.map(|span_sink| {
                 SpanConverter::server(span_sink, trace_labels())
             })))
diff --git a/linkerd/app/src/access_log_collector.rs b/linkerd/app/src/access_log_collector.rs
new file mode 100644
index 0000000000..7852f3ba32
--- /dev/null
+++ b/linkerd/app/src/access_log_collector.rs
@@ -0,0 +1,181 @@
+use chrono::Utc;
+use futures::ready;
+use futures::task::{Context, Poll};
+use linkerd2_app_core::{drain, AccessLog};
+use linkerd2_error::Error;
+use pin_project::pin_project;
+use std::fmt::Debug;
+use std::future::Future;
+use std::pin::Pin;
+use std::time::SystemTime;
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncWrite, BufWriter};
+use tokio::sync::mpsc;
+use tracing::warn;
+
+pub type Task = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
+
+pub type AccessLogSink = mpsc::Sender<AccessLog>;
+
+#[derive(Clone, Debug)]
+pub enum Config {
+    Disabled,
+    Enabled { path: String },
+}
+
+pub enum Collector {
+    Disabled,
+    Enabled { sink: AccessLogSink, task: Task },
+}
+
+impl Config {
+    const BUFFER_CAPACITY: usize = 100;
+
+    pub fn build(self, drain: drain::Watch) -> Result<Collector, Error> {
+        match self {
+            Config::Disabled => Ok(Collector::Disabled),
+            Config::Enabled { path } => {
+                let (sink, sink_rx) = mpsc::channel(Self::BUFFER_CAPACITY);
+
+                let task = Box::pin(async move {
+                    let f = OpenOptions::new()
+                        .write(true)
+                        .open(path)
+                        .await
+                        .expect("Failed to open access log file");
+                    let writer = BufWriter::new(f);
+
+                    let sink = CollectorSink {
+                        writer,
+                        last_flush: SystemTime::now(),
+                        rx: sink_rx,
+                        state: State::Read,
+                    };
+
+                    if let Err(err) = drain.watch(sink, |sink| sink.shutdown()).await {
+                        warn!("failed to shutdown access log writer: {}", err);
+                    }
+                });
+
+                Ok(Collector::Enabled { sink, task })
+            }
+        }
+    }
+}
+
+impl Collector {
+    pub fn sink(&self) -> Option<AccessLogSink> {
+        match self {
+            Collector::Disabled => None,
+            Collector::Enabled { ref sink, .. } => Some(sink.clone()),
+        }
+    }
+}
+
+enum State {
+    Read,
+    Write(String, usize),
+    Flush,
+    Shutdown,
+}
+
+#[pin_project]
+struct CollectorSink {
+    #[pin]
+    writer: BufWriter<File>,
+    last_flush: SystemTime,
+    rx: mpsc::Receiver<AccessLog>,
+    state: State,
+}
+
+/// Consumes `AccessLog`s from the receiver and writes them to the configured `BufWriter`.
+impl CollectorSink {
+    /// A write more than FLUSH_TIMEOUT_SECS after the last flush triggers a flush.
+    const FLUSH_TIMEOUT_SECS: u64 = 1;
+
+    /// Transitions the future to shutting down; it will then complete with a result.
+    fn shutdown(&mut self) {
+        self.state = State::Shutdown
+    }
+}
+
+impl Future for CollectorSink {
+    type Output = Result<(), Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut this = self.project();
+        loop {
+            match this.state {
+                State::Read => {
+                    if let Some(log) = ready!(this.rx.poll_recv(cx)) {
+                        *this.state = State::Write(format_log(log), 0)
+                    }
+                }
+                State::Write(data, written) => {
+                    let bytes = data.as_bytes();
+                    match ready!(this.writer.as_mut().poll_write(cx, &bytes[*written..])) {
+                        Ok(x) => {
+                            let flush_delta =
+                                this.last_flush.elapsed().map(|x| x.as_secs()).unwrap_or(0);
+
+                            let total = *written + x;
+                            if total < bytes.len() {
+                                *written = total
+                            } else if flush_delta > CollectorSink::FLUSH_TIMEOUT_SECS {
+                                *this.state = State::Flush
+                            } else {
+                                *this.state = State::Read
+                            }
+                        }
+                        Err(err) => {
+                            warn!("failed to write to access log: {}", err);
+                            *this.state = State::Read
+                        }
+                    }
+                }
+                State::Flush => {
+                    if let Err(err) = ready!(this.writer.as_mut().poll_flush(cx)) {
+                        warn!("failed to flush access log: {}", err);
+                    }
+                    *this.last_flush = SystemTime::now();
+                    *this.state = State::Read
+                }
+                State::Shutdown => return this.writer.poll_shutdown(cx).map_err(Into::into),
+            }
+        }
+    }
+}
+
+fn format_log(log: AccessLog) -> String {
+    let timestamp = Utc::now().to_rfc3339();
+
+    let user_agent = log
+        .user_agent
+        .as_ref()
+        .and_then(|x| x.to_str().ok())
+        .unwrap_or("");
+    let host = log
+        .host
+        .as_ref()
+        .and_then(|x| x.to_str().ok())
+        .unwrap_or("");
+    let trace_id = log
+        .trace_id
+        .as_ref()
+        .and_then(|x| x.to_str().ok())
+        .unwrap_or("");
+
+    format!(
+        "{} \"{}\" \"{}\" \"{}\" \"{}\" {} {} {} {} \"{}\"\n",
+        timestamp,
+        user_agent,
+        host,
+        log.method,
+        log.uri,
+        log.status,
+        log.request_processing_time_us,
+        log.upstream_processing_time_us,
+        log.response_processing_time_us,
+        trace_id
+    )
+}
diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs
index 5c7f8d208f..53efe877b7 100644
--- a/linkerd/app/src/env.rs
+++ b/linkerd/app/src/env.rs
@@ -5,7 +5,7 @@ use crate::core::{
     transport::{listen, tls},
     Addr,
 };
-use crate::{dns, gateway, identity, inbound, oc_collector, outbound};
+use crate::{access_log_collector, dns, gateway, identity, inbound, oc_collector, outbound};
 use indexmap::IndexSet;
 use std::collections::HashMap;
 use std::convert::TryFrom;
@@ -152,6 +152,8 @@ pub const ENV_HOSTNAME: &str = "HOSTNAME";
 
 pub const ENV_TRACE_COLLECTOR_SVC_BASE: &str = "LINKERD2_PROXY_TRACE_COLLECTOR_SVC";
 
+pub const ENV_ACCESS_LOG_FILE: &str = "LINKERD2_PROXY_ACCESS_LOG_FILE";
+
 pub const ENV_DESTINATION_CONTEXT: &str = "LINKERD2_PROXY_DESTINATION_CONTEXT";
 pub const ENV_DESTINATION_PROFILE_INITIAL_TIMEOUT: &str =
     "LINKERD2_PROXY_DESTINATION_PROFILE_INITIAL_TIMEOUT";
@@ -316,6 +318,8 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
         parse_control_addr(strings, ENV_TRACE_COLLECTOR_SVC_BASE)
     };
 
+    let access_log_file = strings.get(ENV_ACCESS_LOG_FILE);
+
     let dst_token = strings.get(ENV_DESTINATION_CONTEXT);
 
     let gateway_suffixes = parse(strings, ENV_INBOUND_GATEWAY_SUFFIXES, parse_dns_suffixes);
@@ -544,6 +548,11 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
             }
         };
 
+        let access_log_collector = match access_log_file? {
+            None => access_log_collector::Config::Disabled,
+            Some(path) => access_log_collector::Config::Enabled { path },
+        };
+
         let tap = tap?
             .map(|(addr, ids)| super::tap::Config::Enabled {
                 permitted_peer_identities: ids,
@@ -579,6 +588,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
             dst,
             tap,
             oc_collector,
+            access_log_collector,
             identity,
             outbound,
             gateway,
diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs
index c5b977607a..e2f9fcbf62 100644
--- a/linkerd/app/src/lib.rs
+++ b/linkerd/app/src/lib.rs
@@ -2,6 +2,7 @@
 #![recursion_limit = "256"]
 //#![deny(warnings, rust_2018_idioms)]
 
+pub mod access_log_collector;
 pub mod admin;
 pub mod dst;
 pub mod env;
@@ -52,6 +53,7 @@ pub struct Config {
     pub admin: admin::Config,
     pub tap: tap::Config,
     pub oc_collector: oc_collector::Config,
+    pub access_log_collector: access_log_collector::Config,
 }
 
 pub struct App {
@@ -62,6 +64,7 @@ pub struct App {
     identity: identity::Identity,
     inbound_addr: SocketAddr,
     oc_collector: oc_collector::OcCollector,
+    access_log_collector: access_log_collector::Collector,
     outbound_addr: SocketAddr,
     start_proxy: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
     tap: tap::Tap,
@@ -84,6 +87,7 @@ impl Config {
             identity,
             inbound,
             oc_collector,
+            access_log_collector,
             outbound,
             gateway,
             tap,
@@ -135,6 +139,9 @@ impl Config {
             info_span!("opencensus").in_scope(|| oc_collector.build(identity, dns, metrics))
         }?;
 
+        let access_log_collector =
+            { info_span!("access_log").in_scope(|| access_log_collector.build(drain_rx.clone())) }?;
+
         let admin = {
             let identity = identity.local();
             let drain = drain_rx.clone();
@@ -153,6 +160,7 @@ impl Config {
         let local_identity = identity.local();
         let tap_layer = tap.layer();
         let oc_span_sink = oc_collector.span_sink();
+        let access_log_sink = access_log_collector.sink();
 
         let start_proxy = Box::pin(async move {
             let outbound_connect =
@@ -212,6 +220,7 @@ impl Config {
                     tap_layer,
                     inbound_metrics,
                     oc_span_sink,
+                    access_log_sink,
                     drain_rx,
                 )
                 .map_err(|e| panic!("inbound failed: {}", e))
@@ -227,6 +236,7 @@ impl Config {
             identity,
             inbound_addr,
             oc_collector,
+            access_log_collector,
             outbound_addr,
             start_proxy,
             tap,
@@ -286,6 +296,7 @@ impl App {
             dns,
             identity,
             oc_collector,
+            access_log_collector,
             start_proxy,
             tap,
             ..
@@ -363,6 +374,12 @@ impl App {
                         tokio::spawn(task.instrument(info_span!("opencensus")));
                     }
 
+                    if let access_log_collector::Collector::Enabled { task, .. } =
+                        access_log_collector
+                    {
+                        tokio::spawn(task.instrument(info_span!("access_log")));
+                    }
+
                     // we don't care if the admin shutdown channel is
                     // dropped or actually triggered.
                     let _ = admin_shutdown_rx.await;
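
For reference, here is a minimal end-to-end sketch of the new `linkerd2-access-log` API outside the proxy, assuming the tokio 0.2 / tower 0.3 APIs pinned in the Cargo.toml above; `demo_service`, the channel capacity, and the request values are illustrative stand-ins, not part of this change:

    use linkerd2_access_log::{AccessLog, AccessLogLayer};
    use tower::{service_fn, Service, ServiceBuilder};

    #[tokio::main]
    async fn main() {
        // Completed request/response pairs are delivered on this channel.
        let (tx, mut rx) = tokio::sync::mpsc::channel::<AccessLog>(100);

        // Stand-in for a real HTTP service.
        let demo_service = service_fn(|_req: http::Request<()>| async {
            Ok::<_, std::convert::Infallible>(http::Response::new(()))
        });

        let mut svc = ServiceBuilder::new()
            .layer(AccessLogLayer::new(Some(tx)))
            .service(demo_service);

        let req = http::Request::builder()
            .uri("http://web.example.com/api/items")
            .header("Host", "web.example.com")
            .header("User-Agent", "demo/0.1")
            .body(())
            .unwrap();

        futures::future::poll_fn(|cx| svc.poll_ready(cx)).await.unwrap();
        let rsp = svc.call(req).await.unwrap();

        // Dropping the response releases the `ResponseTracker` held in its
        // extensions, which finalizes the timings and emits one record.
        drop(rsp);
        println!("{:?}", rx.recv().await.expect("one access log record"));
    }

Passing `None` to `AccessLogLayer::new` makes the layer a transparent no-op, which is how the inbound stack behaves when no access-log sink is configured.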
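Operationally, the collector is enabled by setting `LINKERD2_PROXY_ACCESS_LOG_FILE` to a writable file path; when the variable is unset, `Config::Disabled` is parsed and no writer task is spawned. Given `format_log`, each record renders as a single line of the following shape (values illustrative):

    2020-06-08T17:23:07.332713+00:00 "demo/0.1" "web.example.com" "GET" "http://web.example.com/api/items" 200 OK 12 3401 8 ""

That is: timestamp, quoted user-agent, host, method, and URI; the status (note that `StatusCode`'s `Display` renders as `200 OK`); the request-, upstream-, and response-processing times in microseconds; and the quoted trace id, which is empty when neither an `X-Amzn-Trace-Id` nor an `X-Request-ID` header was present on the request.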