Skip to content

Commit

Permalink
feat: disable vrl 'string_path' feature (vectordotdev#18188)
Browse files Browse the repository at this point in the history
* feat: disable vrl 'string_path' feature

* more path refactoring

* more changes

* lua changes

* fix bad toml rebase
  • Loading branch information
pront authored Aug 17, 2023
1 parent f2a6887 commit 5ce5ff1
Show file tree
Hide file tree
Showing 48 changed files with 307 additions and 291 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ members = [
]

[workspace.dependencies]
vrl = { version = "0.6.0", features = ["cli", "test", "test_framework", "arbitrary"] }
vrl = { version = "0.6.0", default-features = false, features = ["cli", "test", "test_framework", "arbitrary", "compiler", "value", "diagnostic", "path", "parser", "stdlib", "datadog", "core"] }

pin-project = { version = "1.1.3", default-features = false }

[dependencies]
Expand Down Expand Up @@ -370,6 +371,8 @@ tokio = { version = "1.32.0", features = ["test-util"] }
tokio-test = "0.4.2"
tower-test = "0.4.0"
vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl", "test"] }
vrl = { version = "0.6.0", features = ["cli", "test", "test_framework", "arbitrary"] }

wiremock = "0.5.19"
zstd = { version = "0.12.4", default-features = false }

Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ mod tests {
let events = deserialize_gelf_input(&input).unwrap();
assert_eq!(events.len(), 1);
let log = events[0].as_log();
assert!(!log.contains("_id"));
assert!(!log.contains(event_path!("_id")));
}
}

Expand Down
5 changes: 4 additions & 1 deletion lib/codecs/src/encoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| {
// rename additional fields that were flagged as missing the underscore prefix
for field in missing_prefix {
log.rename_key(event_path!(field.as_str()), format!("_{}", &field).as_str());
log.rename_key(
event_path!(field.as_str()),
event_path!(format!("_{}", &field).as_str()),
);
}
log
})
Expand Down
7 changes: 7 additions & 0 deletions lib/vector-lookup/src/lookup_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ impl<'a> ValuePath<'a> for &'a ConfigValuePath {
}
}

#[cfg(any(test, feature = "test"))]
impl From<&str> for ConfigValuePath {
fn from(path: &str) -> Self {
ConfigValuePath::try_from(path.to_string()).unwrap()
}
}

/// A wrapper around `OwnedTargetPath` that allows it to be used in Vector config
/// with prefix default to `PathPrefix::Event`
#[configurable_component]
Expand Down
7 changes: 4 additions & 3 deletions src/api/schema/events/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::borrow::Cow;
use async_graphql::Object;
use chrono::{DateTime, Utc};
use vector_common::encode_logfmt;
use vrl::event_path;

use super::EventEncodingType;
use crate::{event, topology::TapOutput};
Expand All @@ -19,11 +20,11 @@ impl Log {
}

pub fn get_message(&self) -> Option<Cow<'_, str>> {
Some(self.event.get("message")?.to_string_lossy())
Some(self.event.get(event_path!("message"))?.to_string_lossy())
}

pub fn get_timestamp(&self) -> Option<&DateTime<Utc>> {
self.event.get("timestamp")?.as_timestamp()
self.event.get(event_path!("timestamp"))?.as_timestamp()
}
}

Expand Down Expand Up @@ -69,7 +70,7 @@ impl Log {

/// Get JSON field data on the log event, by field name
async fn json(&self, field: String) -> Option<String> {
self.event.get(field.as_str()).map(|field| {
self.event.get(event_path!(field.as_str())).map(|field| {
serde_json::to_string(field)
.expect("JSON serialization of trace event field failed. Please report.")
})
Expand Down
15 changes: 3 additions & 12 deletions src/codecs/encoding/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,7 @@ mod test {
transformer.only_fields(),
&Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
);
assert_eq!(
transformer.except_fields(),
&Some(vec!["ignore_me".to_owned()])
);
assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
}

Expand Down Expand Up @@ -207,10 +204,7 @@ mod test {
transformer.only_fields(),
&Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
);
assert_eq!(
transformer.except_fields(),
&Some(vec!["ignore_me".to_owned()])
);
assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
}

Expand Down Expand Up @@ -239,10 +233,7 @@ mod test {
transformer.only_fields(),
&Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
);
assert_eq!(
transformer.except_fields(),
&Some(vec!["ignore_me".to_owned()])
);
assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
}
}
78 changes: 35 additions & 43 deletions src/codecs/encoding/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@ use core::fmt::Debug;
use std::collections::BTreeMap;

use lookup::lookup_v2::ConfigValuePath;
use lookup::{
event_path,
lookup_v2::{parse_value_path, OwnedValuePath},
PathPrefix,
};
use lookup::{event_path, PathPrefix};
use serde::{Deserialize, Deserializer};
use vector_config::configurable_component;
use vector_core::event::{LogEvent, MaybeAsLogMut};
use vector_core::schema::meaning;
use vrl::path::OwnedValuePath;
use vrl::value::Value;

