From 58e59a578fd559c54e614e7f544bb285aa974784 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 14 Aug 2023 10:41:30 -0400 Subject: [PATCH] more changes --- lib/vector-lookup/src/lookup_v2/mod.rs | 7 +++++ src/codecs/encoding/config.rs | 15 ++------- src/codecs/encoding/transformer.rs | 26 ++++++++-------- src/internal_events/kafka.rs | 5 +-- src/sinks/elasticsearch/tests.rs | 27 +++++++--------- src/sinks/humio/metrics.rs | 4 +-- src/sinks/kafka/config.rs | 9 ++++-- src/sinks/kafka/request_builder.rs | 31 +++++++++---------- src/sinks/kafka/sink.rs | 9 +++--- src/sinks/kafka/tests.rs | 5 +-- src/sinks/loki/sink.rs | 5 ++- src/sinks/splunk_hec/logs/config.rs | 6 +++- .../splunk_hec/logs/integration_tests.rs | 29 ++++++----------- src/sinks/splunk_hec/logs/sink.rs | 2 +- src/sinks/splunk_hec/logs/tests.rs | 5 ++- src/sources/aws_s3/sqs.rs | 2 +- src/sources/kubernetes_logs/parser/cri.rs | 4 +-- src/sources/kubernetes_logs/parser/docker.rs | 6 ++-- src/sources/kubernetes_logs/parser/mod.rs | 4 +-- .../kubernetes_logs/partial_events_merger.rs | 4 +-- .../kubernetes_logs/transform_utils/mod.rs | 11 +------ src/transforms/log_to_metric.rs | 8 +++-- src/transforms/lua/v1/mod.rs | 9 ++++-- .../reference/components/sinks/base/kafka.cue | 2 +- 24 files changed, 116 insertions(+), 119 deletions(-) diff --git a/lib/vector-lookup/src/lookup_v2/mod.rs b/lib/vector-lookup/src/lookup_v2/mod.rs index d066eb596e3e20..de95977844d627 100644 --- a/lib/vector-lookup/src/lookup_v2/mod.rs +++ b/lib/vector-lookup/src/lookup_v2/mod.rs @@ -38,6 +38,13 @@ impl<'a> ValuePath<'a> for &'a ConfigValuePath { } } +#[cfg(any(test, feature = "test"))] +impl From<&str> for ConfigValuePath { + fn from(path: &str) -> Self { + ConfigValuePath::try_from(path.to_string()).unwrap() + } +} + /// A wrapper around `OwnedTargetPath` that allows it to be used in Vector config /// with prefix default to `PathPrefix::Event` #[configurable_component] diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index 97096de1e0f3d3..cdae762bf64844 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -172,10 +172,7 @@ mod test { transformer.only_fields(), &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())]) ); - assert_eq!( - transformer.except_fields(), - &Some(vec!["ignore_me".to_owned()]) - ); + assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()])); assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix)); } @@ -207,10 +204,7 @@ mod test { transformer.only_fields(), &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())]) ); - assert_eq!( - transformer.except_fields(), - &Some(vec!["ignore_me".to_owned()]) - ); + assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()])); assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix)); } @@ -239,10 +233,7 @@ mod test { transformer.only_fields(), &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())]) ); - assert_eq!( - transformer.except_fields(), - &Some(vec!["ignore_me".to_owned()]) - ); + assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()])); assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix)); } } diff --git a/src/codecs/encoding/transformer.rs b/src/codecs/encoding/transformer.rs index 788ae68ad0fb2a..bd5aee8b7f0bc1 100644 --- a/src/codecs/encoding/transformer.rs +++ b/src/codecs/encoding/transformer.rs @@ -51,10 +51,10 @@ impl<'de> Deserialize<'de> for Transformer { Self::new( inner .only_fields - .map(|v| v.iter().map(|p| ConfigValuePath { 0: p.clone() }).collect()), + .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()), inner .except_fields - .map(|v| v.iter().map(|p| ConfigValuePath { 0: p.clone() }).collect()), + .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()), inner.timestamp_format, ) .map_err(serde::de::Error::custom) @@ -156,18 +156,18 @@ impl Transformer { fn apply_except_fields(&self, log: &mut LogEvent) { if let Some(except_fields) = self.except_fields.as_ref() { - let service_path = log - .metadata() - .schema_definition() - .meaning_path(meaning::SERVICE); - for field in except_fields { - let value = log.remove((PathPrefix::Event, field)); + let value_path = &field.0; + let value = log.remove((PathPrefix::Event, value_path)); + let service_path = log + .metadata() + .schema_definition() + .meaning_path(meaning::SERVICE); // If we are removing the service field we need to store this in a `dropped_fields` list as we may need to // refer to this later when emitting metrics. - if let Some(v) = value { - if matches!(service_path, Some(target_path) if target_path.path == field.0) { + if let (Some(v), Some(service_path)) = (value, service_path) { + if service_path.path == *value_path { log.metadata_mut() .add_dropped_field(meaning::SERVICE.to_string(), v); } @@ -274,7 +274,7 @@ mod tests { #[test] fn deserialize_and_transform_except() { let transformer: Transformer = - toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d\\.z", "e"]"#).unwrap(); + toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d.z", "e"]"#).unwrap(); let mut log = LogEvent::default(); { log.insert("a", 1); @@ -285,7 +285,7 @@ mod tests { log.insert("b[1].x", 1); log.insert("c[0].x", 1); log.insert("c[0].y", 1); - log.insert("d\\.z", 1); + log.insert("d.z", 1); log.insert("e.a", 1); log.insert("e.b", 1); } @@ -295,7 +295,7 @@ mod tests { assert!(!event.as_mut_log().contains("b")); assert!(!event.as_mut_log().contains("b[1].x")); assert!(!event.as_mut_log().contains("c[0].y")); - assert!(!event.as_mut_log().contains("d\\.z")); + assert!(!event.as_mut_log().contains("d.z")); assert!(!event.as_mut_log().contains("e.a")); assert!(event.as_mut_log().contains("a.b.d")); diff --git a/src/internal_events/kafka.rs b/src/internal_events/kafka.rs index 57c25d905c60f5..7a968626d0cb99 100644 --- a/src/internal_events/kafka.rs +++ b/src/internal_events/kafka.rs @@ -1,5 +1,6 @@ use metrics::{counter, gauge}; use vector_core::{internal_event::InternalEvent, update_counter}; +use vrl::path::OwnedTargetPath; use vector_common::{ internal_event::{error_stage, error_type}, @@ -161,7 +162,7 @@ impl InternalEvent for KafkaStatisticsReceived<'_> { } pub struct KafkaHeaderExtractionError<'a> { - pub header_field: &'a str, + pub header_field: &'a OwnedTargetPath, } impl InternalEvent for KafkaHeaderExtractionError<'_> { @@ -171,7 +172,7 @@ impl InternalEvent for KafkaHeaderExtractionError<'_> { error_code = "extracting_header", error_type = error_type::PARSER_FAILED, stage = error_stage::RECEIVING, - header_field = self.header_field, + header_field = self.header_field.to_string(), internal_log_rate_limit = true, ); counter!( diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index f2672228abfe3f..f8a8a55ac78827 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -12,7 +12,6 @@ use crate::{ }, template::Template, }; -use lookup::owned_value_path; // helper to unwrap template strings for tests only fn parse_template(input: &str) -> Template { @@ -53,7 +52,7 @@ async fn sets_create_action_when_configured() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -129,7 +128,7 @@ async fn encode_datastream_mode() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -187,7 +186,7 @@ async fn encode_datastream_mode_no_routing() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -223,7 +222,7 @@ async fn handle_metrics() { es.request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -339,7 +338,7 @@ async fn encode_datastream_mode_no_sync() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -358,12 +357,8 @@ async fn allows_using_except_fields() { index: parse_template("{{ idx }}"), ..Default::default() }, - encoding: Transformer::new( - None, - Some(vec!["idx".to_string(), "timestamp".to_string()]), - None, - ) - .unwrap(), + encoding: Transformer::new(None, Some(vec!["idx".into(), "timestamp".into()]), None) + .unwrap(), endpoints: vec![String::from("https://example.com")], api_version: ElasticsearchApiVersion::V6, ..Default::default() @@ -379,7 +374,7 @@ async fn allows_using_except_fields() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -398,7 +393,7 @@ async fn allows_using_only_fields() { index: parse_template("{{ idx }}"), ..Default::default() }, - encoding: Transformer::new(Some(vec![owned_value_path!("foo")]), None, None).unwrap(), + encoding: Transformer::new(Some(vec!["foo".into()]), None, None).unwrap(), endpoints: vec![String::from("https://example.com")], api_version: ElasticsearchApiVersion::V6, ..Default::default() @@ -414,7 +409,7 @@ async fn allows_using_only_fields() { .request_builder .encoder .encode_input( - vec![process_log(log, &es.mode, &None, &config.encoding).unwrap()], + vec![process_log(log, &es.mode, None, &config.encoding).unwrap()], &mut encoded, ) .unwrap(); @@ -540,7 +535,7 @@ async fn datastream_index_name() { ), ); - let processed_event = process_log(log, &es.mode, &None, &config.encoding).unwrap(); + let processed_event = process_log(log, &es.mode, None, &config.encoding).unwrap(); assert_eq!(processed_event.index, test_case.want, "{test_case:?}"); } } diff --git a/src/sinks/humio/metrics.rs b/src/sinks/humio/metrics.rs index d3ca01cbc33d54..1142206e1dfdf2 100644 --- a/src/sinks/humio/metrics.rs +++ b/src/sinks/humio/metrics.rs @@ -3,7 +3,7 @@ use codecs::JsonSerializerConfig; use futures::StreamExt; use futures_util::stream::BoxStream; use indoc::indoc; -use lookup::lookup_v2::OptionalValuePath; +use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath}; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; use vector_core::sink::StreamSink; @@ -97,7 +97,7 @@ pub struct HumioMetricsConfig { /// /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data #[serde(default)] - indexed_fields: Vec, + indexed_fields: Vec, /// Optional name of the repository to ingest into. /// diff --git a/src/sinks/kafka/config.rs b/src/sinks/kafka/config.rs index 57cb86c2001dde..a40c6525da8356 100644 --- a/src/sinks/kafka/config.rs +++ b/src/sinks/kafka/config.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, time::Duration}; use codecs::JsonSerializerConfig; use futures::FutureExt; +use lookup::lookup_v2::ConfigTargetPath; use rdkafka::ClientConfig; use serde_with::serde_as; use vector_config::configurable_component; @@ -53,7 +54,9 @@ pub struct KafkaSinkConfig { /// no key. #[configurable(metadata(docs::advanced))] #[configurable(metadata(docs::examples = "user_id"))] - pub key_field: Option, + #[configurable(metadata(docs::examples = ".my_topic"))] + #[configurable(metadata(docs::examples = "%my_topic"))] + pub key_field: Option, #[configurable(derived)] pub encoding: EncodingConfig, @@ -108,7 +111,7 @@ pub struct KafkaSinkConfig { #[configurable(metadata(docs::advanced))] #[serde(alias = "headers_field")] // accidentally released as `headers_field` in 0.18 #[configurable(metadata(docs::examples = "headers"))] - pub headers_key: Option, + pub headers_key: Option, #[configurable(derived)] #[serde( @@ -249,7 +252,7 @@ impl GenerateConfig for KafkaSinkConfig { toml::Value::try_from(Self { bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092".to_owned(), topic: Template::try_from("topic-1234".to_owned()).unwrap(), - key_field: Some("user_id".to_owned()), + key_field: Some(ConfigTargetPath::try_from("user_id".to_owned()).unwrap()), encoding: JsonSerializerConfig::default().into(), batch: Default::default(), compression: KafkaCompression::None, diff --git a/src/sinks/kafka/request_builder.rs b/src/sinks/kafka/request_builder.rs index c2d3c7aaa92194..3a8ce886ad0850 100644 --- a/src/sinks/kafka/request_builder.rs +++ b/src/sinks/kafka/request_builder.rs @@ -3,6 +3,7 @@ use std::num::NonZeroUsize; use bytes::{Bytes, BytesMut}; use rdkafka::message::{Header, OwnedHeaders}; use tokio_util::codec::Encoder as _; +use vrl::path::OwnedTargetPath; use crate::{ codecs::{Encoder, Transformer}, @@ -16,8 +17,8 @@ use crate::{ }; pub struct KafkaRequestBuilder { - pub key_field: Option, - pub headers_key: Option, + pub key_field: Option, + pub headers_key: Option, pub topic_template: Template, pub transformer: Transformer, pub encoder: Encoder<()>, @@ -39,9 +40,9 @@ impl KafkaRequestBuilder { let metadata = KafkaRequestMetadata { finalizers: event.take_finalizers(), - key: get_key(&event, &self.key_field), + key: get_key(&event, self.key_field.as_ref()), timestamp_millis: get_timestamp_millis(&event), - headers: get_headers(&event, &self.headers_key), + headers: get_headers(&event, self.headers_key.as_ref()), topic, }; self.transformer.transform(&mut event); @@ -64,14 +65,12 @@ impl KafkaRequestBuilder { } } -fn get_key(event: &Event, key_field: &Option) -> Option { - key_field.as_ref().and_then(|key_field| match event { - Event::Log(log) => log - .get(key_field.as_str()) - .map(|value| value.coerce_to_bytes()), +fn get_key(event: &Event, key_field: Option<&OwnedTargetPath>) -> Option { + key_field.and_then(|key_field| match event { + Event::Log(log) => log.get(key_field).map(|value| value.coerce_to_bytes()), Event::Metric(metric) => metric .tags() - .and_then(|tags| tags.get(key_field)) + .and_then(|tags| tags.get(key_field.to_string().as_str())) .map(|value| value.to_owned().into()), _ => None, }) @@ -86,10 +85,10 @@ fn get_timestamp_millis(event: &Event) -> Option { .map(|ts| ts.timestamp_millis()) } -fn get_headers(event: &Event, headers_key: &Option) -> Option { - headers_key.as_ref().and_then(|headers_key| { +fn get_headers(event: &Event, headers_key: Option<&OwnedTargetPath>) -> Option { + headers_key.and_then(|headers_key| { if let Event::Log(log) = event { - if let Some(headers) = log.get(headers_key.as_str()) { + if let Some(headers) = log.get(headers_key) { match headers { Value::Object(headers_map) => { let mut owned_headers = OwnedHeaders::new_with_capacity(headers_map.len()); @@ -131,15 +130,15 @@ mod tests { #[test] fn kafka_get_headers() { - let headers_key = "headers"; + let headers_key = OwnedTargetPath::try_from("headers".to_string()).unwrap(); let mut header_values = BTreeMap::new(); header_values.insert("a-key".to_string(), Value::Bytes(Bytes::from("a-value"))); header_values.insert("b-key".to_string(), Value::Bytes(Bytes::from("b-value"))); let mut event = Event::Log(LogEvent::from("hello")); - event.as_mut_log().insert(headers_key, header_values); + event.as_mut_log().insert(&headers_key, header_values); - let headers = get_headers(&event, &Some(headers_key.to_string())).unwrap(); + let headers = get_headers(&event, Some(&headers_key)).unwrap(); assert_eq!(headers.get(0).key, "a-key"); assert_eq!(headers.get(0).value.unwrap(), "a-value".as_bytes()); assert_eq!(headers.get(1).key, "b-key"); diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs index e3060900f71c29..f2d0107524310b 100644 --- a/src/sinks/kafka/sink.rs +++ b/src/sinks/kafka/sink.rs @@ -8,6 +8,7 @@ use rdkafka::{ use snafu::{ResultExt, Snafu}; use tokio::time::Duration; use tower::limit::ConcurrencyLimit; +use vrl::path::OwnedTargetPath; use super::config::{KafkaRole, KafkaSinkConfig}; use crate::{ @@ -32,8 +33,8 @@ pub struct KafkaSink { encoder: Encoder<()>, service: KafkaService, topic: Template, - key_field: Option, - headers_key: Option, + key_field: Option, + headers_key: Option, } pub(crate) fn create_producer( @@ -54,12 +55,12 @@ impl KafkaSink { let encoder = Encoder::<()>::new(serializer); Ok(KafkaSink { - headers_key: config.headers_key, + headers_key: config.headers_key.map(|key| key.0), transformer, encoder, service: KafkaService::new(producer), topic: config.topic, - key_field: config.key_field, + key_field: config.key_field.map(|key| key.0), }) } diff --git a/src/sinks/kafka/tests.rs b/src/sinks/kafka/tests.rs index 9ed3b66ba3fc79..fee36ca9af5eb0 100644 --- a/src/sinks/kafka/tests.rs +++ b/src/sinks/kafka/tests.rs @@ -13,6 +13,7 @@ mod integration_test { use bytes::Bytes; use codecs::TextSerializerConfig; use futures::StreamExt; + use lookup::lookup_v2::ConfigTargetPath; use rdkafka::{ consumer::{BaseConsumer, Consumer}, message::Headers, @@ -302,7 +303,7 @@ mod integration_test { } let topic = format!("test-{}", random_string(10)); - let headers_key = "headers_key".to_string(); + let headers_key = ConfigTargetPath::try_from("headers_key".to_string()).unwrap(); let kafka_auth = KafkaAuthConfig { sasl, tls }; let config = KafkaSinkConfig { bootstrap_servers: server.clone(), @@ -335,7 +336,7 @@ mod integration_test { Value::Bytes(Bytes::from(header_1_value)), ); events.iter_logs_mut().for_each(move |log| { - log.insert(headers_key.as_str(), header_values.clone()); + log.insert(&headers_key, header_values.clone()); }); events }); diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index fd6aa01bac23d4..bb0db3853cf7f0 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -5,6 +5,7 @@ use once_cell::sync::Lazy; use regex::Regex; use snafu::Snafu; use tokio_util::codec::Encoder as _; +use vrl::path::parse_target_path; use super::{ config::{LokiConfig, OutOfOrderAction}, @@ -239,7 +240,9 @@ impl EventEncoder { for template in self.labels.values() { if let Some(fields) = template.get_fields() { for field in fields { - event.as_mut_log().remove(field.as_str()); + if let Ok(path) = parse_target_path(field.as_str()) { + event.as_mut_log().remove(&path); + } } } } diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index 05b53c013e6778..ef056693bdc533 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -272,7 +272,11 @@ impl HecLogsSinkConfig { sourcetype: self.sourcetype.clone(), source: self.source.clone(), index: self.index.clone(), - indexed_fields: self.indexed_fields.clone(), + indexed_fields: self + .indexed_fields + .iter() + .map(|config_path| config_path.0.clone()) + .collect(), host_key: self.host_key.path.clone(), timestamp_nanos_key: self.timestamp_nanos_key.clone(), timestamp_key: self.timestamp_key.path.clone(), diff --git a/src/sinks/splunk_hec/logs/integration_tests.rs b/src/sinks/splunk_hec/logs/integration_tests.rs index 8bfe02dfee3051..63a32142658242 100644 --- a/src/sinks/splunk_hec/logs/integration_tests.rs +++ b/src/sinks/splunk_hec/logs/integration_tests.rs @@ -3,7 +3,7 @@ use std::{convert::TryFrom, iter, num::NonZeroU8}; use chrono::{TimeZone, Timelike, Utc}; use codecs::{JsonSerializerConfig, TextSerializerConfig}; use futures::{future::ready, stream}; -use lookup::lookup_v2::OptionalValuePath; +use lookup::lookup_v2::{ConfigValuePath, OptionalValuePath}; use serde_json::Value as JsonValue; use tokio::time::{sleep, Duration}; use vector_core::{ @@ -106,7 +106,10 @@ async fn find_entries(messages: &[String]) -> bool { found_all } -async fn config(encoding: EncodingConfig, indexed_fields: Vec) -> HecLogsSinkConfig { +async fn config( + encoding: EncodingConfig, + indexed_fields: Vec, +) -> HecLogsSinkConfig { let mut batch = BatchConfig::default(); batch.max_events = Some(5); @@ -302,7 +305,7 @@ async fn splunk_insert_index() { async fn splunk_index_is_interpolated() { let cx = SinkContext::default(); - let indexed_fields = vec!["asdf".to_string()]; + let indexed_fields = vec!["asdf".into()]; let mut config = config(JsonSerializerConfig::default().into(), indexed_fields).await; config.index = Template::try_from("{{ index_name }}".to_string()).ok(); @@ -379,7 +382,7 @@ async fn splunk_hostname() { async fn splunk_sourcetype() { let cx = SinkContext::default(); - let indexed_fields = vec!["asdf".to_string()]; + let indexed_fields = vec!["asdf".into()]; let mut config = config(JsonSerializerConfig::default().into(), indexed_fields).await; config.sourcetype = Template::try_from("_json".to_string()).ok(); @@ -405,11 +408,7 @@ async fn splunk_configure_hostname() { let config = HecLogsSinkConfig { host_key: OptionalValuePath::new("roast"), - ..config( - JsonSerializerConfig::default().into(), - vec!["asdf".to_string()], - ) - .await + ..config(JsonSerializerConfig::default().into(), vec!["asdf".into()]).await }; let (sink, _) = config.build(cx).await.unwrap(); @@ -443,11 +442,7 @@ async fn splunk_indexer_acknowledgements() { let config = HecLogsSinkConfig { default_token: String::from(ACK_TOKEN).into(), acknowledgements: acknowledgements_config, - ..config( - JsonSerializerConfig::default().into(), - vec!["asdf".to_string()], - ) - .await + ..config(JsonSerializerConfig::default().into(), vec!["asdf".into()]).await }; let (sink, _) = config.build(cx).await.unwrap(); @@ -464,11 +459,7 @@ async fn splunk_indexer_acknowledgements() { async fn splunk_indexer_acknowledgements_disabled_on_server() { let cx = SinkContext::default(); - let config = config( - JsonSerializerConfig::default().into(), - vec!["asdf".to_string()], - ) - .await; + let config = config(JsonSerializerConfig::default().into(), vec!["asdf".into()]).await; let (sink, _) = config.build(cx).await.unwrap(); let (tx, mut rx) = BatchNotifier::new_with_receiver(); diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index 5a69e95f1f34f3..2533add3438a26 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -279,7 +279,7 @@ pub fn process_log(event: Event, data: &HecLogData) -> HecProcessedEvent { .iter() .filter_map(|field| { log.get((PathPrefix::Event, field)) - .map(|value| (field, value.clone())) + .map(|value| (field.to_string(), value.clone())) }) .collect::(); diff --git a/src/sinks/splunk_hec/logs/tests.rs b/src/sinks/splunk_hec/logs/tests.rs index 308a54fabcc6c7..2cc982aed85f96 100644 --- a/src/sinks/splunk_hec/logs/tests.rs +++ b/src/sinks/splunk_hec/logs/tests.rs @@ -81,7 +81,10 @@ fn get_processed_event_timestamp( let sourcetype = Template::try_from("{{ event_sourcetype }}".to_string()).ok(); let source = Template::try_from("{{ event_source }}".to_string()).ok(); let index = Template::try_from("{{ event_index }}".to_string()).ok(); - let indexed_fields = vec!["event_field1".to_string(), "event_field2".to_string()]; + let indexed_fields = vec![ + owned_value_path!("event_field1"), + owned_value_path!("event_field2"), + ]; let timestamp_nanos_key = Some(String::from("ts_nanos_key")); process_log( diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 508f6abbaed686..a7f827c920b697 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -682,7 +682,7 @@ fn handle_single_log( log_namespace.insert_source_metadata( AwsS3Config::NAME, log, - Some(LegacyKey::Overwrite(key.as_str())), + Some(LegacyKey::Overwrite(path!(key))), path!("metadata", key.as_str()), value.clone(), ); diff --git a/src/sources/kubernetes_logs/parser/cri.rs b/src/sources/kubernetes_logs/parser/cri.rs index b8bcc6fb476f11..a1c8f5dd24f16e 100644 --- a/src/sources/kubernetes_logs/parser/cri.rs +++ b/src/sources/kubernetes_logs/parser/cri.rs @@ -4,7 +4,7 @@ use lookup::path; use vector_common::conversion; use vector_core::config::{log_schema, LegacyKey, LogNamespace}; -use crate::sources::kubernetes_logs::transform_utils::get_message_field; +use crate::sources::kubernetes_logs::transform_utils::get_message_path; use crate::{ event::{self, Event, Value}, internal_events::{ @@ -41,7 +41,7 @@ impl Cri { impl FunctionTransform for Cri { fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) { - let message_path = get_message_field(self.log_namespace); + let message_path = get_message_path(self.log_namespace); // Get the log field with the message, if it exists, and coerce it to bytes. let log = event.as_mut_log(); diff --git a/src/sources/kubernetes_logs/parser/docker.rs b/src/sources/kubernetes_logs/parser/docker.rs index 07c96fc536dc97..df29f444c68cb8 100644 --- a/src/sources/kubernetes_logs/parser/docker.rs +++ b/src/sources/kubernetes_logs/parser/docker.rs @@ -5,7 +5,7 @@ use serde_json::Value as JsonValue; use snafu::{OptionExt, ResultExt, Snafu}; use vector_core::config::{LegacyKey, LogNamespace}; -use crate::sources::kubernetes_logs::transform_utils::get_message_field; +use crate::sources::kubernetes_logs::transform_utils::get_message_path; use crate::{ config::log_schema, event::{self, Event, LogEvent, Value}, @@ -52,7 +52,7 @@ impl FunctionTransform for Docker { /// Parses `message` as json object and removes it. fn parse_json(log: &mut LogEvent, log_namespace: LogNamespace) -> Result<(), ParsingError> { - let target_path = get_message_field(log_namespace); + let target_path = get_message_path(log_namespace); let value = log .remove(&target_path) @@ -131,7 +131,7 @@ fn normalize_event( } // Parse message, remove trailing newline and detect if it's partial. - let message_path = get_message_field(log_namespace); + let message_path = get_message_path(log_namespace); let message = log.remove(&message_path).context(LogFieldMissingSnafu)?; let mut message = match message { Value::Bytes(val) => val, diff --git a/src/sources/kubernetes_logs/parser/mod.rs b/src/sources/kubernetes_logs/parser/mod.rs index 06caf2eadd6786..bcfa1923a3bb02 100644 --- a/src/sources/kubernetes_logs/parser/mod.rs +++ b/src/sources/kubernetes_logs/parser/mod.rs @@ -4,7 +4,7 @@ mod test_util; use vector_core::config::LogNamespace; -use crate::sources::kubernetes_logs::transform_utils::get_message_field; +use crate::sources::kubernetes_logs::transform_utils::get_message_path; use crate::{ event::{Event, Value}, internal_events::KubernetesLogsFormatPickerEdgeCase, @@ -42,7 +42,7 @@ impl FunctionTransform for Parser { fn transform(&mut self, output: &mut OutputBuffer, event: Event) { match &mut self.state { ParserState::Uninitialized => { - let message_field = get_message_field(self.log_namespace); + let message_field = get_message_path(self.log_namespace); let message = match event.as_log().get(&message_field) { Some(message) => message, None => { diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index af0c2bafda36fc..3c94f05c5cd53f 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -13,9 +13,7 @@ use crate::sources::kubernetes_logs::transform_utils::get_message_path; /// The key we use for `file` field. const FILE_KEY: &str = "file"; -pub fn build(enabled: bool, log_namespace: LogNamespace) -> PartialEventsMerger { - let reducer = if enabled { - let key = get_message_field(log_namespace).to_string(); +const EXPIRATION_TIME: Duration = Duration::from_secs(30); use bytes::BytesMut; use lookup::OwnedTargetPath; diff --git a/src/sources/kubernetes_logs/transform_utils/mod.rs b/src/sources/kubernetes_logs/transform_utils/mod.rs index 6447ab5d2bf23c..0a0bef487cd093 100644 --- a/src/sources/kubernetes_logs/transform_utils/mod.rs +++ b/src/sources/kubernetes_logs/transform_utils/mod.rs @@ -1,18 +1,9 @@ use vector_core::config::{log_schema, LogNamespace}; +use vrl::owned_value_path; use vrl::path::OwnedTargetPath; pub mod optional; -pub(crate) fn get_message_field(log_namespace: LogNamespace) -> OwnedTargetPath { - match log_namespace { - LogNamespace::Vector => OwnedTargetPath::event_root(), - LogNamespace::Legacy => log_schema() - .message_key_target_path() - .expect("global log_schema.message_key to be valid path") - .clone(), - } -} - pub(crate) fn get_message_path(log_namespace: LogNamespace) -> OwnedTargetPath { match log_namespace { LogNamespace::Vector => OwnedTargetPath::event(owned_value_path!()), diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index 29c90fa370af9b..6ab837420106fe 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -5,8 +5,10 @@ use chrono::Utc; use indexmap::IndexMap; use vector_config::configurable_component; use vector_core::config::LogNamespace; +use vrl::path::parse_target_path; use crate::config::schema::Definition; +use crate::transforms::log_to_metric::TransformError::FieldNotFound; use crate::{ config::{ DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, @@ -263,9 +265,11 @@ fn to_metric(config: &MetricConfig, event: &Event) -> Result Err(TransformError::FieldNotFound { field: field.to_string(), }), diff --git a/src/transforms/lua/v1/mod.rs b/src/transforms/lua/v1/mod.rs index 58a94f939261da..d3043676ad182c 100644 --- a/src/transforms/lua/v1/mod.rs +++ b/src/transforms/lua/v1/mod.rs @@ -229,7 +229,7 @@ impl mlua::UserData for LuaEvent { match value { Some(mlua::Value::String(string)) => { this.inner.as_mut_log().insert( - key.as_str(), + &key_path, Value::from(string.to_str().expect("Expected UTF-8.").to_owned()), ); } @@ -267,7 +267,12 @@ impl mlua::UserData for LuaEvent { ); methods.add_meta_method(mlua::MetaMethod::Index, |lua, this, key: String| { - if let Some(value) = this.inner.as_log().get(key.as_str()) { + if let Some(value) = this + .inner + .as_log() + .parse_path_and_get_value(key.as_str()) + .map_err(|e| e.to_lua_err())? + { let string = lua.create_string(&value.coerce_to_bytes())?; Ok(Some(string)) } else { diff --git a/website/cue/reference/components/sinks/base/kafka.cue b/website/cue/reference/components/sinks/base/kafka.cue index 97de9c94750e90..cca1134e009902 100644 --- a/website/cue/reference/components/sinks/base/kafka.cue +++ b/website/cue/reference/components/sinks/base/kafka.cue @@ -241,7 +241,7 @@ base: components: sinks: kafka: configuration: { no key. """ required: false - type: string: examples: ["user_id"] + type: string: examples: ["user_id", ".my_topic", "%my_topic"] } librdkafka_options: { description: """