Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(file source): Better multi-line support #1852

Merged
merged 18 commits into from
Feb 22, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

/src/sources/docker.rs @LucioFranco
/src/sources/file.rs @LucioFranco
/src/sources/file/line_agg.rs @MOZGIII
/src/sources/journald.rs @bruceg
/src/sources/kafka.rs @a-rodin
/src/sources/stdin.rs @bruceg
Expand Down
243 changes: 145 additions & 98 deletions src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ use crate::{
topology::config::{DataType, GlobalOptions, SourceConfig, SourceDescription},
trace::{current_span, Instrument},
};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use file_source::{FileServer, Fingerprinter};
use futures::{future, sync::mpsc, Async, Future, Poll, Sink, Stream};
use futures::{future, sync::mpsc, Future, Sink, Stream};
use regex::bytes::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::collections::{HashMap, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::path::PathBuf;
use std::thread;
use std::time::{Duration, SystemTime};
use tokio::timer::DelayQueue;

mod line_agg;
use line_agg::LineAgg;
MOZGIII marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Debug, Snafu)]
enum BuildError {
Expand Down Expand Up @@ -42,6 +44,14 @@ enum BuildError {
indicator: String,
source: regex::Error,
},
InvalidMultilineStartPattern {
start_pattern: String,
source: regex::Error,
},
InvalidMultilineConditionPattern {
condition_pattern: String,
source: regex::Error,
},
MOZGIII marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Deserialize, Serialize, Debug, PartialEq)]
Expand All @@ -60,10 +70,47 @@ pub struct FileConfig {
pub fingerprinting: FingerprintingConfig,
pub message_start_indicator: Option<String>,
pub multi_line_timeout: u64, // millis
pub multiline: Option<MultilineConfig>,
pub max_read_bytes: usize,
pub oldest_first: bool,
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct MultilineConfig {
pub start_pattern: String,
pub condition_pattern: String,
pub mode: line_agg::Mode,
pub timeout_ms: u64,
}

impl TryFrom<&MultilineConfig> for line_agg::Config {
type Error = crate::Error;

fn try_from(config: &MultilineConfig) -> crate::Result<Self> {
let MultilineConfig {
start_pattern,
condition_pattern,
mode,
timeout_ms,
} = config;

let start_pattern = Regex::new(start_pattern)
.with_context(|| InvalidMultilineStartPattern { start_pattern })?;
let condition_pattern = Regex::new(condition_pattern)
.with_context(|| InvalidMultilineConditionPattern { condition_pattern })?;
let mode = mode.clone();
let timeout = Duration::from_millis(*timeout_ms);

Ok(Self {
start_pattern,
condition_pattern,
mode,
timeout,
})
}
}

MOZGIII marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
#[serde(tag = "strategy", rename_all = "snake_case")]
pub enum FingerprintingConfig {
Expand Down Expand Up @@ -112,6 +159,7 @@ impl Default for FileConfig {
glob_minimum_cooldown: 1000, // millis
message_start_indicator: None,
multi_line_timeout: 1000, // millis
multiline: None,
max_read_bytes: 2048,
oldest_first: false,
}
Expand All @@ -136,6 +184,10 @@ impl SourceConfig for FileConfig {
// other
let data_dir = globals.resolve_and_make_data_subdir(self.data_dir.as_ref(), name)?;

if let Some(ref config) = self.multiline {
TryInto::<line_agg::Config>::try_into(config)?;
MOZGIII marked this conversation as resolved.
Show resolved Hide resolved
}

if let Some(ref indicator) = self.message_start_indicator {
Regex::new(indicator).with_context(|| InvalidMessageStartIndicator { indicator })?;
}
Expand Down Expand Up @@ -186,6 +238,7 @@ pub fn file_source(

let include = config.include.clone();
let exclude = config.exclude.clone();
let multiline_config = config.multiline.clone();
let message_start_indicator = config.message_start_indicator.clone();
let multi_line_timeout = config.multi_line_timeout;
Box::new(future::lazy(move || {
Expand All @@ -195,11 +248,18 @@ pub fn file_source(
let (tx, rx) = futures::sync::mpsc::channel(100);

let messages: Box<dyn Stream<Item = (Bytes, String), Error = ()> + Send> =
if let Some(msi) = message_start_indicator {
if let Some(ref multiline_config) = multiline_config {
Box::new(LineAgg::new(
rx,
Regex::new(&msi).unwrap(), // validated in build
multi_line_timeout,
line_agg::Config::try_from(multiline_config).unwrap(), // validated in build
MOZGIII marked this conversation as resolved.
Show resolved Hide resolved
))
} else if let Some(msi) = message_start_indicator {
Box::new(LineAgg::new(
rx,
line_agg::Config::for_legacy(
Regex::new(&msi).unwrap(), // validated in build
multi_line_timeout,
),
))
} else {
Box::new(rx)
Expand Down Expand Up @@ -235,96 +295,6 @@ pub fn file_source(
}))
}

struct LineAgg<T> {
inner: T,
marker: Regex,
timeout: u64,
buffers: HashMap<String, BytesMut>,
draining: Option<Vec<(Bytes, String)>>,
timeouts: DelayQueue<String>,
expired: VecDeque<String>,
}

impl<T> LineAgg<T> {
fn new(inner: T, marker: Regex, timeout: u64) -> Self {
Self {
inner,
marker,
timeout,
draining: None,
buffers: HashMap::new(),
timeouts: DelayQueue::new(),
expired: VecDeque::new(),
}
}
}

impl<T: Stream<Item = (Bytes, String), Error = ()>> Stream for LineAgg<T> {
type Item = (Bytes, String);
type Error = ();

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
if let Some(to_drain) = &mut self.draining {
if let Some((data, key)) = to_drain.pop() {
return Ok(Async::Ready(Some((data, key))));
} else {
return Ok(Async::Ready(None));
}
}

// check for keys that have hit their timeout
while let Ok(Async::Ready(Some(expired_key))) = self.timeouts.poll() {
self.expired.push_back(expired_key.into_inner());
}

match self.inner.poll() {
Ok(Async::Ready(Some((line, src)))) => {
// look for buffered content from same source
if self.buffers.contains_key(&src) {
if self.marker.is_match(line.as_ref()) {
// buffer the incoming line and flush the existing data
let buffered = self
.buffers
.insert(src.clone(), line.into())
.expect("already asserted key is present");
return Ok(Async::Ready(Some((buffered.freeze(), src))));
} else {
// append new line to the buffered data
let buffered = self
.buffers
.get_mut(&src)
.expect("already asserted key is present");
buffered.extend_from_slice(b"\n");
buffered.extend_from_slice(&line);
}
} else {
// no existing data for this source so buffer it with timeout
self.timeouts
.insert(src.clone(), Duration::from_millis(self.timeout));
self.buffers.insert(src, line.into());
}
}
Ok(Async::Ready(None)) => {
// start flushing all existing data, stop polling inner
self.draining =
Some(self.buffers.drain().map(|(k, v)| (v.into(), k)).collect());
}
Ok(Async::NotReady) => {
if let Some(key) = self.expired.pop_front() {
if let Some(buffered) = self.buffers.remove(&key) {
return Ok(Async::Ready(Some((buffered.freeze(), key))));
}
}

return Ok(Async::NotReady);
}
Err(()) => return Err(()),
};
}
}
}

fn create_event(
line: Bytes,
file: String,
Expand Down Expand Up @@ -1143,7 +1113,7 @@ mod tests {
}

#[test]
fn test_multi_line_aggregation() {
fn test_multi_line_aggregation_legacy() {
let (tx, rx) = futures::sync::mpsc::channel(10);
let (trigger, tripwire) = Tripwire::new();

Expand Down Expand Up @@ -1215,6 +1185,83 @@ mod tests {
);
}

#[test]
fn test_multi_line_aggregation() {
let (tx, rx) = futures::sync::mpsc::channel(10);
let (trigger, tripwire) = Tripwire::new();

let dir = tempdir().unwrap();
let config = file::FileConfig {
include: vec![dir.path().join("*")],
multiline: Some(MultilineConfig {
start_pattern: "INFO".to_owned(),
condition_pattern: "INFO".to_owned(),
mode: line_agg::Mode::HaltBefore,
timeout_ms: 25, // less than 50 in sleep()
}),
..test_default_file_config(&dir)
};

let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = runtime::Runtime::new().unwrap();

rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));

let path = dir.path().join("file");
let mut file = File::create(&path).unwrap();

sleep(); // The files must be observed at their original lengths before writing to them

writeln!(&mut file, "leftover foo").unwrap();
writeln!(&mut file, "INFO hello").unwrap();
writeln!(&mut file, "INFO goodbye").unwrap();
writeln!(&mut file, "part of goodbye").unwrap();

sleep();

writeln!(&mut file, "INFO hi again").unwrap();
writeln!(&mut file, "and some more").unwrap();
writeln!(&mut file, "INFO hello").unwrap();

sleep();

writeln!(&mut file, "too slow").unwrap();
writeln!(&mut file, "INFO doesn't have").unwrap();
writeln!(&mut file, "to be INFO in").unwrap();
writeln!(&mut file, "the middle").unwrap();

sleep();

drop(trigger);
shutdown_on_idle(rt);

let received = wait_with_timeout(
rx.map(|event| {
event
.as_log()
.get(&event::log_schema().message_key())
.unwrap()
.clone()
})
.collect(),
);

assert_eq!(
received,
vec![
"leftover foo".into(),
"INFO hello".into(),
"INFO goodbye\npart of goodbye".into(),
"INFO hi again\nand some more".into(),
"INFO hello".into(),
"too slow".into(),
"INFO doesn't have".into(),
"to be INFO in\nthe middle".into(),
]
);
}

#[test]
fn test_fair_reads() {
let (tx, rx) = futures::sync::mpsc::channel(10);
Expand Down
Loading