use crate::{event::Event, serde::skip_serializing_if_default};
Expand All @@ -27,7 +24,7 @@ pub struct Transformer {

/// List of fields that are excluded from the encoded event.
#[serde(default, skip_serializing_if = "skip_serializing_if_default")]
except_fields: Option<Vec<String>>,
except_fields: Option<Vec<ConfigValuePath>>,

/// Format used for timestamp fields.
#[serde(default, skip_serializing_if = "skip_serializing_if_default")]
Expand All @@ -45,15 +42,19 @@ impl<'de> Deserialize<'de> for Transformer {
#[serde(default)]
only_fields: Option<Vec<OwnedValuePath>>,
#[serde(default)]
except_fields: Option<Vec<String>>,
except_fields: Option<Vec<OwnedValuePath>>,
#[serde(default)]
timestamp_format: Option<TimestampFormat>,
}

let inner: TransformerInner = Deserialize::deserialize(deserializer)?;
Self::new(
inner.only_fields,
inner.except_fields,
inner
.only_fields
.map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
inner
.except_fields
.map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
inner.timestamp_format,
)
.map_err(serde::de::Error::custom)
Expand All @@ -66,13 +67,12 @@ impl Transformer {
/// Returns `Err` if `only_fields` and `except_fields` fail validation, i.e. are not mutually
/// exclusive.
pub fn new(
only_fields: Option<Vec<OwnedValuePath>>,
except_fields: Option<Vec<String>>,
only_fields: Option<Vec<ConfigValuePath>>,
except_fields: Option<Vec<ConfigValuePath>>,
timestamp_format: Option<TimestampFormat>,
) -> Result<Self, crate::Error> {
Self::validate_fields(only_fields.as_ref(), except_fields.as_ref())?;

let only_fields = only_fields.map(|x| x.into_iter().map(ConfigValuePath).collect());
Ok(Self {
only_fields,
except_fields,
Expand All @@ -87,7 +87,7 @@ impl Transformer {
}

/// Get the `Transformer`'s `except_fields`.
pub const fn except_fields(&self) -> &Option<Vec<String>> {
pub const fn except_fields(&self) -> &Option<Vec<ConfigValuePath>> {
&self.except_fields
}

Expand All @@ -100,14 +100,14 @@ impl Transformer {
///
/// If an error is returned, the entire encoding configuration should be considered inoperable.
fn validate_fields(
only_fields: Option<&Vec<OwnedValuePath>>,
except_fields: Option<&Vec<String>>,
only_fields: Option<&Vec<ConfigValuePath>>,
except_fields: Option<&Vec<ConfigValuePath>>,
) -> crate::Result<()> {
if let (Some(only_fields), Some(except_fields)) = (only_fields, except_fields) {
if except_fields.iter().any(|f| {
let path_iter = parse_value_path(f).unwrap();
only_fields.iter().any(|v| v == &path_iter)
}) {
if except_fields
.iter()
.any(|f| only_fields.iter().any(|v| v == f))
{
return Err(
"`except_fields` and `only_fields` should be mutually exclusive.".into(),
);
Expand Down Expand Up @@ -155,22 +155,19 @@ impl Transformer {
}

fn apply_except_fields(&self, log: &mut LogEvent) {
use lookup::path::TargetPath;

if let Some(except_fields) = self.except_fields.as_ref() {
let service_path = log
.metadata()
.schema_definition()
.meaning_path(meaning::SERVICE)
.map(|path| path.value_path().to_string());

for field in except_fields {
let value = log.remove(field.as_str());
let value_path = &field.0;
let value = log.remove((PathPrefix::Event, value_path));

let service_path = log
.metadata()
.schema_definition()
.meaning_path(meaning::SERVICE);
// If we are removing the service field we need to store this in a `dropped_fields` list as we may need to
// refer to this later when emitting metrics.
if let Some(v) = value {
if matches!(service_path.as_ref(), Some(path) if path == field) {
if let (Some(v), Some(service_path)) = (value, service_path) {
if service_path.path == *value_path {
log.metadata_mut()
.add_dropped_field(meaning::SERVICE.to_string(), v);
}
Expand Down Expand Up @@ -216,17 +213,12 @@ impl Transformer {
/// Returns `Err` if the new `except_fields` fail validation, i.e. are not mutually exclusive
/// with `only_fields`.
#[cfg(test)]
pub fn set_except_fields(&mut self, except_fields: Option<Vec<String>>) -> crate::Result<()> {
Self::validate_fields(
self.only_fields
.clone()
.map(|x| x.into_iter().map(|x| x.0).collect())
.as_ref(),
except_fields.as_ref(),
)?;

pub fn set_except_fields(
&mut self,
except_fields: Option<Vec<ConfigValuePath>>,
) -> crate::Result<()> {
Self::validate_fields(self.only_fields.as_ref(), except_fields.as_ref())?;
self.except_fields = except_fields;

Ok(())
}
}
Expand Down Expand Up @@ -282,7 +274,7 @@ mod tests {
#[test]
fn deserialize_and_transform_except() {
let transformer: Transformer =
toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d\\.z", "e"]"#).unwrap();
toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d.z", "e"]"#).unwrap();
let mut log = LogEvent::default();
{
log.insert("a", 1);
Expand All @@ -293,7 +285,7 @@ mod tests {
log.insert("b[1].x", 1);
log.insert("c[0].x", 1);
log.insert("c[0].y", 1);
log.insert("d\\.z", 1);
log.insert("d.z", 1);
log.insert("e.a", 1);
log.insert("e.b", 1);
}
Expand All @@ -303,7 +295,7 @@ mod tests {
assert!(!event.as_mut_log().contains("b"));
assert!(!event.as_mut_log().contains("b[1].x"));
assert!(!event.as_mut_log().contains("c[0].y"));
assert!(!event.as_mut_log().contains("d\\.z"));
assert!(!event.as_mut_log().contains("d.z"));
assert!(!event.as_mut_log().contains("e.a"));

assert!(event.as_mut_log().contains("a.b.d"));
Expand Down
5 changes: 3 additions & 2 deletions src/internal_events/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use metrics::{counter, gauge};
use vector_core::{internal_event::InternalEvent, update_counter};
use vrl::path::OwnedTargetPath;

use vector_common::{
internal_event::{error_stage, error_type},
Expand Down Expand Up @@ -161,7 +162,7 @@ impl InternalEvent for KafkaStatisticsReceived<'_> {
}

pub struct KafkaHeaderExtractionError<'a> {
pub header_field: &'a str,
pub header_field: &'a OwnedTargetPath,
}

impl InternalEvent for KafkaHeaderExtractionError<'_> {
Expand All @@ -171,7 +172,7 @@ impl InternalEvent for KafkaHeaderExtractionError<'_> {
error_code = "extracting_header",
error_type = error_type::PARSER_FAILED,
stage = error_stage::RECEIVING,
header_field = self.header_field,
header_field = self.header_field.to_string(),
internal_log_rate_limit = true,
);
counter!(
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_kinesis/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use lookup::lookup_v2::ConfigValuePath;
use std::marker::PhantomData;

use vector_core::stream::BatcherSettings;
Expand Down Expand Up @@ -79,7 +80,7 @@ impl KinesisSinkBaseConfig {
/// Builds an aws_kinesis sink.
pub fn build_sink<C, R, RR, E, RT>(
config: &KinesisSinkBaseConfig,
partition_key_field: Option<String>,
partition_key_field: Option<ConfigValuePath>,
batch_settings: BatcherSettings,
client: C,
retry_logic: RT,
Expand Down
14 changes: 7 additions & 7 deletions src/sinks/aws_kinesis/sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize};

use lookup::lookup_v2::ConfigValuePath;
use rand::random;
use vrl::path::PathPrefix;

use crate::{
internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
Expand All @@ -27,7 +29,7 @@ pub struct KinesisSink<S, R> {
pub batch_settings: BatcherSettings,
pub service: S,
pub request_builder: KinesisRequestBuilder<R>,
pub partition_key_field: Option<String>,
pub partition_key_field: Option<ConfigValuePath>,
pub _phantom: PhantomData<R>,
}

Expand All @@ -42,13 +44,11 @@ where
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let request_builder_concurrency_limit = NonZeroUsize::new(50);

let partition_key_field = self.partition_key_field.clone();

input
.filter_map(|event| {
// Panic: This sink only accepts Logs, so this should never panic
let log = event.into_log();
let processed = process_log(log, &partition_key_field);
let processed = process_log(log, self.partition_key_field.as_ref());

future::ready(processed)
})
Expand Down Expand Up @@ -106,14 +106,14 @@ where
/// events are emitted and None is returned.
pub(crate) fn process_log(
log: LogEvent,
partition_key_field: &Option<String>,
partition_key_field: Option<&ConfigValuePath>,
) -> Option<KinesisProcessedEvent> {
let partition_key = if let Some(partition_key_field) = partition_key_field {
if let Some(v) = log.get(partition_key_field.as_str()) {
if let Some(v) = log.get((PathPrefix::Event, partition_key_field)) {
v.to_string_lossy()
} else {
emit!(AwsKinesisStreamNoPartitionKeyError {
partition_key_field
partition_key_field: partition_key_field.0.to_string().as_str()
});
return None;
}
Expand Down
Loading

0 comments on commit 5ce5ff1

Please sign in to comment.